@txomon
Created August 6, 2015 07:19
Spark error
Traceback (most recent call last):
  File "/home/javier/test/ml/spark_pipeline.py", line 31, in <module>
    print(files_spark_pipeline('/srv/testfiles/'))
  File "/home/javier/test/ml/spark_pipeline.py", line 17, in files_spark_pipeline
    text_files
  File "/home/javier/test/spark/spark/python/lib/pyspark.zip/pyspark/context.py", line 393, in parallelize
  File "/home/javier/test/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 215, in dump_stream
  File "/home/javier/test/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 134, in dump_stream
  File "/home/javier/test/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 144, in _write_with_length
  File "/home/javier/test/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 414, in dumps
  File "/home/javier/test/spark/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 205, in __getnewargs__
Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
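The traceback ends in pyspark's serializer because `sc.parallelize` pickles its input to ship it to the executors, and here that input is a list of RDD objects; pyspark's `RDD.__getnewargs__` deliberately raises this exception to catch RDDs being serialized by accident. A minimal sketch of that guard mechanism, using a stand-in class rather than pyspark itself:

```python
import pickle


class FakeRDD:
    """Stand-in for pyspark's RDD, which overrides __getnewargs__
    so that any attempt to pickle it raises (see SPARK-5063)."""

    def __getnewargs__(self):
        raise Exception(
            "It appears that you are attempting to broadcast an RDD "
            "or reference an RDD from an action or transformation."
        )


# Pickling a list of these objects fails, just as parallelize()
# fails on a list of RDDs: pickle calls __getnewargs__ while
# serializing each element.
try:
    pickle.dumps([FakeRDD(), FakeRDD()])
    raised = False
except Exception:
    raised = True
```

This is why the error fires at `sc.parallelize(text_files)` even though no transformation is visibly nested.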
import json


def preprocess_entries(lines):
    # Do some reduce step here, I suppose I am receiving a list
    pass


def json_and_tuple(line):
    data = json.loads(line)
    return (data['id'], data)


def files_spark_pipeline(paths):
    files = []
    for path in paths:
        files.extend(get_interesting_files(path))
    text_files = [sc.textFile(file) for file in files]
    result = sc.parallelize(
        text_files
    ).map(
        json_and_tuple
    ).groupByKey(
    ).sample(
        False, 0.1, 12
    ).mapValues(
        list
    ).mapValues(
        preprocess_entries
    ).collect()
    return result


if __name__ == '__main__':
    print(files_spark_pipeline('/srv/testfiles/'))
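Setting the Spark machinery aside, the intent of the pipeline (parse each JSON line into an `(id, record)` pair, group records by id, then post-process each group) can be sketched in plain Python; the names mirror the snippet above and the input lines are made up for illustration:

```python
import json
from collections import defaultdict


def json_and_tuple(line):
    data = json.loads(line)
    return (data['id'], data)


def group_by_key(pairs):
    # Plain-Python stand-in for RDD.groupByKey().mapValues(list)
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


lines = [
    '{"id": 1, "v": "a"}',
    '{"id": 2, "v": "b"}',
    '{"id": 1, "v": "c"}',
]
grouped = group_by_key(json_and_tuple(line) for line in lines)
# grouped[1] now holds both records with id 1
```

In Spark the same grouping has to happen inside a single RDD lineage, which is exactly what the snippet above fails to set up.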
txomon commented Aug 6, 2015

The problem here is that the textFile RDD needs to be constructed just once, over all of the files, instead of building one RDD per file and parallelizing that list. Therefore:

text_files = sc.textFile(','.join(files))
result = text_files.map(
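Since `textFile` takes a single path string as its first argument (the second parameter is `minPartitions`), and Hadoop-based readers treat a comma-separated string as a list of paths, the file list can be joined before building the one RDD. A minimal sketch with hypothetical file names standing in for the output of `get_interesting_files`:

```python
# Hypothetical file list; in the gist it comes from get_interesting_files().
files = ['/srv/testfiles/a.json', '/srv/testfiles/b.json']

# One comma-separated path string -> one RDD spanning all the files.
path_arg = ','.join(files)
# text_files = sc.textFile(path_arg)  # needs a live SparkContext `sc`
```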
