Last active
December 15, 2022 10:27
-
-
Save ammarfaizi2/14f540d501b9a7592d659b89125f2c6d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// SPDX-License-Identifier: Public Domain
/*
 * @author Ammar Faizi <[email protected]>
 *
 * A simple Telegram bot example for @nekonyameow.
 *
 * Goals:
 *   - Parallel message processing.
 *   - A single getUpdates thread.
 *   - A workqueue implementation.
 *
 * Link: https://t.me/GNUWeeb/681196
 *
 * Compile with:
 *   gcc -ggdb3 -Wall -Wextra -O3 simple_tgbot.c -o simple_tgbot -lcurl -ljson-c -lpthread
 *
 * Update TELEGRAM_BOT_TOKEN before use!
 */
#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <curl/curl.h>
#include <json-c/json.h>
/*
 * Tunables. Each one can be overridden at build time instead of editing
 * this file, e.g.:
 *   gcc ... -DTELEGRAM_BOT_TOKEN='"123:abc"' -DNUMBER_OF_THREADS=8
 */
#ifndef NUMBER_OF_THREADS
#define NUMBER_OF_THREADS 32
#endif

/* Requested queue capacity; init_worker_queue() rounds it up to 2**N. */
#ifndef NUMBER_OF_QUEUE_SLOTS
#define NUMBER_OF_QUEUE_SLOTS 128
#endif

#ifndef TELEGRAM_BOT_TOKEN
#define TELEGRAM_BOT_TOKEN "123123123:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
#endif

/*
 * Lock-context annotations in the style of the sparse checker. They
 * expand to nothing here and only document which mutex a function
 * expects to be held (or to acquire/release).
 *
 * TODO(ammarfaizi2): Implement sparse.
 */
#define __must_hold(MUTEX)
#define __acquires(MUTEX)
#define __releases(MUTEX)

/* Fallback only for toolchains whose <stddef.h> lacks offsetof. */
#ifndef offsetof
#define offsetof(TYPE, FIELD) ((size_t) &((TYPE *)0)->FIELD)
#endif

/*
 * container_of - map a pointer to a member back to its enclosing struct.
 * Uses GNU statement expressions and __typeof__ so PTR is evaluated once
 * and type-checked against TYPE::FIELD.
 */
#ifndef container_of
#define container_of(PTR, TYPE, FIELD) ({				\
	__typeof__(((TYPE *)0)->FIELD) *__FIELD_PTR = (PTR);		\
	(TYPE *)((char *) __FIELD_PTR - offsetof(TYPE, FIELD));	\
})
#endif
/* One slot of the circular work queue: an item plus its destructor. */
struct data_queue {
	void *data;
	void (*data_deleter)(void *data);
};

struct worker_context;

/* Bookkeeping for one worker thread. */
struct worker {
	struct worker_context *ctx;	/* owning context */
	size_t idx;			/* index in worker_context::workers */
	pthread_t thread;
	/*
	 * True once pthread_create() succeeded. In this file it is only
	 * written and read by the thread that creates/joins the workers,
	 * so it needs no volatile/atomic qualifier.
	 */
	bool is_online;
};

/*
 * Bounded circular queue. head/tail are free-running counters; a slot
 * index is (counter & mask), so the slot count is a power of two.
 * arr, head, tail and enqueue_waiting are accessed under @mutex.
 */
struct worker_queue {
	struct data_queue *arr;
	size_t head;			/* next item to dequeue */
	size_t tail;			/* next slot to fill */
	size_t mask;			/* number of slots - 1 */
	pthread_cond_t cond;		/* signalled when an item is enqueued */
	pthread_cond_t enqueue_waiting_cond;	/* signalled when a slot frees */
	pthread_mutex_t mutex;
	bool enqueue_waiting;		/* the producer waits on a full queue */
};

struct worker_context {
	/*
	 * Shutdown flag. It is set from the signal handler and read by all
	 * threads without the queue mutex, so it must be atomic: volatile
	 * does not prevent data races, and only lock-free atomics (or
	 * volatile sig_atomic_t) may be written from a signal handler.
	 */
	atomic_bool stop;
	struct worker_queue queue;
	struct worker *workers;
	size_t nr_workers;
};

struct tgbot_context {
	const char *token;
	char *updates_raw;		/* raw getUpdates response body */
	struct json_object *updates;	/* parsed form of updates_raw */
	struct worker_context worker_ctx;
	uint64_t max_update_id;		/* highest update_id seen so far */
};

/* Arguments of the Bot API sendMessage method. */
struct tgbot_send_message_arg {
	const char *text;
	int64_t chat_id;
	uint64_t reply_to_message_id;
};
static struct tgbot_context g_ctx; | |
static __thread CURL *g_ch; | |
static void *worker_func(void *arg); | |
static void curl_thread_cleanup(void) | |
{ | |
if (g_ch) { | |
curl_easy_cleanup(g_ch); | |
g_ch = NULL; | |
} | |
} | |
static int init_worker_queue(struct worker_queue *queue, size_t nr_queues) | |
{ | |
size_t real_nr_queues = 1; | |
struct data_queue *arr; | |
int ret; | |
/* | |
* The number of queues has to be 2**N where N | |
* is an integer in order to use bitmask. | |
*/ | |
while (real_nr_queues < nr_queues) | |
real_nr_queues *= 2; | |
arr = calloc(real_nr_queues, sizeof(*arr)); | |
if (!arr) | |
return -ENOMEM; | |
ret = pthread_cond_init(&queue->cond, NULL); | |
if (ret) { | |
free(arr); | |
return -ret; | |
} | |
ret = pthread_cond_init(&queue->enqueue_waiting_cond, NULL); | |
if (ret) { | |
pthread_cond_destroy(&queue->cond); | |
free(arr); | |
return -ret; | |
} | |
queue->arr = arr; | |
queue->head = 0; | |
queue->tail = 0; | |
queue->mask = real_nr_queues - 1; | |
queue->enqueue_waiting = false; | |
return 0; | |
} | |
static int init_worker(struct worker *worker, size_t idx) | |
{ | |
worker->is_online = false; | |
worker->idx = idx; | |
if (pthread_create(&worker->thread, NULL, worker_func, worker)) | |
return -errno; | |
worker->is_online = true; | |
return 0; | |
} | |
static int init_workers(struct worker_context *ctx, size_t nr_workers) | |
{ | |
struct worker *workers; | |
size_t i; | |
int ret; | |
workers = calloc(nr_workers, sizeof(*workers)); | |
if (!workers) | |
return -ENOMEM; | |
ctx->workers = workers; | |
for (i = 0; i < nr_workers; i++) { | |
workers[i].ctx = ctx; | |
ret = init_worker(&workers[i], i); | |
if (ret) | |
return ret; | |
} | |
return 0; | |
} | |
static void destroy_workers(struct worker *workers, size_t nr_workers) | |
{ | |
size_t i; | |
/* | |
* Make sure we wait for all thread workers to exit. | |
*/ | |
for (i = 0; i < nr_workers; i++) { | |
struct worker *w = &workers[i]; | |
if (!w->is_online) | |
continue; | |
pthread_join(w->thread, NULL); | |
} | |
} | |
static void destroy_worker_context(struct worker_context *ctx) | |
{ | |
struct worker_queue *q = &ctx->queue; | |
size_t real_nr_queues = q->mask + 1; | |
struct data_queue *arr; | |
size_t i; | |
arr = q->arr; | |
pthread_mutex_lock(&q->mutex); | |
q->arr = NULL; | |
pthread_cond_broadcast(&q->cond); | |
pthread_mutex_unlock(&q->mutex); | |
for (i = 0; i < real_nr_queues; i++) { | |
if (!arr[i].data) | |
continue; | |
arr[i].data_deleter(arr[i].data); | |
memset(&arr[i], 0, sizeof(arr[i])); | |
} | |
destroy_workers(ctx->workers, ctx->nr_workers); | |
free(ctx->workers); | |
free(arr); | |
pthread_cond_destroy(&q->cond); | |
pthread_cond_destroy(&q->enqueue_waiting_cond); | |
} | |
static int init_worker_context(struct worker_context *ctx, size_t nr_workers, | |
size_t nr_queues) | |
{ | |
int ret; | |
ctx->nr_workers = nr_workers; | |
ctx->workers = NULL; | |
ctx->stop = false; | |
ret = init_worker_queue(&ctx->queue, nr_queues); | |
if (ret) | |
return ret; | |
ret = init_workers(ctx, nr_workers); | |
if (ret) { | |
destroy_worker_context(ctx); | |
return ret; | |
} | |
return 0; | |
} | |
static int grab_queue(struct worker *worker, struct data_queue *data) | |
{ | |
struct worker_context *ctx = worker->ctx; | |
struct worker_queue *q = &ctx->queue; | |
size_t idx; | |
int ret = 0; | |
pthread_mutex_lock(&q->mutex); | |
while (q->head == q->tail) { | |
if (ctx->stop) { | |
ret = -1; | |
goto out; | |
} | |
pthread_cond_wait(&q->cond, &q->mutex); | |
} | |
/* | |
* Is the queue still available? | |
* If not, don't grab anything it. | |
*/ | |
if (!q->arr) { | |
ret = -1; | |
goto out; | |
} | |
/* | |
* The circular queue index calculation. | |
*/ | |
idx = q->head++ & q->mask; | |
*data = q->arr[idx]; | |
memset(&q->arr[idx], 0, sizeof(q->arr[idx])); | |
if (q->enqueue_waiting) | |
pthread_cond_signal(&q->enqueue_waiting_cond); | |
out: | |
pthread_mutex_unlock(&q->mutex); | |
return ret; | |
} | |
static uint64_t get_message_id(struct json_object *update) | |
{ | |
struct json_object *message_id; | |
struct json_object *message; | |
message = json_object_object_get(update, "message"); | |
if (!message) | |
return 0; | |
message_id = json_object_object_get(message, "message_id"); | |
if (!message_id) | |
return 0; | |
return json_object_get_uint64(message_id); | |
} | |
/*
 * Return update["message"]["chat"]["id"], or 0 if any level of that path
 * is missing.
 */
static int64_t get_chat_id(struct json_object *update)
{
	static const char *const path[] = { "message", "chat", "id" };
	struct json_object *node = update;
	size_t i;

	for (i = 0; i < sizeof(path) / sizeof(path[0]); i++) {
		node = json_object_object_get(node, path[i]);
		if (!node)
			return 0;
	}

	return json_object_get_int64(node);
}
static int tgbot_send_message(struct tgbot_context *tgctx, | |
struct tgbot_send_message_arg *arg); | |
static void tgbot_handle_ping_command(struct tgbot_context *tgctx, | |
struct json_object *update) | |
{ | |
struct tgbot_send_message_arg arg; | |
arg.reply_to_message_id = get_message_id(update); | |
arg.chat_id = get_chat_id(update); | |
arg.text = "Pong!"; | |
tgbot_send_message(tgctx, &arg); | |
} | |
static void tgbot_handle_hello_command(struct tgbot_context *tgctx, | |
struct json_object *update) | |
{ | |
struct tgbot_send_message_arg arg; | |
arg.reply_to_message_id = get_message_id(update); | |
arg.chat_id = get_chat_id(update); | |
arg.text = "Hello World!"; | |
tgbot_send_message(tgctx, &arg); | |
} | |
/* | |
* Just to simulate a heavy work. | |
*/ | |
static void tgbot_handle_sleep_command(struct tgbot_context *tgctx, | |
struct json_object *update) | |
{ | |
struct tgbot_send_message_arg arg; | |
arg.reply_to_message_id = get_message_id(update); | |
arg.chat_id = get_chat_id(update); | |
arg.text = "Sleep command started!"; | |
tgbot_send_message(tgctx, &arg); | |
sleep(10); | |
arg.reply_to_message_id = get_message_id(update); | |
arg.chat_id = get_chat_id(update); | |
arg.text = "Sleep command finished!"; | |
tgbot_send_message(tgctx, &arg); | |
} | |
static void handle_text_message(struct tgbot_context *tgctx, | |
struct json_object *update, const char *text) | |
{ | |
if (!strcmp("/ping", text)) { | |
tgbot_handle_ping_command(tgctx, update); | |
return; | |
} | |
if (!strcmp("/hello", text)) { | |
tgbot_handle_hello_command(tgctx, update); | |
return; | |
} | |
if (!strcmp("/sleep", text)) { | |
tgbot_handle_sleep_command(tgctx, update); | |
return; | |
} | |
} | |
/*
 * Handle one queued update (@data is its json_object). If it carries a
 * text message, pass the text on to handle_text_message().
 */
static void process_update(struct tgbot_context *tgctx, void *data)
{
	struct json_object *update = data;
	struct json_object *message = json_object_object_get(update, "message");
	struct json_object *text;

	if (message == NULL)
		return;

	text = json_object_object_get(message, "text");
	if (text != NULL)
		handle_text_message(tgctx, update, json_object_get_string(text));
}
static void *worker_func(void *arg) | |
{ | |
struct worker *worker = arg; | |
struct worker_context *ctx = worker->ctx; | |
struct tgbot_context *tgctx; | |
tgctx = container_of(ctx, struct tgbot_context, worker_ctx); | |
printf("Thread %zu is online!\n", worker->idx); | |
while (!ctx->stop) { | |
struct data_queue data; | |
memset(&data, 0, sizeof(data)); | |
if (grab_queue(worker, &data)) | |
break; | |
if (data.data) | |
process_update(tgctx, data.data); | |
if (data.data_deleter) | |
data.data_deleter(data.data); | |
} | |
printf("Thread %zu is exiting...\n", worker->idx); | |
curl_thread_cleanup(); | |
pthread_mutex_lock(&ctx->queue.mutex); | |
if (ctx->queue.enqueue_waiting) | |
pthread_cond_broadcast(&ctx->queue.enqueue_waiting_cond); | |
pthread_mutex_unlock(&ctx->queue.mutex); | |
return NULL; | |
} | |
static void signal_handler(int sig) | |
{ | |
struct worker_context *wctx = &g_ctx.worker_ctx; | |
printf("\nGot signal %d, exiting...\n", sig); | |
wctx->stop = true; | |
} | |
static int setup_signal_handlers(void) | |
{ | |
struct sigaction act; | |
memset(&act, 0, sizeof(act)); | |
act.sa_handler = signal_handler; | |
if (sigaction(SIGINT, &act, NULL)) | |
return -errno; | |
if (sigaction(SIGHUP, &act, NULL)) | |
return -errno; | |
if (sigaction(SIGTERM, &act, NULL)) | |
return -errno; | |
return 0; | |
} | |
/* Growable buffer that collects a curl response body. */
struct curl_data {
	char *data;		/* NUL-terminated body; NULL until the first byte */
	size_t len;		/* bytes stored, excluding the NUL */
	size_t allocated;	/* capacity of @data */
	int err;		/* -ENOMEM after an allocation failure */
};

/*
 * CURLOPT_WRITEFUNCTION callback. It appends @size * @nmemb bytes from
 * @ptr to the struct curl_data passed as @dd and keeps it NUL-terminated.
 * A NULL @dd discards the bytes.
 *
 * Returns the number of bytes consumed. On allocation failure it returns
 * 0, which makes libcurl abort the transfer, and sets d->err to -ENOMEM.
 * If growing the buffer failed, the old buffer is also freed.
 */
