Skip to content

Instantly share code, notes, and snippets.

@jcuffe
Last active December 2, 2024 18:34
Show Gist options
  • Save jcuffe/8d42e3ad9088d21834828f7ff41f0a66 to your computer and use it in GitHub Desktop.
Save jcuffe/8d42e3ad9088d21834828f7ff41f0a66 to your computer and use it in GitHub Desktop.
Parallel processing of decoded records
error: lifetime may not live long enough
--> src/main.rs:72:46
|
63 | impl<'a, T: AsyncReadExt + std::marker::Unpin, U: databento::dbn::HasRType + 'a> Stream
| -- lifetime `'a` defined here
...
68 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
| - let's call the lifetime of this reference `'1`
...
72 | Poll::Ready(Ok(Some(result))) => Poll::Ready(Some(Ok(result))),
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ method was supposed to return data with lifetime `'a` but it is returning data with lifetime `'1`
use databento::{
dbn::{decode::AsyncDbnDecoder, Schema, TradeMsg},
historical::timeseries::GetRangeParams,
HistoricalClient, Symbols,
};
use dotenv::dotenv;
use rayon::prelude::*;
use std::{future::Future, pin::Pin};
use std::{
marker::PhantomData,
task::{Context, Poll},
};
use time::macros::datetime;
use tokio::{io::AsyncReadExt, time::Duration};
use tokio_stream::{Stream, StreamExt};
/// Downloads a 10 ms window of trades for every symbol in `GLBX.MDP3`,
/// groups the decoded records into chunks, and prints each chunk's records
/// in parallel on the rayon thread pool.
///
/// # Errors
///
/// Propagates any failure from loading the `.env` file, reading
/// `DATABENTO_API_KEY`, building the client, or issuing the range request.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    dotenv()?;

    let api_key = std::env::var("DATABENTO_API_KEY")?;
    let mut client = HistoricalClient::builder().key(api_key)?.build()?;

    // Request parameters: all symbols, trades schema, 10 ms time window.
    let window = (
        datetime!(2022-06-10 14:30:00.000 UTC),
        datetime!(2022-06-10 14:30:00.010 UTC),
    );
    let params = GetRangeParams::builder()
        .dataset("GLBX.MDP3")
        .date_time_range(window)
        .symbols(Symbols::All)
        .schema(Schema::Trades)
        .build();

    let dbn_decoder = client.timeseries().get_range(&params).await?;

    // Wrap the decoder as a `Stream`, then batch it: a chunk is emitted once
    // it holds 10 items or one second has elapsed, whichever comes first.
    let record_stream = StreamDecoder {
        decoder: dbn_decoder,
        _marker: PhantomData::<&TradeMsg>,
    };
    let batches = record_stream.chunks_timeout(10, Duration::from_secs(1));
    tokio::pin!(batches);

    // Each batch is a `Vec`, so rayon can fan its items out across threads.
    while let Some(batch) = batches.next().await {
        batch
            .into_par_iter()
            .for_each(|trade| println!("PROCESSING {:?}", trade));
    }

    Ok(())
}
/// Adapter that wraps an [`AsyncDbnDecoder`] so it can be driven as a
/// [`Stream`] of records of type `U`.
struct StreamDecoder<'a, T, U>
where
    T: AsyncReadExt + std::marker::Unpin,
    U: databento::dbn::HasRType + 'a,
{
    /// The underlying DBN decoder that records are pulled from.
    decoder: AsyncDbnDecoder<T>,
    /// Zero-sized marker: `'a` and `U` are not used by any real field, so
    /// this makes both parameters "used" as far as the compiler is concerned.
    _marker: std::marker::PhantomData<&'a U>,
}
/// Exposes the wrapped decoder as a [`Stream`] of *owned* records.
///
/// The record returned by `decode_record` is a reference that borrows from
/// `self.decoder`, so it is only valid for the duration of a single
/// `poll_next` call. A `Stream` cannot hand out items tied to that borrow
/// (this is the "lifetime may not live long enough" error), so each record is
/// cloned out and yielded by value. This requires `U: Clone`.
impl<'a, T, U> Stream for StreamDecoder<'a, T, U>
where
    T: AsyncReadExt + std::marker::Unpin,
    U: databento::dbn::HasRType + Clone + 'a,
{
    type Item = databento::dbn::Result<U>;

    /// Polls the decoder for the next record.
    ///
    /// Returns `Ready(Some(Ok(record)))` for a decoded record,
    /// `Ready(Some(Err(e)))` when decoding fails, `Ready(None)` once the
    /// decoder reports the end of input, and `Pending` otherwise.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // NOTE(review): a fresh decode future is created on every poll and
        // dropped again when it returns `Pending`. That is only correct if
        // `decode_record` is cancel-safe (keeps partial progress inside the
        // decoder) -- confirm against the dbn docs for the version in use.
        let fut = self.decoder.decode_record::<U>();
        tokio::pin!(fut);

        let polled = fut.poll(cx);
        // `Result<Option<&U>>` -> `Result<Option<U>>` -> `Option<Result<U>>`:
        // clone the record out of the decoder's borrow, then flip the nesting
        // so `Ok(None)` becomes end-of-stream.
        polled.map(|decoded| decoded.map(|record| record.cloned()).transpose())
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment