Stream Processing with Apache Flink
Maximilian Michels, Flink PMC member
mxm@apache.org · @stadtlegende
The Agenda
▪ What is Apache Flink?
▪ Streaming 101
▪ The Flink Engine
▪ A Quick Look at the API
Apache Flink
▪ A distributed open-source data analysis framework
▪ True streaming at its core
▪ Streaming & Batch API
[Diagram: event logs (Kafka, RabbitMQ, ...) and historic data (HDFS, JDBC, ...) feed Flink, serving low-latency use cases (windowing, aggregations, ...) as well as ETL, graphs, machine learning, relational workloads, ...]
Organizations at Flink Forward
Featured in ...
Flink Community
▪ Top 5 Apache Big Data project in the Apache Software Foundation
▪ 500+ messages/month on the mailing list
▪ 8400+ commits
▪ 1500+ pull requests merged
▪ 950+ stars, 510+ forks
Use Cases for Flink
Use Case: Log File Analysis
▪ Load log files from a distributed file system
▪ Process them and sessionize them by user id
▪ Write a view to the database or dump more data for further processing
[Diagram: process, analyze, aggregate]
Use Case: Tweet Impressions
▪ A continuous stream of Tweets, each with a timestamp
▪ How do we measure the importance of Tweets?
  • Total number of views
  • Views within a time period
▪ We need to process and aggregate Tweets!
[Diagram: Max, Marie, Jonas, and Tim are tweeting.]
Use Case: Tweet Impressions
[Diagram: impression events from users tweeting are aggregated over the last minute, last hour, and last day; the aggregated impressions are written to the output.]
More at: http://data-artisans.com/extending-the-yahoo-streaming-benchmark/
Streaming 101
Why Stream Processing?
▪ Most problems have a streaming nature
▪ Stream processing gives lower latency
▪ Data volumes are more easily tamed
▪ More predictable resource consumption
[Diagram: an event stream served either by batch processing (solved) or by event-based processing]
Challenges in Streaming
▪ Latency
▪ Throughput
▪ Fault-tolerance
▪ Correctness
  • Elements may be out-of-order
  • Elements may be processed more than once
Windows
▪ A grouping of records according to time, count, or session, e.g.
  • Count: the last 100 records
  • Session: all records for user X
  • Time: all records of the last 2 minutes
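The session grouping above can be made concrete with a toy sketch in plain Java. This is illustrative only, not Flink's session window assigner; `sessionize` is a hypothetical helper that splits one user's sorted timestamps into sessions whenever a gap exceeds a threshold:

```java
import java.util.ArrayList;
import java.util.List;

public class SessionWindows {
    // Toy sketch (not Flink API): split a sorted list of timestamps for one
    // user into sessions, starting a new session whenever the gap between
    // consecutive records exceeds gapMillis.
    static List<List<Long>> sessionize(List<Long> timestamps, long gapMillis) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        Long last = null;
        for (long ts : timestamps) {
            if (last != null && ts - last > gapMillis) {
                sessions.add(current); // gap too large: close the session
                current = new ArrayList<>();
            }
            current.add(ts);
            last = ts;
        }
        if (!current.isEmpty()) {
            sessions.add(current);
        }
        return sessions;
    }

    public static void main(String[] args) {
        // Records at 0s, 1s, 2s, then 10s, 11s, with a 5-second session gap:
        // the 8-second silence splits the records into two sessions.
        List<List<Long>> s =
            sessionize(List.of(0L, 1000L, 2000L, 10000L, 11000L), 5000L);
        System.out.println(s.size()); // prints 2
    }
}
```

Count and time windows follow the same pattern, just keyed on record count or timestamp ranges instead of gaps.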
Event Time
▪ Processing time: when data is processed
▪ Ingestion time: when data is loaded
▪ Event time: when data is generated
▪ Almost always, the three are different
▪ Event time helps to process out-of-order elements or to replay elements as they occurred
Event Time & Watermarks
▪ Elements arrive: how do we know what time it is?
▪ Processing time: take the hardware clock
▪ Event time: use watermarks
▪ Watermarks are timestamps
▪ No elements with a timestamp earlier than the watermark are expected to arrive after it
Event Time & Watermarks
[Animated diagram: elements with event timestamps flow into a window operator; arriving watermarks advance the operator's event-time clock and trigger the affected windows.]
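The behavior in the animation above can be condensed into a toy model in plain Java (this is a sketch of the idea, not Flink's internal watermark handling): an operator tracks the highest watermark it has seen, and any element whose timestamp is at or below that watermark counts as late.

```java
public class WatermarkTracker {
    // Toy model of an operator's event-time clock: the clock only moves
    // forward, to the maximum watermark received so far.
    private long currentWatermark = Long.MIN_VALUE;

    void onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
    }

    // An element with a timestamp <= the current watermark arrives after the
    // operator stopped expecting elements for that time, i.e. it is late.
    boolean isLate(long elementTimestamp) {
        return elementTimestamp <= currentWatermark;
    }

    public static void main(String[] args) {
        WatermarkTracker op = new WatermarkTracker();
        op.onWatermark(1000);
        System.out.println(op.isLate(500));  // prints true: before the watermark
        System.out.println(op.isLate(1500)); // prints false: still expected
        op.onWatermark(800);                 // an older watermark does not move the clock back
        System.out.println(op.isLate(900));  // prints true: still judged against watermark 1000
    }
}
```

In real Flink jobs, watermarks are generated at the sources or via timestamp assigners and flow through the dataflow alongside the records.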
Tumbling Windows of 4 Seconds
[Animated diagram: elements with timestamps 1, 2, 3, 4, ..., 35 are assigned to non-overlapping 4-second windows (0-3, 4-7, 8-11, 20-23, 24-27, 32-35); each window is emitted once its time span has passed.]
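The assignment shown in the tumbling-window animation above reduces to simple arithmetic. A sketch (not Flink's internal assigner code): an element with timestamp t falls into the window starting at t - (t % size).

```java
public class TumblingAssign {
    // Start of the tumbling window (of length size) that contains timestamp.
    static long windowStart(long timestamp, long size) {
        return timestamp - (timestamp % size);
    }

    public static void main(String[] args) {
        long size = 4; // 4-second windows, as in the slides
        System.out.println(windowStart(5, size));  // prints 4  -> window 4-7
        System.out.println(windowStart(21, size)); // prints 20 -> window 20-23
        System.out.println(windowStart(33, size)); // prints 32 -> window 32-35
    }
}
```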
The Flink Engine
From Program to Execution

Program:

case class Path(from: Long, to: Long)
val tc = edges.iterate(10) { paths: DataSet[Path] =>
  val next = paths
    .join(edges)
    .where("to")
    .equalTo("from") { (path, edge) => Path(path.from, edge.to) }
    .union(paths)
    .distinct()
  next
}

Pre-flight (Client): cost-based optimizer, type extraction stack
Master: task scheduling, recovery metadata, deploys operators, tracks intermediate results
Workers: memory manager, out-of-core algorithms, batch & streaming, state & checkpoints

[Diagram: the program is translated into a dataflow graph, e.g. DataSource (orders.tbl) -> Filter -> Map and DataSource (lineitem.tbl), hash-partitioned into a hybrid hash join (build HT / probe), followed by a sorted GroupReduce.]
Flink Applications
▪ Streaming topologies
▪ Heavy batch jobs
▪ Machine learning at scale
▪ Graph processing at scale
E.g.: Non-Native Iterations

for (int i = 0; i < maxIterations; i++) {
  // Execute MapReduce job
}

[Diagram: the client drives a sequence of steps, submitting one job per iteration.]
Iterative Processing in Flink
▪ Built-in iterations and delta iterations
▪ Executes machine learning and graph algorithms efficiently
E.g.: Non-Native Streaming

while (true) {
  // get next few records
  // issue batch job
}

[Diagram: the stream is discretized into a series of small batch jobs.]
Pipelining
Basic building block to "keep data moving":
• Low latency
• Operators push data forward
• Data shipping as buffers, not tuple-wise
• Natural handling of back-pressure
Flink Engine
1. Execute everything as streams
2. Iterative (cyclic) dataflows
3. Mutable state in operators
4. Operate on managed memory
5. Special code paths for batch
6. HA mode: no single point of failure
7. Checkpointing of operator state
[Diagram: state + computation]
Flink Eco System
[Diagram: libraries (Gelly, Table, ML, SAMOA, MRQL, Cascading, Zeppelin, Dataflow) on top of the DataSet (Java/Scala/Python) and DataStream APIs, plus Hadoop M/R and Storm compatibility, all running on the streaming dataflow runtime; deployable locally, on a cluster, or on YARN; connectors include HDFS, HBase, Kafka, RabbitMQ, Flume, HCatalog, and JDBC.]
A Quick Look at the DataStream API
API Structure

// Create Environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Add Source
DataStream<Type> source = env.addSource(…);

// Perform transformations
DataStream<Type2> trans = source.keyBy("field").map(…).timeWindow(...);

// Add Sink
trans.addSink(…);

// Execute!
env.execute();
Hourly Impressions

// read from Kafka Tweet Impressions topic
DataStream<Tweet> tweets =
    env.addSource(new FlinkKafkaConsumer<>(...));

// sum up impressions per tweet, per hour
DataStream<Tweet> summaryStream = tweets
    .filter(tweet -> tweet.tweetId != null)
    .keyBy(tweet -> tweet.tweetId)
    .window(TumblingTimeWindows.of(Time.hours(1)))
    .sum("impressions");

// output to Kafka
summaryStream.addSink(
    new FlinkKafkaProducer<Tweet>(...));

class Tweet {
    String tweetId;
    String userId;
    String text;
    long impressions;
}
Up-to-date Daily Impressions

// read from Kafka Tweet Impressions topic
DataStream<Tweet> tweets =
    env.addSource(new FlinkKafkaConsumer<>(...));

// sum up impressions per tweet over the last day, updated every minute
DataStream<Tweet> summaryStream = tweets
    .filter(tweet -> tweet.tweetId != null)
    .keyBy(tweet -> tweet.tweetId)
    .window(SlidingTimeWindows.of(
        Time.days(1), Time.minutes(1)))
    .sum("impressions");

// output to database or Kafka
summaryStream.addSink(
    new FlinkKafkaProducer<Tweet>(...));

class Tweet {
    String tweetId;
    String userId;
    String text;
    long impressions;
}
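Unlike a tumbling window, a sliding window assigns each element to several overlapping windows. A sketch of the arithmetic in plain Java (illustrative, not Flink's assigner code): with window size s and slide d, an element belongs to s/d windows.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingAssign {
    // Starts of all sliding windows (length size, advancing every slide)
    // that contain timestamp. An element belongs to size/slide windows.
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Windows of 10 time units sliding by 5: each element is in 2 windows.
        System.out.println(windowStarts(12, 10, 5)); // prints [10, 5]
    }
}
```

For the slide's daily window with a one-minute slide, every impression event would be counted in 1440 concurrent windows; Flink's `sum` aggregation keeps only one running value per window rather than buffering the elements.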
Hourly Impression Summary

DataStream<Summary> summaryStream = tweets
    .keyBy(tweet -> tweet.tweetId)
    .window(TumblingTimeWindows.of(Time.hours(1)))
    .apply(new WindowFunction<Tweet, Summary, String, TimeWindow>() {
        public void apply(String tweetId, TimeWindow window,
                          Iterable<Tweet> impressions,
                          Collector<Summary> out) {
            long count = 0;
            Tweet tweet = null;
            for (Tweet val : impressions) {
                tweet = val;
                count++;
            }
            // output summary
            out.collect(new Summary(tweet, count,
                window.getStart(),
                window.getEnd()));
        }
    });

class Tweet {
    String tweetId;
    String userId;
    String text;
}

class Summary {
    Tweet tweet;
    long impressions;
    long beginTime;
    long endTime;
}
Closing
Apache Flink
▪ A powerful framework with a stream processor at its core
▪ Features
  • True streaming with great batch support
  • Easy-to-use APIs, library ecosystem
  • Fault-tolerant and consistent
  • Low latency, high throughput
  • Growing community
I ♥ Flink, do you?
▪ More information on flink.apache.org
▪ Flink Training at data-artisans.com
▪ Subscribe to the mailing lists
▪ Follow @ApacheFlink
▪ Next: 1.0.0 release
▪ Soon: Stream SQL, Mesos support, dynamic scaling
Thank you for your attention!
Stream Processing with Apache Flink · Maximilian Michels, Data Artisans