Skip to content

Instantly share code, notes, and snippets.

@yaoyi
Created February 4, 2013 15:57
Show Gist options
  • Save yaoyi/4707621 to your computer and use it in GitHub Desktop.
Save yaoyi/4707621 to your computer and use it in GitHub Desktop.
环形队列实现
/**
* @file queue.c
* @brief 基本的队列实现
* @author linyaoyi
* @version 1.1
* @date 2012-01-16
*/
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "queue.h"
/**
 * @brief qid2addr Convert a queue id into a byte pointer to the start of the queue.
 * @param qid queue id (the queue's base address carried as an integer)
 * @return unsigned char pointer holding the same address as qid
 * @remark Internal helper; not intended for use outside this file.
 *         The argument is wrapped in parentheses so that an expression
 *         argument (e.g. `a | b` or `c ? a : b`) is converted as a whole
 *         instead of only its first operand being cast.
 */
#define qid2addr(qid) ((unsigned char *)(qid))
/**
 * @brief get_queue_info Get the queue header.
 * @param qid queue id
 * @return pointer to the queue_info_t located at the very start of the
 *         queue's memory (the address qid2addr() yields for qid)
 * @remark Internal helper; not intended for use outside this file.
 *         The argument is passed on inside its own parentheses so this
 *         macro stays correct for expression arguments regardless of how
 *         qid2addr() treats its parameter.
 */
#define get_queue_info(qid) ((queue_info_t *)qid2addr((qid)))
/**
 * @brief get_queue_node_info Get the record of one queue node.
 * @param qid queue id
 * @param pos index of the node (node records are laid out back to back
 *            immediately after the queue_info_t header)
 * @return pointer to the queue_node_info_t at that index
 * @remark Internal helper; not intended for use outside this file.
 *         The expansion is a single parenthesized expression without a
 *         trailing semicolon, and both arguments are parenthesized, so the
 *         macro can be used anywhere an expression is allowed and `pos`
 *         may itself be an expression such as `i + 1`.
 */
#define get_queue_node_info(qid, pos) \
    ((queue_node_info_t *)(qid2addr((qid)) + sizeof(queue_info_t) + (pos) * sizeof(queue_node_info_t)))
/**
 * @brief get_queue_length Read the queue length stored in the queue header.
 * @param qid queue id
 * @return the header's queueLen field; ERROR_QUEUE_NOT_EXIST when qid maps
 *         to a NULL address, ERROR_QUEUE_INFO_NOT_EXIST when the header
 *         pointer is NULL (error codes travel through the same size_t
 *         return value)
 */
size_t get_queue_length(size_t qid)
{
    queue_info_t *header;

    dbg("enter");
    if (NULL == qid2addr(qid)) {
        return ERROR_QUEUE_NOT_EXIST;
    }

    header = get_queue_info(qid);
    dbg("end");
    if (!header) {
        return ERROR_QUEUE_INFO_NOT_EXIST;
    }

    return header->queueLen;
}
/**
 * @brief get_queue_size Read the queue size in bytes stored in the queue header.
 * @param qid queue id
 * @return the header's size field; ERROR_QUEUE_NOT_EXIST when qid maps to a
 *         NULL address, ERROR_QUEUE_INFO_NOT_EXIST when the header pointer
 *         is NULL (error codes travel through the same size_t return value)
 */
size_t get_queue_size(size_t qid)
{
    queue_info_t *hdr = NULL;

    dbg("enter");
    if (!qid2addr(qid)) {
        return ERROR_QUEUE_NOT_EXIST;
    }

    hdr = get_queue_info(qid);
    dbg("end");
    if (NULL == hdr) {
        return ERROR_QUEUE_INFO_NOT_EXIST;
    }

    return hdr->size;
}
/**
 * @brief create_queue Allocate and initialise a queue.
 * @param queueLen number of node records in the ring
 * @return the queue id (the address of the allocated block converted to
 *         size_t), or ERROR_QUEUE_MALLOC_FAIL when the requested length
 *         cannot be represented as a byte count or the allocation fails
 * @remark The block holds one queue_info_t header followed by queueLen
 *         queue_node_info_t records. front and rear start at 0 and the two
 *         mutexes and two condition variables in the header are initialised
 *         with default attributes.
 */
size_t create_queue(size_t queueLen)
{
    dbg("enter");

    /*
     * Header plus one spare byte, exactly as originally allocated. The
     * original note here spoke of reserving an extra node, but only a single
     * byte is added; ring indices are always reduced modulo queueLen, so no
     * storage past the queueLen node records is addressed.
     */
    const size_t overhead = sizeof(queue_info_t) + 1;

    /* Refuse lengths whose byte count would wrap around size_t and yield an
     * undersized allocation. */
    if (queueLen > ((size_t)-1 - overhead) / sizeof(queue_node_info_t))
    {
        return ERROR_QUEUE_MALLOC_FAIL;
    }

    unsigned char *qid = malloc(queueLen * sizeof(queue_node_info_t) + overhead);
    if (qid == NULL)
    {
        return ERROR_QUEUE_MALLOC_FAIL;
    }

    queue_info_t *queueInfo = get_queue_info(qid);
    queueInfo->queueLen = queueLen;
    queueInfo->rear = queueInfo->front = 0;
    /* The recorded size covers the header and the node records only. */
    queueInfo->size = queueInfo->queueLen * sizeof(queue_node_info_t) + sizeof(queue_info_t);

    /* Initialise the synchronisation objects kept in the header. */
    pthread_mutex_init(&queueInfo->rlock, NULL);
    pthread_mutex_init(&queueInfo->wlock, NULL);
    pthread_cond_init(&queueInfo->notEmpty, NULL);
    pthread_cond_init(&queueInfo->notFull, NULL);

    dbg("queueInfo->queueLen:%d", queueInfo->queueLen);
    dbg("queueInfo->size:%d", queueInfo->size);
    dbg("end");
    return (size_t)qid;
}
/**
 * @brief __dequeue Remove the node at the front of the queue.
 * @param qid queue id
 * @param buffer out-parameter; receives the data pointer stored in the front
 *               node (the pointer itself is handed back, nothing is copied)
 * @return the dataLen stored in the removed node, or
 *         ERROR_QUEUE_INFO_NOT_EXIST when the queue header pointer is NULL,
 *         ERROR_QUEUE_NODE_HAS_NO_DATA when buffer is NULL,
 *         ERROR_QUEUE_EMPTY when is_queue_empty() reports a non-zero value
 * @remark Takes no lock itself; dequeue() is the locking wrapper.
 *         All arguments are validated before anything is dereferenced.
 */
size_t __dequeue(size_t qid, unsigned char **buffer)
{
    dbg("%u enter", (unsigned)pthread_self());

    queue_info_t *queueInfo = get_queue_info(qid);
    if (queueInfo == NULL)
    {
        return ERROR_QUEUE_INFO_NOT_EXIST;
    }
    /* Checked before the store through buffer below, not after it. */
    if (buffer == NULL)
    {
        return ERROR_QUEUE_NODE_HAS_NO_DATA;
    }
    if (is_queue_empty(qid))
    {
        return ERROR_QUEUE_EMPTY;
    }

    dbg("before:");
    dbg("queueInfo->front:%d", queueInfo->front);
    dbg("queueInfo->rear:%d", queueInfo->rear);

    /* The node address is the non-NULL queue base plus an offset, so it
     * needs no NULL check of its own. */
    queue_node_info_t *queueNodeInfo = get_queue_node_info(qid, queueInfo->front);
    *buffer = (unsigned char *)queueNodeInfo->data;

    /* Advance front, wrapping around at queueLen. */
    queueInfo->front = (queueInfo->front + 1) % (queueInfo->queueLen);

    dbg("after:");
    dbg("queueInfo->front:%d", queueInfo->front);
    dbg("queueInfo->rear:%d", queueInfo->rear);
    dbg("end");
    return queueNodeInfo->dataLen;
}
/**
 * @brief dequeue Remove the node at the front of the queue, blocking while
 *        the queue is empty.
 * @param qid queue id
 * @param buffer out-parameter passed through to __dequeue()
 * @return the value produced by __dequeue() (carried through an int, as
 *         before), or ERROR_QUEUE_INFO_NOT_EXIST when the queue header
 *         pointer is NULL
 * @remark Locking wrapper around __dequeue(): holds the header's rlock for
 *         the whole operation and waits on notEmpty while is_queue_empty()
 *         reports a non-zero value. rlock is released on every path,
 *         including when __dequeue() reports an error.
 *         NOTE(review): notFull is broadcast here under rlock, whereas
 *         enqueue() waits on it under wlock; with two different mutexes a
 *         wake-up may race with the waiter's check - confirm the intended
 *         locking design.
 */
size_t dequeue(size_t qid, unsigned char **buffer)
{
    queue_info_t *queueInfo = get_queue_info(qid);
    if (queueInfo == NULL)
    {
        return ERROR_QUEUE_INFO_NOT_EXIST;
    }

    int ret = 0;
    dbg("%u request read lock", (unsigned)pthread_self());
    pthread_mutex_lock(&queueInfo->rlock);
    dbg("%u get read lock", (unsigned)pthread_self());

    while (is_queue_empty(qid))
    {
        printf("queue is empty, %u blocked and waiting for 'notEmpty' condition variable...\n", (unsigned int)pthread_self());
        dbg("%u release read lock", (unsigned)pthread_self());
        pthread_cond_wait(&queueInfo->notEmpty, &queueInfo->rlock);
        /* After waking up the emptiness test is repeated, because another
         * thread may have taken the node in the meantime. */
        printf("%u wakeup and get read lock", (unsigned int)pthread_self());
    }

    ret = __dequeue(qid, buffer);
    if (ret < 0)
    {
        dbg("something wrong, ret = %d", ret);
    }
    else
    {
        printf("send notFull conditional variable to arouse some thread");
        pthread_cond_broadcast(&queueInfo->notFull);
    }

    /* Single exit: the lock is dropped whether or not __dequeue() succeeded. */
    dbg("%u release read lock", (unsigned)pthread_self());
    pthread_mutex_unlock(&queueInfo->rlock);
    dbg("%u read lock released", (unsigned)pthread_self());
    return ret;
}
/**
 * @brief __enqueue Store an item in the node at the rear of the queue.
 * @param qid queue id
 * @param data pointer to the item; only the pointer is stored in the node,
 *             the bytes it points to are not copied
 * @param len length recorded in the node as dataLen
 * @return QUEUE_SUCCESS on success, ERROR_QUEUE_INFO_NOT_EXIST when the
 *         queue header pointer is NULL, ERROR_QUEUE_EMPTY when
 *         is_queue_full() reports a non-zero value
 * @remark Takes no lock itself; enqueue() is the locking wrapper.
 *         The header is validated once, before the fullness test and before
 *         any field is read.
 *         NOTE(review): a full queue is reported with ERROR_QUEUE_EMPTY; no
 *         dedicated "full" error code is visible in this file, so the value
 *         is kept - confirm against queue.h.
 */
size_t __enqueue(size_t qid, const unsigned char *data, size_t len)
{
    dbg("enter");

    queue_info_t *queueInfo = get_queue_info(qid);
    if (queueInfo == NULL)
    {
        return ERROR_QUEUE_INFO_NOT_EXIST;
    }
    if (is_queue_full(qid))
    {
        return ERROR_QUEUE_EMPTY;
    }

    dbg("before:");
    dbg("queueInfo->front:%d", queueInfo->front);
    dbg("queueInfo->rear:%d", queueInfo->rear);

    queue_node_info_t *queueNodeInfo = get_queue_node_info(qid, queueInfo->rear);
    queueNodeInfo->dataLen = len;
    /* The node's data field is a plain pointer, so const is cast away here. */
    queueNodeInfo->data = (void *)data;

    /* Advance rear, wrapping around at queueLen. */
    queueInfo->rear = (queueInfo->rear + 1) % queueInfo->queueLen;

    dbg("after:");
    dbg("queueInfo->front:%d", queueInfo->front);
    dbg("queueInfo->rear:%d", queueInfo->rear);
    dbg("end");
    return QUEUE_SUCCESS;
}
/**
 * @brief enqueue Store an item at the rear of the queue, blocking while the
 *        queue is full.
 * @param qid queue id
 * @param data pointer to the item, passed through to __enqueue()
 * @param len length of the item, passed through to __enqueue()
 * @return QUEUE_SUCCESS on success, ERROR_QUEUE_INFO_NOT_EXIST when the
 *         queue header pointer is NULL, otherwise the error value produced
 *         by __enqueue()
 * @remark Locking wrapper around __enqueue(): holds the header's wlock for
 *         the whole operation and waits on notFull while is_queue_full()
 *         reports a non-zero value. The header is checked for NULL before
 *         any of its fields is read, and wlock is released on every path,
 *         including when __enqueue() reports an error.
 */
size_t enqueue(size_t qid, const unsigned char *data, size_t len)
{
    queue_info_t *queueInfo = get_queue_info(qid);
    if (queueInfo == NULL)
    {
        return ERROR_QUEUE_INFO_NOT_EXIST;
    }

    /* The same field was printed through two aliases of the one header
     * pointer; both lines are kept so the program's output is unchanged. */
    printf("queuelen:%d\n", queueInfo->size);
    printf("queuelen:%d\n", queueInfo->size);

    dbg("%u request write lock", (unsigned)pthread_self());
    pthread_mutex_lock(&queueInfo->wlock);
    dbg("%u get write lock", (unsigned)pthread_self());

    int ret = 0;
    while (is_queue_full(qid))
    {
        printf("queue is full,%u blocked and waiting for 'notFull' condition variable...\n", (unsigned)pthread_self());
        dbg("%u release write lock", (unsigned)pthread_self());
        pthread_cond_wait(&queueInfo->notFull, &queueInfo->wlock);
        printf("%u wake up and get write lock\n", (unsigned)pthread_self());
    }

    ret = __enqueue(qid, data, len);
    if (ret != QUEUE_SUCCESS)
    {
        dbg("something wrong, ret = %d", ret);
    }
    else
    {
        printf("send notEmpty conditional variable to arouse some thread");
        pthread_cond_broadcast(&queueInfo->notEmpty);
    }

    /* Single exit: the lock is dropped whether or not __enqueue() succeeded. */
    dbg("%u release write lock", (unsigned)pthread_self());
    pthread_mutex_unlock(&queueInfo->wlock);
    dbg("%u write lock released", (unsigned)pthread_self());
    return ret;
}
/**
 * @brief is_queue_empty Tell whether the queue holds no items.
 * @param qid queue id
 * @return QUEUE_EMPTY when the header's rear and front indices are equal,
 *         QUEUE_NOT_EMPTY otherwise, ERROR_QUEUE_INFO_NOT_EXIST when the
 *         queue header pointer is NULL
 */
size_t is_queue_empty(size_t qid)
{
    queue_info_t *header;

    dbg("enter");
    header = get_queue_info(qid);
    dbg("end");

    if (!header) {
        return ERROR_QUEUE_INFO_NOT_EXIST;
    }
    if (header->rear != header->front) {
        return QUEUE_NOT_EMPTY;
    }
    return QUEUE_EMPTY;
}
/**
 * @brief is_queue_full Tell whether the queue can accept no further item.
 * @param qid queue id
 * @return QUEUE_FULL when advancing rear by one slot (modulo queueLen) would
 *         reach front, or when queueLen is 0; QUEUE_NOT_FULL otherwise;
 *         ERROR_QUEUE_INFO_NOT_EXIST when the queue header pointer is NULL
 * @remark Because "full" is detected one slot before rear catches up with
 *         front, at most queueLen - 1 items are held at a time.
 */
size_t is_queue_full(size_t qid)
{
    queue_info_t *queueInfo = get_queue_info(qid);
    if (queueInfo == NULL)
    {
        return ERROR_QUEUE_INFO_NOT_EXIST;
    }
    /* A queue with no slots can never take an item; answering here also
     * keeps the modulo below from dividing by zero. */
    if (queueInfo->queueLen == 0)
    {
        return QUEUE_FULL;
    }
    if (queueInfo->front == (queueInfo->rear + 1) % queueInfo->queueLen)
    {
        return QUEUE_FULL;
    }
    return QUEUE_NOT_FULL;
}
/**
 * @brief clear_queue Empty the queue by resetting both ring indices to 0.
 * @param qid queue id
 * @return QUEUE_SUCCESS once front and rear have been reset,
 *         ERROR_QUEUE_INFO_NOT_EXIST when the queue header pointer is NULL
 * @remark Only the indices are changed; node records are left as they are
 *         and no lock is taken.
 */
size_t clear_queue(size_t qid)
{
    queue_info_t *header;

    dbg("enter");
    header = get_queue_info(qid);
    if (header != NULL) {
        header->front = 0;
        header->rear = 0;
        dbg("end");
        return QUEUE_SUCCESS;
    }
    return ERROR_QUEUE_INFO_NOT_EXIST;
}
/**
* @brief destroy_queue 销毁队列
* @param qid 队列id
* @return
*/
size_t destroy_queue(size_t qid)
{
dbg("enter");
unsigned char *pqueue = qid2addr(qid);
free(pqueue);
pqueue = NULL;
dbg("end");
return QUEUE_SUCCESS;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment