An introduction to stream processing Vincenzo Gulisano vincenzo.gulisano@chalmers.se
Agenda • Lecture 1 • Part 1 – Introduction and basics • Part 2 – Distributed and parallel analysis • Lecture 2 • Part 3 – Correctness guarantees • Part 4 – One size DOES NOT fit all in performance 2
Part 1 – Introduction and basics 3
IoT enables increased awareness, security, power-efficiency, ... but large IoT systems are complex, and traditional data analysis techniques alone are not adequate! 4
Advanced Metering Infrastructures (AMIs) / Smart Grids: • demand-response • scheduling • micro-grids • detection of medium-size blackouts • detection of non-technical losses • ... Vehicular Networks (VNs): • autonomous driving • platooning • accident detection • variable tolls • congestion monitoring • ... IoT enables increased awareness, security, power-efficiency, ... 5
AMIs, VNs: large IoT systems are complex. Characteristics: 1. edge location 2. location awareness 3. low latency 4. geographical distribution 5. large-scale 6. support for mobility 7. real-time interactions 8. predominance of wireless 9. heterogeneity 10. interoperability / federation 11. interaction with the cloud 6
traditional data analysis techniques alone are not adequate! 1. does the infrastructure allow for billions of readings per day to be transferred continuously? 2. does the latency incurred while transferring data undermine the utility of the analysis? 3. is it secure to concentrate all the data in a single place? 4. is it smart to give away fine-grained data? 7
A small example of what fine-grained data can reveal... 8 source: Andrés Molina-Markham, Prashant Shenoy, Kevin Fu, Emmanuel Cecchet, and David Irwin. 2010. Private memoirs of a smart meter. In Proceedings of the 2nd ACM Workshop on Embedded Sensing Systems for Energy-Efficiency in Building (BuildSys ’10). Association for Computing Machinery, New York, NY, USA, 61–66. DOI:https://doi.org/10.1145/1878431.1878446
a better answer: leverage the entire infrastructure! Traditional analysis techniques cannot address all the challenges in these setups. That's where stream processing can make the difference! 9
Data streaming basics 10
Motivation: DBMS vs. DSMS. In a DBMS, (1) data is first stored on disk, (2) a query is issued, and (3) query results are computed by the query processor in main memory: one-shot queries over stored data. In a DSMS, a continuous query is registered once and data flows through main-memory query processing as it arrives, continuously producing query results. 11
Before we start... about data streaming and Stream Processing Engines (SPEs) 12 An incomplete, unsorted timeline of SPEs: NiagaraCQ, COUGAR, The Aurora Project, STREAM (the STanford stREam datA Manager), Borealis, StreamCloud, ... Covering all of them / discussing which use cases are best for each one is out of scope...
All documentation images / code snippets in the following are taken from: https://flink.apache.org/ 13
data stream: unbounded sequence of tuples sharing the same schema 14 Example: vehicles' speed reports. Schema: vehicle id (text), time in secs (text), speed in Km/h (double), X coordinate (double), Y coordinate (double). Sample tuples, in timestamp order: ⟨A, 8:00, 55.5, X1, Y1⟩, ⟨A, 8:03, 70.3, X2, Y2⟩, ⟨A, 8:07, 34.3, X3, Y3⟩. Let's assume each source (e.g., vehicle) produces and delivers a timestamp-sorted stream
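As a concrete sketch of this schema (class and field names are illustrative, not from the slides; the slide stores time as text, while a numeric value, here epoch milliseconds, is more practical in code):

```java
// A speed-report tuple matching the schema above (illustrative names).
// Public fields + a no-arg constructor let Flink treat it as a POJO.
public class SpeedReport {
    public String vehicleId;  // vehicle id (text)
    public long   timestamp;  // time (the slide uses text; here: epoch millis)
    public double speedKmh;   // speed (Km/h)
    public double x;          // X coordinate
    public double y;          // Y coordinate

    public SpeedReport() {}

    public SpeedReport(String vehicleId, long timestamp,
                       double speedKmh, double x, double y) {
        this.vehicleId = vehicleId;
        this.timestamp = timestamp;
        this.speedKmh  = speedKmh;
        this.x = x;
        this.y = y;
    }
}
```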
continuous query (or simply query): Directed Acyclic Graph (DAG) of streams and operators 15 Legend: source op (1+ output streams), sink op (1+ input streams), op (1+ input and 1+ output streams); edges are streams
data streaming operators Two main types: • Stateless operators • do not maintain any state • one-by-one processing • if they maintain some state, such state does not evolve depending on the tuples being processed • Stateful operators • maintain a state that evolves depending on the tuples being processed • produce output tuples that depend on multiple input tuples 16 OP OP
stateless operators 17 • Filter: filter / route tuples based on one (or more) conditions • Map: transform each tuple • Union: merge multiple streams (with the same schema) into one
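A minimal Flink sketch of the three stateless operators, assuming the SpeedReport class above and two pre-existing streams, `reports` and `reportsFromOtherRegion` (illustrative names; the speed threshold is made up):

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;

// Filter: keep/route tuples based on a condition.
DataStream<SpeedReport> speeding = reports.filter(r -> r.speedKmh > 100.0);

// Map: transform each tuple (here Km/h -> m/s).
DataStream<Double> speedsInMs = reports
    .map(r -> r.speedKmh / 3.6)
    .returns(Types.DOUBLE);  // helps Flink with lambda type erasure

// Union: merge streams with the same schema into one.
DataStream<SpeedReport> all = reports.union(reportsFromOtherRegion);
```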
stateful operators 19 • Aggregate: aggregate information from multiple tuples (e.g., max, min, sum, ...) • Join: join tuples coming from 2 streams given a certain predicate
Wait a moment! If streams are unbounded, how can we aggregate or join? 20
windows and stateful analysis Stateful operations are done over windows: • Time-based (e.g., tuples in the last 10 minutes) • Tuple-based (e.g., the last 50 tuples) 21 time [8:00,9:00) [8:20,9:20) [8:40,9:40) Example of a time-based window of size 1 hour and advance 20 minutes. How many tuples fall in a window? Which time period does a window span?
time-based sliding window aggregation (count) 22 Walkthrough: tuples arrive at 8:05, 8:15, 8:22, 8:45, 9:05. For window [8:00,9:00) the counter grows 1 → 2 → 3 → 4; when the tuple at 9:05 arrives, the window closes and the output 4 is produced. The next window [8:20,9:20) then holds the tuples at 8:22, 8:45, and 9:05 (counter: 3).
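A hedged Flink sketch of this count, per vehicle, over a 1-hour window advancing every 20 minutes (assumes the `reports` stream already carries event-time timestamps and watermarks, see Part 3):

```java
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Count tuples per vehicle in 1-hour windows that slide every 20 minutes.
reports
    .keyBy(r -> r.vehicleId)
    .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(20)))
    .aggregate(new CountAggregate())
    .print();

// The incremental counter from the walkthrough above.
public class CountAggregate implements AggregateFunction<SpeedReport, Long, Long> {
    @Override public Long createAccumulator()          { return 0L; }
    @Override public Long add(SpeedReport v, Long acc) { return acc + 1; }
    @Override public Long getResult(Long acc)          { return acc; }
    @Override public Long merge(Long a, Long b)        { return a + b; }
}
```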
time-based sliding window joining 23 Figure: two streams R and S (tuples t1...t4 each), each maintaining a sliding window (WR and WS) of size WS; incoming tuples from one stream are matched against the opposite window under a predicate P.
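Flink's DataStream joins are key-based rather than arbitrary-predicate-based, so an honest sketch of a time-bounded join uses intervalJoin on a key (vehicleId here); the 10-minute bound plays the role of the window size WS, and `streamR` / `streamS` are assumed inputs with event time and watermarks assigned:

```java
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

// Match each R tuple with S tuples of the same vehicle whose timestamps
// lie within the preceding 10 minutes.
streamR.keyBy(r -> r.vehicleId)
    .intervalJoin(streamS.keyBy(s -> s.vehicleId))
    .between(Time.minutes(-10), Time.minutes(0))
    .process(new ProcessJoinFunction<SpeedReport, SpeedReport, String>() {
        @Override
        public void processElement(SpeedReport r, SpeedReport s,
                                   Context ctx, Collector<String> out) {
            // Any extra predicate P can be checked here before emitting.
            out.collect(r.vehicleId + " joined at " + ctx.getTimestamp());
        }
    });
```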
24 windows and stateful analysis See the part about Distributed and parallel analysis to understand what this is
25 basic operators and user-defined operators Besides a set of basic operators, SPEs usually allow the user to define ad-hoc operators (e.g., when the built-in aggregate functions are not enough)
Part 2 – Distributed and parallel analysis 26
sample query For each vehicle, raise an alert if the speed of the latest report is more than twice its average speed over the last 30 days. 27 time ⟨A, 8:00, 55.5, X1, Y1⟩ ⟨A, 8:03, 70.3, X2, Y2⟩ ⟨A, 8:07, 34.3, X3, Y3⟩
28 sample query as a DAG (one possible Flink rendering is sketched below): • Map: remove unused fields, ⟨vehicle id, time (secs), speed (Km/h), X coordinate, Y coordinate⟩ → ⟨vehicle id, time (secs), speed (Km/h)⟩ • Aggregate: compute the average speed for each vehicle during the last 30 days → ⟨vehicle id, time (secs), avg speed (Km/h)⟩ • Join: join on vehicle id → ⟨vehicle id, time (secs), avg speed (Km/h), speed (Km/h)⟩ • Filter: check the alert condition
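A hedged Flink sketch of this DAG. `SpeedReport` is the class above; `AvgAggregate` (a sum/count average, analogous to the count aggregate shown earlier) and `AlertFunction` (a KeyedCoProcessFunction that keeps the latest average in keyed state and emits an alert when a report exceeds twice that average) are hypothetical helpers, not shown:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Map (M): remove the unused coordinate fields.
DataStream<Tuple3<String, Long, Double>> slim = reports
    .map(r -> Tuple3.of(r.vehicleId, r.timestamp, r.speedKmh))
    .returns(Types.TUPLE(Types.STRING, Types.LONG, Types.DOUBLE));

// Aggregate (A): average speed per vehicle over the last 30 days,
// sliding once per day (the advance is an assumption, not on the slide).
DataStream<Tuple2<String, Double>> avgs = slim
    .keyBy(t -> t.f0)
    .window(SlidingEventTimeWindows.of(Time.days(30), Time.days(1)))
    .aggregate(new AvgAggregate());          // hypothetical helper

// Join (J) + Filter (F): pair each report with its vehicle's latest
// average and keep only reports with speed > 2 x average.
DataStream<String> alerts = slim
    .keyBy(t -> t.f0)
    .connect(avgs.keyBy(a -> a.f0))
    .process(new AlertFunction());           // hypothetical helper
```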
29 M A J F sample query Notice: • the same semantics can be defined in several ways (using different operators and composing them in different ways) • Using many basic building blocks can ease the task of distributing and parallelizing the analysis
M A J F At the edge, in parallel at each vehicle M A J F 30
M A J F At the cloud 31
Distributed, edge + fog + cloud A J M F 32
DISCLAIMER / BEFORE WE START I am using this symbol a lot in the following. It is not necessarily a physical computer, but a “processing unit”: a computer, a CPU, a core in a CPU, or a thread... 33
Centralized execution 34 M A J F
Research topics (initially studied for centralized executions) • Approximation • Due to limited resources, how to approximate results to reduce space or time complexity? • Load Shedding • If close to saturation, which information to discard in order to maximize the Quality of Service? • Operator scheduling • In which order to run operators in order to minimize an overhead / maximize a certain metric? 35
Distributed execution 36 Inter-operator parallelism M A J F
Distributed execution • Load Balancing: how to distribute / place operators to nodes in order to maximize throughput? • Offline load balancing • Dynamic load balancing 37
Distributed execution – Fault Tolerance • Existing techniques: • Active standby • Passive standby • Upstream backup • Guarantees: • Precise • “Eventual” • Gap 38
Distributed execution – Fault Tolerance • Active standby Primary Replica AggOP Agg 39 Cost: high, a full replica processes everything in parallel with the primary. Recovery Time: minimal, the replica takes over immediately.
Distributed execution – Fault Tolerance • Passive standby Primary Replica AggOP Agg Periodic checkpoints 40 Cost: moderate, the primary periodically checkpoints its state to a standby. Recovery Time: the standby restores the last checkpoint and replays the tuples received since.
Distributed execution – Fault Tolerance • Passive standby (disk variant) Primary AggOP Periodic checkpoints Disk 41 Cost: low, checkpoints go to disk and no standby node is kept running. Recovery Time: higher, a replacement node must load the checkpoint from disk and then replay.
Notice!!! Primary Replica AggOP Agg Periodic checkpoints AggOP Periodic checkpoints Disk Both techniques need to be able to replay some of the previous data 42
Distributed execution – Fault Tolerance • Upstream Backup AggOP Buffer 43 Cost: lowest, upstream operators only buffer the tuples they send downstream. Recovery Time: highest, a fresh instance must reprocess the whole buffer to rebuild the lost state.
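A plain-Java sketch of the upstream-backup idea (all names illustrative, not from any SPE's API): the sender keeps a copy of each tuple until the downstream peer acknowledges it, and replays the buffer on failure.

```java
import java.util.ArrayDeque;

final class UpstreamBackup<T> {
    interface Downstream<T2> { void deliver(T2 tuple); }

    private final ArrayDeque<T> buffer = new ArrayDeque<>();

    void send(T tuple, Downstream<T> downstream) {
        buffer.addLast(tuple);      // keep a copy until acknowledged
        downstream.deliver(tuple);
    }

    // Downstream periodically confirms that the effects of its oldest
    // `count` inputs are reflected in produced output.
    void ack(int count) {
        for (int i = 0; i < count && !buffer.isEmpty(); i++) {
            buffer.removeFirst();
        }
    }

    // On failure, replay everything still buffered to a fresh instance,
    // which rebuilds the lost operator state by reprocessing.
    void replay(Downstream<T> replacement) {
        buffer.forEach(replacement::deliver);
    }
}
```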
Parallel execution 44 Intra-operator parallelism M A J F … …
Parallel-Distributed execution • Challenges: • Semantic Transparency • Results produced by the parallel-distributed execution must equal those of a centralized or distributed execution • Throughput maximization • How to maximize throughput when distributing and parallelizing data streaming operators? 45
• General approach OPA OPB 46 Parallel execution
• General approach R: Router, M: Merger. Operator OPA is replicated across threads 1..m; a Router partitions OPA's input among the replicas and a Merger recombines their outputs before OPB. The degree of parallelism can grow one thread at a time (2, 3, ... m). 47-48 Parallel execution
• General approach: the m parallel replicas of OPA (each with its Router and Merger) form Subcluster A. 49 Parallel execution R: Router M: Merger
• General approach: OPA runs in Subcluster A (threads 1..m) and the downstream OPB in Subcluster B (threads 1..n); the Routers of Subcluster A feed the Mergers of Subcluster B. 50-51 Parallel execution R: Router M: Merger
• Stateful operators: Semantic awareness • Aggregate: count within last hour, group-by vehicle id. The Routers of the previous subcluster must send all of vehicle A's tuples, ⟨A, 8:00, 55.5, X1, Y1⟩, ⟨A, 8:03, 70.3, X2, Y2⟩, ⟨A, 8:07, 34.3, X3, Y3⟩, to the same aggregate instance (Agg1, Agg2, or Agg3). 52-53 Parallel execution
Parallel execution • Depending on the stateful operator's semantics: • Partition the input stream into keys • Each key is processed by exactly 1 thread • # keys >> # threads/nodes 54
Parallel execution, keys domain example: keys {A, B, C, D, E, F} are partitioned across Agg1, Agg2, and Agg3, so every key is owned by exactly one instance (a minimal routing sketch follows below). 55-56
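A minimal sketch of the router's key-to-instance mapping; hashing modulo the instance count is one common choice (illustrative, not prescribed by the slides):

```java
// Maps each key to one of the parallel instances (e.g., Agg1..Agg3).
// With #keys >> #instances the load spreads out, and migrating a key
// amounts to changing this mapping (e.g., via a lookup table).
final class KeyRouter {
    private final int numInstances;

    KeyRouter(int numInstances) { this.numInstances = numInstances; }

    int instanceFor(String key) {
        return Math.floorMod(key.hashCode(), numInstances);
    }
}
```

With `new KeyRouter(3)`, every tuple of vehicle A deterministically reaches the same aggregate instance.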
Parallel execution • Example: a query with operators OP1–OP6 and two stateful stages, one grouped by A1 and one grouped by A2. 57
Parallel execution • Example: 90 threads available. 58
Parallel execution 59 How to parallelize/distribute the query (OP1–OP6, group-by A1 and A2, 90 threads) to maximize throughput?
• Parallelization Strategies Parallel execution: full query at all threads / 1+ operators per thread / 1 operator per thread 60
Parallel execution, Full query at all threads: all six operators are replicated in each of the 90 threads (Thread 1 ... Thread 90); each of the two group-by boundaries then needs an all-to-all exchange of 90 × 90 = 90² stream connections. 61-62 Group-by A1 Group-by A2
Parallel execution, 1 operator per thread: each operator gets 90 / 6 = 15 threads (OP1 on Threads 1–15, OP2 on 16–30, ..., OP6 on 76–90); every operator boundary needs a 15 × 15 = 15² exchange. 63-64
Parallel execution, 1+ operators per thread: the query is cut at the group-by boundaries into subqueries, each replicated on 30 threads (Threads 1–30, 31–60, 61–90); each of the two boundaries needs a 30 × 30 = 30² exchange. 65-66 Group-by A1 Group-by A2
Parallel execution of streaming operators • This general approach, referred to as shared-nothing, works for both parallel and distributed execution • We use threads in the example, but they could be processes or even nodes… 67 Figure: OP1 on Threads 1–15 and OP2 on Threads 16–30, or equivalently OP1 on Nodes 1–15 and OP2 on Nodes 16–30.
Elastic execution 68 M A J F … …
Elastic execution 69 M A J F … … + +
Elastic execution 70 M A J F … … - -
Elastic execution • Key for cloud environments • Elasticity in various forms: • Variable number of threads • Variable number of nodes, scaling with queries • Variable number of nodes, scaling with operators 71
Why is it complicated? • State must be transferred between nodes • A load balancing algorithm is needed • Minimum parallelization unit → the key • Keys must be transferred between instances 72
Elastic execution • State transfer challenging for stateful operators A B time 73 Tuples referring to car A
Elastic execution • Window Recreation Protocol: from a given point in time, the migrated key's tuples are simply sent to B instead of A; A keeps answering until its window expires. + No communication between nodes - Completion time proportional to the window size 74
Elastic execution • State Recreation Protocol: the window content for the migrated key is copied from A to B, and tuples are routed to B immediately. + Minimizes completion time - Requires communication between nodes 75
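A plain-Java sketch contrasting the two protocols for one migrated key (everything here is illustrative; SpeedReport and KeyRouter are the sketches from earlier):

```java
import java.util.ArrayDeque;
import java.util.Map;

final class KeyMigration {
    // State Recreation: the window content for the key is copied from the
    // old owner to the new one; completion time is independent of the
    // window size, but state crosses the network.
    static void stateRecreation(String key,
            Map<String, ArrayDeque<SpeedReport>> oldOwner,
            Map<String, ArrayDeque<SpeedReport>> newOwner) {
        newOwner.put(key, new ArrayDeque<>(oldOwner.remove(key)));
    }

    // Window Recreation: no state moves; the router simply starts sending
    // the key's tuples to the new owner, whose window only becomes complete
    // after one full window size has elapsed (no inter-node communication,
    // but completion time proportional to the window size).
    static void windowRecreation(String key, KeyRouter router) {
        // remap `key` to the new owner inside `router`; nothing else to do
    }
}
```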
Part 3 – Correctness guarantees 76
77 Recap figure: the same query (OP1, OP2) can be deployed at many granularities: operators within one thread, across threads, across processes, and across nodes.
Correct execution • What does correct execution mean in the context of SPEs? • Many definitions, some more formal than others. StreamCloud: An Elastic and Scalable Data Streaming System. Vincenzo Gulisano, Ricardo Jimenez-Peris, Marta Patiño-Martinez, Claudio Soriente, Patrick Valduriez. IEEE Transactions on Parallel and Distributed Systems (TPDS). Viper: A Module for Communication-Layer Determinism and Scaling in Low-Latency Stream Processing. Ivan Walulya, Dimitris Palyvos-Giannas, Yiannis Nikolakopoulos, Vincenzo Gulisano, Marina Papatriantafilou and Philippas Tsigas. Elsevier Future Generation Computer Systems Journal. 2018.
Correct execution A query that simply discards all data, always, would most likely be in accordance with the previous definitions. Would you say its analysis is correct? In general (but still in a vague form): correctness implies each tuple is processed according to the analysis' semantics (according to its stateless and stateful operators) and is not affected by execution- and implementation-related aspects
Why is it complicated? • It depends on a mix of external / internal factors. • We are going to look at some examples
Unsorted input streams: recall the sliding-window count example, where tuples at 8:05, 8:15, 8:22, 8:45 grow the counter for [8:00,9:00) to 4 and the output is produced when the tuple at 9:05 arrives. What if the tuple with timestamp 8:45 arrives after the one with timestamp 9:05, when we have already produced the result?
Unsorted input streams – possible solutions • Sort at the source, but... • is it always possible? • how late can a single tuple be? • what about latency? • Allow for late arrivals and withhold (temporary) results or produce correction tuples, but... • this in turn creates disordered output streams!
Sorted input streams + distributed/parallel execution: two Map instances (M, M) feed the same Aggregate (A), each forwarding a timestamp-sorted stream of tuples such as ⟨A, 8:00, 55.5, X1, Y1⟩, ⟨A, 8:03, 70.3, X2, Y2⟩, ⟨A, 8:07, 34.3, X3, Y3⟩. What if the tuple with timestamp 8:00 arrives at the Aggregate after the tuple with timestamp 8:07?
Sorted input streams + distributed/parallel execution – possible solutions • Merge-sort (e.g., based on the timestamp) the input streams deterministically • What if a stream is extremely slow / missing tuples? • What is the sorting overhead?
A general solution for order-insensitive analysis → Watermarks • Notice: • If the analysis is order-insensitive, the result for a given window is correct as long as all tuples contributing to it are taken into account; the order does not really matter • Keep processing tuples even if they are not in order • Produce the result for a certain window only when it is certain that no future input tuple can still contribute to that window • Key idea: • let tuples be disordered (from the source or because of parallel execution) • include special tuples (watermarks) that are sorted and separate the timestamps before them from those after them
Example (Flink) The notion of time is based on the timestamps carried by the tuples themselves; the application specifies how timestamps and watermarks are assigned, and watermarks are then used to decide when the result for a certain window can be produced
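The original slide showed a Flink snippet (per slide 13, all snippets come from flink.apache.org); an equivalent hedged sketch with Flink's WatermarkStrategy API (Flink 1.11+), assuming the SpeedReport stream from before with timestamps in milliseconds:

```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

// Event time comes from the tuples themselves; the watermark encodes
// "no tuple more than 5 seconds late is still expected", which is what
// lets windows emit results despite disorder.
DataStream<SpeedReport> withTime = reports.assignTimestampsAndWatermarks(
    WatermarkStrategy
        .<SpeedReport>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((r, previous) -> r.timestamp));
```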
Part 4 – One size DOES NOT fit all in performance 90
Phillip B. Gibbons, Keynote Talk IPDPS’15 91
Recap on stream joins 93 Data stream: unbounded sequence of tuples. Two streams R and S, each maintaining a sliding window (WR and WS) of size WS; incoming tuples are matched against the opposite window under a predicate P.
Why parallel stream joins? • WS = 600 seconds • R receives 500 tuples/second • S receives 500 tuples/second • WR will contain 300,000 tuples • WS will contain 300,000 tuples • Each new tuple from R gets compared with all the tuples in WS • Each new tuple from S gets compared with all the tuples in WR … 300,000,000 comparisons/second! t1 t2 t3 t4 t1 t2 t3 t4 R S WSWR 94
What are the challenges of a parallel stream join? Scalability High throughput Low latency Disjoint parallelism Skew resilience Determinism 95
The 3-step procedure (sequential stream join) For each incoming tuple t: 1. compare t with all tuples in the opposite window given predicate P 2. add t to its window 3. remove stale tuples from t's window. Setup: producers Prod R and Prod S add tuples to R and S, a single processing unit (PU) runs the three steps, and a consumer (Cons) consumes the results. 96 We assume each producer delivers tuples in timestamp order
The 3-step procedure, is it enough? Scalability High throughput Low latency Disjoint parallelism Skew resilience Determinism 97 Figure: processing t3 and t4 in different orders leaves the windows WR and WS in different states; without further care, the outcome depends on the arrival interleaving, so determinism is not guaranteed.
Enforcing determinism in sequential stream joins • Next tuple to process = earliest(tS, tR) • The earliest(tS, tR) tuple is referred to as the next ready tuple • Process ready tuples in timestamp order → Determinism PU tS tR 98
Deterministic 3-step procedure Pick the next ready tuple t: 1. compare t with all tuples in the opposite window given predicate P 2. add t to its window 3. remove stale tuples from t's window. Same setup as before: Prod R and Prod S add tuples, one PU processes, Cons consumes the results. 99
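A plain-Java sketch of the deterministic procedure (illustrative: SpeedReport tuples, a trivially true predicate, simplified bookkeeping). The caller enforces determinism by always invoking process() on the earlier of the two pending input tuples, i.e., the next ready tuple:

```java
import java.util.ArrayDeque;

class ThreeStepJoin {
    final ArrayDeque<SpeedReport> windowR = new ArrayDeque<>();
    final ArrayDeque<SpeedReport> windowS = new ArrayDeque<>();
    final long windowSizeMs; // window size WS

    ThreeStepJoin(long windowSizeMs) { this.windowSizeMs = windowSizeMs; }

    // Called with ready tuples in timestamp order, regardless of stream.
    void process(SpeedReport t, boolean fromR) {
        ArrayDeque<SpeedReport> own = fromR ? windowR : windowS;
        ArrayDeque<SpeedReport> opposite = fromR ? windowS : windowR;
        // 1. compare t with all tuples in the opposite window given P
        for (SpeedReport o : opposite) {
            if (predicate(t, o)) emit(t, o);
        }
        // 2. add t to its window
        own.addLast(t);
        // 3. remove stale tuples from t's window
        while (!own.isEmpty()
                && own.peekFirst().timestamp < t.timestamp - windowSizeMs) {
            own.removeFirst();
        }
    }

    boolean predicate(SpeedReport a, SpeedReport b) { return true; } // P
    void emit(SpeedReport a, SpeedReport b) { /* hand over to Cons */ }
}
```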
Shared-nothing parallel stream join (state-of-the-art) Prod R, Prod S → PU1, PU2, ..., PUN → Merge → Cons. Each producer chooses a PU for every tuple; each PUi picks its next ready tuple t and runs the three steps (compare with the opposite window given P, add t to its window, remove stale tuples); a final merge step takes the next ready output tuple for the consumer. Challenges: Scalability High throughput Low latency Disjoint parallelism Skew resilience Determinism 100
Shared-nothing parallel stream join (state-of-the-art) 101 Producers hand tuples to the PUs through queues (enqueue() / dequeue()), and a Merge step feeds the consumer; the queues and the merging thread become a new bottleneck.
From coarse-grained to fine-grained synchronization Prod R Prod S PU1 PU2 PUN … Cons 102
ScaleGate 103 addTuple(tuple, sourceID): allows a tuple from sourceID to be merged by ScaleGate into the resulting timestamp-sorted stream of ready tuples. getNextReadyTuple(readerID): provides to readerID the next earliest ready tuple that has not yet been consumed by it. https://github.com/dcs-chalmers/ScaleGate_Java
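In Java (matching the linked repository's language), the contract on this slide can be sketched as an interface; the method names follow the slide, the rest is illustrative:

```java
// Sketch of the ScaleGate abstraction (simplified signatures).
interface ScaleGate<T> {
    // Merge a tuple from `sourceId` into the single timestamp-sorted
    // stream of ready tuples (internally lock-free, see the next slides).
    void addTuple(T tuple, int sourceId);

    // Return to `readerId` the earliest ready tuple it has not yet
    // consumed, or null if no tuple is ready.
    T getNextReadyTuple(int readerId);
}
```

Producers call addTuple concurrently; each PU loops on getNextReadyTuple, so the merging replaces the per-PU queues of the previous design.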
ScaleGate Anatomy (1) • Inspired by lock-free skip lists • randomized height of nodes • expected cost for search/insertion O(log N) • search by traversing from higher to lower levels 104
ScaleGate Anatomy (2) • Reader-local view of the "head" (which also holds the minimum timestamp for that reader) • Flagging mechanism: • if the "head" is not flagged it can be safely returned • the last written tuple of each source is flagged • Nodes are free to be garbage-collected after every reader has passed them (almost...) head0 head1 105
ScaleJoin Prod R, Prod S → SGin → PU1, PU2, ..., PUN → SGout → Cons. Producers add tuples to SGin; each PU gets the next ready input tuple t from SGin and 1. compares t with all tuples in the opposite window given P 2. adds t to its window in a round-robin fashion 3. removes stale tuples from t's window; the consumer gets the next ready output tuple from SGout. 106
107 ScaleJoin (example) Sequential stream join: one window WR holds {t1, t2, t3} and the new tuple t4 from S is compared against all of them. ScaleJoin with 3 PUs: WR is spread round-robin over the PUs ({t1}, {t2}, {t3}) and each PU compares t4 only against its share.
ScaleJoin 108 The same SGin / PU / SGout pipeline addresses the challenges (Scalability, High throughput, Low latency, Disjoint parallelism, Skew resilience, Determinism), and it even allows multiple physical producers per stream (several Prod R / Prod S instances), as real deployments require.
ScaleJoin scalability 109 [plot: comparisons/second vs. number of PUs; per the talk notes, up to 4 billion comparisons/second]
ScaleJoin latency 110 [plot: processing latency in milliseconds vs. number of PUs; per the talk notes, below 60 ms even at 48 PUs]
State of the art solution at the time ScaleJoin was published 111
almost 112
Humans: millions of years of evolution, millions of sensors. 113 We both • store information, iterate multiple times over data, and think rather than rush through decisions ("Should I (really) have an extra piece of cake?") • and run "hard-wired" routines for real-time, high-throughput / low-latency decisions ("Danger!!! Run!!!")
Computers (cyber-physical / IoT systems): years / decades of evolution, millions of sensors. 114 They both • store information, iterate multiple times over data, and think rather than rush through decisions, with databases and data mining techniques... ("What traffic congestion patterns can I observe frequently?") • and run continuous analysis for real-time, high-throughput / low-latency decisions, with data streaming and distributed and parallel analysis ("Don't take over, car in opposite lane!")
Bibliography (with some labels) 116
1. Zhou, Jiazhen, Rose Qingyang Hu, and Yi Qian. "Scalable distributed communication architectures to support advanced metering infrastructure in smart grid." IEEE Transactions on Parallel and Distributed Systems 23.9 (2012): 1632-1642. [AMIs]
2. Gulisano, Vincenzo, et al. "BES: Differentially Private and Distributed Event Aggregation in Advanced Metering Infrastructures." Proceedings of the 2nd ACM International Workshop on Cyber-Physical System Security. ACM, 2016. [AMIs, Privacy]
3. Gulisano, Vincenzo, Magnus Almgren, and Marina Papatriantafilou. "Online and scalable data validation in advanced metering infrastructures." IEEE PES Innovative Smart Grid Technologies, Europe. IEEE, 2014. [AMIs, Data validation]
4. Gulisano, Vincenzo, Magnus Almgren, and Marina Papatriantafilou. "METIS: a two-tier intrusion detection system for advanced metering infrastructures." International Conference on Security and Privacy in Communication Systems. Springer International Publishing, 2014. [AMIs, Intrusion detection]
5. Yousefi, Saleh, Mahmoud Siadat Mousavi, and Mahmood Fathy. "Vehicular ad hoc networks (VANETs): challenges and perspectives." 2006 6th International Conference on ITS Telecommunications. IEEE, 2006. [VNs]
6. El Zarki, Magda, et al. "Security issues in a future vehicular network." European Wireless. Vol. 2. 2002. [VNs]
7. Georgiadis, Giorgos, and Marina Papatriantafilou. "Dealing with storage without forecasts in smart grids: Problem transformation and online scheduling algorithm." Proceedings of the 29th Annual ACM Symposium on Applied Computing. ACM, 2014. [Smart Grid]
8. Fu, Zhang, et al. "Online temporal-spatial analysis for detection of critical events in Cyber-Physical Systems." Big Data (Big Data), 2014 IEEE International Conference on. IEEE, 2014. [Smart Grid, Anomaly detection]
Bibliography 117
9. Arasu, Arvind, et al. "Linear road: a stream data management benchmark." Proceedings of the Thirtieth international conference on Very large data bases-Volume 30. VLDB Endowment, 2004. [VNs, Benchmark]
10. Lv, Yisheng, et al. "Traffic flow prediction with big data: a deep learning approach." IEEE Transactions on Intelligent Transportation Systems 16.2 (2015): 865-873. [VNs, ML/NN]
11. Grochocki, David, et al. "AMI threats, intrusion detection requirements and deployment recommendations." Smart Grid Communications (SmartGridComm), 2012 IEEE Third International Conference on. IEEE, 2012. [AMIs]
12. Molina-Markham, Andrés, et al. "Private memoirs of a smart meter." Proceedings of the 2nd ACM workshop on embedded sensing systems for energy-efficiency in building. ACM, 2010. [AMIs]
13. Gulisano, Vincenzo, et al. "Streamcloud: A large scale data streaming system." Distributed Computing Systems (ICDCS), 2010 IEEE 30th International Conference on. IEEE, 2010. [Parallel/Distr. SPE]
14. Stonebraker, Michael, Uǧur Çetintemel, and Stan Zdonik. "The 8 requirements of real-time stream processing." ACM SIGMOD Record 34.4 (2005): 42-47. [Streaming basics]
15. Bonomi, Flavio, et al. "Fog computing and its role in the internet of things." Proceedings of the first edition of the MCC workshop on Mobile cloud computing. ACM, 2012. [Fog architectures]
16. Himmelsbach, Michael, et al. "LIDAR-based 3D object perception." Proceedings of 1st international workshop on cognition for technical systems. Vol. 1. 2008. [Lidar sensor]
Bibliography 118
17. Geiger, Andreas, et al. "Vision meets robotics: The KITTI dataset." The International Journal of Robotics Research (2013): 0278364913491297. [Dataset / Benchmark for VNs]
18. Gulisano, Vincenzo Massimiliano. StreamCloud: An Elastic Parallel-Distributed Stream Processing Engine. Diss. Informatica, 2012. [Parallel/Distr. SPE]
19. Cardellini, Valeria, et al. "Optimal operator placement for distributed stream processing applications." Proceedings of the 10th ACM International Conference on Distributed and Event-based Systems. ACM, 2016. [Scheduling / Operator placement]
20. Costache, Stefania, et al. "Understanding the Data-Processing Challenges in Intelligent Vehicular Systems." Proceedings of the 2016 IEEE Intelligent Vehicles Symposium (IV16). [Streaming in VNs]
21. Cormode, Graham. "The continuous distributed monitoring model." ACM SIGMOD Record 42.1 (2013): 5-14. [Streaming basics, Approximation]
22. Giatrakos, Nikos, Antonios Deligiannakis, and Minos Garofalakis. "Scalable Approximate Query Tracking over Highly Distributed Data Streams." Proceedings of the 2016 International Conference on Management of Data. ACM, 2016. [Approximation]
23. Gulisano, Vincenzo, et al. "Streamcloud: An elastic and scalable data streaming system." IEEE Transactions on Parallel and Distributed Systems 23.12 (2012): 2351-2365. [Parallel/Distr. SPE]
24. Shah, Mehul A., et al. "Flux: An adaptive partitioning operator for continuous query systems." Data Engineering, 2003. Proceedings. 19th International Conference on. IEEE, 2003. [Parallel/Distr. Streaming analysis]
Bibliography 119
25. Cederman, Daniel, et al. "Brief announcement: concurrent data structures for efficient streaming aggregation." Proceedings of the 26th ACM symposium on Parallelism in algorithms and architectures. ACM, 2014. [Shared-memory parallel streaming]
26. Ji, Yuanzhen, et al. "Quality-driven processing of sliding window aggregates over out-of-order data streams." Proceedings of the 9th ACM International Conference on Distributed Event-Based Systems. ACM, 2015. [Out-of-order streams]
27. Ji, Yuanzhen, et al. "Quality-driven disorder handling for concurrent windowed stream queries with shared operators." Proceedings of the 10th ACM International Conference on Distributed and Event-based Systems. ACM, 2016. [Out-of-order streams]
28. Gulisano, Vincenzo, et al. "Scalejoin: A deterministic, disjoint-parallel and skew-resilient stream join." Big Data (Big Data), 2015 IEEE International Conference on. IEEE, 2015. [Shared-memory stream joins]
29. Ottenwälder, Beate, et al. "MigCEP: operator migration for mobility driven distributed complex event processing." Proceedings of the 7th ACM international conference on Distributed event-based systems. ACM, 2013. [Load balancing]
30. De Matteis, Tiziano, and Gabriele Mencagli. "Keep calm and react with foresight: strategies for low-latency and energy-efficient elastic data stream processing." Proceedings of the 21st ACM SIGPLAN Symposium on Principles and Practice of Parallel Programming. ACM, 2016. [Elasticity]
31. Balazinska, Magdalena, et al. "Fault-tolerance in the Borealis distributed stream processing system." ACM Transactions on Database Systems (TODS) 33.1 (2008): 3. [Fault tolerance]
32. Castro Fernandez, Raul, et al. "Integrating scale out and fault tolerance in stream processing using operator state management." Proceedings of the 2013 ACM SIGMOD international conference on Management of data. ACM, 2013. [Fault tolerance, Parallel streaming]
Thank you! Questions?

Editor's Notes

  • #2 Please ask questions / interact the way you prefer
  • #9 ... and for vehicles, a seat sensor might tell you whether the driver is pregnant...
  • #10 this is not an exhaustive discussion...
  • #13 I like to be hands-on...
  • #19 say these are some examples...
  • #23 we will come back to the ordering later
  • #95 do the computation at the blackboard
  • #97–#98 So, let's see how stream joins are actually implemented. Example. But wait, what happens if we get tuples in another order? Hmm...
  • #100 OK, so the deterministic 3-step procedure looks like this. Now let's try to parallelize it, and let's do it as it has been done before.
  • #101 First, we need to do more operations, which surely affects latency. But what's worse is that we introduce a new bottleneck: the output thread and its ready tuples. This actually breaks disjoint parallelism too... and it is not really skew-resilient either. So, what's the problem? Are we doing it the wrong way, or are we forgetting something? Look at the data structures: we parallelize the computation, but what about the communication?
  • #102 The queues! We parallelized the computation but overlooked the communication: we are still using a queue with its methods enqueue and dequeue.
  • #103 Let's be creative: let's assume the threads share something more powerful that lets them communicate and synchronize more efficiently. What do we want from such a communication and synchronization data structure?
  • #107 Then we can do something like that...
  • #109 Here we can discuss why this addresses the different challenges, one by one... It gets even better: you can even have multiple physical producers for S and R! And this matters, because in the real world it will be like that!
  • #110 Here we check the number of comparisons per second sustained by ScaleJoin. After measuring a single thread, we computed the expected maximum and then the observed values for 3 different window sizes. As you can see... up to 4 billion comparisons/second!
  • #111 This is the processing latency we get (in milliseconds). Even with 48 PUs (which means more threads than cores, since we also have injectors and receivers...) it stays below 60 ms, and around 30 ms when we do not spawn too many threads. It might seem counterintuitive that latency grows with the number of PUs, but that's because of determinism!
  • #115 maybe say the DB part is a bit of an oversimplification