Optimising geospatial queries with dynamic file pruning Matthew Slack Principal Data Architect - Wejo
Agenda Wejo Typical use cases Data lake indexing strategies Dynamic file pruning Z-ordering strategies Measuring effectiveness of file pruning Optimizing queries to activate file pruning
Wejo
Wejo
Processing data from over 15m vehicles OEM A Streamed OEM A Micro-batch (Ign OFF) OEM B - n Streamed 15m+ Vehicles 18 billion data points per day 0.4 trillion per month Peak ~900,000 per second 5Pb data (and growing)
Wejo and Databricks Data Lake Data analysis Data science Data governance CoreIngress OEM Egress Transform Filter & Aggregate Stream Raw asset Bespoke DS output Adept Stream BI and Dashboards Incident Manage ment Change and Release 24x7 Monitori ng Alerting SecOps Devops Stream Stream Stream Aggregates Batch Insight Batch Bespoke Insights Sample Preview Infosec Complian ce BI Datamart Data Aggregates Infosec Tooling Sample Generation On-prem AWS Google Cloud Microsoft Azure Portal OEM OEM OEM Databricks underpins our ad-hoc analysis of data for data science We use it to populate both the datamart for BI and the derived data used in WIM Introduction of Delta Lake to support geospatial workloads and CCPA and GDPR
Typical use cases ▪ Traffic intelligence ▪ Map matching ▪ Anomaly detection ▪ Journey intelligence ▪ Origin destination analysis All require both geospatial and temporal indexing
Spark geospatial support Magellan and many others… ▪ Many existing libraries and frameworks ▪ However… ▪ Most are designed to efficiently join geospatial datasets that are already loaded to the cluster ▪ They do not optimize the reading of data from underlying storage ▪ Geomesa can be configured to utilise the underlying partitioning on disk but is complex geopandas
Data Lake Indexing Strategies (Recap)
Data lake partitioning ▪ Choice of partition columns in data lakes is always a trade- off ▪ Partitioning in data lakes is usually done using a hierarchical directory structure year=2020 • month=11 • day=17 • day=18 ingest_date=2020-11-17 • ingest_hour=10 • ingest_hour=11 country=US • state=NY • state=TX ingest_date=2020-11-17 • state=NY Complexity Selectivity Number of partitions (< 10k) Size of each partition (> 10s MB)
Dynamic partition pruning ▪ Allows partitions to be dynamically skipped at query runtime ▪ Partitions that do not match the query filters are excluded by the optimizer ▪ Significantly improved in Spark 3.0
Data skipping, via effective partitioning strategies, combined with columnar file formats such as Parquet, ORC were historically the main levers for optimizing a data lake
Dynamic File Pruning
Dynamic file pruning overview ▪ Introduced with Databricks Delta Lake in early 2020 ▪ Allows files to be dynamically skipped within a partition ▪ Files that do not match the query filters are excluded by the optimizer ▪ Databricks collects metadata on a subset of columns in all files added to the dataset, stored in _delta_log folder ▪ Relies on files being pre-sorted beforehand
Anatomy of _delta_log/00000000000000000000.json delta.dataSkippingNumIndexedCols
Test environment ▪ Test cluster ▪ Spark 2.4.5, Databricks Runtime 6.6 ▪ i3.4xlarge x 10 executors ▪ Auto scaling off ▪ CLEAR CACHE before each test ▪ Input dataset ▪ One day of connected car data (6th March) ▪ Nested parquet in AWS S3 ▪ Over 16 billion rows, ~40 columns ▪ 2491 files, ~1.4 TB data, ~0.5GB per file ▪ Partition strategy ▪ ingest_date, ingest_hour
Naïve example on un-optimized Parquet ▪ Generate all geohashes that cover the Austin polygon ▪ Spark scans all files that match the partition filters ▪ Just over 97M datapoints are covered by the geohashes
However… ▪ Slow… over 5 minutes ▪ Datapoints spread randomly across all of the 2491 input files
Z-Ordering is a technique to colocate related information in the same set of files. This co-locality is automatically used by Delta Lake on Databricks data-skipping algorithms to dramatically reduce the amount of data that needs to be read. Databricks
Z-Ordering the data OPTIMIZEConvert to Delta ▪ New dataset ▪ 6233 files, ~0.8 TB data, 128MB per file ▪ Partition strategy ▪ ingest_date, ingest_hour ▪ Z-ordered columns ▪ longitude, latitude
Co-location of geospatial data ▪ Spread of data across files, after Z-ORDER by geohash
Measuring file pruning ▪ input_file_name ▪ EXPLAIN
Measuring file pruning – continued df.queryExecution.optimizedPlan.collect { // WARNING: this info will not be available >= DBR 7.0 case DeltaTable(prepared: PreparedDeltaFileIndex) => val stat = prepared.preparedScan val skippingPct = (100 - (stat.scanned.bytesCompressed.get.toDouble / partitionReadBytes.get) * 100)
Candidate z-order columns ▪ Longitude/latitude ▪ Geohash ▪ Zipcode ▪ State ▪ H3 (Uber - https://eng.uber.com/h3/) ▪ S2 (Google - https://s2geometry.io/) Z-ordering by one geospatial column WILL also enable dynamic file pruning for queries that filter on a different geospatial column For example, z-ordering on geohash will also improve performance for queries on zipcode or state
Comparing performance Filter Type INNER JOIN (with broadcast) geohash range h3 range longitude/ latitude range state zipcode range (10) make/model range Columns Z-Ordered longitude, latitude 5.55 mins 0% skipping 33s 82% skipping 30s 81% skipping 12s 93% skipping 26s 74% skipping 8s 97% skipping 1.6 mins 0% skipping h3 27s 80% skipping 14s 96% skipping 18s 92% skipping 38s 68% skipping 11s 95% skipping 1.7 mins 0% skipping geohash 12s 98% skipping 16s 84% skipping 16s 92% skipping 20s 76% skipping 7s 97% skipping 1.3 mins 0% skipping make/model 1.4 mins 0% skipping 1.4 mins 0% skipping 1.4 mins 0% skipping 20s 84% skipping geohash, longitude, latitude 27s 90% skipping 23s 82% skipping 23s 91% skipping geohash, make/model 17s 83% skipping 28s 55% skipping 35s 58% skipping 43s 38% skipping 15s 84% skipping 36s 61% skipping s2 17s 83% skipping 36s 82% skipping 22s 91% skipping 30s 73% skipping 11s 96% skipping 1.8 mins 0% skipping
Spot the difference Only one of these queries activates dynamic file pruning Check the query plan!
Query caveats ▪ Only certain query filters will activate file pruning Activates Simple filters, e.g: • =, >, <, BETWEEN IN (with up to 10 values) INNER JOIN – but.. • must be BROADCAST • must be included in WHERE clause Doesn’t activate RLIKE IN (with over 10 values)
Skipping the OPTIMIZE step ▪ OPTIMIZE is effective but expensive ▪ Requires an additional step in your data pipelines ▪ repartitionByRange ▪ works for batch and stream ▪ does not ZORDER, but does shuffle datapoints into files based on the selected columns ▪ can use a column such as geohash, which is already a z-order over the data
Process overview ▪ Importing data ▪ Querying data Import data in stream or batch Process Add geohash column (implicit Z-ORDER) repartitionByRange (requires shuffle) Write to parquet (non-delta table) Import to delta (does not require shuffle) Convert all geospatial queries to a set of covering geohashes Lookup into table using a BROADCAST join up to x100 reduction in query read times from object storage (S3)
With the right query adjustments, dynamic file pruning is a very effective tool for optimizing geospatial queries which read from object storage such as S3
Feedback Your feedback is important to us. Don’t forget to rate and review the sessions.

Optimising Geospatial Queries with Dynamic File Pruning

  • 1.
    Optimising geospatial queries withdynamic file pruning Matthew Slack Principal Data Architect - Wejo
  • 2.
    Agenda Wejo Typical use cases Datalake indexing strategies Dynamic file pruning Z-ordering strategies Measuring effectiveness of file pruning Optimizing queries to activate file pruning
  • 3.
  • 4.
  • 5.
    Processing data fromover 15m vehicles OEM A Streamed OEM A Micro-batch (Ign OFF) OEM B - n Streamed 15m+ Vehicles 18 billion data points per day 0.4 trillion per month Peak ~900,000 per second 5Pb data (and growing)
  • 6.
    Wejo and Databricks DataLake Data analysis Data science Data governance CoreIngress OEM Egress Transform Filter & Aggregate Stream Raw asset Bespoke DS output Adept Stream BI and Dashboards Incident Manage ment Change and Release 24x7 Monitori ng Alerting SecOps Devops Stream Stream Stream Aggregates Batch Insight Batch Bespoke Insights Sample Preview Infosec Complian ce BI Datamart Data Aggregates Infosec Tooling Sample Generation On-prem AWS Google Cloud Microsoft Azure Portal OEM OEM OEM Databricks underpins our ad-hoc analysis of data for data science We use it to populate both the datamart for BI and the derived data used in WIM Introduction of Delta Lake to support geospatial workloads and CCPA and GDPR
  • 7.
    Typical use cases ▪Traffic intelligence ▪ Map matching ▪ Anomaly detection ▪ Journey intelligence ▪ Origin destination analysis All require both geospatial and temporal indexing
  • 8.
    Spark geospatial support Magellan andmany others… ▪ Many existing libraries and frameworks ▪ However… ▪ Most are designed to efficiently join geospatial datasets that are already loaded to the cluster ▪ They do not optimize the reading of data from underlying storage ▪ Geomesa can be configured to utilise the underlying partitioning on disk but is complex geopandas
  • 9.
    Data Lake IndexingStrategies (Recap)
  • 10.
    Data lake partitioning ▪Choice of partition columns in data lakes is always a trade- off ▪ Partitioning in data lakes is usually done using a hierarchical directory structure year=2020 • month=11 • day=17 • day=18 ingest_date=2020-11-17 • ingest_hour=10 • ingest_hour=11 country=US • state=NY • state=TX ingest_date=2020-11-17 • state=NY Complexity Selectivity Number of partitions (< 10k) Size of each partition (> 10s MB)
  • 11.
    Dynamic partition pruning ▪Allows partitions to be dynamically skipped at query runtime ▪ Partitions that do not match the query filters are excluded by the optimizer ▪ Significantly improved in Spark 3.0
  • 12.
    Data skipping, viaeffective partitioning strategies, combined with columnar file formats such as Parquet, ORC were historically the main levers for optimizing a data lake
  • 13.
  • 14.
    Dynamic file pruningoverview ▪ Introduced with Databricks Delta Lake in early 2020 ▪ Allows files to be dynamically skipped within a partition ▪ Files that do not match the query filters are excluded by the optimizer ▪ Databricks collects metadata on a subset of columns in all files added to the dataset, stored in _delta_log folder ▪ Relies on files being pre-sorted beforehand
  • 15.
  • 16.
    Test environment ▪ Testcluster ▪ Spark 2.4.5, Databricks Runtime 6.6 ▪ i3.4xlarge x 10 executors ▪ Auto scaling off ▪ CLEAR CACHE before each test ▪ Input dataset ▪ One day of connected car data (6th March) ▪ Nested parquet in AWS S3 ▪ Over 16 billion rows, ~40 columns ▪ 2491 files, ~1.4 TB data, ~0.5GB per file ▪ Partition strategy ▪ ingest_date, ingest_hour
  • 17.
    Naïve example onun-optimized Parquet ▪ Generate all geohashes that cover the Austin polygon ▪ Spark scans all files that match the partition filters ▪ Just over 97M datapoints are covered by the geohashes
  • 18.
    However… ▪ Slow… over5 minutes ▪ Datapoints spread randomly across all of the 2491 input files
  • 19.
    Z-Ordering is atechnique to colocate related information in the same set of files. This co-locality is automatically used by Delta Lake on Databricks data-skipping algorithms to dramatically reduce the amount of data that needs to be read. Databricks
  • 20.
    Z-Ordering the data OPTIMIZEConvertto Delta ▪ New dataset ▪ 6233 files, ~0.8 TB data, 128MB per file ▪ Partition strategy ▪ ingest_date, ingest_hour ▪ Z-ordered columns ▪ longitude, latitude
  • 21.
    Co-location of geospatial data ▪Spread of data across files, after Z-ORDER by geohash
  • 22.
    Measuring file pruning ▪input_file_name ▪ EXPLAIN
  • 23.
    Measuring file pruning– continued df.queryExecution.optimizedPlan.collect { // WARNING: this info will not be available >= DBR 7.0 case DeltaTable(prepared: PreparedDeltaFileIndex) => val stat = prepared.preparedScan val skippingPct = (100 - (stat.scanned.bytesCompressed.get.toDouble / partitionReadBytes.get) * 100)
  • 24.
    Candidate z-order columns ▪ Longitude/latitude ▪Geohash ▪ Zipcode ▪ State ▪ H3 (Uber - https://eng.uber.com/h3/) ▪ S2 (Google - https://s2geometry.io/) Z-ordering by one geospatial column WILL also enable dynamic file pruning for queries that filter on a different geospatial column For example, z-ordering on geohash will also improve performance for queries on zipcode or state
  • 25.
    Comparing performance Filter Type INNERJOIN (with broadcast) geohash range h3 range longitude/ latitude range state zipcode range (10) make/model range Columns Z-Ordered longitude, latitude 5.55 mins 0% skipping 33s 82% skipping 30s 81% skipping 12s 93% skipping 26s 74% skipping 8s 97% skipping 1.6 mins 0% skipping h3 27s 80% skipping 14s 96% skipping 18s 92% skipping 38s 68% skipping 11s 95% skipping 1.7 mins 0% skipping geohash 12s 98% skipping 16s 84% skipping 16s 92% skipping 20s 76% skipping 7s 97% skipping 1.3 mins 0% skipping make/model 1.4 mins 0% skipping 1.4 mins 0% skipping 1.4 mins 0% skipping 20s 84% skipping geohash, longitude, latitude 27s 90% skipping 23s 82% skipping 23s 91% skipping geohash, make/model 17s 83% skipping 28s 55% skipping 35s 58% skipping 43s 38% skipping 15s 84% skipping 36s 61% skipping s2 17s 83% skipping 36s 82% skipping 22s 91% skipping 30s 73% skipping 11s 96% skipping 1.8 mins 0% skipping
  • 26.
    Spot the difference Onlyone of these queries activates dynamic file pruning Check the query plan!
  • 27.
    Query caveats ▪ Onlycertain query filters will activate file pruning Activates Simple filters, e.g: • =, >, <, BETWEEN IN (with up to 10 values) INNER JOIN – but.. • must be BROADCAST • must be included in WHERE clause Doesn’t activate RLIKE IN (with over 10 values)
  • 28.
    Skipping the OPTIMIZEstep ▪ OPTIMIZE is effective but expensive ▪ Requires an additional step in your data pipelines ▪ repartitionByRange ▪ works for batch and stream ▪ does not ZORDER, but does shuffle datapoints into files based on the selected columns ▪ can use a column such as geohash, which is already a z-order over the data
  • 29.
    Process overview ▪ Importingdata ▪ Querying data Import data in stream or batch Process Add geohash column (implicit Z-ORDER) repartitionByRange (requires shuffle) Write to parquet (non-delta table) Import to delta (does not require shuffle) Convert all geospatial queries to a set of covering geohashes Lookup into table using a BROADCAST join up to x100 reduction in query read times from object storage (S3)
  • 30.
    With the rightquery adjustments, dynamic file pruning is a very effective tool for optimizing geospatial queries which read from object storage such as S3
  • 31.
    Feedback Your feedback isimportant to us. Don’t forget to rate and review the sessions.