Last active
May 11, 2020 14:48
-
-
Save mackwic/06aca21d9f392877db0741a69d3a6feb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use pin_project::pin_project;
use thiserror::Error;
use tokio_postgres::{Client, Connection, Error as PgError};
pub type Oid = tokio_postgres::types::Oid; | |
pub type NoTlsConnection = Connection<tokio_postgres::Socket, tokio_postgres::tls::NoTlsStream>; | |
pub struct LargeObjectBridge { | |
client: Client, | |
} | |
#[derive(Debug)] | |
pub struct LargeObject { | |
oid: Oid, | |
client: Client, | |
} | |
/// Progress marker for a [`WriteOperation`].
///
/// `Fut1` and `Fut2` are the types of the in-flight futures stored by the
/// `Reading` and `Querying` variants respectively.
// NOTE(review): judging by the names, the intended cycle is
// WantToRead -> Reading -> WantToQuery -> Querying; only `WantToRead`,
// `WantToQuery` and `Querying` are referenced by live code so far — confirm.
enum WriteOperationState<Fut1, Fut2>
where
    Fut2: std::future::Future,
{
    /// No read is in flight (initial state set by `LargeObject::write_from`).
    WantToRead,
    /// Holds a pending future of type `Fut1`.
    Reading(Fut1),
    /// No query is in flight.
    WantToQuery,
    /// Holds a pending future of type `Fut2`.
    Querying(Fut2),
}
#[pin_project] | |
pub struct WriteOperation<'a, R: tokio::io::AsyncRead, Fut1, Fut2: std::future::Future> { | |
buffer: Vec<u8>, | |
reader: R, | |
object: Box<&'a mut LargeObject>, | |
#[pin] | |
state: WriteOperationState<Fut1, Fut2>, | |
} | |
#[derive(Error, Debug)] | |
pub enum Error { | |
#[error("unable to get value from Row")] | |
UnableToGetValueFromRow(#[source] PgError), | |
#[error("error in the query")] | |
QueryError(&'static str, #[source] PgError), | |
#[error("error in the query")] | |
QueryErrorString(&'static str, String), | |
#[error("unable to read form source")] | |
IoError(#[source] std::io::Error), | |
} | |
impl LargeObjectBridge { | |
pub fn new(client: Client) -> Self { | |
LargeObjectBridge { client } | |
} | |
pub async fn create(self) -> Result<LargeObject, Error> { | |
const QUERY: &str = "SELECT lo_create(0);"; | |
let oid_res = match self.client.query_one(QUERY, &[]).await { | |
Ok(row) => row | |
.try_get(0) | |
.or_else(|err| Err(Error::UnableToGetValueFromRow(err))), | |
Err(err) => Err(Error::QueryError(QUERY, err)), | |
}; | |
let oid = oid_res?; | |
Ok(LargeObject { | |
oid, | |
client: self.client, | |
}) | |
} | |
} | |
impl LargeObject { | |
pub async fn delete(self) -> Result<LargeObjectBridge, Error> { | |
const QUERY: &str = "SELECT lo_unlink($1);"; | |
match self.client.query_one(QUERY, &[&self.oid]).await { | |
Ok(_) => Ok(LargeObjectBridge { | |
client: self.client, | |
}), | |
Err(err) => Err(Error::QueryError(QUERY, err)), | |
} | |
} | |
pub fn write_from<R: tokio::io::AsyncRead, Fut1, Fut2: std::future::Future>( | |
&mut self, | |
reader: R, | |
) -> WriteOperation<R, Fut1, Fut2> { | |
WriteOperation { | |
buffer: Vec::with_capacity(8_388_608), // 8KB | |
reader, | |
object: Box::new(self), | |
state: WriteOperationState::WantToRead, | |
} | |
} | |
} | |
impl<'a, R: tokio::io::AsyncRead, Fut1: std::future::Future, Fut2: std::future::Future> | |
tokio::io::AsyncWrite for WriteOperation<'a, R, Fut1, Fut2> | |
{ | |
fn poll_write( | |
self: std::pin::Pin<&mut Self>, | |
cx: &mut std::task::Context<'_>, | |
buffer: &[u8], | |
) -> std::task::Poll<std::result::Result<usize, std::io::Error>> { | |
use std::cmp::min; | |
use std::future::Future; | |
use std::pin::Pin; | |
use std::task::Poll; | |
use tokio_postgres::types::ToSql; | |
// safety: it is probably OK but please don't trust me and check | |
let me = self.project(); | |
const QUERY: &str = "SOLECT lo_put($1, $2, $3);"; | |
match unsafe { me.state.get_unchecked_mut() } { | |
WriteOperationState::WantToQuery => { | |
// don't write more bytes than Pg can handle | |
let write_amount = min(buffer.len(), u32::MAX as usize) as u32; | |
let arguments: &[&(dyn ToSql + Sync)] = &[ | |
&me.object.oid, | |
&write_amount, | |
&buffer as &(dyn ToSql + Sync), | |
]; | |
let mut test = me.object.client.query_one(QUERY, arguments); | |
let query_future = unsafe { | |
// safety: is it safe ? No data is moved from self nor mutated, so I think it's good ? | |
Pin::new_unchecked(&mut test) | |
}; | |
let query_status = query_future.poll(cx); | |
match query_status { | |
Poll::Pending => { | |
me.state.set(WriteOperationState::Querying( | |
*query_future.get_unchecked_mut(), | |
/*error[E0308]: mismatched types | |
--> src/vendor/postgres_large_object_ng.rs:131:29 | |
| | |
94 | impl<'a, R: tokio::io::AsyncRead, Fut1: std::future::Future, Fut2: std::future::Future> | |
| ---- this type parameter | |
... | |
131 | *query_future.get_unchecked_mut(), | |
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected type parameter `Fut2`, found opaque type | |
| | |
::: /Users/thomas/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-postgres-0.5.4/src/client.rs:247:10 | |
| | |
247 | ) -> Result<Row, Error> | |
| ------------------ the found opaque type | |
| | |
= note: expected type parameter `Fut2` | |
found opaque type `impl std::future::Future` | |
= help: type parameters must be constrained to match other types | |
= note: for more information, visit https://doc.rust-lang.org/book/ch10-02-traits.html#traits-as-parameters | |
*/ | |
)); | |
Poll::Pending | |
} | |
_ => unimplemented!(), | |
} | |
} | |
_ => unimplemented!(), | |
} | |
} | |
fn poll_flush( | |
self: std::pin::Pin<&mut Self>, | |
_cx: &mut std::task::Context<'_>, | |
) -> std::task::Poll<std::result::Result<(), std::io::Error>> { | |
todo!() | |
} | |
fn poll_shutdown( | |
self: std::pin::Pin<&mut Self>, | |
_cx: &mut std::task::Context<'_>, | |
) -> std::task::Poll<std::result::Result<(), std::io::Error>> { | |
todo!() | |
} | |
} | |
// impl<'a, R: tokio::io::AsyncRead + Unpin, Fut1, Fut2> std::future::Future | |
// for WriteOperation<'a, R, Fut1, Fut2> | |
// { | |
// type Output = Result<usize, Error>; | |
// fn poll( | |
// self: std::pin::Pin<&mut Self>, | |
// cx: &mut std::task::Context<'_>, | |
// ) -> std::task::Poll<<Self as std::future::Future>::Output> { | |
// use std::pin::Pin; | |
// use std::task::Poll; | |
// use WriteOperationState::*; | |
// println!("called !"); | |
// let mut idx = 0; | |
// // safety: it is probably OK but please don't trust me and check | |
// let me = unsafe { self.get_unchecked_mut() }; | |
// loop { | |
// println!("loop !"); | |
// match me.state { | |
// WantToRead => { | |
// let read_status = Pin::new(&mut me.reader).poll_read_buf(cx, &mut me.buffer); | |
// let read = match read_status { | |
// Poll::Pending => { | |
// *this.state = Reading(); | |
// return Poll::Pending; | |
// } | |
// Poll::Ready(Err(io_error)) => { | |
// return Poll::Ready(Err(Error::IoError(io_error))) | |
// } | |
// Poll::Ready(Ok(0)) => return Poll::Ready(Ok(idx)), | |
// Poll::Ready(Ok(read)) => read, | |
// }; | |
// } | |
// } | |
// while idx < read { | |
// println!("query !"); | |
// println!("oid={} idx={} read={}", me.object.oid, idx, read); | |
//             const QUERY: &str = "SELECT lo_put($1, $2, $3);";
// let write_status = unsafe { | |
// // safety: is it safe ? No data is moved from self nor mutated, so I think it's good ? | |
// Pin::new_unchecked( | |
// &mut me | |
// .object | |
// .client | |
// .query_one(QUERY, &[&me.object.oid, &(idx as u32), &me.buffer]), | |
// ) | |
// .poll(cx) | |
// }; | |
// let wrote: u32 = match write_status { | |
// Poll::Pending => { | |
// println!("query pending"); | |
// return Poll::Pending; | |
// } | |
// Poll::Ready(Err(pg_error)) => { | |
// println!("query ready err"); | |
// return Poll::Ready(Err(Error::QueryError(QUERY, pg_error))); | |
// } | |
// Poll::Ready(Ok(ref row)) => match row.try_get(0) { | |
// Err(error) => { | |
// println!("query ready ok, row err"); | |
// return Poll::Ready(Err(Error::QueryErrorString( | |
// QUERY, | |
// format!("{}", error), | |
// ))); | |
// } | |
// Ok(bytes_wrote) => bytes_wrote, | |
// }, | |
// }; | |
// idx += wrote as usize; | |
// println!("end query: wrote={} idx={} read={}", wrote, idx, read); | |
// } | |
// } | |
// } | |
// } | |
// Integration tests: they need a PostgreSQL server reachable at
// `postgres://postgres@localhost`.
#[cfg(test)]
mod test {
    use super::*;

    /// Building a bridge from a live client must not panic.
    #[tokio::test]
    async fn it_should_exists() {
        let client = make_local_client().await;
        let _bridge = LargeObjectBridge::new(client);
    }

    /// A large object can be created and then deleted again.
    #[tokio::test]
    async fn it_should_be_able_to_create_and_delete_an_object() {
        // Arrange
        let client = make_local_client().await;
        let lob = LargeObjectBridge::new(client);

        // Act
        let object = lob.create().await.unwrap();
        let res = object.delete().await;

        // Assert
        assert!(res.is_ok())
    }

    /// Placeholder for the write test; the write itself is still disabled.
    #[tokio::test]
    // `reader` and `mut object` are only needed once the FIXME below is resolved.
    #[allow(unused_variables, unused_mut)]
    async fn it_should_be_able_to_write_an_object() {
        // Arrange
        let client = make_local_client().await;
        let large_object_bridge = LargeObjectBridge::new(client);
        let mut object = large_object_bridge.create().await.unwrap();
        // In-memory source instead of a file at an absolute path that only
        // exists on the original author's machine. Swap in a larger payload
        // once the write below is enabled.
        let reader: &[u8] = b"large object test payload";

        // Act
        // FIXME
        // let res = object.write_from(reader).await;

        // Assert
        // println!("object={:?}", object);
        // assert!(res.is_ok())

        // Cleanup: do not leave the created object behind in the database.
        object.delete().await.unwrap();
    }

    /// Connects to the local server without TLS and spawns the connection
    /// future on the runtime so the returned client can make progress.
    async fn make_local_client() -> Client {
        use tokio_postgres::NoTls;

        let (client, connect) = tokio_postgres::connect("postgres://postgres@localhost", NoTls)
            .await
            .unwrap();
        tokio::spawn(connect);
        client
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment