Skip to content

Instantly share code, notes, and snippets.

@patrickelectric
Last active September 23, 2024 12:35
Show Gist options
  • Save patrickelectric/0bd8315894fec32cdaa69f8119de1da8 to your computer and use it in GitHub Desktop.
Save patrickelectric/0bd8315894fec32cdaa69f8119de1da8 to your computer and use it in GitHub Desktop.
This is a Zenoh-pico test where my PC and the ESP32S2 waits for each other to publish data and to answer. The ESP32S2 waits for data in `demo/example/topic/0` than receives the values, increment +1, and send it back to the PC via `demo/example/topic/1`. The computer gets the data from `demo/example/topic/1` and publishes it back to `demo/example…
// This code runs in the ESP32S2
#include <Arduino.h>
#include <WiFi.h>
#include <zenoh-pico.h>
#define SSID "ssid"
#define PASS "PASS"
// Client mode values
#define MODE "peer"
#define CONNECT "" // If empty, it will scout
// Base topic and suffix
constexpr const char* BASE_TOPIC = "demo/example/topic/";
constexpr int TOPIC_SUFFIX = 0; // This is the single constexpr variable
z_owned_session_t s;
z_owned_subscriber_t sub;
z_owned_publisher_t pub;
char topic_a[256];
char topic_b[256];
void data_handler(z_loaned_sample_t *sample, void *arg) {
uint64_t value = 0;
z_bytes_deserialize_into_uint64(z_sample_payload(sample), &value);
z_owned_bytes_t payload;
z_bytes_serialize_from_uint64(&payload, value + 1);
if (z_publisher_put(z_publisher_loan(&pub), z_bytes_move(&payload), NULL) < 0) {
Serial.println("Error while publishing data");
}
}
void setup() {
// Initialize Serial for debugging
Serial.begin(115200);
while (!Serial) {
delay(1000);
}
// Set WiFi in STA mode and trigger attachment
Serial.print("Connecting to WiFi...");
WiFi.mode(WIFI_STA);
WiFi.begin(SSID, PASS);
while (WiFi.status() != WL_CONNECTED) {
delay(1000);
}
Serial.println("OK");
// Build topic names based on the single constexpr variable
sprintf(topic_a, "%s%d", BASE_TOPIC, TOPIC_SUFFIX);
sprintf(topic_b, "%s%d", BASE_TOPIC, TOPIC_SUFFIX + 1);
// Initialize Zenoh Session and other parameters
z_owned_config_t config;
z_config_default(&config);
zp_config_insert(z_config_loan_mut(&config), Z_CONFIG_MODE_KEY, MODE);
if (strcmp(CONNECT, "") != 0) {
zp_config_insert(z_config_loan_mut(&config), Z_CONFIG_CONNECT_KEY, CONNECT);
}
// Open Zenoh session
Serial.print("Opening Zenoh Session...");
while (z_open(&s, z_config_move(&config), NULL) < 0) {
Serial.println("Unable to open session!");
delay(1000);
}
Serial.println("OK");
// Start read and lease tasks for zenoh-pico
if (zp_start_read_task(z_session_loan_mut(&s), NULL) < 0 ||
zp_start_lease_task(z_session_loan_mut(&s), NULL) < 0) {
Serial.println("Unable to start read and lease tasks");
z_close(z_session_move(&s), NULL);
while (1) { /* Infinite loop */ }
}
// Declare Zenoh subscriber for topic A
Serial.print("Declaring Subscriber on ");
Serial.print(topic_a);
Serial.println(" ...");
z_owned_closure_sample_t callback;
z_closure_sample(&callback, data_handler, NULL, NULL);
z_view_keyexpr_t ke_sub;
z_view_keyexpr_from_str_unchecked(&ke_sub, topic_a);
if (z_declare_subscriber(&sub, z_session_loan(&s), z_view_keyexpr_loan(&ke_sub),
z_closure_sample_move(&callback), NULL) < 0) {
Serial.println("Unable to declare subscriber.");
while (1) { /* Infinite loop */ }
}
Serial.println("OK");
// Declare Zenoh publisher for topic B
Serial.print("Declaring Publisher on ");
Serial.print(topic_b);
Serial.println(" ...");
z_view_keyexpr_t ke_pub;
z_view_keyexpr_from_str_unchecked(&ke_pub, topic_b);
if (z_declare_publisher(&pub, z_session_loan(&s), z_view_keyexpr_loan(&ke_pub), NULL) < 0) {
Serial.println("Unable to declare publisher.");
while (1) { /* Infinite loop */ }
}
Serial.println("OK");
Serial.println("Zenoh setup finished!");
pinMode(15, OUTPUT);
delay(300);
}
void loop() {
digitalWrite(15, HIGH);
z_sleep_us(1000*1000);
digitalWrite(15, LOW);
z_sleep_us(1000*1000);
}
// This code runs in my PC
use std::time::Instant;
use tokio::sync::oneshot;
#[tokio::main]
async fn main() {
let session = zenoh::open(zenoh::config::Config::default()).await.unwrap();
// Define base topic and index
let base_topic = "demo/example/topic/";
let index = 0; // This is the single index variable
let topic_0 = format!("{}{}", base_topic, index);
let topic_1 = format!("{}{}", base_topic, index + 1);
// Declare subscriber to topic_1
let subscriber = session.declare_subscriber(&topic_1).await.unwrap();
println!("Subscribed to '{}'", topic_1);
// Declare publisher for topic_0
let publisher = session.declare_publisher(&topic_0).await.unwrap();
// Use a oneshot channel to signal when the loop is ready
let (tx, rx) = oneshot::channel();
// Spawn the receive loop
let topic_0_cloned = topic_0.clone();
let handler = tokio::spawn(async move {
// Signal that the loop has started
let _ = tx.send(());
let mut current_time = Instant::now();
let mut last_freq = 0.0;
// Main loop: receive messages and process them
while let Ok(sample) = subscriber.recv_async().await {
let value = sample.payload().into::<Vec<u8>>()[0];
session.put(&topic_0_cloned, value + 1).await.unwrap();
let elapsed = std::time::Instant::now().duration_since(current_time);
current_time = Instant::now();
let freq = 1.0 / elapsed.as_secs_f64();
// LPF
last_freq = 0.1 * freq + 0.9 * last_freq;
println!("Frequency: {:.2} Hz", last_freq);
}
});
// Wait for the loop to start
let _ = rx.await;
// Now publish the initial message to topic_0
let initial_value: u64 = 0;
publisher.put(initial_value).await.unwrap();
println!("Published initial message to '{}'", topic_0);
handler.await.unwrap();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment