Last active
August 29, 2015 14:08
-
-
Save b4n/b1135eaaac55224e9c56 to your computer and use it in GitHub Desktop.
A shot at the feasibility and implementation complexity of thread-based asynchronous TM source file updating
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Threaded TM updates. | |
* | |
* One worker thread processes parsing jobs. | |
* | |
* A job represents the update of one single source file. | |
* A group represents the user's update request, that can consist of several update jobs. | |
* | |
* The job<->group relation is many-to-many: a group can consist of multiple jobs, and a job can be
* shared by several groups. This allows bulk updates to be performed as a single job, while still
* allowing higher-priority jobs to be processed in-between.
*/ | |
/* Completion callback of an update request, invoked from the main loop (through
 * finish_job_in_idle()) once every job of the request has been processed. */
typedef void (*TMUpdateFinishedCallback) (gpointer user_data);
typedef struct | |
{ | |
guint jobs_remaning; | |
/* jobs this group has to update. this may not contain the complete list that | |
* the job requested if some jobs were already finished by another group */ | |
GPtrArray *jobs; | |
TMUpdateFinishedCallback callback; | |
gpointer data; | |
} TMUpdateGroup; | |
typedef struct | |
{ | |
GPtrArray *groups; | |
gint priority; /* FIXME: GLib's G_PRIORITY_... is smallest-is-highest. it's weird, but | |
supporting those names would probably be handy. Or define ours */ | |
TMSourceFile *source_file; /* FIXME: this has to be read-only */ | |
gboolean buffer_update; | |
guchar *buffer; | |
gsize buffer_length; | |
GPtrArray *tags_array; /*< parsed tags as a result of the job */ | |
} TMUpdateJob; | |
typedef struct | |
{ | |
GMutex mutex; | |
GCond cond; | |
GQueue queue; | |
} TMJobQueue; | |
static TMJobQueue *TM_job_queue = NULL; | |
static GThread *TM_job_thread = NULL; | |
/* Creates a file (non-buffer) update job for @source_file, taking a new reference on it.
 * The job starts with no group, no buffer and no result tags. */
static TMUpdateJob *update_job_new(gint priority, TMSourceFile *source_file)
{
	/* zero-filling leaves buffer_update FALSE, buffer and tags_array NULL, buffer_length 0 */
	TMUpdateJob *new_job = g_new0(TMUpdateJob, 1);

	new_job->groups = g_ptr_array_new();
	new_job->priority = priority;
	new_job->source_file = tm_source_file_ref(source_file);

	return new_job;
}
/* Destroys @job: its group list, its source file reference and its buffer.
 * The parsed tags_array is NOT freed, as its ownership is handed over to the source file
 * by finish_job_in_idle(). All groups should have been detached (set to NULL) beforehand;
 * a warning is emitted for each one still attached. */
static void update_job_free(TMUpdateJob *job)
{
	guint n = job->groups->len;

	while (n-- > 0)
	{
		if (g_ptr_array_index(job->groups, n) != NULL)
			g_warning("Destroying a job with remaining groups");
	}

	g_ptr_array_free(job->groups, TRUE);
	tm_source_file_unref(job->source_file);
	g_free(job->buffer);
	g_free(job);
}
/* Creates an empty update group that will call @callback with @data once complete.
 * The caller is responsible for setting jobs_remaning before queuing the group's jobs. */
static TMUpdateGroup *update_group_new(TMUpdateFinishedCallback callback, gpointer data)
{
	TMUpdateGroup *new_group = g_new(TMUpdateGroup, 1);

	new_group->callback = callback;
	new_group->data = data;
	new_group->jobs = g_ptr_array_new();
	new_group->jobs_remaning = 0;

	return new_group;
}
/* Destroys @group along with every job in its result list. */
static void update_group_free(TMUpdateGroup *group)
{
	GPtrArray *owned_jobs = group->jobs;
	guint idx = 0;

	while (idx < owned_jobs->len)
		update_job_free(g_ptr_array_index(owned_jobs, idx++));

	g_ptr_array_free(owned_jobs, TRUE);
	g_free(group);
}
/* Blocks until @queue holds at least one entry, then removes and returns its tail, i.e. the
 * highest-priority job (the queue is kept sorted by ascending priority by job_queue_push()).
 *
 * A NULL entry is a valid element: update_thread_destroy() pushes one to ask the worker to
 * stop. This is why the wait checks for an empty queue rather than for a non-NULL job. */
static TMUpdateJob *job_queue_pop(TMJobQueue *queue)
{
	TMUpdateJob *job;

	g_mutex_lock(&queue->mutex);
	/* loop, as g_cond_wait() may wake up spuriously */
	while (g_queue_is_empty(&queue->queue))
		g_cond_wait(&queue->cond, &queue->mutex);
	job = g_queue_pop_tail(&queue->queue);
	/* unlock the mutex we locked, not the queue structure itself */
	g_mutex_unlock(&queue->mutex);

	return job;
}
/* inserts a new job to the queue, possibly replacing an existing job */ | |
static void job_queue_push(TMJobQueue *queue, TMUpdateJob *new_job) | |
{ | |
GList *node; | |
g_mutex_lock(&queue->queue); | |
/* find and remove duplicate jobs */ | |
for (node = queue->queue.head; node; node = node->next) | |
{ | |
TMUpdateJob *job = node->data; | |
if (job->source_file == new_job->source_file || | |
strcmp(job->source_file->file_name, new_job->source_file->file_name) == 0) | |
{ | |
/* inherit the old job's groups */ | |
g_ptr_array_set_size(new_job->groups, new_job->groups->len + job->groups->len); | |
memcpy(&new_job->groups->pdata[new_job->groups->len], job->groups->pdata, job->groups->len); | |
memset(job->groups->pdata, 0, job->groups->len); | |
/* and possibly its priority */ | |
new_job->priority = MAX(new_job->priority, job->priority); | |
update_job_free(job); | |
g_queue_delete_link(&queue->queue, node); | |
break; /* there can only be one duplicate, so stop here */ | |
} | |
} | |
/* insert the new job */ | |
for (node = queue->queue.head; node; node = node->next) | |
{ | |
TMUpdateJob *job = node->data; | |
if (new_job->priority < job->priority) | |
break; | |
} | |
/* if we found a node with priority >= to the new job, insert the new job before it. | |
* otherwise, the queue is either empty or has only lower priority jobs, so push to the end. */ | |
if (node) | |
g_queue_insert_before(&queue->queue, node, new_job); | |
else | |
g_queue_push_tail(&queue->queue, new_job); | |
g_cond_signal(&queue->cond); | |
g_mutex_unlock(&queue->queue); | |
} | |
/* Idle handler (run in the main loop) publishing the results of a completed update group.
 *
 * For each job of the group, the source file's tags are removed from the workspace and
 * replaced by the job's freshly parsed tags. A single job is merged into the workspace
 * array incrementally; several jobs trigger a full rebuild of the workspace array instead.
 * The group's callback is then invoked, and the group (with its jobs) is freed.
 *
 * @data: the completed TMUpdateGroup, whose ownership is taken.
 * Returns: G_SOURCE_REMOVE, so the handler runs only once. */
