producer.newMessage()
.deliverAfter(3L, TimeUnit.Minute)
.value("Hello Pulsar after 3 minutes!")
.send();producer.newMessage()
.deliverAt(new Date(2019, 06, 27, 23, 00, 00)
.getTime())
.value("Hello Pulsar at 11pm on 06/27/2019!")
.send();
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("my-topic")
.subscriptionName("my-subscription")
.replicateSubscriptionState(true)
.subscribe();import (
"fmt"
"context"
"github.com/apache/pulsar/pulsar-function-go/pf"
)
func HandleRequest(ctx context.Context, in []byte) error {
fmt.Println(string(in) + "!")
return nil| Compatibility Check Strategy | Changes allowed | Check against which schemas | Upgrade first |
|---|---|---|---|
ALWAYS_INCOMPATIBLE |
All changes are disabled | All previous versions | None |
ALWAYS_COMPATIBLE |
All changes are allowed | Compatibility checking disabled | Depends |
BACKWARD |
Delete fields; Add optional fields | Latest version | Consumers |
BACKWARD_TRANSITIVE |
Delete fields; Add optional fields | All previous versions | Consumers |
FORWARD |
Add fields; Delete optional fields | Latest version | Producers |
FORWARD_TRANSITIVE |
Add fields; Delete optional fields | All previous versions | Producers |
FULL |
Modify optional fields | Latest version | Any order |
FULL_TRANSITIVE |
Modify optional fields | All previous versions | Any order |
RecordSchemaBuilder recordSchemaBuilder = SchemaBuilder.record("schemaName");
recordSchemaBuilder
.field("intField")
.type(SchemaType.INT32);
SchemaInfo schemaInfo = recordSchemaBuilder.build(SchemaType.AVRO);
Schema<GenericRecord> schema = Schema.generic(schemaInfo);Producer<GenericRecord> producer = client.newProducer(Schema.generic(schemaInfo)).create();
producer.newMessage()
.value(schema.newRecordBuilder()
.set("intField", 32)
.build())
.send();Consumer<GenericRecord> pulsarConsumer = client.newConsumer(Schema.AUTO_CONSUME())
…
.subscribe();
Message<GenericRecord> msg = consumer.receive() ;
GenericRecord record = msg.getValue();Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
Schema.INT32,
Schema.STRING,
KeyValueEncodingType.SEPARATED
);