Last active
December 2, 2024 18:34
-
-
Save jcuffe/8d42e3ad9088d21834828f7ff41f0a66 to your computer and use it in GitHub Desktop.
Parallel processing of decoded records
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
error: lifetime may not live long enough | |
--> src/main.rs:72:46 | |
| | |
63 | impl<'a, T: AsyncReadExt + std::marker::Unpin, U: databento::dbn::HasRType + 'a> Stream | |
| -- lifetime `'a` defined here | |
... | |
68 | fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { | |
| - let's call the lifetime of this reference `'1` | |
... | |
72 | Poll::Ready(Ok(Some(result))) => Poll::Ready(Some(Ok(result))), | |
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ method was supposed to return data with lifetime `'a` but it is returning data with lifetime `'1` |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use databento::{ | |
dbn::{decode::AsyncDbnDecoder, Schema, TradeMsg}, | |
historical::timeseries::GetRangeParams, | |
HistoricalClient, Symbols, | |
}; | |
use dotenv::dotenv; | |
use rayon::prelude::*; | |
use std::{future::Future, pin::Pin}; | |
use std::{ | |
marker::PhantomData, | |
task::{Context, Poll}, | |
}; | |
use time::macros::datetime; | |
use tokio::{io::AsyncReadExt, time::Duration}; | |
use tokio_stream::{Stream, StreamExt}; | |
#[tokio::main] | |
async fn main() -> anyhow::Result<()> { | |
dotenv()?; | |
let mut client = HistoricalClient::builder() | |
.key(std::env::var("DATABENTO_API_KEY")?)? | |
.build()?; | |
let decoder = client | |
.timeseries() | |
.get_range( | |
&GetRangeParams::builder() | |
.dataset("GLBX.MDP3") | |
.date_time_range(( | |
datetime!(2022-06-10 14:30:00.000 UTC), | |
datetime!(2022-06-10 14:30:00.010 UTC), | |
)) | |
.symbols(Symbols::All) | |
.schema(Schema::Trades) | |
.build(), | |
) | |
.await?; | |
let strm = StreamDecoder { | |
decoder, | |
_marker: PhantomData::<&TradeMsg>, | |
} | |
.chunks_timeout(10, Duration::from_secs(1)); | |
tokio::pin!(strm); | |
while let Some(chunk) = strm.next().await { | |
chunk.into_par_iter().for_each(|trade| { | |
println!("PROCESSING {:?}", trade); | |
}); | |
} | |
Ok(()) | |
} | |
struct StreamDecoder<'a, T: AsyncReadExt + std::marker::Unpin, U: databento::dbn::HasRType + 'a> { | |
decoder: AsyncDbnDecoder<T>, | |
// Add a lifetime marker to ensure the reference lives as long as the decoder | |
_marker: std::marker::PhantomData<&'a U>, | |
} | |
impl<'a, T: AsyncReadExt + std::marker::Unpin, U: databento::dbn::HasRType + 'a> Stream | |
for StreamDecoder<'a, T, U> | |
{ | |
type Item = databento::dbn::Result<&'a U>; | |
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { | |
let fut = self.decoder.decode_record::<U>(); | |
tokio::pin!(fut); | |
match fut.poll(cx) { | |
Poll::Ready(Ok(Some(result))) => Poll::Ready(Some(Ok(result))), | |
Poll::Ready(Ok(None)) => Poll::Ready(None), | |
Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))), | |
Poll::Pending => Poll::Pending, | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment