Aggregating Ad Events With Kafka Streams and Interactive Queries
Marcus Bergenlid & Stanislav Chizhov, INVIDI
Ad serving in a nutshell
Components: Advertiser, Adserver, Delivery Metrics, Viewer
Advertiser -> Adserver: "Show this ad 1000 times in 1 hour"
Viewer -> Adserver: "Give me an ad please!"
Viewer -> Adserver: "I just watched this ad"
Advertiser -> Delivery Metrics: "How many views has this ad delivered?"
What does Delivery Metrics do?
How many ads have been fully viewed for a particular customer today?
(key, timerange) -> timeseries
● Hourly breakdown for the last month
● Daily breakdown for the last year
● Totals (forever)
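A minimal sketch of the query contract implied above; the interface name, method shape, and Granularity enum are illustrative assumptions, not INVIDI's actual API:

```java
import java.time.Instant;
import java.util.Map;

// Hypothetical query contract: (key, timerange) -> timeseries of counts.
public interface DeliveryMetrics {
  enum Granularity { HOURLY, DAILY, TOTAL }

  // e.g. hourly view counts for one customer over a time range
  Map<Instant, Long> query(String customerId, Instant from, Instant to,
                           Granularity granularity);
}
```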
Delivery Metrics requirements
● Timeliness of the data: < 1 min
● Availability: ~1 successful query a minute
● Query latency: < 10 sec
● Throughput: up to 300K ad events
Version 1: The naive approach

stream
  .map((UUID, AdEvent) -> (Key, Aggregate))
  .groupByKey()
  .windowedBy(HOUR)
  .reduce(sum)

AdEvent { customerId: "Acme", adId: "xxx", eventId: 2, time: 2024-03-01 15:09:51, ... }
Key { customerId: "Acme" }
Aggregate { count: 1 }
Window: 2024-03-01 15:00:00 - 16:00:00
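The same Version 1 topology written out as a hedged sketch in the Kafka Streams Java DSL; AdEvent, Key, Aggregate and their Serde classes are assumed placeholder types, and the topic and store names are illustrative:

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;

public class NaiveTopology {

  static Topology build() {
    StreamsBuilder builder = new StreamsBuilder();
    builder.stream("ad-events", Consumed.with(Serdes.UUID(), new AdEventSerde()))
        // (UUID, AdEvent) -> (Key, Aggregate): re-key by customer, one view counted
        .map((uuid, event) -> KeyValue.pair(new Key(event.customerId()),
                                            new Aggregate(1L)))
        // groupByKey() forces a repartition topic keyed by customerId
        .groupByKey(Grouped.with(new KeySerde(), new AggregateSerde()))
        // hourly tumbling windows
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
        // reduce(sum): add up per-event aggregates into an hourly count per key
        .reduce((a, b) -> new Aggregate(a.count() + b.count()),
                Materialized.as("hourly-counts"));
    return builder.build();
  }
}
```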
Version 1: Query Side
Processing rates (charts): process rate per task for sub-topology 0 and for sub-topology 1 (min vs max)
Skewed key distribution
Lagging behind (charts): input topic lag and repartition topic lag around a scale-up
Lagging behind
n: number of partitions
R: max throughput of a single task
ɑ: fraction of events with a certain key
L: lag increase rate
L = nɑR - R
Some very sad facts
L = nɑR - R = R(nɑ - 1)
● When is it a problem? L > 0 ⇔ ɑ > 1/n
● Scaling vertically won't help: R↑ ⇒ L↑
● Horizontal scaling won't help either: n↑ ⇒ L↑
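To make the inequality concrete, a small worked example (the 5% skew figure is illustrative, not from the talk):

```latex
% Illustrative numbers: n = 72 partitions, a hot key receiving \alpha = 5\% of all events.
L = R(n\alpha - 1) = R(72 \cdot 0.05 - 1) = 2.6\,R
% The hot task would need 2.6x its own maximum throughput just to keep up;
% lag starts growing as soon as \alpha > 1/n \approx 1.4\%.
```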
Version 2: Unskew
Version 2: Query side
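A minimal sketch of what the Version 2 query side could look like with Interactive Queries: since the data is no longer repartitioned by key, every partition's window store may hold a partial aggregate for the same key, so the query fetches the key from each local store and sums the partials. The store name, Key and Aggregate types are assumptions; fan-out to remote instances over RPC is omitted.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

public class QuerySide {

  // Sum the per-partition partial aggregates for one key over a time range.
  static Map<Long, Long> hourlyCounts(KafkaStreams streams, Key key,
                                      Instant from, Instant to) {
    ReadOnlyWindowStore<Key, Aggregate> store = streams.store(
        StoreQueryParameters.fromNameAndType(
            "hourly-counts", QueryableStoreTypes.windowStore()));
    Map<Long, Long> totals = new HashMap<>();
    try (WindowStoreIterator<Aggregate> it = store.fetch(key, from, to)) {
      while (it.hasNext()) {
        KeyValue<Long, Aggregate> entry = it.next();   // window start -> partial
        totals.merge(entry.key, entry.value.count(), Long::sum);
      }
    }
    return totals;   // window start (epoch millis) -> summed count
  }
}
```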
Version 2: Memory usage
P × A × S × T
P: number of partitions
A: number of unique keys per time window
S: size of a key/aggregate pair
T: number of time windows
Memory waste
A × S × T = 4 GB
P × A × S × T = 72 × 4 GB ≈ 300 GB 😱
P: number of partitions (72)
A: number of unique keys per time window (10⁵)
S: size of a key/aggregate pair (100 bytes)
T: number of time windows (400)
Version 3: Reduce traffic
Version 3: Pre-aggregate
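A hedged sketch of the pre-aggregation idea: a Processor API step buffers per-key counts for the records of its own input partition and forwards one partial aggregate per key on a schedule, so the repartition topic carries partials instead of raw events. AdEvent, Key, Aggregate, the flush interval, and the plain in-memory buffer (not fault tolerant; a real version would likely back it with a state store) are all assumptions.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class PreAggregator implements Processor<UUID, AdEvent, Key, Aggregate> {
  private final Map<Key, Long> buffer = new HashMap<>();
  private ProcessorContext<Key, Aggregate> context;

  @Override
  public void init(ProcessorContext<Key, Aggregate> context) {
    this.context = context;
    // Flush buffered partials downstream on a wall-clock schedule (10s is illustrative).
    context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, this::flush);
  }

  @Override
  public void process(Record<UUID, AdEvent> record) {
    // Accumulate in memory instead of forwarding every single event.
    buffer.merge(new Key(record.value().customerId()), 1L, Long::sum);
  }

  private void flush(long timestamp) {
    buffer.forEach((key, count) ->
        context.forward(new Record<>(key, new Aggregate(count), timestamp)));
    buffer.clear();
  }
}
```

In the DSL this could slot in as, e.g., stream.process(PreAggregator::new) ahead of the re-keying and groupByKey step (assuming a Kafka Streams release where process() can change the key type).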
Version 3: Query stitching
T_now: wall clock time
S: stitching time
[T_s, T_e]: query time range
How to find the stitching point?
t_e: time of event
Δ_g: grace period
t_L: event is in the live store
Δ_f: flush period
t_f: event is flushed to the compact store
Δ_g + Δ_f: event is in the compact store
Version 3: Query stitching
T_now: wall clock time
S: stitching time
[T_s, T_e]: query time range
Δ_g: grace period
Δ_f: flush period
S = ⌊T_now - Δ_g - Δ_f⌋
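A small sketch of the stitching logic under the definitions above; the window size and the grace and flush periods are illustrative values, not the production settings:

```java
import java.time.Duration;
import java.time.Instant;

public class Stitching {
  static final Duration WINDOW = Duration.ofHours(1);   // assumed window size
  static final Duration GRACE  = Duration.ofMinutes(5); // Δg, illustrative
  static final Duration FLUSH  = Duration.ofMinutes(5); // Δf, illustrative

  // S = ⌊ T_now - Δg - Δf ⌋, floored to the window boundary.
  static Instant stitchingPoint(Instant now) {
    long millis = now.minus(GRACE).minus(FLUSH).toEpochMilli();
    return Instant.ofEpochMilli(millis - millis % WINDOW.toMillis());
  }

  record Split(Instant compactFrom, Instant compactTo, Instant liveFrom, Instant liveTo) {}

  // Split a query range [ts, te]: the part before S is answered from the
  // compact store, the part from S onwards from the live store.
  // An empty sub-range (from after to) means that store is not queried.
  static Split split(Instant ts, Instant te, Instant now) {
    Instant s = stitchingPoint(now);
    return new Split(ts, min(te, s), max(ts, s), te);
  }

  static Instant min(Instant a, Instant b) { return a.isBefore(b) ? a : b; }
  static Instant max(Instant a, Instant b) { return a.isAfter(b) ? a : b; }
}
```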
Live, Delta, Compact stores Partition 1 Live Delta Compact Key Time Value Key Window Value Key Window Value Key Window Value Partition 2 Live Delta Compact Key Time Value Key Window Value Key Window Value Key Window Value
Live, Delta, Compact stores Partition 1 Live Delta Compact Key Time Value Key Window Value Key Window Value Key Window Value A 12:11 1 Partition 2 Live Delta Compact Key Time Value Key Window Value Key Window Value Key Window Value A 12:10 1
Live, Delta, Compact stores Partition 1 Live Delta Compact Key Time Value Key Window Value Key Window Value Key Window Value A 12:00 1 A 12:00 1 Partition 2 Live Delta Compact Key Time Value Key Window Value Key Window Value Key Window Value A 12:00 1 A 12:00 1
Live, Delta, Compact stores Partition 1 Live Delta Compact Key Time Value Key Window Value Key Window Value Key Window Value A 12:13 1 A 12:00 1 A 12:00 1 Partition 2 Live Delta Compact Key Time Value Key Window Value Key Window Value Key Window Value A 12:00 1 A 12:00 1 B 12:12 1
Live, Delta, Compact stores Partition 1 Live Delta Compact Key Time Value Key Window Value Key Window Value Key Window Value A 12:00 2 A 12:00 2 Partition 2 Live Delta Compact Key Time Value Key Window Value Key Window Value Key Window Value A 12:00 1 A 12:00 1 B 12:00 1 B 12:00 1
Live, Delta, Compact stores Partition 1 Live Delta Compact Key Time Value Key Window Value Key Window Value Key Window Value A 12:00 2 A 12:00 2 A 12:00 2 Partition 2 Live Delta Compact Key Time Value Key Window Value Key Window Value Key Window Value A 12:00 1 A 12:00 1 B 12:00 1 B 12:00 1
Live, Delta, Compact stores Partition 1 Live Delta Compact Key Time Value Key Window Value Key Window Value Key Window Value A 12:00 2 A 12:00 3 Partition 2 Live Delta Compact Key Time Value Key Window Value Key Window Value Key Window Value A 12:00 1 A 12:00 1 B 12:00 1 B 12:00 1 B 12:00 1
Live, Delta, Compact stores Partition 1 Live Delta Compact Key Time Value Key Window Value Key Window Value Key Window Value A 12:00 2 A 12:00 3 Partition 2 Live Delta Compact Key Time Value Key Window Value Key Window Value Key Window Value A 12:00 1 B 12:00 1 B 12:00 1
Live, Delta, Compact stores Partition 1 Live Delta Compact Key Time Value Key Window Value Key Window Value Key Window Value A 13:00 2 A 12:00 3 A 14:00 1 A 13:00 1 Partition 2 Live Delta Compact Key Time Value Key Window Value Key Window Value Key Window Value A 13:00 5 A 14:00 2 B 12:00 1
Version 3: Memory usage P × A × S × Tlive + A × S × Tcompact = A × S ×(P × Tlive + Tcompact ) P : Number of partitions (72) A : Number of unique keys per time window (105 ) S : Size of a key/aggregate pair (100 bytes) Tcompact : Total number if time windows(400) Tlive : Number of live time windows (4) 7GB << 300GB
Store caching issue (charts): producer rate to the Live store vs producer rate to the Delta store [messages/sec]
Store Cache
● Caches for all stores share the same memory
  ○ cache.max.bytes.buffering
● Cache entries are only removed when
  ○ trying to insert a new value and the cache memory is full
  ○ the entry is explicitly removed from the underlying store
● Cache entries are not removed merely for being older than the retention interval
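For reference, the two settings in play, shown as a hedged configuration sketch (the byte and interval values are illustrative):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class CacheConfig {
  static Properties props() {
    Properties props = new Properties();
    // One shared cache budget for ALL stores in the instance
    // (deprecated in newer Kafka Streams releases in favor of statestore.cache.max.bytes).
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 64 * 1024 * 1024L);
    // Cached records are flushed and forwarded at commit time at the latest.
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000L);
    return props;
  }
}
```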
Cache starvation explained (diagram sequence): the total cache memory buffer starts out empty, then fills with Live and Delta store entries as records arrive (producer rate, commit interval); because entries are never evicted by age, the buffer ends up holding only stale Live entries past their retention, i.e. wasted cache space.
Cache starvation solution (diagram): evict entries older than the retention interval, so the buffer holds only current Live and Delta entries plus free space.
Finally (charts): producer rate to the Live store and to the Delta store [messages/sec], after the fix
Takeaways
● The current cache implementation is both a cache and a commit buffer
● We need the buffering, but not the caching, since we use in-memory stores
● Deleting entries only works because the entire store fits in memory
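A minimal sketch of the kind of fix this implies: periodically deleting expired windows from the in-memory store, since an explicit delete is what also evicts the matching cache entry. The store name, the Key.windowStart() accessor, and the retention value are assumptions.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class ExpireOldWindows implements Processor<Key, Aggregate, Key, Aggregate> {
  private static final long RETENTION_MS = Duration.ofHours(4).toMillis(); // illustrative

  private KeyValueStore<Key, Aggregate> store;

  @Override
  public void init(ProcessorContext<Key, Aggregate> context) {
    store = context.getStateStore("live-store");   // assumed store name
    context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, now -> {
      // Collect expired keys first, then delete; delete() also drops the cache entry.
      List<Key> expired = new ArrayList<>();
      try (KeyValueIterator<Key, Aggregate> it = store.all()) {
        while (it.hasNext()) {
          KeyValue<Key, Aggregate> entry = it.next();
          if (entry.key.windowStart() < now - RETENTION_MS) {
            expired.add(entry.key);
          }
        }
      }
      expired.forEach(store::delete);
    });
  }

  @Override
  public void process(Record<Key, Aggregate> record) {
    store.put(record.key(), record.value());   // aggregation into the store elided
  }
}
```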
Event time as stream time
● Can be set on the client
● Arbitrary, unpredictable deviations
● Late-arriving events have to be dropped, which is not always acceptable in practice
● Customers affect each other
Ingress time
T_i: host system time
|T_i - T_j| < ε
Age_i = T_i - ingressTime
age_inflight = max{Age_i}
Mapping event time to ingress time
● Event time is just another attribute of an ad event
● The aggregate is a map of event time buckets to counts
● Padding = IngressTime - EventTime

Ingress time -> event time buckets:
01:00: {01:00 -> 10, 02:00 -> 1}
02:00: {01:00 -> 3, 02:00 -> 7}
11:00: {01:00 -> 1, 11:00 -> 20}
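A small sketch of the aggregate shape this describes: records are windowed by ingress time, and the ad's event time only appears inside the value as a map of event-time buckets to counts. Class and method names and the bucket size are assumptions.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

public class BucketedAggregate {
  private static final Duration BUCKET = Duration.ofHours(1); // illustrative bucket size

  // event-time bucket start -> count, e.g. {01:00 -> 10, 02:00 -> 1}
  private final Map<Instant, Long> counts = new HashMap<>();

  // Called while reducing events that fell into the same ingress-time window.
  public BucketedAggregate add(Instant eventTime, long count) {
    long bucketMs = BUCKET.toMillis();
    Instant bucket = Instant.ofEpochMilli(
        eventTime.toEpochMilli() - eventTime.toEpochMilli() % bucketMs);
    counts.merge(bucket, count, Long::sum);
    return this;
  }

  // Merging two partial aggregates, e.g. in reduce(sum).
  public BucketedAggregate merge(BucketedAggregate other) {
    other.counts.forEach((bucket, count) -> counts.merge(bucket, count, Long::sum));
    return this;
  }
}
```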
Conclusions
● Know and control your skew: max(ɑ) << 1/n
  ○ Metric: process-rate per task, broken down by sub-topology
● Dealing with the skew
  ○ Pre-aggregate traffic before the repartition
  ○ Don't repartition; sum up in interactive queries instead
● Mind the cache
● Control your stream time
Aggregating Ad Events With Kafka Streams and Interactive Queries
😱 after some skewing around 😱
Thank You!
