Python & PySpark Routines
pyspark_udf.py
==============
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

# Classify an age as "adult" or "child" (assumes an existing sqlContext)
maturity_udf = udf(lambda age: "adult" if age >= 18 else "child", StringType())
df = sqlContext.createDataFrame([{'name': 'Alice', 'age': 1}])
df.withColumn("maturity", maturity_udf(df.age)).show()
Another UDF Example:
--------------------
# Generate random data
import itertools
import random

students = ['John', 'Mike', 'Matt']
subjects = ['Math', 'Sci', 'Geography', 'History']
random.seed(1)

data = []
for (student, subject) in itertools.product(students, subjects):
    data.append((student, subject, random.randint(0, 100)))
# Create schema object
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("student", StringType(), nullable=False),
    StructField("subject", StringType(), nullable=False),
    StructField("score", IntegerType(), nullable=False)
])
# Create DataFrame (assumes an existing SparkContext sc)
from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)
rdd = sc.parallelize(data)
df = sqlContext.createDataFrame(rdd, schema)
# Define UDF
from pyspark.sql.functions import udf

def scoreToCategory(score):
    if score >= 80: return 'A'
    elif score >= 60: return 'B'
    elif score >= 35: return 'C'
    else: return 'D'

udfScoreToCategory = udf(scoreToCategory, StringType())
df.withColumn("category", udfScoreToCategory("score")).show(10)
PyArrow, fastparquet and Pandas Usage:
======================================
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Round-trip a pandas DataFrame through Parquet via Arrow
df = pd.DataFrame({'one': [-1, np.nan, 2.5],
                   'two': ['foo', 'bar', 'baz'],
                   'three': [True, False, True]})
table = pa.Table.from_pandas(df)
pq.write_table(table, 'example.parquet')

table2 = pq.read_table('example.parquet')
table2.to_pandas()

# Read only a subset of columns
pq.read_table('example.parquet', columns=['one', 'three'])
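The heading above also mentions fastparquet, but the snippets here only exercise PyArrow. A minimal fastparquet sketch for the same round trip (assumes the fastparquet package is installed; the file name is illustrative):

# Minimal fastparquet sketch (assumption: fastparquet is installed)
import fastparquet

fastparquet.write('example_fp.parquet', df)          # write the same pandas DataFrame
pf = fastparquet.ParquetFile('example_fp.parquet')   # open for reading
pf.to_pandas()                                       # back to a pandas DataFrame
pf.to_pandas(columns=['one', 'three'])               # column subset, as with PyArrow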
Fine-grained reading and writing:
---------------------------------
(from https://arrow.apache.org/docs/python/parquet.html)
parquet_file = pq.ParquetFile('example.parquet')

parquet_file.metadata
# <pyarrow._parquet.FileMetaData object at 0x7f0c82583a48>
#   created_by: parquet-cpp version 1.3.1-SNAPSHOT
#   num_columns: 4
#   num_rows: 3
#   num_row_groups: 1
#   format_version: 1.0
#   serialized_size: 850

parquet_file.schema
# <pyarrow._parquet.ParquetSchema object at 0x7f0c82ea25c8>
#   one: DOUBLE
#   three: BOOLEAN
#   two: BYTE_ARRAY UTF8
#   __index_level_0__: INT64

parquet_file.num_row_groups
parquet_file.read_row_group(0)
# Write several row groups to a single file with ParquetWriter
writer = pq.ParquetWriter('example2.parquet', table.schema)
for i in range(3):
    writer.write_table(table)
writer.close()

pf2 = pq.ParquetFile('example2.parquet')
pf2.num_row_groups
# Compression and encoding options ('where' stands for a target file path)
pq.write_table(table, where, use_dictionary=False)
pq.write_table(table, where, compression='snappy')
pq.write_table(table, where, compression='gzip')
pq.write_table(table, where, compression='brotli')
pq.write_table(table, where, compression='none')
These settings can also be set on a per-column basis:
pq.write_table(table, where, compression={'foo': 'snappy', 'bar': 'gzip'},
               use_dictionary=['foo', 'bar'])
Reading multiple files and partitioned data sets:
-------------------------------------------------
e.g.:
dataset_name/
  year=2007/
    month=01/
      0.parq
      1.parq
      ...
    month=02/
      0.parq
      1.parq
      ...
    month=03/
      ...
  year=2008/
    month=01/
      ...
  ...
The ParquetDataset class accepts either a directory name or a list of file paths, and can discover and infer some common partition structures, such as those produced by Hive:
dataset = pq.ParquetDataset('dataset_name/')
table = dataset.read()
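A partitioned layout like the one above can also be produced from Python with pyarrow.parquet.write_to_dataset. A sketch (it assumes the table actually contains 'year' and 'month' columns to partition on; the example table built earlier does not):

# Sketch: write a Hive-style partitioned dataset
# (assumption: the table has 'year' and 'month' columns)
pq.write_to_dataset(table, root_path='dataset_name',
                    partition_cols=['year', 'month'])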
Multithreaded reads:
--------------------
Each of the reading functions has an nthreads argument which will read columns with the indicated level of parallelism. Depending on the speed of IO and how expensive it is to decode the columns in a particular file (particularly with GZIP compression), this can yield significantly higher data throughput:
pq.read_table(where, nthreads=4)
pq.ParquetDataset(where).read(nthreads=4)
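Note that in newer PyArrow releases the nthreads argument was replaced by a boolean use_threads flag; a rough equivalent under that assumption (not part of the original gist):

# Rough modern equivalent (assumption: PyArrow versions where nthreads
# was replaced by use_threads)
pq.read_table(where, use_threads=True)
pq.ParquetDataset(where).read(use_threads=True)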