Last active
September 23, 2024 12:35
-
-
Save patrickelectric/0bd8315894fec32cdaa69f8119de1da8 to your computer and use it in GitHub Desktop.
This is a Zenoh-pico test in which my PC and the ESP32S2 wait for each other to publish data and to answer. The ESP32S2 waits for data on `demo/example/topic/0`, then receives the value, increments it by 1, and sends it back to the PC via `demo/example/topic/1`. The computer gets the data from `demo/example/topic/1` and publishes it back to `demo/example…
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This code runs on the ESP32S2
#include <Arduino.h> | |
#include <WiFi.h> | |
#include <zenoh-pico.h> | |
#define SSID "ssid" | |
#define PASS "PASS" | |
// Client mode values | |
#define MODE "peer" | |
#define CONNECT "" // If empty, it will scout | |
// Base topic and suffix | |
constexpr const char* BASE_TOPIC = "demo/example/topic/"; | |
constexpr int TOPIC_SUFFIX = 0; // This is the single constexpr variable | |
z_owned_session_t s; | |
z_owned_subscriber_t sub; | |
z_owned_publisher_t pub; | |
char topic_a[256]; | |
char topic_b[256]; | |
// Subscriber callback: decodes the received payload as a uint64, adds one and
// publishes the result through the global publisher `pub`.
//
// sample: the received sample whose payload is decoded.
// arg:    closure context pointer; not used by this handler.
void data_handler(z_loaned_sample_t *sample, void *arg) {
    (void)arg;

    // Do not answer when the payload cannot be decoded; otherwise a bogus
    // value (0 + 1) would be published as if it were a valid reply.
    uint64_t value = 0;
    if (z_bytes_deserialize_into_uint64(z_sample_payload(sample), &value) < 0) {
        Serial.println("Error while decoding received data");
        return;
    }

    // Only hand the payload to the publisher once it has been built
    // successfully; moving an unset payload would be invalid.
    z_owned_bytes_t payload;
    if (z_bytes_serialize_from_uint64(&payload, value + 1) < 0) {
        Serial.println("Error while encoding data");
        return;
    }

    if (z_publisher_put(z_publisher_loan(&pub), z_bytes_move(&payload), NULL) < 0) {
        Serial.println("Error while publishing data");
    }
}
void setup() { | |
// Initialize Serial for debugging | |
Serial.begin(115200); | |
while (!Serial) { | |
delay(1000); | |
} | |
// Set WiFi in STA mode and trigger attachment | |
Serial.print("Connecting to WiFi..."); | |
WiFi.mode(WIFI_STA); | |
WiFi.begin(SSID, PASS); | |
while (WiFi.status() != WL_CONNECTED) { | |
delay(1000); | |
} | |
Serial.println("OK"); | |
// Build topic names based on the single constexpr variable | |
sprintf(topic_a, "%s%d", BASE_TOPIC, TOPIC_SUFFIX); | |
sprintf(topic_b, "%s%d", BASE_TOPIC, TOPIC_SUFFIX + 1); | |
// Initialize Zenoh Session and other parameters | |
z_owned_config_t config; | |
z_config_default(&config); | |
zp_config_insert(z_config_loan_mut(&config), Z_CONFIG_MODE_KEY, MODE); | |
if (strcmp(CONNECT, "") != 0) { | |
zp_config_insert(z_config_loan_mut(&config), Z_CONFIG_CONNECT_KEY, CONNECT); | |
} | |
// Open Zenoh session | |
Serial.print("Opening Zenoh Session..."); | |
while (z_open(&s, z_config_move(&config), NULL) < 0) { | |
Serial.println("Unable to open session!"); | |
delay(1000); | |
} | |
Serial.println("OK"); | |
// Start read and lease tasks for zenoh-pico | |
if (zp_start_read_task(z_session_loan_mut(&s), NULL) < 0 || | |
zp_start_lease_task(z_session_loan_mut(&s), NULL) < 0) { | |
Serial.println("Unable to start read and lease tasks"); | |
z_close(z_session_move(&s), NULL); | |
while (1) { /* Infinite loop */ } | |
} | |
// Declare Zenoh subscriber for topic A | |
Serial.print("Declaring Subscriber on "); | |
Serial.print(topic_a); | |
Serial.println(" ..."); | |
z_owned_closure_sample_t callback; | |
z_closure_sample(&callback, data_handler, NULL, NULL); | |
z_view_keyexpr_t ke_sub; | |
z_view_keyexpr_from_str_unchecked(&ke_sub, topic_a); | |
if (z_declare_subscriber(&sub, z_session_loan(&s), z_view_keyexpr_loan(&ke_sub), | |
z_closure_sample_move(&callback), NULL) < 0) { | |
Serial.println("Unable to declare subscriber."); | |
while (1) { /* Infinite loop */ } | |
} | |
Serial.println("OK"); | |
// Declare Zenoh publisher for topic B | |
Serial.print("Declaring Publisher on "); | |
Serial.print(topic_b); | |
Serial.println(" ..."); | |
z_view_keyexpr_t ke_pub; | |
z_view_keyexpr_from_str_unchecked(&ke_pub, topic_b); | |
if (z_declare_publisher(&pub, z_session_loan(&s), z_view_keyexpr_loan(&ke_pub), NULL) < 0) { | |
Serial.println("Unable to declare publisher."); | |
while (1) { /* Infinite loop */ } | |
} | |
Serial.println("OK"); | |
Serial.println("Zenoh setup finished!"); | |
pinMode(15, OUTPUT); | |
delay(300); | |
} | |
// Toggles GPIO 15: drives it HIGH, sleeps one second, drives it LOW, sleeps
// one second. The Zenoh traffic itself is handled in data_handler().
void loop() {
    constexpr uint8_t kPin = 15;
    constexpr int kHalfPeriodUs = 1000 * 1000;

    const uint8_t levels[] = {HIGH, LOW};
    for (const uint8_t level : levels) {
        digitalWrite(kPin, level);
        z_sleep_us(kHalfPeriodUs);
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This code runs on my PC
use std::time::Instant; | |
use tokio::sync::oneshot; | |
#[tokio::main] | |
async fn main() { | |
let session = zenoh::open(zenoh::config::Config::default()).await.unwrap(); | |
// Define base topic and index | |
let base_topic = "demo/example/topic/"; | |
let index = 0; // This is the single index variable | |
let topic_0 = format!("{}{}", base_topic, index); | |
let topic_1 = format!("{}{}", base_topic, index + 1); | |
// Declare subscriber to topic_1 | |
let subscriber = session.declare_subscriber(&topic_1).await.unwrap(); | |
println!("Subscribed to '{}'", topic_1); | |
// Declare publisher for topic_0 | |
let publisher = session.declare_publisher(&topic_0).await.unwrap(); | |
// Use a oneshot channel to signal when the loop is ready | |
let (tx, rx) = oneshot::channel(); | |
// Spawn the receive loop | |
let topic_0_cloned = topic_0.clone(); | |
let handler = tokio::spawn(async move { | |
// Signal that the loop has started | |
let _ = tx.send(()); | |
let mut current_time = Instant::now(); | |
let mut last_freq = 0.0; | |
// Main loop: receive messages and process them | |
while let Ok(sample) = subscriber.recv_async().await { | |
let value = sample.payload().into::<Vec<u8>>()[0]; | |
session.put(&topic_0_cloned, value + 1).await.unwrap(); | |
let elapsed = std::time::Instant::now().duration_since(current_time); | |
current_time = Instant::now(); | |
let freq = 1.0 / elapsed.as_secs_f64(); | |
// LPF | |
last_freq = 0.1 * freq + 0.9 * last_freq; | |
println!("Frequency: {:.2} Hz", last_freq); | |
} | |
}); | |
// Wait for the loop to start | |
let _ = rx.await; | |
// Now publish the initial message to topic_0 | |
let initial_value: u64 = 0; | |
publisher.put(initial_value).await.unwrap(); | |
println!("Published initial message to '{}'", topic_0); | |
handler.await.unwrap(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment