Skip to content

Instantly share code, notes, and snippets.

@NicolasRouquette
Created July 9, 2017 00:24
Show Gist options
  • Save NicolasRouquette/656ed7a2d6984ce0995fd78a3aec2566 to your computer and use it in GitHub Desktop.
Save NicolasRouquette/656ed7a2d6984ce0995fd78a3aec2566 to your computer and use it in GitHub Desktop.
Sequential crawler in http4s 0.17.0-M3 & circe 0.7.0
/*
* Copyright 2016 California Institute of Technology ("Caltech").
* U.S. Government sponsorship acknowledged.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* License Terms
*/
package gov.nasa.jpl.imce.oml.adapter.twc
import java.lang.System
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
import fs2.{Chunk, Stream, Task}
import io.circe.Json
import org.http4s.{Headers, Method, Request, Uri}
import org.http4s.client.Client
import scala.collection.immutable.{Map, Set}
import scala.concurrent.duration.FiniteDuration
import scala.{Either, Int, Left, Right, StringContext}
object SequentialFetcher {

  /**
    * Sequentially crawl a set of elements over HTTP, following their dependencies.
    *
    * A GET on `fetchRootIDsURI` yields the starting set of root IDs. The IDs still to be
    * fetched are then POSTed to `fetchIDBatchURI` one batch at a time; each response
    * contributes newly fetched elements plus the IDs of their dependencies, which are queued
    * for a later batch. The crawl stops when no queued ID remains that has not been fetched.
    *
    * The body of each POST is the comma-separated `toString` of the batch IDs, UTF-8 encoded.
    * Progress and timing information is printed to standard output for every batch.
    *
    * @param fetchRootIDsURI The URI to GET the starting set of root IDs
    * @param fetchIDBatchURI The URI to POST a query to fetch elements for a given batch of IDs
    * @param batchSize The maximum size of a batch of IDs; 0 (or any non-positive value)
    *                  fetches all pending IDs at once
    * @param h Headers (content, authorization, ...) sent with every request
    * @param httpClient The http4s client used to issue the GET and POST requests
    * @param json2rootIDs Scan the Json result of the fetchRootIDsURI query for root IDs
    * @param json2elementMapDependencies Scan the Json result of the fetchIDBatchURI query
    *                                    for elements and dependencies
    * @param S fs2 strategy (presumably required to run the resulting task; not referenced
    *          explicitly in this method)
    * @tparam ID type of the element identifiers
    * @tparam Element type of the fetched elements
    * @return Error or the map of all elements fetched
    */
  def fetchRecursively[ID, Element](
      fetchRootIDsURI: Uri,
      fetchIDBatchURI: Uri,
      batchSize: Int,
      h: Headers,
      httpClient: Client,
      json2rootIDs: (Json) => Set[ID],
      json2elementMapDependencies: Json => (Map[ID, Element], Set[ID]))(
      implicit S: fs2.Strategy)
    : Either[java.lang.Throwable, Map[ID, Element]] = {

    // Running totals used only for the progress report:
    //  - query: time spent waiting for batch POST responses,
    //  - other: time spent scanning the responses and merging the results.
    var other: FiniteDuration = FiniteDuration(0, TimeUnit.MILLISECONDS)
    var query: FiniteDuration = FiniteDuration(0, TimeUnit.MILLISECONDS)

    type ElementMap = Map[ID, Element]
    // Left: failure; Right: (elements fetched so far, IDs queued for fetching).
    type FetcherEither = Either[java.lang.Throwable, (ElementMap, Set[ID])]

    import org.http4s.circe.jsonDecoder

    // One crawl step: fetch the next batch of pending IDs, then recurse on the updated state.
    def fetcher(acc: Task[FetcherEither]): Task[FetcherEither] = acc.flatMap {
      case Right((prev, queue)) =>
        // Dependencies frequently point back at elements that were already fetched.
        // Discard those *before* testing for termination; otherwise a queue made only of
        // already-fetched IDs would trigger a POST with an empty list of IDs.
        val pending = queue.filterNot(prev.contains)
        if (pending.isEmpty)
          Task.now[FetcherEither](Right((prev, Set.empty[ID])))
        else {
          System.out.println(
            s"... ${prev.size} fetched,"+
              s"queue: ${queue.size} IDs "+
              s"{query=${prettyDuration(query)}; other=${prettyDuration(other)}}")

          // A non-positive batch size means "fetch everything that is pending in one request".
          val (ids, rest) =
            if (batchSize > 0)
              pending.splitAt(batchSize)
            else
              (pending, Set.empty[ID])

          val t0 = System.currentTimeMillis()
          httpClient
            .expect[Json](
              Request(Method.POST,
                      fetchIDBatchURI,
                      headers = h,
                      body =
                        Stream.chunk(Chunk.bytes(ids.mkString(",").getBytes(StandardCharsets.UTF_8)))))
            .flatMap { json =>
              val t1 = System.currentTimeMillis()
              query = query + FiniteDuration(t1 - t0, TimeUnit.MILLISECONDS)
              System.out.println(": " + prettyDurationFromTo(t0, t1))

              val (inc, more) = json2elementMapDependencies(json)
              // `more` may contain IDs that are already fetched; they are discarded by the
              // filter at the start of the next step.
              val next = rest ++ more
              val t2 = System.currentTimeMillis()
              other = other + FiniteDuration(t2 - t1, TimeUnit.MILLISECONDS)

              fetcher(Task.now(Right((prev ++ inc, next))))
            }
        }
      case Left(t) =>
        Task.now(Left(t))
    }

    // Fetch the root IDs, then crawl from them starting with an empty element map.
    val task = httpClient
      .expect[Json](Request(Method.GET, fetchRootIDsURI, headers = h))
      .map(json2rootIDs)
      .flatMap { rootIDs =>
        fetcher(Task.now(Right((Map.empty, rootIDs))))
      }

    // Run the whole crawl synchronously; failures raised while running the task and
    // failures carried inside the result are both reported as Left.
    task.unsafeAttemptRun() match {
      case Right(Right((map, queue))) =>
        if (queue.isEmpty)
          Right(map)
        else
          // Defensive check: a successful crawl is expected to end with an empty queue.
          Left(
            new java.lang.IllegalArgumentException(
              s"Fetched ${map.size} elements but there are ${queue.size} left IDs to fetch!"
            ))
      case Right(Left(t)) =>
        Left(t)
      case Left(t) =>
        Left(t)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment