@nellshamrell
Created June 20, 2017 23:57

User creates a new origin through the API like this:

http POST http://localhost:9636/v1/depot/origins Content-Type:application/json Authorization:Bearer:${HAB_AUTH_TOKEN} name=my_new_origin

This goes to builder-depot.

builder-depot

habitat/components/builder-depot/src/server.rs

pub fn routes<M: BeforeMiddleware + Clone>(insecure: bool, basic: M, worker: M) -> Router {
    router!(
        (...)
        origin_create: post "/origins" => {
            XHandler::new(origin_create).before(basic.clone())
        },
        (...)
}

pub fn origin_create(req: &mut Request) -> IronResult<Response> {
    let mut request = OriginCreate::new();
    {
        let session = req.extensions.get::<Authenticated>().unwrap();
        request.set_owner_id(session.get_id());
        request.set_owner_name(session.get_name().to_string());
    }
    match req.get::<bodyparser::Struct<OriginCreateReq>>() {
        Ok(Some(body)) => request.set_name(body.name),
        _ => return Ok(Response::with(status::UnprocessableEntity)),
    };

    if !keys::is_valid_origin_name(request.get_name()) {
        return Ok(Response::with(status::UnprocessableEntity));
    }

    let mut conn = Broker::connect().unwrap();
    match conn.route::<OriginCreate, Origin>(&request) {
        Ok(origin) => Ok(render_json(status::Created, &origin)),
        Err(err) => Ok(render_net_error(&err)),
    }
}

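So when we hit origin_create, the request carries these headers and this body field (written in HTTPie syntax):

Content-Type:application/json Authorization:Bearer:${HAB_AUTH_TOKEN} name=my_new_origin

The handler then builds an OriginCreate message. The owner fields come from the authenticated session that the token resolves to, not from the token itself, so it looks somewhat like this:

request = {
  owner_id: <id of the authenticated session>,
  owner_name: <name of the authenticated session>,
  name: my_new_origin
}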
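The handler logic above can be sketched without Iron or the Habitat crates. This is a minimal illustration, not the depot's actual code: `Session`, `build_origin_create`, the `u64` id type, and the naming rule inside `is_valid_origin_name` are all assumptions made for the example (the real rule lives in `keys::is_valid_origin_name` and is not shown in this gist).

```rust
// Stand-in for the protobuf message; field types are assumptions.
#[derive(Debug, PartialEq)]
struct OriginCreate {
    owner_id: u64,
    owner_name: String,
    name: String,
}

// Hypothetical stand-in for the `Authenticated` session in the request extensions.
struct Session {
    id: u64,
    name: String,
}

// Hypothetical rule for illustration only; the real check is
// keys::is_valid_origin_name and may differ.
fn is_valid_origin_name(name: &str) -> bool {
    !name.is_empty()
        && name
            .chars()
            .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_' || c == '-')
}

/// Returns the message to route, or the HTTP status code to reply with.
/// A missing body and an invalid name both map to 422 (Unprocessable Entity),
/// mirroring the two early returns in origin_create.
fn build_origin_create(session: &Session, body_name: Option<&str>) -> Result<OriginCreate, u16> {
    let name = body_name.ok_or(422u16)?;
    if !is_valid_origin_name(name) {
        return Err(422);
    }
    Ok(OriginCreate {
        owner_id: session.id,
        owner_name: session.name.clone(),
        name: name.to_string(),
    })
}
```

Calling `build_origin_create(&session, Some("my_new_origin"))` yields a message whose owner fields are copied from the session, while a missing or malformed name short-circuits with 422 before anything is routed.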

Then that request variable is passed to a broker connection.

habitat/components/builder-depot/src/server.rs

let mut conn = Broker::connect().unwrap();
match conn.route::<OriginCreate, Origin>(&{owner_id: <session id>, owner_name: <session name>, name: my_new_origin})

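That Broker structure lives in the net component (this is condensed slightly). Note that the methods quoted below use self.sock rather than client_sock or router_sock, so they appear to be defined on the connection value that Broker::connect() returns, not on the Broker struct itself.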

habitat/components/net/src/routing.rs

/// A messaging Broker for proxying messages from clients to one or more `RouteSrv` and vice versa.
pub struct Broker {
    client_sock: zmq::Socket,
    router_sock: zmq::Socket,
}

pub fn route<M: Routable, R: protobuf::MessageStatic>(&mut self, msg: &M) -> RouteResult<R> {
(...)

So, with the type parameters filled in, we are effectively calling that route function like this (pseudocode):

route<M = OriginCreate, R = Origin>(&mut conn, msg: &{owner_id: <session id>, owner_name: <session name>, name: my_new_origin})

Let's look at that route function again

habitat/components/net/src/routing.rs

    pub fn route<M: Routable, R: protobuf::MessageStatic>(&mut self, msg: &M) -> RouteResult<R> {
        if self.route_async(msg).is_err() {
            return Err(protocol::net::err(ErrCode::ZMQ, "net:route:1"));
        }

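So the route_async method is where the message is actually sent. It hashes the message's route key (if it has one), wraps the message in a protocol envelope, serializes it, and sends it over the socket as two frames: the string "RQ" followed by the serialized bytes.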

habitat/components/net/src/routing.rs

    pub fn route_async<M: Routable>(&mut self, msg: &M) -> Result<()> {
        let route_hash = msg.route_key()
            .map(|key| key.hash(&mut FnvHasher::default()));
        let req = protocol::Message::new(msg).routing(route_hash).build();
        let bytes = req.write_to_bytes().unwrap();
        try!(self.sock.send_str("RQ", zmq::SNDMORE));
        try!(self.sock.send(&bytes, 0));
        Ok(())
    }
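The hashing and framing steps in route_async can be sketched with the standard library alone. This is an illustration under assumptions: `Fnv1a` is a hand-written 64-bit FNV-1a hasher standing in for the `fnv` crate's `FnvHasher`, `route_hash` and `request_frames` are hypothetical helper names, and a `Vec` of byte frames stands in for the ZeroMQ multipart send.

```rust
use std::hash::Hasher;

/// Minimal 64-bit FNV-1a hasher (stand-in for fnv::FnvHasher).
struct Fnv1a(u64);

impl Default for Fnv1a {
    fn default() -> Self {
        // Standard FNV-1a 64-bit offset basis.
        Fnv1a(0xcbf29ce484222325)
    }
}

impl Hasher for Fnv1a {
    fn write(&mut self, bytes: &[u8]) {
        for b in bytes {
            // XOR the byte in, then multiply by the 64-bit FNV prime.
            self.0 ^= u64::from(*b);
            self.0 = self.0.wrapping_mul(0x100000001b3);
        }
    }

    fn finish(&self) -> u64 {
        self.0
    }
}

/// Hypothetical helper: hash an optional route key, like the `.map(...)`
/// over `msg.route_key()`. A message without a key gets no hash.
fn route_hash(key: Option<&str>) -> Option<u64> {
    key.map(|k| {
        let mut hasher = Fnv1a::default();
        hasher.write(k.as_bytes());
        hasher.finish()
    })
}

/// Hypothetical helper: the two frames sent on the socket, in order.
/// The first frame would be sent with SNDMORE, the second with no flags.
fn request_frames(payload: &[u8]) -> Vec<Vec<u8>> {
    vec![b"RQ".to_vec(), payload.to_vec()]
}
```

The same key always hashes to the same value, which is what lets a hash of the route key serve as a stable routing hint on the message envelope.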

Then route blocks in recv, waiting for the reply message from the broker.

habitat/components/net/src/routing.rs

    pub fn route<M: Routable, R: protobuf::MessageStatic>(&mut self, msg: &M) -> RouteResult<R> {
        if self.route_async(msg).is_err() {
            return Err(protocol::net::err(ErrCode::ZMQ, "net:route:1"));
        }
        match self.recv() {
        (...)

    /// Receives a message from the connected broker. This function will block the calling thread
    /// until a message is received or a timeout occurs.
    ///
    /// # Errors
    ///
    /// * `Broker` Queue became unavailable
    /// * Message was not received within the timeout
    /// * Received an unparseable message
    pub fn recv(&mut self) -> Result<protocol::net::Msg> {
        let envelope = try!(self.sock.recv_msg(0));
        let msg: protocol::net::Msg = try!(parse_from_bytes(&envelope));
        Ok(msg)
    }
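The send-then-block pattern in route and recv, including the three error cases listed in the doc comment, can be sketched with standard-library channels in place of ZeroMQ sockets. Everything here is an assumption made for illustration: the `RecvError` enum, the UTF-8 "wire format" in `parse_reply`, the 500 ms timeout, and the toy broker thread are not taken from Habitat.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender};
use std::thread;
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum RecvError {
    Timeout,     // no reply arrived within the timeout
    Unavailable, // the broker side went away
    Unparseable, // a reply arrived but could not be decoded
}

/// Hypothetical wire format for this sketch: a reply must be valid UTF-8.
fn parse_reply(bytes: &[u8]) -> Result<String, RecvError> {
    String::from_utf8(bytes.to_vec()).map_err(|_| RecvError::Unparseable)
}

/// Block the calling thread until a reply arrives or the timeout expires.
fn recv(replies: &Receiver<Vec<u8>>, timeout: Duration) -> Result<String, RecvError> {
    let envelope = replies.recv_timeout(timeout).map_err(|e| match e {
        RecvTimeoutError::Timeout => RecvError::Timeout,
        RecvTimeoutError::Disconnected => RecvError::Unavailable,
    })?;
    parse_reply(&envelope)
}

/// Send the request, then wait for the reply (the shape of `route`).
fn route(
    requests: &Sender<Vec<u8>>,
    replies: &Receiver<Vec<u8>>,
    msg: &str,
) -> Result<String, RecvError> {
    requests
        .send(msg.as_bytes().to_vec())
        .map_err(|_| RecvError::Unavailable)?;
    recv(replies, Duration::from_millis(500))
}

/// Toy broker: answers every request with "created:" plus the request bytes.
fn spawn_echo_broker() -> (Sender<Vec<u8>>, Receiver<Vec<u8>>) {
    let (req_tx, req_rx) = mpsc::channel::<Vec<u8>>();
    let (rep_tx, rep_rx) = mpsc::channel::<Vec<u8>>();
    thread::spawn(move || {
        for req in req_rx {
            let mut reply = b"created:".to_vec();
            reply.extend_from_slice(&req);
            if rep_tx.send(reply).is_err() {
                break;
            }
        }
    });
    (req_tx, rep_rx)
}
```

With the toy broker running, `route(&tx, &rx, "my_new_origin")` returns the decoded reply, while a silent or disconnected peer surfaces as `Timeout` or `Unavailable` instead of blocking forever.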