Last active
July 12, 2018 19:56
-
-
Save codefromthecrypt/76d94054b77e3be338bd75424ca8ba30 to your computer and use it in GitHub Desktop.
Kafka one-way with Brave
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* This is an example of a one-way or "messaging span", which is possible by use of the {@link | |
* Span#flush()} operator. | |
* | |
* <p>Note that this uses a span as a kafka key, not because it is recommended, rather as it is | |
* convenient for demonstration, since kafka doesn't have message properties. | |
* | |
* <p>See https://github.com/openzipkin/zipkin/issues/1243 | |
*/ | |
public class KafkaExampleIT { | |
@Rule public KafkaJunitRule kafka = new KafkaJunitRule(EphemeralKafkaBroker.create()); | |
InMemoryStorage storage = new InMemoryStorage(); | |
Tracing tracing = Tracing.newBuilder() | |
.localEndpoint(Endpoint.builder().serviceName("producer").build()) | |
.reporter(s -> storage.spanConsumer().accept(Collections.singletonList(s))) | |
.build(); | |
KafkaProducer<Span, String> producer; | |
KafkaConsumer<Span, String> consumer; | |
Endpoint kafkaEndpoint; | |
@Before public void setup() { | |
producer = kafka.helper() | |
.createProducer(new SpanSerializer(tracing), new StringSerializer(), null); | |
consumer = kafka.helper() | |
.createConsumer(new SpanDeserializer(storage), new StringDeserializer(), null); | |
kafkaEndpoint = | |
Endpoint.builder().serviceName("kafka").port(kafka.helper().kafkaPort()).build(); | |
} | |
@Test | |
public void startWithOneTracerAndStopWithAnother() throws Exception { | |
String topic = "startWithOneTracerAndStopWithAnother"; | |
consumer.subscribe(Collections.singletonList(topic)); | |
Span span = tracing.tracer().newTrace().kind(Span.Kind.CLIENT).remoteEndpoint(kafkaEndpoint); | |
producer.send(new ProducerRecord<>(topic, span, "foo")).get(); | |
span.flush(); | |
producer.close(); | |
consumer.poll(500L).forEach(record -> { | |
record.key() | |
.name(record.value()) | |
.kind(Span.Kind.SERVER) | |
.remoteEndpoint(kafkaEndpoint) | |
.flush(); | |
}); | |
consumer.close(); | |
} | |
} | |
/** This class simulates a consumer being on a different process, by not sharing a tracer */ | |
final class SpanDeserializer implements Deserializer<Span> { | |
final Tracing tracing; | |
final TraceContext.Extractor<Properties> extractor; | |
SpanDeserializer(InMemoryStorage storage) { | |
tracing = Tracing.newBuilder() | |
.localEndpoint(Endpoint.builder().serviceName("consumer").build()) | |
.reporter(s -> storage.spanConsumer().accept(Collections.singletonList(s))) | |
.build(); | |
extractor = tracing.propagation().extractor(Properties::getProperty); | |
} | |
@Override public void configure(Map<String, ?> map, boolean b) { | |
} | |
/** Extract the span from the key or start a new trace if any problem. */ | |
@Override public Span deserialize(String s, byte[] bytes) { | |
try { | |
Properties properties = new Properties(); | |
properties.load(new ByteArrayInputStream(bytes)); | |
// in Brave 4.3 this will be simplified to tracing.tracer().nextSpan(extractor, properties) | |
TraceContextOrSamplingFlags result = extractor.extract(properties); | |
return result.context() != null | |
? tracer.joinSpan(result.context()) | |
: tracer.newTrace(result.samplingFlags()); | |
} catch (RuntimeException | IOException e) { | |
return tracer.newTrace(); // return a new trace upon failure of any kind | |
} | |
} | |
@Override public void close() { | |
} | |
} | |
final class SpanSerializer implements Serializer<Span> { | |
final TraceContext.Injector<Properties> injector; | |
SpanSerializer(Tracing tracing) { | |
injector = tracing.propagation().injector(Properties::setProperty); | |
} | |
@Override public void configure(Map<String, ?> map, boolean b) { | |
} | |
@Override public byte[] serialize(String s, Span span) { | |
Properties properties = new Properties(); | |
injector.inject(span.context(), properties); | |
ByteArrayOutputStream out = new ByteArrayOutputStream(); | |
try { | |
properties.store(out, "zipkin"); | |
} catch (IOException e) { | |
throw new AssertionError(e); | |
} | |
return out.toByteArray(); | |
} | |
@Override public void close() { | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks a lot!