static size_t tgbot_curl_write(char *ptr, size_t size, size_t nmemb, void *dd)
{
	struct curl_data *d = dd;
	size_t chunk = size * nmemb;
	size_t need;

	if (d == NULL)
		return chunk;

	if (d->data == NULL) {
		d->allocated = 4096;
		d->data = malloc(d->allocated);
		if (d->data == NULL)
			goto oom;
	}

	/* +1 keeps room for the terminating NUL. */
	need = d->len + chunk + 1;
	if (need > d->allocated) {
		size_t grown = need + d->allocated * 2;
		char *tmp = realloc(d->data, grown);

		if (tmp == NULL) {
			free(d->data);
			d->data = NULL;
			goto oom;
		}
		d->data = tmp;
		d->allocated = grown;
	}

	memcpy(d->data + d->len, ptr, chunk);
	d->len += chunk;
	d->data[d->len] = '\0';
	return chunk;

oom:
	d->err = -ENOMEM;
	return 0;
}
static int tgbot_call_method(const char *token, const char *method, | |
const char *post_data, struct curl_data *data) | |
{ | |
char url[512]; | |
CURLcode res; | |
if (data) { | |
data->err = 0; | |
data->len = 0; | |
data->allocated = 0; | |
data->data = NULL; | |
} | |
if (!g_ch) { | |
g_ch = curl_easy_init(); | |
if (!g_ch) | |
return -ENOMEM; | |
} | |
snprintf(url, sizeof(url), "https://api.telegram.org/bot%s/%s", token, | |
method); | |
curl_easy_setopt(g_ch, CURLOPT_URL, url); | |
if (post_data) { | |
curl_easy_setopt(g_ch, CURLOPT_POST, 1); | |
curl_easy_setopt(g_ch, CURLOPT_POSTFIELDS, post_data); | |
} | |
curl_easy_setopt(g_ch, CURLOPT_WRITEFUNCTION, tgbot_curl_write); | |
curl_easy_setopt(g_ch, CURLOPT_WRITEDATA, data); | |
res = curl_easy_perform(g_ch); | |
if (res != CURLE_OK) | |
printf("Curl with URL \"%s\" failed: %s\n", url, | |
curl_easy_strerror(res)); | |
return res; | |
} | |
static int tgbot_send_message(struct tgbot_context *tgctx, | |
struct tgbot_send_message_arg *arg) | |
{ | |
static const size_t pdata_len = 4096 * 8; | |
char *post_data; | |
int ret; | |
post_data = malloc(pdata_len); | |
if (!post_data) | |
return -ENOMEM; | |
/* | |
* TODO(ammarfaizi2): Do rawurlencode(arg->text) to handle | |
* special characters. | |
*/ | |
snprintf(post_data, pdata_len, | |
"chat_id=%lld&text=%s&reply_to_message_id=%llu", | |
(long long)arg->chat_id, arg->text, | |
(unsigned long long)arg->reply_to_message_id); | |
ret = tgbot_call_method(tgctx->token, "sendMessage", post_data, NULL); | |
free(post_data); | |
return ret; | |
} | |
static int tgbot_get_updates(struct tgbot_context *tgctx) | |
{ | |
struct curl_data d; | |
char method[255]; | |
int ret; | |
snprintf(method, sizeof(method), "getUpdates?offset=%llu", | |
(unsigned long long)tgctx->max_update_id + 1); | |
ret = tgbot_call_method(tgctx->token, method, NULL, &d); | |
if (ret) { | |
printf("tgbot_get_updates(): %s\n", strerror(-ret)); | |
return ret; | |
} | |
tgctx->updates_raw = d.data; | |
return 0; | |
} | |
static int tgbot_parse_updates(struct tgbot_context *tgctx) | |
{ | |
struct json_object *obj; | |
obj = json_tokener_parse(tgctx->updates_raw); | |
if (!obj) { | |
printf("json_tokener_parse() failed\n"); | |
return -EINVAL; | |
} | |
tgctx->updates = obj; | |
return 0; | |
} | |
/*
 * data_deleter adapter for struct data_queue: drop one reference on the
 * json_object passed as @data.
 */
static void tgbot_json_object_put(void *data)
{
	struct json_object *obj = data;

	json_object_put(obj);
}
static size_t queue_size(struct worker_queue *q) | |
__must_hold(&q->mutex) | |
{ | |
size_t head = q->head; | |
size_t tail = q->tail; | |
if (head > tail) | |
return head - tail; | |
else | |
return tail - head; | |
} | |
static bool is_queue_full(struct worker_queue *q) | |
__must_hold(&q->mutex) | |
{ | |
return queue_size(q) >= q->mask; | |
} | |
static int enqueue_update(struct tgbot_context *tgctx, | |
struct json_object *update) | |
{ | |
struct worker_context *wctx = &tgctx->worker_ctx; | |
struct worker_queue *q = &wctx->queue; | |
struct data_queue *dq; | |
if (wctx->stop) | |
return -EOWNERDEAD; | |
pthread_mutex_lock(&q->mutex); | |
while (is_queue_full(q)) { | |
printf("Queue is full, waiting...\n"); | |
q->enqueue_waiting = true; | |
pthread_cond_wait(&q->enqueue_waiting_cond, &q->mutex); | |
if (wctx->stop) | |
break; | |
} | |
q->enqueue_waiting = false; | |
if (!wctx->stop) { | |
json_object_get(update); | |
dq = &q->arr[q->tail++ & q->mask]; | |
dq->data = update; | |
dq->data_deleter = tgbot_json_object_put; | |
pthread_cond_signal(&q->cond); | |
} | |
pthread_mutex_unlock(&q->mutex); | |
return 0; | |
} | |
/* Return update["update_id"], or 0 if the field is missing. */
static uint64_t get_update_id(struct json_object *update)
{
	struct json_object *id = json_object_object_get(update, "update_id");

	return id ? json_object_get_uint64(id) : 0;
}
static int enqueue_updates(struct tgbot_context *tgctx) | |
{ | |
struct json_object *result; | |
struct json_object *update; | |
uint64_t update_id; | |
int ret = 0; | |
size_t i, n; | |
result = json_object_object_get(tgctx->updates, "result"); | |
if (!result) | |
return -EINVAL; | |
n = json_object_array_length(result); | |
printf("Got %llu new update(s)\n", (unsigned long long)n); | |
for (i = 0; i < n; i++) { | |
update = json_object_array_get_idx(result, i); | |
if (!update) | |
break; | |
update_id = get_update_id(update); | |
if (update_id <= tgctx->max_update_id) | |
continue; | |
tgctx->max_update_id = update_id; | |
ret = enqueue_update(tgctx, update); | |
if (ret) | |
break; | |
} | |
return ret; | |
} | |
static void run_app_loop(struct tgbot_context *tgctx) | |
{ | |
struct worker_context *wctx = &tgctx->worker_ctx; | |
tgctx->max_update_id = 0; | |
while (!wctx->stop) { | |
int ret; | |
if (tgbot_get_updates(tgctx)) { | |
printf("tgbot_get_updates() failed!\n"); | |
break; | |
} | |
ret = tgbot_parse_updates(tgctx); | |
if (ret) { | |
printf("tgbot_parse_updates() failed!\n"); | |
printf("updates_raw: %s\n", tgctx->updates_raw); | |
free(tgctx->updates_raw); | |
break; | |
} | |
ret = enqueue_updates(tgctx); | |
json_object_put(tgctx->updates); | |
if (ret) { | |
printf("enqueue_updates() failed!\n"); | |
printf("updates_raw: %s\n", tgctx->updates_raw); | |
free(tgctx->updates_raw); | |
break; | |
} | |
free(tgctx->updates_raw); | |
} | |
wctx->stop = true; | |
} | |
int main(void) | |
{ | |
struct worker_context *wctx = &g_ctx.worker_ctx; | |
int ret; | |
if (curl_global_init(CURL_GLOBAL_ALL)) { | |
printf("curl_global_init() error!\n"); | |
return 1; | |
} | |
ret = setup_signal_handlers(); | |
if (ret) | |
goto out_curl; | |
ret = init_worker_context(wctx, NUMBER_OF_THREADS, NUMBER_OF_QUEUE_SLOTS); | |
if (ret) | |
goto out_curl; | |
g_ctx.token = TELEGRAM_BOT_TOKEN; | |
run_app_loop(&g_ctx); | |
destroy_worker_context(wctx); | |
printf("The main thread is exiting...\n"); | |
out_curl: | |
curl_thread_cleanup(); | |
curl_global_cleanup(); | |
return -ret; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment