Apache Beam pipelines with Scala: part 3 - dynamic processing
by Pythian Marketing on Dec 12, 2017 12:00:00 AM
INPUT => OUTPUT. To keep the code in control messages minimal, we convert the input data into a JsonObject before passing it to the function, and we get a TableRow as the result. Thus V is JsonObject => TableRow. We expect a "control" message like the following to work:

[code language="scala"]
(json: com.google.gson.JsonObject) => {
  new com.google.api.services.bigquery.model.TableRow()
    .set("id", json.get("id").getAsLong)
    .set("data", json.get("text").getAsString)
}
[/code]

Now it's time to define an object that lets us write something like:

[code language="scala"]val tableRow = code.evalFor(json)[/code]

Here is that object:

[code language="scala"]
import com.google.api.services.bigquery.model.TableRow
import com.google.gson.JsonObject
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox

object Dynamic {
  // Runtime compiler, created once per JVM
  private val toolbox = currentMirror.mkToolBox()

  // Memoized compiler: the compiled function is cached, keyed by the code string
  private val dynamic = new Memoize(10)((code: String) => {
    val tree = toolbox.parse(code)
    toolbox.eval(tree).asInstanceOf[JsonObject => TableRow]
  })

  def apply(code: String) = dynamic(code)
}
[/code]

We keep a single instance of the toolbox and of the memoized dynamic compiler per JVM.
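The Memoize class used by Dynamic is not shown in this section. As a rough illustration only, here is a minimal sketch of what such a bounded cache could look like. It assumes that the constructor argument is the maximum number of cached entries and that the least recently used entry is evicted; both are assumptions, and the real class may behave differently.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}
import java.util.Map.Entry

// Hypothetical stand-in for Memoize: a bounded, thread-safe function cache.
class Memoize[K, V](maxSize: Int)(f: K => V) extends (K => V) {

  // Access-ordered map: once it holds more than maxSize entries,
  // the least recently used one is dropped on the next insert.
  private val cache = new JLinkedHashMap[K, V](16, 0.75f, true) {
    override def removeEldestEntry(eldest: Entry[K, V]): Boolean =
      size() > maxSize
  }

  // The whole lookup-or-compute step is synchronized, so concurrent callers
  // wait while a value is being computed instead of computing it twice.
  def apply(key: K): V = synchronized {
    if (cache.containsKey(key)) cache.get(key)
    else {
      val value = f(key)
      cache.put(key, value)
      value
    }
  }
}
```

With a cache of this shape, an expensive function such as a runtime compilation runs once per distinct key while the key stays in the cache, and callers that arrive during the computation block until it finishes.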
So now we can simply use:

[code language="scala"]val tableRow = Dynamic(code)(json)[/code]

To make it work as the String method shown above, we use an implicit value class:

[code language="scala"]
implicit class RichString(val code: String) extends AnyVal {
  def evalFor(arg: JsonObject): TableRow = Dynamic(code)(arg)
}
[/code]

The only thing we change from our previous side input example is a slightly altered MyDoFn:

[code language="scala"]
class MyDoFn(sideView: PCollectionView[java.util.List[String]])
    extends DoFn[String, TableRow] with LazyLogging {

  @ProcessElement
  def processElement(c: ProcessContext): Unit = {
    val t0 = System.currentTimeMillis()
    // The side input carries the source code of the current "control" function
    val sideInput = c.sideInput(sideView).get(0)
    val inputString = c.element()
    logger.info(s"Getting new data=$inputString")
    Try {
      val json = new JsonParser().parse(inputString).getAsJsonObject
      // Compiles the control code on first use, then reuses the cached function
      sideInput.evalFor(json)
    } match {
      case Success(row) =>
        logger.info(s"Inserting to BiqQuery: $row")
        c.output(row)
      case Failure(ex) =>
        logger.info(s"Unable to parse message: $inputString", ex)
    }
    val t1 = System.currentTimeMillis()
    logger.info(s"Processed data in ${t1 - t0} ms")
  }
}
[/code]

I added timing logs to show how much time the cached functions save. We are ready to test it now. Start it locally with sbt:

[code language="scala"]
$ sbt run
[info] Loading settings from plugins.sbt ...
[info] Loading project definition from C:\Users\Valentin\workspace\beam-dynamic\project
[info] Loading settings from build.sbt ...
[info] Set current project to beam-dynamic (in build file:/C:/Users/Valentin/workspace/beam-dynamic/)
[info] Running com.pythian.Beam
[/code]

Now publish our first attempt at the code to the "control" topic, and then publish data to the "data" topic:

[code language="scala"]
// for the "control" topic
(json: com.google.gson.JsonObject) => {
  new com.google.api.services.bigquery.model.TableRow()
    .set("id", json.get("id").getAsLong)
    .set("data", "placeholder")
}

// for the "data" topic
{"id":1,"text":"row1"}
{"id":2,"text":"row2"}
{"id":3,"text":"row3"}
[/code]

We can see how the data is processed in the logs:

[code language="scala"]
2017/12/11 17:47:24.049 INFO com.pythian.Beam$MyDoFn - Getting new data={"id":1,"text":"row1"}
2017/12/11 17:47:24.544 INFO com.pythian.Beam$MyDoFn - Getting new data={"id":2,"text":"row2"}
2017/12/11 17:47:25.816 INFO com.pythian.Beam$MyDoFn - Inserting to BiqQuery: {id=1, data=placeholder}
2017/12/11 17:47:25.816 INFO com.pythian.Beam$MyDoFn - Inserting to BiqQuery: {id=2, data=placeholder}
2017/12/11 17:47:25.880 INFO com.pythian.Beam$MyDoFn - Processed data in 923 ms
2017/12/11 17:47:25.880 INFO com.pythian.Beam$MyDoFn - Processed data in 336 ms
2017/12/11 17:47:30.076 INFO com.pythian.Beam$MyDoFn - Getting new data={"id":3,"text":"row3"}
2017/12/11 17:47:30.076 INFO com.pythian.Beam$MyDoFn - Inserting to BiqQuery: {id=3, data=placeholder}
2017/12/11 17:47:30.077 INFO com.pythian.Beam$MyDoFn - Processed data in 1 ms
[/code]

It took about 1 second to compile the code for id=1. The message with id=2 waited for some time on the synchronized block while the "control" function was still compiling, and id=3 took just 1 ms.
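The pattern in these logs (one slow first call, a second call that waits, then near-instant calls) can be reproduced without Beam or the compiler. The sketch below is only an illustration: SlowCompileDemo, its method names, and the 300 ms sleep that stands in for compilation are all hypothetical and are not part of the pipeline.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Hypothetical demo: a synchronized cache in front of a slow "compile" step.
object SlowCompileDemo {
  val compilations = new AtomicInteger(0)
  private val cache = mutable.Map.empty[String, String => String]

  // Looks up the function for this code string, "compiling" it on a miss.
  // Callers block here while another thread is still compiling.
  def compiled(code: String): String => String = synchronized {
    cache.getOrElseUpdate(code, {
      compilations.incrementAndGet()
      Thread.sleep(300) // simulated compile time, not a measured value
      val fn = (s: String) => s.toUpperCase
      fn
    })
  }

  // Elapsed milliseconds for one lookup followed by one call.
  def timed(code: String, input: String): Long = {
    val t0 = System.nanoTime()
    compiled(code)(input)
    (System.nanoTime() - t0) / 1000000
  }

  // Two concurrent first calls, then a third call after both have finished.
  def run(): Vector[Long] = {
    val elapsed = new Array[Long](3)
    val threads = (0 to 1).map { i =>
      new Thread(() => { elapsed(i) = timed("f", s"row$i") })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    elapsed(2) = timed("f", "row2")
    elapsed.toVector
  }
}
```

Calling SlowCompileDemo.run() returns three timings. The simulated compilation happens only once: whichever of the two concurrent calls gets the lock first pays for it, the other waits on the lock, and the third call hits the cache.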
Now let's change our parsing code and publish another 3 data rows:

[code language="scala"]
// for the "control" topic
(json: com.google.gson.JsonObject) => {
  new com.google.api.services.bigquery.model.TableRow()
    .set("id", json.get("id").getAsLong)
    .set("data", json.get("text").getAsString)
}

// for the "data" topic
{"id":4,"text":"row4"}
{"id":5,"text":"row5"}
{"id":6,"text":"row6"}
[/code]

And here is the log:

[code language="scala"]
2017/12/11 17:54:09.859 INFO com.pythian.Beam$MyDoFn - Getting new data={"id":4,"text":"row4"}
2017/12/11 17:54:10.237 INFO com.pythian.Beam$MyDoFn - Inserting to BiqQuery: {id=4, data=row4}
2017/12/11 17:54:10.238 INFO com.pythian.Beam$MyDoFn - Processed data in 379 ms
2017/12/11 17:54:26.885 INFO com.pythian.Beam$MyDoFn - Getting new data={"id":5,"text":"row5"}
2017/12/11 17:54:26.885 INFO com.pythian.Beam$MyDoFn - Inserting to BiqQuery: {id=5, data=row5}
2017/12/11 17:54:26.886 INFO com.pythian.Beam$MyDoFn - Processed data in 0 ms
2017/12/11 17:54:42.868 INFO com.pythian.Beam$MyDoFn - Getting new data={"id":6,"text":"row6"}
2017/12/11 17:54:42.869 INFO com.pythian.Beam$MyDoFn - Inserting to BiqQuery: {id=6, data=row6}
2017/12/11 17:54:42.871 INFO com.pythian.Beam$MyDoFn - Processed data in 1 ms
[/code]

As expected, the new code was applied, and later messages were processed much faster than the first one. Please note that, as with any other dynamic approach, you have to take care of permissions for the "control" topic: anyone who can publish to it can run arbitrary code inside the pipeline. You can find the code here.

Topics: Google Cloud Platform, Google Cloud, Big Data, Apache Beam, Dataflow, Technical Track, Cloud, Scala