Skip to content

Instantly share code, notes, and snippets.

@MichaelDrogalis
Created January 9, 2020 16:06
Show Gist options
  • Save MichaelDrogalis/5a8b0c2fce647d2f30da2da4b3edde67 to your computer and use it in GitHub Desktop.
Save MichaelDrogalis/5a8b0c2fce647d2f30da2da4b3edde67 to your computer and use it in GitHub Desktop.
-- Step 1: declare the riderLocations stream, backed by the locations topic
-- (one partition, JSON values). The WITH clause names profileId as the key
-- column and the timestamp column as the source of record event time.
-- NOTE(review): the key property is version dependent, confirm it is still
-- accepted by the ksqlDB release this script is run against.
CREATE STREAM riderLocations (
    profileId VARCHAR,
    latitude DOUBLE,
    longitude DOUBLE,
    timestamp BIGINT
) WITH (
    kafka_topic='locations',
    key='profileId',
    value_format='json',
    partitions=1,
    timestamp='timestamp'
);
-- Step 2: seed the stream with nine sample pings from six distinct riders.
-- The timestamp values all fall inside the single one-hour window
-- (1578528000000 to 1578531600000) reported by the queries below, which
-- implies they are epoch milliseconds.
INSERT INTO riderLocations (profileId, latitude, longitude, timestamp)
VALUES ('4ab5cbad', 37.3956, -122.0810, 1578528022704);
INSERT INTO riderLocations (profileId, latitude, longitude, timestamp)
VALUES ('c2309eec', 37.7877, -122.4205, 1578528022805);
INSERT INTO riderLocations (profileId, latitude, longitude, timestamp)
VALUES ('18f4ea86', 37.3903, -122.0643, 1578528023612);
INSERT INTO riderLocations (profileId, latitude, longitude, timestamp)
VALUES ('4ab5cbad', 37.3952, -122.0813, 1578528024200);
INSERT INTO riderLocations (profileId, latitude, longitude, timestamp)
VALUES ('8b6eae59', 37.3944, -122.0813, 1578528024814);
INSERT INTO riderLocations (profileId, latitude, longitude, timestamp)
VALUES ('4a7c7b41', 37.4049, -122.0822, 1578528025097);
INSERT INTO riderLocations (profileId, latitude, longitude, timestamp)
VALUES ('4ab5cbad', 37.3949, -122.0815, 1578528025132);
INSERT INTO riderLocations (profileId, latitude, longitude, timestamp)
VALUES ('4ddad000', 37.7857, -122.4011, 1578528025890);
INSERT INTO riderLocations (profileId, latitude, longitude, timestamp)
VALUES ('8b6eae59', 37.3954, -122.0816, 1578528025999);
-- Step 3: have the queries that follow read the topic from its earliest
-- offset, so the rows inserted in Step 2 are included in their results.
SET 'auto.offset.reset' = 'earliest';
-- Step 4: push query (EMIT CHANGES). Counts pings per rider within one-hour
-- tumbling windows, keeping only pings whose GEO_DISTANCE from the point
-- (37.4133, -122.1162) is at most 5. No unit argument is passed, so the
-- function default applies (kilometres according to the ksqlDB docs).
-- The window bounds are exposed as lower and upper.
SELECT
    profileid,
    COUNT(*) AS pings,
    WINDOWSTART() AS lower,
    WINDOWEND() AS upper
FROM riderLocations
WINDOW TUMBLING (SIZE 1 HOUR)
WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5
GROUP BY profileid
EMIT CHANGES;
+------------------------------+------------------------------+------------------------------+------------------------------+
|PROFILEID |PINGS |LOWER |UPPER |
+------------------------------+------------------------------+------------------------------+------------------------------+
|4a7c7b41 |1 |1578528000000 |1578531600000 |
|4ab5cbad |3 |1578528000000 |1578531600000 |
|8b6eae59 |2 |1578528000000 |1578531600000 |
-- Step 5: materialize the same windowed aggregation as the Step 4 push query
-- into the table mountain_view, so it can be looked up by key in Step 6.
-- The projection, window, filter and grouping are identical to Step 4.
CREATE TABLE mountain_view AS
    SELECT
        profileid,
        COUNT(*) AS pings,
        WINDOWSTART() AS lower,
        WINDOWEND() AS upper
    FROM riderLocations
    WINDOW TUMBLING (SIZE 1 HOUR)
    WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5
    GROUP BY profileid
    EMIT CHANGES;
-- Step 6: pull query against the materialized table for a single row key.
-- Observed problem: LOWER and UPPER come back null here, although the push
-- query in Step 4 reports 1578528000000 and 1578531600000 for the same key.
-- The WINDOWSTART column of this result does carry the expected lower bound.
-- NOTE(review): presumably the WINDOWSTART() and WINDOWEND() values are not
-- written to the state the pull query reads, confirm against ksqlDB itself.
SELECT *
FROM mountain_view
WHERE ROWKEY = '8b6eae59';
+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|ROWKEY |WINDOWSTART |PROFILEID |PINGS |LOWER |UPPER |
+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|8b6eae59 |1578528000000 |8b6eae59 |2 |null |null |
Query terminated
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment