Created August 6, 2015 07:19
Spark error
Traceback (most recent call last):
  File "/home/javier/test/ml/spark_pipeline.py", line 31, in <module>
    print(files_spark_pipeline('/srv/testfiles/'))
  File "/home/javier/test/ml/spark_pipeline.py", line 17, in files_spark_pipeline
    text_files
  File "/home/javier/test/spark/spark/python/lib/pyspark.zip/pyspark/context.py", line 393, in parallelize
  File "/home/javier/test/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 215, in dump_stream
  File "/home/javier/test/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 134, in dump_stream
  File "/home/javier/test/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 144, in _write_with_length
  File "/home/javier/test/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 414, in dumps
  File "/home/javier/test/spark/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 205, in __getnewargs__
Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
import json


def preprocess_entries(lines):
    # Do some reduce step here, I suppose I am receiving a list
    pass


def json_and_tuple(line):
    data = json.loads(line)
    return (data['id'], data)


def files_spark_pipeline(paths):
    files = []
    for path in paths:
        files.extend(get_interesting_files(path))
    text_files = [sc.textFile(file) for file in files]
    result = sc.parallelize(
        text_files
    ).map(
        json_and_tuple
    ).groupByKey(
    ).sample(
        False, 0.1, 12
    ).mapValues(
        list
    ).mapValues(
        preprocess_entries
    ).collect()
    return result


if __name__ == '__main__':
    print(files_spark_pipeline('/srv/testfiles/'))
The problem here is that textFile needs to be called just once: sc.parallelize(text_files) tries to serialize a Python list of RDD objects into another RDD, which is exactly the pattern SPARK-5063 forbids. Since sc.textFile accepts a comma-separated list of paths, every file can be read into a single RDD on the driver, therefore:
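A corrected pipeline might look like the sketch below. It assumes, as in the original gist, that sc is a live SparkContext and that get_interesting_files(path) returns a list of file paths; both live outside this snippet.

```python
import json


def preprocess_entries(lines):
    # Reduce step over one id's grouped records (placeholder, as in the gist).
    pass


def json_and_tuple(line):
    data = json.loads(line)
    return (data['id'], data)


def files_spark_pipeline(paths):
    files = []
    for path in paths:
        files.extend(get_interesting_files(path))
    # Key change: a single textFile() call. Spark's textFile accepts a
    # comma-separated list of paths, so this builds ONE RDD on the driver
    # instead of a Python list of RDDs fed to parallelize().
    lines = sc.textFile(','.join(files))
    return (lines
            .map(json_and_tuple)
            .groupByKey()
            .sample(False, 0.1, 12)
            .mapValues(list)
            .mapValues(preprocess_entries)
            .collect())
```

Note also that paths should be an iterable of directories (e.g. ['/srv/testfiles/']); the original __main__ block passes a bare string, and iterating a string yields single characters.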