Last active
October 22, 2018 23:07
-
-
Save PeterCorless/f467a40abc8595a4367b4ea6ce238319 to your computer and use it in GitHub Desktop.
Hooking up Spark and ScyllaDB: Part 4
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Table holding one row per quote sample.
-- Rows are partitioned by `symbol` and clustered by `timestamp`;
-- `day` is stored alongside so quotes can later be grouped by date.
CREATE TABLE quotes.quotes (
    symbol         text,
    timestamp      timestamp,
    day            timestamp,
    latest_price   double,
    previous_close double,
    latest_volume  bigint,
    PRIMARY KEY (symbol, timestamp)
);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Run the two helper scripts from the current directory, in this order.
# NOTE(review): presumably the first creates the Scylla schema and the second
# starts the quote producer -- confirm against the scripts themselves.
./create-tables.sh | |
./run.sh |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Open a spark-shell inside the `spark-master` docker-compose service.
#   spark.driver.host                 -> spark-master
#   spark.cassandra.connection.host   -> scylla (host the Cassandra connector talks to)
#   --packages                        -> Spark Cassandra connector 2.3.0 (Scala 2.11 build)
#                                        plus commons-configuration 1.10
docker-compose exec spark-master spark-shell \ | |
--conf spark.driver.host=spark-master \ | |
--conf spark.cassandra.connection.host=scylla \ | |
--packages datastax:spark-cassandra-connector:2.3.0-s_2.11,commons-configuration:commons-configuration:1.10 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"AAPL": { | |
"quote": { | |
"latestPrice": 221.43, | |
"latestSource": "IEX real time price", | |
"latestUpdate": 1537455071032, | |
"latestVolume": 18919004, | |
"previousClose": 218.37, | |
"symbol": "AAPL" | |
} | |
}, | |
"FB": { | |
"quote": { | |
"latestPrice": 165.55, | |
"latestSource": "IEX real time price", | |
"latestUpdate": 1537455069589, | |
"latestVolume": 14295872, | |
"previousClose": 163.06, | |
"symbol": "FB" | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.sql.{functions => f}

// Turn the `key` and `value` columns of `queryDefn` into strings, then parse
// the string payload as JSON using `schema`. The parsed struct replaces the
// raw text under the same column name, `data`.
val query = {
  val asText = queryDefn.selectExpr(
    "CAST(key AS STRING) AS symbol",
    "CAST(value AS STRING) AS data",
    "timestamp"
  )
  asText.select(
    f.col("symbol"),
    f.col("timestamp"),
    f.from_json(f.col("data"), schema).as("data")
  )
}
// query: org.apache.spark.sql.DataFrame = [symbol: string, timestamp: timestamp ... 1 more field]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.sql.types._

// Schema of the JSON payload: a single top-level `quote` struct, from which
// only the three numeric fields below are extracted.
val schema = {
  val quoteFields = Seq(
    StructField("latestPrice", DoubleType),
    StructField("previousClose", DoubleType),
    StructField("latestVolume", LongType)
  )
  StructType(Seq(StructField("quote", StructType(quoteFields))))
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
+-------------+--------------------+------+---------+------+--------------------+-------------+ | |
| key| value| topic|partition|offset| timestamp|timestampType| | |
+-------------+--------------------+------+---------+------+--------------------+-------------+ | |
|[41 41 50 4C]|[7B 22 71 75 6F 7...|quotes| 0| 0|2018-09-20 14:58:...| 0| | |
... | |
| [46 42]|[7B 22 71 75 6F 7...|quotes| 0| 16|2018-09-20 14:58:...| 0| | |
|[53 4E 41 50]|[7B 22 71 75 6F 7...|quotes| 0| 17|2018-09-20 14:58:...| 0| | |
|[54 53 4C 41]|[7B 22 71 75 6F 7...|quotes| 0| 18|2018-09-20 14:58:...| 0| | |
|[41 4D 5A 4E]|[7B 22 71 75 6F 7...|quotes| 0| 19|2018-09-20 14:56:...| 0| | |
+-------------+--------------------+------+---------+------+--------------------+-------------+ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ http localhost:3000/stats/2018/09/04 | |
HTTP/1.1 200 OK | |
Content-Length: 295 | |
Content-Type: text/plain; charset=UTF-8 | |
Date: Tue, 04 Sep 2018 20:19:15 GMT | |
Server: akka-http/10.1.4 | |
Symbol: AAPL, max difference: -0.33%, min difference: -0.32% | |
Symbol: TSLA, max difference: 4.21%, min difference: 4.23% | |
Symbol: FB, max difference: 2.6%, min difference: 2.61% | |
Symbol: SNAP, max difference: 2.84%, min difference: 2.94% | |
Symbol: AMZN, max difference: -1.35%, min difference: -1.33% |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- View over quotes.quotes re-keyed by `day`, so that all quotes for one date
-- live in a single partition (clustered by symbol, then timestamp).
-- Every column of the view's primary key is restricted with IS NOT NULL,
-- including `day`, which is a regular column in the base table.
CREATE MATERIALIZED VIEW quotes.quotes_by_day AS
    SELECT * FROM quotes.quotes
    WHERE day IS NOT NULL AND
          symbol IS NOT NULL AND
          timestamp IS NOT NULL
    PRIMARY KEY ((day), symbol, timestamp);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Inspect the current state of the streaming query; the REPL output recorded
// below shows it idle, with no data available and no trigger running.
query.status | |
// res9: org.apache.spark.sql.streaming.StreamingQueryStatus =
// {
//   "message" : "Waiting for data to arrive",
//   "isDataAvailable" : false,
//   "isTriggerActive" : false
// }
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Stop the streaming query held in `query`.
query.stop() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Cast the `key` and `value` columns of `queryDefn` to strings (exposed as
// `symbol` and `data`), keep `timestamp`, and stream the rows to the console.
val query = {
  val projection = Seq(
    "CAST(key AS STRING) AS symbol",
    "CAST(value AS STRING) AS data",
    "timestamp"
  )
  val asText = queryDefn.selectExpr(projection: _*)
  asText.writeStream.format("console").start()
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Streaming sink that appends every incoming batch through the Cassandra
 * data source.
 *
 * @param parameters sink options; the `table`, `keyspace` and `cluster`
 *                   entries are looked up on every batch, so a missing key
 *                   fails at write time rather than at construction
 */
class ScyllaSink(parameters: Map[String, String]) extends Sink {

  /**
   * Writes one batch in append mode.
   *
   * @param batchId identifier of the batch (not used by this sink)
   * @param data    rows to append to the configured table
   */
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    val writer   = data.write
    val table    = parameters("table")
    val keyspace = parameters("keyspace")
    val cluster  = parameters("cluster")

    writer
      .cassandraFormat(table, keyspace, cluster)
      .mode(SaveMode.Append)
      .save()
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Load the `quotes` table (keyspace `quotes`, cluster `scylla`) as a DataFrame.
val quotes = spark.read
  .cassandraFormat("quotes", "quotes", "scylla")
  .load()

// Per-symbol statistics for a single day: how far the day's highest and
// lowest `latest_price` sit from `previous_close`, as a percentage rounded
// to two decimals.
val result = {
  // Percentage by which `priceCol` lies below `previous_close`.
  // NOTE(review): the value is negative when the price is ABOVE the previous
  // close -- confirm this sign convention is the intended one.
  def pctFromClose(priceCol: String): org.apache.spark.sql.Column =
    f.round(
      ((f.col("previous_close") - f.col(priceCol)) / f.col("previous_close")) * 100,
      2)

  quotes
    // The literal carries a four-digit year, so the pattern must be `yyyy`;
    // `yy` only matched through lenient legacy parsing.
    .where($"day" === f.to_timestamp(f.lit("2018-09-20"), "yyyy-MM-dd"))
    .groupBy($"symbol")
    .agg(
      f.max("latest_price").as("max_price"),
      f.min("latest_price").as("min_price"),
      f.min("previous_close").as("previous_close")
    )
    .select(
      $"symbol",
      pctFromClose("max_price").as("max_difference"),
      pctFromClose("min_price").as("min_difference")
    )
    .collect()
}
// result: Array[org.apache.spark.sql.Row] =
// Array(
//   [AAPL,-1.42,-1.2],
//   [TSLA,0.26,0.72],
//   [FB,-1.53,-1.22],
//   [SNAP,0.6,0.93],
//   [AMZN,-1.17,-0.93]
// )
//
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Streaming source: subscribe to the `quotes` Kafka topic on kafka:9092,
// reading from the earliest available offsets.
val queryDefn = spark.readStream
  .format("kafka")
  .options(
    Map(
      "kafka.bootstrap.servers" -> "kafka:9092",
      "subscribe"               -> "quotes",
      "startingOffsets"         -> "earliest"
    )
  )
  .load()

// Print the records to the console as they arrive, without any transformation.
val query = {
  val consoleWriter = queryDefn.writeStream.format("console")
  consoleWriter.start()
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Enter the example's directory and start its docker-compose services in the
# background (-d = detached).
cd scylla-and-spark/streaming-into-scylla | |
docker-compose up -d |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
+------+--------------------+--------------------+ | |
|symbol| data| timestamp| | |
+------+--------------------+--------------------+ | |
| AAPL|{"quote":{"symbol...|2018-09-20 14:58:...| | |
| FB|{"quote":{"symbol...|2018-09-20 14:58:...| | |
... | |
| TSLA|{"quote":{"symbol...|2018-09-20 14:58:...| | |
| AMZN|{"quote":{"symbol...|2018-09-20 14:56:...| | |
+------+--------------------+--------------------+ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
+------+--------------------+-------------------+------------+--------------+-------------+ | |
|symbol| timestamp| day|latest_price|previous_close|latest_volume| | |
+------+--------------------+-------------------+------------+--------------+-------------+ | |
| AAPL|2018-09-20 14:58:...|2018-09-20 00:00:00| 221.02| 218.37| 9931987| | |
| FB|2018-09-20 14:58:...|2018-09-20 00:00:00| 165.17| 163.06| 7241683| | |
... | |
| TSLA|2018-09-20 14:58:...|2018-09-20 00:00:00| 298.25| 299.02| 2977981| | |
| AMZN|2018-09-20 14:56:...|2018-09-20 00:00:00| 1944.415| 1926.42| 1248901| | |
+------+--------------------+-------------------+------------+--------------+-------------+ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Parse each record's JSON payload, flatten the quote fields into top-level
// snake_case columns, derive `day`, and stream the result to the console.
val query = {
  val parsed = queryDefn
    .selectExpr(
      "CAST(key AS STRING) AS symbol",
      "CAST(value AS STRING) AS data",
      "timestamp"
    )
    .select(
      f.col("symbol"),
      f.col("timestamp"),
      f.from_json(f.col("data"), schema).as("data")
    )

  // Casting to a date and back to a timestamp drops the time of day.
  val day = f.col("timestamp").cast(DateType).cast(TimestampType).as("day")

  val flattened = parsed.select(
    f.col("symbol"),
    f.col("timestamp"),
    day,
    f.col("data.quote.latestPrice").as("latest_price"),
    f.col("data.quote.previousClose").as("previous_close"),
    f.col("data.quote.latestVolume").as("latest_volume")
  )

  flattened.writeStream.format("console").start()
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Same projection as the console variant, but written in append mode through
// the `com.scylladb.streaming.ScyllaSinkProvider` sink, with the target
// cluster/keyspace/table and the checkpoint directory passed as options.
val query = {
  val parsed = queryDefn
    .selectExpr(
      "CAST(key AS STRING) AS symbol",
      "CAST(value AS STRING) AS data",
      "timestamp"
    )
    .select(
      f.col("symbol"),
      f.col("timestamp"),
      f.from_json(f.col("data"), schema).as("data")
    )

  // Casting to a date and back to a timestamp drops the time of day.
  val day = f.col("timestamp").cast(DateType).cast(TimestampType).as("day")

  val rows = parsed.select(
    f.col("symbol"),
    f.col("timestamp"),
    day,
    f.col("data.quote.latestPrice").as("latest_price"),
    f.col("data.quote.previousClose").as("previous_close"),
    f.col("data.quote.latestVolume").as("latest_volume")
  )

  rows.writeStream
    .format("com.scylladb.streaming.ScyllaSinkProvider")
    .outputMode(OutputMode.Append)
    .option("cluster", "scylla")
    .option("keyspace", "quotes")
    .option("table", "quotes")
    .option("checkpointLocation", "/tmp/checkpoints")
    .start()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment