Toby Lawrence tobz

{
  "description": "A complete Vector configuration.",
  "$ref": "#/definitions/vector::config::builder::ConfigBuilder",
  "definitions": {
    "vector::config::builder::ConfigBuilder": {
      "description": "A complete Vector configuration.",
      "allOf": [
        {
          "type": "object",
          "properties": {
tobz / datadog.yaml
Last active June 10, 2022 15:51 — forked from prognant/trace tool
apm_config:
  # This points the `trace-agent` at Vector's `datadog_agent` source.
  apm_dd_url: http://localhost:8081
  # Set these to zero so the Datadog Agent does not sample traces before sending them to
  # Vector, which would skew computed statistics such as request counts or the ratio of
  # success to error spans.
  max_traces_per_second: 0
  errors_per_second: 0
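
# A sketch of the matching Vector side (an assumption for illustration; the file and
# source names are hypothetical): a `datadog_agent` source listening on the address
# that `apm_dd_url` points at, e.g. in a separate vector.yaml.
sources:
  datadog_agent_traces:
    type: datadog_agent
    address: "0.0.0.0:8081"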
// Our entrypoint for initializing logging via `tracing`.
pub fn initialize_logging(levels: &str) {
    // Here, `levels` is "vector=info,codec=info,vrl=info,file_source=info,tower_limit=trace,rdkafka=info,buffers=info,kube=info".
    let fmt_filter = tracing_subscriber::filter::Targets::from_str(levels).expect(
        "logging filter targets were not formatted correctly or did not specify a valid level",
    );

    // `MetricsLayer` impls `tracing_subscriber::layer::Layer`, so this ends up being `Option<Filtered<MetricsLayer, ...>>`.
    let metrics_layer = metrics_layer_enabled()
        .then(|| MetricsLayer::new().with_filter(tracing_subscriber::filter::LevelFilter::INFO));
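
    // A sketch of how this might continue (an assumption, not the rest of this gist):
    // compose the filtered layers onto `tracing-subscriber`'s registry and install it
    // as the global subscriber. Assumes
    // `use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, Layer};`
    // plus the "fmt" and "registry" features from the dependency list below.
    tracing_subscriber::registry()
        .with(metrics_layer)
        .with(fmt::layer().with_filter(fmt_filter))
        .init();
}
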
/// A period of time.
#[derive(Clone, Serialize, Deserialize)]
pub struct SpecialDuration(u64);

/// Controls the batching behavior of events.
#[derive(Clone)]
#[configurable_component]
pub struct BatchConfig {
    /// The maximum number of events in a batch before it is flushed.
    max_events: Option<u64>,

#[configurable(type = "sink")]
pub struct BasicStruct {
    foo: String,
}

const _: () = {
    #[automatically_derived]
    impl<'configurable> ::vector_config::Configurable<'configurable> for BasicStruct {
        fn shape() -> ::vector_config::Shape {
            ::vector_config::Shape::Boolean
        }
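
// For comparison with `#[configurable_component]` (Vector's own proc macro), a rough
// sketch of the same idea using the public `schemars` crate. This is an illustration
// only, not what the macro actually generates: doc comments become the schema's
// `description`, and `Option<u64>` becomes an optional integer property.
use schemars::{schema_for, JsonSchema};

/// Controls the batching behavior of events.
#[derive(JsonSchema)]
struct BatchConfigSketch {
    /// The maximum number of events in a batch before it is flushed.
    max_events: Option<u64>,
}

fn main() {
    let schema = schema_for!(BatchConfigSketch);
    println!("{}", serde_json::to_string_pretty(&schema).unwrap());
}
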
tracing = { version = "0.1.32", default-features = false }
tracing-core = { version = "0.1.23", default-features = false }
tracing-futures = { version = "0.2.5", default-features = false, features = ["futures-03"] }
tracing-log = { version = "0.1.2", default-features = false, features = ["log-tracer", "std"] }
tracing-subscriber = { version = "0.3.9", default-features = false, features = ["ansi", "env-filter", "fmt", "json", "registry"] }
tobz / lib.rs
Created February 18, 2022 21:49
use std::{time::Duration, marker::PhantomData};
use schemars::{JsonSchema, schema::{SchemaObject, InstanceType, SingleOrVec, NumberValidation, Schema}, gen::SchemaGenerator};
use serde::{Deserialize, Serialize, Serializer, Deserializer};
use serde_with::{SerializeAs, DeserializeAs, DurationSeconds, formats::Strict};
#[derive(Copy, Clone, Debug, Default)]
pub struct AsSchema<T: ?Sized>(PhantomData<T>);
impl<T: JsonSchema + ?Sized> AsSchema<T> {
diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs
index 44f01fc6a..9ca868bda 100644
--- a/src/sources/kafka.rs
+++ b/src/sources/kafka.rs
@@ -1,7 +1,7 @@
 use std::{
     collections::{BTreeMap, HashMap},
     io::Cursor,
-    sync::Arc,
+    sync::{Arc, atomic::{AtomicUsize, Ordering}}, time::Duration,
pub struct Record<'a> {
    /// The checksum of the record.
    ///
    /// The checksum is CRC32C(big_endian_bytes(id) + payload).
    pub(super) checksum: u32,

    /// The record ID.
    ///
    /// This is monotonic across records.
    id: u64,

    // The record length.
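
// A sketch of the checksum formula documented above, i.e. CRC32C over the record ID's
// big-endian bytes followed by the payload. This assumes the `crc32c` crate and is an
// illustration of the formula only, not Vector's actual implementation.
fn record_checksum(id: u64, payload: &[u8]) -> u32 {
    let crc_of_id = crc32c::crc32c(&id.to_be_bytes());
    crc32c::crc32c_append(crc_of_id, payload)
}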