Skip to content

Instantly share code, notes, and snippets.

@zero323
Created August 11, 2015 13:22
Show Gist options
  • Save zero323/943b8fb05482fe09ac67 to your computer and use it in GitHub Desktop.
# Compare PySpark's DataFrame window-function API with the equivalent
# Spark SQL OVER clause: both compute a sliding average of v over the
# window (1 PRECEDING, CURRENT ROW, 1 FOLLOWING), with and without
# partitioning by k. The four .show() calls should print matching
# results pairwise (API vs SQL).
#
# NOTE(review): assumes `sqlContext` (a SQLContext/HiveContext) is
# already in scope, e.g. inside a pyspark shell — confirm before running.
from pyspark.sql.window import Window
from pyspark.sql import functions as f

# Ten rows: k is "foo" for v=1..5 and "bar" for v=6..10, plus a constant
# dummy column. list(...) around range/zip is required on Python 3, where
# both are lazy objects (range does not support `+`, and a consumed zip
# iterator would otherwise yield no rows); on Python 2 it is a no-op.
df = sqlContext.createDataFrame(
    list(zip(["foo"] * 5 + ["bar"] * 5,
             list(range(1, 6)) + list(range(6, 11)))),
    ("k", "v")
).withColumn("dummy", f.lit(1))
df.registerTempTable("df")

# Row-based frame: previous row, current row, next row.
w_part_ord_rng = Window.partitionBy("k").orderBy("v").rowsBetween(-1, 1)
w_ord_rng = Window.orderBy("v").rowsBetween(-1, 1)  # same frame, no partition

# SQL equivalents of the two window specifications above.
sql_part_ord_rng = """SELECT k, v, avg(v) OVER (
PARTITION BY k
ORDER BY v
ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
) AS avg FROM df"""
sql_ord_rng = """SELECT k, v, avg(v) OVER (
ORDER BY v
ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
) AS avg FROM df"""

# Each DataFrame-API query should produce the same table as its SQL twin.
df.select("k", "v", f.avg("v").over(w_part_ord_rng).alias("avg")).show()
sqlContext.sql(sql_part_ord_rng).show()
df.select("k", "v", f.avg("v").over(w_ord_rng).alias("avg")).show()
sqlContext.sql(sql_ord_rng).show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment