This gist demonstrates that Spark 0.9.1 (and, I'm guessing, also 1.0.0) doesn't serialize a logger instance properly when code runs on workers.
""" | |
This gist demonstrates that spark 1.0.0 and 0.9.1 | |
don't serialize a logger instance properly when code runs on workers. | |
run this code via: | |
spark-submit spark_serialization_demo.py | |
- or - | |
pyspark spark_serialization_demo.py | |
""" | |
import pyspark | |
from os.path import abspath | |
import logging | |
# initialize logger | |
log = logging.getLogger('alexTest') | |
_h = logging.StreamHandler() | |
_h.setFormatter(logging.Formatter("%(levelname)s %(msg)s")) | |
log.addHandler(_h) | |
log.setLevel(logging.DEBUG) | |
log.info("module imported and logger initialized") | |
FUNC = 'passes()' | |
def myfunc(*ignore_args): | |
log.debug('logging a line from: %s' % FUNC) | |
return 0 | |
def passes(): | |
mycode_module = __import__('spark_serialization_demo') | |
print(textFile.map(mycode_module.myfunc, preservesPartitioning=True).take(5)) | |
def fails(): | |
print(textFile.map(myfunc, preservesPartitioning=True).take(5)) | |
raise Exception("Never reach this point because code fails first due to serialization error") | |
if __name__ == '__main__': | |
sc = pyspark.SparkContext("local[10]", 'test') | |
textFile = sc.textFile("file://%s" % abspath(__file__), 5) | |
print('\n\n---') | |
FUNC = 'fails()' | |
log.info( | |
"This example fails because it serializes a function that" | |
"does not initialize the logger when the function is unserialized") | |
try: | |
fails() | |
except Exception as err: | |
log.error("See, I failed! Details: %s" % err) | |
print('\n---') | |
log.info( | |
"This example passes because it serializes a module that initializes" | |
" the logger when the module is unserialized") | |
passes() |
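To see the mechanism passes() relies on without Spark, here is a minimal standalone sketch, assuming the file above is saved as spark_serialization_demo.py somewhere on the PYTHONPATH: the standard pickle module serializes a function looked up through its module by reference only, so unpickling re-imports the module, which re-runs the top-level logger setup before the function is used.

    import pickle

    # assumes spark_serialization_demo.py (the file above) is importable
    import spark_serialization_demo

    # pickle stores only the reference ("spark_serialization_demo", "myfunc");
    # no logger state is serialized
    payload = pickle.dumps(spark_serialization_demo.myfunc)

    # unpickling looks the function up in the (re-)imported module
    restored = pickle.loads(payload)
    print(restored())  # logs a line and prints 0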
Hi,
I am running into the same issue commented on by @larryhu. Has anyone found a solution for this?
Thanks,
Javier.
This can be solved by calling getLogger inside the mapper function and making sure you pass just a bare filename to the file handler; see the sketch below.
This ensures that N loggers and N log files are created (N is the number of executors).
These log files end up under SPARK_INSTALLATION_DIR/work/, inside the application directory (starts with app-), inside the executor-id directory (these are numbers).
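A minimal sketch of that suggestion; the names process_partition, workerTest, and worker.log are hypothetical, and any per-executor setup along these lines should behave the same way:

    import logging

    def process_partition(iterator):
        # build the logger on the worker itself, so nothing logging-related
        # ever has to be pickled on the driver
        log = logging.getLogger('workerTest')
        if not log.handlers:  # avoid stacking handlers when the executor process is reused
            handler = logging.FileHandler('worker.log')  # bare filename, relative to the executor's working dir
            handler.setFormatter(logging.Formatter("%(levelname)s %(msg)s"))
            log.addHandler(handler)
            log.setLevel(logging.DEBUG)
        for record in iterator:
            log.debug('processing: %s' % record)
            yield record

    # usage: textFile.mapPartitions(process_partition).count()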
Did some digging:
https://medium.com/@anicolaspp/how-to-log-in-apache-spark-f4204fad78a#.cq6k7d7x8
It would be great if someone could convert that code to Python.
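The article's approach is Scala, but a rough Python equivalent is to reach Spark's log4j logger through the py4j gateway. A sketch, with the caveat that sc._jvm is a private attribute and is only available on the driver, not inside functions shipped to workers:

    import pyspark

    sc = pyspark.SparkContext("local[2]", "log4j-demo")

    # grab the JVM's log4j LogManager through the py4j gateway and log
    # through Spark's own logging configuration
    log4j = sc._jvm.org.apache.log4j
    logger = log4j.LogManager.getLogger("alexTest")
    logger.info("logging through the driver's log4j")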
It doesn't work on 1.6.1. It always throws:
16/07/01 15:25:05 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/danqoo/apps/spark/current/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
command = pickleSer._read_with_length(infile)
File "/home/danqoo/apps/spark/current/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/home/danqoo/apps/spark/current/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
UnpicklingError: NEWOBJ class argument has NULL tp_new