Skip to content

Instantly share code, notes, and snippets.

@jbrisbin
Last active October 31, 2015 15:48
Show Gist options
  • Save jbrisbin/b7e63de13fec022df085 to your computer and use it in GitHub Desktop.
Save jbrisbin/b7e63de13fec022df085 to your computer and use it in GitHub Desktop.
/**
 * Spring Boot application that polls the TfL TrackerNet prediction-summary feed on a fixed
 * period, turns every station element of the response into a nested map, and forwards that
 * map as JSON to the output channel of the bound {@code Source}.
 */
@SpringBootApplication
@EnableBinding(Source.class)
class TrackerNetKafkaSource {

  static {
    // Create the shared Environment once (if none exists yet) when this class is loaded.
    Environment.initializeIfEmpty().assignErrorJournal()
  }

  // Prediction-summary endpoint; the trailing "C" is presumably the line code — TODO confirm.
  static def TFL_URL = "http://cloud.tfl.gov.uk/TrackerNet/PredictionSummary/C"

  /** In-process channel that carries one status map per station from the poller to the sink flow. */
  @Bean
  def stationStatusMessages() {
    new DirectChannel()
  }

  /**
   * Starts the polling pipeline: every 30 seconds (no initial delay) the feed is fetched,
   * the response is accumulated and parsed as XML, and each {@code S} element is converted
   * to a status map that is sent on {@code stationStatusMessages}.
   *
   * @param stationStatusMessages channel that receives one message per station element
   * @return whatever {@code consume} returns for the assembled stream
   */
  @Bean
  def trackerNetPredictionSummary(MessageChannel stationStatusMessages) {
    Streams.period(0, 30, TimeUnit.SECONDS).
        flatMap {
          NetStreams.httpClient().get(TFL_URL).
              flatMap(new BufferAccumulator()).
              flatMap { b -> XmlStreams.from(b) { xml -> xml.S } }
        }.
        consume { station ->
          // NOTE(review): the attribute accesses below were unreadable in the published
          // listing (replaced by an e-mail-obfuscation placeholder). They are restored from
          // the TrackerNet PredictionSummary attribute names (N = name, C = time to station,
          // DE = destination, L = location) — confirm against the live feed.
          def statusMsg = [
              name     : station.@N.text(),
              platforms: station.P.collect { platform ->
                [name  : platform.@N.text(),
                 trains: platform.T.collect { train ->
                   [timeToStation: train.@C.text(),
                    destination  : train.@DE.text(),
                    location     : train.@L.text()]
                 }]
              }
          ]
          stationStatusMessages.send(MessageBuilder.withPayload(statusMsg).build())
        }
  }

  /**
   * Integration flow that reads status maps from {@code stationStatusMessages}, converts each
   * to JSON, and hands the result to the bound output channel.
   *
   * @param stationStatusMessages channel fed by {@code trackerNetPredictionSummary}
   * @param out                   bound {@code Source} whose output channel receives the JSON
   */
  @Bean
  def kafkaSink(MessageChannel stationStatusMessages, Source out) {
    IntegrationFlows.from(stationStatusMessages)
        .transform(new ObjectToJsonTransformer())
        .channel(out.output())
        .get()
  }

  /** Boots the application context and then parks the main thread so the JVM stays up. */
  public static void main(String[] args) {
    SpringApplication.run(TrackerNetKafkaSource)
    // Nothing else holds the main thread; sleep in a loop to keep the process alive.
    while (true) {
      Thread.sleep(5000)
    }
  }
}
/**
 * Spring Boot application that subscribes to the input channel of the bound {@code Sink}
 * and stores the payload of every received message as a new object in Riak.
 */
@SpringBootApplication
@EnableBinding(Sink.class)
class TrackerNetRiakSink {

  static def LOG = LoggerFactory.getLogger(TrackerNetRiakSink)
  // Namespace under which every incoming message is stored.
  static def FROM_KAFKA = new Namespace("fromKafka")

  /**
   * Registers a handler on the sink's input channel that writes each message to Riak under a
   * freshly generated key in the {@code fromKafka} namespace.
   *
   * @param cluster Riak cluster used to execute the store operations
   * @param from    bound {@code Sink} whose input channel is subscribed to
   * @return the subscribed {@code MessageHandler}
   */
  @Bean
  def riakSink(RiakCluster cluster, Sink from) {
    // `as` binds more loosely than a method call, so the coercion has to wrap the closure
    // itself; trailing it after subscribe { ... } would try to coerce subscribe's result.
    def handler = { msg ->
      LOG.info("Received message: $msg")
      // One new key per message, so earlier objects are never overwritten.
      def loc = new Location(FROM_KAFKA, "${UUIDUtils.create()}")
      def obj = new RiakObject()
      obj.contentType = msg.headers[MessageHeaders.CONTENT_TYPE]
      // Payload is cast to String — assumes upstream delivers text (e.g. JSON); TODO confirm.
      obj.value = BinaryValue.create((String) msg.payload)
      LOG.info("Storing $obj at $loc")
      def op = new StoreOperation.Builder(loc).withContent(obj).build()
      // NOTE(review): the result of execute(...) is not inspected, so a failed store goes
      // unreported here.
      cluster.execute(op)
    } as MessageHandler
    from.input().subscribe(handler)
    handler
  }

  /** Boots the application context and then parks the main thread so the JVM stays up. */
  public static void main(String[] args) {
    SpringApplication.run(TrackerNetRiakSink)
    // Nothing else holds the main thread; sleep in a loop to keep the process alive.
    while (true) {
      Thread.sleep(5000)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment