Skip to content

Instantly share code, notes, and snippets.

@adgaudio
Last active October 24, 2016 23:25
Show Gist options
  • Save adgaudio/0191e14717af68bbba81 to your computer and use it in GitHub Desktop.
Save adgaudio/0191e14717af68bbba81 to your computer and use it in GitHub Desktop.
This gist demonstrates that spark 0.9.1 (and I'm guessing also 1.0.0) don't serialize a logger instance properly when code runs on workers
"""
This gist demonstrates that spark 1.0.0 and 0.9.1
don't serialize a logger instance properly when code runs on workers.
run this code via:
spark-submit spark_serialization_demo.py
- or -
pyspark spark_serialization_demo.py
"""
import pyspark
from os.path import abspath
import logging
# initialize logger
log = logging.getLogger('alexTest')
_h = logging.StreamHandler()
_h.setFormatter(logging.Formatter("%(levelname)s %(msg)s"))
log.addHandler(_h)
log.setLevel(logging.DEBUG)
log.info("module imported and logger initialized")
FUNC = 'passes()'
def myfunc(*ignore_args):
log.debug('logging a line from: %s' % FUNC)
return 0
def passes():
mycode_module = __import__('spark_serialization_demo')
print(textFile.map(mycode_module.myfunc, preservesPartitioning=True).take(5))
def fails():
print(textFile.map(myfunc, preservesPartitioning=True).take(5))
raise Exception("Never reach this point because code fails first due to serialization error")
if __name__ == '__main__':
sc = pyspark.SparkContext("local[10]", 'test')
textFile = sc.textFile("file://%s" % abspath(__file__), 5)
print('\n\n---')
FUNC = 'fails()'
log.info(
"This example fails because it serializes a function that"
"does not initialize the logger when the function is unserialized")
try:
fails()
except Exception as err:
log.error("See, I failed! Details: %s" % err)
print('\n---')
log.info(
"This example passes because it serializes a module that initializes"
" the logger when the module is unserialized")
passes()
@larryhu
Copy link

larryhu commented Jul 1, 2016

It's not work on 1.6.1. Always throw:

16/07/01 15:25:05 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/danqoo/apps/spark/current/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
command = pickleSer._read_with_length(infile)
File "/home/danqoo/apps/spark/current/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/home/danqoo/apps/spark/current/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
UnpicklingError: NEWOBJ class argument has NULL tp_new

at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

@javibravo
Copy link

Hi,

I am running into the same issue commented by @larryhu. Has someone found any solution for this ?

Thanks,
Javier.

@shett044
Copy link

shett044 commented Aug 2, 2016

This can be solved by adding get_logger in the mapper function and making sure that you just give filename.
This will make sure that, it will create N logger and N log file. (N is the number of executors).
These log files are available at "SPARK_INSTALLATION_DIR/work/ " inside an application directory(starts with app-) inside executor id (these are numbers).

@shett044
Copy link

shett044 commented Aug 2, 2016

Did some digging :
https://medium.com/@anicolaspp/how-to-log-in-apache-spark-f4204fad78a#.cq6k7d7x8
It will be great if someone can convert the code in python

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment