Skip to content

Instantly share code, notes, and snippets.

@alexarchambault
Last active October 19, 2020 10:11
Show Gist options
  • Select an option

  • Save alexarchambault/09bdcf2b3d58bc04c4a7ed30906e141f to your computer and use it in GitHub Desktop.

Select an option

Save alexarchambault/09bdcf2b3d58bc04c4a7ed30906e141f to your computer and use it in GitHub Desktop.
$ ./ammonium
Loading...
Welcome to the Ammonite Repl 0.8.0
(Scala 2.11.8 Java 1.8.0_112)
@ import $ivy.`org.jupyter-scala::scio:0.4.0-RC1`
import $ivy.$
@ {
// Session setup: import the jupyter-scala Scio integration and the Scio
// packages used by the rest of this session, then create the Scio context.
import jupyter.scio._
import com.spotify.scio._
import com.spotify.scio.accumulators._
import com.spotify.scio.bigquery._
import com.spotify.scio.experimental._
// The context is built from explicit key -> value pipeline options (runner,
// GCP project, staging location) and is then given a credentials JSON file
// located under the current user's home directory.
// NOTE(review): the project id and the gs:// staging bucket are specific to
// this demo session — substitute your own before re-running.
val sc = JupyterScioContext(
"runner" -> "DataflowPipelineRunner",
"project" -> "jupyter-scala",
"stagingLocation" -> "gs://test-bucket/jupyter-scala-scio-test"
).withGcpCredential(sys.props("user.home") + "/.gcp/credentials.json")
}
import jupyter.scio._
import com.spotify.scio._
import com.spotify.scio.accumulators._
import com.spotify.scio.bigquery._
import com.spotify.scio.experimental._
sc: com.spotify.scio.jupyter.JupyterScioContext = com.spotify.scio.jupyter.JupyterScioContext@5ace902e
@ {
// Four accumulators created on the context; the string argument is the
// accumulator's name. Two are Int max/min accumulators, two are Long sums.
val max = sc.maxAccumulator[Int]("maxLineLength")
val min = sc.minAccumulator[Int]("minLineLength")
val sumNonEmpty = sc.sumAccumulator[Long]("nonEmptyLines")
val sumEmpty = sc.sumAccumulator[Long]("emptyLines")
// Word count over the King Lear sample text. Stages, in order:
//   1. textFile             – open the text at the given gs:// path
//                             (presumably one element per line).
//   2. map(_.trim)          – strip leading/trailing whitespace from each element.
//   3. accumulateBy         – presumably feeds each element's length into the
//                             `max` and `min` accumulators — TODO confirm
//                             against the Scio accumulators API.
//   4. accumulateCountFilter – presumably counts elements that satisfy / fail
//                             `_.nonEmpty` into `sumNonEmpty` / `sumEmpty` and
//                             keeps only the non-empty ones — TODO confirm.
//   5. flatMap              – split on runs of characters other than ASCII
//                             letters and apostrophes, dropping empty tokens.
//   6. countByValue + map   – format each resulting pair as "<_1>: <_2>"
//                             (presumably word and its count).
//   7. saveAsTextFile       – target the given gs:// output path.
// In this transcript the Dataflow job is only submitted later, when
// `sc.close()` is called.
sc.textFile("gs://dataflow-samples/shakespeare/kinglear.txt")
.map(_.trim)
.accumulateBy(max, min)(_.length)
.accumulateCountFilter(sumNonEmpty, sumEmpty)(_.nonEmpty)
.flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))
.countByValue
.map(t => t._1 + ": " + t._2)
.saveAsTextFile("gs://test-bucket/jupyter-scala-scio-test/results/output")
}
max: values.Accumulator[Int] = com.spotify.scio.ScioContext$$anonfun$maxAccumulator$1$$anon$2@734990c1
min: values.Accumulator[Int] = com.spotify.scio.ScioContext$$anonfun$minAccumulator$1$$anon$3@36c6d53b
sumNonEmpty: values.Accumulator[Long] = com.spotify.scio.ScioContext$$anonfun$sumAccumulator$1$$anon$4@44a1e5d2
sumEmpty: values.Accumulator[Long] = com.spotify.scio.ScioContext$$anonfun$sumAccumulator$1$$anon$4@17d90f81
res2_4: concurrent.Future[io.Tap[String]] = List()
@ val result = sc.close()
[main] INFO com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner - Executing pipeline on the Dataflow Service, which will have billing implications related to Google Compute Engine usage and other Google Cloud Services.
[main] INFO com.google.cloud.dataflow.sdk.util.PackageUtil - Uploading 211 files from PipelineOptions.filesToStage to staging location to prepare for execution.
[main] INFO com.google.cloud.dataflow.sdk.util.PackageUtil - Uploading PipelineOptions.filesToStage complete: 2 files newly uploaded, 209 files cached
Dataflow SDK version: 1.8.0
[main] INFO com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner - To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/project/jupyter-scala/dataflow/job/2016-12-03_07_32_46-9281714036710530386
Submitted job: 2016-12-03_07_32_46-9281714036710530386
[main] INFO com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner - To cancel the job using the 'gcloud' tool, run:
> gcloud alpha dataflow jobs --project=jupyter-scala cancel 2016-12-03_07_32_46-9281714036710530386
result: ScioResult = com.spotify.scio.ScioResult@5f638200
@ {
// Inspect the ScioResult returned by `sc.close()`: first the pipeline state,
// then the total value of each accumulator declared earlier in the session.
result.state
// Totals of the Int max/min accumulators ("maxLineLength" / "minLineLength").
result.accumulatorTotalValue(max)
result.accumulatorTotalValue(min)
// Totals of the Long sum accumulators ("nonEmptyLines" / "emptyLines").
result.accumulatorTotalValue(sumNonEmpty)
result.accumulatorTotalValue(sumEmpty)
}
res4_0: com.google.cloud.dataflow.sdk.PipelineResult.State = DONE
res4_1: Int = 69
res4_2: Int = 0
res4_3: Long = 3862L
res4_4: Long = 1663L
@
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment