Hooking up Spark and ScyllaDB: Part 3
Gist PeterCorless/8ad5f2f9c37ce5a8daad90a98e85896b (last active October 8, 2018 16:26)
cqlsh> SELECT * FROM test.test;

 id |          elems |  flag |    name |            toggles
----+----------------+-------+---------+--------------------
  2 |     ['Oracle'] |  True |   Tzach |  {'cookies': True}
  3 |        ['IBM'] | False |   Nadav | {'cookies': False}
  4 |   ['Cloudius'] | False | Glauber | {'cookies': False}
  1 | ['Red', 'Hat'] |  True |     Dor |  {'cookies': True}

(4 rows)
CREATE KEYSPACE test
  WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1};

CREATE TABLE test.test
  (id bigint,
   name text,
   flag boolean,
   elems list<text>,
   toggles map<text, boolean>,
   PRIMARY KEY (id));
CREATE TABLE test.test_mismatched
  (id bigint,
   name bigint,
   flag text,
   elems list<text>,
   toggles map<text, boolean>,
   PRIMARY KEY (id));
case class Record(id: Long, name: String, flag: Boolean,
                  elems: List[String], toggles: Map[String, Boolean])

val df = spark.createDataFrame(
  Seq(
    Record(1L, "Dor", true, List("Red", "Hat"), Map("cookies" -> true)),
    Record(2L, "Tzach", true, List("Oracle"), Map("cookies" -> true)),
    Record(3L, "Nadav", false, List("IBM"), Map("cookies" -> false)),
    Record(4L, "Glauber", false, List("Cloudius"), Map("cookies" -> false))
  )
)
// df: org.apache.spark.sql.DataFrame = [id: bigint, name: string ... 3 more fields]
val writer = df.write.cassandraFormat(table = "test", keyspace = "test")
// writer: org.apache.spark.sql.DataFrameWriter[org.apache.spark.sql.Row] = org.apache.spark.sql.DataFrameWriter@6cf47d05
docker-compose exec scylla cqlsh
cd scylla-and-spark/writing-to-scylla
docker-compose up -d
df.createCassandraTable(
  keyspaceName = "test",
  tableName = "another_test",
  partitionKeyColumns = Some(List("id"))
)

df.write.cassandraFormat(keyspace = "test", table = "another_test").save()
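`createCassandraTable` builds the new table from the DataFrame's schema and uses the columns passed in `partitionKeyColumns` as the partition key; the DDL printed elsewhere in this gist renders a single-column key as `PRIMARY KEY (("id"))`. The sketch below shows how such a clause is assembled in standard CQL, with the partition key in its own parentheses and any clustering columns after it. `PrimaryKeySketch` is a hypothetical helper for illustration, not part of the connector.

```scala
// Hypothetical helper (not a connector API): renders a CQL PRIMARY KEY clause
// from partition-key and clustering columns, quoting identifiers the way the
// generated DDL in this gist does.
object PrimaryKeySketch {
  private def quote(col: String): String = "\"" + col + "\""

  def primaryKeyClause(partitionKey: Seq[String], clustering: Seq[String] = Nil): String = {
    require(partitionKey.nonEmpty, "a CQL table needs at least one partition key column")
    // The partition key always gets its own pair of parentheses.
    val partition = partitionKey.map(quote).mkString("(", ", ", ")")
    // Clustering columns, if any, follow the partition key.
    (partition +: clustering.map(quote)).mkString("PRIMARY KEY (", ", ", ")")
  }
}

println(PrimaryKeySketch.primaryKeyClause(Seq("id"))) // prints PRIMARY KEY (("id"))
```

With a composite partition key and a clustering column, `primaryKeyClause(Seq("a", "b"), Seq("c"))` yields `PRIMARY KEY (("a", "b"), "c")`.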
import scala.util.Random

def randomRecord = Record(Random.nextLong, Random.nextString(5), false, List(), Map())
val largeDf = spark.createDataFrame(List.fill(25000)(randomRecord))
largeDf.write.cassandraFormat(keyspace = "test", table = "test").save()
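The snippet above draws from the global `scala.util.Random`, so every run writes a different set of rows. If reproducible test data is preferred, a seeded generator is one option. This is a standalone sketch that redefines the same `Record` shape; `SeededRecords` is a hypothetical name, and the gist itself does not seed its generator.

```scala
import scala.util.Random

// Same shape as the Record case class passed to spark.createDataFrame above.
case class Record(id: Long, name: String, flag: Boolean,
                  elems: List[String], toggles: Map[String, Boolean])

// Hypothetical generator: a seeded Random instance yields the same sequence of
// records on every run, unlike the shared scala.util.Random object.
object SeededRecords {
  def generate(n: Int, seed: Long): List[Record] = {
    val rng = new Random(seed)
    List.fill(n)(Record(rng.nextLong(), rng.nextString(5), false, List(), Map()))
  }
}

val a = SeededRecords.generate(25000, seed = 42L)
val b = SeededRecords.generate(25000, seed = 42L)
println(a == b) // true: same seed, same records
```

Keep in mind that `id` is the primary key and CQL `INSERT`s are upserts, so if two random ids ever collided the second write would overwrite the first row instead of adding a new one.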
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector.cql.{CassandraConnector, Schema}

val connector = CassandraConnector(spark.sparkContext)
val schema = Schema.fromCassandra(
  connector,
  keyspaceName = Some("test"),
  tableName = Some("test_mismatched"))
val tableDef = schema.tables.headOption
tableDef.map(_.cql).getOrElse("")
// res22: String =
// CREATE TABLE "test"."test_mismatched" (
// "id" bigint,
// "elems" list<varchar>,
// "flag" boolean,
// "name" bigint,
// "toggles" map<varchar, boolean>,
// PRIMARY KEY (("id"))
// )
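In this table `name` is `bigint`, while the DataFrame's `name` field is a string (`varchar` in the DDL that `TableDef.fromDataFrame` produces for `df`). Once both schemas are available as column-to-type maps, catching such conflicts before calling `save()` is a simple comparison. The following is a plain-Scala sketch that assumes the types have already been extracted as CQL strings; `SchemaDiff` is hypothetical, and the two maps are copied from the DDL dumps in this gist.

```scala
// Hypothetical schema-diff helper (not a connector API). Types are CQL strings;
// "text" is normalized to "varchar" because the two are aliases in CQL.
object SchemaDiff {
  private def normalize(cqlType: String): String = cqlType.replace("text", "varchar")

  /** Columns present in both schemas whose types differ. */
  def mismatchedTypes(dfCols: Map[String, String], tableCols: Map[String, String]): Set[String] =
    dfCols.keySet.intersect(tableCols.keySet).filter { col =>
      normalize(dfCols(col)) != normalize(tableCols(col))
    }

  /** Columns the DataFrame has but the table lacks. */
  def missingInTable(dfCols: Map[String, String], tableCols: Map[String, String]): Set[String] =
    dfCols.keySet -- tableCols.keySet
}

// Copied from the TableDef.fromDataFrame output for df.
val dfCols = Map("id" -> "bigint", "name" -> "varchar", "flag" -> "boolean",
  "elems" -> "list<varchar>", "toggles" -> "map<varchar, boolean>")
// Copied from the Schema.fromCassandra output for test.test_mismatched.
val tableCols = Map("id" -> "bigint", "elems" -> "list<varchar>", "flag" -> "boolean",
  "name" -> "bigint", "toggles" -> "map<varchar, boolean>")

println(SchemaDiff.mismatchedTypes(dfCols, tableCols)) // Set(name)
```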
docker-compose exec spark-master spark-shell \
  --conf spark.driver.host=spark-master \
  --conf spark.cassandra.connection.host=scylla \
  --packages datastax:spark-cassandra-connector:2.3.0-s_2.11,commons-configuration:commons-configuration:1.10
java.util.NoSuchElementException: Columns not found in table test.test_mismatched: name
import com.datastax.spark.connector.cql.TableDef
import com.datastax.driver.core.ProtocolVersion

val dfTableDef = TableDef.fromDataFrame(
  df,
  keyspaceName = "test",
  tableName = "test",
  ProtocolVersion.V4
)
dfTableDef.cql
// res18: String =
// CREATE TABLE "test"."test" (
// "id" bigint,
// "name" varchar,
// "flag" boolean,
// "elems" list<varchar>,
// "toggles" map<varchar, boolean>,
// PRIMARY KEY (("id"))
// )
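`TableDef.fromDataFrame` maps each Spark column type to a CQL type, and the output above shows the five `Record` fields becoming `bigint`, `varchar`, `boolean`, `list<varchar>` and `map<varchar, boolean>`. A minimal recursive sketch of that mapping for just these types follows. The type model (`LongT`, `StringT` and so on) is hypothetical and far smaller than Spark's `org.apache.spark.sql.types`; the connector's real mapping covers many more cases.

```scala
// Hypothetical, simplified model of the Spark column types used by Record.
sealed trait ColType
case object LongT extends ColType
case object StringT extends ColType
case object BooleanT extends ColType
final case class ListT(elem: ColType) extends ColType
final case class MapT(key: ColType, value: ColType) extends ColType

object CqlTypes {
  // Scalars map directly; collections recurse into their element types.
  def toCql(t: ColType): String = t match {
    case LongT      => "bigint"
    case StringT    => "varchar"
    case BooleanT   => "boolean"
    case ListT(e)   => s"list<${toCql(e)}>"
    case MapT(k, v) => s"map<${toCql(k)}, ${toCql(v)}>"
  }
}

// The Record fields, in declaration order.
val recordColumns: Seq[(String, ColType)] = Seq(
  "id" -> LongT, "name" -> StringT, "flag" -> BooleanT,
  "elems" -> ListT(StringT), "toggles" -> MapT(StringT, BooleanT))

recordColumns.foreach { case (name, t) => println(s"$name: ${CqlTypes.toCql(t)}") }
```

Because collection types recurse, `List[String]` comes out as `list<varchar>` without needing a dedicated case, and nested collections would be handled the same way.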
docker-compose exec scylla cqlsh -e "SELECT * FROM system_traces.sessions"

 session_id | request | duration | parameters
-------------------------------------+----------------------------------------------------------------+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
21fd12c2-b78f-11e8-833c-000000000000 | Execute CQL3 prepared query [1ac00b8e2ea23672b0f686b2fd50ad38] | 374 | {'consistency_level': 'LOCAL_QUORUM', 'page_size': '5000', 'query': 'INSERT INTO "test"."test" ("id", "name", "flag", "elems", "toggles") VALUES (:"id", :"name", :"flag", :"elems", :"toggles")', 'serial_consistency_level': 'SERIAL'}
2123a2b0-b78f-11e8-833c-000000000000 | Execute CQL3 prepared query [1ac00b8e2ea23672b0f686b2fd50ad38] | 5061 | {'consistency_level': 'LOCAL_QUORUM', 'page_size': '5000', 'query': 'INSERT INTO "test"."test" ("id", "name", "flag", "elems", "toggles") VALUES (:"id", :"name", :"flag", :"elems", :"toggles")', 'serial_consistency_level': 'SERIAL'}
1b5ebcc2-b78f-11e8-833c-000000000000 | Execute CQL3 prepared query [1ac00b8e2ea23672b0f686b2fd50ad38] | 310 | {'consistency_level': 'LOCAL_QUORUM', 'page_size': '5000', 'query': 'INSERT INTO "test"."test" ("id", "name", "flag", "elems", "toggles") VALUES (:"id", :"name", :"flag", :"elems", :"toggles")', 'serial_consistency_level': 'SERIAL'}
21719ba1-b78f-11e8-833c-000000000000 | Execute CQL3 prepared query [1ac00b8e2ea23672b0f686b2fd50ad38] | 266 | {'consistency_level': 'LOCAL_QUORUM', 'page_size': '5000', 'query': 'INSERT INTO "test"."test" ("id", "name", "flag", "elems", "toggles") VALUES (:"id", :"name", :"flag", :"elems", :"toggles")', 'serial_consistency_level': 'SERIAL'}
20b2df81-b78f-11e8-833c-000000000000 | Execute CQL3 prepared query [1ac00b8e2ea23672b0f686b2fd50ad38] | 1059 | {'consistency_level': 'LOCAL_QUORUM', 'page_size': '5000', 'query': 'INSERT INTO "test"."test" ("id", "name", "flag", "elems", "toggles") VALUES (:"id", :"name", :"flag", :"elems", :"toggles")', 'serial_consistency_level': 'SERIAL'}
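All five traced sessions run the same prepared `INSERT` at `LOCAL_QUORUM`, yet their durations vary widely. Comparing mean and median is a quick way to summarize them. The sketch below hard-codes the session ids and durations from the output above and assumes, as in Cassandra's `system_traces.sessions`, that `duration` is in microseconds; `TraceStats` is a hypothetical helper.

```scala
// Hypothetical helper; the data is copied from the system_traces.sessions output above.
object TraceStats {
  final case class Session(sessionId: String, durationMicros: Int)

  val sessions: Seq[Session] = Seq(
    Session("21fd12c2-b78f-11e8-833c-000000000000", 374),
    Session("2123a2b0-b78f-11e8-833c-000000000000", 5061),
    Session("1b5ebcc2-b78f-11e8-833c-000000000000", 310),
    Session("21719ba1-b78f-11e8-833c-000000000000", 266),
    Session("20b2df81-b78f-11e8-833c-000000000000", 1059)
  )

  def mean(ds: Seq[Int]): Double = ds.sum.toDouble / ds.size

  // Middle value of the sorted durations (average of the two middle values for even sizes).
  def median(ds: Seq[Int]): Double = {
    val s = ds.sorted
    val n = s.size
    if (n % 2 == 1) s(n / 2).toDouble else (s(n / 2 - 1) + s(n / 2)) / 2.0
  }

  def slowest: Session = sessions.maxBy(_.durationMicros)
}

val durations = TraceStats.sessions.map(_.durationMicros)
println(TraceStats.mean(durations))   // 1414.0
println(TraceStats.median(durations)) // 374.0
println(TraceStats.slowest.sessionId) // 2123a2b0-b78f-11e8-833c-000000000000
```

Here the mean is almost four times the median because of the single 5061 µs session, so one slow request is worth inspecting before drawing conclusions about typical write latency.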
docker-compose exec scylla cqlsh -e "TRUNCATE TABLE test.test;"
docker-compose exec scylla nodetool settraceprobability 1
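`nodetool settraceprobability 1` tells the node to trace every request; a value between 0 and 1 traces roughly that fraction of requests. The sketch below only illustrates what the probability means, assuming independent per-request sampling; `TraceSampling` is hypothetical, and Scylla's actual sampling code may differ.

```scala
import scala.util.Random

// Hypothetical illustration of probabilistic tracing: each request is traced
// independently with probability p.
object TraceSampling {
  def shouldTrace(p: Double, rng: Random): Boolean = {
    require(p >= 0.0 && p <= 1.0, "trace probability must be in [0, 1]")
    // nextDouble() is in [0, 1), so p = 1.0 always traces and p = 0.0 never does.
    rng.nextDouble() < p
  }

  // Number of traced requests out of `requests`, with a fixed seed for repeatability.
  def tracedCount(requests: Int, p: Double, seed: Long): Int = {
    val rng = new Random(seed)
    (1 to requests).count(_ => shouldTrace(p, rng))
  }
}

println(TraceSampling.tracedCount(1000, 1.0, seed = 7L)) // 1000
println(TraceSampling.tracedCount(1000, 0.0, seed = 7L)) // 0
```

Because tracing every request adds overhead, the probability can be set back to 0 with `nodetool settraceprobability 0` once the traces have been collected.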
df.write.cassandraFormat(keyspace = "test", table = "test_mismatched").save()
// ... java.lang.NumberFormatException: For input string: "a" ...
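Writing into `test_mismatched` requires converting each DataFrame value to the target column's CQL type, and a string converts to `bigint` only if it parses as a number. The message above matches what `java.lang.Long.parseLong` throws for the input `"a"`, so a plain-Scala illustration that assumes conversion through `String.toLong` reproduces the same failure. `BigintConversion` is hypothetical; the connector's own type converters are not shown here.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for a string-to-bigint conversion step.
object BigintConversion {
  // String.toLong delegates to java.lang.Long.parseLong and throws
  // NumberFormatException for non-numeric input; Try captures that.
  def toBigint(s: String): Try[Long] = Try(s.toLong)
}

for (input <- Seq("42", "a")) {
  BigintConversion.toBigint(input) match {
    case Success(n) => println(s"converted to $n")
    case Failure(e) => println(s"failed: $e")
  }
}
// converted to 42
// failed: java.lang.NumberFormatException: For input string: "a"
```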
// Read the "test" table into a DataFrame in Spark
val dfFromScylla = spark.read.cassandraFormat("test", "test").load()

// Create a new table in Scylla based on the schema of the DataFrame we just loaded
dfFromScylla.createCassandraTable(
  keyspaceName = "test",
  tableName = "test_duplicated",
  partitionKeyColumns = Some(List("id"))
)

// Write the DataFrame into the new table
dfFromScylla.write.cassandraFormat(keyspace = "test", table = "test_duplicated").save()
writer.save()
// java.io.IOException: Couldn't find test.test or any similarly named keyspace and table pairs