# Load the very useful IPython extension autotime, which reports how long each cell takes to run (the extension must already be installed).
%load_ext autotime
def _public_object_store_url(catalog, region):
    '''Return the URL of the first public object-store endpoint in ``region``, or None.

    :param catalog: the ``token.catalog`` list of a Keystone v3 token response.
    :param region: region name to match against each endpoint's ``region``.
    '''
    for service in catalog:
        # Other catalog services (e.g. identity) also publish public endpoints;
        # only the object store can serve the file.
        if service.get('type') != 'object-store':
            continue
        for endpoint in service['endpoints']:
            if endpoint['interface'] == 'public' and endpoint['region'] == region:
                return endpoint['url']
    return None


def get_file_content(credentials):
    '''For given credentials, this function returns a StringIO object containing the file content.

    Requests a Keystone v3 token with the password method, looks up the public
    object-store endpoint for ``credentials['region']`` in the returned catalog,
    and downloads ``<container>/<filename>`` from it.

    :param credentials: mapping with the keys ``auth_url``, ``username``,
        ``domainId``, ``password``, ``region``, ``container`` and ``filename``.
    :returns: ``StringIO.StringIO`` wrapping the downloaded bytes.
    :raises requests.HTTPError: if the token request or the download fails.
    :raises ValueError: if the catalog has no public object-store endpoint in the region.
    '''
    url1 = ''.join([credentials['auth_url'], '/v3/auth/tokens'])
    data = {'auth': {'identity': {'methods': ['password'],
                                  'password': {'user': {'name': credentials['username'],
                                                        'domain': {'id': credentials['domainId']},
                                                        'password': credentials['password']}}}}}
    headers1 = {'Content-Type': 'application/json'}
    resp1 = requests.post(url=url1, data=json.dumps(data), headers=headers1)
    # Surface an authentication failure as an HTTP error rather than a KeyError below.
    resp1.raise_for_status()
    resp1_body = resp1.json()
    endpoint_url = _public_object_store_url(resp1_body['token']['catalog'], credentials['region'])
    if endpoint_url is None:
        raise ValueError("No public object-store endpoint found for region %r" % (credentials['region'],))
    url2 = ''.join([endpoint_url, '/', credentials['container'], '/', credentials['filename']])
    # Keystone v3 returns the token in the X-Subject-Token response header.
    s_subject_token = resp1.headers['x-subject-token']
    headers2 = {'X-Auth-Token': s_subject_token, 'accept': 'application/json'}
    resp2 = requests.get(url=url2, headers=headers2)
    resp2.raise_for_status()
    return StringIO.StringIO(resp2.content)
def set_hadoop_config(credentials):
    """Write Swift connection settings for one datasource into the Hadoop
    configuration of the notebook's SparkContext ``sc``.

    Every key is placed under ``fs.swift.service.<credentials['name']>``.

    :param credentials: mapping providing ``name``, ``auth_url``, ``projectId``,
        ``userId``, ``password`` and ``region``.
    :returns: None
    """
    base = "fs.swift.service." + credentials['name']
    conf = sc._jsc.hadoopConfiguration()

    def key(suffix):
        # key(".region") -> "fs.swift.service.<name>.region"
        return base + suffix

    conf.set(key(".auth.url"), credentials['auth_url'] + '/v3/auth/tokens')
    conf.set(key(".auth.endpoint.prefix"), "endpoints")
    conf.set(key(".tenant"), credentials['projectId'])
    conf.set(key(".username"), credentials['userId'])
    conf.set(key(".password"), credentials['password'])
    conf.setInt(key(".http.port"), 8080)
    conf.set(key(".region"), credentials['region'])
    conf.setBoolean(key(".public"), True)
def cleanup(line):
    """Split a line at its first ``LineString[[`` marker into two records.

    ``"id,LineString[[1,2],[3,4]]"`` becomes ``["id,LineString", "[[1,2],[3,4]]"]``.
    The keyword stays with the leading part, and ``[[`` is restored on the
    coordinate part. A line without the marker is returned unchanged as ``[line]``.

    :param line: one input text line.
    :returns: a list of one or two strings (suitable for ``flatMap``).
    """
    # partition splits on the first occurrence only, so any later "LineString[["
    # stays intact inside the coordinate part instead of being silently dropped.
    head, sep, tail = line.partition("LineString[[")
    if not sep:
        return [line]
    return [head + "LineString", "[[" + tail]
# This gives us a collection (Spark calls them RDDs) of records: cleanup() turns each
# input line into one or two records, hence flatMap.
# `filename` is expected to be defined by an earlier cell (not shown here).
data_raw = sc.textFile(filename).flatMap(cleanup)
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, ArrayType, DoubleType, StringType
sqlContext = SQLContext(sc)
# Schema of the route records; every field is declared non-nullable.
# NOTE(review): a leftover comment suggested an ArrayType(LongType()) variant for the
# coordinates; DoubleType is what is used here.
schema = StructType([
    StructField("ArrayOfCoord", ArrayType(ArrayType(DoubleType())), False),
    StructField("RouteDistance", DoubleType(), False),
    StructField("RouteType", StringType(), False),
    StructField("RouteId", StringType(), False),
    StructField("RouteDataType", StringType(), False),
])
# `alldata` is built by a cell not shown here (presumably from data_raw) - TODO confirm.
df = sqlContext.createDataFrame(alldata, schema)
%AddDeps stocator 1.0.1 --transitive
%AddDeps org.scalaj scalaj-http_2.10 1.1.4 --transitive
%AddDeps org.json4s json4s-native_2.10 3.2.9 --transitive
/** Configures a SparkContext so that `swift2d://` URIs can reach an OpenStack
  * Swift object store authenticated through Keystone v3.
  *
  * The credentials JSON is parsed with `scala.util.parsing.json.JSON.parseFull`.
  * It must be a flat object with the string fields `auth_url`, `projectId`,
  * `userId`, `password` and `region`. All settings are written under
  * `fs.swift2d.service.<name>` in `sc.hadoopConfiguration`.
  *
  * @param name            datasource name used in the configuration key prefix
  * @param sc              SparkContext whose Hadoop configuration is modified
  * @param dsConfiguration credentials as a JSON string
  * @return true if the configuration was written; false if the JSON could not be
  *         parsed, was not an object, or lacked/mistyped a required field
  *         (the configuration is left untouched in those cases)
  */
def setRemoteObjectStorageConfig(name: String, sc: SparkContext, dsConfiguration: String): Boolean = {
  try {
    scala.util.parsing.json.JSON.parseFull(dsConfiguration) match {
      // parseFull yields Option[Any]; the type arguments are erased, so a non-string
      // value only fails (ClassCastException) when it is read below.
      case Some(e: Map[String, String] @unchecked) =>
        // Read every field before writing anything, so a missing field cannot leave
        // the Hadoop configuration half-written.
        val authUrl: String = e("auth_url") + "/v3/auth/tokens"
        val projectId: String = e("projectId")
        val userId: String = e("userId")
        val password: String = e("password")
        val region: String = e("region")

        val prefix = "fs.swift2d.service." + name
        val hconf = sc.hadoopConfiguration
        hconf.set(prefix + ".auth.url", authUrl)
        hconf.set(prefix + ".tenant", projectId)
        hconf.set(prefix + ".username", userId)
        hconf.set(prefix + ".password", password)
        // Key must be "<prefix>.auth.method"; the separating dot was previously missing.
        hconf.set(prefix + ".auth.method", "keystoneV3")
        hconf.set(prefix + ".region", region)
        hconf.setBoolean(prefix + ".public", true)
        println("Successfully modified sparkcontext object with remote Object Storage Credentials using datasource name " + name)
        true
      case _ =>
        println("Failed.")
        false
    }
  } catch {
    case NonFatal(exc) =>
      println(exc)
      false
  }
}
/** Example usage */
val sqlctx = new SQLContext(sc)
// sqlctx.sparkContext is the same SparkContext as `sc`; kept under its own name in case
// later cells refer to `scplain`.
val scplain = sqlctx.sparkContext
// `credentials` must hold the Object Storage credentials JSON string (defined in an earlier cell).
val setObjStor = setRemoteObjectStorageConfig("sparksql", scplain, credentials)
// Fail fast: otherwise a configuration failure would only surface later, when the data is read.
require(setObjStor, "Object Storage configuration for datasource 'sparksql' failed; see the message printed above")
// Presumably "<container>.<datasource name>" - the datasource name must match the one configured above.
val prf_rdd = sc.textFile("swift2d://container.sparksql/file.extension")
val prf_df = prf_rdd.toDF()
/** One worker entry of the Spark master's JSON status page.
  * Field names mirror the JSON keys so that json4s `extract` can map them.
  * NOTE(review): `lastheartbeat` may be a millisecond timestamp, which would not fit
  * in an Integer - confirm whether it should be Long.
  */
