Skip to content

Instantly share code, notes, and snippets.

View chbatey's full-sized avatar
🐯
WFH

Christopher Batey chbatey

🐯
WFH
View GitHub Profile
Find all containers that match the term:
# PATTERN is a grep regular expression matched against each full `docker ps -a` line.
# The name TERM is avoided because the shell uses it for the terminal type.
PATTERN=hello; docker ps -a | grep -e "$PATTERN" | awk '{print $1}'
Delete all containers that match a term:
# PATTERN is a grep regular expression matched against each full `docker ps -a` line;
# the first column (container ID) of every matching line is passed to `docker rm`.
# The name TERM is avoided because the shell uses it for the terminal type.
PATTERN=hello; sudo docker rm $(sudo docker ps -a | grep -e "$PATTERN" | awk '{print $1}')
// Materialized view over raw_weather_data, partitioned by (state_code, year)
// and clustered by one_hour_precip, wsid, day, month, hour.
// The SELECT list names only country_code.
// Every column of the view key carries an IS NOT NULL restriction, as the
// Cassandra documentation requires for materialized views.
// NOTE(review): both state_code and one_hour_precip are in the view key;
// Cassandra allows only one view-key column from outside the base table's
// primary key. The key of raw_weather_data is not visible here - confirm.
CREATE MATERIALIZED VIEW state_yearly_rain AS
    SELECT country_code
    FROM raw_weather_data
    WHERE state_code IS NOT NULL
      AND year IS NOT NULL
      AND one_hour_precip IS NOT NULL
      AND wsid IS NOT NULL
      AND day IS NOT NULL
      AND month IS NOT NULL
      AND hour IS NOT NULL
    PRIMARY KEY ((state_code, year), one_hour_precip, wsid, day, month, hour);
// State function that counts occurrences per key.
//   state - running map from a text value to the number of times it was seen
//   type  - the text value for the current row
// Returns the same map with the entry for `type` set to 1 on first sight or
// incremented by one otherwise.
// Declared CALLED ON NULL INPUT, so the body handles nulls itself: a null
// state starts from an empty map, and a null `type` leaves the state unchanged
// rather than being stored as a map key.
// The body does no blocking work.
CREATE OR REPLACE FUNCTION state_group_and_count( state map<text, int>, type text )
CALLED ON NULL INPUT
RETURNS map<text, int>
LANGUAGE java AS '
    if (state == null) state = new java.util.HashMap<String, Integer>();
    if (type == null) return state;
    Integer count = (Integer) state.get(type);
    state.put(type, count == null ? 1 : count + 1);
    return state;
';
// Aggregate over a text argument: each row is folded into a map<text, int>
// state by state_group_and_count, starting from the empty map.
CREATE OR REPLACE AGGREGATE group_and_count ( text )
    SFUNC    state_group_and_count
    STYPE    map<text, int>
    INITCOND {};
// Materialized view over raw_weather_data, partitioned by (state_code, year)
// and clustered by one_hour_precip first.
// The SELECT list names only country_code.
// Every column of the view key carries an IS NOT NULL restriction, as the
// Cassandra documentation requires for materialized views.
// NOTE(review): wsid, day, month and hour are appended to the key on the
// assumption that they belong to raw_weather_data's primary key (Cassandra
// requires every base-table key column in the view key) - confirm.
// NOTE(review): both state_code and one_hour_precip are in the view key;
// Cassandra allows only one view-key column from outside the base table's
// primary key - confirm against the base table definition.
CREATE MATERIALIZED VIEW state_yearly_rain AS
    SELECT country_code
    FROM raw_weather_data
    WHERE state_code IS NOT NULL
      AND year IS NOT NULL
      AND one_hour_precip IS NOT NULL
      AND wsid IS NOT NULL
      AND day IS NOT NULL
      AND month IS NOT NULL
      AND hour IS NOT NULL
    PRIMARY KEY ((state_code, year), one_hour_precip, wsid, day, month, hour);
// Materialized view over raw_weather_data, partitioned by (state_code, year)
// and clustered by month, then one_hour_precip.
// The SELECT list names only country_code.
// Every column of the view key carries an IS NOT NULL restriction, as the
// Cassandra documentation requires for materialized views.
// NOTE(review): wsid, day and hour are appended to the key on the assumption
// that they belong to raw_weather_data's primary key (Cassandra requires
// every base-table key column in the view key) - confirm.
// NOTE(review): both state_code and one_hour_precip are in the view key;
// Cassandra allows only one view-key column from outside the base table's
// primary key - confirm against the base table definition.
CREATE MATERIALIZED VIEW state_monthly_rain AS
    SELECT country_code
    FROM raw_weather_data
    WHERE state_code IS NOT NULL
      AND year IS NOT NULL
      AND month IS NOT NULL
      AND one_hour_precip IS NOT NULL
      AND wsid IS NOT NULL
      AND day IS NOT NULL
      AND hour IS NOT NULL
    PRIMARY KEY ((state_code, year), month, one_hour_precip, wsid, day, hour);
// Three sample readings for 2012-12-25, hour 1: two stations in Cumbria and
// one in Greater London.
INSERT INTO raw_weather_data
    (wsid, year, month, day, hour, country_code, state_code, temperature, one_hour_precip)
VALUES
    ('station1', 2012, 12, 25, 1, 'GB', 'Cumbria', 14.0, 20);

INSERT INTO raw_weather_data
    (wsid, year, month, day, hour, country_code, state_code, temperature, one_hour_precip)
VALUES
    ('station2', 2012, 12, 25, 1, 'GB', 'Cumbria', 4.0, 2);

INSERT INTO raw_weather_data
    (wsid, year, month, day, hour, country_code, state_code, temperature, one_hour_precip)
VALUES
    ('station3', 2012, 12, 25, 1, 'GB', 'Greater London', 16.0, 10);
// One row per weather reporting station, keyed by the station id.
CREATE TABLE weather_station (
    id           text,    // Composite of Air Force Datsav3 station number and NCDC WBAN number
    name         text,    // Name of reporting station
    country_code text,    // 2 letter ISO Country ID
    state_code   text,    // 2 letter state code for US stations
    call_sign    text,    // International station call sign
    lat          double,  // Latitude in decimal degrees
    long         double,  // Longitude in decimal degrees
    elevation    double,  // Elevation in meters
    PRIMARY KEY (id)
);
CREATE TABLE raw_weather_data (
wsid text, // Composite of Air Force Datsav3 station number and NCDC WBAN number
year int, // Year collected
month int, // Month collected
day int, // Day collected
hour int, // Hour collected
name text, // Name of reporting station
country_code text, // 2 letter ISO Country ID
state_code text, // 2 letter state code for US stations
// Collect every temperature value stored for station "725030:14732" in
// isd_weather_data.raw_weather_data, then print the StatCounter built from them.
// The argument list stays on the same line as cassandraTable[Double]: a line
// that begins with "(" would be parsed as a separate statement.
val singleWeatherStation: Array[Double] = sc
  .cassandraTable[Double]("isd_weather_data", "raw_weather_data")
  .select("temperature")
  .where("wsid = ?", "725030:14732")
  .collect()

println(StatCounter(singleWeatherStation))
/** Looks up the temperature values for one station and calendar day and sends
  * the aggregated result to `requester`.
  *
  * The rows are selected by `day.wsid`, `day.year`, `day.month` and `day.day`,
  * collected asynchronously, converted with `toDaily`, and the resulting
  * future is piped to the requesting actor.
  *
  * @param day       key identifying the station and date to query
  * @param requester actor that receives the outcome of the query
  */
def daily(day: Day, requester: ActorRef): Unit = {
  val temperatures = sc
    .cassandraTable[Double](keyspace, rawtable)
    .select("temperature")
    .where(
      "wsid = ? AND year = ? AND month = ? AND day = ?",
      day.wsid, day.year, day.month, day.day)
    .collectAsync()

  temperatures.map(readings => toDaily(readings, day)).pipeTo(requester)
}
private def toDaily(aggregate: Seq[Double], key: Day): WeatherAggregate =
if (aggregate.nonEmpty) {
val data = toDailyTemperature(key, StatCounter(aggregate))