Skip to content

Instantly share code, notes, and snippets.

@ammarfaizi2
Last active December 15, 2022 10:27
Show Gist options
  • Save ammarfaizi2/14f540d501b9a7592d659b89125f2c6d to your computer and use it in GitHub Desktop.
Save ammarfaizi2/14f540d501b9a7592d659b89125f2c6d to your computer and use it in GitHub Desktop.
// SPDX-License-Identifier: Public Domain
/*
* @author Ammar Faizi <[email protected]>
*
* A simple Telegram bot example for @nekonyameow.
*
* Goals:
* - Parallel messages processing.
* - Single getUpdate thread.
* - Workqueue implementation.
*
* Link: https://t.me/GNUWeeb/681196
*
* Compile with:
* gcc -ggdb3 -Wall -Wextra -O3 simple_tgbot.c -o simple_tgbot -lcurl -ljson-c -lpthread
*
* Update TELEGRAM_BOT_TOKEN before use!
*/
#include <stdbool.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <signal.h>
#include <stdint.h>
#include <errno.h>
#include <stdio.h>
#include <curl/curl.h>
#include <json-c/json.h>
/* Number of worker threads main() asks init_worker_context() to start. */
#define NUMBER_OF_THREADS 32
/*
* Requested number of queue slots; init_worker_queue() rounds it up to the
* next power of two.
*/
#define NUMBER_OF_QUEUE_SLOTS 128
/* Placeholder bot token; must be replaced with a real one before use. */
#define TELEGRAM_BOT_TOKEN "123123123:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
/*
* Locking annotations in the style of the sparse static checker.
* They currently expand to nothing and only document locking intent
* at the point of use.
*
* TODO(ammarfaizi2): Implement sparse.
*/
#define __must_hold(MUTEX)
#define __acquires(MUTEX)
#define __releases(MUTEX)
/*
* Fallback offsetof(): byte offset of FIELD inside TYPE.
* Only defined when no offsetof macro is already visible.
*/
#ifndef offsetof
#define offsetof(TYPE, FIELD) ((size_t) &((TYPE *)0)->FIELD)
#endif
/*
* container_of(): given PTR, a pointer to the member FIELD of some TYPE
* object, yield a pointer to the enclosing TYPE object.
*
* The temporary typed with __typeof__ makes the compiler check that PTR
* is compatible with a pointer to FIELD's type. Relies on the GNU
* statement-expression and __typeof__ extensions.
*/
#ifndef container_of
#define container_of(PTR, TYPE, FIELD) ({ \
__typeof__(((TYPE *)0)->FIELD) *__FIELD_PTR = (PTR); \
(TYPE *)((char *) __FIELD_PTR - offsetof(TYPE, FIELD)); \
})
#endif
/*
* One queue slot: an opaque payload plus the function that releases it.
* A slot with a NULL @data is treated as empty by destroy_worker_context().
*/
struct data_queue {
/* Payload handed to process_update() by the worker. */
void *data;
/* Called with @data once the payload is no longer needed. */
void (*data_deleter)(void *data);
};
struct worker_context;
/*
* Bookkeeping for one worker thread.
*/
struct worker {
/* Back pointer to the shared worker context (set by init_workers()). */
struct worker_context *ctx;
/* Index of this worker in worker_context::workers. */
size_t idx;
/* Thread handle filled in by pthread_create(). */
pthread_t thread;
/* Set by init_worker() after thread creation; only such workers are joined. */
volatile bool is_online;
};
/*
* Circular work queue shared by the producer (enqueue_update()) and the
* worker threads (grab_queue()).
*/
struct worker_queue {
/* Slot array; its length is @mask + 1. Set to NULL on teardown. */
struct data_queue *arr;
/* Consumer index; incremented by grab_queue(), masked with @mask on use. */
size_t head;
/* Producer index; incremented by enqueue_update(), masked with @mask on use. */
size_t tail;
/* Number of slots minus one (slot count is a power of two). */
size_t mask;
/* Signalled when an item is enqueued; broadcast on teardown. */
pthread_cond_t cond;
/* Signalled when a slot is freed while the producer is waiting. */
pthread_cond_t enqueue_waiting_cond;
/* Held around accesses to the queue fields in grab_queue()/enqueue_update(). */
pthread_mutex_t mutex;
/* True while enqueue_update() is blocked waiting for a free slot. */
bool enqueue_waiting;
};
/*
* Shared state of the worker pool: the stop flag, the queue and the
* array of workers.
*/
struct worker_context {
/* Stop request; set by signal_handler() and at the end of run_app_loop(). */
volatile bool stop;
/* The work queue consumed by all workers. */
struct worker_queue queue;
/* Array of @nr_workers entries allocated by init_workers(). */
struct worker *workers;
/* Number of entries in @workers. */
size_t nr_workers;
};
/*
* Top-level bot state. worker_func() recovers this structure from the
* embedded @worker_ctx via container_of().
*/
struct tgbot_context {
/* Bot token used to build the API URL. */
const char *token;
/* Raw response body of the last getUpdates call; freed by run_app_loop(). */
char *updates_raw;
/* Parsed form of @updates_raw; released by run_app_loop(). */
struct json_object *updates;
/* Embedded worker pool. */
struct worker_context worker_ctx;
/* Highest update_id seen so far; the next getUpdates offset is this + 1. */
uint64_t max_update_id;
};
/*
* Arguments of tgbot_send_message(); each field maps to the sendMessage
* form parameter of the same name.
*/
struct tgbot_send_message_arg {
/* Message text. */
const char *text;
/* Target chat identifier. */
int64_t chat_id;
/* Identifier of the message being replied to. */
uint64_t reply_to_message_id;
};
/* The single bot context; signal_handler() reaches it to request a stop. */
static struct tgbot_context g_ctx;
/* Per-thread curl easy handle, created lazily by tgbot_call_method(). */
static __thread CURL *g_ch;
/* Thread entry point, defined below; needed by init_worker(). */
static void *worker_func(void *arg);
static void curl_thread_cleanup(void)
{
if (g_ch) {
curl_easy_cleanup(g_ch);
g_ch = NULL;
}
}
/*
 * Initialise @queue with at least @nr_queues slots.
 *
 * The slot count is rounded up to a power of two so that indices can be
 * reduced with a bitmask. The mutex and both condition variables are
 * initialised here; on failure everything set up so far is torn down.
 *
 * Return: 0 on success, a negative errno value on failure.
 */
static int init_worker_queue(struct worker_queue *queue, size_t nr_queues)
{
	size_t real_nr_queues = 1;
	struct data_queue *arr;
	int ret;

	/*
	 * The number of queues has to be 2**N where N
	 * is an integer in order to use bitmask.
	 */
	while (real_nr_queues < nr_queues)
		real_nr_queues *= 2;

	arr = calloc(real_nr_queues, sizeof(*arr));
	if (!arr)
		return -ENOMEM;

	/*
	 * Initialise the mutex explicitly instead of relying on the
	 * enclosing object being zero-filled static storage.
	 */
	ret = pthread_mutex_init(&queue->mutex, NULL);
	if (ret)
		goto out_free;

	ret = pthread_cond_init(&queue->cond, NULL);
	if (ret)
		goto out_mutex;

	ret = pthread_cond_init(&queue->enqueue_waiting_cond, NULL);
	if (ret)
		goto out_cond;

	queue->arr = arr;
	queue->head = 0;
	queue->tail = 0;
	queue->mask = real_nr_queues - 1;
	queue->enqueue_waiting = false;
	return 0;

out_cond:
	pthread_cond_destroy(&queue->cond);
out_mutex:
	pthread_mutex_destroy(&queue->mutex);
out_free:
	free(arr);
	/* pthread functions return a positive error number. */
	return -ret;
}
/*
 * Start the thread for @worker and record its index.
 *
 * @worker->ctx must already be set by the caller, because the new thread
 * reads it immediately. is_online is only set once the thread exists,
 * so that teardown joins exactly the threads that were created.
 *
 * Return: 0 on success, a negative errno value on failure.
 */
static int init_worker(struct worker *worker, size_t idx)
{
	int ret;

	worker->is_online = false;
	worker->idx = idx;

	/*
	 * pthread_create() reports failure through its return value and
	 * does not set errno, so errno must not be used here.
	 */
	ret = pthread_create(&worker->thread, NULL, worker_func, worker);
	if (ret)
		return -ret;

	worker->is_online = true;
	return 0;
}
/*
 * Allocate @nr_workers worker records, attach them to @ctx and start
 * one thread per record.
 *
 * The array is published in @ctx->workers before any thread is started,
 * so on a mid-way failure the caller can still tear down the workers
 * that did come up.
 *
 * Return: 0 on success, a negative errno value on failure.
 */
static int init_workers(struct worker_context *ctx, size_t nr_workers)
{
	struct worker *pool = calloc(nr_workers, sizeof(*pool));
	size_t n;

	if (!pool)
		return -ENOMEM;

	ctx->workers = pool;
	for (n = 0; n < nr_workers; n++) {
		struct worker *w = &pool[n];
		int err;

		w->ctx = ctx;
		err = init_worker(w, n);
		if (err)
			return err;
	}

	return 0;
}
/*
 * Wait for every worker thread that was actually started to exit.
 *
 * @workers may be NULL (the worker array was never allocated), in which
 * case there is nothing to join.
 */
static void destroy_workers(struct worker *workers, size_t nr_workers)
{
	size_t i;

	if (!workers)
		return;

	/*
	 * Make sure we wait for all thread workers to exit.
	 */
	for (i = 0; i < nr_workers; i++) {
		struct worker *w = &workers[i];

		/* Threads that failed to start were never created. */
		if (!w->is_online)
			continue;

		pthread_join(w->thread, NULL);
		w->is_online = false;
	}
}
/*
 * Stop all workers and release everything owned by @ctx.
 *
 * Steps:
 *  1. Under the queue mutex, raise the stop flag, detach the slot array
 *     and wake every worker sleeping in grab_queue().
 *  2. Release payloads that were still queued.
 *  3. Join the worker threads, then free the arrays and destroy the
 *     synchronisation objects.
 */
static void destroy_worker_context(struct worker_context *ctx)
{
	struct worker_queue *q = &ctx->queue;
	size_t real_nr_queues = q->mask + 1;
	struct data_queue *arr;
	size_t i;

	arr = q->arr;
	pthread_mutex_lock(&q->mutex);
	/*
	 * The stop flag must be set before waking the workers: a worker
	 * that finds the queue empty only leaves its wait loop when it
	 * sees the flag, otherwise pthread_join() below never returns.
	 */
	ctx->stop = true;
	q->arr = NULL;
	pthread_cond_broadcast(&q->cond);
	pthread_mutex_unlock(&q->mutex);

	/* Workers no longer touch the array once q->arr is NULL. */
	for (i = 0; i < real_nr_queues; i++) {
		if (!arr[i].data)
			continue;
		if (arr[i].data_deleter)
			arr[i].data_deleter(arr[i].data);
		memset(&arr[i], 0, sizeof(arr[i]));
	}

	destroy_workers(ctx->workers, ctx->nr_workers);
	free(ctx->workers);
	ctx->workers = NULL;
	free(arr);

	/* All workers have exited, so nobody can still use these objects. */
	pthread_cond_destroy(&q->cond);
	pthread_cond_destroy(&q->enqueue_waiting_cond);
	pthread_mutex_destroy(&q->mutex);
}
/*
 * Prepare @ctx: reset its fields, build the queue with at least
 * @nr_queues slots and start @nr_workers worker threads.
 *
 * If the workers cannot all be started, the whole context is torn down
 * again before returning.
 *
 * Return: 0 on success, a negative errno value on failure.
 */
static int init_worker_context(struct worker_context *ctx, size_t nr_workers,
			       size_t nr_queues)
{
	int err;

	ctx->stop = false;
	ctx->workers = NULL;
	ctx->nr_workers = nr_workers;

	err = init_worker_queue(&ctx->queue, nr_queues);
	if (err)
		return err;

	err = init_workers(ctx, nr_workers);
	if (err)
		destroy_worker_context(ctx);

	return err;
}
/*
 * Take the next item off the queue on behalf of @worker.
 *
 * Blocks on the queue condition variable while the queue is empty. Gives
 * up when the queue is empty and a stop was requested, or when the slot
 * array has been detached.
 *
 * On success the slot is copied into @data and cleared, and a producer
 * waiting for free space is signalled.
 *
 * Return: 0 if an item was stored in @data, -1 otherwise.
 */
static int grab_queue(struct worker *worker, struct data_queue *data)
{
	struct worker_context *wctx = worker->ctx;
	struct worker_queue *queue = &wctx->queue;
	int rc = -1;

	pthread_mutex_lock(&queue->mutex);

	/* Sleep until there is work or a stop request on an empty queue. */
	while (queue->head == queue->tail && !wctx->stop)
		pthread_cond_wait(&queue->cond, &queue->mutex);

	/*
	 * Only proceed when something is queued and the slot array is
	 * still available; otherwise don't grab anything.
	 */
	if (queue->head != queue->tail && queue->arr) {
		/* The circular queue index calculation. */
		size_t slot = queue->head & queue->mask;

		queue->head++;
		*data = queue->arr[slot];
		memset(&queue->arr[slot], 0, sizeof(queue->arr[slot]));

		if (queue->enqueue_waiting)
			pthread_cond_signal(&queue->enqueue_waiting_cond);

		rc = 0;
	}

	pthread_mutex_unlock(&queue->mutex);
	return rc;
}
/*
 * Read update["message"]["message_id"].
 *
 * Return: the id as an unsigned 64-bit value, or 0 when either key is
 * missing.
 */
static uint64_t get_message_id(struct json_object *update)
{
	struct json_object *msg = json_object_object_get(update, "message");
	struct json_object *id = NULL;

	if (msg)
		id = json_object_object_get(msg, "message_id");

	return id ? json_object_get_uint64(id) : 0;
}
/*
 * Read update["message"]["chat"]["id"].
 *
 * Return: the id as a signed 64-bit value, or 0 when any key on the path
 * is missing.
 */
static int64_t get_chat_id(struct json_object *update)
{
	static const char *const path[] = { "message", "chat", "id" };
	struct json_object *node = update;
	size_t depth;

	for (depth = 0; depth < sizeof(path) / sizeof(path[0]); depth++) {
		node = json_object_object_get(node, path[depth]);
		if (!node)
			return 0;
	}

	return json_object_get_int64(node);
}
static int tgbot_send_message(struct tgbot_context *tgctx,
struct tgbot_send_message_arg *arg);
static void tgbot_handle_ping_command(struct tgbot_context *tgctx,
struct json_object *update)
{
struct tgbot_send_message_arg arg;
arg.reply_to_message_id = get_message_id(update);
arg.chat_id = get_chat_id(update);
arg.text = "Pong!";
tgbot_send_message(tgctx, &arg);
}
static void tgbot_handle_hello_command(struct tgbot_context *tgctx,
struct json_object *update)
{
struct tgbot_send_message_arg arg;
arg.reply_to_message_id = get_message_id(update);
arg.chat_id = get_chat_id(update);
arg.text = "Hello World!";
tgbot_send_message(tgctx, &arg);
}
/*
* Just to simulate a heavy work.
*/
static void tgbot_handle_sleep_command(struct tgbot_context *tgctx,
struct json_object *update)
{
struct tgbot_send_message_arg arg;
arg.reply_to_message_id = get_message_id(update);
arg.chat_id = get_chat_id(update);
arg.text = "Sleep command started!";
tgbot_send_message(tgctx, &arg);
sleep(10);
arg.reply_to_message_id = get_message_id(update);
arg.chat_id = get_chat_id(update);
arg.text = "Sleep command finished!";
tgbot_send_message(tgctx, &arg);
}
static void handle_text_message(struct tgbot_context *tgctx,
struct json_object *update, const char *text)
{
if (!strcmp("/ping", text)) {
tgbot_handle_ping_command(tgctx, update);
return;
}
if (!strcmp("/hello", text)) {
tgbot_handle_hello_command(tgctx, update);
return;
}
if (!strcmp("/sleep", text)) {
tgbot_handle_sleep_command(tgctx, update);
return;
}
}
/*
 * Process one queued update. @data is the update's json_object.
 *
 * Only updates carrying update["message"]["text"] are acted upon; the
 * text is handed to handle_text_message().
 */
static void process_update(struct tgbot_context *tgctx, void *data)
{
	struct json_object *upd = data;
	struct json_object *msg = json_object_object_get(upd, "message");
	struct json_object *body;

	if (msg) {
		body = json_object_object_get(msg, "text");
		if (body)
			handle_text_message(tgctx, upd,
					    json_object_get_string(body));
	}
}
static void *worker_func(void *arg)
{
struct worker *worker = arg;
struct worker_context *ctx = worker->ctx;
struct tgbot_context *tgctx;
tgctx = container_of(ctx, struct tgbot_context, worker_ctx);
printf("Thread %zu is online!\n", worker->idx);
while (!ctx->stop) {
struct data_queue data;
memset(&data, 0, sizeof(data));
if (grab_queue(worker, &data))
break;
if (data.data)
process_update(tgctx, data.data);
if (data.data_deleter)
data.data_deleter(data.data);
}
printf("Thread %zu is exiting...\n", worker->idx);
curl_thread_cleanup();
pthread_mutex_lock(&ctx->queue.mutex);
if (ctx->queue.enqueue_waiting)
pthread_cond_broadcast(&ctx->queue.enqueue_waiting_cond);
pthread_mutex_unlock(&ctx->queue.mutex);
return NULL;
}
/*
 * Signal handler for SIGINT/SIGHUP/SIGTERM: announce the signal and
 * request a stop.
 *
 * Only async-signal-safe operations are used: the message is assembled
 * by hand and emitted with write(2) instead of printf(), and errno is
 * restored before returning so the interrupted code does not observe a
 * value changed by the handler.
 */
static void signal_handler(int sig)
{
	static const char prefix[] = "\nGot signal ";
	static const char suffix[] = ", exiting...\n";
	struct worker_context *wctx = &g_ctx.worker_ctx;
	int saved_errno = errno;
	unsigned int value;
	char digits[16];
	char msg[64];
	size_t nr_digits = 0;
	size_t len = 0;
	size_t i;
	ssize_t written;

	for (i = 0; i < sizeof(prefix) - 1; i++)
		msg[len++] = prefix[i];

	if (sig < 0) {
		msg[len++] = '-';
		value = 0u - (unsigned int)sig;
	} else {
		value = (unsigned int)sig;
	}

	/* Produce the decimal digits in reverse, then copy them in order. */
	do {
		digits[nr_digits++] = (char)('0' + value % 10);
		value /= 10;
	} while (value);
	while (nr_digits)
		msg[len++] = digits[--nr_digits];

	for (i = 0; i < sizeof(suffix) - 1; i++)
		msg[len++] = suffix[i];

	/* Best effort: nothing useful can be done if the write fails. */
	written = write(STDOUT_FILENO, msg, len);
	(void)written;

	wctx->stop = true;
	errno = saved_errno;
}
static int setup_signal_handlers(void)
{
struct sigaction act;
memset(&act, 0, sizeof(act));
act.sa_handler = signal_handler;
if (sigaction(SIGINT, &act, NULL))
return -errno;
if (sigaction(SIGHUP, &act, NULL))
return -errno;
if (sigaction(SIGTERM, &act, NULL))
return -errno;
return 0;
}
/*
 * Growable, NUL-terminated buffer that collects a response body.
 */
struct curl_data {
	/* Heap buffer holding the bytes received so far (NULL until first use). */
	char *data;
	/* Number of payload bytes stored, excluding the terminating NUL. */
	size_t len;
	/* Current capacity of @data in bytes. */
	size_t allocated;
	/* 0, or a negative errno value if an allocation failed. */
	int err;
};

/*
 * Write callback for curl: append @size * @nmemb bytes from @ptr to the
 * struct curl_data passed as @dd.
 *
 * With a NULL @dd the bytes are discarded and reported as consumed. The
 * buffer starts at 4096 bytes and grows when the incoming chunk plus the
 * terminating NUL does not fit. On allocation failure the buffer is
 * released, err is set to -ENOMEM and 0 is returned.
 *
 * Return: the number of bytes consumed, or 0 on allocation failure.
 */
static size_t tgbot_curl_write(char *ptr, size_t size, size_t nmemb, void *dd)
{
	struct curl_data *buf = dd;
	const size_t chunk = size * nmemb;
	size_t needed;

	if (!dd)
		return chunk;

	/* First chunk: allocate the initial buffer. */
	if (!buf->data) {
		buf->allocated = 4096;
		buf->data = malloc(buf->allocated);
		if (!buf->data)
			goto nomem;
	}

	/* Room for the existing bytes, the new chunk and the NUL. */
	needed = buf->len + chunk + 1;
	if (needed > buf->allocated) {
		size_t grown = 2 * buf->allocated + needed;
		char *bigger = realloc(buf->data, grown);

		if (!bigger) {
			free(buf->data);
			buf->data = NULL;
			goto nomem;
		}

		buf->allocated = grown;
		buf->data = bigger;
	}

	memcpy(buf->data + buf->len, ptr, chunk);
	buf->len += chunk;
	buf->data[buf->len] = '\0';
	return chunk;

nomem:
	buf->err = -ENOMEM;
	return 0;
}
static int tgbot_call_method(const char *token, const char *method,
const char *post_data, struct curl_data *data)
{
char url[512];
CURLcode res;
if (data) {
data->err = 0;
data->len = 0;
data->allocated = 0;
data->data = NULL;
}
if (!g_ch) {
g_ch = curl_easy_init();
if (!g_ch)
return -ENOMEM;
}
snprintf(url, sizeof(url), "https://api.telegram.org/bot%s/%s", token,
method);
curl_easy_setopt(g_ch, CURLOPT_URL, url);
if (post_data) {
curl_easy_setopt(g_ch, CURLOPT_POST, 1);
curl_easy_setopt(g_ch, CURLOPT_POSTFIELDS, post_data);
}
curl_easy_setopt(g_ch, CURLOPT_WRITEFUNCTION, tgbot_curl_write);
curl_easy_setopt(g_ch, CURLOPT_WRITEDATA, data);
res = curl_easy_perform(g_ch);
if (res != CURLE_OK)
printf("Curl with URL \"%s\" failed: %s\n", url,
curl_easy_strerror(res));
return res;
}
/*
 * Percent-encode @src so it can be used as a value in an
 * application/x-www-form-urlencoded body.
 *
 * ASCII letters, digits and "-_.~" are copied as-is; every other byte
 * becomes "%XX".
 *
 * Return: a malloc()ed NUL-terminated string the caller must free(), or
 * NULL on allocation failure.
 */
static char *tgbot_urlencode(const char *src)
{
	static const char hex[] = "0123456789ABCDEF";
	size_t len = strlen(src);
	char *out;
	char *p;

	/* Worst case: every byte expands to three characters. */
	if (len > (SIZE_MAX - 1) / 3)
		return NULL;

	out = malloc(len * 3 + 1);
	if (!out)
		return NULL;

	p = out;
	for (; *src; src++) {
		unsigned char c = (unsigned char)*src;

		if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') ||
		    (c >= '0' && c <= '9') ||
		    c == '-' || c == '_' || c == '.' || c == '~') {
			*p++ = (char)c;
		} else {
			*p++ = '%';
			*p++ = hex[c >> 4];
			*p++ = hex[c & 0x0f];
		}
	}
	*p = '\0';

	return out;
}

/*
 * Send a text message through the sendMessage API method.
 *
 * @tgctx: bot context providing the token.
 * @arg:   chat id, text and the id of the message being replied to. A
 *         NULL text is sent as an empty string.
 *
 * The text is percent-encoded so that characters such as '&', '%' or '+'
 * cannot corrupt the form body. The response body is discarded.
 *
 * Return: 0 on success, a negative errno value for local failures, or
 * whatever non-zero value tgbot_call_method() reports.
 */
static int tgbot_send_message(struct tgbot_context *tgctx,
			      struct tgbot_send_message_arg *arg)
{
	const char *text = arg->text ? arg->text : "";
	char *enc_text;
	char *post_data;
	size_t pdata_len;
	int ret;
	int n;

	enc_text = tgbot_urlencode(text);
	if (!enc_text)
		return -ENOMEM;

	/* 128 bytes comfortably cover the field names and both numbers. */
	pdata_len = strlen(enc_text) + 128;
	post_data = malloc(pdata_len);
	if (!post_data) {
		free(enc_text);
		return -ENOMEM;
	}

	n = snprintf(post_data, pdata_len,
		     "chat_id=%lld&text=%s&reply_to_message_id=%llu",
		     (long long)arg->chat_id, enc_text,
		     (unsigned long long)arg->reply_to_message_id);
	if (n < 0 || (size_t)n >= pdata_len)
		ret = -EOVERFLOW;
	else
		ret = tgbot_call_method(tgctx->token, "sendMessage", post_data,
					NULL);

	free(post_data);
	free(enc_text);
	return ret;
}
static int tgbot_get_updates(struct tgbot_context *tgctx)
{
struct curl_data d;
char method[255];
int ret;
snprintf(method, sizeof(method), "getUpdates?offset=%llu",
(unsigned long long)tgctx->max_update_id + 1);
ret = tgbot_call_method(tgctx->token, method, NULL, &d);
if (ret) {
printf("tgbot_get_updates(): %s\n", strerror(-ret));
return ret;
}
tgctx->updates_raw = d.data;
return 0;
}
/*
 * Parse @tgctx->updates_raw as JSON and store the resulting object in
 * @tgctx->updates.
 *
 * Return: 0 on success, -EINVAL if the text cannot be parsed.
 */
static int tgbot_parse_updates(struct tgbot_context *tgctx)
{
	struct json_object *parsed = json_tokener_parse(tgctx->updates_raw);

	if (parsed) {
		tgctx->updates = parsed;
		return 0;
	}

	printf("json_tokener_parse() failed\n");
	return -EINVAL;
}
/*
 * data_deleter adapter: drop one reference on the json_object passed as
 * an untyped pointer.
 */
static void tgbot_json_object_put(void *data)
{
	struct json_object *obj = data;

	json_object_put(obj);
}
/*
 * Number of items currently queued. The caller must hold q->mutex.
 *
 * head and tail are free-running counters and tail is never behind head,
 * so the unsigned difference is the item count even after the counters
 * wrap around.
 */
static size_t queue_size(struct worker_queue *q)
	__must_hold(&q->mutex)
{
	return q->tail - q->head;
}
/*
 * Tell whether every slot of the queue is occupied. The caller must
 * hold q->mutex.
 *
 * The queue has q->mask + 1 slots, so it is full only when the item
 * count exceeds the mask. Comparing with ">=" would report "full" one
 * slot early, and for a single-slot queue (mask == 0) it would always be
 * true, blocking the producer forever.
 */
static bool is_queue_full(struct worker_queue *q)
	__must_hold(&q->mutex)
{
	return queue_size(q) > q->mask;
}
/*
 * Queue @update for processing by a worker.
 *
 * Blocks while the queue is full, until a slot is freed or a stop is
 * requested. When the update is queued, an extra reference is taken on
 * it; the worker drops that reference through the slot's deleter.
 *
 * Return: -EOWNERDEAD if a stop was already requested on entry,
 * otherwise 0 (also when a stop arrived while waiting and nothing was
 * queued).
 */
static int enqueue_update(struct tgbot_context *tgctx,
			  struct json_object *update)
{
	struct worker_context *wctx = &tgctx->worker_ctx;
	struct worker_queue *queue = &wctx->queue;
	struct data_queue *slot;

	if (wctx->stop)
		return -EOWNERDEAD;

	pthread_mutex_lock(&queue->mutex);

	for (;;) {
		if (!is_queue_full(queue))
			break;

		printf("Queue is full, waiting...\n");
		queue->enqueue_waiting = true;
		pthread_cond_wait(&queue->enqueue_waiting_cond, &queue->mutex);

		if (wctx->stop)
			break;
	}
	queue->enqueue_waiting = false;

	if (wctx->stop)
		goto unlock;

	/* The slot keeps its own reference to the update. */
	json_object_get(update);
	slot = &queue->arr[queue->tail & queue->mask];
	queue->tail++;
	slot->data = update;
	slot->data_deleter = tgbot_json_object_put;
	pthread_cond_signal(&queue->cond);

unlock:
	pthread_mutex_unlock(&queue->mutex);
	return 0;
}
/*
 * Read update["update_id"].
 *
 * Return: the id as an unsigned 64-bit value, or 0 when the key is
 * missing.
 */
static uint64_t get_update_id(struct json_object *update)
{
	struct json_object *id = json_object_object_get(update, "update_id");

	return id ? json_object_get_uint64(id) : 0;
}
/*
 * Walk the "result" array of the parsed getUpdates response and queue
 * every update whose id is above the highest id seen so far.
 *
 * max_update_id is advanced before each update is queued. The walk stops
 * at the first missing array element or the first enqueue failure.
 *
 * Return: 0 on success, -EINVAL if there is no "result" key, or the
 * error returned by enqueue_update().
 */
static int enqueue_updates(struct tgbot_context *tgctx)
{
	struct json_object *list;
	size_t count;
	size_t pos;
	int err = 0;

	list = json_object_object_get(tgctx->updates, "result");
	if (!list)
		return -EINVAL;

	count = json_object_array_length(list);
	printf("Got %llu new update(s)\n", (unsigned long long)count);

	for (pos = 0; pos < count; pos++) {
		struct json_object *upd = json_object_array_get_idx(list, pos);
		uint64_t id;

		if (!upd)
			break;

		id = get_update_id(upd);
		if (id > tgctx->max_update_id) {
			tgctx->max_update_id = id;
			err = enqueue_update(tgctx, upd);
			if (err)
				break;
		}
	}

	return err;
}
/*
 * Main loop: fetch, parse and queue updates until a stop is requested or
 * a step fails.
 *
 * The raw response of each round is freed once the round is over; on a
 * parse or enqueue failure it is printed first. The stop flag is raised
 * on exit so the workers wind down as well.
 */
static void run_app_loop(struct tgbot_context *tgctx)
{
	struct worker_context *wctx = &tgctx->worker_ctx;

	tgctx->max_update_id = 0;

	while (!wctx->stop) {
		int err;

		if (tgbot_get_updates(tgctx)) {
			printf("tgbot_get_updates() failed!\n");
			break;
		}

		err = tgbot_parse_updates(tgctx);
		if (err) {
			printf("tgbot_parse_updates() failed!\n");
		} else {
			err = enqueue_updates(tgctx);
			/* Queued updates hold their own references. */
			json_object_put(tgctx->updates);
			if (err)
				printf("enqueue_updates() failed!\n");
		}

		if (err)
			printf("updates_raw: %s\n", tgctx->updates_raw);

		free(tgctx->updates_raw);

		if (err)
			break;
	}

	wctx->stop = true;
}
/*
 * Program entry point: initialise curl, the signal handlers and the
 * worker pool, run the update loop, then tear everything down.
 *
 * Return: 1 if curl cannot be initialised, otherwise the negated result
 * of the setup steps (0 when both succeeded).
 */
int main(void)
{
	struct worker_context *pool = &g_ctx.worker_ctx;
	int err;

	if (curl_global_init(CURL_GLOBAL_ALL) != 0) {
		printf("curl_global_init() error!\n");
		return 1;
	}

	err = setup_signal_handlers();
	if (!err)
		err = init_worker_context(pool, NUMBER_OF_THREADS,
					  NUMBER_OF_QUEUE_SLOTS);

	if (!err) {
		g_ctx.token = TELEGRAM_BOT_TOKEN;
		run_app_loop(&g_ctx);
		destroy_worker_context(pool);
		printf("The main thread is exiting...\n");
	}

	/* The main thread owns a curl handle too (used by getUpdates). */
	curl_thread_cleanup();
	curl_global_cleanup();
	return -err;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment