builder-sessionsrv/src/data_store.rs
use db::pool::Pool;
use db::migration::Migrator;
use hab_net::privilege;
use protocol::sessionsrv;
use postgres;
use protobuf;
use config::Config;
use error::{Result, Error};
use migrations;
#[derive(Debug, Clone)]
pub struct DataStore {
pub pool: Pool,
}
impl DataStore {
pub fn new(config: &Config) -> Result<DataStore> {
let pool = Pool::new(&config.datastore, config.shards.clone())?;
Ok(DataStore { pool: pool })
}
pub fn from_pool(pool: Pool) -> Result<DataStore> {
Ok(DataStore { pool: pool })
}
(...)
Pool lives in the builder-db component.
builder-db/src/pool.rs
use std::ops::{Deref, DerefMut};
use std::thread;
use std::time::Duration;
use std::fmt;
use fnv::FnvHasher;
use rand::{self, Rng};
use r2d2;
use r2d2_postgres::{self, PostgresConnectionManager, TlsMode};
use config::DataStoreCfg;
use error::{Error, Result};
use protocol::{Routable, RouteKey, ShardId, SHARD_COUNT};
#[derive(Clone)]
pub struct Pool {
inner: r2d2::Pool<PostgresConnectionManager>,
pub shards: Vec<ShardId>,
}
impl fmt::Debug for Pool {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f,
"Pool {{ inner: {:?}, shards: {:?} }}",
self.inner,
self.shards)
}
}
impl Pool {
pub fn new(config: &DataStoreCfg, shards: Vec<ShardId>) -> Result<Pool> {
loop {
let pool_config_builder =
r2d2::Config::builder()
.pool_size(config.pool_size)
.connection_timeout(Duration::from_secs(config.connection_timeout_sec));
let pool_config = pool_config_builder.build();
let manager = PostgresConnectionManager::new(config, TlsMode::None)?;
match r2d2::Pool::new(pool_config, manager) {
Ok(pool) => {
return Ok(Pool {
inner: pool,
shards: shards,
})
}
Err(e) => {
error!("Error initializing connection pool to Postgres, will retry: {}",
e)
}
}
thread::sleep(Duration::from_millis(config.connection_retry_ms));
}
}
(...)