Created
April 29, 2017 16:22
-
-
Save allengeorge/62704497e4634a9c3725eb214c4bf83e to your computer and use it in GitHub Desktop.
compiler_error
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Compiling thrift v1.0.0 (file:///Users/allen/src/rust_projects/thrift/lib/rs) | |
Compiling thrift-test v0.1.0 (file:///Users/allen/src/rust_projects/thrift/test/rs) | |
error[E0277]: the trait bound `thrift::server::TProcessor + std::marker::Send + 'static: std::marker::Sync` is not satisfied | |
--> src/bin/test_server.rs:129:30 | |
| | |
129 | let mut server = TServer::new( | |
| ^^^^^^^^^^^^ the trait `std::marker::Sync` is not implemented for `thrift::server::TProcessor + std::marker::Send + 'static` | |
| | |
= note: `thrift::server::TProcessor + std::marker::Send + 'static` cannot be shared between threads safely | |
= note: required because of the requirements on the impl of `std::marker::Sync` for `std::ptr::Unique<thrift::server::TProcessor + std::marker::Send + 'static>` | |
= note: required because it appears within the type `std::boxed::Box<thrift::server::TProcessor + std::marker::Send + 'static>` | |
= note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<std::boxed::Box<thrift::server::TProcessor + std::marker::Send + 'static>>` | |
= note: required because of the requirements on the impl of `std::marker::Send` for `std::collections::hash::table::RawTable<std::string::String, std::sync::Arc<std::boxed::Box<thrift::server::TProcessor + std::marker::Send + 'static>>>` | |
= note: required because it appears within the type `std::collections::HashMap<std::string::String, std::sync::Arc<std::boxed::Box<thrift::server::TProcessor + std::marker::Send + 'static>>>` | |
= note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Mutex<std::collections::HashMap<std::string::String, std::sync::Arc<std::boxed::Box<thrift::server::TProcessor + std::marker::Send + 'static>>>>` | |
= note: required because it appears within the type `thrift::server::TMultiplexedProcessor` | |
= note: required by `<thrift::server::TServer<PRC, RTF, IPF, WTF, OPF>>::new` | |
error[E0277]: the trait bound `thrift::server::TProcessor + std::marker::Send + 'static: std::marker::Sync` is not satisfied | |
--> src/bin/test_server.rs:129:30 | |
| | |
129 | let mut server = TServer::new( | |
| ______________________________^ starting here... | |
130 | | i_transport_factory, | |
131 | | i_protocol_factory, | |
132 | | o_transport_factory, | |
133 | | o_protocol_factory, | |
134 | | multiplexed_processor, | |
135 | | workers, | |
136 | | ); | |
| |_____________^ ...ending here: the trait `std::marker::Sync` is not implemented for `thrift::server::TProcessor + std::marker::Send + 'static` | |
| | |
= note: `thrift::server::TProcessor + std::marker::Send + 'static` cannot be shared between threads safely | |
= note: required because of the requirements on the impl of `std::marker::Sync` for `std::ptr::Unique<thrift::server::TProcessor + std::marker::Send + 'static>` | |
= note: required because it appears within the type `std::boxed::Box<thrift::server::TProcessor + std::marker::Send + 'static>` | |
= note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Arc<std::boxed::Box<thrift::server::TProcessor + std::marker::Send + 'static>>` | |
= note: required because of the requirements on the impl of `std::marker::Send` for `std::collections::hash::table::RawTable<std::string::String, std::sync::Arc<std::boxed::Box<thrift::server::TProcessor + std::marker::Send + 'static>>>` | |
= note: required because it appears within the type `std::collections::HashMap<std::string::String, std::sync::Arc<std::boxed::Box<thrift::server::TProcessor + std::marker::Send + 'static>>>` | |
= note: required because of the requirements on the impl of `std::marker::Send` for `std::sync::Mutex<std::collections::HashMap<std::string::String, std::sync::Arc<std::boxed::Box<thrift::server::TProcessor + std::marker::Send + 'static>>>>` | |
= note: required because it appears within the type `thrift::server::TMultiplexedProcessor` | |
= note: required by `thrift::server::TServer` | |
error: no method named `listen` found for type `thrift::server::TServer<thrift::server::TMultiplexedProcessor, std::boxed::Box<thrift::transport::TReadTransportFactory>, std::boxed::Box<thrift::protocol::TInputProtocolFactory>, std::boxed::Box<thrift::transport::TWriteTransportFactory>, std::boxed::Box<thrift::protocol::TOutputProtocolFactory>>` in the current scope | |
--> src/bin/test_server.rs:138:20 | |
| | |
138 | server.listen(&listen_address) | |
| ^^^^^^ | |
| | |
= note: the method `listen` exists but the following trait bounds were not satisfied: `thrift::server::TMultiplexedProcessor : std::marker::Send`, `thrift::server::TMultiplexedProcessor : std::marker::Sync` | |
error: aborting due to 3 previous errors |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Licensed to the Apache Software Foundation (ASF) under one | |
// or more contributor license agreements. See the NOTICE file | |
// distributed with this work for additional information | |
// regarding copyright ownership. The ASF licenses this file | |
// to you under the Apache License, Version 2.0 (the | |
// "License"); you may not use this file except in compliance | |
// with the License. You may obtain a copy of the License at | |
// | |
// http://www.apache.org/licenses/LICENSE-2.0 | |
// | |
// Unless required by applicable law or agreed to in writing, | |
// software distributed under the License is distributed on an | |
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
// KIND, either express or implied. See the License for the | |
// specific language governing permissions and limitations | |
// under the License. | |
use std::collections::HashMap; | |
use std::convert::Into; | |
use std::sync::{Arc, Mutex}; | |
use {ApplicationErrorKind, new_application_error}; | |
use protocol::{TInputProtocol, TMessageIdentifier, TOutputProtocol, TStoredInputProtocol}; | |
use super::TProcessor; | |
/// A `TProcessor` that can demux service calls to multiple underlying | |
/// Thrift services. | |
/// | |
/// Users register service-specific `TProcessor` instances with a | |
/// `TMultiplexedProcessor`, and then register that processor with a server | |
/// implementation. Following that, all incoming service calls are automatically | |
/// routed to the service-specific `TProcessor`. | |
/// | |
/// A `TMultiplexedProcessor` can only handle messages sent by a | |
/// `TMultiplexedOutputProtocol`. | |
// FIXME: implement Debug | |
pub struct TMultiplexedProcessor { | |
processors: Mutex<HashMap<String, Arc<Box<TProcessor + Send>>>>, | |
} | |
impl TMultiplexedProcessor { | |
/// Create a new `TMultiplexedProcessor` with no registered service-specific | |
/// processors. | |
pub fn new() -> TMultiplexedProcessor { | |
TMultiplexedProcessor { | |
processors: Mutex::new(HashMap::new()) | |
} | |
} | |
/// Register a service-specific `processor` for the service named | |
/// `service_name`. | |
/// | |
/// Return `true` if this is the first registration for `service_name`. | |
/// | |
/// Return `false` if a mapping previously existed (the previous mapping is | |
/// *not* overwritten). | |
#[cfg_attr(feature = "cargo-clippy", allow(map_entry))] | |
pub fn register<S: Into<String>>( | |
&mut self, | |
service_name: S, | |
processor: Box<TProcessor + Send>, | |
) -> bool { | |
let mut processors = self.processors.lock().unwrap(); | |
let name = service_name.into(); | |
if processors.contains_key(&name) { | |
false | |
} else { | |
processors.insert(name, Arc::new(processor)); | |
true | |
} | |
} | |
} | |
impl TProcessor for TMultiplexedProcessor { | |
fn process(&self, i_prot: &mut TInputProtocol, o_prot: &mut TOutputProtocol) -> ::Result<()> { | |
let msg_ident = i_prot.read_message_begin()?; | |
let sep_index = msg_ident | |
.name | |
.find(':') | |
.ok_or_else( | |
|| { | |
new_application_error( | |
ApplicationErrorKind::Unknown, | |
"no service separator found in incoming message", | |
) | |
}, | |
)?; | |
let (svc_name, svc_call) = msg_ident.name.split_at(sep_index); | |
let processor: Option<Arc<Box<TProcessor + Send>>> = { | |
let processors = self.processors.lock().unwrap(); | |
processors.get(svc_name).cloned() | |
}; | |
match processor { | |
Some(arc) => { | |
let new_msg_ident = TMessageIdentifier::new( | |
svc_call, | |
msg_ident.message_type, | |
msg_ident.sequence_number, | |
); | |
let mut proxy_i_prot = TStoredInputProtocol::new(i_prot, new_msg_ident); | |
(*arc).process(&mut proxy_i_prot, o_prot) | |
} | |
None => { | |
Err( | |
new_application_error( | |
ApplicationErrorKind::Unknown, | |
format!("no processor found for service {}", svc_name), | |
), | |
) | |
} | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Licensed to the Apache Software Foundation (ASF) under one | |
// or more contributor license agreements. See the NOTICE file | |
// distributed with this work for additional information | |
// regarding copyright ownership. The ASF licenses this file | |
// to you under the Apache License, Version 2.0 (the | |
// "License"); you may not use this file except in compliance | |
// with the License. You may obtain a copy of the License at | |
// | |
// http://www.apache.org/licenses/LICENSE-2.0 | |
// | |
// Unless required by applicable law or agreed to in writing, | |
// software distributed under the License is distributed on an | |
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
// KIND, either express or implied. See the License for the | |
// specific language governing permissions and limitations | |
// under the License. | |
use std::net::{TcpListener, TcpStream}; | |
use std::sync::Arc; | |
use threadpool::ThreadPool; | |
use {ApplicationError, ApplicationErrorKind}; | |
use protocol::{TInputProtocol, TInputProtocolFactory, TOutputProtocol, TOutputProtocolFactory}; | |
use transport::{TIoChannel, TReadTransportFactory, TTcpChannel, TWriteTransportFactory}; | |
use super::TProcessor; | |
/// Fixed-size thread-pool blocking Thrift server. | |
/// | |
/// A `TServer` listens on a given address and submits accepted connections | |
/// to an **unbounded** queue. Connections from this queue are serviced by | |
/// the first available worker thread from a **fixed-size** thread pool. Each | |
/// accepted connection is handled by that worker thread, and communication | |
/// over this thread occurs sequentially and synchronously (i.e. calls block). | |
/// Accepted connections have an input half and an output half, each of which | |
/// uses a `TTransport` and `TInputProtocol`/`TOutputProtocol` to translate | |
/// messages to and from byes. Any combination of `TInputProtocol`, `TOutputProtocol` | |
/// and `TTransport` may be used. | |
/// | |
/// # Examples | |
/// | |
/// Creating and running a `TServer` using Thrift-compiler-generated | |
/// service code. | |
/// | |
/// ```no_run | |
/// use thrift; | |
/// use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory}; | |
/// use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory}; | |
/// use thrift::protocol::{TInputProtocol, TOutputProtocol}; | |
/// use thrift::transport::{TBufferedReadTransportFactory, TBufferedWriteTransportFactory, TReadTransportFactory, TWriteTransportFactory}; | |
/// use thrift::server::{TProcessor, TServer}; | |
/// | |
/// // | |
/// // auto-generated | |
/// // | |
/// | |
/// // processor for `SimpleService` | |
/// struct SimpleServiceSyncProcessor; | |
/// impl SimpleServiceSyncProcessor { | |
/// fn new<H: SimpleServiceSyncHandler>(processor: H) -> SimpleServiceSyncProcessor { | |
/// unimplemented!(); | |
/// } | |
/// } | |
/// | |
/// // `TProcessor` implementation for `SimpleService` | |
/// impl TProcessor for SimpleServiceSyncProcessor { | |
/// fn process(&self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> thrift::Result<()> { | |
/// unimplemented!(); | |
/// } | |
/// } | |
/// | |
/// // service functions for SimpleService | |
/// trait SimpleServiceSyncHandler { | |
/// fn service_call(&self) -> thrift::Result<()>; | |
/// } | |
/// | |
/// // | |
/// // user-code follows | |
/// // | |
/// | |
/// // define a handler that will be invoked when `service_call` is received | |
/// struct SimpleServiceHandlerImpl; | |
/// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl { | |
/// fn service_call(&self) -> thrift::Result<()> { | |
/// unimplemented!(); | |
/// } | |
/// } | |
/// | |
/// // instantiate the processor | |
/// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl {}); | |
/// | |
/// // instantiate the server | |
/// let i_tr_fact: Box<TReadTransportFactory> = Box::new(TBufferedReadTransportFactory::new()); | |
/// let i_pr_fact: Box<TInputProtocolFactory> = Box::new(TBinaryInputProtocolFactory::new()); | |
/// let o_tr_fact: Box<TWriteTransportFactory> = Box::new(TBufferedWriteTransportFactory::new()); | |
/// let o_pr_fact: Box<TOutputProtocolFactory> = Box::new(TBinaryOutputProtocolFactory::new()); | |
/// | |
/// let mut server = TServer::new( | |
/// i_tr_fact, | |
/// i_pr_fact, | |
/// o_tr_fact, | |
/// o_pr_fact, | |
/// processor, | |
/// 10 | |
/// ); | |
/// | |
/// // start listening for incoming connections | |
/// match server.listen("127.0.0.1:8080") { | |
/// Ok(_) => println!("listen completed"), | |
/// Err(e) => println!("listen failed with error {:?}", e), | |
/// } | |
/// ``` | |
#[derive(Debug)] | |
pub struct TServer<PRC, RTF, IPF, WTF, OPF> | |
where | |
PRC: TProcessor + Send + Sync + 'static, | |
RTF: TReadTransportFactory + 'static, | |
IPF: TInputProtocolFactory + 'static, | |
WTF: TWriteTransportFactory + 'static, | |
OPF: TOutputProtocolFactory + 'static, | |
{ | |
r_trans_factory: RTF, | |
i_proto_factory: IPF, | |
w_trans_factory: WTF, | |
o_proto_factory: OPF, | |
processor: Arc<PRC>, | |
worker_pool: ThreadPool, | |
} | |
impl<PRC, RTF, IPF, WTF, OPF> TServer<PRC, RTF, IPF, WTF, OPF> | |
where PRC: TProcessor + Send + Sync + 'static, | |
RTF: TReadTransportFactory + 'static, | |
IPF: TInputProtocolFactory + 'static, | |
WTF: TWriteTransportFactory + 'static, | |
OPF: TOutputProtocolFactory + 'static { | |
/// Create a `TServer`. | |
/// | |
/// Each accepted connection has an input and output half, each of which | |
/// requires a `TTransport` and `TProtocol`. `TServer` uses | |
/// `read_transport_factory` and `input_protocol_factory` to create | |
/// implementations for the input, and `write_transport_factory` and | |
/// `output_protocol_factory` to create implementations for the output. | |
pub fn new( | |
read_transport_factory: RTF, | |
input_protocol_factory: IPF, | |
write_transport_factory: WTF, | |
output_protocol_factory: OPF, | |
processor: PRC, | |
num_workers: usize, | |
) -> TServer<PRC, RTF, IPF, WTF, OPF> { | |
TServer { | |
r_trans_factory: read_transport_factory, | |
i_proto_factory: input_protocol_factory, | |
w_trans_factory: write_transport_factory, | |
o_proto_factory: output_protocol_factory, | |
processor: Arc::new(processor), | |
worker_pool: ThreadPool::new_with_name( | |
"Thrift service processor".to_owned(), | |
num_workers, | |
), | |
} | |
} | |
/// Listen for incoming connections on `listen_address`. | |
/// | |
/// `listen_address` should be in the form `host:port`, | |
/// for example: `127.0.0.1:8080`. | |
/// | |
/// Return `()` if successful. | |
/// | |
/// Return `Err` when the server cannot bind to `listen_address` or there | |
/// is an unrecoverable error. | |
pub fn listen(&mut self, listen_address: &str) -> ::Result<()> { | |
let listener = TcpListener::bind(listen_address)?; | |
for stream in listener.incoming() { | |
match stream { | |
Ok(s) => { | |
let (i_prot, o_prot) = self.new_protocols_for_connection(s)?; | |
let processor = self.processor.clone(); | |
self.worker_pool | |
.execute(move || handle_incoming_connection(processor, i_prot, o_prot),); | |
} | |
Err(e) => { | |
warn!("failed to accept remote connection with error {:?}", e); | |
} | |
} | |
} | |
Err( | |
::Error::Application( | |
ApplicationError { | |
kind: ApplicationErrorKind::Unknown, | |
message: "aborted listen loop".into(), | |
}, | |
), | |
) | |
} | |
fn new_protocols_for_connection( | |
&mut self, | |
stream: TcpStream, | |
) -> ::Result<(Box<TInputProtocol + Send>, Box<TOutputProtocol + Send>)> { | |
// create the shared tcp stream | |
let channel = TTcpChannel::with_stream(stream); | |
// split it into two - one to be owned by the | |
// input tran/proto and the other by the output | |
let (r_chan, w_chan) = channel.split()?; | |
// input protocol and transport | |
let r_tran = self.r_trans_factory.create(Box::new(r_chan)); | |
let i_prot = self.i_proto_factory.create(r_tran); | |
// output protocol and transport | |
let w_tran = self.w_trans_factory.create(Box::new(w_chan)); | |
let o_prot = self.o_proto_factory.create(w_tran); | |
Ok((i_prot, o_prot)) | |
} | |
} | |
fn handle_incoming_connection<PRC>( | |
processor: Arc<PRC>, | |
i_prot: Box<TInputProtocol>, | |
o_prot: Box<TOutputProtocol>, | |
) where | |
PRC: TProcessor, | |
{ | |
let mut i_prot = i_prot; | |
let mut o_prot = o_prot; | |
loop { | |
let r = processor.process(&mut *i_prot, &mut *o_prot); | |
if let Err(e) = r { | |
warn!("processor completed with error: {:?}", e); | |
break; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment