@hakanilter
Created March 6, 2019 08:52
Convert Wikipedia Category SQL File to Parquet Files

This PySpark script extracts the row data from the enwiki categorylinks SQL dump and writes it out as typed Parquet files.
from pyspark.sql import SparkSession

# init spark
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("anaconda") \
    .config("spark.sql.warehouse.dir", "file:///tmp/spark-warehouse") \
    .enableHiveSupport() \
    .getOrCreate()
# Strip the INSERT wrapper and split the VALUES list into one string per row.
# Note: this naive split may break if a quoted value itself contains '),('.
def parse(sql):
    return sql.replace('INSERT INTO `categorylinks` VALUES (', '') \
        .replace(');', '') \
        .split('),(')
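To make the transformation concrete, here is what parse returns for a short, synthetic INSERT statement (the values below are made up for illustration; real dump lines are far longer):

# Illustration only: a synthetic INSERT statement, not real dump data
sample = ("INSERT INTO `categorylinks` VALUES "
          "(1,'Cat_A','sortkey','2019-01-01 00:00:00','','uca-default','page'),"
          "(2,'Cat_B','sortkey','2019-01-01 00:00:00','','uca-default','page');")
parse(sample)
# ["1,'Cat_A','sortkey','2019-01-01 00:00:00','','uca-default','page'",
#  "2,'Cat_B','sortkey','2019-01-01 00:00:00','','uca-default','page'"]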
# Parse the SQL dump and save the rows as CSV-like text files
rdd = spark.sparkContext \
    .textFile('/opt/notebooks/enwiki/enwiki-latest-categorylinks.sql') \
    .filter(lambda x: x.startswith('INSERT INTO')) \
    .flatMap(lambda sql: parse(sql))
rdd.saveAsTextFile('wikipedia-category')
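Before moving on, it can be worth eyeballing a few parsed rows in the same session (an optional sanity check):

# Optional sanity check: inspect a few parsed rows before the Parquet step
for row in rdd.take(3):
    print(row)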
# Read the CSV-like text back with an explicit schema and save as Parquet
from pyspark.sql.types import *

schema = StructType([
    StructField("cl_from", IntegerType()),
    StructField("cl_to", StringType()),
    StructField("cl_sortkey", StringType()),
    StructField("cl_timestamp", TimestampType()),
    StructField("cl_sortkey_prefix", StringType()),
    StructField("cl_collation", StringType()),
    StructField("cl_type", StringType())
])
spark.read \
    .schema(schema) \
    .option("quote", "'") \
    .option("sep", ",") \
    .option("header", "false") \
    .csv('wikipedia-category') \
    .write \
    .mode("overwrite") \
    .parquet("categorylinks")