Skip to content

Instantly share code, notes, and snippets.

@mineo
Created March 31, 2018 18:25
Show Gist options
  • Save mineo/08dbc653e9a4b82e29b7402a9aac0b5a to your computer and use it in GitHub Desktop.
Save mineo/08dbc653e9a4b82e29b7402a9aac0b5a to your computer and use it in GitHub Desktop.
# coding: utf-8
# Load a single `.listens` dump file (one JSON document per line) into a
# Spark DataFrame with an explicit schema, printing row counts along the way.
#
# NOTE: `spark` is not defined in this script; it is expected to be supplied
# by the environment that runs it (e.g. an interactive PySpark session).
import json

from pyspark.sql import Row
from pyspark.sql.types import (
    ArrayType,
    IntegerType,
    MapType,
    StringType,
    StructField,
    StructType,
)

path = "/home/wieland/Downloads/Unpack-6438/listenbrainz-listens-dump-20180315-000401/listens/0/04/04d5f670-7c04-4dcf-801c-89ff231f3a96.listens"

sc = spark.sparkContext
lines = sc.textFile(path)

# Each input line is parsed as JSON and its top-level keys become Row fields.
listens = lines.map(lambda listen: Row(**json.loads(listen)))
print(listens.count())

# Explicit schema for the DataFrame. Fields without an explicit third
# argument use StructField's default nullability.
fields = [
    # NOTE(review): IntegerType is 32-bit; if `listened_at` holds Unix
    # timestamps, LongType may be the safer choice -- confirm before changing.
    StructField("listened_at", IntegerType(), False),
    StructField("recording_msid", StringType(), False),
    StructField("track_metadata", StructType(fields=[
        StructField("release_group_mbid", StringType()),
        StructField("release_msid", StringType()),
        StructField("release_mbid", StringType()),
        StructField("recording_mbid", StringType()),
        StructField("track_mbid", StringType()),
        StructField("work_mbids", ArrayType(StringType())),
        StructField("tracknumber", IntegerType()),
        StructField("spotify_id", StringType()),
        StructField("tags", ArrayType(StringType())),
        StructField("additional_info", MapType(StringType(), StringType())),
    ])),
]
schema = StructType(fields)

foo = spark.createDataFrame(listens, schema)
print(foo.count())
# Print the first row explicitly: a bare `foo.head()` expression only shows
# its value in an interactive shell and is silently discarded in a script.
print(foo.head())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment