具体要求:
一个线程为生产者,10个线程为消费者。
任务为功能模块,任务运行时间为两秒。
生产者投递5次任务即可。
消费者线程持续获取任务(while循环)。
#include<stdlib.h>
#include<stdio.h>
#include<unistd.h>
#include<pthread.h>
#include<string.h>

/* Single mutex protecting every field of the shared task container. */
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
/* Condition the producer waits on while the queue is full. */
pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
/* Condition the consumers wait on while the queue is empty. */
pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;

/* Task package: a job function plus the argument it is called with. */
typedef struct {
    void *(*tjob)(void *);
    void *targ;
} task_t;

/*
 * Task container: a fixed-capacity circular queue of tasks.
 *   front      - index where the next task is written (producer side)
 *   rear       - index where the next task is read (consumer side)
 *   max        - capacity of task_queue, in tasks
 *   cur        - number of tasks currently queued
 *   task_queue - heap array of `max` task slots
 */
typedef struct {
    int front;
    int rear;
    int max;
    int cur;
    task_t *task_queue;
} container_t;

container_t *pthread_container_create(int max);
int pthread_container_destroy(container_t *ct);
int pthread_add_task(container_t *ct, task_t task);
void *pthread_userjob(void *arg);

/*
 * Sample job: prints a banner containing the calling thread's id, sleeps for
 * two seconds to simulate work, and returns NULL. `arg` is ignored.
 * NOTE(review): the id is cast to unsigned int for printing, which may drop
 * high bits where pthread_t is wider than unsigned int.
 */
void *pthread_userjob(void *arg)
{
    (void)arg;
    printf(">>>0x%x>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>[执行任务]\n",
           (unsigned int)pthread_self());
    sleep(2);
    return NULL;
}

/*
 * Allocates an empty task container able to hold `max` tasks.
 *
 * Never returns NULL: on a non-positive capacity or an allocation failure it
 * reports the error and terminates the process with EXIT_FAILURE.
 * The caller owns the result and releases it with pthread_container_destroy().
 */
container_t *pthread_container_create(int max)
{
    /* A zero capacity would make the `% max` index arithmetic divide by zero. */
    if (max <= 0) {
        fprintf(stderr, "Container capacity must be positive, got %d\n", max);
        exit(EXIT_FAILURE);
    }

    container_t *ct = malloc(sizeof *ct);
    if (ct == NULL) {
        perror("Malloc Container Error");
        exit(EXIT_FAILURE);
    }

    ct->front = 0;
    ct->rear = 0;
    ct->cur = 0;
    ct->max = max;

    /* calloc checks the count * size multiplication for overflow. */
    ct->task_queue = calloc((size_t)max, sizeof *ct->task_queue);
    if (ct->task_queue == NULL) {
        perror("Malloc Taskqueue Error");
        exit(EXIT_FAILURE);
    }

    return ct;
}

/*
 * Frees a container created by pthread_container_create().
 * Passing NULL is a no-op. Always returns 0.
 * Does no locking itself; the caller must make sure no thread still uses `ct`.
 */
int pthread_container_destroy(container_t *ct)
{
    if (ct == NULL)
        return 0;

    free(ct->task_queue);
    free(ct);
    return 0;
}

/*
 * Producer side: appends `task` to the container, blocking while it is full.
 *
 * Returns 0 once the task is queued, or -1 without queuing anything when `ct`
 * is NULL or the task has no job function (a consumer would call through the
 * NULL pointer).
 */
int pthread_add_task(container_t *ct, task_t task)
{
    if (ct == NULL || task.tjob == NULL)
        return -1;

    pthread_mutex_lock(&lock);

    /* Wait for room; re-check in a loop because wakeups can be spurious. */
    while (ct->cur == ct->max)
        pthread_cond_wait(&not_full, &lock);

    /* Store the task in the slot at the write index. */
    ct->task_queue[ct->front].tjob = task.tjob;
    ct->task_queue[ct->front].targ = task.targ;
    ++(ct->cur);
    ct->front = (ct->front + 1) % ct->max; /* wrap around the circular queue */

    printf("Producer thread tid[0x%x] get job 1,queue_cur:%d\n",
           (unsigned int)pthread_self(), ct->cur);

    pthread_mutex_unlock(&lock);

    /* Wake one waiting consumer now that a task is available. */
    pthread_cond_signal(&not_empty);
    return 0;
}
void *pthread_customer_job(void *arg)
{
container_t *ct =(container_t*)arg;task_t tmp;//消费者线程持续获取并执行任务while(1){
pthread_mutex_lock(&lock);while(ct->cur == 0)pthread_cond_wait(¬_empty,&lock);tmp.tjob = ct->task_queue[ct->rear].tjob;tmp.targ = ct->task_queue[ct->rear].targ;--(ct->cur);ct->rear = (ct->rear+1) % ct->max;//环形队列偏移处理printf("Customer thread tid[0x%x] get job 1,queue_cur:%d\n",(unsigned int)pthread_self(),ct->cur);pthread_mutex_unlock(&lock);pthread_cond_signal(¬_full);//执行任务tmp.tjob(tmp.targ);}
}int main()
{
//生产者消费者模型测试container_t *ct = pthread_container_create(100);pthread_t tid[10];for(int i=0;i<10;i++)pthread_create(&tid[i],NULL,pthread_customer_job,(void*)ct);task_t task;task.tjob = pthread_userjob;task.targ = NULL;for(int i=0;i<5;i++)pthread_add_task(ct,task);while(1)sleep(1);return 0;
}