Last active
February 2, 2018 16:46
-
-
Save boxdot/97f89585a10b3851f48c0d4b6171a5c3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
impl Client { | |
pub fn connect( | |
handle: &Handle, | |
uri: hyper::Uri, | |
framework_info: mesos::FrameworkInfo, | |
) -> Box<Future<Item = Self, Error = failure::Error>> { | |
// Mesos subscribe essage | |
let mut call = scheduler::Call::new(); | |
let mut subscribe = scheduler::Call_Subscribe::new(); | |
subscribe.set_framework_info(framework_info); | |
call.set_subscribe(subscribe); | |
call.set_field_type(scheduler::Call_Type::SUBSCRIBE); | |
// Build request | |
let mut request = hyper::Request::new(hyper::Method::Post, uri); | |
// TODO: move out of body | |
let protobuf_media_type = "application/x-protobuf".parse::<mime::Mime>().unwrap(); | |
request.headers_mut().set(hyper::header::Accept(vec![ | |
hyper::header::qitem(protobuf_media_type.clone()), | |
])); | |
request | |
.headers_mut() | |
.set(hyper::header::ContentType(protobuf_media_type)); | |
// TODO: Handle error | |
let body = call.write_to_bytes().unwrap(); | |
request.set_body(body); | |
// Call Mesos | |
let http_client = hyper::Client::new(&handle); | |
let client = http_client | |
.request(request) | |
.map_err(failure::Error::from) | |
.and_then( | |
move |res: hyper::Response| -> Box<Future<Item = _, Error = _>> { | |
println!("Response status: {}", res.status()); | |
let stream_id = if let Some(header) = res.headers().get::<MesosStreamIdHeader>() | |
{ | |
header.clone() | |
} else { | |
return Box::new(future::err(format_err!("Missing header"))); | |
}; | |
let events = Events::new(res.body()); | |
let events = events | |
.into_future() | |
.map_err(|(err, _)| failure::Error::from(err)) | |
.map(|(event, stream)| (stream_id, event, stream)); | |
Box::new(events) | |
}, | |
) | |
.and_then(|(stream_id, event, stream)| { | |
let event = event.unwrap(); | |
let framework_id = event.get_subscribed().get_framework_id(); | |
Ok(Self { | |
framework_id: framework_id.get_value().into(), | |
stream_id: stream_id.as_str().into(), | |
events: stream, | |
}) | |
}); | |
Box::new(client) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment