🤟
move along, nothing to see here...

Dave Rusek mgodave

  • ex-Twitter
  • Denver, CO
313a314,332
> }
>
> /**
> * @param ctx context associated with this service and request.
> * @param request the request from the client.
> * @return a {@link Single} which sends the response to the client when it terminates.
> */
> default Single<TestResponse> test(GrpcServiceContext ctx, TestRequest request) {
> throw new UnsupportedOperationException("Method test is unimplemented");
5d4
< import io.servicetalk.concurrent.api.AsyncCloseable;
14d12
< import io.servicetalk.encoding.api.Identity;
26d23
< import io.servicetalk.grpc.api.GrpcPayloadWriter;
28d24
< import io.servicetalk.grpc.api.GrpcSerializationProvider;
34d29
< import io.servicetalk.grpc.protobuf.ProtoBufSerializationProviderBuilder;
4d3
< import io.servicetalk.concurrent.api.AsyncCloseable;
12d10
< import io.servicetalk.encoding.api.Identity;
24d21
< import io.servicetalk.grpc.api.GrpcSerializationProvider;
30d26
< import io.servicetalk.grpc.protobuf.ProtoBufSerializationProviderBuilder;
32d27
< import java.lang.Deprecated;
12d11
< import io.servicetalk.encoding.api.Identity;
15d13
< import io.servicetalk.grpc.api.DefaultGrpcClientMetadata;
20d17
< import io.servicetalk.grpc.api.GrpcClientMetadata;
24d20
< import io.servicetalk.grpc.api.GrpcSerializationProvider;
30d25
< import io.servicetalk.grpc.protobuf.ProtoBufSerializationProviderBuilder;
/**
* Copyright [2020] David J. Rusek <[email protected]>
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
  • Status: Proposal
  • Author: Dave Rusek - Streamlio
  • Pull Request: See Below
  • Mailing List discussion:

Motivation

Data flowing through a messaging system is typically untyped. It travels end to end as bytes, and only the producers and consumers are aware of the type

Keybase proof

I hereby claim:

  • I am mgodave on github.
  • I am daverusek (https://keybase.io/daverusek) on keybase.
  • I have a public key whose fingerprint is 0632 699D 471F 7255 B581 2B71 2DE6 CBB6 229F 7DBC

To claim this, I am signing this object:

import com.twitter.concurrent.AsyncStream
import com.twitter.concurrent.AsyncStream._
import com.twitter.util.{Await, Future}

object MapConcurrentLeak extends App {
  def stream(i: Int, stop: Int): AsyncStream[Int] = {
    if (i >= stop) fromFuture(Future.never)
    else i +:: stream(i + 1, stop)
  }

import com.twitter.concurrent.AsyncStream
import com.twitter.util._

object Merge {
  def merge[E](streams: Seq[AsyncStream[E]]): AsyncStream[E] = {
    def inner(next: Seq[Future[Option[(E, () => AsyncStream[E])]]]): AsyncStream[E] = {
      AsyncStream.fromFuture(Future.select(next)) flatMap {
        case (Return(Some((head, tail))), tails) =>
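
import com.twitter.concurrent.AsyncStream
import com.twitter.util.Future

object AsyncStreams {
  // Interleaves the streams round by round: each round waits for the head of
  // every stream, emits the heads that are present, then continues with the
  // remainder of each stream.
  def collect[E](streams: Seq[AsyncStream[E]]): AsyncStream[E] = {
    AsyncStream.fromFuture(
      Future.collect(
        streams.map(_.head)
      ).map(_.flatten)
    ).flatMap { heads =>
      // No stream produced a head, so every stream is exhausted.
      if (heads.isEmpty) AsyncStream.empty[E]
      // Advance each stream past its head before the next round.
      else AsyncStream.fromSeq(heads) ++ collect(streams.map(_.drop(1)))
    }
  }
}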