Could you please try making the following changes. Based on the code in:
python/morpheus_dfp/morpheus_dfp/stages/dfp_rolling_window_stage.py (around lines 147-152):
```python
# Hash the incoming data rows to find a match
incoming_hash = pd.util.hash_pandas_object(incoming_df.iloc[[0, -1]], index=False)

# Find the index of the first and last row
match = train_df[train_df["_row_hash"] == incoming_hash.iloc[0]]

if (len(match) == 0):
    raise RuntimeError(f"Invalid rolling window for user {user_id}")
```
This appears to be the result of a cache error.
We would recommend you first try to rm the /workspace/cache dir
I was trying to create a new custom SingleOutputSource input stage which reads jsonlines data from a 30GB gzip file and outputs logs in batches of cuDF DataFrames with 10,000 rows each. However, does Digital Fingerprinting need to read data from smaller 1-4GB local files/AWS S3, with custom source/Kafka stages not supported as input?
We are not aware of any such limit; however, there is no reason that a custom source stage couldn't be used.
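For illustration, here is a minimal sketch of such a source using Morpheus's `@source` stage decorator; the function name and parameters are hypothetical, and exact decorator behavior varies by Morpheus version:

```python
import collections.abc
import gzip
import io
import itertools

import cudf

from morpheus.messages import MessageMeta
from morpheus.pipeline.stage_decorator import source


@source
def gzip_jsonlines_source(*, filename: str, batch_size: int = 10_000) -> collections.abc.Iterator[MessageMeta]:
    # Stream the (potentially 30GB) gzip file without decompressing it all
    # at once; each iteration reads at most batch_size lines.
    with gzip.open(filename, mode="rt") as fh:
        while True:
            lines = list(itertools.islice(fh, batch_size))
            if not lines:
                break

            # Parse this batch of jsonlines records into a cuDF DataFrame.
            df = cudf.read_json(io.StringIO("".join(lines)), lines=True)
            yield MessageMeta(df)
```

It would be attached with `pipeline.set_source(gzip_jsonlines_source(config, filename="logs.jsonlines.gz"))`. Note that the DFP stages discussed below consume fsspec file lists and DFP-specific message types, so a source emitting raw MessageMeta batches would likely need adapting before it could feed the rest of the DFP pipeline.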
Is this because the logs must be processed in DFPFileToDataFrameStage to group logs by user per day, to avoid overlapping timestamps?
Yes, you are correct. In particular, the DFPFileBatcherStage assumes that a timestamp is embedded in the file name of the source files, ex: input_file_2025-07-23T01:02:03.045Z.jsonlines
If a date is not embedded in the file name, some other mechanism can be used to determine it, such as using the filesystem's modified time or reading the first record of the file. This can be implemented as a custom function passed in as the date_conversion_func argument to the DFPFileBatcherStage.
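For illustration, a minimal sketch of such a function, assuming it receives one of the fsspec.core.OpenFiles handles mentioned below and returns a datetime (the helper name is ours, not part of Morpheus):

```python
import datetime

import fsspec


def mtime_date_extractor(open_file: fsspec.core.OpenFile) -> datetime.datetime:
    # Use the filesystem's modified time when no timestamp is embedded
    # in the file name.
    mtime = open_file.fs.modified(open_file.path)

    # Some filesystems return naive datetimes; assume UTC in that case.
    if mtime.tzinfo is None:
        mtime = mtime.replace(tzinfo=datetime.timezone.utc)

    return mtime
```

It would then be passed as `DFPFileBatcherStage(config, period="D", date_conversion_func=mtime_date_extractor)`.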
DFPFileBatcherStage OR MultiFileSource → DFPFileToDataFrameStage → DFPSplitUsersStage → DFPRollingWindowStage → DFPPreprocessingStage → Training
It should be:
MultiFileSource → DFPFileBatcherStage → DFPFileToDataFrameStage → DFPSplitUsersStage → DFPRollingWindowStage → DFPPreprocessingStage → DFPTraining → DFPMLFlowModelWriterStage
Of note is that the MultiFileSource and DFPFileBatcherStage stages do not read the incoming files; they simply emit a list of file handles as fsspec.core.OpenFiles objects.
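Putting the corrected ordering together, a hedged sketch of the wiring, reusing the mtime_date_extractor sketch from earlier (import paths are from recent releases where the DFP stages ship in the morpheus_dfp package; the schemas and most constructor arguments below are placeholders modeled on the DFP example pipelines and will differ by version):

```python
from morpheus.config import Config
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.stages.input.multi_file_source import MultiFileSource
from morpheus_dfp.stages.dfp_file_batcher_stage import DFPFileBatcherStage
from morpheus_dfp.stages.dfp_file_to_df import DFPFileToDataFrameStage
from morpheus_dfp.stages.dfp_mlflow_model_writer import DFPMLFlowModelWriterStage
from morpheus_dfp.stages.dfp_preprocessing_stage import DFPPreprocessingStage
from morpheus_dfp.stages.dfp_rolling_window_stage import DFPRollingWindowStage
from morpheus_dfp.stages.dfp_split_users_stage import DFPSplitUsersStage
from morpheus_dfp.stages.dfp_training import DFPTraining

# Placeholders: a real pipeline needs the AE feature columns configured on
# `config`, plus the source/preprocess schemas from the DFP examples.
config = Config()
source_schema = ...
preprocess_schema = ...

pipeline = LinearPipeline(config)

# Emits fsspec.core.OpenFiles; no file contents are read here.
pipeline.set_source(MultiFileSource(config, filenames=["/data/app_log_*.jsonlines"]))

# Groups the file handles into time windows (still no reads).
pipeline.add_stage(DFPFileBatcherStage(config, period="D", date_conversion_func=mtime_date_extractor))

# First stage that actually reads the files into a DataFrame.
pipeline.add_stage(DFPFileToDataFrameStage(config, schema=source_schema))

pipeline.add_stage(DFPSplitUsersStage(config, include_generic=True, include_individual=True))
pipeline.add_stage(DFPRollingWindowStage(config, min_history=300, min_increment=300, max_history="60d"))
pipeline.add_stage(DFPPreprocessingStage(config, input_schema=preprocess_schema))
pipeline.add_stage(DFPTraining(config))
pipeline.add_stage(DFPMLFlowModelWriterStage(config,
                                             model_name_formatter="DFP-{user_id}",
                                             experiment_name_formatter="dfp/{user_id}"))
pipeline.run()
```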
I’ve added a custom stage to sort the data frame by the timestamp after DFPSplitUsersStage and before DFPRollingWindowStage.
The code for your DFPSortTimestampsStage looks correct. The DFP pipeline assumes that the input data is sorted by timestamp, and that the time window can be inferred from the file name, ex:
```
app_log_2025-07-23T01:00:00Z.jsonlines
app_log_2025-07-23T02:00:00Z.jsonlines
app_log_2025-07-23T03:00:00Z.jsonlines
...
```
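For reference, here is a minimal sketch of what a stage like that could look like, following the SinglePortStage pattern from the Morpheus developer guide (this is not your actual code; the ControlMessage payload handling is an assumption, as the exact message type emitted by DFPSplitUsersStage varies across releases):

```python
import typing

import mrc
from mrc.core import operators as ops

from morpheus.config import Config
from morpheus.messages import ControlMessage
from morpheus.messages import MessageMeta
from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin
from morpheus.pipeline.single_port_stage import SinglePortStage


class DFPSortTimestampsStage(PassThruTypeMixin, SinglePortStage):
    """Sorts each per-user batch by its timestamp column."""

    def __init__(self, config: Config, timestamp_column: str = "timestamp"):
        super().__init__(config)
        self._timestamp_column = timestamp_column

    @property
    def name(self) -> str:
        return "dfp-sort-timestamps"

    def accepted_types(self) -> typing.Tuple:
        return (ControlMessage, )

    def supports_cpp_node(self) -> bool:
        return False

    def _sort(self, msg: ControlMessage) -> ControlMessage:
        # Re-attach a sorted copy so DFPRollingWindowStage sees
        # monotonically increasing timestamps.
        df = msg.payload().copy_dataframe()
        df = df.sort_values(by=self._timestamp_column).reset_index(drop=True)
        msg.payload(MessageMeta(df))
        return msg

    def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
        node = builder.make_node(self.unique_name, ops.map(self._sort))
        builder.make_edge(input_node, node)
        return node
```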
What could get tricky is if, say, the source data were being ingested from multiple sources:
```
server_1_app_log_2025-07-23T01:00:00Z.jsonlines
server_2_app_log_2025-07-23T01:00:00Z.jsonlines
server_3_app_log_2025-07-23T01:00:00Z.jsonlines
server_1_app_log_2025-07-23T02:00:00Z.jsonlines
...
```
In this type of situation, one of two things would need to happen:
- Define the server name as the username
- Combine all datasets for each time window (see the sketch below)
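For the second option, a hedged sketch of a date_conversion_func that ignores the server prefix, so DFPFileBatcherStage places every server's files for the same window into one batch, and DFPFileToDataFrameStage then reads that batch as a single DataFrame (the regex and helper name are illustrative):

```python
import datetime
import re

import fsspec

# Matches the ISO-8601 portion of names like
# server_1_app_log_2025-07-23T01:00:00Z.jsonlines
ISO_DATE_RE = re.compile(r"(?P<ts>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z)")


def server_agnostic_date_extractor(open_file: fsspec.core.OpenFile) -> datetime.datetime:
    match = ISO_DATE_RE.search(open_file.path)
    if match is None:
        raise ValueError(f"No timestamp found in file name: {open_file.path}")

    # The server prefix is ignored, so files from all servers that share a
    # timestamp land in the same DFPFileBatcherStage batch.
    return datetime.datetime.strptime(match.group("ts"), "%Y-%m-%dT%H:%M:%SZ") \
        .replace(tzinfo=datetime.timezone.utc)
```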
The relevant docs for this are in: