@MLnick
Created September 18, 2014 07:24
Spark 1.1: Bringing Hadoop Input/Output Formats to PySpark
// Recursively converts an Avro datum into plain Java types that can be
// pickled for the Python side, dispatching on the schema type.
import org.apache.avro.Schema
import org.apache.avro.Schema.Type._
import org.apache.spark.SparkException

def fromAvro(obj: Any, schema: Schema): Any = {
  if (obj == null) {
    return null
  }
  schema.getType match {
    case UNION => unpackUnion(obj, schema)
    case ARRAY => unpackArray(obj, schema)
    case FIXED => unpackFixed(obj, schema)
    case MAP => unpackMap(obj, schema)
    case BYTES => unpackBytes(obj)
    case RECORD => unpackRecord(obj)
    case STRING => obj.toString
    case ENUM => obj.toString
    case NULL => obj
    case BOOLEAN => obj
    case DOUBLE => obj
    case FLOAT => obj
    case INT => obj
    case LONG => obj
    case other => throw new SparkException(
      s"Unknown Avro schema type ${other.getName}")
  }
}
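Converters like this are what make Avro records show up in Python as plain dicts and lists. From PySpark, a converter is referenced by its fully qualified class name. The sketch below is modelled on the avro_inputformat.py example that ships with Spark 1.1, and assumes the Spark examples jar (which contains AvroWrapperToJavaConverter) is on the classpath; the input path is illustrative:

# Sketch, modelled on Spark 1.1's examples/src/main/python/avro_inputformat.py.
# Assumes the Spark examples jar is on the driver/executor classpath.
avro_rdd = sc.newAPIHadoopFile(
    '/path/to/data.avro',
    'org.apache.avro.mapreduce.AvroKeyInputFormat',
    'org.apache.avro.mapred.AvroWrapper',
    'org.apache.hadoop.io.NullWritable',
    keyConverter='org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter')

# Each key is now a Python dict mirroring the Avro record; values are None.
records = avro_rdd.map(lambda kv: kv[0]).collect()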
# Save a pair RDD; PySpark picks matching Writable types (Text, DoubleWritable)
# for the Python keys and values automatically.
rdd = sc.parallelize([('key1', 1.0), ('key2', 2.0), ('key3', 3.0)])
rdd.saveAsSequenceFile('/tmp/pysequencefile/')
...
# Reading it back converts the Writables to Python types again.
sc.sequenceFile('/tmp/pysequencefile/').collect()
[(u'key1', 1.0), (u'key2', 2.0), (u'key3', 3.0)]
// The same file read back in Scala, with the key/value types given explicitly.
val rdd = sc.sequenceFile[String, Double](path)
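The sequence-file shortcut above is a special case of the general Hadoop output API: arbitrary OutputFormats and Writable classes can also be named explicitly. A minimal sketch (the output path is illustrative):

# Sketch: the same (str, float) pairs written through the new Hadoop API,
# with the Writable classes spelled out rather than inferred.
rdd = sc.parallelize([('key1', 1.0), ('key2', 2.0), ('key3', 3.0)])
rdd.saveAsNewAPIHadoopFile(
    '/tmp/pynewhadoopfile/',
    'org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat',
    keyClass='org.apache.hadoop.io.Text',
    valueClass='org.apache.hadoop.io.DoubleWritable')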
// Converts Avro ARRAY data (a Java collection or a raw array) into a Java
// collection, unpacking each element against the array's element schema.
import java.util.{Collection => JCollection}
import scala.collection.JavaConversions._

def unpackArray(obj: Any, schema: Schema): java.util.Collection[Any] = obj match {
  case c: JCollection[_] =>
    c.map(fromAvro(_, schema.getElementType))
  case arr: Array[_] if arr.getClass.getComponentType.isPrimitive =>
    arr.toSeq
  case arr: Array[_] =>
    arr.map(fromAvro(_, schema.getElementType)).toSeq
  case other => throw new SparkException(
    s"Unknown ARRAY type ${other.getClass.getName}")
}
// Converts Avro BYTES data to a defensive copy of the underlying byte array,
// so the result is unaffected if the original buffer is reused.
def unpackBytes(obj: Any): Array[Byte] = {
  val bytes: Array[Byte] = obj match {
    case buf: java.nio.ByteBuffer => buf.array()
    case arr: Array[Byte] => arr
    case other => throw new SparkException(
      s"Unknown BYTES type ${other.getClass.getName}")
  }
  val bytearray = new Array[Byte](bytes.length)
  System.arraycopy(bytes, 0, bytearray, 0, bytes.length)
  bytearray
}
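The copy matters because Avro may reuse the underlying buffer between records; Python then receives an independent bytearray (binary data generally surfaces as bytearray in PySpark). Continuing the avro_rdd sketch above, with a hypothetical BYTES field named 'data':

# 'data' is a hypothetical BYTES field in the Avro schema; the defensive copy
# in unpackBytes means this bytearray is safe to hold onto after collect().
first = avro_rdd.map(lambda kv: kv[0]).first()
payload = first['data']  # a Python bytearray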