static gboolean finish_job_in_idle(gpointer data)
{
	TMUpdateGroup *group = data;
	const gboolean merge_incrementally = (group->jobs->len == 1);
	guint n;

	/* FIXME: properly handle the case a source file has been removed from the workspace
	 * before we get here, e.g. we shouldn't merge its tags or update for it */
	for (n = 0; n < group->jobs->len; n++)
	{
		TMUpdateJob *job = g_ptr_array_index(group->jobs, n);
		TMSourceFile *source_file = job->source_file;

		tm_tags_remove_file_tags(source_file, theWorkspace->tags_array);
		/* NOTE(review): the previous source_file->tags_array is overwritten without being
		 * released here -- confirm who owns it */
		source_file->tags_array = job->tags_array;
		if (merge_incrementally)
			tm_workspace_merge_tags(&theWorkspace->tags_array, source_file->tags_array);
	}

	if (group->jobs->len > 1)
		tm_workspace_recreate_tags_array();

	if (group->callback != NULL)
		group->callback(group->data);
	update_group_free(group);

	return G_SOURCE_REMOVE;
}
static gpointer update_thread(TMJobQueue *queue) | |
{ | |
TMUpdateJob *job; | |
while ((job = job_queue_pop(queue)) != NULL) | |
{ | |
guint i; | |
gboolean job_finished = FALSE; | |
if (job->buffer_update) | |
job->tags_array = tm_source_file_buffer_parse_to_array(job->source_file, | |
job->buffer, job->buffer_length); | |
else | |
job->tags_array = tm_source_file_parse_to_array(job->source_file); | |
tm_tags_sort(job->tags_array, NULL, FALSE, TRUE); | |
for (i = 0; i < job->groups->len; i++) | |
{ | |
TMUpdateGroup *group = job->groups->pdata[i]; | |
/* add the job to the result jobs of the groups, it will be removed | |
* later when a group handled it */ | |
if (group && !job_finished) | |
g_ptr_array_add(group->jobs, job); | |
if (group && --(group->jobs_remaning) < 1) | |
{ | |
job->groups->pdata[i] = NULL; | |
g_idle_add(finish_job_in_idle, group); | |
job_finished = TRUE; | |
} | |
} | |
/* if the job was finished by a group, remove it from the other groups */ | |
if (job_finished) | |
{ | |
for (i = 0; i < job->groups->len; i++) | |
{ | |
TMUpdateGroup *group = job->groups->pdata[i]; | |
if (group) | |
g_ptr_array_remove_fast(group->jobs, job); | |
} | |
} | |
} | |
return NULL; | |
} | |
static void update_thread_init(void) | |
{ | |
TM_job_thread = g_thread_create(update_thread, TM_job_queue, TRUE, NULL); | |
} | |
static void update_thread_destroy(void) | |
{ | |
GList *node; | |
/* FIXME: while this is nice to avoid blocking on useless remaining jobs when quitting, is it | |
* really OK to throw away jobs and the forget some callbacks? | |
* It probably is, as I doubt the main loop will keep running until there are no more | |
* GSources, so anyway the callback may not run even if submitted when we quit. | |
* NOTE: this is done the nice way. If we really don't care about the jobs or anything, we | |
* could just let the thread die with the process. */ | |
/* empty the queue and add a NULL job to notify the thread */ | |
g_mutex_lock(&TM_job_queue->lock); | |
for (node = TM_job_queue->queue.head; node; node = node->next) | |
{ | |
TMUpdateJob *job = node->data; | |
guint i; | |
for (i = 0; i < job->groups->len; i++) | |
{ | |
TMUpdateGroup *group = job->groups->pdata[i]; | |
if (group && --(group->jobs_remaning) < 1) | |
update_group_free(group); | |
} | |
update_job_free(job); | |
} | |
g_queue_clear(&TM_job_queue->queue); | |
/* push a NULL job to notify the thread */ | |
g_queue_push_tail(&TM_job_queue->queue, NULL); | |
g_cond_signal(&TM_job_queue->cond); | |
g_mutex_unlock(&TM_job_queue->lock); | |
g_thread_join(TM_job_thread); | |
TM_job_thread = NULL; | |
} | |
void tm_workspace_update_source_file_async(TMSourceFile *source_file, gboolean update_workspace, | |
gint priority, TMUpdateFinishedCallback *callback, gpointer data) | |
{ | |
TMUpdateJob *job = update_job_new(priority, source_file); | |
TMUpdateJob *group = update_group_new(callback, data); | |
job->buffer_update = FALSE; | |
g_ptr_array_add(job->groups, group); | |
group->jobs_remaning = 1; | |
job_queue_push(TM_job_queue, job); | |
} | |
/* like tm_workspace_update_source_file_buffer_async() but takes ownership of the buffer, to avoid | |
* memory duplication. The buffer must be valid to free with g_free(). */ | |
void tm_workspace_update_source_file_buffer_steal_async(TMSourceFile *source_file, guchar *buffer, gsize len, | |
gboolean update_workspace, gint priority, | |
TMUpdateFinishedCallback *callback, gpointer data) | |
{ | |
TMUpdateJob *job = update_job_new(priority, source_file); | |
TMUpdateJob *group = update_group_new(callback, data); | |
job->buffer_update = TRUE; | |
job->buffer = buffer; | |
job->buffer_length = len; | |
g_ptr_array_add(job->groups, group); | |
group->jobs_remaning = 1; | |
job_queue_push(TM_job_queue, job); | |
} | |
void tm_workspace_update_source_file_buffer_async(TMSourceFile *source_file, const guchar *buffer, gsize len, | |
gboolean update_workspace, gint priority, | |
TMUpdateFinishedCallback *callback, gpointer data) | |
{ | |
tm_workspace_update_source_file_buffer_steal_async(source_file, g_memdup(buffer, len), len, | |
update_workspace, priority, callback, data); | |
} | |
void tm_workspace_update_source_files_async(GPtrArray *source_files, gboolean update_workspace, | |
gint priority, TMUpdateFinishedCallback *callback, gpointer data) | |
{ | |
TMUpdateJob *group = update_group_new(callback, data); | |
guint i; | |
/* as we push each new job directly in the queue without a surrounding lock, | |
* we need not to modify the group in the loop */ | |
/* FIXME: should this manually lock the queue, push all jobs and then notify the condition? | |
* it would be a little more optimized, but would probably require duplicating a part of | |
* job_queue_push() which is a non-trivial function */ | |
group->jobs_remaning = source_files->len; | |
for (i = 0; i < source_files->len; i++) | |
{ | |
TMUpdateJob *job = update_job_new(priority, source_files->pdata[i]); | |
job->buffer_update = FALSE; | |
g_ptr_array_add(job->groups, group); | |
job_queue_push(TM_job_queue, job); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment