Jon Brisbin jbrisbin
package reactor.groovy.ext
import groovy.transform.CompileStatic
import groovy.util.slurpersupport.GPathResult
import org.reactivestreams.Publisher
import reactor.io.buffer.Buffer
import reactor.rx.Stream
import reactor.rx.Streams
@CompileStatic
import com.basho.riak.client.core.RiakCluster
import com.basho.riak.client.core.operations.StoreOperation
import com.basho.riak.client.core.query.Location
import com.basho.riak.client.core.query.Namespace
import com.basho.riak.client.core.query.RiakObject
import com.basho.riak.client.core.util.BinaryValue
import org.slf4j.LoggerFactory
import org.springframework.boot.SpringApplication
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.cloud.stream.annotation.EnableBinding
@SpringBootApplication
@EnableBinding(Source.class)
class TrackerNetKafkaSource {
  static {
    Environment.initializeIfEmpty().assignErrorJournal()
  }
  static def TFL_URL = "http://cloud.tfl.gov.uk/TrackerNet/PredictionSummary/C"
public class DomainTypePipelines {
  @Test
  public void domainTypePipelines() throws Exception {
    RingBufferWorkProcessor<Object> p = RingBufferWorkProcessor.create();
    Stream<Object> s = Streams.wrap(p);
    s.filter(o -> o instanceof Person)
     .map(Person.class::cast)
     .consume(person -> System.out.println(Thread.currentThread() + " person: " + person));
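The filter-then-cast step in the preview above narrows a stream of `Object` to a single domain type. A minimal sketch of the same idea, assuming the JDK's `java.util.stream` in place of Reactor's `Stream` (so there is no ring buffer or worker thread involved); the `Person` class and the `onlyPeople` helper are hypothetical stand-ins, since the gist's own `Person` is not shown.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class DomainTypeFilterSketch {

    // Hypothetical domain type; the gist's Person class is not visible in the preview.
    static final class Person {
        final String name;

        Person(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "Person(" + name + ")";
        }
    }

    // Keep only Person instances and narrow the element type from Object to Person.
    static List<Person> onlyPeople(List<Object> mixed) {
        return mixed.stream()
                    .filter(o -> o instanceof Person)
                    .map(Person.class::cast)
                    .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Object> mixed = Arrays.asList("not a person", new Person("Ada"), 42, new Person("Grace"));
        onlyPeople(mixed).forEach(p ->
            System.out.println(Thread.currentThread().getName() + " person: " + p));
    }
}
```

Using `Person.class::cast` after the `instanceof` filter keeps the downstream lambda typed as `Person` without an explicit cast at each use.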
@jbrisbin
jbrisbin / build_error.txt
Created May 12, 2015 19:24
ScalaDoc build error in Spring XD
+-( spring-xd (master) ):> ./gradlew clean --stacktrace
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512m; support was removed in 8.0
FAILURE: Build failed with an exception.
* Where:
Build file '/Users/jbrisbin/Development/Projects/spring-projects/master/spring-xd/build.gradle' line: 850
* What went wrong:
A problem occurred evaluating root project 'spring-xd'.
@jbrisbin
jbrisbin / DeltaAwareExpiringSession.java
Created May 5, 2015 17:35
DeltaAwareExpiringSession
public class DeltaAwareExpiringSession implements ExpiringSession {
  private final Queue<Map.Entry<String, Object>> deltas = new LinkedList<Map.Entry<String, Object>>();
  @Id
  private String id = UUID.randomUUID().toString();
  private long creationTime = System.currentTimeMillis();
  private long lastAccessedTime = creationTime;
  private int maxInactiveInterval = DEFAULT_MAX_INACTIVE_INTERVAL_SECONDS;
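Only the fields of `DeltaAwareExpiringSession` are visible in this preview. One plausible reading of the `deltas` queue is that attribute writes are recorded so that only the changes need to be shipped to a backing store. The sketch below illustrates that idea under that assumption; the class `DeltaTrackingSessionSketch` and its `setAttribute`, `getAttribute`, and `drainDeltas` methods are hypothetical, and the class does not implement Spring Session's `ExpiringSession`.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;

public class DeltaTrackingSessionSketch {

    // Current attribute values, as a plain in-memory map.
    private final Map<String, Object> attributes = new HashMap<>();

    // Every write since the last drain, in the order it happened.
    private final Queue<Map.Entry<String, Object>> deltas = new LinkedList<>();

    // Store the value and remember the write as a delta.
    public void setAttribute(String name, Object value) {
        attributes.put(name, value);
        deltas.add(new AbstractMap.SimpleImmutableEntry<>(name, value));
    }

    public Object getAttribute(String name) {
        return attributes.get(name);
    }

    // Hand back the pending deltas and clear the queue (assumed to be called when saving).
    public List<Map.Entry<String, Object>> drainDeltas() {
        List<Map.Entry<String, Object>> drained = new ArrayList<>();
        Map.Entry<String, Object> next;
        while ((next = deltas.poll()) != null) {
            drained.add(next);
        }
        return drained;
    }

    public static void main(String[] args) {
        DeltaTrackingSessionSketch session = new DeltaTrackingSessionSketch();
        session.setAttribute("user", "ada");
        session.setAttribute("cart", 3);
        System.out.println("pending deltas: " + session.drainDeltas());
    }
}
```

Draining the queue on save means a second save with no intervening writes has nothing to send, which is the point of tracking deltas rather than the whole attribute map.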
@jbrisbin
jbrisbin / PromiseBufferSketch.java
Last active August 29, 2015 14:18
Using Buffer and Promise rather than raw Publisher contracts
interface Buffer<B> extends Publisher<B> {
  <NEWB> Buffer<NEWB> intercept(Interceptor<B, NEWB> fn);
  Buffer<B> append(Buffer<B> buff);
  Buffer<B> append(B obj);
}
@jbrisbin
jbrisbin / ExampleReactorCode.java
Last active August 29, 2015 14:17
Example RIPC server wiring using Reactor
ReactorTcpServer.listen(3000, ByteBuf.class)
                .log("connection")
                .consume(conn -> conn.out(conn.in().log("in"))
                                     .log("confirmation")
                                     .consume(buf -> LOG.info("write confirmed: {}", buf)));
@jbrisbin
jbrisbin / TcpConnection.java
Last active August 29, 2015 14:17
New RIPC abstractions
public interface TcpConnection<R, W> {
  Reader<R> reader();
  Writer<W> writer();
  public interface Reader<R> extends Publisher<R> {
    <NEWR> Reader<NEWR> intercept(Interceptor<R, NEWR> interceptor);
  }
@jbrisbin
jbrisbin / ReactiveStreamsIO.java
Last active August 29, 2015 14:16
Sketch of Reactive Streams interfaces for Client/Server interaction
interface Channel<IN, OUT> extends Publisher<IN> {
  Publisher<Boolean> write(Publisher<? extends OUT> data);
}
interface ClientSocketOptions {
  SocketAddress connectAddress();