Skip to content

Instantly share code, notes, and snippets.

@cgrotz
Created December 10, 2021 20:27
Show Gist options
  • Save cgrotz/12074069dbc91f6c6ac74de7d0d6ec20 to your computer and use it in GitHub Desktop.
Save cgrotz/12074069dbc91f6c6ac74de7d0d6ec20 to your computer and use it in GitHub Desktop.
Covid Analysis
import com.google.api.services.bigquery.model.TableFieldSchema
import com.google.api.services.bigquery.model.TableRow
import com.google.api.services.bigquery.model.TableSchema
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.TextIO
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
import org.apache.beam.sdk.options.PipelineOptions
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.options.ValueProvider
import org.apache.beam.sdk.transforms.Filter
import org.apache.beam.sdk.transforms.MapElements
import org.apache.beam.sdk.transforms.SimpleFunction
import org.apache.beam.sdk.values.TypeDescriptor
import java.util.*
// https://opendata.rhein-kreis-neuss.de/explore/dataset/deutschland-corona-impfungen/download/?format=csv
class CsvLineFormat : SimpleFunction<String, Array<String>>() {
override fun apply(line: String): Array<String> {
var lineElements = line.split(";")
return arrayOf(lineElements[0],
lineElements[1],lineElements[2],lineElements[3],lineElements[4],lineElements[5],
lineElements[6],lineElements[7],lineElements[8],lineElements[9],lineElements[10],
lineElements[11],lineElements[12],lineElements[13],lineElements[14],lineElements[15],
lineElements[16],lineElements[17],lineElements[18],lineElements[19],lineElements[20],
lineElements[21],lineElements[22],lineElements[23])
}
}
public class Convert : SimpleFunction<Array<String>, TableRow>() {
override fun apply(line: Array<String>): TableRow {
var row = TableRow()
row.set("date", line[0])
row.set("dosen_kumulativ", line[1].toInt())
row.set("dosen_differenz_zum_vortag", line[2].toInt())
row.set("dosen_erst_differenz_zum_vortag", line[3].toInt())
row.set("dosen_zweit_differenz_zum_vortag", line[4].toInt())
row.set("dosen_biontech_kumulativ", line[5].toInt())
row.set("dosen_moderna_kumulativ", line[6].toInt())
row.set("dosen_astrazeneca_kumulativ", line[7].toInt())
row.set("personen_erst_kumulativ", line[8].toInt())
row.set("personen_voll_kumulativ", line[9].toInt())
row.set("impf_quote_erst", line[10].toFloat())
row.set("impf_quote_voll", line[11].toFloat())
row.set("indikation_alter_dosen", line[12].toInt())
row.set("indikation_beruf_dosen", line[13].toInt())
row.set("indikation_medizinisch_dosen", line[14].toInt())
row.set("indikation_pflegeheim_dosen", line[15].toInt())
row.set("indikation_alter_erst", line[16].toInt())
row.set("indikation_beruf_erst", line[17].toInt())
row.set("indikation_medizinisch_erst", line[18].toInt())
row.set("indikation_pflegeheim_erst", line[19].toInt())
row.set("indikation_alter_voll", line[20].toInt())
row.set("indikation_beruf_voll", line[21].toInt())
row.set("indikation_medizinisch_voll", line[22].toInt())
row.set("indikation_pflegeheim_voll", line[23].toInt())
return row
}
}
public class RemoveHeaderFilter : SimpleFunction<Array<String>, Boolean>() {
override fun apply(it: Array<String>): Boolean {
return it[0] != "date"
}
}
inline fun <reified T: Any> clazz():TypeDescriptor<T> {
return TypeDescriptor.of(T::class.java)
}
fun main(args: Array<String>) {
val options = PipelineOptionsFactory
.fromArgs(*args)
.withValidation()
.`as`(PipelineOptions::class.java)
val p: Pipeline = Pipeline.create(options)
var project = "<project_id> "
var dataset = "covid_vaccination_germany"
var table = "vaccinations"
var schema = TableSchema()
.setFields(
Arrays.asList(
TableFieldSchema()
.setName("date")
.setType("DATETIME")
.setMode("REQUIRED"),
TableFieldSchema()
.setName("dosen_kumulativ")
.setType("INT64")
.setMode("REQUIRED"),
TableFieldSchema()
.setName("dosen_differenz_zum_vortag")
.setType("INT64")
.setMode("REQUIRED"),
TableFieldSchema()
.setName("dosen_erst_differenz_zum_vortag")
.setType("INT64")
.setMode("REQUIRED"),
TableFieldSchema()
.setName("dosen_zweit_differenz_zum_vortag")
.setType("INT64")
.setMode("REQUIRED"),
TableFieldSchema()
.setName("dosen_biontech_kumulativ")
.setType("INT64")
.setMode("REQUIRED"),
TableFieldSchema()
.setName("dosen_moderna_kumulativ")
.setType("INT64")
.setMode("REQUIRED"),
TableFieldSchema()
.setName("dosen_astrazeneca_kumulativ")
.setType("INT64")
.setMode("REQUIRED"),
TableFieldSchema()
.setName("personen_erst_kumulativ")
.setType("INT64")
.setMode("REQUIRED"),
TableFieldSchema()
.setName("personen_voll_kumulativ")
.setType("INT64")
.setMode("REQUIRED"),
TableFieldSchema()
.setName("impf_quote_erst")
.setType("FLOAT64")
.setMode("REQUIRED"),
TableFieldSchema()
.setName("impf_quote_voll")
.setType("FLOAT64")
.setMode("REQUIRED"),
TableFieldSchema()
.setName("indikation_alter_dosen")
.setType("INT64")
.setMode("REQUIRED"),
TableFieldSchema()
.setName("indikation_beruf_dosen")
.setType("INT64")
.setMode("REQUIRED"),
TableFieldSchema()
.setName("indikation_medizinisch_dosen")
.setType("INT64")
.setMode("REQUIRED"),
TableFieldSchema()
.setName("indikation_pflegeheim_dosen")
.setType("INT64")
.setMode("REQUIRED"),
TableFieldSchema()
.setName("indikation_alter_erst")
.setType("INT64")
.setMode("REQUIRED"),
TableFieldSchema()
.setName("indikation_beruf_erst")
.setType("INT64")
.setMode("REQUIRED"),
TableFieldSchema()
.setName("indikation_medizinisch_erst")
.setType("INT64")
.setMode("REQUIRED"),
TableFieldSchema()
.setName("indikation_pflegeheim_erst")
.setType("INT64")
.setMode("REQUIRED"),
TableFieldSchema()
.setName("indikation_alter_voll")
.setType("INT64")
.setMode("REQUIRED"),
TableFieldSchema()
.setName("indikation_beruf_voll")
.setType("INT64")
.setMode("REQUIRED"),
TableFieldSchema()
.setName("indikation_medizinisch_voll")
.setType("INT64")
.setMode("REQUIRED"),
TableFieldSchema()
.setName("indikation_pflegeheim_voll")
.setType("INT64")
.setMode("REQUIRED")
)
)
p.apply(TextIO.read().from("gs://<project_id> -bucket/input.csv"))
.apply("Split CSV", MapElements
.into(TypeDescriptor.of(Array<String>::class.java))
.via( CsvLineFormat() ))
.apply("Filter Header", Filter.by( RemoveHeaderFilter() ))
.apply("Convert Elements", MapElements
.into(clazz<TableRow>())
.via( Convert() ))
.apply("Write to BigQuery", BigQueryIO.writeTableRows()
.to(String.format("%s:%s.%s", project, dataset, table))
.withSchema(schema)
.withCustomGcsTempLocation(ValueProvider.StaticValueProvider.of("gs://<temp_bucket_id>/bigquerytemp"))
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
)
p.run().waitUntilFinish()
}
SELECT
date,
SUM(cumulative_deceased) AS Deceased,
SUM(cumulative_confirmed) AS Confirmed
FROM `bigquery-public-data.covid19_open_data.covid19_open_data`
WHERE country_code = 'DE' AND subregion1_code is null
GROUP BY date
ORDER BY date
SELECT
CAST(v.date AS DATE) AS Date,
v.personen_erst_kumulativ AS First_Injection,
v.personen_voll_kumulativ AS Completely_Vaccinated,
c.Confirmed,
c.Deceased
FROM
`<project_id> .covid_vaccination_germany.vaccinations` AS v
INNER JOIN (
SELECT
date,
SUM(cumulative_deceased) AS Deceased,
SUM(cumulative_confirmed) AS Confirmed
FROM `bigquery-public-data.covid19_open_data.covid19_open_data`
WHERE country_code = 'DE' AND subregion1_code is null
GROUP BY date
ORDER BY date ) AS c
ON
CAST(v.date AS DATE) = c.date
ORDER BY
v.date
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment