Real-time Insights powered by Reactive Programming Jay Phelps | @_jayphelps
Let's talk why
Time is money
Fortune 500: 127+ million person-hours lost per year; $500k to $1+ million average hourly cost of a critical failure (https://devops.com/real-cost-downtime/)
Every second counts
Not just outages...
debugging, testing, and even Information Security too
improving Developer Experience
Jay Phelps, Senior Software Engineer | @_jayphelps
debugging, testing, and InfoSec
InfoSec: “preventing unauthorized access, use, or disruption of information”
stopping hackers
I'm on it.
We can block exploits using our gateway proxy
...we need to know it's working...
...we want to watch attackers try...
...in real-time
We need real-time insights for debugging, testing, and InfoSec
Logging is the answer
LOG ALL THE THINGS
One little problem...
We have millions of devices...
We have thousands of servers...
We need to process their logs in real-time
“Netflix is a log generating company that happens to stream movies” - Adrian Cockcroft
Massive amount of streaming logs
Reactive Extensions (Rx)
The best ideas from the Observer pattern, the Iterator pattern, and functional programming
“lodash for events”
High-level intro
[1, 2, 3]
  .map(value => value * 10)
  .forEach(value => {
    console.log(value);
  });
// 10
// 20
// 30
Observable.of(1, 2, 3)
  .map(value => value * 10)
  .subscribe(value => {
    console.log(value);
  });
// 10
// 20
// 30
Array is a collection
Observable is a collection that arrives over time
Represents a stream
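To make "a collection that arrives over time" concrete, here is a minimal sketch of the idea in plain JavaScript. It is not how RxJS is implemented; `TinyObservable` and its methods are hypothetical names that only mirror the shape of the `of`, `map`, and `subscribe` calls in the slides, and this toy version emits synchronously.

```javascript
// A toy observable: wraps a producer function that pushes values to a callback.
class TinyObservable {
  constructor(producer) {
    this.producer = producer; // producer(next) calls next(value) for each value
  }

  // Emits the given values one after another (synchronously in this sketch).
  static of(...values) {
    return new TinyObservable(next => values.forEach(v => next(v)));
  }

  // Returns a new observable whose values are transformed by fn.
  map(fn) {
    return new TinyObservable(next => this.producer(value => next(fn(value))));
  }

  // Nothing runs until someone subscribes.
  subscribe(next) {
    this.producer(next);
  }
}

const seen = [];
TinyObservable.of(1, 2, 3)
  .map(value => value * 10)
  .subscribe(value => seen.push(value));
console.log(seen); // [ 10, 20, 30 ]
```

The key difference from an array is that the producer decides when to call `next`, so the same pipeline would work if values arrived later from a timer, a click, or a socket.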
button.addEventListener('click', event => {
  console.log('you clicked!');
});

Observable.fromEvent(button, 'click')
  .subscribe(event => {
    console.log('you clicked!');
  });
Observable.fromEvent(button, 'click')
  .debounceTime(500)
  .subscribe(event => {
    console.log('you clicked!');
  });
(marble diagram: each click restarts a 500 ms timer; a click is emitted only once 500 ms pass with no newer click)
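The debounce behaviour can be checked without a browser by simulating it over a list of click timestamps. This is only a sketch of the semantics, not RxJS code; `debounceTimes` is a hypothetical helper, and it assumes the stream never completes (as with button clicks).

```javascript
// clicks: ascending timestamps in milliseconds.
// Returns which clicks a debounced stream would emit, and when.
function debounceTimes(clicks, dueTime) {
  const emissions = [];
  clicks.forEach((t, i) => {
    const next = clicks[i + 1];
    // A click survives only if no other click follows within dueTime.
    if (next === undefined || next - t >= dueTime) {
      emissions.push({ click: t, emittedAt: t + dueTime });
    }
  });
  return emissions;
}

// Two bursts of clicks: only the last click of each burst gets through.
const emitted = debounceTimes([0, 100, 200, 1000, 1200], 500);
console.log(emitted); // clicks at 200 and 1200 survive; emitted at 700 and 1700
```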
Stream of logs
Log message in JSON:
{
  "path": "/something",
  "status": 500
}

import { webSocket } from "rxjs/observable/webSocket";

let streamOfLogs = webSocket("ws://logs.netflix.com")

streamOfLogs
  .filter(msg => msg.status !== 200)
  .subscribe(msg => {
    console.log(msg);
  });
(marble diagram: 500 200 404 301 -> .filter(msg => msg.status !== 200) -> 500 404)
Learning Rx isn't easy
It's worth it!
Case Study: Scaling high volume logs in real-time
We're going to log JSON
Server: { "path": "/something", "status": 200 }
Device: { "event": "StartPlay", "device": "XBox 360" }
Stream them all through a single pipeline
How do you find and process the logs you want?
Write a custom job in Java?
Job: a unit of work
Server log message:
{
  "path": "/something",
  "status": 500,
  "duration": 135,
  // etc ...
}

streamOfLogs
  .filter(event -> event.get("status") != 200)
  .map(event -> new JSONObject()
    .put("path", event.get("path"))
    .put("status", event.get("status"))
  );
Powerful, but... not ideal
Query Language
SELECT path,status WHERE status != 200
Just-in-time (JIT) Compilation
SELECT path,status,duration WHERE status != 200

is compiled into the equivalent of:

streamOfLogs
  .filter(event -> event.get("status") != 200)
  .map(event -> new JSONObject()
    .put("path", event.get("path"))
    .put("status", event.get("status"))
    .put("duration", event.get("duration"))
  );
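The idea of turning a query string into a filter plus a projection can be sketched in a few lines. The slides do not show the real query language or its compiler, so everything here is an assumption: `compileQuery` is a hypothetical function that handles only the tiny `SELECT ... WHERE field ==/!= value` shape seen on the slides, and it builds closures over plain arrays rather than compiling to Rx operators.

```javascript
// Compiles 'SELECT a,b WHERE field != value' into a function over an array of events.
// Supported values: integers (200) or double-quoted strings ("API").
function compileQuery(query) {
  const match = /^SELECT\s+(.+?)\s+WHERE\s+(\w+)\s*(==|!=)\s*("[^"]*"|\d+)$/.exec(query.trim());
  if (!match) throw new Error("unsupported query: " + query);

  const [, fieldList, field, op, rawValue] = match;
  const fields = fieldList.split(",").map(f => f.trim());
  const value = JSON.parse(rawValue); // 200 -> number, "API" -> string

  // WHERE clause becomes a predicate, SELECT clause becomes a projection.
  const predicate = op === "==" ? e => e[field] === value : e => e[field] !== value;
  const project = e => Object.fromEntries(fields.map(f => [f, e[f]]));

  return events => events.filter(predicate).map(project);
}

const run = compileQuery("SELECT path,status,duration WHERE status != 200");
const result = run([
  { path: "/ok", status: 200, duration: 12 },
  { path: "/something", status: 500, duration: 135, host: "a" },
]);
console.log(result); // [ { path: '/something', status: 500, duration: 135 } ]
```

The compiled function does the same two steps as the hand-written job: filter on the WHERE condition, then map each event down to the selected fields.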
What's next?
8+ million messages per second, peak (chart: messages per second over time)
How do we scale this?
Distribute work via autoscaling
Load balancing a job: Source A splits its output 50% / 50% across two B servers
Chain jobs together
SELECT path,status WHERE source == "API"
Chain jobs together: Job A -> Job B -> Job C
High-volume, distributed systems have a problem...
Backpressure
“pressure opposed to the desired flow of gases in confined places such as a pipe” - wikipedia.org
Server A -> (100 rps) -> Server B -> (75 rps) -> Server C
Each second, 25 requests pile up: 60 sec * 25 rps = 1,500 requests per minute!
60 sec * 60 min * 25 rps = 90,000 requests per hour!
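The backlog arithmetic generalises to any pair of rates. A small sketch, assuming the diagram means 100 requests per second arrive and only 75 per second are passed on; `backlog` is a hypothetical helper name.

```javascript
// Number of requests left waiting after `seconds`, given steady in/out rates.
function backlog(inRps, outRps, seconds) {
  return Math.max(0, inRps - outRps) * seconds;
}

console.log(backlog(100, 75, 60));      // 1500  (one minute)
console.log(backlog(100, 75, 60 * 60)); // 90000 (one hour)
```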
Buffer or Drop
onBackpressureBuffer

streamOfLogs
  .onBackpressureBuffer()
  .subscribe(value -> {
    // something expensive
    Thread.sleep(100);
  });
(marble diagram: input 1 2 3 4 5 -> .onBackpressureBuffer() -> delivered so far: 1 2, still buffered: [3, 4, 5])
onBackpressureDrop

streamOfLogs
  .onBackpressureDrop()
  .subscribe(value -> {
    // something expensive
    Thread.sleep(100);
  });
(marble diagram: input 1 2 3 4 5 -> .onBackpressureDrop() -> delivered: 1 4)
Job authors choose which
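The difference between the two strategies can be seen in a small tick-based simulation. This is a sketch of the trade-off only, not how RxJava implements these operators; `simulate` and `consumerPeriod` are hypothetical names, and a period of 3 ticks is an assumption chosen because it reproduces the numbers on the slides.

```javascript
// The producer emits one item per tick; the consumer needs `consumerPeriod`
// ticks to handle each item it accepts.
// strategy: "buffer" queues items; any other value drops them while busy.
function simulate(items, consumerPeriod, strategy) {
  const delivered = [];
  const pending = []; // only filled by the "buffer" strategy
  const dropped = []; // only filled by the drop strategy
  let freeAt = 0;     // first tick at which the consumer is idle again

  items.forEach((item, tick) => {
    const consumerIdle = tick >= freeAt;
    if (strategy === "buffer") {
      pending.push(item);
      if (consumerIdle) {
        delivered.push(pending.shift()); // oldest item first
        freeAt = tick + consumerPeriod;
      }
    } else if (consumerIdle) {
      delivered.push(item);
      freeAt = tick + consumerPeriod;
    } else {
      dropped.push(item); // consumer busy: the item is lost
    }
  });
  return { delivered, pending, dropped };
}

const buffered = simulate([1, 2, 3, 4, 5], 3, "buffer");
const droppedRun = simulate([1, 2, 3, 4, 5], 3, "drop");
console.log(buffered.delivered, buffered.pending);     // [ 1, 2 ] [ 3, 4, 5 ]
console.log(droppedRun.delivered, droppedRun.dropped); // [ 1, 4 ] [ 2, 3, 5 ]
```

Buffering loses nothing but the pending queue keeps growing while the consumer lags; dropping keeps memory flat at the cost of losing data.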
Mantis: low-latency, high-throughput stream-processing job platform. 700+ jobs running 24/7, 8+ million events/second, autoscaling. http://techblog.netflix.com
We need somewhere to submit and view query results
Query Builder UI
SELECT path,status WHERE status != 200
Can be extremely high-volume: 100k+ rps
We simply can’t render that fast!
Even if we could, it would be bad UX
Performance solutions are often driven by UX
So what do we do?
UI virtualization, aka Virtual Table?
Virtualized vs. NOT Virtualized
We still can’t update that virtual table 100k times per second
This is also backpressure
Buffer or Drop
Buffer for 1 second
getWebSocket()
  .bufferTime(1000)
Buffer size is unbounded
See what your users actually do
Users want a sample
Batch sampling
Drop after reaching a certain threshold
let buffer = getWebSocket()
  .bufferTime(1000);

let gate = new BehaviorSubject(true);
let batchSize = 50;
let batchSizeCounter = 0;

let results = gate
  .switchMap(enabled => enabled ? buffer : Observable.never())
  .do(buffer => {
    batchSizeCounter += buffer.length;

    if (batchSizeCounter >= batchSize) {
      // truncates the array so the whole batch is at most batchSize
      buffer.length -= batchSizeCounter - batchSize;
      // closes the gate, pausing the stream
      gate.next(false);
      batchSizeCounter = 0;
    }
  });

https://goo.gl/DMOqBA
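The same batch-sampling idea (take rows until a threshold, then ignore input until asked for more) can be sketched without Rx, which makes the counting logic easier to test. `BatchSampler`, `accept`, and `resume` are hypothetical names, and the `resume` step is an assumption standing in for whatever re-opens the gate in the UI.

```javascript
// Collects rows until batchSize is reached, then ignores input until resume().
class BatchSampler {
  constructor(batchSize) {
    this.batchSize = batchSize;
    this.count = 0;
    this.paused = false;
  }

  // Takes one time-window buffer; returns the rows that should be rendered.
  accept(buffer) {
    if (this.paused) return [];
    const room = this.batchSize - this.count;
    const rows = buffer.slice(0, room); // never exceed the batch size
    this.count += rows.length;
    if (this.count >= this.batchSize) this.paused = true;
    return rows;
  }

  // Starts a fresh batch, e.g. when the user asks for another sample.
  resume() {
    this.count = 0;
    this.paused = false;
  }
}

const sampler = new BatchSampler(50);
const window30 = Array.from({ length: 30 }, (_, i) => i);
const first = sampler.accept(window30);  // 30 rows, 20 slots left
const second = sampler.accept(window30); // only 20 rows fit, sampler pauses
const third = sampler.accept(window30);  // paused: nothing is taken
console.log(first.length, second.length, third.length); // 30 20 0
```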
Works for low-volume queries too
Raven
Sweet Christmas
improved debugging, testing, and InfoSec
Netflix loves Rx
Rx is powerful, cross-platform
Thanks! @_jayphelps
