Approximation Data Structures for Streaming Data Applications Debasish Ghosh (@debasishg)
Big Data => Fast Data •Volume •Variety •Velocity
https://whatsthebigdata.com/2016/04/22/what-happens-on-the-internet-in-60-seconds/
Credit: http://www.doc.govt.nz/nature/habitats/freshwater/ A fundamental change in the shape of data that we need to process
Data Stream Model
• So big that it doesn’t fit in a single computer (unbounded)
• So big that a polynomial running time isn’t good enough
• An algorithm processing such data can only access the data in a single pass
• And yet the data needs to be processed with a low-latency feedback loop to the consumers
Motivating Use Cases
• Monitor events when a user visits a web site. Event streams drive analytics and generate various metrics on user behavior
• Traffic monitoring in network routers based on IP addresses - explore heavy hitters (the most traffic-intensive IP addresses)
• Processing financial data streams (stock quotes & orders) to facilitate real-time decision making
• Online clustering algorithms - similarity detection in real time
• Real-time anomaly detection on data streams
Algorithm Ideas
• Continuous processing of unbounded streams of data
• Single pass over the data
• Memory and time bounded - sublinear space
• Queries may not have to be served with hard accuracy - some affordance of error is allowed
Can we have a deterministic and/or exact algorithm that meets all of these requirements ?
Distinct Elements Problem
• Input: a stream of integers i1, . . . , im ∈ [n], where [n] denotes the set { 1, 2, .. , n }
• Output: the number of distinct elements seen in the stream
• Goal: minimize space consumption
Distinct Elements Problem
• Solution 1: keep a bit array of length n, initialized to all zeroes. When you see i in the stream, set the ith bit to 1
  • Space required: n bits of memory
• Solution 2: store the whole stream in memory explicitly
  • Space required: ⌈m log₂ n⌉ bits of memory
Can we have a deterministic and/or exact algorithm that beats this space bound of min{n, ⌈m log₂ n⌉}?
Sublinear with Deterministic & Exact - Possible?
• The set of distinct elements seen so far is a subset of [n], so it can be represented as an n-bit string - one of 2^n possibilities
• Suppose a deterministic & exact algorithm exists that uses s bits of space, where s < n
• Then there must exist a mapping from n-bit strings to s-bit strings, i.e. from {0,1}^n to {0,1}^s
• And this mapping has to be injective (no 2 elements of the domain can map to the same element of the co-domain)
• Such a mapping does not exist: by the pigeonhole principle, there cannot be an injective mapping from a larger set to a smaller set
There exists NO deterministic and/or exact algorithm that solves the Distinct Elements problem in sublinear space
Randomized & Approximate
• Estimators - the algorithm returns an estimator in response to a query. Is it unbiased? What is its variance?
• Error bound - f(x) is accurate up to a certain bound (the ϵ bound)
• Confidence of accuracy - the probability that the estimator will be within the above bound (1 − δ)
ϵ − δ Approximation
Accuracy within bounds of ±ϵ, with a failure probability of δ:
ℙ( |ñ − n| > ϵn ) < δ
[Diagram: a data set X is compressed into a summary C(X) - the Sketch - from which f(X) is computed]
• A Sketch C(X) of some data set X, with respect to some function f, is a compression of X that allows us to compute, or approximately compute, f(X) given access only to C(X)
Alice holds a data set X and Bob holds a data set Y, each a list of integers. They want f(X, Y) = Σ_{z ∈ X ∪ Y} z. Alice maintains a sketch of X as the running sum of her integers; Bob maintains a sketch of Y as the running sum of his.
Source: https://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/
Show me some data! Which structure answers which query on the raw data?
• Exact membership query, cardinality estimation - Sorted IDs or Hash Table
• Exact frequency estimation, range query - Sorted Table or Hash Map
• Membership query with 4% error - Bloom Filter
• Cardinality estimation with 4% error - Linear Counter / LogLog Counter
• Frequencies of top-100 most frequent elements with 4% error - Count-Min Sketch
• Top-100 most frequent elements with 4% error - Stream-Summary
A Simple Counter
• Use Case - monitor a stream of events
• At any point in time, output (an estimate of) the number of events seen so far. You may have to report from multiple counters aggregated by event type
• The idea is to beat O(log₂ n) space - any trivial algorithm can implement this using ⌈log₂ n⌉ bits
ϵ − δ Approximation
• Using a suitable sketch, there exists an algorithm that returns an estimator of the counter k within a bound of k(1 ± ϵ)
• and with a small probability of failure δ
Approximate Counting (Morris ’78)
1. Initialize X ⟵ 0.
2. For each update, increment X with probability 1/2^X.
3. For a query, output ñ = 2^X − 1.
ℙ( |ñ − n| > ϵn ) < δ
Counting Large Number of Events in Small Registers - Robert Morris, CACM, Volume 21, Issue 10, Oct 1978: https://dl.acm.org/citation.cfm?id=359627
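The three steps above fit in a few lines. This is a minimal, self-contained sketch in Python (the talk's own code examples use Scala with Algebird, but the counter itself needs no library):

```python
import random

def morris_counter(stream_length, rng=random):
    """Morris '78: keep only X; each event bumps X with probability 1/2^X;
    answer a query with the estimate n~ = 2^X - 1."""
    x = 0
    for _ in range(stream_length):
        if rng.random() < 1.0 / (1 << x):  # step 2: increment w.p. 1/2^X
            x += 1
    return (1 << x) - 1                    # step 3: output 2^X - 1
```

X needs only O(log log n) bits, versus the ⌈log₂ n⌉ bits of an exact counter; the price is that any single run is noisy, which is exactly what the analysis steps below address.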
The steps to analyze this algorithm generalize beautifully to all approximation data structures used to handle streaming data
Generalization steps ..
• Compute the expected value of the estimator. In [Morris ’78] we have 𝔼[2^X − 1] = n
• Compute the variance of the estimator. In [Morris ’78] we have var[2^X − 1] = O(n²)
• Using the median trick, establish an ϵ − δ approximation
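The median trick itself is generic: average s independent copies of the estimator to cut the variance by a factor of s, then take the median of several such averages to drive the failure probability down exponentially. A Python sketch, using the Morris estimator as the noisy-but-unbiased ingredient (a conceptual illustration, not the paper's exact parameterization):

```python
import random
from statistics import mean, median

def morris_estimate(n, rng):
    """One Morris estimate of n: unbiased, but var = O(n^2)."""
    x = 0
    for _ in range(n):
        if rng.random() < 1.0 / (1 << x):
            x += 1
    return (1 << x) - 1

def median_of_means(n, copies, groups, seed=7):
    """Averaging `copies` estimators divides the variance by `copies`;
    taking the median of `groups` averages boosts the confidence
    (failure probability drops exponentially in `groups`)."""
    rng = random.Random(seed)
    means = [mean(morris_estimate(n, rng) for _ in range(copies))
             for _ in range(groups)]
    return median(means)
```

With copies ≈ 1/ϵ² and groups ≈ log(1/δ), this yields exactly the ℙ(|ñ − n| > ϵn) < δ guarantee.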
Sketch-based Query Model
[Diagram: the algorithm consumes the data stream, maintains a data sketch, and serves each query with a response f(x) computed from the sketch]
Use Case
• A continuous stream of IP addresses hitting a router
• Updates of the form (i, Δ), which means the count of IP address i has to increase by Δ
• We want an estimate of how many times IP address i has hit the router at any point in time (Frequency Estimation)
Credit: http://voipstuff.net.au/routers/
Count Min Sketch: a table of width w with d hash functions, one per row
An Improved Data Stream Summary: The Count-Min Sketch and its Applications - Graham Cormode and S. Muthukrishnan (http://dimacs.rutgers.edu/~graham/pubs/papers/cm-full.pdf)
Count Min Sketch: when an update (i, Δ) comes, hash i using d pairwise-independent hash functions and add Δ to the cells h1(i), h2(i), h3(i), …, hd(i) - one cell per row
Count Min Sketch: a cell such as w5 in the row of h2 holds the sum of the frequencies of all items i that hash to w5 under hash function h2
query(i)
• Hash i using all d hash functions
• The results point to d cells in the table, each containing some frequency value
• Return the minimum of the d values as an estimate of query(i)
Count Min Sketch Claim
1. For an ϵ-point query with failure probability δ:
2. query(i) = xᵢ ± ϵ‖x‖₁ with probability ≥ 1 − δ.
3. Set w = ⌈2/ϵ⌉ and d = ⌈log₂(1/δ)⌉.
4. Space required is O(ϵ⁻¹ log₂(1/δ)).
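A compact Python sketch of the structure behind this claim, with w and d set from ϵ and δ exactly as above (a salted built-in hash stands in for a true pairwise-independent family):

```python
import math
import random

class CountMinSketch:
    """Count-Min Sketch sized from the claim: w = ceil(2/eps), d = ceil(log2(1/delta))."""

    def __init__(self, eps, delta, seed=1):
        self.w = math.ceil(2 / eps)
        self.d = math.ceil(math.log2(1 / delta))
        rng = random.Random(seed)
        # one salt per row; hash((salt, item)) stands in for a
        # pairwise-independent hash function
        self.salts = [rng.getrandbits(64) for _ in range(self.d)]
        self.table = [[0] * self.w for _ in range(self.d)]

    def _col(self, row, item):
        return hash((self.salts[row], item)) % self.w

    def update(self, item, delta=1):
        # add delta to one cell per row
        for r in range(self.d):
            self.table[r][self._col(r, item)] += delta

    def query(self, item):
        # min over rows: never underestimates, and overestimates by
        # at most eps * ||x||_1 with probability >= 1 - delta
        return min(self.table[r][self._col(r, item)] for r in range(self.d))
```

Note the one-sided error: the minimum over rows can only overcount (collisions add mass), never undercount.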
Count Min Sketch in Spark
https://twitter.github.io/algebird/
Algebra of a Monoid
A set A with a binary operation ϕ : A × A → A such that
• (a ϕ b) ϕ c = a ϕ (b ϕ c) for all a, b, c ∈ A (associativity)
• a ϕ I = I ϕ a = a for some I ∈ A (identity)
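Why these two laws matter operationally: associativity lets a fold over the stream be split across partitions in any grouping, and the identity supplies a starting value for empty partitions. A toy Python sketch with integer addition standing in for sketch merging:

```python
from functools import reduce

class SumMonoid:
    """Integers under +: an associative binary operation with identity 0."""
    zero = 0

    @staticmethod
    def plus(a, b):
        return a + b

def fold(monoid, items):
    # the identity handles empty partitions; associativity makes the
    # grouping of the fold irrelevant
    return reduce(monoid.plus, items, monoid.zero)

# split the stream into partitions, fold each, then fold the partials:
# associativity guarantees the same answer as folding the whole stream
parts = [[1, 2, 3], [4, 5], []]
partials = [fold(SumMonoid, p) for p in parts]
assert fold(SumMonoid, partials) == fold(SumMonoid, [x for p in parts for x in p])
```

This is precisely the property Algebird's `CMS.monoid` exploits when `reduce(_ ++ _)` combines per-partition sketches.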
CMS in the wild
[Diagram: a stream of host IPs hitting the router arrives as an original DStream (batches at time 1 … time 5); a window-based operation produces a windowed DStream (windows at times 1, 3 and 5). A Frequency Sketch / Heavy Hitter Sketch is maintained per batch, per window, and globally; Kafka feeds the stream, and the sketches flow to HDFS and a dashboard]
Streaming CMS

// CMS parameters
val DELTA = 1E-3
val EPS = 0.01
val SEED = 1

// create CMS
val cmsMonoid = CMS.monoid[String](DELTA, EPS, SEED)
var globalCMS = cmsMonoid.zero

// Generate data stream
val hosts: DStream[String] =
  lines.flatMap(r => LogParseUtil.parseHost(r.value).toOption)

// load data into CMS
val approxHosts: DStream[CMS[String]] = hosts.mapPartitions(ids => {
  val cms = CMS.monoid[String](DELTA, EPS, SEED)
  ids.map(cms.create)
}).reduce(_ ++ _)
Streaming CMS

approxHosts.foreachRDD(rdd => {
  if (rdd.count() != 0) {
    val cmsThisBatch: CMS[String] = rdd.first
    globalCMS ++= cmsThisBatch

    val f1ThisBatch = cmsThisBatch.f1
    val freqThisBatch = cmsThisBatch.frequency("world.std.com")

    val f1Overall = globalCMS.f1
    val freqOverall = globalCMS.frequency("world.std.com")
    // ..
  }
})
Motivation of Streaming CMS • Prepare the sketch online on streaming data • Store it offline for future analytics • It’s a small structure - hence ideal for serialization & storage • It’s a commutative monoid and hence you can distribute many of them across multiple machines, do parallel computations and again aggregate the results
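The "commutative monoid" point is concrete: two CMS tables built with the same hash functions merge by elementwise addition, and the merged table is identical to the one you would have built from the concatenated stream. A small Python sketch of that idea (not Algebird's `++` implementation, but the same operation):

```python
import random

W, D = 64, 4
_rng = random.Random(1)
SALTS = [_rng.getrandbits(64) for _ in range(D)]  # hash functions shared by all machines

def empty():
    return [[0] * W for _ in range(D)]

def update(table, item, delta=1):
    for r in range(D):
        table[r][hash((SALTS[r], item)) % W] += delta

def merge(t1, t2):
    # the monoid operation: elementwise sum of the two tables
    return [[a + b for a, b in zip(r1, r2)] for r1, r2 in zip(t1, t2)]

# sketch two halves of a stream on "different machines", then merge:
left, right, whole = empty(), empty(), empty()
for x in ["a", "b", "a"]:
    update(left, x); update(whole, x)
for x in ["b", "c"]:
    update(right, x); update(whole, x)
assert merge(left, right) == whole
```

Commutativity means the merge order across machines doesn't matter either, which is what makes the distribute-then-aggregate pattern above safe.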
Count Min Sketch - Applications
• AT&T has used it in network switches to perform network analyses on streaming network traffic with limited memory [1]
• Streaming log analysis
• Join size estimation for database query planners
• Heavy hitters:
  • Top-k active users on Twitter
  • Popular products - most viewed products page
  • Compute frequent search queries
  • Identify heavy TCP flows
  • Identify volatile stocks
[1] G. Cormode, T. Johnson, F. Korn, S. Muthukrishnan, O. Spatscheck, and D. Srivastava. Holistic UDAFs at streaming speeds. In Proceedings of the 2004 ACM SIGMOD International Conference on Management of Data, pages 35–46, 2004.
Heavy Hitters Problem
• Using a single pass over a data stream, find all elements with frequencies greater than k percent of the total number of elements seen so far
• The data stream is unbounded, so we will have to use sublinear space
• Fact: there is no deterministic algorithm that solves the Heavy Hitters problem in 1 pass while using sublinear space
• Hence the ϵ-approximate Heavy Hitters Problem
Approximate Heavy Hitters using Count Min Sketch
(1) An element Xᵢ arrives from the data stream
(2) Add Xᵢ to the CMS
(3) Check: is the estimated frequency of Xᵢ greater than the threshold (a fraction of the count N seen so far)?
(4) If yes, add Xᵢ to the heap; if no, move on
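The four steps above can be sketched in self-contained Python (the threshold is a fraction ϕ of the running count N, and heap entries are pruned lazily as N grows; as before, a salted built-in hash stands in for the pairwise-independent family):

```python
import heapq
import math
import random

def heavy_hitters(stream, phi, eps=0.005, delta=1e-3, seed=1):
    """phi-heavy hitters: a CMS for frequency estimates plus a min-heap
    of candidate elements, pruned as the running count N grows."""
    w, d = math.ceil(2 / eps), math.ceil(math.log2(1 / delta))
    rng = random.Random(seed)
    salts = [rng.getrandbits(64) for _ in range(d)]
    table = [[0] * w for _ in range(d)]

    def cells(x):
        return [(r, hash((salts[r], x)) % w) for r in range(d)]

    heap = []  # min-heap of (estimated count, item)
    n = 0
    for item in stream:                        # (1) element arrives
        n += 1
        for r, c in cells(item):               # (2) add it to the CMS
            table[r][c] += 1
        est = min(table[r][c] for r, c in cells(item))
        if est >= phi * n:                     # (3) above the threshold?
            heapq.heappush(heap, (est, item))  # (4) add to the heap
        while heap and heap[0][0] < phi * n:   # evict stale candidates
            heapq.heappop(heap)
    return {item for _, item in heap}
```

Because the CMS never underestimates, every true ϕ-heavy hitter survives; the possible error is one-sided (items just below the threshold may be reported).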
Streaming Approximate Heavy Hitters

// create heavy hitter CMS
val approxHH: DStream[TopCMS[String]] = hosts.mapPartitions(ids => {
  val cms = TopPctCMS.monoid[String](DELTA, EPS, SEED, 0.15)
  ids.map(cms.create(_))
}).reduce(_ ++ _)

// analyze in microbatch
approxHH.foreachRDD(rdd => {
  if (rdd.count() != 0) {
    val hhThisBatch: TopCMS[String] = rdd.first
    hhThisBatch.heavyHitters.foreach(println)
  }
})
Bloom Filter • Another sketching data structure (based on hashing) • Solves the same problem as Hash Map but with much less space • Great tool to have if you want approximate membership query with sublinear storage • Can give false positives
Bloom Filter - Under the Hood
• Ingredients
  • An array A of n bits. If we store a dataset S, the number of bits used per object = n/|S|
  • k hash functions (h1, h2, .., hk) (usually k is small)
• Insert(x)
  • For i = 1, 2, .., k set A[hi(x)] = 1, irrespective of what the previous values of those bits were
• Query(x)
  • If A[hi(x)] = 1 for every i = 1, 2, .., k, return true
• No false negatives
• Can have false positives
Space/time trade-offs in hash coding with allowable errors - B. H. Bloom. Communications of the ACM 13(7): 422-426, 1970.
Image: by David Eppstein, self-made, originally for a talk at WADS 2007, Public Domain, https://commons.wikimedia.org/w/index.p
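The ingredients above fit in about twenty lines of Python (again, salted tuple hashes stand in for k independent hash functions):

```python
import random

class BloomFilter:
    """An array of n bits plus k hash functions:
    no false negatives, false positives possible."""

    def __init__(self, n_bits, k, seed=1):
        self.n, self.k = n_bits, k
        rng = random.Random(seed)
        self.salts = [rng.getrandbits(64) for _ in range(k)]
        self.bits = bytearray((n_bits + 7) // 8)

    def _positions(self, x):
        return [hash((s, x)) % self.n for s in self.salts]

    def insert(self, x):
        # set A[h_i(x)] = 1 for i = 1..k, regardless of previous values
        for p in self._positions(x):
            self.bits[p // 8] |= 1 << (p % 8)

    def query(self, x):
        # true only if every A[h_i(x)] bit is already set
        return all(self.bits[p // 8] & (1 << (p % 8))
                   for p in self._positions(x))
```

An inserted item always answers true (its bits stay set), which is the no-false-negatives guarantee; a non-member answers true only if all k of its bits were set by other items.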
Bloom Filter as Application State Kafka Streams* Application Kafka Streams* Application Local State Local State Rebalancing Partition #1 Partition #2 Partition #3 Data Stream Kafka Topic * 2 instances of the same application
Bloom Filter State Store

// Bloom Filter as a StateStore. The only query it supports is membership.
class BFStore[T: Hash128](
  override val name: String,
  val loggingEnabled: Boolean = true,
  val numHashes: Int = 6,
  val width: Int = 32,
  val seed: Int = 1) extends WriteableBFStore[T] with StateStore {

  // monoid!
  private val bfMonoid = new BloomFilterMonoid[T](numHashes, width)

  // initialize
  private[processor] var bf: BF[T] = bfMonoid.zero
  // ..
}
Bloom Filter State Store

// Bloom Filter as a StateStore. The only query it supports is membership.
class BFStore[T: Hash128](
  override val name: String,
  val loggingEnabled: Boolean = true,
  val numHashes: Int = 6,
  val width: Int = 32,
  val seed: Int = 1) extends WriteableBFStore[T] with StateStore {
  // ..

  def +(item: T): Unit = bf = bf + item

  def contains(item: T): Boolean = {
    val v = bf.contains(item)
    v.isTrue && v.withProb > ACCEPTABLE_PROBABILITY
  }

  def maybeContains(item: T): Boolean = bf.maybeContains(item)

  def size: Approximate[Long] = bf.size
}
BF Store with Kafka Streams Processor

// the Kafka Streams processor that will be part of the topology
class WeblogProcessor extends AbstractProcessor[String, String] {

  // the store instance
  private var bfStore: BFStore[String] = _

  override def init(context: ProcessorContext): Unit = {
    super.init(context)
    // ..
    bfStore = this.context.getStateStore(
      WeblogDriver.LOG_COUNT_STATE_STORE).asInstanceOf[BFStore[String]]
  }

  override def process(dummy: String, record: String): Unit =
    LogParseUtil.parseLine(record) match {
      case Success(r) => {
        bfStore + r.host
        bfStore.changeLogger.logChange(bfStore.changelogKey, bfStore.bf)
      }
      case Failure(ex) => // ..
    }
  // ..
}
https://www.lightbend.com/products/fast-data-platform
Questions?
