@deusaquilus
Created December 10, 2020 20:28
Grouping into an Array with Quill Spark
case class Address(street: String, zip: Int, personName: String)
case class Person(name: String, addresses: List[String])
/*
In some data models used with Spark, you may want to embed an array in an object
and populate that array field as part of a group-by. In plain Spark this is normally
done as follows:
*/
val addresses: Dataset[Address] = ???
val people = addresses.groupByKey(a => a.personName).mapGroups { case (name, addresses) =>
  Person(name, addresses.map(_.street).toList)
}
/*
If you are using Quill queries, however, this has to be done a little differently, because every
Quill-Spark query is converted to SQL before it runs. The question then becomes which SQL aggregate
function provides the equivalent functionality. Fortunately, Spark has an aggregate function called
`collect_list` that does this.
*/
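// Explicit type annotations are omitted: these values are Quill quotations
// (Quoted[Query[...]]), not Datasets, and letting the type be inferred keeps
// the query statically compiled.
val addressesQuery = liftQuery(addresses)
val peopleQuery = quote {
  addressesQuery
    .groupBy(a => a.personName)
    .map { case (name, addresses) =>
      Person(name, infix"collect_list(${addresses.map(_.street)})".pure.as[List[String]])
    }
}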
/*
This creates a query that looks like the following:
  SELECT
    a.personName,
    collect_list(a.street)
  FROM Addresses a GROUP BY a.personName
*/
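/*
To actually execute the quoted query it has to be passed to `run`, with an implicit
SQLContext in scope. The following is a minimal end-to-end sketch, not a definitive setup:
the object name, the `collectStreets` helper, the local SparkSession settings and the sample
rows are all assumptions made for illustration; the query body is taken as-is from the
example above.
*/

```scala
import org.apache.spark.sql.{Dataset, SQLContext, SparkSession}
import io.getquill.QuillSparkContext._

// Hypothetical wrapper object; the case classes mirror the ones defined above.
object QuillCollectListSketch {
  case class Address(street: String, zip: Int, personName: String)
  case class Person(name: String, addresses: List[String])

  // Hypothetical helper: groups addresses by person and collects the streets.
  def collectStreets(addresses: Dataset[Address])(implicit sqlContext: SQLContext): Dataset[Person] = {
    import sqlContext.implicits._ // provides the Encoder[Person] needed by `run`

    val peopleQuery = quote {
      liftQuery(addresses)
        .groupBy(a => a.personName)
        .map { case (name, group) =>
          Person(name, infix"collect_list(${group.map(_.street)})".pure.as[List[String]])
        }
    }
    // `run` translates the quotation to Spark SQL and returns a Dataset.
    run(peopleQuery)
  }

  def main(args: Array[String]): Unit = {
    // Assumed local session, for experimentation only.
    val spark = SparkSession.builder().master("local[*]").appName("quill-collect-list").getOrCreate()
    implicit val sqlContext: SQLContext = spark.sqlContext
    import spark.implicits._

    // Made-up sample data.
    val addresses: Dataset[Address] = Seq(
      Address("1 Main St", 11111, "Joe"),
      Address("2 Side St", 22222, "Joe"),
      Address("3 High St", 33333, "Jill")
    ).toDS()

    collectStreets(addresses).show(truncate = false)
    spark.stop()
  }
}
```

/*
Note that `collect_list` makes no guarantee about the order of the collected elements,
so sort the resulting list if a stable order matters.
*/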
/*
Alternatively, the list that is collected can be a list of objects.
*/
case class Address(street: String, zip: Int, personName: String)
case class Person(name: String, addresses: List[Address])
// As before, the types are inferred: both values are Quill quotations, not Datasets.
val addressesQuery = liftQuery(addresses)
val peopleQuery = quote {
  addressesQuery
    .groupBy(a => a.personName)
    .map { case (name, addresses) =>
      Person(name, infix"collect_list(${addresses})".pure.as[List[Address]])
    }
}
/*
This creates a query that looks like the following:
  SELECT
    a.personName,
    collect_list(a.*)
  FROM Addresses a GROUP BY a.personName
*/
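/*
As a cross-check for the object-collecting variant, the same grouping can be written directly
against Spark's DataFrame API, where the row is wrapped in `struct(...)` before being passed to
`collect_list`. This is only a sketch under assumptions: the object and method names are
hypothetical, and `Person.addresses` is declared as Seq[Address] here rather than List[Address].
*/

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, struct}

// Hypothetical wrapper object for illustration.
object CollectStructSketch {
  case class Address(street: String, zip: Int, personName: String)
  // Assumption: Seq instead of List for the collected field.
  case class Person(name: String, addresses: Seq[Address])

  def groupAddresses(spark: SparkSession, addresses: Dataset[Address]): Dataset[Person] = {
    import spark.implicits._ // provides the Encoder[Person] used by `.as[Person]`

    addresses
      .groupBy(col("personName"))
      // Pack each row's columns into a struct, then collect the structs per group.
      .agg(collect_list(struct(col("street"), col("zip"), col("personName"))).as("addresses"))
      // Rename the grouping column so it lines up with Person.name.
      .withColumnRenamed("personName", "name")
      .as[Person]
  }
}
```

/*
The struct field names match the fields of Address, which is what lets Spark decode each
collected element back into an Address.
*/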