JRuby with Java Code in Data Processing World. JRubyConf.EU, 31 Jul 2015. Satoshi Tagomori (@tagomoris)
Satoshi "Moris" Tagomori (@tagomoris): Fluentd, Norikra, MessagePack-Ruby, ...; Docker logging driver for Fluentd (Docker v1.8); Treasure Data, Inc.
https://jobs.lever.co/treasure-data We're hiring! Open positions: OSS team (developer / community manager); distributed system engineer (Hadoop, queue/workers); front-end engineer (RoR).
Data Processing World
Java Data Processing World
Data Processing World: Hadoop, Spark, Tez, Flink, Storm, Kafka, ...; Hive, Pig, Drill, Impala, Presto, ...
Data Processing World on JVM: Java + Scala, Clojure + C++, ...
Data Processing World: many CPU cores, large memory, high-rate disk I/O, ...; high-throughput data processing; Hadoop YARN/MapReduce/HDFS API compatibility
Two OSS projects using Java & JRuby
Norikra: Stream Processing with SQL for everybody. Server software written in JRuby; runs on the JVM. Open source software (GPLv2): http://norikra.github.io/ https://github.com/norikra/norikra Distributed on rubygems.org: "gem i norikra"
What Norikra does: SELECT path, SUM(bytes) AS s FROM www_access_logs.win:length_batch(10) WHERE status=200 GROUP BY path ORDER BY s DESC
Walk-through: the query above fed 10 events, with the running per-path sums shown after each event.
1. {"path":"/", "status":200, "bytes":301, "duration":0.03, "referer":"...", "user-agent":"...."} → path:"/", s:301
2. {"path":"/download/a", "status":200, "bytes":10240, "duration":0.53, "referer":"...", "user-agent":"...."} → path:"/", s:301; path:"/download/a", s:10240
3. {"path":"/", "status":404, "bytes":0, "duration":0.08, "referer":"...", "user-agent":"...."} → filtered out by WHERE status=200, sums unchanged: path:"/", s:301; path:"/download/a", s:10240
4. {"path":"/", "status":200, "bytes":301, "duration":0.01, "referer":"...", "user-agent":"...."} → path:"/", s:602; path:"/download/a", s:10240
5. {"path":"/download/b", "status":200, "bytes":678, "duration":0.11, "referer":"...", "user-agent":"...."} → path:"/", s:602; path:"/download/a", s:10240; path:"/download/b", s:678
6. {"path":"/download/b", "status":200, "bytes":678, "duration":0.13, "referer":"...", "user-agent":"...."} → path:"/", s:602; path:"/download/a", s:10240; path:"/download/b", s:1356
7. {"path":"/", "status":200, "bytes":301, "duration":0.02, "referer":"...", "user-agent":"...."} → path:"/", s:903; path:"/download/a", s:10240; path:"/download/b", s:1356
8. {"path":"/", "status":200, "bytes":301, "duration":0.09, "referer":"...", "user-agent":"...."} → path:"/", s:1204; path:"/download/a", s:10240; path:"/download/b", s:1356
9. {"path":"/download/a", "status":200, "bytes":10240, "duration":1.1, "referer":"...", "user-agent":"...."} → path:"/", s:1204; path:"/download/a", s:20480; path:"/download/b", s:1356
10. {"path":"/", "status":200, "bytes":301, "duration":0.05, "referer":"...", "user-agent":"...."} → path:"/", s:1505; path:"/download/a", s:20480; path:"/download/b", s:1356
After the 10th event the length_batch(10) window is full, so the query emits its result ordered by s descending: {"path":"/download/a", "s":20480} {"path":"/", "s":1505} {"path":"/download/b", "s":1356}
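To make the arithmetic of this walk-through concrete, here is a plain-Ruby sketch that recomputes what the query emits for the batch. It uses 301 bytes for every successful "/" request, which is what the running sums (301, 602, ..., 1505) imply. It only mimics the semantics of `win:length_batch(10)` (collect 10 events, then apply the filter, grouping and ordering once over the whole batch); it is not how Norikra or Esper implement it, and the helper name `length_batch_sum` is hypothetical.

```ruby
# Plain-Ruby illustration of the query's semantics; not Esper's implementation.
# Events from the walk-through (duration/referer/user-agent omitted).
EVENTS = [
  { "path" => "/",           "status" => 200, "bytes" => 301 },
  { "path" => "/download/a", "status" => 200, "bytes" => 10240 },
  { "path" => "/",           "status" => 404, "bytes" => 0 },
  { "path" => "/",           "status" => 200, "bytes" => 301 },
  { "path" => "/download/b", "status" => 200, "bytes" => 678 },
  { "path" => "/download/b", "status" => 200, "bytes" => 678 },
  { "path" => "/",           "status" => 200, "bytes" => 301 },
  { "path" => "/",           "status" => 200, "bytes" => 301 },
  { "path" => "/download/a", "status" => 200, "bytes" => 10240 },
  { "path" => "/",           "status" => 200, "bytes" => 301 },
].freeze

# Hypothetical helper: cut the stream into batches of `size` events and, for each
# complete batch, apply WHERE status=200, GROUP BY path, SUM(bytes) AS s, ORDER BY s DESC.
def length_batch_sum(events, size)
  events.each_slice(size).select { |batch| batch.size == size }.map do |batch|
    batch
      .select { |e| e["status"] == 200 }                                   # WHERE status=200
      .group_by { |e| e["path"] }                                          # GROUP BY path
      .map { |path, es| { "path" => path, "s" => es.sum { |e| e["bytes"] } } } # SUM(bytes) AS s
      .sort_by { |row| -row["s"] }                                         # ORDER BY s DESC
  end
end

length_batch_sum(EVENTS, 10).each { |rows| rows.each { |row| p row } }
```

Because only complete batches are evaluated, feeding 15 events would produce one result set and leave the last 5 events waiting for the next batch.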
Norikra and Java: Norikra is written in JRuby and uses Esper. Key factor: productivity (33 days until the first release). Esper: a Java library that provides Complex Event Processing, including an SQL parser and executor; many features and good performance; licensed under GPLv2.
Plugins as rubygems. Diagram: Norikra Server (on JVM) with the Norikra Engine (Esper query engine, Type Definition Manager, Output Event Pool) and an RPC Server (mizuno: Jetty + Rack, with a Rack RPC Handler), plus UDF and Listener plugins. UDF (User-Defined Functions): "gem i norikra-udf-xxx"; written in Java, or in JRuby (compiled to Java); runs inside the Esper instance, so it must be a Java class. Listener: handler for the output data of queries, written in JRuby; "gem i norikra-listener-xxx".
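The split between UDFs (inside Esper) and listeners (receiving query output) can be illustrated with a small plain-Ruby sketch of routing output events to a listener plugin selected by a label. The `ListenerRegistry` module, its `register`/`lookup` methods, the `MemoryListener` class and the `process` method are all hypothetical illustrations, not Norikra's actual listener API.

```ruby
# Hypothetical registry: listener plugin classes register under a label, and the
# engine hands a query's output events to the listener chosen by that label.
module ListenerRegistry
  @listeners = {}

  def self.register(label, klass)
    @listeners[label] = klass
  end

  def self.lookup(label)
    @listeners.fetch(label) { raise ArgumentError, "unknown listener: #{label}" }
  end
end

# A listener plugin (which could ship as its own gem) only needs #process here.
class MemoryListener
  attr_reader :received

  def initialize
    @received = []
  end

  def process(query_name, events)
    events.each { |e| @received << [query_name, e] }
  end
end
ListenerRegistry.register("memory", MemoryListener)

# Engine side: instantiate the listener named by the label and push query output to it.
listener = ListenerRegistry.lookup("memory").new
listener.process("top_paths", [{ "path" => "/download/a", "s" => 20480 }])
p listener.received
```

The engine never needs to know the concrete listener class in advance; installing another gem that calls `register` is enough to make a new output destination available.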
Embulk "Embulk is a open-source bulk data loader that helps data transfer between various databases, storages, file formats, and cloud services." http://www.embulk.org/docs/
Embulk: makes painful data integration work relaxed. Plugin-based parallel bulk data loader. Open source software (Apache License v2.0): http://www.embulk.org/ https://github.com/embulk/embulk Distributed as a .jar or on rubygems.org; plugins are on rubygems.org. More slides: http://www.slideshare.net/frsyuki/embuk-making-data-integration-works-relaxed http://www.slideshare.net/HiroshiNakamura/embulk-20150411
Diagram: Embulk bulk-loads data between HDFS, MySQL, Amazon S3, CSV files, SequenceFile, Salesforce.com, Elasticsearch, Cassandra, Hive and Redis through plugins on each side. ✓ Parallel execution ✓ Data validation ✓ Error recovery ✓ Deterministic behavior ✓ Idempotent retrying
#ccc_cd4 / #embulk Diagram: InputPlugin (input, …) → records → Filter plugins (convert, …) → records → OutputPlugin (output, …); the Executor plugin (threads, MapReduce) runs this pipeline based on the config.
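The data flow in this diagram, where an executor runs several input → filter → output pipelines in parallel, can be sketched in plain Ruby with one thread per task. This is a simplified illustration under hypothetical interfaces (lambdas standing in for the input, filter and output plugins, and a `run_pipeline` helper standing in for the executor); Embulk's real executor, page format and plugin APIs differ.

```ruby
# Hypothetical mini-executor: each task index gets its own thread running
# input -> filters -> output, and the per-task reports are collected at the end.
def run_pipeline(task_count, input, filters, output)
  threads = (0...task_count).map do |index|
    Thread.new do
      records = input.call(index)                            # input: produce records
      filters.each { |f| records = records.map(&f).compact } # filters: convert or drop (nil)
      output.call(index, records)                            # output: consume records
    end
  end
  threads.map(&:value)                                       # join; one report per task
end

input  = ->(index) { (1..3).map { |i| { "task" => index, "n" => i } } }
double = ->(r) { r.merge("n" => r["n"] * 2) }
drop_2 = ->(r) { r["n"] == 2 ? nil : r }                    # a filter may drop records
output = ->(index, records) { { "task" => index, "records" => records.size } }

reports = run_pipeline(2, input, [double, drop_2], output)
p reports
```

Each task produces n = 1, 2, 3; doubling gives 2, 4, 6 and the second filter drops the 2, so every task reports two records. Swapping the thread-based `run_pipeline` for a MapReduce-based one would leave the plugins unchanged, which is the role of the Executor plugin.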
Diagram (file-based plugins): an InputPlugin can be built from a FileInput plugin (HDFS, S3, Riak CS, …) → Decoder plugin (gzip, bzip2, aes, …) → Parser plugin (CSV, JSON, pcap, …), and an OutputPlugin from a Formatter plugin → Encoder plugin → FileOutput plugin. Buffers flow between the file-level stages, records flow through the Filter plugins, and the Executor plugin runs the pipeline based on the config.
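For file-based plugins, the work is split into byte-level stages (FileInput, Decoder) and a record-level stage (Parser). A minimal plain-Ruby sketch of that chain follows, using the standard library's `Zlib.gzip`/`Zlib.gunzip` for a gzip decoder and a deliberately naive comma splitter as the parser (no quoted fields). The method names are hypothetical and are not Embulk's DecoderPlugin/ParserPlugin APIs; treating the decoded bytes as UTF-8 is also an assumption.

```ruby
require "zlib"

# FileInput stage (hypothetical): an in-memory gzip-compressed CSV "file".
def file_input
  Zlib.gzip("id,name\n1,alice\n2,bob\n")
end

# Decoder stage: gzip bytes -> plain text (assumed to be UTF-8).
def gzip_decode(bytes)
  Zlib.gunzip(bytes).force_encoding(Encoding::UTF_8)
end

# Parser stage: text -> records. Naive CSV: no quoted fields or escaped commas.
def naive_csv_parse(text)
  header, *rows = text.lines(chomp: true)
  keys = header.split(",")
  rows.map { |row| keys.zip(row.split(",")).to_h }
end

records = naive_csv_parse(gzip_decode(file_input))
p records
```

Swapping `gzip_decode` for another decoder, or `naive_csv_parse` for a JSON parser, leaves the other stages untouched, which is the point of keeping decoders and parsers separate.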
Embulk and Java: Embulk core is written in Java, mainly for performance. Embulk plugins are loaded through an API based on JRuby and are written in JRuby or Java: JRuby for an early release, Java for performance.
InputPlugin example:
  module Embulk
    class InputExample < InputPlugin
      Plugin.register_input('example', self)
      def self.transaction(config, &control)
        # read config
        task = {
          'message' => config.param('message', :string, default: nil)
        }
        threads = config.param('threads', :int, default: 2)
        columns = [
          Column.new(0, 'col0', :long),
          Column.new(1, 'col1', :double),
          Column.new(2, 'col2', :string),
        ]
        # BEGIN here
        commit_reports = yield(task, columns, threads)
        # COMMIT here
        puts "Example input finished"
        return {}
      end
      # super stores task, schema, index and page_builder (@index, @page_builder, ...)
      def initialize(task, schema, index, page_builder)
        super
        puts "Example input thread #{@index}..."
      end
      def run
        10.times do |i|
          @page_builder.add([i, 10.0, "example"])
        end
        @page_builder.finish
        commit_report = {}
        return commit_report
      end
    end
  end
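The `transaction`/`yield` pattern in this example separates a BEGIN phase (read the config, build the task), the parallel work, and a COMMIT phase that runs only after every thread has returned its commit report. The plain-Ruby harness below imitates that control flow with hypothetical stand-ins: a `FakePageBuilder`, a simplified `SketchInput` whose `run` takes the page builder as an argument, and a block acting as the executor. It is not Embulk's runtime and uses none of Embulk's classes.

```ruby
# Hypothetical stand-in for a page builder: collects rows and remembers finish.
class FakePageBuilder
  attr_reader :rows

  def initialize
    @rows = []
    @finished = false
  end

  def add(row)
    @rows << row
  end

  def finish
    @finished = true
  end

  def finished?
    @finished
  end
end

# Simplified input plugin following the same transaction/yield shape as the slide.
class SketchInput
  def self.transaction(config)
    task = { "rows" => config.fetch("rows", 10) }       # BEGIN: read config into a task
    threads = config.fetch("threads", 2)
    commit_reports = yield(task, threads)                # executor runs all tasks
    { "total" => commit_reports.sum { |r| r["added"] } } # COMMIT: summarize the reports
  end

  def run(task, index, page_builder)
    task["rows"].times { |i| page_builder.add([index, i]) }
    page_builder.finish
    { "added" => task["rows"] }                          # per-thread commit report
  end
end

# The block plays the executor: one plugin instance and page builder per thread.
builders = []
result = SketchInput.transaction("rows" => 3, "threads" => 2) do |task, threads|
  (0...threads).map do |index|
    pb = FakePageBuilder.new
    builders << pb
    Thread.new { SketchInput.new.run(task, index, pb) }
  end.map(&:value)
end
p result
```

Because the plugin only hands control back through `yield`, the framework decides how many threads to run and where, while the plugin keeps the BEGIN and COMMIT logic in one method.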
OutputPlugin example:
  module Embulk
    class OutputExample < OutputPlugin
      Plugin.register_output('example', self)
      def self.transaction(config, schema, processor_count, &control)
        # read config
        task = {
          'message' => config.param('message', :string, default: "record")
        }
        puts "Example output started."
        commit_reports = yield(task)
        puts "Example output finished. Commit reports = #{commit_reports.to_json}"
        return {}
      end
      def initialize(task, schema, index)
        puts "Example output thread #{index}..."
        super
        @message = task.prop('message', :string)
        @records = 0
      end
      def add(page)
        page.each do |record|
          hash = Hash[schema.names.zip(record)]
          puts "#{@message}: #{hash.to_json}"
          @records += 1
        end
      end
      def finish
      end
      def abort
      end
      def commit
        commit_report = {
          "records" => @records
        }
        return commit_report
      end
    end
  end
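On the output side, each thread counts what it wrote and returns a commit report from `commit`, and the transaction sees the reports only after all threads are done. The plain-Ruby sketch below shows that aggregation, and also calls `abort` on every per-thread output if any thread fails. The `SketchOutput` class, the rule that a `nil` record is an error, and the all-or-nothing policy are hypothetical simplifications, not Embulk's exact error-recovery semantics.

```ruby
# Hypothetical per-thread output mirroring add/finish/abort/commit from the slide.
class SketchOutput
  attr_reader :aborted

  def initialize(index)
    @index = index
    @records = 0
    @aborted = false
  end

  def add(page)
    page.each do |record|
      raise "bad record in thread #{@index}" if record.nil?
      @records += 1
    end
  end

  def finish; end

  def abort
    @aborted = true
  end

  def commit
    { "records" => @records }
  end
end

# Hypothetical transaction: commit only if every thread succeeded, otherwise abort all.
# pages_per_thread[i] is the list of pages (arrays of records) for thread i.
def output_transaction(pages_per_thread)
  outputs = pages_per_thread.each_index.map { |i| SketchOutput.new(i) }
  outcomes = outputs.zip(pages_per_thread).map do |out, pages|
    Thread.new do
      pages.each { |page| out.add(page) }
      out.finish
      :ok
    rescue StandardError
      :failed
    end
  end.map(&:value)

  if outcomes.all?(:ok)
    reports = outputs.map(&:commit)
    { "records" => reports.sum { |r| r["records"] } }
  else
    outputs.each(&:abort)
    nil
  end
end

p output_transaction([[[1, 2], [3]], [[4]]])
```

Deferring `commit` until every thread has finished is what lets a failed run be aborted as a whole instead of leaving partial output behind.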
Plugin management: Norikra. Diagram (legend: Java, JRuby): the Engine and the Esper instance, with plugin management for UDF and Listener plugins; plugins as gems; plugin loader written in JRuby.
Plugin management: Embulk. Diagram (legend: Java, JRuby): Embulk core with plugin management for input/output/filter, parser/formatter, decoder/encoder, file-input/output and executor plugins; plugins as gems; plugin loader written in JRuby.
Pluggable software on JVM & Java API. Language: Java? Scala? Clojure? JRuby? → JRuby. Plugin packaging: jar? gem? → gem (rubygems.org >>> Maven Central (or others), especially for plugin authors). Plugin loader: class loader? "require"? → require.
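Choosing `require` over a Java class loader means a plugin is just a gem whose files land on the load path, and loading it is a `require` of a conventional path. The sketch below imitates that idea in plain Ruby: it writes a plugin file into a temporary directory (standing in for an installed gem), adds that directory to `$LOAD_PATH`, and resolves a plugin name by requiring `sketch/input/<name>`. The path convention, the `Registry` module and the `HelloInput` class are hypothetical; Embulk and Norikra use their own conventions.

```ruby
require "tmpdir"
require "fileutils"

# Hypothetical registry that plugin files populate when they are required.
module Registry
  @plugins = {}

  def self.register(name, klass)
    @plugins[name] = klass
  end

  def self.load_plugin(name)
    require "sketch/input/#{name}"   # the plugin loader is plain Ruby require
    @plugins.fetch(name)
  rescue LoadError
    raise ArgumentError, "plugin not found: #{name}"
  end
end

# Stand-in for an installed gem: a lib/ directory containing sketch/input/hello.rb.
gem_lib = File.join(Dir.mktmpdir, "lib")
FileUtils.mkdir_p(File.join(gem_lib, "sketch", "input"))
File.write(File.join(gem_lib, "sketch", "input", "hello.rb"), <<~RUBY)
  class HelloInput
    def records
      [["hello"]]
    end
  end
  Registry.register("hello", HelloInput)
RUBY
$LOAD_PATH.unshift(gem_lib)          # RubyGems does this when a gem is activated

plugin = Registry.load_plugin("hello")
p plugin.new.records
```

For plugin authors this keeps the workflow to "write a Ruby file, publish a gem", with no JAR manifests or class loader wiring.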
JRuby in Japan: not so many users :( CRuby is super major software in Japan. Java -> Ruby -> Scala? Golang?
Make your software pluggable. Build an ecosystem & community, with JRuby! Thanks!
