Skip to content

Instantly share code, notes, and snippets.

@MatrixSenpai
Created December 17, 2024 02:26
Show Gist options
  • Save MatrixSenpai/1d4324d12367c3184f62d0d3c10d3957 to your computer and use it in GitHub Desktop.
Save MatrixSenpai/1d4324d12367c3184f62d0d3c10d3957 to your computer and use it in GitHub Desktop.
pub struct SystemRepository {
db: Client,
}
impl SystemRepository {
pub async fn new() -> anyhow::Result<Self> {
let client = Client::with_uri_str("mongodb://127.0.0.1:27017/trading").await?;
Ok(Self {
db: client,
})
}
pub fn insert_db<T: Send + Sync + Serialize>(&self, value: T, collection: &'static str) -> Result<(), RepositoryError> {
let handle = Handle::current().enter();
let db = self.db.default_database().ok_or(RepositoryError::ExpectedDataNotPresentError)?;
let collection = db.collection::<T>(collection);
match block_on(async move {
collection.insert_one(&value).await
}) {
Ok(_) => Ok(()),
Err(e) => {
error!("Failed to insert into db! {e:?}");
Err(RepositoryError::WriteError)
}
}
}
pub fn fetch_db<T: Send + Sync + DeserializeOwned>(&self, collection: &'static str, query: Document) -> Result<T, RepositoryError> {
let runtime = tokio::runtime::Runtime::new().unwrap();
let db = self.db.default_database().ok_or(RepositoryError::ExpectedDataNotPresentError)?;
let collection = db.collection::<T>(collection);
match runtime.block_on(async move {
collection.find_one(query).await
}) {
Ok(v) => v.ok_or(RepositoryError::ExpectedDataNotPresentError),
Err(e) => {
error!("Failed to fetch from db! {e:?}");
Err(RepositoryError::ReadError)
}
}
}
pub fn fetch_many_db<T: Send + Sync + DeserializeOwned>(&self, collection: &'static str, query: Document) -> Result<Vec<T>, RepositoryError> {
let handle = Handle::current().enter();
let db = self.db.default_database().ok_or(RepositoryError::ExpectedDataNotPresentError)?;
let collection = db.collection::<T>(collection);
match block_on(async move {
collection.find(query).await.unwrap().try_collect().await
}) {
Ok(v) => Ok(v),
Err(e) => Err(RepositoryError::ReadError),
}
}
pub fn remove_db<T: Send + Sync>(&self, collection: &'static str, query: Document) -> Result<(), RepositoryError> {
let handle = Handle::current().enter();
let db = self.db.default_database().ok_or(RepositoryError::ExpectedDataNotPresentError)?;
let collection = db.collection::<T>(collection);
match block_on(async move {
collection.delete_one(query).await
}) {
Ok(v) => Ok(()),
Err(e) => {
error!("Failed to fetch from db! {e:?}");
Err(RepositoryError::ReadError)
}
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
struct BalanceStorage {
engine_id: Uuid,
balance: Balance,
}
impl BalanceHandler for SystemRepository {
fn set_balance(&mut self, engine_id: Uuid, balance: Balance) -> Result<(), RepositoryError> {
self.insert_db(BalanceStorage { engine_id, balance }, "balance")
}
fn get_balance(&mut self, engine_id: Uuid) -> Result<Balance, RepositoryError> {
let balance: BalanceStorage = self.fetch_db("balance", doc! {
"engine_id": engine_id
})?;
Ok(balance.balance)
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
struct ExitPositionStorage {
exit_position_id: ExitedPositionsId,
position: Position,
engine_id: Uuid,
}
impl PositionHandler for SystemRepository {
fn set_open_position(&mut self, position: Position) -> Result<(), RepositoryError> {
self.insert_db(position, "position")
}
fn get_open_position(&mut self, position_id: &PositionId) -> Result<Option<Position>, RepositoryError> {
let position_id = position_id.to_string();
self.fetch_db("position", doc! {
"position_id": position_id
})
}
fn get_open_positions<'a, Markets: Iterator<Item=&'a Market>>(&mut self, engine_id: Uuid, markets: Markets) -> Result<Vec<Position>, RepositoryError> {
markets.filter_map(|market| {
self.get_open_position(&determine_position_id(engine_id, &market.exchange, &market.instrument))
.transpose()
})
.collect()
}
fn remove_position(&mut self, position_id: &PositionId) -> Result<Option<Position>, RepositoryError> {
let position = self.get_open_position(position_id)?;
let position_id = position_id.to_string();
self.remove_db::<Position>("position", doc! {
"position_id": position_id
})?;
Ok(position)
}
fn set_exited_position(&mut self, engine_id: Uuid, position: Position) -> Result<(), RepositoryError> {
let exit_position_id = determine_exited_positions_id(engine_id);
self.insert_db(ExitPositionStorage { exit_position_id, position, engine_id }, "exit_position")
}
fn get_exited_positions(&mut self, engine_id: Uuid) -> Result<Vec<Position>, RepositoryError> {
let exit_position_id = determine_exited_positions_id(engine_id).to_string();
let positions = self.fetch_many_db::<ExitPositionStorage>("exit_position", doc! {
"exit_position_id": exit_position_id
})?;
Ok(positions.into_iter().map(|s| s.position).collect())
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
struct StatisticStorage<S: Send + Sync> {
market_id: MarketId,
statistic: S,
}
impl<S: Send + Sync + Serialize + DeserializeOwned> StatisticHandler<S> for SystemRepository {
fn set_statistics(&mut self, market_id: MarketId, statistic: S) -> Result<(), RepositoryError> {
self.insert_db(StatisticStorage { market_id, statistic }, "statistic")
}
fn get_statistics(&mut self, market_id: &MarketId) -> Result<S, RepositoryError> {
let market_id = market_id.to_string();
let stat: StatisticStorage<S> = self.fetch_db("balance", doc! {
"market_id": market_id
})?;
Ok(stat.statistic)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment