Skip to content

Instantly share code, notes, and snippets.

View tobz's full-sized avatar
💁‍♂️
it me

Toby Lawrence tobz

💁‍♂️
it me
View GitHub Profile
@tobz
tobz / code.rs
Created November 19, 2021 02:15
#[derive(Archive, Serialize, Debug)]
#[archive_attr(derive(CheckBytes))]
pub struct Record<'a> {
/// The checksum of the record.
///
/// The checksum is CRC32C(big_endian_bytes(id) + payload).
checksum: u32,
/// The record ID.
///
/// This is monotonic across records.
use std::{marker::PhantomData, pin::Pin};
use bytecheck::CheckBytes;
use rkyv::{
archived_value, archived_value_mut, check_archived_value,
ser::{serializers::AllocSerializer, Serializer},
validation::validators::DefaultValidator,
Archive, Serialize,
};
diff --git a/metrics/src/cow.rs b/metrics/src/cow.rs
index 968b5ef..f1627af 100644
--- a/metrics/src/cow.rs
+++ b/metrics/src/cow.rs
@@ -91,6 +91,10 @@ impl<'a> Cow<'a, str> {
marker: PhantomData,
}
}
+
+ pub const fn const_as_ref(&self) -> &str {
diff --git a/metrics/src/cow.rs b/metrics/src/cow.rs
index 968b5ef..ea11af8 100644
--- a/metrics/src/cow.rs
+++ b/metrics/src/cow.rs
@@ -91,6 +91,10 @@ impl<'a> Cow<'a, str> {
marker: PhantomData,
}
}
+
+ pub const fn const_as_ref(&self) -> &str {
diff --git a/lib/vector-core/src/stream/driver.rs b/lib/vector-core/src/stream/driver.rs
index d5de25511a..77e7bfc839 100644
--- a/lib/vector-core/src/stream/driver.rs
+++ b/lib/vector-core/src/stream/driver.rs
@@ -1,13 +1,15 @@
+use super::FuturesUnorderedChunked;
use crate::event::{EventStatus, Finalizable};
use buffers::{Ackable, Acker};
-use futures::{poll, stream::FuturesUnordered, FutureExt, Stream, StreamExt, TryFutureExt};
+use futures::{poll, FutureExt, Stream, StreamExt, TryFutureExt};
//! Design specification for our yet-to-be-named SPSC disk buffer implementation:
//!
//! We provide a single writer/single reader interface to an underlying set of files that
//! conceptually represent a ring buffer. Unlike a typical ring buffer, we block writes when the
//! total size of all unread records reaches the configured limit. It may be possible to alter the
//! design in the future such that we can provide a "drop oldest" operation mode, but that is
//! out-of-scope for version 1 of this design.
//!
//! Design constraints / invariants:
//! - buffer can be a maximum of 2TB in total size
@tobz
tobz / driver.rs
Created September 17, 2021 01:24
use std::{collections::HashMap, fmt, marker::PhantomData};
use buffers::{Ackable, Acker};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, stream::FuturesUnordered};
use tokio::{pin, select, sync::oneshot};
use tower::{Service, ServiceExt};
use tracing::Instrument;
use crate::event::{EventStatus, Finalizable};
/// Write half of the disk-buffer pair.
///
/// All fields are `Arc`-shared with the corresponding `Reader`, so the two
/// halves can live on different tasks while observing the same state.
struct Writer {
/// Shared handle to the underlying database/storage backend.
/// NOTE(review): `Database` is declared elsewhere — confirm its concurrency
/// guarantees; this struct relies on `Arc` alone for sharing.
db: Arc<Database>,
/// Waker slot presumably used to notify the reader when new data is written
/// — TODO confirm against the `Reader` side, which holds the same `Arc`.
reader_wakeup: Arc<AtomicWaker>,
/// Parked writer wakers; a `Vec` under a `Mutex` suggests multiple pending
/// writer wakeups can accumulate before being drained — verify with caller.
writer_wakeup: Arc<Mutex<Vec<Waker>>>,
/// Shared running total, presumably the unread byte count used to enforce
/// the buffer size limit — TODO confirm units (bytes vs. records).
total_size: Arc<AtomicUsize>,
}
struct Reader {
db: Arc<Database>,
reader_wakeup: Arc<AtomicWaker>,
async fn run_io<S>(mut rx: Receiver<S3Request>, mut service: S, acker: Acker)
where
S: Service<S3Request>,
S::Future: Send + 'static,
S::Response: Response + Send + 'static,
S::Error: Into<crate::Error> + Send,
{
let in_flight = FuturesUnordered::new();
let mut pending_acks = HashMap::new();
let mut seq_head: u64 = 0;
/// Strategy for partitioning events: maps each item to the key of the
/// partition (batch) it belongs to.
pub trait Partitioner {
/// The item type being partitioned. The `ByteSizeOf` bound suggests callers
/// track per-partition byte sizes — confirm against the batching code.
type Item: ByteSizeOf;
/// Partition key. `Clone + Eq + Hash` so keys can be duplicated and used as
/// hash-map keys by the consumer.
type Key: Clone + Eq + Hash;
/// Returns the partition key for `item`. Takes `&self` and `&item`, so
/// implementations are expected to be non-consuming lookups.
fn partition(&self, item: &Self::Item) -> Self::Key;
}
/// An in-progress batch for `PartitionBatch`.
///