- Reading and writing TD tables through Spark DataFrames.
- Running Spark SQL queries against DataFrames.
- Submitting Presto SQL queries to TD and reading the query results as DataFrames (see the sketch after this list).
- If you use PySpark, you can use Spark DataFrames and Pandas DataFrames interchangeably.
- Using any hosted Spark service, such as Amazon EMR or Databricks Cloud.
- For the best performance, we recommend using the us-east region in AWS.
- It's also possible to use Google Colaboratory to run PySpark. See the demos for details.
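As a quick illustration of the first three items, here is a minimal Scala sketch. It assumes a Spark shell with the td-spark assembly loaded and uses the sample_datasets.www_access table that also appears in the examples below:

import com.treasuredata.spark._; val td = spark.td

// Read a TD table into a Spark DataFrame
val accessDf = td.table("sample_datasets.www_access").df

// Run a Spark SQL query against the DataFrame
accessDf.createOrReplaceTempView("www_access")
spark.sql("select method, count(*) cnt from www_access group by 1").show

// Submit a Presto query to TD and read the result as a DataFrame
td.presto("select count(*) cnt from sample_datasets.www_access").show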
td-spark 1.1.0 (For Spark 2.4.0 + Scala 2.11): td-spark-assembly_2.11-1.1.0.jar
- Reduced the td-spark assembly jar file size from 151MB to 22MB.
- Added new methods (a brief usage sketch follows this changelog entry):
- td.presto(sql)
- Improved the query performance by using the api-presto gateway.
- If you need to run a query that returns large results, use td.prestoJob(sql).df.
- td.executePresto(sql)
- Runs non-query Presto statements, e.g., CREATE TABLE, DROP TABLE, etc.
- td.prestoJob(sql)
- Runs a Presto query as a regular TD job.
- td.hiveJob(sql)
- Runs a Hive query as a regular TD job.
- New Database/Table methods:
- td.table(...).exists
- td.table(...).dropIfExists
- td.table(...).createIfNotExists
- td.database(...).exists
- td.database(...).dropIfExists
- td.database(...).createIfNotExists
- Added methods for creating new UDP tables:
- td.createLongPartitionedTable(table_name, long type column name)
- td.createStringPartitionedTable(table_name, string type column name)
- Added support for df.createOrReplaceTD(...) with UDP tables.
- Added the spark.td.site configuration for multiple regions:
- spark.td.site=us (default)
- For US customers. Using the us-east region provides the best performance.
- spark.td.site=jp
- For Tokyo region customers. Using the ap-northeast region provides the best performance.
- spark.td.site=eu01
- For EU region customers. Using the eu-central region provides the best performance.
- Enabled predicate pushdown for UDP table queries
- Queries with exact match conditions for UDP keys can be accelerated.
- Bug fixes:
- Fixed a bug when using createOrReplaceTempView in multiple notebooks on Databricks Cloud.
- Fixed a NoSuchMethod error when using the td.presto command.
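The following is a rough, unofficial sketch of how these 1.1.0 additions can be used together in a Spark shell session. The database, table, and column names (mydb, events, old_events, summary, events_by_user, user_id) are placeholders:

import com.treasuredata.spark._; val td = spark.td

// Database/table helpers
td.database("mydb").createIfNotExists
td.table("mydb.events").createIfNotExists
td.table("mydb.old_events").dropIfExists

// Small Presto query results come back directly as a DataFrame
val counts = td.presto("select count(*) cnt from sample_datasets.www_access")

// Queries with large results are better run as regular TD jobs
val rows = td.prestoJob("select * from sample_datasets.www_access").df
td.hiveJob("select count(1) from sample_datasets.www_access") // runs as a regular TD Hive job

// Non-query Presto statements (DDL, etc.) go through executePresto
td.executePresto("CREATE TABLE IF NOT EXISTS mydb.summary (cnt bigint)")

// Create a UDP table partitioned by a long column
td.createLongPartitionedTable("mydb.events_by_user", "user_id")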
td-spark 1.0.0
- For Spark 2.4.x + Scala 2.11 or Scala 2.12:
- Scala 2.11: td-spark-assembly_2.11-1.0.0.jar (recommended: Scala 2.11 is the default Scala version for Spark 2.4.x)
- Scala 2.12: td-spark-assembly_2.12-1.0.0.jar
td-spark 0.4.2
- For Spark 2.3.x + Scala 2.11
- td-spark-assembly_2.11-0.4.2.jar
If you already have a Docker daemon installed on your machine, using the td-spark Docker images is the most convenient way to try td-spark.
If you don't have Docker, install Docker on your machine.
- Running Spark shell using your TD API key
$ export TD_API_KEY=(your TD API key)
$ docker run -it -e TD_API_KEY=$TD_API_KEY armtd/td-spark-shell:latest
- Running PySpark shell
$ docker run -it -e TD_API_KEY=$TD_API_KEY armtd/td-spark-pyspark:latest
It is also possible to use Spark binaries to test td-spark:
- Download the Spark 2.4.0 (or later) package and extract it to a folder of your choice.
- Download td-spark-assembly_2.11-1.0.0.jar
- Prepare a td-spark.conf configuration file:
spark.td.apikey=(YOUR TD API KEY)
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.sql.execution.arrow.enabled=true
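# Optional (td-spark 1.1.0 or later): select the TD site for your account's region (us, jp, or eu01)
spark.td.site=us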
- Launch Spark Shell:
$ ./bin/spark-shell --jars (path to td-spark-assembly_2.11-1.0.0.jar) --properties-file (path to td-spark.conf)
To use td-spark, import the com.treasuredata.spark._ package:
import com.treasuredata.spark._; val td = spark.td
To select a time range of a table, use td.table("table name") with the within filtering method. After applying some filtering, you can create a DataFrame with the .df method:
val df =
td.table("(database).(table)")
.within("-1d")
.df
Here are examples of selecting time ranges:
val t = td.table("sample_datasets.www_access")
// Using duration strings as in TD_INTERVAL
t.within("-1d") // last day
t.within("-1m/2014-10-03 09:13:00") // last 1 minute from a given offset
t.within("-1d PST") // specifying a time zone. (e.g., JST, PST, UTC etc.)
// Specifying unix time ranges
t.withinUnixTimeRange(from = 1412320845, until = 1412321000) // [from, until) unix time range
t.fromUnixTime(1412320845) // [from, ...)
t.untilUnixTime(1412321000) // [... , until)
// Using time strings in yyyy-MM-dd HH:mm:ss format (default time zone: UTC)
t.fromTime("2014-10-03 09:12:00") // [from, ...)
t.untilTime("2014-10-03 09:13:00") // [..., until)
t.withinTimeRange(from = "2014-10-03 09:12:00", until = "2014-10-03 09:13:00") // [from, until)
t.withinTimeRange(from = "2014-10-03 09:12:00", until = "2014-10-03 09:13:00", ZoneId.of("Asia/Tokyo")) // [from, until) in Asia/Tokyo timezone
// Specifying timezone
t.fromTime("2014-10-03 09:12:00", ZoneId.of("Asia/Tokyo")) // [from, ...) in Asia/Tokyo timezone
t.untilTime("2014-10-03 09:13:00", ZoneId.of("Asia/Tokyo")) // [..., until) in Asia/Tokyo timezone
To run a Presto query and read the result as a DataFrame:
val df = td.presto("select * from www_access limit 10")
df.show
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user| host| path| referer|code| agent|size|method| time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null| 76.45.175.151| /item/sports/4642|/search/?c=Sports...| 200|Mozilla/5.0 (Maci...| 137| GET|1412380793|
|null|184.162.105.153| /category/finance| -| 200|Mozilla/4.0 (comp...| 68| GET|1412380784|
|null| 144.30.45.112|/item/electronics...| /item/software/4777| 200|Mozilla/5.0 (Maci...| 136| GET|1412380775|
|null| 68.42.225.106|/category/networking|/category/electro...| 200|Mozilla/4.0 (comp...| 98| GET|1412380766|
|null| 104.66.194.210| /category/books| -| 200|Mozilla/4.0 (comp...| 43| GET|1412380757|
|null| 64.99.74.69| /item/finance/3775|/category/electro...| 200|Mozilla/5.0 (Wind...| 86| GET|1412380748|
|null| 136.135.51.168|/item/networking/540| -| 200|Mozilla/5.0 (Wind...| 89| GET|1412380739|
|null| 52.99.134.55| /item/health/1326|/category/electro...| 200|Mozilla/5.0 (Maci...| 51| GET|1412380730|
|null| 136.51.116.68|/category/finance...| -| 200|Mozilla/5.0 (comp...| 99| GET|1412380721|
|null|136.141.218.177| /item/computers/959| -| 200|Mozilla/5.0 (Wind...| 124| GET|1412380712|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
td.table("sample_datasets.www_access").df.createOrReplaceTempView("www_access")
val q = spark.sql("select agent, count(*) cnt from www_access group by 1")
q.show
+--------------------+---+
| agent|cnt|
+--------------------+---+
|Mozilla/4.0 (comp...|159|
|Mozilla/5.0 (comp...|341|
|Mozilla/4.0 (comp...|140|
|Mozilla/5.0 (comp...|497|
|Mozilla/5.0 (Wind...|630|
|Mozilla/5.0 (iPad...|158|
|Mozilla/5.0 (Maci...|445|
|Mozilla/5.0 (comp...|261|
|Mozilla/5.0 (Wind...|304|
|Mozilla/4.0 (comp...|187|
|Mozilla/4.0 (comp...|150|
|Mozilla/5.0 (Wind...|291|
|Mozilla/5.0 (Wind...|297|
|Mozilla/4.0 (comp...|132|
|Mozilla/5.0 (Wind...|147|
|Mozilla/4.0 (comp...|123|
|Mozilla/5.0 (iPho...|139|
|Mozilla/5.0 (Wind...|439|
|Mozilla/4.0 (comp...|160|
+--------------------+---+
To upload a DataFrame to TD, use df.createOrReplaceTD or df.insertIntoTD:
val df: DataFrame = ... // Prepare some DataFrame
// Writes DataFrame to a new TD table. If the table already exists, this will fail.
df.write.td("(database name).(table name)")
// Create or replace the target TD table with the contents of DataFrame
df.createOrReplaceTD("(database name).(table name)")
// Append the contents of DataFrame to the target TD table.
// If the table doesn't exist, a new table will be created.
df.insertIntoTD("(database name).(table name)")
You can also use the full DataFrame.write and save syntax. Specify the format as com.treasuredata.spark, then select the target table with .option("table", "(target database).(table name)"):
df.write // df is an arbitrary DataFrame
.mode("overwrite") // append, overwrite, error, ignore
.format("com.treasuredata.spark")
.option("table", "(database name).(table name)") // Specify an upload taget table
.save
| mode | behavior |
|---|---|
| append | Append to the target table. Throws an error if the table doesn't exist. |
| overwrite | Performs a two-step update: first deletes the target table if it exists, then creates a new table with the new data. |
| error | (default) If the target table already exists, throws an exception. |
| ignore | If the target table exists, ignores the save operation. |
Creating a new table with CREATE TABLE AS (SELECT ...):
spark.sql("""
CREATE TABLE my_table
USING com.treasuredata.spark
OPTIONS(table '(database name).(table name)')
AS SELECT ...
""")
overwrite mode for UDP tables is not supported yet. You need to create a UDP table using Presto first, then use append mode to insert data into the table.
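A minimal sketch of that two-step flow, assuming td-spark 1.1.0 or later and an existing DataFrame df that contains a long user_id column; the names mydb.events_by_user and user_id are placeholders:

import com.treasuredata.spark._; val td = spark.td

// Step 1: create the UDP table up front, here with the helper added in td-spark 1.1.0
// (alternatively, create it via a Presto CREATE TABLE statement).
td.createLongPartitionedTable("mydb.events_by_user", "user_id")

// Step 2: append data into the existing UDP table; overwrite mode is not supported for UDP tables.
df.write
  .mode("append")
  .format("com.treasuredata.spark")
  .option("table", "mydb.events_by_user")
  .save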