Skip to content

Instantly share code, notes, and snippets.

@mackwic
Last active May 11, 2020 14:48
Show Gist options
  • Save mackwic/06aca21d9f392877db0741a69d3a6feb to your computer and use it in GitHub Desktop.
Save mackwic/06aca21d9f392877db0741a69d3a6feb to your computer and use it in GitHub Desktop.
use pin_project::pin_project;
use thiserror::Error;
use tokio_postgres::{Client, Connection, Error as PgError};
pub type Oid = tokio_postgres::types::Oid;
pub type NoTlsConnection = Connection<tokio_postgres::Socket, tokio_postgres::tls::NoTlsStream>;
pub struct LargeObjectBridge {
client: Client,
}
#[derive(Debug)]
pub struct LargeObject {
oid: Oid,
client: Client,
}
enum WriteOperationState<Fut1, Fut2: std::future::Future> {
WantToRead,
Reading(Fut1),
WantToQuery,
Querying(Fut2),
}
#[pin_project]
pub struct WriteOperation<'a, R: tokio::io::AsyncRead, Fut1, Fut2: std::future::Future> {
buffer: Vec<u8>,
reader: R,
object: Box<&'a mut LargeObject>,
#[pin]
state: WriteOperationState<Fut1, Fut2>,
}
#[derive(Error, Debug)]
pub enum Error {
#[error("unable to get value from Row")]
UnableToGetValueFromRow(#[source] PgError),
#[error("error in the query")]
QueryError(&'static str, #[source] PgError),
#[error("error in the query")]
QueryErrorString(&'static str, String),
#[error("unable to read form source")]
IoError(#[source] std::io::Error),
}
impl LargeObjectBridge {
pub fn new(client: Client) -> Self {
LargeObjectBridge { client }
}
pub async fn create(self) -> Result<LargeObject, Error> {
const QUERY: &str = "SELECT lo_create(0);";
let oid_res = match self.client.query_one(QUERY, &[]).await {
Ok(row) => row
.try_get(0)
.or_else(|err| Err(Error::UnableToGetValueFromRow(err))),
Err(err) => Err(Error::QueryError(QUERY, err)),
};
let oid = oid_res?;
Ok(LargeObject {
oid,
client: self.client,
})
}
}
impl LargeObject {
pub async fn delete(self) -> Result<LargeObjectBridge, Error> {
const QUERY: &str = "SELECT lo_unlink($1);";
match self.client.query_one(QUERY, &[&self.oid]).await {
Ok(_) => Ok(LargeObjectBridge {
client: self.client,
}),
Err(err) => Err(Error::QueryError(QUERY, err)),
}
}
pub fn write_from<R: tokio::io::AsyncRead, Fut1, Fut2: std::future::Future>(
&mut self,
reader: R,
) -> WriteOperation<R, Fut1, Fut2> {
WriteOperation {
buffer: Vec::with_capacity(8_388_608), // 8KB
reader,
object: Box::new(self),
state: WriteOperationState::WantToRead,
}
}
}
impl<'a, R: tokio::io::AsyncRead, Fut1: std::future::Future, Fut2: std::future::Future>
tokio::io::AsyncWrite for WriteOperation<'a, R, Fut1, Fut2>
{
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buffer: &[u8],
) -> std::task::Poll<std::result::Result<usize, std::io::Error>> {
use std::cmp::min;
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;
use tokio_postgres::types::ToSql;
// safety: it is probably OK but please don't trust me and check
let me = self.project();
const QUERY: &str = "SOLECT lo_put($1, $2, $3);";
match unsafe { me.state.get_unchecked_mut() } {
WriteOperationState::WantToQuery => {
// don't write more bytes than Pg can handle
let write_amount = min(buffer.len(), u32::MAX as usize) as u32;
let arguments: &[&(dyn ToSql + Sync)] = &[
&me.object.oid,
&write_amount,
&buffer as &(dyn ToSql + Sync),
];
let mut test = me.object.client.query_one(QUERY, arguments);
let query_future = unsafe {
// safety: is it safe ? No data is moved from self nor mutated, so I think it's good ?
Pin::new_unchecked(&mut test)
};
let query_status = query_future.poll(cx);
match query_status {
Poll::Pending => {
me.state.set(WriteOperationState::Querying(
*query_future.get_unchecked_mut(),
/*error[E0308]: mismatched types
--> src/vendor/postgres_large_object_ng.rs:131:29
|
94 | impl<'a, R: tokio::io::AsyncRead, Fut1: std::future::Future, Fut2: std::future::Future>
| ---- this type parameter
...
131 | *query_future.get_unchecked_mut(),
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected type parameter `Fut2`, found opaque type
|
::: /Users/thomas/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-postgres-0.5.4/src/client.rs:247:10
|
247 | ) -> Result<Row, Error>
| ------------------ the found opaque type
|
= note: expected type parameter `Fut2`
found opaque type `impl std::future::Future`
= help: type parameters must be constrained to match other types
= note: for more information, visit https://doc.rust-lang.org/book/ch10-02-traits.html#traits-as-parameters
*/
));
Poll::Pending
}
_ => unimplemented!(),
}
}
_ => unimplemented!(),
}
}
fn poll_flush(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
todo!()
}
fn poll_shutdown(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
todo!()
}
}
// impl<'a, R: tokio::io::AsyncRead + Unpin, Fut1, Fut2> std::future::Future
// for WriteOperation<'a, R, Fut1, Fut2>
// {
// type Output = Result<usize, Error>;
// fn poll(
// self: std::pin::Pin<&mut Self>,
// cx: &mut std::task::Context<'_>,
// ) -> std::task::Poll<<Self as std::future::Future>::Output> {
// use std::pin::Pin;
// use std::task::Poll;
// use WriteOperationState::*;
// println!("called !");
// let mut idx = 0;
// // safety: it is probably OK but please don't trust me and check
// let me = unsafe { self.get_unchecked_mut() };
// loop {
// println!("loop !");
// match me.state {
// WantToRead => {
// let read_status = Pin::new(&mut me.reader).poll_read_buf(cx, &mut me.buffer);
// let read = match read_status {
// Poll::Pending => {
// *this.state = Reading();
// return Poll::Pending;
// }
// Poll::Ready(Err(io_error)) => {
// return Poll::Ready(Err(Error::IoError(io_error)))
// }
// Poll::Ready(Ok(0)) => return Poll::Ready(Ok(idx)),
// Poll::Ready(Ok(read)) => read,
// };
// }
// }
// while idx < read {
// println!("query !");
// println!("oid={} idx={} read={}", me.object.oid, idx, read);
// const QUERY: &str = "SOLECT lo_put($1, $2, $3);";
// let write_status = unsafe {
// // safety: is it safe ? No data is moved from self nor mutated, so I think it's good ?
// Pin::new_unchecked(
// &mut me
// .object
// .client
// .query_one(QUERY, &[&me.object.oid, &(idx as u32), &me.buffer]),
// )
// .poll(cx)
// };
// let wrote: u32 = match write_status {
// Poll::Pending => {
// println!("query pending");
// return Poll::Pending;
// }
// Poll::Ready(Err(pg_error)) => {
// println!("query ready err");
// return Poll::Ready(Err(Error::QueryError(QUERY, pg_error)));
// }
// Poll::Ready(Ok(ref row)) => match row.try_get(0) {
// Err(error) => {
// println!("query ready ok, row err");
// return Poll::Ready(Err(Error::QueryErrorString(
// QUERY,
// format!("{}", error),
// )));
// }
// Ok(bytes_wrote) => bytes_wrote,
// },
// };
// idx += wrote as usize;
// println!("end query: wrote={} idx={} read={}", wrote, idx, read);
// }
// }
// }
// }
#[cfg(test)]
mod test {
pub use super::*;
#[tokio::test]
async fn it_should_exists() {
let client = make_local_client().await;
LargeObjectBridge::new(client);
}
#[tokio::test]
async fn it_should_be_able_to_create_and_delete_an_object() {
// Arrange
let client = make_local_client().await;
let lob = LargeObjectBridge::new(client);
// Act
let object = lob.create().await.unwrap();
let res = object.delete().await;
// Assert
assert!(res.is_ok())
}
#[tokio::test]
async fn it_should_be_able_to_write_an_object() {
// Arrange
let client = make_local_client().await;
let large_object_bridge = LargeObjectBridge::new(client);
let mut object = large_object_bridge.create().await.unwrap();
let reader = tokio::fs::File::open("/Users/thomas/projects/scalingo/experiments/fs-on-redis/tests/fixtures/large_texts/bible.txt")
.await
.unwrap();
// Act
// FIXME
// let res = object.write_from(reader).await;
// Assert
// println!("object={:?}", object);
// assert!(res.is_ok())
}
async fn make_local_client() -> Client {
use tokio_postgres::NoTls;
let (client, connect) = tokio_postgres::connect("postgres://postgres@localhost", NoTls)
.await
.unwrap();
tokio::spawn(connect);
client
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment