# Installing the very useful IPython extension autotime (src: https://github.com/cpcloud/ipython-autotime)
%install_ext https://raw.github.com/cpcloud/ipython-autotime/master/autotime.py
%load_ext autotime
# Imports needed by the helper below (Python 2: StringIO)
import json
import requests
import StringIO

def get_file_content(credentials):
    '''For the given credentials, return a StringIO object containing the file content.'''
    url1 = ''.join([credentials['auth_url'], '/v3/auth/tokens'])
    data = {'auth': {'identity': {'methods': ['password'],
                                  'password': {'user': {'name': credentials['username'],
                                                        'domain': {'id': credentials['domainId']},
                                                        'password': credentials['password']}}}}}
    headers1 = {'Content-Type': 'application/json'}
    resp1 = requests.post(url=url1, data=json.dumps(data), headers=headers1)
    resp1_body = resp1.json()
    # Walk the Keystone service catalog for the public object-store endpoint in our region
    for e1 in resp1_body['token']['catalog']:
        if e1['type'] == 'object-store':
            for e2 in e1['endpoints']:
                if e2['interface'] == 'public' and e2['region'] == credentials['region']:
                    url2 = ''.join([e2['url'], '/', credentials['container'], '/', credentials['filename']])
                    s_subject_token = resp1.headers['x-subject-token']
                    headers2 = {'X-Auth-Token': s_subject_token, 'accept': 'application/json'}
                    resp2 = requests.get(url=url2, headers=headers2)
                    return StringIO.StringIO(resp2.content)
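# Minimal usage sketch (hedged example; the dict keys mirror the ones read inside
# get_file_content(), but every value below is a placeholder for your own
# Object Storage service credentials).
credentials_example = {
    'auth_url': 'https://identity.example.com',
    'username': 'your_username',
    'password': 'your_password',
    'domainId': 'your_domain_id',
    'region': 'your_region',
    'container': 'your_container',
    'filename': 'your_file.csv'
}
file_like = get_file_content(credentials_example)
# Because a file-like StringIO object is returned, it can be handed to anything
# that expects a file, e.g. pandas (assuming the object really is a CSV):
import pandas as pd
df_local = pd.read_csv(file_like)
print(df_local.head())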
def set_hadoop_config(credentials):
    '''Register Swift object-storage credentials in the SparkContext's Hadoop configuration.'''
    prefix = "fs.swift.service." + credentials['name']
    hconf = sc._jsc.hadoopConfiguration()
    hconf.set(prefix + ".auth.url", credentials['auth_url'] + '/v3/auth/tokens')
    hconf.set(prefix + ".auth.endpoint.prefix", "endpoints")
    hconf.set(prefix + ".tenant", credentials['projectId'])
    hconf.set(prefix + ".username", credentials['userId'])
    hconf.set(prefix + ".password", credentials['password'])
    hconf.setInt(prefix + ".http.port", 8080)
    hconf.set(prefix + ".region", credentials['region'])
    hconf.setBoolean(prefix + ".public", True)
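# Usage sketch (hedged example with placeholder values): once set_hadoop_config() has
# registered the credentials under a datasource name, objects in the container can be
# read through a swift://<container>.<name>/<object> URL.
credentials_hadoop = {
    'name': 'keystone',
    'auth_url': 'https://identity.example.com',
    'projectId': 'your_project_id',
    'userId': 'your_user_id',
    'password': 'your_password',
    'region': 'your_region'
}
set_hadoop_config(credentials_hadoop)
rdd = sc.textFile("swift://your_container.keystone/your_file.csv")
print(rdd.count())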
def cleanup(line):
    '''Split a record at "LineString[[" so the geometry payload becomes its own element.'''
    modline = line.split("LineString[[")
    if len(modline) == 2:
        modline[0] = modline[0] + "LineString"
        modline[1] = "[[%s" % modline[1]
    return modline

# This gives us a collection (Spark calls them RDDs) of lines
data_raw = sc.textFile(filename).flatMap(lambda line: cleanup(line)).map(lambda line: line)
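# Illustration only, with a hypothetical record (not from the gist's data): cleanup() turns
# one raw line into two elements, the attribute prefix and the coordinate list, which
# flatMap then flattens into the RDD.
sample = 'RouteId=42 LineString[[-122.40, 37.77], [-122.41, 37.78]]'
print(cleanup(sample))
# ['RouteId=42 LineString', '[[-122.40, 37.77], [-122.41, 37.78]]']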
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, ArrayType, DoubleType, StringType

sqlContext = SQLContext(sc)

# ArrayType(LongType())
schema = StructType([
    StructField("ArrayOfCoord", ArrayType(ArrayType(DoubleType())), False),
    StructField("RouteDistance", DoubleType(), False),
    StructField("RouteType", StringType(), False),
    StructField("RouteId", StringType(), False),
    StructField("RouteDataType", StringType(), False)
])

df = sqlContext.createDataFrame(alldata, schema)
df.printSchema()
df.describe().show()
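# Follow-on query sketch (hedged example): uses the functions module imported above as F
# and the column names defined in the schema.
df.groupBy("RouteType") \
  .agg(F.avg("RouteDistance").alias("avg_distance"), F.count("RouteId").alias("routes")) \
  .show()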
==================================================================================
Scala
==================================================================================
%AddDeps com.ibm.stocator stocator 1.0.1 --transitive
%AddDeps org.scalaj scalaj-http_2.10 1.1.4 --transitive
%AddDeps org.json4s json4s-native_2.10 3.2.9 --transitive
---------------------
/** The purpose of this cell is to define
  * a helper method that is able to access
  * OpenStack Keystone Swift Distributed Object Storage
  *
  * Method Name: setRemoteObjectStorageConfig
  * Parameters: name (datasource), sc (SparkContext), dsConfiguration (credentials)
  * Returns: Boolean
  * Criteria: True if no errors encountered
  */
import org.apache.spark.SparkContext
import scala.util.control.NonFatal

def setRemoteObjectStorageConfig(name: String, sc: SparkContext, dsConfiguration: String): Boolean = {
  try {
    val result = scala.util.parsing.json.JSON.parseFull(dsConfiguration);
    result match {
      case Some(e: Map[String, String]) => {
        val prefix = "fs.swift2d.service." + name;
        val hconf = sc.hadoopConfiguration;
        hconf.set("fs.swift2d.impl", "com.ibm.stocator.fs.ObjectStoreFileSystem");
        hconf.set(prefix + ".auth.url", e("auth_url") + "/v3/auth/tokens");
        hconf.set(prefix + ".tenant", e("projectId"));
        hconf.set(prefix + ".username", e("userId"));
        hconf.set(prefix + ".password", e("password"));
        hconf.set(prefix + ".auth.method", "keystoneV3");
        hconf.set(prefix + ".region", e("region"));
        hconf.setBoolean(prefix + ".public", true);
        println("Successfully modified sparkcontext object with remote Object Storage Credentials using datasource name " + name);
        println("");
        return true;
      };
      case None => println("Failed."); return false;
    };
  } catch {
    case NonFatal(exc) => println(exc); return false;
  }
}
/** Example usage */
import org.apache.spark.sql.SQLContext

val sqlctx = new SQLContext(sc);
import sqlctx.implicits._

val scplain = sqlctx.sparkContext;
val setObjStor = setRemoteObjectStorageConfig("sparksql", scplain, credentials);
Class.forName("com.ibm.stocator.fs.ObjectStoreFileSystem")
val prf_rdd = sc.textFile("swift2d://container.sparksql/file.extension");
val prf_df = prf_rdd.toDF();
prf_rdd.take(1);
------------------
/** The purpose of this cell is to define
  * a helper method that is able to check
  * whether Spark workers exist
  *
  * Method Name: isWorkerAlive
  * Parameters: None
  * Returns: String (status message)
  * Criteria: Worker Id listed in JSON array with a state of ALIVE
  */
import scalaj.http.{Http, HttpResponse}
import org.json4s.DefaultFormats
import org.json4s.native.JsonMethods.parse

case class Worker(
  id: String,
  host: String,
  port: Integer,
  webuiaddress: String,
  cores: Integer,
  coresused: Integer,
  coresfree: Integer,
  memory: Integer,
  memoryused: Integer,
  memoryfree: Integer,
  state: String,
  lastheartbeat: Integer
)

case class Body(workers: List[Worker])

def isWorkerAlive(): String = {
  val lives = "Happy News! Your trusty spark worker is fighting the good fight";
  val dies = "Your spark worker gave up the ghost";
  val schrodingersCat = "Your spark worker is in a quantum state :: either alive or dead";
  try {
    /** Read the Spark Master IP and PORT */
    var statusURL = sc.getConf.get("spark.master").replaceFirst("spark://", "http://");
    /** Redefine url to return JSON status */
    statusURL = statusURL.substring(0, statusURL.length - 4) + "8080/json/";
    /** Fetch JSON */
    val json: HttpResponse[String] = Http(statusURL).asString
    implicit val formats = DefaultFormats
    /** Work our way down the tree using our case classes */
    val root = parse(json.body).extract[Body];
    val workerList: List[Worker] = root.workers;
    val firstWorker: Worker = workerList(0);
    val firstWorkerState: String = firstWorker.state;
    val workerPresent: String = (if ((workerList.size > 0) && (firstWorkerState == "ALIVE")) lives else dies);
    return workerPresent;
  } catch {
    case NonFatal(exc) => println(exc);
    return schrodingersCat;
  }
}
println("");
---------------------------
/** Refreshing Classpath */
@transient val systemClassLoader = ClassLoader.getSystemClassLoader().asInstanceOf[java.net.URLClassLoader]
@transient val m = classOf[java.net.URLClassLoader].getDeclaredMethod("addURL", classOf[java.net.URL])
m.setAccessible(true)

def load_jar(myUrl: java.net.URL) = {
  m.invoke(systemClassLoader, myUrl)
}

kernel.interpreter.classLoader.asInstanceOf[java.net.URLClassLoader].getURLs.foreach(load_jar)
----------------------------