Skip to content

Instantly share code, notes, and snippets.

@Micrified
Created September 30, 2019 00:19
Show Gist options
  • Save Micrified/45c28c72ca2023466fb10208900a8dae to your computer and use it in GitHub Desktop.
Save Micrified/45c28c72ca2023466fb10208900a8dae to your computer and use it in GitHub Desktop.
A task that manages sockets
#include "socket_task.h"
/*
*******************************************************************************
* Symbolic Constants *
*******************************************************************************
*/
// Maximum amount of bytes that can be received from a socket on a call
#define SOCK_MAX_RECV_SIZE TASK_QUEUE_DATA_MAX
/*
*******************************************************************************
* Global Variables *
*******************************************************************************
*/
// Socket table
sock_t g_socket_table[MAX_SOCKET_COUNT];
// Socket table length
int g_tab_len;
/*
*******************************************************************************
* Internal Function Definitions *
*******************************************************************************
*/
// Initializes a TCP/IP stream socket (but does not connect it).
int init_socket (int index) {
// Extract entry
sock_t *entry = g_socket_table + index;
// Initialize socket
if ((entry->sock = socket(AF_INET, SOCK_STREAM, IPPROTO_IP)) < 0) {
ESP_LOGE("SOCK", "Couldn't initialize socket!");
entry->sock = -1;
return -1;
}
ESP_LOGI("SOCK", "Initialized socket!");
return 0;
}
// Connects a socket. Returns zero on success, nonzero on error
int connect_socket (int index) {
// Extract entry
sock_t *entry = g_socket_table + index;
// Configure descriptor
const struct sockaddr_in sock_descr = {
.sin_addr.s_addr = entry->addr,
.sin_family = AF_INET,
.sin_port = entry->port,
};
// Connect socket
if (connect(entry->sock, (struct sockaddr *)&sock_descr, sizeof(sock_descr))
!= 0) {
ESP_LOGE("SOCK", "Couldn't connect socket!");
return -1;
}
ESP_LOGI("SOCK", "Connected socket!");
return 0;
}
// Closes a socket
void close_socket (int index) {
// Extract entry
sock_t *entry = g_socket_table + index;
// If the socket is not active, then return now
if (entry->sock < 0) {
return;
}
// Close the socket
close(entry->sock);
entry->sock = -1;
}
// Writes the given buffer to the socket. Returns nonzero on error
int send_socket (int sock, uint8_t *data, size_t size) {
size_t sent = 0; int res = 0;
while (sent < size) {
if ((res = send(sock, data + sent, size - sent, 0x0)) <= 0) {
break;
}
sent += res;
}
if (res == 0) {
ESP_LOGI("SOCK", "Zero bytes written to socket!");
return 0;
}
return (res < 0);
}
// Receives data from a socket and puts on recv queue. Returns nonzero on error
int recv_socket (int index) {
int32_t ret;
esp_err_t err;
static uint8_t recv_buffer[SOCK_MAX_RECV_SIZE];
// Extract entry
sock_t *entry = g_socket_table + index;
// Read bytes
if ((ret = recv(entry->sock, recv_buffer, SOCK_MAX_RECV_SIZE, 0x0))
< 0) {
return -1;
}
// If the socket closed (return error value but don't log as one)
if (ret == 0) {
return -1;
}
// If the task didn't register a handler, ignore the data and return now
if (entry->recv_queue == NULL) {
return 0;
}
// Otherwise place the data on the receive queue
if ((err = ipc_enqueue(entry->recv_queue, index, ret, recv_buffer))
!= ESP_OK) {
ESP_LOGE("SOCK", "Couldn't put data on recv queue");
}
// Notify tasks that data is ready (so they should check their queues)
xEventGroupSetBits(g_event_group, FLAG_SOCK_RECV_MSG);
return 0;
}
/*
*******************************************************************************
* External Function Definitions *
*******************************************************************************
*/
int task_sock_manager_register (uint32_t addr, uint16_t port,
QueueHandle_t recv_queue) {
// If no room remains in the table, return an invalid index
if (g_tab_len >= MAX_SOCKET_COUNT) {
return -1;
}
// Register the new table entry
g_socket_table[g_tab_len] = (sock_t){
.sock = -1,
.addr = addr,
.port = port,
.recv_queue = recv_queue,
};
// Return index (post-incremented)
return g_tab_len++;
}
void task_sock_manager (void *args) {
uint32_t flags;
fd_set select_fds;
int s;
uint8_t active_connections = 0;
task_queue_msg_t queue_msg;
const TickType_t flag_block_time = 8; // Ticks to wait for flags to be set
struct timeval sock_block_time = (struct timeval){
.tv_sec = 0,
.tv_usec = 10000, // Wait 10ms (10k us)
};
/* State Bit Flags
* 0x1: WiFi is connected if set
*/
uint8_t state = 0x0;
do {
// Block until something needs to be sent (idea: variable block time)
flags = xEventGroupWaitBits(g_event_group,
FLAG_SOCK_SEND_MSG | FLAG_WIFI_CONNECTED | FLAG_WIFI_DISCONNECTED,
pdFALSE, pdFALSE, flag_block_time);
// Clear the flag owned by this task
xEventGroupClearBits(g_event_group, FLAG_SOCK_SEND_MSG);
// If WiFi is connected
if (flags & FLAG_WIFI_CONNECTED) {
state |= 0x1;
}
// If WiFi is disconnected
if (flags & FLAG_WIFI_DISCONNECTED) {
state &= ~0x1;
}
// Try to send everything in the queue
while (uxQueueMessagesWaiting(g_sock_tx_queue) > 0) {
// Dequeue next message
xQueueReceive(g_sock_tx_queue, (void *)&queue_msg,
TASK_QUEUE_MAX_TICKS);
// Check the ID
if (queue_msg.id >= g_tab_len) {
ESP_LOGE("SOCK", "Invalid socket index for outgoing message");
continue;
}
// If WiFi isn't available, discard message
if ((state & 0x1) == 0) {
ESP_LOGW("SOCK", "Discarding unsendable message (no WiFi)");
continue;
}
// Create the socket if needed
if (g_socket_table[queue_msg.id].sock == -1) {
if (init_socket(queue_msg.id) != 0) {
ESP_LOGE("SOCK", "Unable to initialize socket");
continue;
}
if (connect_socket(queue_msg.id) != 0) {
ESP_LOGE("SOCK", "Unable to connect socket");
close_socket(queue_msg.id);
continue;
}
// Increment active connections
active_connections++;
}
// Close socket if size to send is zero
if (queue_msg.size == 0) {
FD_CLR(g_socket_table[queue_msg.id].sock, &select_fds);
ESP_LOGI("SOCK", "Closing socket by instruction");
close_socket(queue_msg.id);
active_connections--;
continue;
}
// Close the socket if an error occurred sending the message
if (send_socket(g_socket_table[queue_msg.id].sock,
queue_msg.data, queue_msg.size) != 0) {
ESP_LOGE("SOCK", "Couldn't send on socket");
close_socket(queue_msg.id);
active_connections--;
continue;
}
}
// Do not run select unless there are active connections
if (active_connections == 0) {
continue;
}
// Update file-descriptor set
FD_ZERO(&select_fds);
for (int i = 0; i < g_tab_len; ++i) {
if (g_socket_table[i].sock != -1) {
FD_SET(g_socket_table[i].sock, &select_fds);
}
}
// Perform select (read-events only)
s = select(FD_SETSIZE, &select_fds, NULL, NULL, &sock_block_time);
// Continue if an error occurred
if (s < 0) {
ESP_LOGE("SOCK", "Select error!");
continue;
}
// Continue if no events occurred within the blocking interval
if (s == 0) {
continue;
}
// Otherwise check all the sockets and take action on possible events
for (int i = 0; i < g_tab_len; ++i) {
// Ignore inactive sockets
if (g_socket_table[i].sock < 0) {
continue;
}
// If a read event occurred
if (FD_ISSET(g_socket_table[i].sock, &select_fds)) {
if (recv_socket(i) == -1) {
ESP_LOGE("SOCK", "Error receiving data from socket");
close_socket(i);
}
}
}
} while (1);
// Destroy task
vTaskDelete(NULL);
}
#if !defined(SOCKET_TASK_H)
#define SOCKET_TASK_H
/*
*******************************************************************************
* *
* Created: 22/09/2019 *
* *
* Programmer(s): *
* - Micrifed *
* *
* Description: *
* Contains supporting data-structures and functionality for managing socket *
* connections. Because this task is a bit more complicated than other tasks, *
* I'm giving it its own header and source file. The rest of the tasks will *
* remain in the main file *
* *
*******************************************************************************
*/
#include <inttypes.h>
#include "esp_system.h"
#include "esp_log.h"
#include "err.h"
#include "ipc.h"
#include "tasks.h"
#include "lwip/sockets.h"
#include "lwip/netdb.h"
/*
*******************************************************************************
* Symbolic Constants *
*******************************************************************************
*/
// Maximum number of sockets the task will track
#define MAX_SOCKET_COUNT 4
/*
*******************************************************************************
* Type Definitions *
*******************************************************************************
*/
// Describes a socket table entry
typedef struct {
int sock; // TCP/IP socket file-descriptor
uint32_t addr; // 32-bit IPv4 address (network byte order)
uint16_t port; // 16-bit port (network byte order)
QueueHandle_t recv_queue; // Queue to place received data (if set)
} sock_t;
/*
*******************************************************************************
* Function Declarations *
*******************************************************************************
*/
/* @brief: Registers a socket with the task
*
* @note: ALL CALLS TO THIS FUNCTION MUST BE FINISHED BEFORE STARTING THE TASK
*
* @note: Data sent to the task must be placed in the sock_tx_queue with the
* index of the table entry as the value of the message 'id' field.
*
* @param:
* - addr: 32-bit IPv4 address to connect to (in network byte order)
* - port: 16-bit port to connect on (in network byte order)
* - recv_queue: Handle to the queue on which return data will be placed. If
* NULL is specified, received data is ignored
*
* @return: Index of table entry. Value -1 is returned on error
*/
int task_sock_manager_register (uint32_t addr, uint16_t port,
QueueHandle_t recv_queue);
/* @brief: The socket manager handles sockets for other tasks that want them
* The socket manager works in the following way:
* 1. A task registers a socket in the table during initialization
* 2. When any task wishes to send data on this socket, it creates
* a message and places it on the sock_tx_queue with the index
* of the socket it got back when registering the socket
* 3. When the task wishes to close the socket, it creates a
* message with a size of zero and places it on sock_tx_queue
* 4. This task will send the received message on the queue if:
* (a). There exists a WiFi connection
* (b). The socket can be created (if not created, it is
* created on the spot)
*
* @note:
* 1. A task whose message couldn't be sent gets no feedback.
* 2. A socket that is created will be destroyed when
* (a). WiFi is disconnected
* (b). The opposing agent terminates the connection
*
* @note: This task makes use of LwIP Select functionality in order
* to avoid hogging resources through timer-based polling. It
* blocks for a pre-determined time for any data to be made available
* on the pollable sockets, and then delivers the received data to
* the receive-queue (if set). It blocks in the background, allowing
* other tasks to run. When the timer expires, it will check other
* flags (e.g. data to send) and take action
*
* @param:
* - args: Pointer to task arguments (unused)
*
*/
void task_sock_manager (void *args);
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment