Last active
November 23, 2015 11:18
-
-
Save amir-rahnama/6c05ea21ba9fca208c35 to your computer and use it in GitHub Desktop.
Send Result of MapReduce in Apache Spark (PySpark) over to a web socket: http://blog.ambodi.com/web-socket/
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# pip install websocket-client
import json

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

from websocket import create_connection
def take_rdd_send_to_socket(time, rdd, num=1000, ws_url=None):
    """Print up to *num* records of a batch RDD and forward them over a web socket.

    Intended for ``dstream.foreachRDD``: Spark Streaming invokes it with the
    batch time and the RDD produced for that batch.

    Args:
        time: batch time supplied by Spark Streaming (printed for tracing).
        rdd: the RDD for this micro-batch.
        num: maximum number of records to print/forward (default 1000).
        ws_url: web-socket endpoint. When omitted, falls back to a
            module-level ``url`` global — this keeps the original two/three
            argument call from ``foreachRDD`` working unchanged.
    """
    # Take one extra record so we can detect whether the RDD was truncated.
    taken = rdd.take(num + 1)
    print("-------------------------------------------")
    print("Time: %s" % time)
    print("-------------------------------------------")
    result = []
    for record in taken[:num]:
        print(record)
        result.append(record)
    # NOTE: the connection MUST be created inside this function (per RDD).
    # Hoisting it to module level makes Spark pickle the socket object with
    # the closure, which fails with the cloudpickle AttributeError /
    # NullPointerException shown in the attached log.
    target = ws_url if ws_url is not None else url  # `url` must be defined by the enclosing script
    ws = create_connection(target)
    try:
        ws.send(json.dumps(result))
    finally:
        # Always release the socket, even if send() raises.
        ws.close()
    if len(taken) > num:
        print("...")
    print("")


