Skip to content

Instantly share code, notes, and snippets.

@jbrisbin
Last active September 29, 2015 20:01
Show Gist options
  • Save jbrisbin/ac7dffcd95d06d8f6f8f to your computer and use it in GitHub Desktop.
Save jbrisbin/ac7dffcd95d06d8f6f8f to your computer and use it in GitHub Desktop.
import com.basho.riak.client.core.RiakCluster
import com.basho.riak.client.core.RiakFutureListener
import com.basho.riak.client.core.operations.StoreOperation
import com.basho.riak.client.core.query.Location
import com.basho.riak.client.core.query.Namespace
import com.basho.riak.client.core.query.RiakObject
import com.basho.riak.client.core.util.BinaryValue
import org.slf4j.LoggerFactory
import org.springframework.boot.SpringApplication
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.cloud.stream.annotation.EnableBinding
import org.springframework.cloud.stream.messaging.Sink
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.ComponentScan
import org.springframework.context.annotation.Configuration
import org.springframework.integration.dsl.IntegrationFlow
import org.springframework.integration.dsl.IntegrationFlows
import org.springframework.messaging.MessageHandler
import org.springframework.messaging.MessageHeaders
import reactor.core.support.UUIDUtils
/**
 * Spring Cloud Stream sink application that stores every message arriving on
 * the bound {@link Sink} input channel as a new object in Riak.
 *
 * Each message is written under a freshly generated UUID key in the
 * {@code fromKafka} namespace.
 */
@Configuration
@EnableAutoConfiguration
@ComponentScan
@EnableBinding(Sink.class)
class TrackerNetRiakSink {

    static def LOG = LoggerFactory.getLogger(TrackerNetRiakSink)

    /**
     * The Riak namespace that all incoming messages are stored in.
     *
     * @return a namespace named {@code fromKafka}
     */
    @Bean
    Namespace fromKafka() {
        new Namespace("fromKafka")
    }

    /**
     * Builds the integration flow that consumes the sink's input channel and
     * stores each message in Riak.
     *
     * @param cluster the Riak cluster the store operations are executed on
     * @param from    the bound sink whose input channel feeds the flow
     * @return the assembled integration flow
     */
    @Bean
    IntegrationFlow riakSink(RiakCluster cluster, Sink from) {
        IntegrationFlows.from(from.input()).
                handle({ msg ->
                    LOG.info("Received message: {}", msg)

                    def loc = new Location(fromKafka(), UUIDUtils.create().toString())

                    def obj = new RiakObject()
                    // Only override the content type when the header is actually present;
                    // otherwise keep whatever RiakObject uses by default. The header value
                    // is not guaranteed to be a String, so convert it explicitly.
                    def contentType = msg.headers[MessageHeaders.CONTENT_TYPE]
                    if (contentType != null) {
                        obj.contentType = contentType.toString()
                    }
                    // NOTE(review): assumes the payload is a String — confirm against the
                    // binder's configured content type / deserializer.
                    obj.value = BinaryValue.create((String) msg.payload)

                    LOG.info("Storing {} at {}", obj, loc)
                    def op = new StoreOperation.Builder(loc).withContent(obj).build()

                    // execute() hands back a future; without a listener a failed store
                    // would go completely unnoticed.
                    cluster.execute(op).addListener({ future ->
                        if (!future.isSuccess()) {
                            LOG.error("Failed to store object at {}", loc, future.cause())
                        }
                    } as RiakFutureListener)
                } as MessageHandler).
                get()
    }

    /**
     * Application entry point.
     *
     * @param args command-line arguments, forwarded to Spring Boot so that
     *             {@code --property=value} overrides take effect
     */
    public static void main(String[] args) {
        SpringApplication.run(TrackerNetRiakSink, args)
        // Park the main thread so the JVM stays up after the context has started.
        while (true) {
            Thread.sleep(5000)
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment