Created
September 7, 2016 15:58
-
-
Save zshamrock/16215ecdccfac107212fabcc4af88bc2 to your computer and use it in GitHub Desktop.
stan: invalid start time exception
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?> | |
<project xmlns="http://maven.apache.org/POM/4.0.0" | |
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | |
<modelVersion>4.0.0</modelVersion> | |
<groupId>experiments</groupId> | |
<artifactId>nats.streaming.intro</artifactId> | |
<version>1.0-SNAPSHOT</version> | |
<properties> | |
<!-- Language properties --> | |
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> | |
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> | |
<java.version>1.8</java.version> | |
<!-- https://maven.apache.org/plugins/maven-compiler-plugin/compile-mojo.html#target --> | |
<maven.compiler.target>${java.version}</maven.compiler.target> | |
<apache.commons.lang3.version>3.4</apache.commons.lang3.version> | |
<nats.io.streaming.client.version>0.1.2</nats.io.streaming.client.version> | |
<slf4j.version>1.7.21</slf4j.version> | |
</properties> | |
<dependencies> | |
<!-- NATS streaming --> | |
<dependency> | |
<groupId>io.nats</groupId> | |
<artifactId>java-nats-streaming</artifactId> | |
<version>${nats.io.streaming.client.version}</version> | |
</dependency> | |
<dependency> | |
<groupId>org.apache.commons</groupId> | |
<artifactId>commons-lang3</artifactId> | |
<version>${apache.commons.lang3.version}</version> | |
</dependency> | |
<dependency> | |
<groupId>org.slf4j</groupId> | |
<artifactId>slf4j-jdk14</artifactId> | |
<version>${slf4j.version}</version> | |
</dependency> | |
</dependencies> | |
<build> | |
<plugins> | |
<plugin> | |
<groupId>org.apache.maven.plugins</groupId> | |
<artifactId>maven-compiler-plugin</artifactId> | |
<configuration> | |
<source>${java.version}</source> | |
<target>${java.version}</target> | |
<testSource>${java.version}</testSource> | |
<testTarget>${java.version}</testTarget> | |
</configuration> | |
</plugin> | |
</plugins> | |
</build> | |
</project> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.nio.charset.StandardCharsets; | |
import java.util.concurrent.TimeUnit; | |
import io.nats.stan.Connection; | |
import org.apache.commons.lang3.RandomStringUtils; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
public class Publisher extends PubSub { | |
private static final Logger logger = LoggerFactory.getLogger(Publisher.class); | |
public static void main(String[] args) throws Exception { | |
new Publisher().start(); | |
} | |
private void start() throws Exception { | |
final Connection connection = createConnection("inbound"); | |
// final String subject = "session." + UUID.randomUUID().toString(); | |
final String subject = "session.ceba04ba-aeff-414c-8912-1eb337db0ddc"; | |
logger.info("Publishing messages to {} at {} ms", subject, System.currentTimeMillis()); | |
while (true) { | |
final String message = String.format("%s@%d", | |
RandomStringUtils.randomAlphanumeric(10), System.currentTimeMillis()); | |
logger.info("Publishing {} message", message); | |
connection.publish(subject, message.getBytes(StandardCharsets.UTF_8)); | |
TimeUnit.SECONDS.sleep(1); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException; | |
import java.util.concurrent.TimeoutException; | |
import io.nats.stan.Connection; | |
import io.nats.stan.ConnectionFactory; | |
class PubSub { | |
protected Connection createConnection(final String clientId) throws IOException, TimeoutException { | |
final io.nats.client.Connection natsConnection = new io.nats.client.ConnectionFactory("nats://127.0.0.1:4333") | |
.createConnection(); | |
final ConnectionFactory cf = new ConnectionFactory("events", clientId); | |
cf.setNatsConnection(natsConnection); | |
return cf.createConnection(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.time.Instant; | |
import io.nats.stan.Connection; | |
import io.nats.stan.Message; | |
import io.nats.stan.MessageHandler; | |
import io.nats.stan.Subscription; | |
import io.nats.stan.SubscriptionOptions; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
public class Subscriber extends PubSub { | |
private static final Logger logger = LoggerFactory.getLogger(Subscriber.class); | |
public static void main(String[] args) throws Exception { | |
new Subscriber().start(args[0], Long.parseLong(args[1])); | |
} | |
private void start(final String subject, final long sinceMillis) throws Exception { | |
final Connection connection = createConnection("emulator"); | |
logger.info("Subscribing messages from {} since {} ms", subject, sinceMillis); | |
final Instant startTime = Instant.ofEpochMilli(sinceMillis); | |
final EventsMessageHandler handler = new EventsMessageHandler(); | |
final Subscription subscription = connection.subscribe( | |
subject, handler, new SubscriptionOptions.Builder().startAtTime(startTime).build()); | |
} | |
private static class EventsMessageHandler implements MessageHandler { | |
@Override | |
public void onMessage(final Message msg) { | |
logger.info(msg.toString()); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment