
Delta comparison architecture using flatMapGroupsWithState in Structured Streaming

VamsiDatabricks
New Contributor
I am designing a Structured Streaming job in Azure Databricks (using Scala) that will consume messages from two Event Hubs; let's call them source and target.

I would like your feedback on the flow below: whether it will survive production load, and suggestions to make it better.

  1. We consume messages from the source and target Event Hubs.
  2. Parse each message to extract the id that uniquely identifies it, and also store the actual message in a storage account.
  3. We now have one dataset each for source and target.
  4. Union the source and target datasets into a single dataset and group it by the unique key (sketched below).
  5. On the keyed dataset, run the flatMapGroupsWithState operator so that the comparison runs every minute and checks whether both the source and target keys exist.
  6. Once both source and target events for a given key are available, we fetch their corresponding JSONs from Delta using the stored pointers, perform the comparison, emit a DeltaRecord, and clear the state.

[Design diagram attached: Structured Streaming]
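
For steps 1 to 4, here is a minimal Scala sketch of one way the read/union/keying could look. It assumes the Event Hubs Kafka-compatible endpoint (authentication options omitted) and an illustrative payload shape (an id plus a pointer to the full JSON already persisted in the storage account); TaggedEvent and readHub are placeholder names, not part of the actual design.

    import org.apache.spark.sql.{Dataset, SparkSession}
    import org.apache.spark.sql.functions._

    // Illustrative shape of a parsed event: the business key, which hub it came from,
    // and a pointer to the full payload persisted in the storage account / Delta.
    case class TaggedEvent(key: String, side: String, pointer: String, eventTime: java.sql.Timestamp)

    val spark = SparkSession.builder.getOrCreate()
    import spark.implicits._

    def readHub(topic: String): Dataset[TaggedEvent] =
      spark.readStream
        .format("kafka") // Event Hubs exposes a Kafka-compatible endpoint; SASL options omitted here
        .option("kafka.bootstrap.servers", "<namespace>.servicebus.windows.net:9093")
        .option("subscribe", topic)
        .load()
        .select(
          get_json_object($"value".cast("string"), "$.id").as("key"),
          lit(topic).as("side"), // tag each record with its origin hub
          get_json_object($"value".cast("string"), "$.pointer").as("pointer"),
          $"timestamp".as("eventTime"))
        .as[TaggedEvent]

    val source = readHub("source")
    val target = readHub("target")

    // Steps 3-4: one unioned dataset, watermarked so state can expire, grouped by the key.
    val keyed = source.union(target)
      .withWatermark("eventTime", "10 minutes")
      .groupByKey(_.key)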

Here’s a simplified pseudocode snippet:

    import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
    keyed.flatMapGroupsWithState[StateValue, DeltaRecord](OutputMode.Append(), GroupStateTimeout.EventTimeTimeout())(handleKey)
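
And a possible shape for the handleKey function referenced above, reusing the TaggedEvent type from the earlier sketch; StateValue, DeltaRecord, and compareJsons are illustrative placeholders rather than the real implementation, and the event-time timeout assumes the 10-minute watermark set earlier.

    import org.apache.spark.sql.streaming.GroupState

    case class StateValue(sourcePointer: Option[String], targetPointer: Option[String])
    case class DeltaRecord(key: String, matched: Boolean)

    // Placeholder for the actual logic that reads both JSONs from Delta and diffs them.
    def compareJsons(sourcePointer: String, targetPointer: String): Boolean =
      sourcePointer == targetPointer

    def handleKey(
        key: String,
        events: Iterator[TaggedEvent],
        state: GroupState[StateValue]): Iterator[DeltaRecord] = {

      if (state.hasTimedOut) {
        // The watermark passed the timeout before the other side arrived: evict the state.
        state.remove()
        Iterator.empty
      } else {
        // Merge the new arrivals into whatever has already been seen for this key.
        val merged = events.foldLeft(state.getOption.getOrElse(StateValue(None, None))) { (acc, e) =>
          if (e.side == "source") acc.copy(sourcePointer = Some(e.pointer))
          else acc.copy(targetPointer = Some(e.pointer))
        }

        (merged.sourcePointer, merged.targetPointer) match {
          case (Some(src), Some(tgt)) =>
            // Both sides are present: compare the payloads, emit the result, clear the state.
            state.remove()
            Iterator.single(DeltaRecord(key, matched = compareJsons(src, tgt)))
          case _ =>
            // Still waiting for the other side: keep the state and set an event-time
            // timeout so unmatched keys are eventually dropped instead of piling up.
            state.update(merged)
            state.setTimeoutTimestamp(state.getCurrentWatermarkMs() + 10 * 60 * 1000)
            Iterator.empty
        }
      }
    }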

Could you please validate this approach, specifically:

- Any hidden pitfalls in production, especially around Delta I/O under load, event skew, or watermarking.

- Whether others have adopted similar pointer-based approaches for large-scale streaming comparisons, and any tuning lessons learned.

Appreciate any feedback, design critiques, or optimization suggestions from those who’ve run this pattern at scale.

1 REPLY

Hubert-Dudek
Esteemed Contributor III

It is hard to understand what the source is and what the target is; a chart would be useful, as would information on how long the state is kept. My usual approach is:
- Use Lakeflow Declarative Pipelines (DLT) if possible.

- If not, consider handling the state yourself using transformWithStateInPandas (here is my example: https://databrickster.medium.com/transformwithstate-is-here-to-clean-duplicates-77b86c359392).

- Also, sometimes the easiest option is just to use foreachBatch and process the stream as micro-batches.
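
For illustration, here is a minimal Scala sketch of the foreachBatch option, reusing the unioned stream from the sketches in the question; the results table name and checkpoint path are placeholders. Note that a plain per-batch join only matches source and target events that arrive in the same micro-batch, so cross-batch matching would still need state or a Delta-backed lookup.

    import org.apache.spark.sql.DataFrame

    source.union(target).toDF().writeStream
      .foreachBatch { (batch: DataFrame, batchId: Long) =>
        // Inside a micro-batch the normal batch APIs are available: split the batch by
        // origin, join on the business key, and append the comparison results to Delta.
        val src = batch.filter("side = 'source'").alias("s")
        val tgt = batch.filter("side = 'target'").alias("t")
        src.join(tgt, "key")
          .selectExpr("key", "s.pointer as source_pointer", "t.pointer as target_pointer")
          .write.format("delta").mode("append").saveAsTable("comparison_results")
      }
      .option("checkpointLocation", "/tmp/checkpoints/comparison")
      .start()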