Created
May 23, 2019 17:31
-
-
Save Daenyth/4ecf2cbd7fc885182148f60484df2fe1 to your computer and use it in GitHub Desktop.
AlpakkaS3Put for fs2
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.http.scaladsl.model.{ContentType, ContentTypes} | |
import akka.stream.Materializer | |
import akka.stream.alpakka.s3.scaladsl.{S3 => AlpakkaS3} | |
import akka.stream.alpakka.s3.{MetaHeaders, MultipartUploadResult} | |
import akka.util.ByteString | |
import cats.effect.{ConcurrentEffect, ContextShift} | |
import fs2.{Pipe, Stream} | |
import org.http4s.Uri | |
import org.joda.time.DateTime | |
import Compat._ | |
import scala.concurrent.{ExecutionContext, Future} | |
/** Companion providing a factory for [[AlpakkaS3Put]]. */
object AlpakkaS3Put {

  /** Builds an [[AlpakkaS3Put]] for `config`.
    *
    * The implicit materializer, execution context and effect instances are
    * forwarded unchanged to the class constructor.
    */
  def fromConfig[F[_]: ConcurrentEffect: ContextShift](config: S3Config)(
      implicit m: Materializer,
      ec: ExecutionContext
  ): AlpakkaS3Put[F] = {
    val uploader = new AlpakkaS3Put[F](config)
    uploader
  }
}
/** [[S3Put]] implementation that uploads through Alpakka's S3 multipart upload sink.
  *
  * The target bucket is read from `config`; the akka sink is bridged into fs2
  * via the helpers imported from `Compat`.
  */
class AlpakkaS3Put[F[_]: ContextShift](
    config: S3Config
)(
    implicit m: Materializer,
    ec: ExecutionContext,
    F: ConcurrentEffect[F]
) extends S3Put[F] {

  override type PutResult = MultipartUploadResult

  /** Pipe that sends the incoming bytes to `key` in the configured bucket and
    * emits the resulting [[MultipartUploadResult]].
    */
  def upload(
      key: S3Key,
      contentType: ContentType = ContentTypes.`text/plain(UTF-8)`,
      contentEncoding: ContentEncoding = ContentEncoding.identity,
      headers: MetaHeaders = MetaHeaders(Map.empty)
  ): Pipe[F, Byte, MultipartUploadResult] = { bytes: Stream[F, Byte] =>
    // A fresh sink is built each time the pipe is applied to a stream.
    val uploadPipe: Pipe[F, ByteString, MultipartUploadResult] =
      akkaSink(key, contentType, contentEncoding, headers).toPipeMat[F]
    bytes.through(byteToByteString).through(uploadPipe)
  }

  /** Creates the Alpakka multipart-upload sink for `key`.
    *
    * A "Content-Encoding" entry is merged with the caller's meta headers; on a
    * key clash the caller-supplied entry wins because it is appended last.
    */
  private def akkaSink(
      key: S3Key,
      contentType: ContentType,
      contentEncoding: ContentEncoding,
      headers: MetaHeaders
  ): AkkaSink[ByteString, Future[MultipartUploadResult]] = {
    // NOTE(review): Content-Encoding is passed as a MetaHeaders entry — confirm Alpakka
    // sends it as the intended HTTP header rather than as user-defined object metadata.
    val encodingEntry = Map("Content-Encoding" -> contentEncoding.value)
    val mergedHeaders = MetaHeaders(encodingEntry ++ headers.metaHeaders)
    AlpakkaS3.multipartUpload(
      config.bucket,
      key = key.value,
      contentType = contentType,
      metaHeaders = mergedHeaders
    )
  }

  /** Delegates to `config.generatePresignedUrl` for `key` and `validAsOf`. */
  def generateSignedUri(key: S3Key, validAsOf: DateTime): F[Uri] =
    config.generatePresignedUrl[F](key, validAsOf)
}
/** Interop helpers for bridging fs2 streams with akka-streams sinks. */
object Compat {
  // Imported locally so the names used by the members below resolve from this object alone.
  import cats.effect.concurrent.Deferred
  import cats.effect.implicits._ // `.toIO` syntax
  import cats.syntax.monadError._ // `.rethrow` syntax on F[Either[Throwable, B]]
  import fs2.Chunk
  import scala.util.{Failure, Success}
  // NOTE(review): `toSink` (used in `toPipeMat`) is an extension method that the comments
  // below attribute to streamz-converters; no import for it is visible in this file —
  // confirm it is in scope where this object is compiled.

  type AkkaSink[-T, +Mat] = akka.stream.scaladsl.Sink[T, Mat]
  val AkkaSink = akka.stream.scaladsl.Sink

  /**
    * A relatively efficient means of converting from a stream of bytes to
    * one of akka [[ByteString]]s.
    *
    * This approach relies on mapping full chunks at a time to a
    * single ByteString via a backing array
    */
  def byteToByteString[F[_]]: Pipe[F, Byte, ByteString] =
    _.mapChunks(c => Chunk.singleton(ByteString.fromArray(c.toBytes.toArray)))

  implicit class RichAkkaSink[A, B](val sink: AkkaSink[A, Future[B]]) extends AnyVal {

    /** Converts an akka sink with a success-status-indicating Future[B]
      * materialized result into an fs2 stream which will fail if the Future fails.
      * The stream returned by this will emit the Future's value one time at the end,
      * then terminate.
      */
    def toPipeMat[F[_]: ConcurrentEffect: ContextShift](
        implicit ec: ExecutionContext,
        m: Materializer
    ): Pipe[F, A, B] =
      Compat.toPipeMat[F, A, B](sink)
  } // closes RichAkkaSink; the method below is a member of Compat itself

  /** Converts an akka sink with a success-status-indicating Future[B]
    * materialized result into an fs2 stream which will fail if the Future fails.
    * The stream returned by this will emit the Future's value one time at the end,
    * then terminate.
    *
    * @param akkaSink the akka sink to run; its materialized `Future[B]` supplies the emitted value
    * @return a pipe that feeds its input into `akkaSink` and then emits the materialized result
    */
  def toPipeMat[F[_]: ConcurrentEffect: ContextShift, A, B](
      akkaSink: AkkaSink[A, Future[B]]
  )(
      implicit ec: ExecutionContext,
      m: Materializer
  ): Pipe[F, A, B] = {
    val mkPromise = Deferred[F, Either[Throwable, B]]
    // `Pipe` is just a function of Stream[F, A] => Stream[F, B], so we take a stream as input.
    in =>
      Stream.eval(mkPromise).flatMap { p =>
        // Akka streams produce a materialized value as a side effect of being run.
        // streamz-converters allows us to have a `Future[B] => Unit` callback when that
        // materialized value is created. This callback tells the akka materialized future
        // to store its result status into the Promise.
        val captureMaterializedResult: Future[B] => Unit = _.onComplete { outcome =>
          val result: Either[Throwable, B] = outcome match {
            case Failure(ex)    => Left(ex)
            case Success(value) => Right(value)
          }
          // We are inside a Future callback (outside of F), so the completion has to be run here.
          p.complete(result).toIO.unsafeRunSync()
        }
        // toSink is from streamz-converters; convert an akka sink to fs2 sink with a
        // callback for the materialized values
        val fs2Sink: Pipe[F, A, Unit] =
          akkaSink.toSink(captureMaterializedResult)
        val fs2Stream: Stream[F, Unit] = fs2Sink.apply(in)
        val materializedResultStream: Stream[F, B] = Stream.eval {
          p.get // Async wait on the promise to be completed; => F[Either[Throwable, B]]
            .rethrow // F[Either[Throwable, B]] => F[B]
        }
        // Run the akka sink for its effects and then run stream containing the effect of
        // getting the Promise results
        fs2Stream.drain ++ materializedResultStream
      }
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.http.scaladsl.model.{ContentType, ContentTypes} | |
import akka.stream.alpakka.s3.MetaHeaders | |
import fs2.{Pipe, Stream} | |
/** Wrapper for an S3 object key; `value` holds the raw key string. */
final case class S3Key(value: String) extends AnyVal
/** Wrapper for a content-encoding token; `value` holds the raw token string. */
final case class ContentEncoding(value: String) extends AnyVal

/** Predefined content encodings.
  *
  * @see [[https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding#Directives]]
  */
object ContentEncoding {

  /** The "gzip" encoding token. */
  val gzip: ContentEncoding = ContentEncoding("gzip")

  /** No encoding. */
  val identity: ContentEncoding = ContentEncoding("identity")
}
/** Encapsulates "upload bytes to S3" as an interface. */
trait S3Put[F[_]] {

  /** Implementation-defined result object from uploading to S3. */
  type PutResult

  /** Upload input bytes to S3 at `key`, filling in the S3 object metadata
    * from `contentType`, `contentEncoding`, and `headers`.
    *
    * @return a pipe consuming the bytes to upload and producing the implementation's [[PutResult]]
    */
  def upload(
      key: S3Key,
      contentType: ContentType = ContentTypes.`text/plain(UTF-8)`,
      contentEncoding: ContentEncoding = ContentEncoding.identity,
      headers: MetaHeaders = MetaHeaders(Map.empty)
  ): Pipe[F, Byte, PutResult]

  /** Generate a publicly-accessible Uri which will expire some time after `validAsOf`.
    * The exact expiry time is unspecified in this interface; usually it will be driven
    * by configuration in the specific instance.
    */
  // `DateTime` and `Uri` are written fully qualified because the import block preceding
  // this trait does not bring them into scope.
  def generateSignedUri(key: S3Key, validAsOf: org.joda.time.DateTime): F[org.http4s.Uri]
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment