Decaton PerKeyQuota

Per topic quota enforcement flow

[diagram]

Threading model

[diagram]

Experiment

Scenario:

  1. Set the per-key processing rate quota to 500 tasks/sec (see the configuration sketch below).
  2. Start producing tasks for 100 keys at 100 tasks/sec each.
  3. After 3 minutes, start producing tasks for 5 more keys at 1000 tasks/sec each.
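Scenario step 1 comes down to two settings on the subscription builder. Here is a minimal sketch of that configuration, lifted from the full test code at the bottom of this gist (the consumer config and the processor body are elided placeholders):

ProcessorSubscription subscription =
        SubscriptionBuilder.newBuilder("pkq-processor")
                           // Per-key quota: each key may be processed at up to 500 tasks/sec
                           .properties(StaticPropertySupplier.of(
                                   Property.ofStatic(CONFIG_PER_KEY_QUOTA_PROCESSING_RATE, 500L)))
                           // On violation, "shape" the key: route its tasks to the shaping topic
                           .enablePerKeyQuota(PerKeyQuotaConfig.shape())
                           .processorsBuilder(
                                   ProcessorsBuilder.consuming(topic,
                                                               new ProtocolBuffersDeserializer<>(HelloTask.parser()))
                                                    .thenProcess((context, task) -> { /* process task */ }))
                           .consumerConfig(consumerConfig)
                           .buildAndStart();

PerKeyQuotaConfig.shape() makes Decaton re-produce tasks of quota-violating keys to "<topic>-shaping" (created explicitly in setUp() below) instead of slowing down innocent keys on the main topic.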

Expected result:

  • The 5 bursting keys are detected as "burst" and their tasks are queued to the shaping topic
  • Burst tasks are processed at 500 tasks/sec per partition

Result:

  • After 3 minutes, shaping tasks arrive at 5000 tasks/sec (5 keys × 1000 tasks/sec). Working as expected.
    • [graph]
  • During the burst, the shaping topic is processed at 500 tasks/sec per partition. Also working as expected.
    • [graph]
  • During the burst, the processed_total metric (which counts tasks pushed down to the user-supplied processor) showed a processing rate of 11000 tasks/sec: 10000/sec for the innocent keys plus 1000/sec for the bursting keys (500 tasks/sec × 2 shaping partitions). This is also as expected.
    • [graph]

Performance experiment

Result

$ benchmark/run-pkq-bm.sh pkq-unlimited com.linecorp.decaton.benchmark.DecatonPKQUnlimitedRunner /path/to/output
$ benchmark/run-pkq-bm.sh pkq com.linecorp.decaton.benchmark.DecatonPKQRunner /path/to/output
runner                     throughput (tasks/sec)
DecatonPKQUnlimitedRunner  144673.6789784853
DecatonPKQRunner           120564.21000588825

There is a certain overhead, but it is likely at an acceptable level.
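Quantifying that overhead from the two measurements above:

    1 - 120564.21 / 144673.68 ≈ 0.167

i.e. enabling per-key quota cost roughly 17% of throughput in this benchmark run.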

package com.linecorp.decaton.processor;

import static com.linecorp.decaton.processor.runtime.ProcessorProperties.CONFIG_PER_KEY_QUOTA_PROCESSING_RATE;

import java.net.InetSocketAddress;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

import com.google.common.util.concurrent.RateLimiter;
import com.linecorp.decaton.client.DecatonClient;
import com.linecorp.decaton.processor.metrics.Metrics;
import com.linecorp.decaton.processor.runtime.PerKeyQuotaConfig;
import com.linecorp.decaton.processor.runtime.ProcessorSubscription;
import com.linecorp.decaton.processor.runtime.ProcessorsBuilder;
import com.linecorp.decaton.processor.runtime.Property;
import com.linecorp.decaton.processor.runtime.StaticPropertySupplier;
import com.linecorp.decaton.processor.runtime.SubscriptionBuilder;
import com.linecorp.decaton.protobuf.ProtocolBuffersDeserializer;
import com.linecorp.decaton.protobuf.ProtocolBuffersSerializer;
import com.linecorp.decaton.protocol.Sample.HelloTask;
import com.linecorp.decaton.testing.KafkaClusterRule;

import io.micrometer.core.instrument.Counter;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import io.prometheus.client.exporter.HTTPServer;

public class RealPerKeyQuotaTest {
    @ClassRule
    public static KafkaClusterRule rule = new KafkaClusterRule();

    private String topic;
    private String shapingTopic;

    @Before
    public void setUp() {
        // Main topic plus the shaping topic that PerKeyQuotaConfig.shape() routes bursting keys to
        topic = rule.admin().createRandomTopic(3, 3);
        shapingTopic = topic + "-shaping";
        rule.admin().createTopic(shapingTopic, 3, 3);
    }

    @After
    public void tearDown() {
        rule.admin().deleteTopics(true, topic, shapingTopic);
    }

    @Test
    public void run() throws Exception {
        // Released on JVM shutdown; keeps the producer loops and the test alive until then
        CountDownLatch terminationLatch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread(terminationLatch::countDown));

        // Expose Decaton's metrics and our own counter on http://localhost:8080/metrics
        PrometheusMeterRegistry prometheusRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
        Metrics.register(prometheusRegistry);
        new HTTPServer(new InetSocketAddress(8080), prometheusRegistry.getPrometheusRegistry(), true);
        Counter totalProcessedCount = Counter.builder("processed")
                                             .description("total processed tasks")
                                             .register(prometheusRegistry);

        Properties consumerConfig = new Properties();
        consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my-decaton-processor");
        consumerConfig.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, rule.bootstrapServers());
        consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-decaton-processor");

        // Per-key quota of 500 tasks/sec; keys exceeding it are shaped via the shaping topic
        ProcessorSubscription subscription =
                SubscriptionBuilder.newBuilder("pkq-processor")
                                   .properties(StaticPropertySupplier.of(
                                           Property.ofStatic(CONFIG_PER_KEY_QUOTA_PROCESSING_RATE, 500L)))
                                   .enablePerKeyQuota(PerKeyQuotaConfig.shape())
                                   .processorsBuilder(
                                           ProcessorsBuilder.consuming(topic,
                                                                       new ProtocolBuffersDeserializer<>(HelloTask.parser()))
                                                            .thenProcess((context, task) -> {
                                                                totalProcessedCount.increment();
                                                            })
                                   )
                                   .consumerConfig(consumerConfig)
                                   .buildAndStart();

        Properties producerProps = new Properties();
        producerProps.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "my-decaton-client");
        producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, rule.bootstrapServers());
        producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, "100");
        DecatonClient<HelloTask> client = DecatonClient
                .producing(topic, new ProtocolBuffersSerializer<HelloTask>())
                .applicationId("pkq-test")
                .instanceId("localhost")
                .producerConfig(producerProps)
                .build();

        // 100 "innocent" keys, each produced at 100 tasks/sec (10000 tasks/sec in total)
        int innocentKeys = 100;
        ExecutorService service = Executors.newFixedThreadPool(innocentKeys);
        for (int i = 0; i < innocentKeys; i++) {
            String key = "key" + i;
            RateLimiter limiter = RateLimiter.create(100L);
            service.execute(() -> {
                while (terminationLatch.getCount() > 0) {
                    limiter.acquire();
                    client.put(key, HelloTask.getDefaultInstance());
                }
            });
        }

        // After 3 minutes, add 5 bursting keys at 1000 tasks/sec each (5000 tasks/sec in total)
        terminationLatch.await(180, TimeUnit.SECONDS);
        int burstingKeys = 5;
        ExecutorService burstingService = Executors.newFixedThreadPool(burstingKeys);
        for (int i = 0; i < burstingKeys; i++) {
            String key = "bursting-key" + i;
            RateLimiter limiter = RateLimiter.create(1000L);
            burstingService.execute(() -> {
                while (terminationLatch.getCount() > 0) {
                    limiter.acquire();
                    client.put(key, HelloTask.getDefaultInstance());
                }
            });
        }

        // Keep producing and processing until shutdown
        terminationLatch.await();
    }
}
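While the test runs, the embedded HTTPServer exposes both Decaton's metrics and the processed counter on port 8080, so the rates above can be checked with any Prometheus scraper or, for a quick look (the grep target is the Prometheus-exposed name of the counter registered above):

$ curl -s localhost:8080/metrics | grep processed_total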