Massively Scalable Real-time Geospatial Data Processing with Apache Kafka and Cassandra Melbourne Distributed Meetup 30 April 2020 (online) Paul Brebner instaclustr.com Technology Evangelist
Staying at home Down to last dog toy Grass maze for locals
Overview ■ In the News (location) ■ Anomaly Detection – baseline throughput ■ Spatial Anomaly Detection problem ■ Solutions – location representation and querying/indexing ● Bounding boxes and secondary indexes ● Geohashes ● Lucene index ● Results ● 3D
In the News John Conway Legendary Polymath Passed away from Covid-19
Game of Life Next state of each cell depends on state of immediate neighbours
Game of Life Simple rules but complex patterns
Also in the news Social distancing and Covid-19 tracing Uncle Ron’s social distancing 3000 invention Or CovidSafe App?
Also in the news “UFO” photos declassified by USA And “planet-killer” asteroid missed the earth yesterday (16x moon orbit) Uncle Ron’s social distancing 3000 invention
Previously… Anomaly Detection Spot the difference At speed (< 1s RT) and scale (High throughput, lots of data)
How does it work? • CUSUM (Cumulative Sum Control Chart) • Statistical analysis of historical data • Data for a single variable/key at a time • Potentially Billions of keys
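A minimal sketch of a two-sided CUSUM check for one key, assuming the history has already been read back for that key. The function name, the `threshold` and `drift` parameters and their default values are illustrative assumptions, not the values used in the talk's pipeline.

```python
from statistics import mean, pstdev

def cusum_is_anomaly(history, latest, threshold=5.0, drift=0.5):
    """Return True if the CUSUM statistic exceeds the threshold after `latest`.

    history: earlier values for one key (e.g. the last 50 events).
    threshold, drift: hypothetical tuning values, in standard deviations.
    """
    mu = mean(history)
    sigma = pstdev(history) or 1.0  # avoid dividing by zero for a flat history
    high = low = 0.0
    for x in list(history) + [latest]:
        z = (x - mu) / sigma
        # Accumulate deviations above and below the mean, ignoring small drift.
        high = max(0.0, high + z - drift)
        low = max(0.0, low - z - drift)
    return high > threshold or low > threshold
```

Only one key's values are needed per check, which is why the single-partition read in the baseline data model is enough.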
Pipeline Design • Interaction with Kafka and Cassandra Clusters • Efficient Cassandra Data writes and reads with key, a unique “account ID” or similar
Cassandra Data Model Events are timeseries Id is Partition Key Time is clustering key (order) Read gets most recent 50 values for id, very fast create table event_stream ( id bigint, time timestamp, value double, primary key (id, time) ) with clustering order by (time desc); select value from event_stream where id=314159265 limit 50;
Baseline throughput 19 Billion Anomaly Checks/Day = 100% [Bar chart: normalised throughput (%) for Baseline (single transaction ID)]
Harder problem – Spot the differences in Space Space is big. Really big. You just won’t believe how vastly, hugely, mind-bogglingly big it is. I mean, you may think it’s a long way down the road to the chemist, but that’s just peanuts to space. Douglas Adams, The Hitchhiker’s Guide to the Galaxy
Spatial Anomalies Many and varied
Real Example - John Snow No, not this one
John Snow’s 1854 Cholera Map Deaths per household + location Used to identify a polluted pump (X) Some outliers – brewers drank beer not water! X
But… First you have to know where you are - Location To usefully represent location need: Coordinate system Map Scale
Better • <lat, long> coordinates • Scale • Interesting locations “Bulk of treasure here”
Geospatial Anomaly Detection ■ New problem… ■ Rather than a single ID, events now have a location (and a value) ■ The problem now is to ● find the nearest 50 events to each new event ● Quickly (< 1s RT) ■ Can’t make any assumptions about geospatial properties of events ● including location, density or distribution – i.e. where, or how many ● Need to search from smallest to increasingly larger areas ● E.g. South Atlantic Geomagnetic Anomaly is BIG ■ Uber uses similar technologies to ● forecast demand ● Increase area until they have sufficient data for predictions ■ Can we use <lat, long> as Cassandra partition key? ● Yes, compound partition keys are allowed. ● But can only select the exact locations. South Atlantic Geomagnetic Anomaly
How to compute nearness To compute distance between locations Need coordinate system E.g. Mercator map Flat earth, distortion nearer poles
World is (approx) spherical calculation of distance between two lat/long points is non-trivial
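One common way to compute this distance is the haversine formula, sketched below under the assumption of a perfectly spherical earth with a mean radius of 6371 km. The function name is illustrative and is not taken from the talk.

```python
import math

EARTH_RADIUS_KM = 6371.0  # mean earth radius; a spherical approximation

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlam = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2)
    # Clamp to 1.0 to guard against floating point overshoot before asin.
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(min(1.0, a)))
```

This is exact only for a sphere, so small errors relative to an ellipsoidal model are expected.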
Bounding box Approximation of distance using inequalities
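A rough sketch of how box corners could be computed from a centre point and a half-width, assuming a spherical earth and a location away from the poles and the 180° meridian. The function name and the km-per-degree constant are assumptions for illustration.

```python
import math

KM_PER_DEGREE_LAT = 111.195  # approximate, for a sphere of radius 6371 km

def bounding_box(lat, lon, half_side_km):
    """Return (min_lat, max_lat, min_lon, max_lon) around a point in degrees.

    Not valid near the poles or across the 180 degree meridian.
    """
    dlat = half_side_km / KM_PER_DEGREE_LAT
    # Longitude degrees shrink with the cosine of latitude.
    dlon = dlat / math.cos(math.radians(lat))
    return (lat - dlat, lat + dlat, lon - dlon, lon + dlon)
```

The four values map directly onto the inequalities in a bounding-box query (lat >= min_lat, lat <= max_lat, and so on).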
Bounding boxes and Cassandra? Use “country” partition key, Lat/long/time clustering keys But can’t run the query with multiple inequalities CREATE TABLE latlong ( country text, lat double, long double, time timestamp, PRIMARY KEY (country, lat, long, time) ) WITH CLUSTERING ORDER BY (lat ASC, long ASC, time DESC); select * from latlong where country='nz' and lat >= -39.58 and lat <= -38.67 and long >= 175.18 and long <= 176.08 limit 50; InvalidRequest: Error from server: code=2200 [Invalid query] message="Clustering column "long" cannot be restricted (preceding column "lat" is restricted by a non-EQ relation)"
Secondary indexes to the rescue? ■ Secondary indexes ᐨ create index i1 on latlong (lat); ᐨ create index i2 on latlong (long); ● But same restrictions as clustering columns. ■ SASI - SSTable Attached Secondary Index ● Supports more complex queries more efficiently ᐨ create custom index i1 on latlong (long) using 'org.apache.cassandra.index.sasi.SASIIndex'; ᐨ create custom index i2 on latlong (lat) using 'org.apache.cassandra.index.sasi.SASIIndex'; ● select * from latlong where country='nz' and lat >= -39.58 and lat <= -38.67 and long >= 175.18 and long <= 176.08 limit 50 allow filtering; ● “allow filtering” may be inefficient (if many rows have to be retrieved prior to filtering) and isn’t suitable for production. ● But SASI docs say ᐨ even though “allow filtering” must be used with 2 or more column inequalities, there is actually no filtering taking place.
Results Very poor (< 1%) [Bar chart: normalised throughput (%) for Baseline (single transaction ID) and SASI]
Geohashes to the rescue? Divide maps into named and hierarchical areas We’ve seen something similar already: “country” partition key E.g. plate tectonics
Geohashes Rectangular areas Variable length base-32 string Single char regions 5,000km x 5,000km Each extra letter gives 32 sub-areas 8 chars is 40mx20m En/de-code lat/long to/from geohash But: Edge cases, non-linear near poles
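The encoding step can be sketched as follows, using the standard geohash scheme (interleaved longitude/latitude bits, 5 bits per base-32 character). This is an illustrative implementation rather than the code used in the talk, and the function name is an assumption.

```python
BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"  # geohash alphabet (no a, i, l, o)

def geohash_encode(lat, lon, length=8):
    """Encode a lat/long pair (degrees) as a geohash string of `length` chars."""
    lat_lo, lat_hi = -90.0, 90.0
    lon_lo, lon_hi = -180.0, 180.0
    chars = []
    bits = 0
    bit_count = 0
    use_lon = True  # bits alternate, starting with longitude
    while len(chars) < length:
        if use_lon:
            mid = (lon_lo + lon_hi) / 2
            if lon >= mid:
                bits = (bits << 1) | 1
                lon_lo = mid
            else:
                bits <<= 1
                lon_hi = mid
        else:
            mid = (lat_lo + lat_hi) / 2
            if lat >= mid:
                bits = (bits << 1) | 1
                lat_lo = mid
            else:
                bits <<= 1
                lat_hi = mid
        use_lon = not use_lon
        bit_count += 1
        if bit_count == 5:  # every 5 bits become one base-32 character
            chars.append(BASE32[bits])
            bits = 0
            bit_count = 0
    return "".join(chars)
```

Because each extra character only refines the previous area, a shorter geohash is always a prefix of the longer one for the same point, which is what makes prefix and range queries over a single geohash column possible.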
Some geohashes are words “ketchup” is in Africa
Some geohashes are words 153mx153m
“Trump” Is in Kazakhstan! 5kmx5km Not to scale
Modifications for geohashes Lat/long encoded as geohash Geohash is new key Geohash used to query cassandra
Geohashes and Cassandra In theory Geohashes work well for database indexes Option 1 – Multiple indexed geohash columns CREATE TABLE geohash1to8 ( geohash1 text, time timestamp, geohash2 text, geohash3 text, geohash4 text, geohash5 text, geohash6 text, geohash7 text, geohash8 text, value double, PRIMARY KEY (geohash1, time) ) WITH CLUSTERING ORDER BY (time DESC); CREATE INDEX i8 ON geohash1to8 (geohash8); CREATE INDEX i7 ON geohash1to8 (geohash7); CREATE INDEX i6 ON geohash1to8 (geohash6); CREATE INDEX i5 ON geohash1to8 (geohash5); CREATE INDEX i4 ON geohash1to8 (geohash4); CREATE INDEX i3 ON geohash1to8 (geohash3); CREATE INDEX i2 ON geohash1to8 (geohash2);
Query from smallest to largest areas Stop when 50 rows found select * from geohash1to8 where geohash1='e' and geohash7='everywh' limit 50; select * from geohash1to8 where geohash1='e' and geohash6='everyw' limit 50; select * from geohash1to8 where geohash1='e' and geohash5='every' limit 50; select * from geohash1to8 where geohash1='e' and geohash4='ever' limit 50; select * from geohash1to8 where geohash1='e' and geohash3='eve' limit 50; select * from geohash1to8 where geohash1='e' and geohash2='ev' limit 50; select * from geohash1to8 where geohash1='e' limit 50; Tradeoffs? Multiple secondary columns/indexes, multiple queries, accuracy and number of queries depends on spatial distribution and density
Results Option 1 = 10% [Bar chart: normalised throughput (%) for Baseline (single transaction ID), SASI and Geohash Option 1]
Option 2 – Denormalized multiple tables Denormalization is “Normal” in Cassandra Create 8 tables, one for each geohash length CREATE TABLE geohash1 ( geohash text, time timestamp, value double, PRIMARY KEY (geohash, time) ) WITH CLUSTERING ORDER BY (time DESC); … CREATE TABLE geohash8 ( geohash text, time timestamp, value double, PRIMARY KEY (geohash, time) ) WITH CLUSTERING ORDER BY (time DESC);
Select from smallest to largest areas using corresponding table select * from geohash8 where geohash='everywhe' limit 50; select * from geohash7 where geohash='everywh' limit 50; select * from geohash6 where geohash='everyw' limit 50; select * from geohash5 where geohash='every' limit 50; select * from geohash4 where geohash='ever' limit 50; select * from geohash3 where geohash='eve' limit 50; select * from geohash2 where geohash='ev' limit 50; select * from geohash1 where geohash='e' limit 50; Tradeoffs? Multiple tables and writes, multiple queries
Results Option 2 = 20% [Bar chart: normalised throughput (%) for Baseline (single transaction ID), SASI, Geohash Option 1 and Geohash Option 2]
Option 3 – Clustering Column(s) Similar to Option 1 but using clustering columns CREATE TABLE geohash1to8_clustering ( geohash1 text, time timestamp, geohash2 text, geohash3 text, geohash4 text, geohash5 text, geohash6 text, geohash7 text, geohash8 text, value double, PRIMARY KEY (geohash1, geohash2, geohash3, geohash4, geohash5, geohash6, geohash7, geohash8, time) ) WITH CLUSTERING ORDER BY (geohash2 DESC, geohash3 DESC, geohash4 DESC, geohash5 DESC, geohash6 DESC, geohash7 DESC, geohash8 DESC, time DESC);
How do Clustering columns work? Good for hierarchical data ■ Clustering columns are good for modelling and efficient querying of hierarchical/nested data ■ Query must include higher level columns with equality operator, ranges are only allowed on last column in query, lower level columns don’t have to be included. E.g. ● select * from geohash1to8_clustering where geohash1='e' and geohash2='ev' and geohash3 >= 'ev0' and geohash3 <= 'evz' limit 50; ■ But why have multiple clustering columns when one is actually enough…
Better: Single Geohash Clustering Column Geohash8 and time are clustering keys CREATE TABLE geohash_clustering ( geohash1 text, time timestamp, geohash8 text, lat double, long double, PRIMARY KEY (geohash1, geohash8, time) ) WITH CLUSTERING ORDER BY (geohash8 DESC, time DESC);
Inequality range query With decreasing length geohashes Stop when result has 50 rows select * from geohash_clustering where geohash1='e' and geohash8='everywhe' limit 50; select * from geohash_clustering where geohash1='e' and geohash8>='everywh0' and geohash8<='everywhz' limit 50; select * from geohash_clustering where geohash1='e' and geohash8>='everyw0' and geohash8<='everywz' limit 50; select * from geohash_clustering where geohash1='e' and geohash8>='every0' and geohash8<='everyz' limit 50; select * from geohash_clustering where geohash1='e' and geohash8>='ever0' and geohash8<='everz' limit 50; select * from geohash_clustering where geohash1='e' and geohash8>='eve0' and geohash8<='evez' limit 50; select * from geohash_clustering where geohash1='e' and geohash8>='ev0' and geohash8<='evz' limit 50; select * from geohash_clustering where geohash1='e' limit 50;
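The sequence of queries above can be generated from a single 8-character geohash. The sketch below is a hypothetical helper (the function name, the `%s` placeholder style and the returned tuple layout are assumptions) that only builds the CQL strings and parameters and does not talk to Cassandra. Unlike the slide, it pads each bound to the full geohash length with '0' (lowest) and 'z' (highest geohash character), so that plain string comparison covers every 8-character hash under the prefix.

```python
def expanding_queries(geohash8, table="geohash_clustering"):
    """Build (cql, params) pairs from the smallest area to the whole partition."""
    g1 = geohash8[0]
    full = len(geohash8)
    queries = [(
        f"SELECT * FROM {table} WHERE geohash1=%s AND geohash8=%s LIMIT 50",
        (g1, geohash8),
    )]
    # Drop one trailing character at a time: 7-char prefix, 6-char prefix, ...
    for n in range(full - 1, 1, -1):
        prefix = geohash8[:n]
        pad = full - n
        queries.append((
            f"SELECT * FROM {table} WHERE geohash1=%s "
            "AND geohash8>=%s AND geohash8<=%s LIMIT 50",
            (g1, prefix + "0" * pad, prefix + "z" * pad),
        ))
    # Finally fall back to the whole single-character partition.
    queries.append((f"SELECT * FROM {table} WHERE geohash1=%s LIMIT 50", (g1,)))
    return queries
```

A caller would run these in order and stop at the first result with 50 rows, as the slide describes.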
Geohash Results Option 3 is best = 34% [Bar chart: normalised throughput (%) for Baseline (single transaction ID), SASI, Geohash Option 1, Geohash Option 2 and Geohash Option 3]
Issues? ■ Cardinality for partition key ● should be > 100,000 ● >= 4 character geohash ■ Unbounded partitions are bad ● May need composite partition key in production ● e.g. extra time bucket (hour, day, etc) ■ Space vs time ● could have different sized buckets for different sized spaces ● E.g. bigger areas with more frequent events may need shorter time buckets to limit size ● This may depend on the space-time scales of underlying systems/processes ● E.g. Spatial and temporal scales of oceanographic processes (left)
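As an illustration of the composite partition key idea, the sketch below derives a (geohash prefix, time bucket) pair for an event. A 4-character prefix gives 32^4 = 1,048,576 possible values, which is above the 100,000 target, whereas 3 characters give only 32,768. The function name, bucket names and string formats are assumptions, not the talk's schema.

```python
from datetime import datetime, timezone

BUCKET_FORMATS = {"hour": "%Y-%m-%dT%H", "day": "%Y-%m-%d"}  # hypothetical formats

def partition_key(geohash, event_time, prefix_len=4, bucket="hour"):
    """Return (geohash prefix, UTC time bucket) for use as a composite partition key.

    event_time must be a timezone-aware datetime.
    """
    stamp = event_time.astimezone(timezone.utc).strftime(BUCKET_FORMATS[bucket])
    return (geohash[:prefix_len], stamp)
```

Shorter buckets keep busy areas from growing without bound, at the cost of more partitions to read when a search has to look further back in time.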
Other option(s) – Cassandra Lucene Index Plugin A concordance
Other option(s) – Cassandra Lucene Index Plugin ■ The Cassandra Lucene Index is a plugin for Apache Cassandra: ● that extends its index functionality to provide near real-time search, including full-text search capabilities and free multivariable, geospatial and bitemporal search ● It is achieved through an Apache Lucene based implementation of Cassandra secondary indexes, where each node of the cluster indexes its own data. ■ Instaclustr supports the plugin ● Optional add-on to managed Cassandra service ● And code support ᐨ https://github.com/instaclustr/cassandra-lucene-index ■ How does this help for Geospatial queries? ● has very rich geospatial semantics including geo points, geo shapes, geo distance search, geo bounding box search, geo shape search, multiple distance units, geo transformations, and complex geo shapes.
Cassandra table and Lucene indexes Geopoint Example Under the hood indexing is done using a tree structure with geohashes (configurable precision). CREATE TABLE latlong_lucene ( geohash1 text, value double, time timestamp, latitude double, longitude double, PRIMARY KEY (geohash1, time) ) WITH CLUSTERING ORDER BY (time DESC); CREATE CUSTOM INDEX latlong_index ON latlong_lucene () USING 'com.stratio.cassandra.lucene.Index' WITH OPTIONS = { 'refresh_seconds': '1', 'schema': '{ fields: { geohash1: {type: "string"}, value: {type: "double"}, time: {type: "date", pattern: "yyyy/MM/dd HH:mm:ss.SSS"}, place: {type: "geo_point", latitude: "latitude", longitude: "longitude"} } }' };
Search Options Sort Sophisticated but complex semantics (see the docs) SELECT value FROM latlong_lucene WHERE expr(latlong_index, '{ sort: [ {field: "place", type: "geo_distance", latitude: " + <lat> + ", longitude: " + <long> + "}, {field: "time", reverse: true} ] }') and geohash1=<geohash> limit 50;
Search Options Bounding Box filter Need to compute box corners SELECT value FROM latlong_lucene WHERE expr(latlong_index, '{ filter: { type: "geo_bbox", field: "place", min_latitude: " + <minLat> + ", max_latitude: " + <maxLat> + ", min_longitude: " + <minLon> + ", max_longitude: " + <maxLon> + " }}') limit 50;
Search Options Geo Distance filter SELECT value FROM latlong_lucene WHERE expr(latlong_index, '{ filter: { type: "geo_distance", field: "place", latitude: " + <lat> + ", longitude: " + <long> + ", max_distance: " + <distance> + "km" } }') and geohash1=' + <hash1> + ' limit 50;
Search Options – Prefix filter Prefix search is useful for searching larger areas over a single geohash column as you can search for a substring SELECT value FROM latlong_lucene WHERE expr(latlong_index, '{ filter: [ {type: "prefix", field: "geohash1", value: <geohash>} ] }') limit 50; Similar to inequality over clustering column
Lucene Results Options = 2-25% Best is prefix filter [Bar chart: normalised throughput (%) for Baseline (single transaction ID), SASI, Geohash Option 1, Geohash Option 2, Geohash Option 3, Lucene sort, Lucene filter bounded box, Lucene filter geo distance and Lucene filter prefix over geohash]
Overall Geohash options faster (25%, 34%) [Bar chart: normalised throughput (%) for Baseline (single transaction ID), SASI, Geohash Option 1, Geohash Option 2, Geohash Option 3, Lucene sort, Lucene filter bounded box, Lucene filter geo distance and Lucene filter prefix over geohash; bars annotated “Geohash”]
Overall Geohash options faster (25%, 34%) Lucene bounded box/geo distance most accurate but only 5% of baseline performance [Bar chart: normalised throughput (%) for Baseline (single transaction ID), SASI, Geohash Option 1, Geohash Option 2, Geohash Option 3, Lucene sort, Lucene filter bounded box, Lucene filter geo distance and Lucene filter prefix over geohash; bars annotated “Lucene”]
3D (Up and Down) Who needs it?
Location, Altitude and Volume 3D Geohashes represent 2D location, altitude and volume A 3D geohash is a cube
Application? 3D Drone Proximity Detection
Proximity rules > 50m from people and property > 150m from congested areas > 1000m from airports > 5000m from exclusion zones Just happen to correspond to different length 3D geohashes
3D Geohashes Work with all the geohash index options So reasonably fast to compute 3D proximity More accurate slower options can be improved with bigger Cassandra clusters [Bar chart: normalised throughput (%) for Baseline (single transaction ID), SASI, Geohash Option 1, Geohash Option 2, Geohash Option 3, Lucene sort, Lucene filter bounded box, Lucene filter geo distance and Lucene filter prefix over geohash; bars annotated “3D Geohash”]
Covid-19 tracing! Social distancing is a spatiotemporal proximity problem ■ Logic is (something like) ● If less than 1.5m distance from another phone continuously for more than 15 minutes and the phone is diagnosed with Covid-19 within 2 weeks then receive alert ■ So does CovidSafe use location data? It required location permissions to work…
Covid-19 tracing! Social distancing is a spatiotemporal proximity problem ■ Turns out you don’t actually need location as Bluetooth detects other phones nearby (<30m?) ● Which could result in too many false positives ● So probably uses signal strength as distance proxy ■ CovidSafe – location enabled but not used (claimed) ■ UK tracing app plans to use actual location, e.g. to detect hotspots (c.f. cholera map)
The End ■ More Information? ■ Demo 3D Geohash java code ● https://gist.github.com/paul-brebner/a67243859d2cf38bd9038a12a7b14762 ● produces valid 3D geohashes for altitudes from 13km below sea level to geostationary satellite orbit
■ https://www.instaclustr.com/paul-brebner/ ■ Latest Blog Series – Globally distributed Streaming, Storage and Search ● Application is deployed in multiple locations, data is replicated or sent where/when it’s needed ● “Around the World” series, part 3 introduces a Stock Trading application ● https://www.instaclustr.com/building-a-low-latency-distributed-stock-broker-application-part-3/ Blogs
The End ■ Try out the Instaclustr Managed Platform for Open Source ● https://www.instaclustr.com/platform/ ● Free Trial ᐨ https://console.instaclustr.com/user/signup?coupon-code=WORKSHOP

Massively Scalable Real-time Geospatial Data Processing with Apache Kafka and Cassandra Melbourne Distributed Meetup 30 April 2020 (online)

  • 1.
    Massively Scalable Real-time GeospatialData Processing with Apache Kafka and Cassandra Melbourne Distributed Meetup 30 April 2020 (online) Paul Brebner instaclustr.com Technology Evangelist
  • 2.
    Staying at home Down tolast dog toy Grass maze for locals
  • 3.
    Overview ■ In theNews (location) ■ Anomaly Detection – baseline throughput ■ Spatial Anomaly Detection problem ■ Solutions – location representation and querying/indexing ● Bounding boxes and secondary indexes ● Geohashes ● Lucene index ● Results ● 3D
  • 4.
    In the News JohnConway Legendary Polymath Passed away from Covid-19
  • 5.
    Game of Life Nextstate of each cell depends on state of immediate neighbours
  • 6.
    Game of Life Simplerules but complex patterns
  • 7.
    Also in the news Socialdistancing and Covid-19 tracing Uncle Ron’s social distancing 3000 invention Or CovidSafe App?
  • 8.
    Also in the news “UFO”photos declassified by USA And “planet-killer” asteroid missed the earth yesterday (16x moon orbit) Uncle Ron’s social distancing 3000 invention
  • 9.
    Previously… Anomaly Detection Spot the difference Atspeed (< 1s RT) and scale (High throughput, lots of data)
  • 10.
    How does it work? •CUSUM (Cumulative Sum Control Chart) • Statistical analysis of historical data • Data for a single variable/key at a time • Potentially Billions of keys
  • 11.
    Pipeline Design • Interaction with Kafkaand Cassandra Clusters • Efficient Cassandra Data writes and reads with key, a unique “account ID” or similar
  • 12.
    Cassandra Data Model Events are timeseries Idis Partition Key Time is clustering key (order) Read gets most recent 50 values for id, very fast create table event_stream ( id bigint, time timestamp, value double, primary key (id, time) ) with clustering order by (time desc); select value from event_stream where id=314159265 limit 50;
  • 13.
    Baseline throughput 19 Billion Anomaly Checks/Day =100% 0 20 40 60 80 100 120 Baseline (single transaction ID) Normalised (%)
  • 14.
    Harder problem – Spot the differences inSpace Space is big. Really big. You just won’t believe how vastly, hugely, mind-bogglingly big it is. I mean, you may think it’s a long way down the road to the chemist, but that’s just peanuts to space. Douglas Adams, The Hitchhiker’s Guide to the Galaxy
  • 15.
  • 16.
  • 17.
    John Snow’s 1854 Cholera Map Death’sper household + location Used to identify a polluted pump (X) Some outliers – brewers drank beer not water! X
  • 18.
    But… First you have to knowwhere you are - Location To usefully represent location need: Coordinate system Map Scale
  • 19.
    Better • <lat, long> coordinates •Scale • Interesting locations “Bulk of treasure here”
  • 20.
    Geospatial Anomaly Detection ■ New problem… ■Rather than a single ID, events now have a location (and a value) ■ The problem now is to ● find the nearest 50 events to each new event ● Quickly (< 1s RT) ■ Can’t make any assumptions about geospatial properties of events ● including location, density or distribution – i.e. where, or how many ● Need to search from smallest to increasingly larger areas ● E.g. South Atlantic Geomagnetic Anomaly is BIG ■ Uber uses similar technologies to ● forecast demand ● Increase area until they have sufficient data for predictions ■ Can we use <lat, long> as Cassandra partition key? ● Yes, compound partition keys are allowed. ● But can only select the exact locations. South Atlantic Geomagnetic Anomaly
  • 21.
    How to compute nearness To compute distancebetween locations Need coordinate system E.g. Mercator map Flat earth, distortion nearer poles
  • 22.
    World is (approx) spherical calculation of distancebetween two lat/long points is non-trivial
  • 23.
  • 24.
    Bounding boxes and Cassandra? Use ”country” partitionkey, Lat/long/time clustering keys But can’t run the query with multiple inequalities CREATE TABLE latlong ( country text, lat double, long double, time timestamp, PRIMARY KEY (country, lat, long, time) ) WITH CLUSTERING ORDER BY (lat ASC, long ASC, time DESC); select * from latlong where country='nz' and lat>= - 39.58 and lat <= -38.67 and long >= 175.18 and long <= 176.08 limit 50; InvalidRequest: Error from server: code=2200 [Invalid query] message="Clustering column "long" cannot be restricted (preceding column "lat" is restricted by a non-EQ relation)"
  • 25.
    Secondary indexes to the rescue? ■Secondary indexes ᐨ create index i1 on latlong (lat); ᐨ create index i2 on latlong (long); ● But same restrictions as clustering columns. ■ SASI - SSTable Attached Secondary Index ● Supports more complex queries more efficiently ᐨ create custom index i1 on latlong (long) using 'org.apache.cassandra.index.sasi.SASIIndex'; ᐨ create custom index i2 on latlong (lat) using 'org.apache.cassandra.index.sasi.SASIIndex’; ● select * from latlong where country='nz' and lat>= -39.58 and lat <= -38.67 and long >= 175.18 and long <= 176.08 limit 50 allow filtering; ● “allow filtering” may be inefficient (if many rows have to be retrieved prior to filtering) and isn’t suitable for production. ● But SASI docs say ᐨ even though “allow filtering” must be used with 2 or more column inequalities, there is actually no filtering taking place,
  • 26.
    Results Very poor (<1%) 0 20 40 60 80 100 120 Normalised (%) Baseline (single transaction ID) SASI
  • 27.
    Geohashes to the rescue? Divide mapsinto named and hierarchical areas We’ve been something similar already: “country” partition key E.g. plate tectonics
  • 28.
    Geohashes Rectangular areas Variable length base-32string Single char regions 5,000km x 5,000km Each extra letter gives 32 sub-areas 8 chars is 40mx20m En/de-code lat/long to/from geohash But: Edges cases, non-linear near poles
  • 29.
  • 30.
  • 31.
  • 32.
    Modifications for geohashes Lat/long encoded as geohash Geohashis new key Geohash used to query cassandra
  • 33.
    Geohashes and Cassandra In theory Geohashes work wellfor database indexes Option 1 – Multiple indexed geohash columns CREATE TABLE geohash1to8 ( geohash1 text, time timestamp, geohash2 text, geohash3 text, geohash4 text, geohash5 text, geohash6 text, geohash7 text, geohash8 text, value double, PRIMARY KEY (hash1, time) ) WITH CLUSTERING ORDER BY (time DESC); CREATE INDEX i8 ON geohash1to8 (geohash8); CREATE INDEX i7 ON geohash1to8 (geohash7); CREATE INDEX i6 ON geohash1to8 (geohash6); CREATE INDEX i5 ON geohash1to8 (geohash5); CREATE INDEX i4 ON geohash1to8 (geohash4); CREATE INDEX i3 ON geohash1to8 (geohash3); CREATE INDEX i2 ON geohash1to8 (geohash2);
  • 34.
    Query from smallest to largest areas Stopwhen 50 rows found select * from geohash1to8 where geohash1=’e’ and geohash7=’everywh’ limit 50; select * from geohash1to8 where geohash1=’e’ and geohash6=’everyw’ limit 50; select * from geohash1to8 where geohash1=’e’ and geohash5=’every’ limit 50; select * from geohash1to8 where geohash1=’e’ and geohash4=’ever’ limit 50; select * from geohash1to8 where geohash1=’e’ and geohash3=’eve’ limit 50; select * from geohash1to8 where geohash1=’e’ and geohash2=’ev’ limit 50; select * from geohash1to8 where geohash1=’e’ limit 50; Tradeoffs? Multiple secondary columns/indexes, multiple queries, accuracy and number of queries depends on spatial distribution and density
  • 35.
    Results Option 1 =10% 0 20 40 60 80 100 120 Normalised (%) Baseline (single transaction ID) SASI Geohash Option 1
  • 36.
    Option 2 – Denormalized multiple tables Denormalizationis “Normal” in Cassandra Create 8 tables, one for each geohash length CREATE TABLE geohash1 ( geohash text, time timestamp, value double, PRIMARY KEY (geohash, time) ) WITH CLUSTERING ORDER BY (time DESC); … CREATE TABLE geohash8 ( geohash text, time timestamp, value double, PRIMARY KEY (geohash, time) ) WITH CLUSTERING ORDER BY (time DESC);
  • 37.
    Select from smallest to largest areas usingcorresponding table select * from geohash8 where geohash=’everywhe’ limit 50; select * from geohash7 where geohash=’everywh’ limit 50; select * from geohash6 where geohash=’everyw’ limit 50; select * from geohash5 where geohash=’every’ limit 50; select * from geohash4 where geohash=’ever’ limit 50; select * from geohash3 where geohash=’eve’ limit 50; select * from geohash2 where geohash=’ev’ limit 50; select * from geohash1 where geohash=’e’ limit 50; Tradeoffs? Multiple tables and writes, multiple queries
  • 38.
    Results Option 2 =20% 0 20 40 60 80 100 120 Normalised (%) Baseline (single transaction ID) SASI Geohash Option 1 Geohash Option 2
  • 39.
    Option 3 – Clustering Column(s) Similarto Option 1 but using clustering columns CREATE TABLE geohash1to8_clustering ( geohash1 text, time timestamp, geohash2 text, gephash3 text, geohash4 text, geohash5 text, geohash6 text, geohash7 text, geohash8 text, value double, PRIMARY KEY (geohash1, geohash2, geohash3, geohash4, geohash5, geohash6, geohash7, geohash8, time) ) WITH CLUSTERING ORDER BY (geohash2 DESC, geohash3 DESC, geohash4 DESC, geohash5 DESC, geohash6 DESC, geohash7 DESC, geohash8 DESC, time DESC);
  • 40.
    How do Clustering columns work? Good for hierarchicaldata ■ Clustering columns are good for modelling and efficient querying of hierarchical/nested data ■ Query must include higher level columns with equality operator, ranges are only allowed on last column in query, lower level columns don’t have to be included. E.g. ● select * from geohash1to8_clustering where geohash1=’e’ and geohash2=’ev’ and geohash3 >= ’ev0’ and geohash3 <= ‘evz’ limit 50; ■ But why have multiple clustering columns when one is actually enough…
  • 41.
    Better: Single Geohash Clustering Column Geohash8 andtime are clustering keys CREATE TABLE geohash_clustering ( geohash1 text, time timestamp, geohash8 text, lat double, long double, PRIMARY KEY (geohash1, geohash8, time) ) WITH CLUSTERING ORDER BY (geohash8 DESC, time DESC);
  • 42.
    Inequality range query With decreasing lengthgeohashes Stop when result has 50 rows select * from geohash_clustering where geohash1=’e’ and geohash8=’everywhe’ limit 50; select * from geohash_clustering where geohash1=’e’ and geohash8>=’everywh0’ and geohash8 <=’everywhz’ limit 50; select * from geohash_clustering where geohash1=’e’ and geohash8>=’everyw0’ and geohash8 <=’everywz’ limit 50; select * from geohash_clustering where geohash1=’e’ and geohash8>=’every0’ and geohash8 <=’everyz’ limit 50; select * from geohash_clustering where geohash1=’e’ and geohash8>=’ever0’ and geohash8 <=’everz’ limit 50; select * from geohash_clustering where geohash1=’e’ and geohash8>=’eve0’ and geohash8 <=’evez’ limit 50; select * from geohash_clustering where geohash1=’e’ and geohash8>=’ev0’ and geohash8 <=’evz’ limit 50; select * from geohash_clustering where geohash1=’e’ limit 50;
  • 43.
    Geohash Results Option 3 isbest = 34% 0 20 40 60 80 100 120 Normalised (%) Baseline (single transaction ID) SASI Geohash Option 1 Geohash Option 2 Geohash Option 3
  • 44.
    Issues? ■ Cardinality forpartition key ● should be > 100,000 ● >= 4 character geohash ■ Unbounded partitions are bad ● May need composite partition key in production ● e.g. extra time bucket (hour, day, etc) ■ Space vs time ● could have different sized buckets for different sized spaces ● E.g. bigger areas with more frequent events may need shorter time buckets to limit size ● This may depend on the space-time scales of underlying systems/processes ● E.g. Spatial and temporal scales of oceanographic processes (left)
  • 45.
  • 46.
    Other option(s) – Cassandra Lucene Index Plugin ■The Cassandra Lucene Index is a plugin for Apache Cassandra: ● that extends its index functionality to provide near real-time search, including full-text search capabilities and free multivariable, geospatial and bitemporal search ● It is achieved through an Apache Lucene based implementation of Cassandra secondary indexes, where each node of the cluster indexes its own data. ■ Instaclustr supports the plugin ● Optional add-on to managed Cassandra service ● And code support ᐨ https://github.com/instaclustr/cassandra-lucene-index ■ How does this help for Geospatial queries? ● has very rich geospatial semantics including geo points, geo shapes, geo distance search, geo bounding box search, geo shape search, multiple distance units, geo transformations, and complex geo shapes.
  • 47.
    Cassandra table and Lucene indexes Geopoint Example Underthe hood indexing is done using a tree structure with geohashes (configurable precision). CREATE TABLE latlong_lucene ( geohash1 text, value double, time timestamp, latitude double, longitude double, Primary key (geohash1, time) ) WITH CLUSTERING ORDER BY (time DESC); CREATE CUSTOM INDEX latlong_index ON latlong_lucene () USING 'com.stratio.cassandra.lucene.Index' WITH OPTIONS = { 'refresh_seconds': '1', 'schema': '{ fields: { geohash1: {type: "string"}, value: {type: "double"}, time: {type: "date", pattern: "yyyy/MM/dd HH:mm:ss.SSS"}, place: {type: "geo_point", latitude: "latitude", longitude: "longitude"} }' };
  • 48.
    Search Options Sort Sophisticated but complex semantics (seethe docs) SELECT value FROM latlong_lucene WHERE expr(latlong_index, '{ sort: [ {field: "place", type: "geo_distance", latitude: " + <lat> + ", longitude: " + <long> + "}, {field: "time", reverse: true} ] }') and geohash1=<geohash> limit 50;
  • 49.
    Search Options Bounding Box filter Needto compute box corners SELECT value FROM latlong_lucene WHERE expr(latlong_index, '{ filter: { type: "geo_bbox", field: "place", min_latitude: " + <minLat> + ", max_latitude: " + <maxLat> + ", min_longitude: " + <minLon> + ", max_longitude: " + <maxLon> + " }}') limit 50;
  • 50.
    Search Options Geo Distance filter SELECTvalue FROM latlong_lucene WHERE expr(latlong_index, '{ filter: { type: "geo_distance", field: "place", latitude: " + <lat> + ", longitude: " + <long> + ", max_distance: " <distance> + "km" } }') and geohash1=' + <hash1> + ' limit 50;
  • 51.
    Search Options – Prefix filter prefixsearch is useful for searching larger areas over a single geohash column as you can search for a substring SELECT value FROM latlong_lucene WHERE expr(latlong_index, '{ filter: [ {type: "prefix", field: "geohash1", value: <geohash>} ] }') limit 50 Similar to inequality over clustering column
  • 52.
    Lucene Results Options = 2-25% Bestis prefix filter 0 20 40 60 80 100 120 Normalised (%) Baseline (single transaction ID) SASI Geohash Option 1 Geohash Option 2 Geohash Option 3 Lucene sort Lucene filter bounded box Lucene filter geo distance Lucene filter prefix over geohash
  • 53.
    Overall
    Geohash options faster (25%, 34%)
    [Same bar chart of normalised throughput (%), with the Geohash options highlighted]
  • 54.
    Overall
    Geohash options faster (25%, 34%)
    Lucene bounded box/geo distance most accurate but only 5% of baseline performance
    [Same bar chart of normalised throughput (%), with the Lucene options highlighted]
  • 55.
  • 56.
    Location, Altitude and Volume
    3D Geohashes represent 2D location, altitude and volume. A 3D geohash is a cube.
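    One possible way to build a 3D geohash is to extend the usual bit interleaving to three dimensions (longitude, latitude, altitude). The sketch below is hypothetical and is not the author's demo code: the class name, the interleaving order and the altitude bounds (roughly 13km below sea level up to about geostationary orbit, here rounded to 36,000km) are all assumptions. With these ranges the cells are boxes rather than true cubes.

```java
// Hypothetical 3D geohash encoder (an illustration, not the author's gist).
final class Geohash3D {
    private static final String BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz";
    // Assumed altitude range in metres, for illustration only.
    static final double ALT_MIN_M = -13_000.0;
    static final double ALT_MAX_M = 36_000_000.0;

    /** Interleaves longitude, latitude and altitude bits into base-32 characters. */
    static String encode(double lat, double lon, double altMetres, int length) {
        double[] min = { -180.0, -90.0, ALT_MIN_M };
        double[] max = { 180.0, 90.0, ALT_MAX_M };
        double[] value = { lon, lat, altMetres };
        StringBuilder sb = new StringBuilder(length);
        int dim = 0, bits = 0, ch = 0;
        while (sb.length() < length) {
            // Halve the current dimension's range and record which half we are in.
            double mid = (min[dim] + max[dim]) / 2;
            if (value[dim] >= mid) { ch = (ch << 1) | 1; min[dim] = mid; }
            else { ch <<= 1; max[dim] = mid; }
            dim = (dim + 1) % 3; // rotate lon -> lat -> alt
            if (++bits == 5) {
                sb.append(BASE32.charAt(ch));
                bits = 0;
                ch = 0;
            }
        }
        return sb.toString();
    }
}
```

    As with 2D geohashes, a shorter hash is a prefix of every longer hash inside the same cell, so the prefix and equality queries shown earlier apply unchanged.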
  • 57.
  • 58.
    Proximity rules
    ■ > 50m from people and property
    ■ > 150m from congested areas
    ■ > 1000m from airports
    ■ > 5000m from exclusion zones
    These just happen to correspond to different length 3D geohashes.
  • 59.
    3D Geohashes
    ■ Work with all the geohash index options
    ■ So reasonably fast to compute 3D proximity
    ■ More accurate, slower options can be improved with bigger Cassandra clusters
    [Same bar chart of normalised throughput (%), with the options that support 3D geohashes labelled "3D Geohash"]
  • 60.
    Covid-19 tracing! Social distancing is a spatiotemporal proximity problem ■ Logic is (something like) ● If less than 1.5m distance from another phone continuously for more than 15 minutes and the phone is diagnosed with Covid-19 within 2 weeks then receive alert ■ So does CovidSafe use location data? It required location permissions to work…
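    The "something like" logic above can be sketched as a simple check over time-ordered distance readings for one pair of phones. This is only an illustration of the rule as stated on the slide, not how CovidSafe works; the class, method and parameter names are hypothetical, and readings are assumed to be sorted by time.

```java
// Hypothetical sketch of the slide's rule (not the CovidSafe implementation).
final class ExposureRule {
    static final double MAX_DISTANCE_M = 1.5;            // closer than 1.5m
    static final long MIN_CONTACT_SEC = 15L * 60L;       // for more than 15 minutes
    static final long DIAGNOSIS_WINDOW_SEC = 14L * 24L * 3600L; // within 2 weeks

    /**
     * timesSec: reading timestamps in seconds, ascending (assumption).
     * distancesM: measured distance to the other phone at each timestamp.
     * diagnosisSec: time the other phone's owner was diagnosed.
     */
    static boolean shouldAlert(long[] timesSec, double[] distancesM, long diagnosisSec) {
        long runStart = -1; // start of the current continuous close-contact run
        for (int i = 0; i < timesSec.length; i++) {
            if (distancesM[i] < MAX_DISTANCE_M) {
                if (runStart < 0) runStart = timesSec[i];
                long contact = timesSec[i] - runStart;
                long untilDiagnosis = diagnosisSec - timesSec[i];
                if (contact > MIN_CONTACT_SEC
                        && untilDiagnosis >= 0
                        && untilDiagnosis <= DIAGNOSIS_WINDOW_SEC) {
                    return true;
                }
            } else {
                runStart = -1; // contact broken, so the run is no longer continuous
            }
        }
        return false;
    }
}
```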
  • 61.
    Covid-19 tracing! Social distancing is a spatiotemporal proximity problem ■ Turns out you don’t actually need location as Bluetooth detects other phones nearby (<30m?) ● Which could result in too many false positives ● So probably uses signal strength as distance proxy ■ CovidSafe – location enabled but not used (claimed) ■ UK tracing app plans to use actual location, e.g. to detect hotspots (c.f. cholera map)
  • 62.
    The End ■ More Information? ■ Demo 3D Geohash Java code ● https://gist.github.com/paul-brebner/a67243859d2cf38bd9038a12a7b14762 ● Produces valid 3D geohashes for altitudes from 13km below sea level to geostationary satellite orbit
  • 63.
    Blogs ■ https://www.instaclustr.com/paul-brebner/ ■ Latest Blog Series – Globally distributed Streaming, Storage and Search ● Application is deployed in multiple locations, data is replicated or sent where/when it’s needed ● “Around the World” series, part 3 introduces a Stock Trading application ● https://www.instaclustr.com/building-a-low-latency-distributed-stock-broker-application-part-3/
  • 64.
    The End ■ Try out the Instaclustr Managed Platform for Open Source ● https://www.instaclustr.com/platform/ ● Free Trial ᐨ https://console.instaclustr.com/user/signup?coupon-code=WORKSHOP

Editor's Notes

  • #2 Abstract: Geospatial data makes it possible to leverage location, location, location! Geospatial data is taking off, as companies realize that just about everyone needs the benefits of geospatially aware applications. As a result there are no shortages of unique but demanding use cases of how enterprises are leveraging large-scale and fast geospatial big data processing. The data must be processed in large quantities - and quickly - to reveal hidden spatiotemporal insights vital to businesses and their end users. In the rush to tap into geospatial data, many enterprises will find that representing, indexing and querying geospatially-enriched data is more complex than they anticipated - and might bring about tradeoffs between accuracy, latency, and throughput. This presentation will explore how we added location data to a scalable real-time anomaly detection application, built around Apache Kafka, and Cassandra. Kafka and Cassandra are designed for time-series data, however, it’s not so obvious how they can process geospatial data. In order to find location-specific anomalies, we need a way to represent locations, index locations, and query locations. We explore alternative geospatial representations including: Latitude/Longitude points, Bounding Boxes, Geohashes, and go vertical with 3D representations, including 3D Geohashes. To conclude we measure and compare the query throughput of some of the solutions, and summarise the results in terms of accuracy vs. performance to answer the question “Which geospatial data representation and Cassandra implementation is best?”
  • #10 Anomaly detection needs to be fast, under 1s
  • #11 A simple type of anomaly detection is called Break or Changepoint analysis.  This takes a stream of events and analyses them to see if the most recent events are “different” to previous ones. We picked a simple version to start with (CUSUM). It only uses data for a single variable at a time, which could be something like an account number, or an IP address.
  • #12 This is the prototype application design. The anomaly detection pipeline is written in Java and runs in a single multi-threaded process. It consists of a Kafka consumer, which gets each new event and passes it to a Cassandra client, which writes the event to Cassandra, gets the previous 50 rows for the ID, runs the detector and decides if there’s an anomaly or not. Thread pools? A Kafka consumer pool is useful to constrain the number of Kafka consumers, and thereby constrain the number of Kafka partitions, which are expensive!
  • #13 Note the unbounded partitions: not ideal, but we assume billions of keys and a uniform distribution. Otherwise, add a bucket to the key.
  • #34 TODO Only talk about ones we have results for???
  • #45 https://www.researchgate.net/figure/Spatial-and-temporal-scales-of-oceanographic-processes-and-variables-affecting-key_fig3_229042791