Created
April 11, 2017 01:38
-
-
Save allengeorge/b9122ed8951f837447744af1a3034d34 to your computer and use it in GitHub Desktop.
sync errors
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
allen@mrwiggles ~/s/r/t/l/rs> cargo build | |
Compiling thrift v1.0.0 (file:///Users/allen/src/rust_projects/thrift/lib/rs) | |
error[E0277]: the trait bound `transport::TTransportFactory + 'static: std::marker::Send` is not satisfied | |
--> src/server/threaded.rs:160:34 | |
| | |
160 | let handle = thread::spawn(move || handle_incoming_connection(&mut self.processor, i_prot, o_prot)); | |
| ^^^^^^^^^^^^^ the trait `std::marker::Send` is not implemented for `transport::TTransportFactory + 'static` | |
| | |
= note: `transport::TTransportFactory + 'static` cannot be sent between threads safely | |
= note: required because of the requirements on the impl of `std::marker::Send` for `std::ptr::Unique<transport::TTransportFactory + 'static>` | |
= note: required because it appears within the type `std::boxed::Box<transport::TTransportFactory + 'static>` | |
= note: required because it appears within the type `server::threaded::TThreadedServer<PR>` | |
= note: required because of the requirements on the impl of `std::marker::Send` for `&mut server::threaded::TThreadedServer<PR>` | |
= note: required because it appears within the type `[closure@src/server/threaded.rs:160:48: 160:119 self:&mut server::threaded::TThreadedServer<PR>, i_prot:std::boxed::Box<protocol::TInputProtocol>, o_prot:std::boxed::Box<protocol::TOutputProtocol>]` | |
= note: required by `std::thread::spawn` | |
error[E0277]: the trait bound `protocol::TInputProtocolFactory + 'static: std::marker::Send` is not satisfied | |
--> src/server/threaded.rs:160:34 | |
| | |
160 | let handle = thread::spawn(move || handle_incoming_connection(&mut self.processor, i_prot, o_prot)); | |
| ^^^^^^^^^^^^^ the trait `std::marker::Send` is not implemented for `protocol::TInputProtocolFactory + 'static` | |
| | |
= note: `protocol::TInputProtocolFactory + 'static` cannot be sent between threads safely | |
= note: required because of the requirements on the impl of `std::marker::Send` for `std::ptr::Unique<protocol::TInputProtocolFactory + 'static>` | |
= note: required because it appears within the type `std::boxed::Box<protocol::TInputProtocolFactory + 'static>` | |
= note: required because it appears within the type `server::threaded::TThreadedServer<PR>` | |
= note: required because of the requirements on the impl of `std::marker::Send` for `&mut server::threaded::TThreadedServer<PR>` | |
= note: required because it appears within the type `[closure@src/server/threaded.rs:160:48: 160:119 self:&mut server::threaded::TThreadedServer<PR>, i_prot:std::boxed::Box<protocol::TInputProtocol>, o_prot:std::boxed::Box<protocol::TOutputProtocol>]` | |
= note: required by `std::thread::spawn` | |
error[E0277]: the trait bound `protocol::TOutputProtocolFactory + 'static: std::marker::Send` is not satisfied | |
--> src/server/threaded.rs:160:34 | |
| | |
160 | let handle = thread::spawn(move || handle_incoming_connection(&mut self.processor, i_prot, o_prot)); | |
| ^^^^^^^^^^^^^ the trait `std::marker::Send` is not implemented for `protocol::TOutputProtocolFactory + 'static` | |
| | |
= note: `protocol::TOutputProtocolFactory + 'static` cannot be sent between threads safely | |
= note: required because of the requirements on the impl of `std::marker::Send` for `std::ptr::Unique<protocol::TOutputProtocolFactory + 'static>` | |
= note: required because it appears within the type `std::boxed::Box<protocol::TOutputProtocolFactory + 'static>` | |
= note: required because it appears within the type `server::threaded::TThreadedServer<PR>` | |
= note: required because of the requirements on the impl of `std::marker::Send` for `&mut server::threaded::TThreadedServer<PR>` | |
= note: required because it appears within the type `[closure@src/server/threaded.rs:160:48: 160:119 self:&mut server::threaded::TThreadedServer<PR>, i_prot:std::boxed::Box<protocol::TInputProtocol>, o_prot:std::boxed::Box<protocol::TOutputProtocol>]` | |
= note: required by `std::thread::spawn` | |
error: aborting due to 3 previous errors | |
error: Could not compile `thrift`. | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Licensed to the Apache Software Foundation (ASF) under one | |
// or more contributor license agreements. See the NOTICE file | |
// distributed with this work for additional information | |
// regarding copyright ownership. The ASF licenses this file | |
// to you under the Apache License, Version 2.0 (the | |
// "License"); you may not use this file except in compliance | |
// with the License. You may obtain a copy of the License at | |
// | |
// http://www.apache.org/licenses/LICENSE-2.0 | |
// | |
// Unless required by applicable law or agreed to in writing, | |
// software distributed under the License is distributed on an | |
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
// KIND, either express or implied. See the License for the | |
// specific language governing permissions and limitations | |
// under the License. | |
use std::cell::RefCell; | |
use std::net::{TcpListener, TcpStream}; | |
use std::rc::Rc; | |
use std::thread; | |
use std::thread::JoinHandle; | |
use ::{ApplicationError, ApplicationErrorKind}; | |
use ::protocol::{TInputProtocol, TInputProtocolFactory, TOutputProtocol, TOutputProtocolFactory}; | |
use ::transport::{TTcpChannel, TIOChannel, TTransport, TTransportFactory}; | |
use super::TProcessor; | |
/// Unbounded thread-per-blocking-connection Thrift socket server. | |
/// | |
/// A `TThreadedServer` listens on a given address and spawns a *new thread* | |
/// per accepted connection. Each accepted connection is handled separately, | |
/// and requests for each connection are handled *synchronously* and | |
/// *sequentially* - i.e. in a blocking manner. Each accepted connection has an | |
/// input half and an output half, each of which uses a `TTransport` and `TProtocol` | |
/// to translate messages to and from byes. Any combination of `TProtocol` and | |
/// `TTransport` may be used. | |
/// | |
/// # Examples | |
/// | |
/// Creating and running a `TThreadedServer` using Thrift-compiler-generated | |
/// service code. | |
/// | |
/// ```no_run | |
/// use thrift; | |
/// use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory}; | |
/// use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory}; | |
/// use thrift::protocol::{TInputProtocol, TOutputProtocol}; | |
/// use thrift::transport::{TBufferedTransportFactory, TTransportFactory}; | |
/// use thrift::server::{TProcessor, TThreadedServer}; | |
/// | |
/// // | |
/// // auto-generated | |
/// // | |
/// | |
/// // processor for `SimpleService` | |
/// struct SimpleServiceSyncProcessor; | |
/// impl SimpleServiceSyncProcessor { | |
/// fn new<H: SimpleServiceSyncHandler>(processor: H) -> SimpleServiceSyncProcessor { | |
/// unimplemented!(); | |
/// } | |
/// } | |
/// | |
/// // `TProcessor` implementation for `SimpleService` | |
/// impl TProcessor for SimpleServiceSyncProcessor { | |
/// fn process(&mut self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> thrift::Result<()> { | |
/// unimplemented!(); | |
/// } | |
/// } | |
/// | |
/// // service functions for SimpleService | |
/// trait SimpleServiceSyncHandler { | |
/// fn service_call(&mut self) -> thrift::Result<()>; | |
/// } | |
/// | |
/// // | |
/// // user-code follows | |
/// // | |
/// | |
/// // define a handler that will be invoked when `service_call` is received | |
/// struct SimpleServiceHandlerImpl; | |
/// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl { | |
/// fn service_call(&mut self) -> thrift::Result<()> { | |
/// unimplemented!(); | |
/// } | |
/// } | |
/// | |
/// // instantiate the processor | |
/// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl {}); | |
/// | |
/// // instantiate the server | |
/// let i_tr_fact: Box<TTransportFactory> = Box::new(TBufferedTransportFactory::new()); | |
/// let i_pr_fact: Box<TInputProtocolFactory> = Box::new(TBinaryInputProtocolFactory::new()); | |
/// let o_tr_fact: Box<TTransportFactory> = Box::new(TBufferedTransportFactory::new()); | |
/// let o_pr_fact: Box<TOutputProtocolFactory> = Box::new(TBinaryOutputProtocolFactory::new()); | |
/// | |
/// let mut server = TThreadedServer::new( | |
/// i_tr_fact, | |
/// i_pr_fact, | |
/// o_tr_fact, | |
/// o_pr_fact, | |
/// processor | |
/// ); | |
/// | |
/// // start listening for incoming connections | |
/// match server.listen("127.0.0.1:8080") { | |
/// Ok(_) => println!("listen completed"), | |
/// Err(e) => println!("listen failed with error {:?}", e), | |
/// } | |
/// ``` | |
pub struct TThreadedServer<PR: TProcessor + Send + Sync + 'static> { | |
i_trans_factory: Box<TTransportFactory>, | |
i_proto_factory: Box<TInputProtocolFactory>, | |
o_trans_factory: Box<TTransportFactory>, | |
o_proto_factory: Box<TOutputProtocolFactory>, | |
processing_threads: Vec<JoinHandle<()>>, | |
processor: PR, | |
} | |
impl<PR: TProcessor + Send + Sync + 'static> TThreadedServer<PR> { | |
/// Create a `TThreadedServer`. | |
/// | |
/// Each accepted connection has an input and output half, each of which | |
/// requires a `TTransport` and `TProtocol`. `TThreadedServer` uses | |
/// `input_transport_factory` and `input_protocol_factory` to create | |
/// implementations for the input, and `output_transport_factory` and | |
/// `output_protocol_factory` to create implementations for the output. | |
pub fn new(input_transport_factory: Box<TTransportFactory>, | |
input_protocol_factory: Box<TInputProtocolFactory>, | |
output_transport_factory: Box<TTransportFactory>, | |
output_protocol_factory: Box<TOutputProtocolFactory>, | |
processor: PR) | |
-> TThreadedServer<PR> { | |
TThreadedServer { | |
i_trans_factory: input_transport_factory, | |
i_proto_factory: input_protocol_factory, | |
o_trans_factory: output_transport_factory, | |
o_proto_factory: output_protocol_factory, | |
processing_threads: Vec::new(), | |
processor: processor, | |
} | |
} | |
/// Listen for incoming connections on `listen_address`. | |
/// | |
/// `listen_address` should be in the form `host:port`, | |
/// for example: `127.0.0.1:8080`. | |
/// | |
/// Return `()` if successful. | |
/// | |
/// Return `Err` when the server cannot bind to `listen_address` or there | |
/// is an unrecoverable error. | |
pub fn listen(&mut self, listen_address: &str) -> ::Result<()> { | |
let listener = TcpListener::bind(listen_address)?; | |
for stream in listener.incoming() { | |
match stream { | |
Ok(s) => { | |
let (i_prot, o_prot) = self.new_protocols_for_connection(s)?; | |
let handle = thread::spawn(move || handle_incoming_connection(&mut self.processor, i_prot, o_prot)); | |
}, | |
Err(e) => { | |
warn!("failed to accept remote connection with error {:?}", e); | |
}, | |
} | |
} | |
Err(::Error::Application(ApplicationError { | |
kind: ApplicationErrorKind::Unknown, | |
message: "aborted listen loop".into(), | |
})) | |
} | |
fn new_protocols_for_connection(&mut self, stream: TcpStream) -> ::Result<(Box<TInputProtocol>, Box<TOutputProtocol>)>{ | |
// create the shared tcp stream | |
let channel = TTcpChannel::with_stream(stream); | |
let (i_chan, o_chan) = channel.try_clone()?; | |
// input protocol and transport | |
let i_tran = self.i_trans_factory.create(i_chan); | |
let i_prot = self.i_proto_factory.create(i_tran); | |
// output protocol and transport | |
let o_tran = self.o_trans_factory.create(o_chan); | |
let o_prot = self.o_proto_factory.create(o_tran); | |
Ok((i_prot, o_prot)) | |
} | |
} | |
impl <PR: TProcessor + Send + Sync> Drop for TThreadedServer<PR> { | |
fn drop(&mut self) { | |
for h in &self.processing_threads { | |
if !h.join().is_ok() { | |
println!("failed to terminate and join processing thread {:?}", h.thread()) | |
} | |
} | |
self.processing_threads.clear(); | |
} | |
} | |
fn handle_incoming_connection<PR>(processor: &mut PR, i_prot: Box<TInputProtocol>, o_prot: Box<TOutputProtocol>) where PR: TProcessor { | |
loop { | |
let r = processor.process(&mut *i_prot, &mut *o_prot); | |
if let Err(e) = r { | |
warn!("processor failed with error: {:?}", e); | |
break; // FIXME: close here | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment