Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include "socket_task.h"
- /*
- *******************************************************************************
- * Symbolic Constants *
- *******************************************************************************
- */
// Upper bound on the number of bytes requested from a socket by a single
// recv() call; set equal to TASK_QUEUE_DATA_MAX.
#define SOCK_MAX_RECV_SIZE TASK_QUEUE_DATA_MAX
- /*
- *******************************************************************************
- * Global Variables *
- *******************************************************************************
- */
- // Socket table
- sock_t g_socket_table[MAX_SOCKET_COUNT];
- // Socket table length
- int g_tab_len;
- /*
- *******************************************************************************
- * Internal Function Definitions *
- *******************************************************************************
- */
- // Initializes a TCP/IP stream socket (but does not connect it).
- int init_socket (int index) {
- // Extract entry
- sock_t *entry = g_socket_table + index;
- // Initialize socket
- if ((entry->sock = socket(AF_INET, SOCK_STREAM, IPPROTO_IP)) < 0) {
- ESP_LOGE("SOCK", "Couldn't initialize socket!");
- entry->sock = -1;
- return -1;
- }
- ESP_LOGI("SOCK", "Initialized socket!");
- return 0;
- }
- // Connects a socket. Returns zero on success, nonzero on error
- int connect_socket (int index) {
- // Extract entry
- sock_t *entry = g_socket_table + index;
- // Configure descriptor
- const struct sockaddr_in sock_descr = {
- .sin_addr.s_addr = entry->addr,
- .sin_family = AF_INET,
- .sin_port = entry->port,
- };
- // Connect socket
- if (connect(entry->sock, (struct sockaddr *)&sock_descr, sizeof(sock_descr))
- != 0) {
- ESP_LOGE("SOCK", "Couldn't connect socket!");
- return -1;
- }
- ESP_LOGI("SOCK", "Connected socket!");
- return 0;
- }
- // Closes a socket
- void close_socket (int index) {
- // Extract entry
- sock_t *entry = g_socket_table + index;
- // If the socket is not active, then return now
- if (entry->sock < 0) {
- return;
- }
- // Close the socket
- close(entry->sock);
- entry->sock = -1;
- }
/*
 * Writes `size` bytes from `data` to the socket `sock`, calling send()
 * repeatedly until everything has been written or a call returns a
 * non-positive value.
 *
 * Returns 1 if the last send() call reported an error (negative result) and
 * 0 otherwise. A zero result from send(), or a zero `size`, is logged and
 * treated as success.
 */
int send_socket (int sock, uint8_t *data, size_t size) {
    int rc = 0;

    // `offset` counts the bytes already handed to the socket
    for (size_t offset = 0; offset < size; offset += rc) {
        rc = send(sock, data + offset, size - offset, 0x0);
        if (rc <= 0) {
            break;
        }
    }

    if (rc < 0) {
        return 1;
    }
    if (rc == 0) {
        ESP_LOGI("SOCK", "Zero bytes written to socket!");
    }
    return 0;
}
- // Receives data from a socket and puts on recv queue. Returns nonzero on error
- int recv_socket (int index) {
- int32_t ret;
- esp_err_t err;
- static uint8_t recv_buffer[SOCK_MAX_RECV_SIZE];
- // Extract entry
- sock_t *entry = g_socket_table + index;
- // Read bytes
- if ((ret = recv(entry->sock, recv_buffer, SOCK_MAX_RECV_SIZE, 0x0))
- < 0) {
- return -1;
- }
- // If the socket closed (return error value but don't log as one)
- if (ret == 0) {
- return -1;
- }
- // If the task didn't register a handler, ignore the data and return now
- if (entry->recv_queue == NULL) {
- return 0;
- }
- // Otherwise place the data on the receive queue
- if ((err = ipc_enqueue(entry->recv_queue, index, ret, recv_buffer))
- != ESP_OK) {
- ESP_LOGE("SOCK", "Couldn't put data on recv queue");
- }
- // Notify tasks that data is ready (so they should check their queues)
- xEventGroupSetBits(g_event_group, FLAG_SOCK_RECV_MSG);
- return 0;
- }
- /*
- *******************************************************************************
- * External Function Definitions *
- *******************************************************************************
- */
- int task_sock_manager_register (uint32_t addr, uint16_t port,
- QueueHandle_t recv_queue) {
- // If no room remains in the table, return an invalid index
- if (g_tab_len >= MAX_SOCKET_COUNT) {
- return -1;
- }
- // Register the new table entry
- g_socket_table[g_tab_len] = (sock_t){
- .sock = -1,
- .addr = addr,
- .port = port,
- .recv_queue = recv_queue,
- };
- // Return index (post-incremented)
- return g_tab_len++;
- }
- void task_sock_manager (void *args) {
- uint32_t flags;
- fd_set select_fds;
- int s;
- uint8_t active_connections = 0;
- task_queue_msg_t queue_msg;
- const TickType_t flag_block_time = 8; // Ticks to wait for flags to be set
- struct timeval sock_block_time = (struct timeval){
- .tv_sec = 0,
- .tv_usec = 10000, // Wait 10ms (10k us)
- };
- /* State Bit Flags
- * 0x1: WiFi is connected if set
- */
- uint8_t state = 0x0;
- do {
- // Block until something needs to be sent (idea: variable block time)
- flags = xEventGroupWaitBits(g_event_group,
- FLAG_SOCK_SEND_MSG | FLAG_WIFI_CONNECTED | FLAG_WIFI_DISCONNECTED,
- pdFALSE, pdFALSE, flag_block_time);
- // Clear the flag owned by this task
- xEventGroupClearBits(g_event_group, FLAG_SOCK_SEND_MSG);
- // If WiFi is connected
- if (flags & FLAG_WIFI_CONNECTED) {
- state |= 0x1;
- }
- // If WiFi is disconnected
- if (flags & FLAG_WIFI_DISCONNECTED) {
- state &= ~0x1;
- }
- // Try to send everything in the queue
- while (uxQueueMessagesWaiting(g_sock_tx_queue) > 0) {
- // Dequeue next message
- xQueueReceive(g_sock_tx_queue, (void *)&queue_msg,
- TASK_QUEUE_MAX_TICKS);
- // Check the ID
- if (queue_msg.id >= g_tab_len) {
- ESP_LOGE("SOCK", "Invalid socket index for outgoing message");
- continue;
- }
- // If WiFi isn't available, discard message
- if ((state & 0x1) == 0) {
- ESP_LOGW("SOCK", "Discarding unsendable message (no WiFi)");
- continue;
- }
- // Create the socket if needed
- if (g_socket_table[queue_msg.id].sock == -1) {
- if (init_socket(queue_msg.id) != 0) {
- ESP_LOGE("SOCK", "Unable to initialize socket");
- continue;
- }
- if (connect_socket(queue_msg.id) != 0) {
- ESP_LOGE("SOCK", "Unable to connect socket");
- close_socket(queue_msg.id);
- continue;
- }
- // Increment active connections
- active_connections++;
- }
- // Close socket if size to send is zero
- if (queue_msg.size == 0) {
- FD_CLR(g_socket_table[queue_msg.id].sock, &select_fds);
- ESP_LOGI("SOCK", "Closing socket by instruction");
- close_socket(queue_msg.id);
- active_connections--;
- continue;
- }
- // Close the socket if an error occurred sending the message
- if (send_socket(g_socket_table[queue_msg.id].sock,
- queue_msg.data, queue_msg.size) != 0) {
- ESP_LOGE("SOCK", "Couldn't send on socket");
- close_socket(queue_msg.id);
- active_connections--;
- continue;
- }
- }
- // Do not run select unless there are active connections
- if (active_connections == 0) {
- continue;
- }
- // Update file-descriptor set
- FD_ZERO(&select_fds);
- for (int i = 0; i < g_tab_len; ++i) {
- if (g_socket_table[i].sock != -1) {
- FD_SET(g_socket_table[i].sock, &select_fds);
- }
- }
- // Perform select (read-events only)
- s = select(FD_SETSIZE, &select_fds, NULL, NULL, &sock_block_time);
- // Continue if an error occurred
- if (s < 0) {
- ESP_LOGE("SOCK", "Select error!");
- continue;
- }
- // Continue if no events occurred within the blocking interval
- if (s == 0) {
- continue;
- }
- // Otherwise check all the sockets and take action on possible events
- for (int i = 0; i < g_tab_len; ++i) {
- // Ignore inactive sockets
- if (g_socket_table[i].sock < 0) {
- continue;
- }
- // If a read event occurred
- if (FD_ISSET(g_socket_table[i].sock, &select_fds)) {
- if (recv_socket(i) == -1) {
- ESP_LOGE("SOCK", "Error receiving data from socket");
- close_socket(i);
- }
- }
- }
- } while (1);
- // Destroy task
- vTaskDelete(NULL);
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement