@tranvictor
Created March 10, 2015 11:38
Spark log analytics experiment
A Python snippet to count the number of requests per country.
```python
# in examples/src/main/python/access_log_analyzer.py
import sys
from pyspark import SparkContext
from geoip import geolite2

if __name__ == "__main__":
    sc = SparkContext(appName="PythonAccessLogAnalyzer")
    geoip_service = sc.broadcast(geolite2)

    def get_country_from_line(line):
        try:
            ip = line.split(' ')[0]
            match = geoip_service.lookup(ip)
            if match is not None:
                return match.country
            else:
                return "Unknown"
        except IndexError:
            return "Error"

    rdd = sc.textFile("/Users/victor/access.log").map(get_country_from_line)
    ips = rdd.countByValue()
    print ips
    sc.stop()
```
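For intuition, `rdd.countByValue()` returns the count of each distinct value in the RDD to the driver, much like `collections.Counter` applied to a plain list. A driver-side sketch (the country codes below are made-up sample values):

```python
from collections import Counter

# Driver-side analogue of rdd.countByValue(): count each distinct
# country code. The sample values here are hypothetical.
countries = ["VN", "US", "VN", "Unknown", "VN"]
counts = Counter(countries)
print(counts["VN"])  # 3 occurrences of "VN"
```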
Running this on Spark:
`bin/spark-submit examples/src/main/python/access_log_analyzer.py`
Running it produces the following error:
```
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/03/10 18:34:19 INFO SecurityManager: Changing view acls to: victor
15/03/10 18:34:19 INFO SecurityManager: Changing modify acls to: victor
15/03/10 18:34:19 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(victor); users with modify permissions: Set(victor)
15/03/10 18:34:19 INFO Slf4jLogger: Slf4jLogger started
15/03/10 18:34:19 INFO Remoting: Starting remoting
15/03/10 18:34:19 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:58059]
15/03/10 18:34:19 INFO Utils: Successfully started service 'sparkDriver' on port 58059.
15/03/10 18:34:19 INFO SparkEnv: Registering MapOutputTracker
15/03/10 18:34:19 INFO SparkEnv: Registering BlockManagerMaster
15/03/10 18:34:19 INFO DiskBlockManager: Created local directory at /var/folders/bc/42jtww354ddgh2wp9hbt0g340000gp/T/spark-c903ab0c-17bb-4989-8601-5b19a17e11f1/spark-916ef9d4-d701-4045-9c0f-75b5871a0f12
15/03/10 18:34:19 INFO MemoryStore: MemoryStore started with capacity 265.1 MB
15/03/10 18:34:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/10 18:34:20 INFO HttpFileServer: HTTP File server directory is /var/folders/bc/42jtww354ddgh2wp9hbt0g340000gp/T/spark-7fca6ab4-1578-494c-bd18-d24b662f8fce/spark-1d0e6101-f537-49d1-9617-d9dc224c4346
15/03/10 18:34:20 INFO HttpServer: Starting HTTP Server
15/03/10 18:34:20 INFO Utils: Successfully started service 'HTTP file server' on port 58065.
15/03/10 18:34:20 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/03/10 18:34:20 INFO SparkUI: Started SparkUI at http://192.168.1.160:4040
15/03/10 18:34:20 INFO Utils: Copying /Users/victor/Downloads/spark-1.2.1-bin-hadoop2.4/examples/src/main/python/access_log_analyzer.py to /var/folders/bc/42jtww354ddgh2wp9hbt0g340000gp/T/spark-be830792-29e7-4cf6-8f32-8412dfde28a9/spark-0f95e33e-77ae-4dc5-bb99-097fdf5dda8e/access_log_analyzer.py
15/03/10 18:34:20 INFO SparkContext: Added file file:/Users/victor/Downloads/spark-1.2.1-bin-hadoop2.4/examples/src/main/python/access_log_analyzer.py at http://192.168.1.160:58065/files/access_log_analyzer.py with timestamp 1425987260357
15/03/10 18:34:20 INFO Executor: Starting executor ID <driver> on host localhost
15/03/10 18:34:20 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://[email protected]:58059/user/HeartbeatReceiver
15/03/10 18:34:20 INFO NettyBlockTransferService: Server created on 58068
15/03/10 18:34:20 INFO BlockManagerMaster: Trying to register BlockManager
15/03/10 18:34:20 INFO BlockManagerMasterActor: Registering block manager localhost:58068 with 265.1 MB RAM, BlockManagerId(<driver>, localhost, 58068)
15/03/10 18:34:20 INFO BlockManagerMaster: Registered BlockManager
Traceback (most recent call last):
  File "/Users/victor/Downloads/spark-1.2.1-bin-hadoop2.4/examples/src/main/python/access_log_analyzer.py", line 8, in <module>
    geoip_service = sc.broadcast(geolite2)
  File "/Users/victor/Downloads/spark-1.2.1-bin-hadoop2.4/python/pyspark/context.py", line 636, in broadcast
    return Broadcast(self, value, self._pickled_broadcast_vars)
  File "/Users/victor/Downloads/spark-1.2.1-bin-hadoop2.4/python/pyspark/broadcast.py", line 65, in __init__
    self._path = self.dump(value, f)
  File "/Users/victor/Downloads/spark-1.2.1-bin-hadoop2.4/python/pyspark/broadcast.py", line 82, in dump
    cPickle.dump(value, f, 2)
cPickle.PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed
```
I think broadcasting `geolite2` is wrong because it holds references to objects that cannot be pickled (such as thread locks).
@tranvictor
Does anyone have any ideas? Please let me know.
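One possible workaround (a sketch, not verified against this gist): skip the broadcast entirely and import `geolite2` lazily inside the mapped function, so each worker initializes its own reader and nothing unpicklable crosses the driver/worker boundary. `extract_ip` is a helper introduced here for illustration:

```python
def extract_ip(line):
    # An access-log line starts with the client IP address.
    return line.split(' ')[0]

def get_country_from_line(line):
    # Lazy per-worker import: the geolite2 reader is created on the
    # executor instead of being pickled on the driver.
    from geoip import geolite2
    try:
        match = geolite2.lookup(extract_ip(line))
        return match.country if match is not None else "Unknown"
    except IndexError:
        return "Error"

# Hypothetical usage on the driver:
# rdd = sc.textFile("/Users/victor/access.log").map(get_country_from_line)
# print rdd.countByValue()
```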
