Created June 11, 2018 21:21
PySpark Reduction
py4j.protocol.Py4JJavaError: An error occurred while calling o90.save.
: java.io.IOException: Failed to open native connection to Cassandra at {<Redacted>}:<Redacted port>
	at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:168)
	at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$8.apply(CassandraConnector.scala:154)
	at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$8.apply(CassandraConnector.scala:154)
	at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:32)
	at com.datastax.spark.connector.cql.RefCountedCache.syncAcquire(RefCountedCache.scala:69)
	at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:57)
	at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:79)
	at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
	at com.datastax.spark.connector.rdd.partitioner.dht.TokenFactory$.forSystemLocalPartitioner(TokenFactory.scala:98)
	at org.apache.spark.sql.cassandra.CassandraSourceRelation$.apply(CassandraSourceRelation.scala:272)
	at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:83)
	at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:518)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: [REDACTED]:42184 (com.datastax.driver.core.exceptions.TransportException: [REDACTED]:42184] Cannot connect))
	at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:233)
	at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:79)
	at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1483)
	at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:399)
	at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:161)
	... 23 more
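The root cause at the bottom of the trace is a DataStax driver `TransportException ... Cannot connect`, which most likely means the TCP connection to the node failed before SSL or authentication came into play. One way to narrow this down, sketched here as an assumption rather than a fix, is to test plain TCP reachability of the Cassandra host and port from the machines that run the driver and the YARN executors. `can_connect` is a hypothetical helper that uses only the Python standard library.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a plain TCP connection to host:port succeeds.

    Hypothetical diagnostic helper: it only mirrors the first step the
    driver performs and does NOT check TLS or Cassandra credentials.
    """
    try:
        # create_connection resolves the host and tries each address in turn.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, DNS failures and timeouts
        # (socket.timeout is a subclass of OSError on Python 3).
        return False
```

For example, `can_connect(os.environ["DB_HOST"], int(os.environ["DB_PORT"]))` could be run on a YARN node. A `False` result would point at networking (firewall rules, host or port mismatch; note the trace shows port 42184 rather than Cassandra's default 9042) rather than at the PySpark code, while `True` does not rule out TLS or credential problems.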
spark-submit \
  --master yarn \
  --cluster-
  --packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.ibm.stocator:stocator:1.0.14 \
  --executor-memory=1G \
  --num-executors 10 \
  --executor-cores 3 \
  --conf spark.executor.memoryOverhead=2048M \
  --conf spark.speculation=true \
  --conf spark.hadoop.cloneConf=true \
  --conf spark.cassandra.connection.ssl.enabled=true \
  --conf spark.cassandra.connection.host=${DB_HOST} \
  --conf spark.cassandra.connection.port=${DB_PORT} \
  --conf spark.cassandra.auth.username=${DB_USERNAME} \
  --conf spark.cassandra.auth.password=${DB_PASSWORD} \
  --py-files dist/jobs.zip,dist/libs.zip dist/main.py \
  --job testdb \
  --job-args ${JOB_ARGS}
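The connector settings in this command come from `${DB_HOST}`, `${DB_PORT}`, `${DB_USERNAME}` and `${DB_PASSWORD}`. If any of them is unset, the shell substitutes an empty string and the job may still be submitted. A minimal sketch, assuming those same variable names, that builds the `spark.cassandra.*` properties and fails fast on an empty value; `cassandra_conf_from_env` and `to_submit_args` are hypothetical helpers, not part of Spark or the connector.

```python
import os
from typing import Dict, List, Mapping, Optional

# Each spark.cassandra.* property used by the spark-submit command above,
# mapped to the environment variable assumed to supply it.
ENV_TO_CONF = {
    "spark.cassandra.connection.host": "DB_HOST",
    "spark.cassandra.connection.port": "DB_PORT",
    "spark.cassandra.auth.username": "DB_USERNAME",
    "spark.cassandra.auth.password": "DB_PASSWORD",
}


def cassandra_conf_from_env(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Build the connector properties, raising if any variable is missing or empty."""
    env = os.environ if env is None else env
    missing = [var for var in ENV_TO_CONF.values() if not env.get(var)]
    if missing:
        # Without this check an empty value would be passed to the connector as-is.
        raise ValueError("unset or empty: " + ", ".join(missing))
    conf = {key: env[var] for key, var in ENV_TO_CONF.items()}
    # Same SSL setting as the --conf flag in the command.
    conf["spark.cassandra.connection.ssl.enabled"] = "true"
    return conf


def to_submit_args(conf: Mapping[str, str]) -> List[str]:
    """Render the properties as spark-submit --conf arguments."""
    args: List[str] = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args
```

Since these are ordinary Spark properties, the returned dict could equally be applied inside a job with `SparkSession.builder.config(key, value)` before `getOrCreate()`.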
from pyspark.sql import SparkSession
from functools import partial
from shared.context import JobContext


class TestdbJobContext(JobContext):
    def _init_accumulators(self, sc):
        # Register a single counter named 'test'.
        self.initalize_counter(sc, 'test')


def analyze(sc, **kwargs):  # further parameters were elided ("...") in the original
    spark = SparkSession(sc)
    context = TestdbJobContext(sc)

    # One-row DataFrame; each column is written to the same-named column of test.spark_test.
    rows = [("example key", 1234)]
    df = spark.createDataFrame(rows, ['foo', 'bar'])

    # Append the row to Cassandra through the spark-cassandra-connector data source.
    df.write.format("org.apache.spark.sql.cassandra")\
        .options(table="spark_test", keyspace="test")\
        .save(mode="append")
    return
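`shared.context.JobContext` is imported but not included in the gist. For reference, a hypothetical minimal base class compatible with how `TestdbJobContext` uses it: the constructor calls `_init_accumulators(sc)`, and `initalize_counter` (spelled exactly as in the call above) registers a `SparkContext.accumulator`. The `counters` attribute and the `inc_counter` method are assumptions, not taken from the original.

```python
from collections import OrderedDict


class JobContext(object):
    """Hypothetical stand-in for shared.context.JobContext."""

    def __init__(self, sc):
        self.counters = OrderedDict()
        # Subclasses such as TestdbJobContext register their counters here.
        self._init_accumulators(sc)

    def _init_accumulators(self, sc):
        # Default: no counters.
        pass

    def initalize_counter(self, sc, name):
        # Spelling matches the call made by TestdbJobContext.
        self.counters[name] = sc.accumulator(0)

    def inc_counter(self, name, value=1):
        if name not in self.counters:
            raise ValueError("%s counter was not initialized" % name)
        # pyspark Accumulator supports +=; plain numbers do too.
        self.counters[name] += value
```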