一 问题来源:南京烽火通信面试
二 问题:
现有一种定长队列,长度为2的n次方,此队列可被多线程访问,但必须确保线程级安全,即在任意时刻,队列的长度保持不变。
三 笔者分析
1. 队列的存储结构--链式和顺序均可。
2. 长度为2的n次方--便于移动队头和队尾指针,假设队列长度为size,那么rear = (rear + 1) & (size - 1),&运算比算术求模运算效率高。为此,笔者使用了顺序队列。
3. 怎么确保线程级安全
笔者认为此系统一定同时存在多个读者和写者,读者是读取队头元素,写者是不断向队尾插入元素,读者与读者之间,读者与写者之间,写者与写者之间都必须互斥访问,因此,必须采用互斥锁,保证任意时刻队列长度不变。为此,笔者定义一个全局变量balance。
读者 get
加锁
检测balance为1吗?如果为1,表明队尾已经有一个元素加入, 那么读取队头,并将balance = 0
解锁
写者 put
加锁
检测balance为0吗?如果为0,表明队列平衡,没有元素插入, 那么在队列插入元素,并将balance = 1
解锁
这样能够确保同一时刻读、写者之间彼此是互斥的。四 代码组织
五 代码详解
1. queue.h
/************************************************************************* > File Name: queue.h > Author: wangzhicheng > Mail: 2363702560@qq.com > Created Time: Fri 14 Nov 2014 11:23:55 AM WST ************************************************************************/ #ifndef _QUEUE_H_ #define _QUEUE_H_ #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #define MAX 1024 // the max storage space typedef int DataType; // data type /* * sequenece queue * */ typedef struct Queue { DataType *array; int front; // pointer to the header node of queue int rear; // pointer to the next position of last node in the queue int size; // initial size of the queue size should be 8 bytes }Queue; Queue Q; int balance; // should be 0, 1 -- a node add in the queue pthread_mutex_t mutex; // mutex lock int InitQueue(); int reInitQueue(uint); int get(DataType *); int put(DataType); #endif
/************************************************************************* > File Name: queue.c > Author: wangzhicheng2013 > Mail: 2363702560@qq.com > Created Time: Fri 14 Nov 2014 03:01:32 PM WST ************************************************************************/ #include "queue.h" /* * initialize the queue * return 0 if successfully * else return -1 * */ int InitQueue() { int rc = 0; int i; Q.array = (DataType *)malloc(sizeof(DataType) * MAX); if(!Q.array) { rc = -1; printf("Init Queue failed, Memeory lack...!\n"); } else { // initialize the queue Q.front = 0; Q.size = 8; Q.rear = Q.size; for(i = 0;i < Q.size;i++) Q.array[i] = i; pthread_mutex_init(&mutex, NULL); } return rc; } /* * reinitialize the queue * return 0 if successfully * else return -1 * */ int reInitQueue(uint capacity) { int rc = 0; int i; if(capacity >= MAX) rc = -1; else { // initialize the queue for(i = 2;i < capacity;i = i << 1) ; if(i > capacity) i = i / 2; // compute the max 2 ^ n less than capacity Q.size = i; for(i = Q.rear;i < Q.size;i++) Q.array[i] = i; Q.rear = Q.size; } return rc; } /* * get a element from the queue * return 0 if successfully * else return -1 * */ int get(DataType *e) { int rc = -1; pthread_mutex_lock(&mutex); if(balance) { // if put a node in the queue before *e = Q.array[Q.front]; Q.front = (Q.front + 1) & (Q.size - 1); // circularly move, save much time balance = 0; // return orgin rc = 0; } pthread_mutex_unlock(&mutex); return rc; } /* * put a element in the queue * return 0 if successfully * else return -1 * */ int put(DataType e) { int rc = -1; pthread_mutex_lock(&mutex); if(!balance) { // if the queue is balanced Q.array[Q.rear] = e; Q.rear = (Q.rear + 1) & (Q.rear - 1); // circularly move, save much time balance = 1; rc = 0; } pthread_mutex_unlock(&mutex); return rc; }
/************************************************************************* > File Name: main.c > Author: ma6174 > Mail: ma6174@163.com > Created Time: Fri 14 Nov 2014 03:26:48 PM WST ************************************************************************/ #include "queue.h" #include <time.h> #define THREAD_NUM 20 void *_read(void *arg) { DataType e; while(1) { if(get(&e)) { perror("read failed...!\n"); } else printf("%d has been read...!\n", e); usleep(20); // sleep(1); } } void *_write(void *arg) { DataType e = rand() % 10; while(1) { if(put(e)) { perror("write failed...!\n"); } else printf("%d has been write...!\n", e); usleep(20); // sleep(1); } } int main() { if(InitQueue()) { exit(EXIT_FAILURE); } srand((unsigned)time(NULL)); pthread_t threads[THREAD_NUM]; int i; for(i = 0;i < THREAD_NUM;i++) { if(i < THREAD_NUM / 2) { if(pthread_create(&threads[i], NULL, _read, NULL)) { perror("thread created failed...!\n"); } } else { if(pthread_create(&threads[i], NULL, _write, NULL)) { perror("thread created failed...!\n"); } } } for(i = 0;i < THREAD_NUM;i++) { pthread_join(threads[i], NULL); } return 0; }
CC=gcc all: $(CC) -g -o main src/main.c src/queue.c header/queue.h -I src/ -I header/ -lpthread
原文:http://blog.csdn.net/wangzhicheng1983/article/details/41125537