Building Data Pipelines for Solr with Apache NiFi
Bryan Bende – Member of Technical Staff
© Hortonworks Inc. 2011 – 2015. All Rights Reserved

Outline
• Introduction to Apache NiFi
• Solr Indexing & Update Handlers
• NiFi/Solr Integration
• Use Cases

About Me
• Member of Technical Staff at Hortonworks
• Apache NiFi Committer & PMC Member since June 2015
• Solr/Lucene user for several years
• Developed Solr integration for Apache NiFi 0.1.0 release
• Twitter: @bbende / Blog: bryanbende.com

Introduction
Installing Solr and getting started - easy (extract, bin/solr start)
Defining a schema and configuring Solr - easy
Getting all of your incoming data into Solr - not as easy
A lot of time spent…
• Cleaning and parsing data
• Writing custom code/scripts
• Building approaches for monitoring and debugging
• Deploying updates to code/scripts for small changes
Need something to make this easier…

Introduction to Apache NiFi

Apache NiFi
• Powerful and reliable system to process and distribute data
• Directed graphs of data routing and transformation
• Web-based User Interface for creating, monitoring, & controlling data flows
• Highly configurable - modify data flow at runtime, dynamically prioritize data
• Data Provenance tracks data through entire system
• Easily extensible through development of custom components
[1] https://nifi.apache.org/

NiFi - Terminology
FlowFile
• Unit of data moving through the system
• Content + Attributes (key/value pairs)
Processor
• Performs the work, can access FlowFiles
Connection
• Links between processors
• Queues that can be dynamically prioritized
Process Group
• Set of processors and their connections
• Receive data via input ports, send data via output ports

NiFi - User Interface
• Drag and drop processors to build a flow
• Start, stop, and configure components in real time
• View errors and corresponding error messages
• View statistics and health of data flow
• Create templates of common processors & connections

NiFi - Provenance
• Tracks data at each point as it flows through the system
• Records, indexes, and makes events available for display
• Handles fan-in/fan-out, i.e. merging and splitting data
• View attributes and content at given points in time

NiFi - Queue Prioritization
• Configure a prioritizer per connection
• Determine what is important for your data – time based, arrival order, importance of a data set
• Funnel many connections down to a single connection to prioritize across data sets
• Develop your own prioritizer if needed
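
The idea of a prioritized connection queue can be sketched in a few lines of Python. This is only a conceptual model, not NiFi code: real prioritizers are Java extensions, and the `PrioritizedQueue` class, the dict-based FlowFile shape, and the `priority` attribute name used here are assumptions made for illustration.

```python
import heapq
from itertools import count


class PrioritizedQueue:
    """Toy model of a connection queue: the lowest key leaves first,
    and ties are broken by arrival order."""

    def __init__(self, key):
        self._key = key          # function mapping a FlowFile to a sortable value
        self._heap = []
        self._arrival = count()  # monotonically increasing tie-breaker

    def put(self, flowfile):
        entry = (self._key(flowfile), next(self._arrival), flowfile)
        heapq.heappush(self._heap, entry)

    def get(self):
        return heapq.heappop(self._heap)[2]

    def __len__(self):
        return len(self._heap)


def by_priority_attribute(flowfile, default=9):
    """Hypothetical prioritizer: read an integer 'priority' attribute,
    falling back to a low-importance default when it is missing."""
    return int(flowfile["attributes"].get("priority", default))
```

Swapping the key function is the analogue of choosing a different prioritizer on a connection: a key that returns an arrival timestamp, for example, would give time-based ordering.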

NiFi - Extensibility
Built from the ground up with extensions in mind
Service-loader pattern for…
• Processors
• Controller Services
• Reporting Tasks
• Prioritizers
Extensions packaged as NiFi Archives (NARs)
• Deploy to NiFi lib directory and restart
• Provides ClassLoader isolation
• Same model as standard components

NiFi - Architecture
[Diagram: a NiFi cluster. The master, the NiFi Cluster Manager (NCM), runs in a JVM on its own OS/host with a web server and a request replicator. Each slave NiFi node runs in a JVM on its own OS/host with a web server and a flow controller hosting processors and extensions, backed by FlowFile, content, and provenance repositories on local storage.]

Solr Indexing & Update Handlers

Solr – Indexing Data
Update Handlers
• XML, JSON, CSV
• https://cwiki.apache.org/confluence/display/solr/Uploading+Data+with+Index+Handlers
Clients
• Java, PHP, Python, Ruby, Scala, Perl, and more
• https://wiki.apache.org/solr/IntegratingSolr

Solr Update Handlers - XML
Adding documents
    <add>
      <doc>
        <field name="foo">bad</field>
      </doc>
    </add>
Deleting documents
    <delete>
      <id>1234567</id>
      <query>foo:bar</query>
    </delete>
Other Operations
    <commit waitSearcher="false"/>
    <commit waitSearcher="false" expungeDeletes="true"/>
    <optimize waitSearcher="false"/>
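
A minimal sketch of generating these XML messages from code rather than by hand, so that field values are escaped correctly. It uses only Python's standard library; the helper names `add_message` and `delete_message` are made up for this example and are not part of any Solr client.

```python
import xml.etree.ElementTree as ET


def add_message(docs):
    """Build an <add> message from an iterable of {field name: value} dicts."""
    add = ET.Element("add")
    for doc in docs:
        doc_el = ET.SubElement(add, "doc")
        for name, value in doc.items():
            field = ET.SubElement(doc_el, "field", name=name)
            field.text = str(value)  # ElementTree escapes &, <, > on output
    return ET.tostring(add, encoding="unicode")


def delete_message(ids=(), queries=()):
    """Build a <delete> message that removes documents by id and/or by query."""
    delete = ET.Element("delete")
    for doc_id in ids:
        ET.SubElement(delete, "id").text = str(doc_id)
    for query in queries:
        ET.SubElement(delete, "query").text = query
    return ET.tostring(delete, encoding="unicode")
```

The resulting strings would be sent as the body of a request to the XML update handler.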

Solr Update Handlers - JSON
Solr-Style JSON…
Add Documents
    [
      { "id": "1", "title": "Doc 1" },
      { "id": "2", "title": "Doc 2" }
    ]
Commands
    {
      "add": {
        "doc": {
          "id": "1",
          "title": { "boost": 2.3, "value": "Doc1" }
        }
      }
    }
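
Both Solr-style JSON shapes can be produced with a JSON serializer instead of string concatenation. The sketch below assumes nothing beyond Python's standard library; the function names are hypothetical, and `commitWithin` is included only as an example of an optional setting on the add command.

```python
import json


def add_documents_payload(docs):
    """Serialize documents as a JSON array, the 'Add Documents' form."""
    return json.dumps(list(docs))


def add_command_payload(doc, commit_within_ms=None):
    """Serialize a single document wrapped in an explicit 'add' command."""
    command = {"doc": doc}
    if commit_within_ms is not None:
        command["commitWithin"] = commit_within_ms
    return json.dumps({"add": command})
```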

Solr Update Handlers - JSON
Custom JSON
• Transform custom JSON based on Solr schema
• Define paths to split JSON into multiple Solr documents
• Field mappings from JSON field name to Solr field name
Request parameters:
    split=/exams&
    f=name:/name&
    f=subject:/exams/subject&
    f=test:/exams/test&
    f=marks:/exams/marks
Input:
    {
      "name": "John",
      "exams": [
        { "subject": "Math", "test" : "term1", "marks" : 90},
        { "subject": "Biology", "test" : "term1", "marks" : 86}
      ]
    }
Produces two Solr documents:
- John, Math, term1, 90
- John, Biology, term1, 86
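
To make the split and field mappings concrete, here is a rough sketch that builds the query string shown on the slide and imitates the split locally. `simulate_split` is not Solr's implementation; it only reproduces the documented outcome for this one-level example, and both function names are assumptions made for illustration.

```python
from urllib.parse import urlencode


def json_docs_query(split, field_paths):
    """Build the split/f query string; 'f' repeats once per field mapping."""
    params = [("split", split)]
    params += [("f", f"{field}:{path}") for field, path in field_paths]
    # Keep '/' and ':' readable instead of percent-encoding them.
    return urlencode(params, safe="/:")


def simulate_split(record, split_key, parent_fields, child_fields):
    """Imitate a one-level split: one output document per child object,
    each carrying a copy of the selected parent fields."""
    docs = []
    for child in record[split_key]:
        doc = {name: record[name] for name in parent_fields}
        doc.update({name: child[name] for name in child_fields})
        docs.append(doc)
    return docs
```

Applied to the John record with `split_key="exams"`, `simulate_split` yields one document for Math and one for Biology, each repeating the parent `name` field.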

Solr Update Handlers - CSV
/update with Content-Type:application/csv
Important parameters:
• separator
• trim
• header
• fieldnames
• skip
• rowid
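
As an illustration of how the separator, trim, and header parameters relate to the data being sent, the following sketch writes a small CSV body and the matching request path. The helper names and the sample fields are hypothetical; only the parameter names come from the list above.

```python
import csv
import io
from urllib.parse import urlencode


def csv_body(rows, fieldnames, separator=","):
    """Write dict rows as CSV text with a header line."""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=fieldnames,
                            delimiter=separator, lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()


def csv_update_path(separator=",", trim=True, header=True):
    """Build the /update path with CSV parameters matching the body."""
    params = {
        "separator": separator,
        "trim": str(trim).lower(),      # booleans are sent as true/false
        "header": str(header).lower(),
    }
    return "/update?" + urlencode(params)
```

The separator passed to `csv_update_path` has to match the one used in `csv_body`, otherwise each row would be read as a single field.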

SolrJ Client
SolrDocument Update
    SolrInputDocument doc = new SolrInputDocument();
    doc.addField("first", "bob");
    doc.addField("last", "smith");
    solrClient.add(doc);
ContentStream Update
    ContentStreamUpdateRequest request =
        new ContentStreamUpdateRequest("/update/json/docs");
    request.setParam("json.command", "false");
    request.setParam("split", "/exams");
    request.getParams().add("f", "name:/name");
    request.getParams().add("f", "subject:/exams/subject");
    request.getParams().add("f", "test:/exams/test");
    request.getParams().add("f", "marks:/exams/marks");
    request.addContentStream(new ContentStream...);
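
For readers without SolrJ at hand, the ContentStream update can be approximated as a plain HTTP request. This sketch only constructs the request object and does not send it; the function name and any base URL passed to it are placeholders, and the parameters mirror the ones set on the SolrJ request above.

```python
import json
from urllib.parse import urlencode
from urllib.request import Request


def build_json_docs_request(base_url, record, split, mappings):
    """Construct (but do not send) a POST to the /update/json/docs handler.

    base_url is a placeholder for a collection URL; mappings are
    'solrField:/json/path' strings, sent as repeated 'f' parameters.
    """
    params = [("json.command", "false"), ("split", split)]
    params += [("f", mapping) for mapping in mappings]
    url = f"{base_url}/update/json/docs?{urlencode(params, safe='/:')}"
    return Request(
        url,
        data=json.dumps(record).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Passing the returned object to `urllib.request.urlopen` would perform the update against a running Solr instance.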

NiFi/Solr Integration

NiFi Solr Processors
• Support Solr Cloud and stand-alone Solr instances
• Leverage SolrJ – CloudSolrClient & HttpSolrClient
• Extract new documents based on a date/time field – GetSolr
• Stream FlowFile content to an update handler – PutSolrContentStream

PutSolrContentStream
• Choose Solr Type - Cloud or Standard
• Specify ZooKeeper hosts, or the Solr URL
• Specify a collection if using Solr Cloud
• Specify the Solr path for the ContentStream
• Dynamic properties sent as key/value pairs on the request
• Relationships for success, failure, and connection_failure

GetSolr
• Solr Type, Solr Location, and Collection are the same as PutSolrContentStream
• Specify a query to run on each execution of the processor
• Specify a sort clause and a date field used to filter results
• Schedule processor to run on a cron, or timer
• Retrieves documents with ‘Date Field’ greater than time of last execution
• Produces output in SolrJ XML
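
One way to picture the incremental extraction is as a filter query on the date field whose lower bound is the time of the previous run. The sketch below is an assumption about the general shape of such a query, not the processor's actual code; the function names and the example field names are hypothetical.

```python
from datetime import datetime, timezone


def date_filter(date_field, last_run):
    """Range filter matching documents strictly newer than last_run.

    '{' makes the lower bound exclusive, '*]' leaves the upper bound open.
    last_run must be a timezone-aware datetime.
    """
    stamp = last_run.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return f"{date_field}:{{{stamp} TO *]"


def incremental_query_params(query, sort, date_field, last_run):
    """Combine the user query, sort clause, and date filter into parameters."""
    return {"q": query, "sort": sort, "fq": date_filter(date_field, last_run)}
```

After each successful run the caller would record the current time and pass it as `last_run` on the next execution.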

Use Cases

Use Cases – Index JSON
1. Pull in Tweets using Twitter API
2. Extract language and text into FlowFile attributes
3. Get non-empty English tweets
    ${twitter.text:isEmpty():not():and(${twitter.lang:equals("en")})}
4. Merge together JSON documents based on quantity, or time
5. Use dynamic field mappings to select fields for indexing
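
The routing expression in step 3 reads more easily when spelled out as an ordinary predicate. The Python below is an equivalent written for illustration only; it treats missing and whitespace-only text as empty and assumes FlowFile attributes are available as a plain dict.

```python
def is_indexable(attributes):
    """Return True for non-empty English tweets.

    Mirrors: twitter.text is not empty AND twitter.lang equals "en".
    Missing or whitespace-only text counts as empty.
    """
    text = attributes.get("twitter.text") or ""
    return bool(text.strip()) and attributes.get("twitter.lang") == "en"


def select_indexable(flowfiles):
    """Keep only the FlowFiles whose attributes pass the predicate."""
    return [ff for ff in flowfiles if is_indexable(ff["attributes"])]
```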

Use Cases – Issue Commands
1. Generate a FlowFile on a cron, or timer, to initiate an action
2. Replace the contents of the FlowFile with a Solr command
    <delete>
      <query>timestamp:[* TO NOW-1HOUR]</query>
    </delete>
3. Send the command to the appropriate update handler
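
If the retention window needs to vary, the command text from step 2 could be generated rather than hard-coded. A small sketch, with a made-up function name, that produces the same delete-by-query message for a configurable number of hours:

```python
from xml.sax.saxutils import escape


def delete_older_than(field, hours):
    """Build a delete-by-query command for documents older than `hours`."""
    if hours < 1:
        raise ValueError("hours must be at least 1")
    unit = "HOUR" if hours == 1 else "HOURS"
    query = f"{field}:[* TO NOW-{hours}{unit}]"
    # Escape the query in case a field name or value contains &, < or >.
    return f"<delete><query>{escape(query)}</query></delete>"
```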

Use Cases – Multiple Collections
1. Set a FlowFile attribute containing the name of a Solr collection
2. Use expression language when setting the Collection property on the Solr processor: ${solr.collection}
Note:
• If merging documents, merge per collection in this case
• Current bug preventing this scenario from working: https://issues.apache.org/jira/browse/NIFI-959
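
The note about merging per collection amounts to grouping documents by the routing attribute before batching them. A conceptual sketch, assuming FlowFiles are modeled as dicts with `attributes` and `content` keys (a shape invented for this example):

```python
from collections import defaultdict


def group_by_collection(flowfiles, attribute="solr.collection"):
    """Group FlowFile contents by the collection named in an attribute,
    so that each merged batch targets exactly one collection."""
    groups = defaultdict(list)
    for flowfile in flowfiles:
        groups[flowfile["attributes"][attribute]].append(flowfile["content"])
    return dict(groups)
```

Merging without this grouping would mix documents for different collections into one batch, which a single update request cannot route correctly.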

Use Cases – Log Aggregation
1. Listen for log events over UDP on a given port
• Set ‘Flow File Per Datagram’ to true
2. Send JSON log events
• Syslog UDP forwarding
• Logback/log4j UDP appenders
3. Merge JSON events together based on size, or time
4. Stream JSON update to Solr
http://bryanbende.com/development/2015/05/17/collecting-logs-with-apache-nifi/
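
Step 3, merging events based on size, can be sketched as a simple batcher that closes a batch once adding the next event would exceed a byte limit. This illustrates the idea only and is not how NiFi's merge processor is implemented; `merge_events` and its parameters are hypothetical.

```python
import json


def merge_events(events, max_bytes):
    """Merge JSON events into JSON-array batches of roughly max_bytes each.

    Size counts the serialized events only (json.dumps emits ASCII by
    default, so characters equal bytes). An oversized single event still
    gets a batch of its own.
    """
    batches, current, size = [], [], 0
    for event in events:
        encoded = json.dumps(event)
        if current and size + len(encoded) > max_bytes:
            batches.append("[" + ",".join(current) + "]")
            current, size = [], 0
        current.append(encoded)
        size += len(encoded)
    if current:
        batches.append("[" + ",".join(current) + "]")
    return batches
```

Each returned string is a JSON array that could be streamed to the JSON update handler as one request.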

Use Cases – Index Avro
1. Receive an Avro datafile with binary encoding
2. Convert Avro to JSON using the built-in ConvertAvroToJSON processor
3. Stream JSON documents to Solr

Use Cases – Index a Relational Database
1. GenerateFlowFile acts as a timer to trigger ExecuteSQL (future plans to not require an incoming FlowFile to ExecuteSQL: NIFI-932)
2. ExecuteSQL performs a SQL query and streams the results as an Avro datafile
Use expression language to construct a dynamic date range:
    ${now():toNumber():minus(60000):format('yyyy-MM-dd')}
3. Convert Avro to JSON using the built-in ConvertAvroToJSON processor
4. Stream JSON update to Solr
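
The date expression in step 2 takes the current time, subtracts 60000 milliseconds, and formats the result as a date. An equivalent calculation in Python, written as a sketch with a hypothetical function name, shows what value ends up in the query:

```python
from datetime import datetime, timedelta


def date_one_minute_ago(now=None):
    """Return the date, as yyyy-MM-dd text, of the instant 60000 ms before now."""
    if now is None:
        now = datetime.now()
    return (now - timedelta(milliseconds=60000)).strftime("%Y-%m-%d")
```

Because only the date part is kept, the value changes just once a day; a run shortly after midnight still yields the previous day's date.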

Use Case – Extraction in a Cluster
1. Schedule GetSolr to run on Primary Node
2. Send results to a Remote Process Group pointing back to self
3. Data gets redistributed to “Solr XML Docs” Input Ports across cluster
4. Perform further processing on each node

Future Work
Unofficial ideas…
PutSolrDocument
• Parse FlowFile InputStream into one or more SolrDocuments
• Allow developers to provide “FlowFile to SolrDocument” converter
PutSolrAttributes
• Create a SolrDocument from FlowFile attributes
• Processor properties specify attributes to include/exclude
Distribute & Execute Solr Commands
• DistributeSolrCommand learns about Solr shards and produces commands per shard
• ExecuteSolrCommand performs action based on the incoming command

Summary
Resources
• Apache NiFi Mailing Lists
  – https://nifi.apache.org/mailing_lists.html
• Apache NiFi Documentation
  – https://nifi.apache.org/docs.html
• Getting started developing extensions
  – https://cwiki.apache.org/confluence/display/NIFI/Maven+Projects+for+Extensions
  – https://nifi.apache.org/developer-guide.html
Contact Info:
• Email: bbende@hortonworks.com
• Twitter: @bbende

Sources
[1] https://nifi.apache.org/
[2] https://cwiki.apache.org/confluence/display/solr/Uploading+Data+with+Index+Handlers
[3] https://wiki.apache.org/solr/IntegratingSolr
[4] http://lucidworks.com/blog/indexing-custom-json-data/

Thank you
