-
-
Save nohupped/1e7e8222b74107eadaf6b9a77423d9ff to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Please find the full, tested version at
// https://github.com/influxdata/influxdb_iox/blob/fe155e15fb2ad166aee66b0458e63c24a8128dd4/query/src/exec/task.rs#L101-L118
pub struct DedicatedExecutor { | |
state: Arc<Mutex<State>>, | |
} | |
/// Runs futures (and any `tasks` that are `tokio::task::spawned` by | |
/// them) on a separate Tokio Executor | |
struct State { | |
/// Channel for requests -- the dedicated executor takes requests | |
/// from here and runs them. | |
requests: Option<std::sync::mpsc::Sender<Task>>, | |
/// Thread which has a different Tokio runtime | |
/// installed and spawns tasks there | |
thread: Option<std::thread::JoinHandle<()>>, | |
} | |
impl DedicatedExecutor { | |
/// Creates a new `DedicatedExecutor` with a dedicated Tokio | |
/// executor that is separate from the threadpool created via | |
/// `[tokio::main]`. | |
pub fn new(thread_name: &str, num_threads: usize) -> Self { | |
let thread_name = thread_name.to_string(); | |
let (tx, rx) = std::sync::mpsc::channel::<Task>(); | |
let thread = std::thread::spawn(move || { | |
// Create a new Runtime to run tasks | |
let runtime = tokio::runtime::Builder::new_multi_thread() | |
.enable_all() | |
.thread_name(&thread_name) | |
.worker_threads(num_threads) | |
// Lower OS priority of worker threads to prioritize main runtime | |
.on_thread_start(move || set_current_thread_priority_low()) | |
.build() | |
.expect("Creating Tokio runtime"); | |
// Pull task requests off the channel and send them to the executor | |
runtime.block_on(async move { | |
while let Ok(task) = rx.recv() { | |
tokio::task::spawn(async move { | |
task.run().await; | |
}); | |
} | |
let state = State { | |
requests: Some(tx), | |
thread: Some(thread), | |
}; | |
Self { | |
state: Arc::new(Mutex::new(state)), | |
} | |
} | |
/// Runs the specified Future (and any tasks it spawns) on the | |
/// `DedicatedExecutor`. | |
pub fn spawn<T>(&self, task: T) -> Job<T::Output> | |
where | |
T: Future + Send + 'static, | |
T::Output: Send + 'static, | |
{ | |
let (tx, rx) = tokio::sync::oneshot::channel(); | |
let fut = Box::pin(async move { | |
let task_output = task.await; | |
tx.send(task_output).ok() | |
}); | |
let mut state = self.state.lock(); | |
let task = Task { | |
fut, | |
}; | |
if let Some(requests) = &mut state.requests { | |
// would fail if someone has started shutdown | |
requests.send(task).ok(); | |
} else { | |
warn!("tried to schedule task on an executor that was shutdown"); | |
} | |
Job { rx, cancel } | |
} | |
#[pin_project(PinnedDrop)] | |
pub struct Job<T> { | |
#[pin] | |
rx: Receiver<T>, | |
} | |
impl<T> Future for Job<T> { | |
type Output = Result<T, Error>; | |
fn poll( | |
self: Pin<&mut Self>, | |
cx: &mut std::task::Context<'_>, | |
) -> std::task::Poll<Self::Output> { | |
let this = self.project(); | |
this.rx.poll(cx) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment