A user creates a new origin through the API like this:
http POST http://localhost:9636/v1/depot/origins Content-Type:application/json Authorization:Bearer:${HAB_AUTH_TOKEN} name=my_new_origin
This request goes to builder-depot, which registers the route and its handler:
habitat/components/builder-depot/src/server.rs
pub fn routes<M: BeforeMiddleware + Clone>(insecure: bool, basic: M, worker: M) -> Router {
    router!(
        (...)
        origin_create: post "/origins" => {
            XHandler::new(origin_create).before(basic.clone())
        },
        (...)
    )
}
pub fn origin_create(req: &mut Request) -> IronResult<Response> {
    let mut request = OriginCreate::new();
    {
        let session = req.extensions.get::<Authenticated>().unwrap();
        request.set_owner_id(session.get_id());
        request.set_owner_name(session.get_name().to_string());
    }
    match req.get::<bodyparser::Struct<OriginCreateReq>>() {
        Ok(Some(body)) => request.set_name(body.name),
        _ => return Ok(Response::with(status::UnprocessableEntity)),
    };
    if !keys::is_valid_origin_name(request.get_name()) {
        return Ok(Response::with(status::UnprocessableEntity));
    }
    let mut conn = Broker::connect().unwrap();
    match conn.route::<OriginCreate, Origin>(&request) {
        Ok(origin) => Ok(render_json(status::Created, &origin)),
        Err(err) => Ok(render_net_error(&err)),
    }
}
So when the request hits origin_create, the handler receives these headers and this body field:
Content-Type:application/json Authorization:Bearer:${HAB_AUTH_TOKEN} name=my_new_origin
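It then builds an OriginCreate message that looks roughly like this. The owner fields come from the Authenticated session stored in the request extensions, not from the raw token:
request = {
    owner_id: <id of the authenticated session>,
    owner_name: <name of the authenticated session>,
    name: my_new_origin
}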
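The way the handler assembles and validates the request can be sketched with plain Rust types. Everything here is a hypothetical stand-in: `Session`, `Outcome`, `build_request`, and especially the naming rule inside `is_valid_origin_name` are assumptions made for illustration, not the real Habitat definitions.

```rust
// Hypothetical stand-ins for the types used by origin_create.
#[derive(Debug, Default, PartialEq)]
struct OriginCreate {
    owner_id: u64,
    owner_name: String,
    name: String,
}

// Stand-in for the Authenticated session found in the request extensions.
struct Session {
    id: u64,
    name: String,
}

#[derive(Debug, PartialEq)]
enum Outcome {
    // The request is well formed and would be handed to the broker connection.
    Routed(OriginCreate),
    // Mirrors the handler's early return with status::UnprocessableEntity.
    UnprocessableEntity,
}

// ASSUMED rule, for illustration only: lowercase ASCII letters, digits,
// '_' and '-'. The real keys::is_valid_origin_name may differ.
fn is_valid_origin_name(name: &str) -> bool {
    !name.is_empty()
        && name
            .chars()
            .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_' || c == '-')
}

// Same order of steps as the handler: owner from the session, name from the
// body, then validation, then routing.
fn build_request(session: &Session, body_name: Option<&str>) -> Outcome {
    let mut request = OriginCreate::default();
    request.owner_id = session.id;
    request.owner_name = session.name.clone();
    match body_name {
        Some(name) => request.name = name.to_string(),
        None => return Outcome::UnprocessableEntity,
    }
    if !is_valid_origin_name(&request.name) {
        return Outcome::UnprocessableEntity;
    }
    Outcome::Routed(request)
}

fn main() {
    let session = Session { id: 42, name: "alice".to_string() };
    println!("{:?}", build_request(&session, Some("my_new_origin")));
    println!("{:?}", build_request(&session, None));
}
```

The point of the sketch is the ordering: a missing body or an invalid name short-circuits before any broker connection is opened.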
Then that request variable is passed to a broker connection.
habitat/components/builder-depot/src/server.rs
let mut conn = Broker::connect().unwrap();
match conn.route::<OriginCreate, Origin>(&{owner_id: <session id>, owner_name: <session name>, name: my_new_origin})
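The Broker structure lives in the net component (condensed slightly here). Note that the methods quoted further down use self.sock, which is not a field of Broker as listed, so they presumably belong to the connection value returned by Broker::connect() rather than to Broker itself.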
habitat/components/net/src/routing.rs
/// A messaging Broker for proxying messages from clients to one or more `RouteSrv` and vice versa.
pub struct Broker {
    client_sock: zmq::Socket,
    router_sock: zmq::Socket,
}
pub fn route<M: Routable, R: protobuf::MessageStatic>(&mut self, msg: &M) -> RouteResult<R> {
(...)
So, in pseudocode, we are calling route with M bound to OriginCreate and R bound to Origin:
route::<M = OriginCreate, R = Origin>(&mut conn, msg: &{owner_id: <session id>, owner_name: <session name>, name: my_new_origin})
Let's look at that route function again:
habitat/components/net/src/routing.rs
pub fn route<M: Routable, R: protobuf::MessageStatic>(&mut self, msg: &M) -> RouteResult<R> {
    if self.route_async(msg).is_err() {
        return Err(protocol::net::err(ErrCode::ZMQ, "net:route:1"));
    }
    (...)
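So route_async is where the message is actually sent. It hashes the route key (if the message has one), wraps the message in a protocol::Message, serializes it, and sends it as two frames: an "RQ" marker followed by the serialized bytes.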
habitat/components/net/src/routing.rs
pub fn route_async<M: Routable>(&mut self, msg: &M) -> Result<()> {
    let route_hash = msg.route_key()
        .map(|key| key.hash(&mut FnvHasher::default()));
    let req = protocol::Message::new(msg).routing(route_hash).build();
    let bytes = req.write_to_bytes().unwrap();
    try!(self.sock.send_str("RQ", zmq::SNDMORE));
    try!(self.sock.send(&bytes, 0));
    Ok(())
}
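The hashing and framing steps in route_async can be sketched without any dependencies. This is an approximation under stated assumptions: `Fnv1a` is a hand-written 64-bit FNV-1a hasher standing in for `FnvHasher`, the route key is assumed to be a string, and `route_hash` and `build_frames` are hypothetical helper names. No ZeroMQ socket is involved; the frames are just collected into a vector.

```rust
use std::hash::Hasher;

/// 64-bit FNV-1a, written out with std only (stand-in for FnvHasher).
struct Fnv1a(u64);

impl Default for Fnv1a {
    fn default() -> Self {
        // FNV-1a 64-bit offset basis.
        Fnv1a(0xcbf2_9ce4_8422_2325)
    }
}

impl Hasher for Fnv1a {
    fn write(&mut self, bytes: &[u8]) {
        for b in bytes {
            // XOR the byte in first, then multiply by the FNV prime.
            self.0 ^= u64::from(*b);
            self.0 = self.0.wrapping_mul(0x0000_0100_0000_01b3);
        }
    }

    fn finish(&self) -> u64 {
        self.0
    }
}

/// Hypothetical helper: hash an optional route key, as route_async does with
/// msg.route_key(). A message without a key yields no hash.
fn route_hash(key: Option<&str>) -> Option<u64> {
    key.map(|k| {
        let mut hasher = Fnv1a::default();
        hasher.write(k.as_bytes());
        hasher.finish()
    })
}

/// Hypothetical helper: the two frames route_async sends, an "RQ" marker
/// followed by the serialized message bytes.
fn build_frames(payload: &[u8]) -> Vec<Vec<u8>> {
    vec![b"RQ".to_vec(), payload.to_vec()]
}

fn main() {
    println!("{:?}", route_hash(Some("my_new_origin")));
    println!("{:?}", build_frames(b"serialized-message"));
}
```

Because the hash depends only on the key bytes, the same key always produces the same value, which is what makes it usable as a routing hint.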
Then route waits for the reply from the broker by calling recv:
habitat/components/net/src/routing.rs
pub fn route<M: Routable, R: protobuf::MessageStatic>(&mut self, msg: &M) -> RouteResult<R> {
    if self.route_async(msg).is_err() {
        return Err(protocol::net::err(ErrCode::ZMQ, "net:route:1"));
    }
    match self.recv() {
    (...)
The recv method it calls is documented and defined as follows:
/// Receives a message from the connected broker. This function will block the calling thread
/// until a message is received or a timeout occurs.
///
/// # Errors
///
/// * `Broker` Queue became unavailable
/// * Message was not received within the timeout
/// * Received an unparseable message
pub fn recv(&mut self) -> Result<protocol::net::Msg> {
    let envelope = try!(self.sock.recv_msg(0));
    let msg: protocol::net::Msg = try!(parse_from_bytes(&envelope));
    Ok(msg)
}
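The send-then-block shape of route and recv can be illustrated with standard library channels in place of ZeroMQ sockets. This is only an analogy under loud assumptions: `Conn`, `RouteError`, `connect`, and the echoing "broker" thread are all hypothetical, and `recv_timeout` on an mpsc channel merely imitates the blocking-with-timeout behavior described in the doc comment above.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender};
use std::thread;
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum RouteError {
    Send,         // the request could not be handed off
    Timeout,      // no reply arrived within the timeout
    Disconnected, // the reply side went away
}

// Hypothetical connection: channels stand in for the ZeroMQ socket.
struct Conn {
    tx: Sender<String>,
    rx: Receiver<String>,
    timeout: Duration,
}

impl Conn {
    /// Fire-and-forget send, like route_async.
    fn route_async(&self, msg: &str) -> Result<(), RouteError> {
        self.tx.send(msg.to_string()).map_err(|_| RouteError::Send)
    }

    /// Blocks the calling thread until a reply arrives or the timeout expires.
    fn recv(&self) -> Result<String, RouteError> {
        self.rx.recv_timeout(self.timeout).map_err(|e| match e {
            RecvTimeoutError::Timeout => RouteError::Timeout,
            RecvTimeoutError::Disconnected => RouteError::Disconnected,
        })
    }

    /// Send, then wait for the reply, like route.
    fn route(&self, msg: &str) -> Result<String, RouteError> {
        self.route_async(msg)?;
        self.recv()
    }
}

/// Spawns a toy "broker" thread that answers every request with a prefix.
fn connect(timeout: Duration) -> Conn {
    let (req_tx, req_rx) = mpsc::channel::<String>();
    let (rep_tx, rep_rx) = mpsc::channel::<String>();
    thread::spawn(move || {
        for req in req_rx {
            if rep_tx.send(format!("created:{}", req)).is_err() {
                break;
            }
        }
    });
    Conn { tx: req_tx, rx: rep_rx, timeout }
}

fn main() {
    let conn = connect(Duration::from_secs(1));
    println!("{:?}", conn.route("my_new_origin"));
}
```

Splitting the send from the blocking receive, as the original does, lets a caller use route_async alone when it does not need to wait for a reply.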