Skip to content

Instantly share code, notes, and snippets.

@allengeorge
Created April 11, 2017 01:38
Show Gist options
  • Save allengeorge/b9122ed8951f837447744af1a3034d34 to your computer and use it in GitHub Desktop.
Save allengeorge/b9122ed8951f837447744af1a3034d34 to your computer and use it in GitHub Desktop.
sync errors
allen@mrwiggles ~/s/r/t/l/rs> cargo build
Compiling thrift v1.0.0 (file:///Users/allen/src/rust_projects/thrift/lib/rs)
error[E0277]: the trait bound `transport::TTransportFactory + 'static: std::marker::Send` is not satisfied
--> src/server/threaded.rs:160:34
|
160 | let handle = thread::spawn(move || handle_incoming_connection(&mut self.processor, i_prot, o_prot));
| ^^^^^^^^^^^^^ the trait `std::marker::Send` is not implemented for `transport::TTransportFactory + 'static`
|
= note: `transport::TTransportFactory + 'static` cannot be sent between threads safely
= note: required because of the requirements on the impl of `std::marker::Send` for `std::ptr::Unique<transport::TTransportFactory + 'static>`
= note: required because it appears within the type `std::boxed::Box<transport::TTransportFactory + 'static>`
= note: required because it appears within the type `server::threaded::TThreadedServer<PR>`
= note: required because of the requirements on the impl of `std::marker::Send` for `&mut server::threaded::TThreadedServer<PR>`
= note: required because it appears within the type `[closure@src/server/threaded.rs:160:48: 160:119 self:&mut server::threaded::TThreadedServer<PR>, i_prot:std::boxed::Box<protocol::TInputProtocol>, o_prot:std::boxed::Box<protocol::TOutputProtocol>]`
= note: required by `std::thread::spawn`
error[E0277]: the trait bound `protocol::TInputProtocolFactory + 'static: std::marker::Send` is not satisfied
--> src/server/threaded.rs:160:34
|
160 | let handle = thread::spawn(move || handle_incoming_connection(&mut self.processor, i_prot, o_prot));
| ^^^^^^^^^^^^^ the trait `std::marker::Send` is not implemented for `protocol::TInputProtocolFactory + 'static`
|
= note: `protocol::TInputProtocolFactory + 'static` cannot be sent between threads safely
= note: required because of the requirements on the impl of `std::marker::Send` for `std::ptr::Unique<protocol::TInputProtocolFactory + 'static>`
= note: required because it appears within the type `std::boxed::Box<protocol::TInputProtocolFactory + 'static>`
= note: required because it appears within the type `server::threaded::TThreadedServer<PR>`
= note: required because of the requirements on the impl of `std::marker::Send` for `&mut server::threaded::TThreadedServer<PR>`
= note: required because it appears within the type `[closure@src/server/threaded.rs:160:48: 160:119 self:&mut server::threaded::TThreadedServer<PR>, i_prot:std::boxed::Box<protocol::TInputProtocol>, o_prot:std::boxed::Box<protocol::TOutputProtocol>]`
= note: required by `std::thread::spawn`
error[E0277]: the trait bound `protocol::TOutputProtocolFactory + 'static: std::marker::Send` is not satisfied
--> src/server/threaded.rs:160:34
|
160 | let handle = thread::spawn(move || handle_incoming_connection(&mut self.processor, i_prot, o_prot));
| ^^^^^^^^^^^^^ the trait `std::marker::Send` is not implemented for `protocol::TOutputProtocolFactory + 'static`
|
= note: `protocol::TOutputProtocolFactory + 'static` cannot be sent between threads safely
= note: required because of the requirements on the impl of `std::marker::Send` for `std::ptr::Unique<protocol::TOutputProtocolFactory + 'static>`
= note: required because it appears within the type `std::boxed::Box<protocol::TOutputProtocolFactory + 'static>`
= note: required because it appears within the type `server::threaded::TThreadedServer<PR>`
= note: required because of the requirements on the impl of `std::marker::Send` for `&mut server::threaded::TThreadedServer<PR>`
= note: required because it appears within the type `[closure@src/server/threaded.rs:160:48: 160:119 self:&mut server::threaded::TThreadedServer<PR>, i_prot:std::boxed::Box<protocol::TInputProtocol>, o_prot:std::boxed::Box<protocol::TOutputProtocol>]`
= note: required by `std::thread::spawn`
error: aborting due to 3 previous errors
error: Could not compile `thrift`.
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::cell::RefCell;
use std::net::{TcpListener, TcpStream};
use std::rc::Rc;
use std::thread;
use std::thread::JoinHandle;
use ::{ApplicationError, ApplicationErrorKind};
use ::protocol::{TInputProtocol, TInputProtocolFactory, TOutputProtocol, TOutputProtocolFactory};
use ::transport::{TTcpChannel, TIOChannel, TTransport, TTransportFactory};
use super::TProcessor;
/// Unbounded thread-per-blocking-connection Thrift socket server.
///
/// A `TThreadedServer` listens on a given address and spawns a *new thread*
/// per accepted connection. Each accepted connection is handled separately,
/// and requests for each connection are handled *synchronously* and
/// *sequentially* - i.e. in a blocking manner. Each accepted connection has an
/// input half and an output half, each of which uses a `TTransport` and `TProtocol`
/// to translate messages to and from byes. Any combination of `TProtocol` and
/// `TTransport` may be used.
///
/// # Examples
///
/// Creating and running a `TThreadedServer` using Thrift-compiler-generated
/// service code.
///
/// ```no_run
/// use thrift;
/// use thrift::protocol::{TInputProtocolFactory, TOutputProtocolFactory};
/// use thrift::protocol::{TBinaryInputProtocolFactory, TBinaryOutputProtocolFactory};
/// use thrift::protocol::{TInputProtocol, TOutputProtocol};
/// use thrift::transport::{TBufferedTransportFactory, TTransportFactory};
/// use thrift::server::{TProcessor, TThreadedServer};
///
/// //
/// // auto-generated
/// //
///
/// // processor for `SimpleService`
/// struct SimpleServiceSyncProcessor;
/// impl SimpleServiceSyncProcessor {
/// fn new<H: SimpleServiceSyncHandler>(processor: H) -> SimpleServiceSyncProcessor {
/// unimplemented!();
/// }
/// }
///
/// // `TProcessor` implementation for `SimpleService`
/// impl TProcessor for SimpleServiceSyncProcessor {
/// fn process(&mut self, i: &mut TInputProtocol, o: &mut TOutputProtocol) -> thrift::Result<()> {
/// unimplemented!();
/// }
/// }
///
/// // service functions for SimpleService
/// trait SimpleServiceSyncHandler {
/// fn service_call(&mut self) -> thrift::Result<()>;
/// }
///
/// //
/// // user-code follows
/// //
///
/// // define a handler that will be invoked when `service_call` is received
/// struct SimpleServiceHandlerImpl;
/// impl SimpleServiceSyncHandler for SimpleServiceHandlerImpl {
/// fn service_call(&mut self) -> thrift::Result<()> {
/// unimplemented!();
/// }
/// }
///
/// // instantiate the processor
/// let processor = SimpleServiceSyncProcessor::new(SimpleServiceHandlerImpl {});
///
/// // instantiate the server
/// let i_tr_fact: Box<TTransportFactory> = Box::new(TBufferedTransportFactory::new());
/// let i_pr_fact: Box<TInputProtocolFactory> = Box::new(TBinaryInputProtocolFactory::new());
/// let o_tr_fact: Box<TTransportFactory> = Box::new(TBufferedTransportFactory::new());
/// let o_pr_fact: Box<TOutputProtocolFactory> = Box::new(TBinaryOutputProtocolFactory::new());
///
/// let mut server = TThreadedServer::new(
/// i_tr_fact,
/// i_pr_fact,
/// o_tr_fact,
/// o_pr_fact,
/// processor
/// );
///
/// // start listening for incoming connections
/// match server.listen("127.0.0.1:8080") {
/// Ok(_) => println!("listen completed"),
/// Err(e) => println!("listen failed with error {:?}", e),
/// }
/// ```
pub struct TThreadedServer<PR: TProcessor + Send + Sync + 'static> {
i_trans_factory: Box<TTransportFactory>,
i_proto_factory: Box<TInputProtocolFactory>,
o_trans_factory: Box<TTransportFactory>,
o_proto_factory: Box<TOutputProtocolFactory>,
processing_threads: Vec<JoinHandle<()>>,
processor: PR,
}
impl<PR: TProcessor + Send + Sync + 'static> TThreadedServer<PR> {
/// Create a `TThreadedServer`.
///
/// Each accepted connection has an input and output half, each of which
/// requires a `TTransport` and `TProtocol`. `TThreadedServer` uses
/// `input_transport_factory` and `input_protocol_factory` to create
/// implementations for the input, and `output_transport_factory` and
/// `output_protocol_factory` to create implementations for the output.
pub fn new(input_transport_factory: Box<TTransportFactory>,
input_protocol_factory: Box<TInputProtocolFactory>,
output_transport_factory: Box<TTransportFactory>,
output_protocol_factory: Box<TOutputProtocolFactory>,
processor: PR)
-> TThreadedServer<PR> {
TThreadedServer {
i_trans_factory: input_transport_factory,
i_proto_factory: input_protocol_factory,
o_trans_factory: output_transport_factory,
o_proto_factory: output_protocol_factory,
processing_threads: Vec::new(),
processor: processor,
}
}
/// Listen for incoming connections on `listen_address`.
///
/// `listen_address` should be in the form `host:port`,
/// for example: `127.0.0.1:8080`.
///
/// Return `()` if successful.
///
/// Return `Err` when the server cannot bind to `listen_address` or there
/// is an unrecoverable error.
pub fn listen(&mut self, listen_address: &str) -> ::Result<()> {
let listener = TcpListener::bind(listen_address)?;
for stream in listener.incoming() {
match stream {
Ok(s) => {
let (i_prot, o_prot) = self.new_protocols_for_connection(s)?;
let handle = thread::spawn(move || handle_incoming_connection(&mut self.processor, i_prot, o_prot));
},
Err(e) => {
warn!("failed to accept remote connection with error {:?}", e);
},
}
}
Err(::Error::Application(ApplicationError {
kind: ApplicationErrorKind::Unknown,
message: "aborted listen loop".into(),
}))
}
fn new_protocols_for_connection(&mut self, stream: TcpStream) -> ::Result<(Box<TInputProtocol>, Box<TOutputProtocol>)>{
// create the shared tcp stream
let channel = TTcpChannel::with_stream(stream);
let (i_chan, o_chan) = channel.try_clone()?;
// input protocol and transport
let i_tran = self.i_trans_factory.create(i_chan);
let i_prot = self.i_proto_factory.create(i_tran);
// output protocol and transport
let o_tran = self.o_trans_factory.create(o_chan);
let o_prot = self.o_proto_factory.create(o_tran);
Ok((i_prot, o_prot))
}
}
impl <PR: TProcessor + Send + Sync> Drop for TThreadedServer<PR> {
fn drop(&mut self) {
for h in &self.processing_threads {
if !h.join().is_ok() {
println!("failed to terminate and join processing thread {:?}", h.thread())
}
}
self.processing_threads.clear();
}
}
fn handle_incoming_connection<PR>(processor: &mut PR, i_prot: Box<TInputProtocol>, o_prot: Box<TOutputProtocol>) where PR: TProcessor {
loop {
let r = processor.process(&mut *i_prot, &mut *o_prot);
if let Err(e) = r {
warn!("processor failed with error: {:?}", e);
break; // FIXME: close here
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment