Last active
January 22, 2024 09:47
-
-
Save jaytaph/d75eba7056a3e94bf00e822a5e977691 to your computer and use it in GitHub Desktop.
ipc in rust
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use fork::{fork, Fork}; | |
use std::{process, thread}; | |
use std::thread::sleep; | |
use anyhow::{anyhow, Error}; | |
use ipc_channel::ipc::{IpcReceiver, IpcSelectionResult}; | |
use ipc_channel::ipc::{self, IpcReceiverSet, IpcSender}; | |
use serde::{Deserialize, Serialize}; | |
use rand::Rng; | |
/// Highest priority a message can carry (+127).
const MAX_PRIORITY: i8 = i8::MAX;
/// Lowest priority a message can carry (-128).
const MIN_PRIORITY: i8 = i8::MIN;
/// Default message priority.
const DEFAULT_PRIORITY: i8 = 0;
/// A GIPC channel is a connection to another process. It is used to send and receive messages. However, due to the fact | |
/// that we cannot clone the IpcReceiver, we need to store the IpcSender in the GipcChannel struct, and the IpcReceiver | |
/// in the IpcReceiverSet. Bummer... | |
#[allow(dead_code)] | |
struct GipcChannel { | |
/// The name of the channel | |
name: String, | |
out_chan: IpcSender<GipcMessage> | |
} | |
/// A GIPC message is a message that is sent between processes. It contains the source and destination of the message, | |
/// the priority of the message, and the message itself. | |
#[derive(Serialize, Deserialize, Debug)] | |
struct GipcMessage { | |
/// The source name of the channel, the GIPC broker will fill this is, even if the user has set it. | |
source: String, | |
/// The destination name of the channel | |
dest: String, | |
/// When set, the channel that this message is a reply to | |
in_reply_to: Option<String>, | |
/// Priority of the message (+127 (highest) to -128 (lowest)) | |
priority: i8, | |
/// The message to send (this needs to be a protobuf type of system, but we use a simple string for now) | |
msg: String, | |
} | |
/// The GIPC broker. The system that deals with sending and receiving messages between processes. It also allows processes | |
/// to register their connection. | |
struct GipcBroker { | |
/// The registered channels | |
channels: Vec<GipcChannel>, | |
/// When a message needs to be send, it is added to the message queue. The GIPC broker will send the message to the | |
/// destination process based on priority. | |
msg_queue: Vec<GipcMessage>, | |
/// The IPC receiver set. receive channels are not stored in the GipcChannel struct, because we cannot share/clone them between the | |
/// gipc-channel struct and the ipc-receiver-set. | |
ipc_rx_set: IpcReceiverSet, | |
} | |
/// A gipc client is a connection to the GIPC broker. It is used to send and receive messages from a child process back to the broker (or other clients) | |
struct GipcClient { | |
/// The name of the client | |
name: String, | |
/// The IPC sender | |
tx: IpcSender<GipcMessage>, | |
/// The IPC receiver | |
rx: IpcReceiver<GipcMessage>, | |
} | |
impl GipcClient { | |
/// Create a new client with a specific name and tx/rx channels | |
fn new(name: &str, tx: IpcSender<GipcMessage>, rx: IpcReceiver<GipcMessage>) -> Self { | |
GipcClient { | |
name: name.to_string(), | |
tx, | |
rx, | |
} | |
} | |
/// Send a message to given destination | |
fn send_message(&self, dest: &str, prio: i8, msg: &str) -> Result<(), Error> { | |
let res = self.tx.send(GipcMessage { | |
source: self.name.clone(), | |
dest: dest.to_string(), | |
in_reply_to: None, | |
priority: prio, | |
msg: msg.to_string(), | |
}); | |
if res.is_ok() { | |
return Ok(()); | |
} | |
return Err(anyhow!("Failed to send message")); | |
} | |
} | |
// unsafe impl Send for GipcBroker {} | |
// unsafe impl Sync for GipcBroker {} | |
impl GipcBroker { | |
/// Simple init | |
fn init() -> GipcBroker { | |
GipcBroker { | |
channels: Vec::new(), | |
msg_queue: Vec::new(), | |
ipc_rx_set: IpcReceiverSet::new().unwrap(), | |
} | |
} | |
/// Register a new channel. This will create a new channel, and return the endpoints that needs to be used by the | |
/// child process. | |
fn register(&mut self, name: &str) -> Result<GipcClient, Error> { | |
for channel in &self.channels { | |
if channel.name == name { | |
return Err(anyhow!("Channel {} already exists", name)); | |
} | |
} | |
// IPC connections are unidirectional, so we need to create two channels for each connection | |
let (tx_to, rx_to) = ipc::channel::<GipcMessage>().unwrap(); | |
let (tx_from, rx_from) = ipc::channel::<GipcMessage>().unwrap(); | |
let channel = GipcChannel { | |
name: name.to_string(), | |
out_chan: tx_to, | |
// in_chan: rx_from, | |
}; | |
self.channels.push(channel); | |
self.ipc_rx_set.add(rx_from).unwrap(); | |
// Return the gipc client that the child process can use to send and receive messages | |
Ok(GipcClient::new(name, tx_from, rx_to)) | |
} | |
/// Not yet used. This will send a message to the destination process. | |
#[allow(dead_code)] | |
fn send(&mut self, msg: GipcMessage) -> Result<(), String> { | |
let mut found = false; | |
for channel in &self.channels { | |
if channel.name == msg.dest { | |
found = true; | |
break; | |
} | |
} | |
if !found { | |
return Err(format!("Channel {} does not exist", msg.dest)); | |
} | |
self.msg_queue.push(msg); | |
Ok(()) | |
} | |
/// The main loop of the broker. This will handle incoming and outgoing IPC messages. Note: should we have | |
/// a separate thread for outgoing messages? | |
fn run(&mut self) { | |
loop { | |
println!("[{}] IPC thread loop", process::id()); | |
for event in self.ipc_rx_set.select().unwrap() { | |
match event { | |
IpcSelectionResult::MessageReceived(id, message) => { | |
let msg: GipcMessage = message.to().unwrap(); | |
println!("[{}] Message {} received: {:?}", process::id(), id, msg); | |
} | |
IpcSelectionResult::ChannelClosed(id) => { | |
println!("[{}] Channel closed: {}", process::id(), id); | |
} | |
} | |
} | |
} | |
} | |
} | |
fn main() { | |
println!("[{}] Hello, world!", process::id()); | |
let mut broker = GipcBroker::init(); | |
broker.register("main").expect("Failed to register main"); | |
// register some channels.. it's fine when the broker has not started yet | |
create_session(&mut broker, "session#1"); | |
create_session(&mut broker, "session#2"); | |
// start the broker in a new thread | |
println!("[{}] Starting IPC thread", process::id()); | |
thread::spawn(move || broker.run()); | |
// Register some more channels afterwards. This fails as the broker is moved to the thread. | |
create_session(&mut broker, "session#3"); | |
create_session(&mut broker, "session#4"); | |
sleep(std::time::Duration::from_secs(100)); | |
println!("[{}] End program", process::id()); | |
} | |
/// THis will create a new session. It will fork a new process, and send messages to the main process. | |
fn create_session(broker: &mut GipcBroker, name: &str) { | |
// Register a new client to the broker | |
let gipc_client = broker.register(name).expect("Failed to register broker client"); | |
match fork() { | |
Ok(Fork::Parent(child)) => { | |
// Just continue execution | |
println!("[{}] fork::parent (child has: {})", process::id(), child); | |
} | |
Ok(Fork::Child) => { | |
// Child process | |
println!("[{}] fork::child", process::id()); | |
println!("[{}] Continuing execution in child process", process::id()); | |
loop { | |
gipc_client.send_message("main", DEFAULT_PRIORITY, &format!("Hello from child {} to main", name).to_string()).expect("Failed to send message"); | |
let ms = rand::thread_rng().gen_range(1000..4000); | |
sleep(std::time::Duration::from_millis(ms)); | |
} | |
} | |
Err(err) => panic!("Fork failed: {}", err), | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment