Spark log analytics experiment
Python snippet to count the number of requests per country.
```python
# in examples/src/main/python/access_log_analyzer.py
import sys
from pyspark import SparkContext
from geoip import geolite2

if __name__ == "__main__":
    sc = SparkContext(appName="PythonAccessLogAnalyzer")
    geoip_service = sc.broadcast(geolite2)  # this line raises the PicklingError below

    def get_country_from_line(line):
        """Map one access-log line to a country code via the geoip database."""
        try:
            ip = line.split(' ')[0]  # the client IP is the first field of the log line
            match = geoip_service.lookup(ip)
            if match is not None:
                return match.country
            else:
                return "Unknown"
        except IndexError:
            return "Error"

    rdd = sc.textFile("/Users/victor/access.log").map(get_country_from_line)
    ips = rdd.countByValue()  # per-country counts, returned to the driver as a dict
    print ips
    sc.stop()
```
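An aside on the last step: `countByValue` computes the per-key counts on the workers and returns them all to the driver as an ordinary dict-like object, so it is only appropriate when the number of distinct keys (here, countries) is small. A quick sanity check of that step on a toy RDD, assuming an existing `SparkContext` named `sc` (e.g. in the pyspark shell):

```python
# Sanity-check the counting step locally, independent of the geoip lookup.
counts = sc.parallelize(["VN", "US", "VN", "Unknown"]).countByValue()
print counts  # e.g. defaultdict(<type 'int'>, {'US': 1, 'Unknown': 1, 'VN': 2})
```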
Running it on Spark:

`bin/spark-submit examples/src/main/python/access_log_analyzer.py`

Error output:
```
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/03/10 18:34:19 INFO SecurityManager: Changing view acls to: victor
15/03/10 18:34:19 INFO SecurityManager: Changing modify acls to: victor
15/03/10 18:34:19 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(victor); users with modify permissions: Set(victor)
15/03/10 18:34:19 INFO Slf4jLogger: Slf4jLogger started
15/03/10 18:34:19 INFO Remoting: Starting remoting
15/03/10 18:34:19 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.160:58059]
15/03/10 18:34:19 INFO Utils: Successfully started service 'sparkDriver' on port 58059.
15/03/10 18:34:19 INFO SparkEnv: Registering MapOutputTracker
15/03/10 18:34:19 INFO SparkEnv: Registering BlockManagerMaster
15/03/10 18:34:19 INFO DiskBlockManager: Created local directory at /var/folders/bc/42jtww354ddgh2wp9hbt0g340000gp/T/spark-c903ab0c-17bb-4989-8601-5b19a17e11f1/spark-916ef9d4-d701-4045-9c0f-75b5871a0f12
15/03/10 18:34:19 INFO MemoryStore: MemoryStore started with capacity 265.1 MB
15/03/10 18:34:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/10 18:34:20 INFO HttpFileServer: HTTP File server directory is /var/folders/bc/42jtww354ddgh2wp9hbt0g340000gp/T/spark-7fca6ab4-1578-494c-bd18-d24b662f8fce/spark-1d0e6101-f537-49d1-9617-d9dc224c4346
15/03/10 18:34:20 INFO HttpServer: Starting HTTP Server
15/03/10 18:34:20 INFO Utils: Successfully started service 'HTTP file server' on port 58065.
15/03/10 18:34:20 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/03/10 18:34:20 INFO SparkUI: Started SparkUI at http://192.168.1.160:4040
15/03/10 18:34:20 INFO Utils: Copying /Users/victor/Downloads/spark-1.2.1-bin-hadoop2.4/examples/src/main/python/access_log_analyzer.py to /var/folders/bc/42jtww354ddgh2wp9hbt0g340000gp/T/spark-be830792-29e7-4cf6-8f32-8412dfde28a9/spark-0f95e33e-77ae-4dc5-bb99-097fdf5dda8e/access_log_analyzer.py
15/03/10 18:34:20 INFO SparkContext: Added file file:/Users/victor/Downloads/spark-1.2.1-bin-hadoop2.4/examples/src/main/python/access_log_analyzer.py at http://192.168.1.160:58065/files/access_log_analyzer.py with timestamp 1425987260357
15/03/10 18:34:20 INFO Executor: Starting executor ID <driver> on host localhost
15/03/10 18:34:20 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@192.168.1.160:58059/user/HeartbeatReceiver
15/03/10 18:34:20 INFO NettyBlockTransferService: Server created on 58068
15/03/10 18:34:20 INFO BlockManagerMaster: Trying to register BlockManager
15/03/10 18:34:20 INFO BlockManagerMasterActor: Registering block manager localhost:58068 with 265.1 MB RAM, BlockManagerId(<driver>, localhost, 58068)
15/03/10 18:34:20 INFO BlockManagerMaster: Registered BlockManager
Traceback (most recent call last):
  File "/Users/victor/Downloads/spark-1.2.1-bin-hadoop2.4/examples/src/main/python/access_log_analyzer.py", line 8, in <module>
    geoip_service = sc.broadcast(geolite2)
  File "/Users/victor/Downloads/spark-1.2.1-bin-hadoop2.4/python/pyspark/context.py", line 636, in broadcast
    return Broadcast(self, value, self._pickled_broadcast_vars)
  File "/Users/victor/Downloads/spark-1.2.1-bin-hadoop2.4/python/pyspark/broadcast.py", line 65, in __init__
    self._path = self.dump(value, f)
  File "/Users/victor/Downloads/spark-1.2.1-bin-hadoop2.4/python/pyspark/broadcast.py", line 82, in dump
    cPickle.dump(value, f, 2)
cPickle.PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed
```
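The bottom of the traceback points at cPickle: `sc.broadcast` pickles its argument, and the `geolite2` database object holds a `thread.lock` somewhere inside it; locks (like file handles and sockets) are OS-level resources that cannot be pickled. A minimal sketch of the same class of failure outside Spark (the exact exception message differs slightly, but the root cause is the same):

```python
# A thread lock cannot be serialized, so pickling any object graph that
# contains one (as the geolite2 database object does) fails.
import cPickle
import threading

cPickle.dumps(threading.Lock(), 2)  # raises: lock objects can't be pickled
```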
I think broadcasting `geolite2` is wrong because it contains references to complex objects like that lock.
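Maybe the fix is to skip the broadcast entirely and let each worker load its own copy of the database, e.g. by importing `geolite2` inside the map function. A sketch of that idea, assuming the `python-geoip` package and its data files are installed on every worker node (note that even if the broadcast had worked, a `Broadcast` object would need to be dereferenced with `.value` before calling `lookup` on it):

```python
def get_country_from_line(line):
    # Import inside the function: each worker process loads its own
    # geolite2 instance instead of receiving a pickled one from the driver.
    from geoip import geolite2
    try:
        ip = line.split(' ')[0]
        match = geolite2.lookup(ip)
        if match is not None:
            return match.country
        return "Unknown"
    except IndexError:
        return "Error"
```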
Does anyone have any ideas? Please tell me.