case class Worker(
  id: String,
  host: String,
  port: Integer,
  webuiaddress: String,
  cores: Integer,
  coresused: Integer,
  coresfree: Integer,
  memory: Integer,
  memoryused: Integer,
  memoryfree: Integer,
  state: String,
  lastheartbeat: Integer
)

/** Top-level shape of the master's JSON status; only the `workers` array is read. */
case class Body(workers: List[Worker])

/** Checks whether the first worker listed by the Spark master is alive.
  *
  * The master URL is read from `spark.master` (`spark://host:port`) and rewritten to
  * `http://host:8080/json/`. That page is fetched with scalaj-http and parsed with
  * json4s into [[Body]].
  *
  * @return the "Happy News" message if the first listed worker has state `ALIVE`;
  *         the "gave up the ghost" message if no worker is listed or the first one is
  *         not `ALIVE`; the "quantum state" message if fetching or parsing fails
  *         (the exception is printed)
  */
def isWorkerAlive(): String = {
  val lives = "Happy News! Your trusty spark worker is fighting the good fight"
  val dies = "Your spark worker gave up the ghost"
  val schrodingersCat = "Your spark worker is in a quantum state :: either alive or dead"
  try {
    /** Read the Spark Master IP and PORT, then swap the scheme and the port
      * (whatever its length) for the hard-coded status port 8080. */
    val masterURL = sc.getConf.get("spark.master").replaceFirst("spark://", "http://")
    val statusURL = masterURL.substring(0, masterURL.lastIndexOf(':') + 1) + "8080/json/"
    /** Fetch JSON */
    val json: HttpResponse[String] = Http(statusURL).asString
    implicit val formats = DefaultFormats
    /** Work our way down the tree using our case classes */
    val root = parse(json.body).extract[Body]
    // headOption: an empty worker list means "no live worker", not an error.
    if (root.workers.headOption.exists(_.state == "ALIVE")) lives else dies
  } catch {
    case NonFatal(exc) =>
      println(exc)
      schrodingersCat
  }
}
/** Refreshing Classpath: add JARs to the JVM system class loader at run time. */
// NOTE(review): this cast only succeeds on Java 8 and earlier. From Java 9 on, the system
// class loader is not a URLClassLoader and this line throws ClassCastException.
@transient val systemClassLoader = ClassLoader.getSystemClassLoader().asInstanceOf[java.net.URLClassLoader]
// URLClassLoader.addURL is protected, so it is reached via reflection and must be made accessible.
@transient val m = classOf[java.net.URLClassLoader].getDeclaredMethod("addURL", classOf[java.net.URL])
m.setAccessible(true)

/** Appends `myUrl` (e.g. a `file:` URL of a JAR) to the system class loader's search path.
  *
  * @param myUrl location of the JAR or directory to add
  * @return the value of the reflective call (addURL is void, so this is null)
  */
def load_jar(myUrl: java.net.URL): AnyRef = {
  m.invoke(systemClassLoader, myUrl)
}