# `dstream` is expected to be built earlier in the enclosing script from the
# StreamingContext imported above.
dstream.foreachRDD(take_rdd_send_to_socket)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
15/11/21 14:18:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable | |
15/11/21 14:18:48 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set. | |
Traceback (most recent call last): | |
File "/Users/ara/dev/personal/spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/util.py", line 95, in dumps | |
return bytearray(self.serializer.dumps((func.func, func.deserializers))) | |
File "/Users/ara/dev/personal/spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 428, in dumps | |
return cloudpickle.dumps(obj, 2) | |
File "/Users/ara/dev/personal/spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 646, in dumps | |
cp.dump(obj) | |
File "/Users/ara/dev/personal/spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 107, in dump | |
return Pickler.dump(self, obj) | |
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 224, in dump | |
self.save(obj) | |
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 286, in save | |
f(self, obj) # Call unbound method with explicit self | |
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 548, in save_tuple | |
save(element) | |
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 286, in save | |
f(self, obj) # Call unbound method with explicit self | |
File "/Users/ara/dev/personal/spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 193, in save_function | |
self.save_function_tuple(obj) | |
File "/Users/ara/dev/personal/spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 241, in save_function_tuple | |
save(f_globals) | |
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 286, in save | |
f(self, obj) # Call unbound method with explicit self | |
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 649, in save_dict | |
self._batch_setitems(obj.iteritems()) | |
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 681, in _batch_setitems | |
save(v) | |
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 331, in save | |
self.save_reduce(obj=obj, *rv) | |
File "/Users/ara/dev/personal/spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 542, in save_reduce | |
save(state) | |
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 286, in save | |
f(self, obj) # Call unbound method with explicit self | |
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 649, in save_dict | |
self._batch_setitems(obj.iteritems()) | |
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 681, in _batch_setitems | |
save(v) | |
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 331, in save | |
self.save_reduce(obj=obj, *rv) | |
File "/Users/ara/dev/personal/spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 542, in save_reduce | |
save(state) | |
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 286, in save | |
f(self, obj) # Call unbound method with explicit self | |
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 548, in save_tuple | |
save(element) | |
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 286, in save | |
f(self, obj) # Call unbound method with explicit self | |
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 649, in save_dict | |
self._batch_setitems(obj.iteritems()) | |
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 681, in _batch_setitems | |
save(v) | |
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 286, in save | |
f(self, obj) # Call unbound method with explicit self | |
File "/Users/ara/dev/personal/spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 315, in save_builtin_function | |
return self.save_function(obj) | |
File "/Users/ara/dev/personal/spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 191, in save_function | |
if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None: | |
AttributeError: 'builtin_function_or_method' object has no attribute '__code__' | |
15/11/21 14:18:49 ERROR StreamingContext: Error starting the context, marking it as stopped | |
java.io.IOException: java.lang.NullPointerException | |
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163) | |
at org.apache.spark.streaming.api.python.TransformFunction.writeObject(PythonDStream.scala:77) | |
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) | |
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) | |
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) | |
at java.lang.reflect.Method.invoke(Method.java:497) | |
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) | |
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) | |
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) | |
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) | |
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) | |
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) | |
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) | |
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) | |
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) | |
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) | |
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) | |
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) | |
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) | |
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) | |
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) | |
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) | |
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) | |
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) | |
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) | |
at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) | |
at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:181) | |
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160) | |
at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:176) | |
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) | |
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) | |
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) | |
at java.lang.reflect.Method.invoke(Method.java:497) | |
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) | |
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) | |
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) | |
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) | |
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) | |
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) | |
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) | |
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) | |
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) | |
at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:130) | |
at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:130) | |
at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:130) | |
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206) | |
at org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:131) | |
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:563) | |
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:602) | |
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:601) | |
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:623) | |
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) | |
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) | |
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) | |
at java.lang.reflect.Method.invoke(Method.java:497) | |
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) | |
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) | |
at py4j.Gateway.invoke(Gateway.java:259) | |
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) | |
at py4j.commands.CallCommand.execute(CallCommand.java:79) | |
at py4j.GatewayConnection.run(GatewayConnection.java:207) | |
at java.lang.Thread.run(Thread.java:745) | |
Caused by: java.lang.NullPointerException | |
at org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:79) | |
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160) | |
... 61 more | |
Traceback (most recent call last): | |
File "/Users/ara/dev/iteam/data-mining-worker/spark/map.py", line 45, in <module> | |
ssc.start() | |
File "/Users/ara/dev/personal/spark-1.5.1-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/context.py", line 237, in start | |
File "/Users/ara/dev/personal/spark-1.5.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ | |
File "/Users/ara/dev/personal/spark-1.5.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value | |
py4j.protocol.Py4JJavaError: An error occurred while calling o20.start. | |
: java.io.IOException: java.lang.NullPointerException | |
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163) | |
at org.apache.spark.streaming.api.python.TransformFunction.writeObject(PythonDStream.scala:77) | |
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) | |
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) | |
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) | |
at java.lang.reflect.Method.invoke(Method.java:497) | |
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) | |
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) | |
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) | |
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) | |
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) | |
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) | |
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) | |
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) | |
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) | |
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) | |
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) | |
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) | |
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) | |
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) | |
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) | |
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) | |
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) | |
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) | |
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) | |
at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) | |
at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:181) | |
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160) | |
at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:176) | |
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) | |
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) | |
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) | |
at java.lang.reflect.Method.invoke(Method.java:497) | |
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028) | |
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) | |
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) | |
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) | |
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) | |
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) | |
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) | |
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) | |
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) | |
at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply$mcV$sp(Checkpoint.scala:130) | |
at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:130) | |
at org.apache.spark.streaming.Checkpoint$$anonfun$serialize$1.apply(Checkpoint.scala:130) | |
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206) | |
at org.apache.spark.streaming.Checkpoint$.serialize(Checkpoint.scala:131) | |
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:563) | |
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:602) | |
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:601) | |
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:623) | |
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) | |
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) | |
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) | |
at java.lang.reflect.Method.invoke(Method.java:497) | |
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) | |
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) | |
at py4j.Gateway.invoke(Gateway.java:259) | |
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) | |
at py4j.commands.CallCommand.execute(CallCommand.java:79) | |
at py4j.GatewayConnection.run(GatewayConnection.java:207) | |
at java.lang.Thread.run(Thread.java:745) | |
Caused by: java.lang.NullPointerException | |
at org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:79) | |
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160) | |
... 61 more |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment