Created
September 30, 2019 00:19
-
-
Save Micrified/45c28c72ca2023466fb10208900a8dae to your computer and use it in GitHub Desktop.
A task that manages sockets
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "socket_task.h" | |
/* | |
******************************************************************************* | |
* Symbolic Constants * | |
******************************************************************************* | |
*/ | |
// Maximum amount of bytes that can be received from a socket on a call | |
#define SOCK_MAX_RECV_SIZE TASK_QUEUE_DATA_MAX | |
/* | |
******************************************************************************* | |
* Global Variables * | |
******************************************************************************* | |
*/ | |
// Socket table | |
sock_t g_socket_table[MAX_SOCKET_COUNT]; | |
// Socket table length | |
int g_tab_len; | |
/* | |
******************************************************************************* | |
* Internal Function Definitions * | |
******************************************************************************* | |
*/ | |
// Initializes a TCP/IP stream socket (but does not connect it). | |
int init_socket (int index) { | |
// Extract entry | |
sock_t *entry = g_socket_table + index; | |
// Initialize socket | |
if ((entry->sock = socket(AF_INET, SOCK_STREAM, IPPROTO_IP)) < 0) { | |
ESP_LOGE("SOCK", "Couldn't initialize socket!"); | |
entry->sock = -1; | |
return -1; | |
} | |
ESP_LOGI("SOCK", "Initialized socket!"); | |
return 0; | |
} | |
// Connects a socket. Returns zero on success, nonzero on error | |
int connect_socket (int index) { | |
// Extract entry | |
sock_t *entry = g_socket_table + index; | |
// Configure descriptor | |
const struct sockaddr_in sock_descr = { | |
.sin_addr.s_addr = entry->addr, | |
.sin_family = AF_INET, | |
.sin_port = entry->port, | |
}; | |
// Connect socket | |
if (connect(entry->sock, (struct sockaddr *)&sock_descr, sizeof(sock_descr)) | |
!= 0) { | |
ESP_LOGE("SOCK", "Couldn't connect socket!"); | |
return -1; | |
} | |
ESP_LOGI("SOCK", "Connected socket!"); | |
return 0; | |
} | |
// Closes a socket | |
void close_socket (int index) { | |
// Extract entry | |
sock_t *entry = g_socket_table + index; | |
// If the socket is not active, then return now | |
if (entry->sock < 0) { | |
return; | |
} | |
// Close the socket | |
close(entry->sock); | |
entry->sock = -1; | |
} | |
// Writes the given buffer to the socket. Returns nonzero on error | |
int send_socket (int sock, uint8_t *data, size_t size) { | |
size_t sent = 0; int res = 0; | |
while (sent < size) { | |
if ((res = send(sock, data + sent, size - sent, 0x0)) <= 0) { | |
break; | |
} | |
sent += res; | |
} | |
if (res == 0) { | |
ESP_LOGI("SOCK", "Zero bytes written to socket!"); | |
return 0; | |
} | |
return (res < 0); | |
} | |
// Receives data from a socket and puts on recv queue. Returns nonzero on error | |
int recv_socket (int index) { | |
int32_t ret; | |
esp_err_t err; | |
static uint8_t recv_buffer[SOCK_MAX_RECV_SIZE]; | |
// Extract entry | |
sock_t *entry = g_socket_table + index; | |
// Read bytes | |
if ((ret = recv(entry->sock, recv_buffer, SOCK_MAX_RECV_SIZE, 0x0)) | |
< 0) { | |
return -1; | |
} | |
// If the socket closed (return error value but don't log as one) | |
if (ret == 0) { | |
return -1; | |
} | |
// If the task didn't register a handler, ignore the data and return now | |
if (entry->recv_queue == NULL) { | |
return 0; | |
} | |
// Otherwise place the data on the receive queue | |
if ((err = ipc_enqueue(entry->recv_queue, index, ret, recv_buffer)) | |
!= ESP_OK) { | |
ESP_LOGE("SOCK", "Couldn't put data on recv queue"); | |
} | |
// Notify tasks that data is ready (so they should check their queues) | |
xEventGroupSetBits(g_event_group, FLAG_SOCK_RECV_MSG); | |
return 0; | |
} | |
/* | |
******************************************************************************* | |
* External Function Definitions * | |
******************************************************************************* | |
*/ | |
int task_sock_manager_register (uint32_t addr, uint16_t port, | |
QueueHandle_t recv_queue) { | |
// If no room remains in the table, return an invalid index | |
if (g_tab_len >= MAX_SOCKET_COUNT) { | |
return -1; | |
} | |
// Register the new table entry | |
g_socket_table[g_tab_len] = (sock_t){ | |
.sock = -1, | |
.addr = addr, | |
.port = port, | |
.recv_queue = recv_queue, | |
}; | |
// Return index (post-incremented) | |
return g_tab_len++; | |
} | |
void task_sock_manager (void *args) { | |
uint32_t flags; | |
fd_set select_fds; | |
int s; | |
uint8_t active_connections = 0; | |
task_queue_msg_t queue_msg; | |
const TickType_t flag_block_time = 8; // Ticks to wait for flags to be set | |
struct timeval sock_block_time = (struct timeval){ | |
.tv_sec = 0, | |
.tv_usec = 10000, // Wait 10ms (10k us) | |
}; | |
/* State Bit Flags | |
* 0x1: WiFi is connected if set | |
*/ | |
uint8_t state = 0x0; | |
do { | |
// Block until something needs to be sent (idea: variable block time) | |
flags = xEventGroupWaitBits(g_event_group, | |
FLAG_SOCK_SEND_MSG | FLAG_WIFI_CONNECTED | FLAG_WIFI_DISCONNECTED, | |
pdFALSE, pdFALSE, flag_block_time); | |
// Clear the flag owned by this task | |
xEventGroupClearBits(g_event_group, FLAG_SOCK_SEND_MSG); | |
// If WiFi is connected | |
if (flags & FLAG_WIFI_CONNECTED) { | |
state |= 0x1; | |
} | |
// If WiFi is disconnected | |
if (flags & FLAG_WIFI_DISCONNECTED) { | |
state &= ~0x1; | |
} | |
// Try to send everything in the queue | |
while (uxQueueMessagesWaiting(g_sock_tx_queue) > 0) { | |
// Dequeue next message | |
xQueueReceive(g_sock_tx_queue, (void *)&queue_msg, | |
TASK_QUEUE_MAX_TICKS); | |
// Check the ID | |
if (queue_msg.id >= g_tab_len) { | |
ESP_LOGE("SOCK", "Invalid socket index for outgoing message"); | |
continue; | |
} | |
// If WiFi isn't available, discard message | |
if ((state & 0x1) == 0) { | |
ESP_LOGW("SOCK", "Discarding unsendable message (no WiFi)"); | |
continue; | |
} | |
// Create the socket if needed | |
if (g_socket_table[queue_msg.id].sock == -1) { | |
if (init_socket(queue_msg.id) != 0) { | |
ESP_LOGE("SOCK", "Unable to initialize socket"); | |
continue; | |
} | |
if (connect_socket(queue_msg.id) != 0) { | |
ESP_LOGE("SOCK", "Unable to connect socket"); | |
close_socket(queue_msg.id); | |
continue; | |
} | |
// Increment active connections | |
active_connections++; | |
} | |
// Close socket if size to send is zero | |
if (queue_msg.size == 0) { | |
FD_CLR(g_socket_table[queue_msg.id].sock, &select_fds); | |
ESP_LOGI("SOCK", "Closing socket by instruction"); | |
close_socket(queue_msg.id); | |
active_connections--; | |
continue; | |
} | |
// Close the socket if an error occurred sending the message | |
if (send_socket(g_socket_table[queue_msg.id].sock, | |
queue_msg.data, queue_msg.size) != 0) { | |
ESP_LOGE("SOCK", "Couldn't send on socket"); | |
close_socket(queue_msg.id); | |
active_connections--; | |
continue; | |
} | |
} | |
// Do not run select unless there are active connections | |
if (active_connections == 0) { | |
continue; | |
} | |
// Update file-descriptor set | |
FD_ZERO(&select_fds); | |
for (int i = 0; i < g_tab_len; ++i) { | |
if (g_socket_table[i].sock != -1) { | |
FD_SET(g_socket_table[i].sock, &select_fds); | |
} | |
} | |
// Perform select (read-events only) | |
s = select(FD_SETSIZE, &select_fds, NULL, NULL, &sock_block_time); | |
// Continue if an error occurred | |
if (s < 0) { | |
ESP_LOGE("SOCK", "Select error!"); | |
continue; | |
} | |
// Continue if no events occurred within the blocking interval | |
if (s == 0) { | |
continue; | |
} | |
// Otherwise check all the sockets and take action on possible events | |
for (int i = 0; i < g_tab_len; ++i) { | |
// Ignore inactive sockets | |
if (g_socket_table[i].sock < 0) { | |
continue; | |
} | |
// If a read event occurred | |
if (FD_ISSET(g_socket_table[i].sock, &select_fds)) { | |
if (recv_socket(i) == -1) { | |
ESP_LOGE("SOCK", "Error receiving data from socket"); | |
close_socket(i); | |
} | |
} | |
} | |
} while (1); | |
// Destroy task | |
vTaskDelete(NULL); | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#if !defined(SOCKET_TASK_H) | |
#define SOCKET_TASK_H | |
/* | |
******************************************************************************* | |
* * | |
* Created: 22/09/2019 * | |
* * | |
* Programmer(s): * | |
* - Micrifed * | |
* * | |
* Description: * | |
* Contains supporting data-structures and functionality for managing socket * | |
* connections. Because this task is a bit more complicated than other tasks, * | |
* I'm giving it its own header and source file. The rest of the tasks will * | |
* remain in the main file * | |
* * | |
******************************************************************************* | |
*/ | |
#include <inttypes.h> | |
#include "esp_system.h" | |
#include "esp_log.h" | |
#include "err.h" | |
#include "ipc.h" | |
#include "tasks.h" | |
#include "lwip/sockets.h" | |
#include "lwip/netdb.h" | |
/* | |
******************************************************************************* | |
* Symbolic Constants * | |
******************************************************************************* | |
*/ | |
// Maximum number of sockets the task will track | |
#define MAX_SOCKET_COUNT 4 | |
/* | |
******************************************************************************* | |
* Type Definitions * | |
******************************************************************************* | |
*/ | |
// Describes a socket table entry | |
typedef struct { | |
int sock; // TCP/IP socket file-descriptor | |
uint32_t addr; // 32-bit IPv4 address (network byte order) | |
uint16_t port; // 16-bit port (network byte order) | |
QueueHandle_t recv_queue; // Queue to place received data (if set) | |
} sock_t; | |
/* | |
******************************************************************************* | |
* Function Declarations * | |
******************************************************************************* | |
*/ | |
/* @brief: Registers a socket with the task | |
* | |
* @note: ALL CALLS TO THIS FUNCTION MUST BE FINISHED BEFORE STARTING THE TASK | |
* | |
* @note: Data sent to the task must be placed in the sock_tx_queue with the | |
* index of the table entry as the value of the message 'id' field. | |
* | |
* @param: | |
* - addr: 32-bit IPv4 address to connect to (in network byte order) | |
* - port: 16-bit port to connect on (in network byte order) | |
* - recv_queue: Handle to the queue on which return data will be placed. If | |
* NULL is specified, received data is ignored | |
* | |
* @return: Index of table entry. Value -1 is returned on error | |
*/ | |
int task_sock_manager_register (uint32_t addr, uint16_t port, | |
QueueHandle_t recv_queue); | |
/* @brief: The socket manager handles sockets for other tasks that want them | |
* The socket manager works in the following way: | |
* 1. A task registers a socket in the table during initialization | |
* 2. When any task wishes to send data on this socket, it creates | |
* a message and places it on the sock_tx_queue with the index | |
* of the socket it got back when registering the socket | |
* 3. When the task wishes to close the socket, it creates a | |
* message with a size of zero and places it on sock_tx_queue | |
* 4. This task will send the received message on the queue if: | |
* (a). There exists a WiFi connection | |
* (b). The socket can be created (if not created, it is | |
* created on the spot) | |
* | |
* @note: | |
* 1. A task whose message couldn't be sent gets no feedback. | |
* 2. A socket that is created will be destroyed when | |
* (a). WiFi is disconnected | |
* (b). The opposing agent terminates the connection | |
* | |
* @note: This task makes use of LwIP Select functionality in order | |
* to avoid hogging resources through timer-based polling. It | |
* blocks for a pre-determined time for any data to be made available | |
* on the pollable sockets, and then delivers the received data to | |
* the receive-queue (if set). It blocks in the background, allowing | |
* other tasks to run. When the timer expires, it will check other | |
* flags (e.g. data to send) and take action | |
* | |
* @param: | |
* - args: Pointer to task arguments (unused) | |
* | |
*/ | |
void task_sock_manager (void *args); | |
#endif |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment