9.3 實(shí)驗(yàn)內(nèi)容——“生產(chǎn)者消費(fèi)者”實(shí)驗(yàn) 1.實(shí)驗(yàn)?zāi)康?br /> “生產(chǎn)者消費(fèi)者”問題是一個著名的同時性編程問題的集合。通過學(xué)習(xí)經(jīng)典的“生產(chǎn)者消費(fèi)者”問題的實(shí)驗(yàn),讀者可以進(jìn)一步熟悉Linux中的多線程編程,并且掌握用信號量處理線程間的同步和互斥問題。 2.實(shí)驗(yàn)內(nèi)容 “生產(chǎn)者—消費(fèi)者”問題描述如下。 有一個有限緩沖區(qū)和兩個線程:生產(chǎn)者和消費(fèi)者。他們分別不停地把產(chǎn)品放入緩沖區(qū)和從緩沖區(qū)中拿走產(chǎn)品。一個生產(chǎn)者在緩沖區(qū)滿的時候必須等待,一個消費(fèi)者在緩沖區(qū)空的時候也必須等待。另外,因?yàn)榫彌_區(qū)是臨界資源,所以生產(chǎn)者和消費(fèi)者之間必須互斥執(zhí)行。它們之間的關(guān)系如圖9.4所示。 圖9.4 生產(chǎn)者消費(fèi)者問題描述 這里要求使用有名管道來模擬有限緩沖區(qū),并且使用信號量來解決“生產(chǎn)者—消費(fèi)者”問題中的同步和互斥問題。 3.實(shí)驗(yàn)步驟 (1)信號量的考慮。 這里使用3個信號量,其中兩個信號量avail和full分別用于解決生產(chǎn)者和消費(fèi)者線程之間的同步問題,mutex是用于這兩個線程之間的互斥問題。其中avail表示有界緩沖區(qū)中的空單元數(shù),初始值為N;full表示有界緩沖區(qū)中非空單元數(shù),初始值為0;mutex是互斥信號量,初始值為1。 (2)畫出流程圖。 本實(shí)驗(yàn)流程圖如圖9.5所示。 圖9.5 “生產(chǎn)者—消費(fèi)者”實(shí)驗(yàn)流程圖 (3)編寫代碼 本實(shí)驗(yàn)的代碼中采用的有界緩沖區(qū)擁有3個單元,每個單元為5個字節(jié)。為了盡量體現(xiàn)每個信號量的意義,在程序中生產(chǎn)過程和消費(fèi)過程是隨機(jī)(采取0~5s的隨機(jī)時間間隔)進(jìn)行的,而且生產(chǎn)者的速度比消費(fèi)者的速度平均快兩倍左右(這種關(guān)系可以相反)。生產(chǎn)者一次生產(chǎn)一個單元的產(chǎn)品(放入“hello”字符串),消費(fèi)者一次消費(fèi)一個單元的產(chǎn)品。 /*producer-customer.c*/ #include #include #include #include #include #include #include #include #define MYFIFO "myfifo" /* 緩沖區(qū)有名管道的名字 */ #define BUFFER_SIZE 3 /* 緩沖區(qū)的單元數(shù) */ #define UNIT_SIZE 5 /* 每個單元的大小 */ #define RUN_TIME 30 /* 運(yùn)行時間 */ #define DELAY_TIME_LEVELS 5.0 /* 周期的最大值 */ int fd; time_t end_time; sem_t mutex, full, avail; /* 3個信號量 */ /*生產(chǎn)者線程*/ void *producer(void *arg) { int real_write; int delay_time = 0; while(time(NULL) < end_time) { delay_time = (int)(rand() * DELAY_TIME_LEVELS/(RAND_MAX) / 2.0) + 1; sleep(delay_time); /*P操作信號量avail和mutex*/ sem_wait(&avail); sem_wait(&mutex); printf("\nProducer: delay = %d\n", delay_time); /*生產(chǎn)者寫入數(shù)據(jù)*/ if ((real_write = write(fd, "hello", UNIT_SIZE)) == -1) { if(errno == EAGAIN) { printf("The FIFO has not been read yet.Please try later\n"); } } else { printf("Write %d to the FIFO\n", real_write); } /*V操作信號量full和mutex*/ sem_post(&full); sem_post(&mutex); } pthread_exit(NULL); } /* 消費(fèi)者線程*/ void *customer(void *arg) { unsigned char read_buffer[UNIT_SIZE]; int real_read; int delay_time; while(time(NULL) < end_time) { delay_time = (int)(rand() * DELAY_TIME_LEVELS/(RAND_MAX)) + 1; sleep(delay_time); /*P操作信號量full和mutex*/ sem_wait(&full); sem_wait(&mutex); memset(read_buffer, 0, UNIT_SIZE); printf("\nCustomer: delay = %d\n", delay_time); if ((real_read = read(fd, read_buffer, UNIT_SIZE)) == -1) { if (errno == EAGAIN) { printf("No data yet\n"); } } printf("Read %s from FIFO\n", read_buffer); /*V操作信號量avail和mutex*/ sem_post(&avail); sem_post(&mutex); } pthread_exit(NULL); } int main() { pthread_t thrd_prd_id,thrd_cst_id; pthread_t mon_th_id; int ret; srand(time(NULL)); end_time = time(NULL) + RUN_TIME; /*創(chuàng)建有名管道*/ if((mkfifo(MYFIFO, O_CREAT|O_EXCL) < 0) && (errno != EEXIST)) { printf("Cannot create fifo\n"); return errno; } /*打開管道*/ fd = open(MYFIFO, O_RDWR); if (fd == -1) { printf("Open fifo error\n"); return fd; } /*初始化互斥信號量為1*/ ret = sem_init(&mutex, 0, 1); /*初始化avail信號量為N*/ ret += sem_init(&avail, 0, BUFFER_SIZE); /*初始化full信號量為0*/ ret += sem_init(&full, 0, 0); if (ret != 0) { printf("Any semaphore initialization failed\n"); return ret; } /*創(chuàng)建兩個線程*/ ret = pthread_create(&thrd_prd_id, NULL, producer, NULL); if (ret != 0) { printf("Create producer thread error\n"); return ret; } ret = pthread_create(&thrd_cst_id, NULL, customer, NULL); if(ret != 0) { printf("Create customer thread error\n"); return ret; } pthread_join(thrd_prd_id, NULL); pthread_join(thrd_cst_id, NULL); close(fd); unlink(MYFIFO); return 0; } 4.實(shí)驗(yàn)結(jié)果 運(yùn)行該程序,得到如下結(jié)果: $ ./producer_customer …… Producer: delay = 3 Write 5 to the FIFO Customer: delay = 3 Read hello from FIFO Producer: delay = 1 Write 5 to the FIFO Producer: delay = 2 Write 5 to the FIFO Customer: delay = 4 Read hello from FIFO Customer: delay = 1 Read hello from FIFO Producer: delay = 2 Write 5 to the FIFO …… |