source: tests/concurrent/pthread/bounded_buffer.cfa @ 20be782

ADTast-experimentalpthread-emulation
Last change on this file since 20be782 was 20be782, checked in by z277zhu <z277zhu@…>, 20 months ago

add pthread

  • Property mode set to 100644
File size: 5.0 KB
Line 
1#include <stdlib.h>                                                                             // prototype: rand
2#include <fstream.hfa>
3#include <thread.hfa>
4#include <pthread.h>
5#include <errno.h>
6enum { BufferSize = 50 };
7
8volatile int producer_val_total;
9volatile int consumer_val_total;
10
11pthread_mutex_t producer_cnt_lock, consumer_cnt_lock;
12
13
14forall( T ){
15    struct Buffer
16    {
17        int front, back, count;
18                T elements[BufferSize];
19        pthread_mutex_t _mutex;
20            pthread_cond_t Full, Empty;                                                 // waiting consumers & producers
21    };
22
23    void ?{}( Buffer(T) & buffer ) with( buffer ) {
24         [front, back, count] = 0;
25         pthread_mutex_init(&_mutex, NULL);
26         pthread_cond_init(&Full, NULL);
27         pthread_cond_init(&Empty, NULL);
28    }
29
30    void ^?{}( Buffer(T) & buffer ) with( buffer ){
31        pthread_mutex_destroy(&_mutex);
32        pthread_cond_destroy(&Full);
33        pthread_cond_destroy(&Empty);
34    }
35
36    int query( Buffer(T) & buffer ) { return buffer.count; } // read-only, no mutual exclusion
37   
38    void insert( Buffer(T) & buffer, T elem ) with(buffer) {
39                pthread_mutex_lock(&_mutex);
40                while ( count == 20 ) pthread_cond_wait( &Empty, &_mutex ); // block producer
41                elements[back] = elem;
42                back = ( back + 1 ) % 20;
43                count += 1;
44                pthread_cond_signal( &Full );                                   // unblock consumer
45        pthread_mutex_unlock(&_mutex);
46        }
47
48    T remove(Buffer(T) & buffer) with(buffer) {
49                pthread_mutex_lock(&_mutex);
50                while ( count == 0 ) pthread_cond_wait( &Full, &_mutex ); // block consumer
51                T elem = elements[front];
52                front = ( front + 1 ) % 20;
53                count -= 1;
54                pthread_cond_signal( &Empty );                                  // unblock producer
55        pthread_mutex_unlock(&_mutex);
56                return elem;
57        }
58
59}
60
61void *producer( void *arg ) {
62        Buffer(int) &buf = *(Buffer(int)*)arg;
63        const int NoOfItems = rand() % 40;
64        int item;
65        for ( int i = 1; i <= NoOfItems; i += 1 ) {                     // produce a bunch of items
66                item = rand() % 100 + 1;                                                // produce a random number
67                //sout | "Producer:" | pthread_self() | " value:" | item;
68                insert( buf,item );                                                             // insert element into queue
69        pthread_mutex_lock(&producer_cnt_lock);
70        producer_val_total += item;
71        pthread_mutex_unlock(&producer_cnt_lock);
72        } // for
73        //sout | "Producer:" | pthread_self() | " is finished";
74        return NULL;
75} // producer
76
77void *consumer( void *arg ) {
78        Buffer(int) &buf = *(Buffer(int) *)arg;
79        int item;
80        for ( ;; ) {                                                                            // consume until a negative element appears
81                item = remove(buf);                                                     // remove from front of queue
82                //sout | "Consumer:" | pthread_self() | " value:" | item;
83          if ( item == -1 ) break;
84        pthread_mutex_lock(&consumer_cnt_lock);
85        consumer_val_total += item;
86        pthread_mutex_unlock(&consumer_cnt_lock);
87        } // for
88        //sout | "Consumer:" | pthread_self() | " is finished";
89        return NULL;
90} // consumer
91
92int main() {
93        const int NoOfCons = 20, NoOfProds = 30;
94        Buffer(int) buf;                                                                // create a buffer monitor
95        pthread_t cons[NoOfCons];                                                       // pointer to an array of consumers
96        pthread_t prods[NoOfProds];                                                     // pointer to an array of producers
97    pthread_mutex_init(&producer_cnt_lock, NULL);
98    pthread_mutex_init(&consumer_cnt_lock, NULL);
99        // parallelism
100    srandom( 1003 );
101
102        processor p[5];
103    {
104        // create/join and mutex/condition test
105        //sout | "create/join and mutex/condition test";
106        for ( int i = 0; i < NoOfCons; i += 1 ) {                       // create consumers
107            if ( pthread_create( &cons[i], NULL, consumer, (void*)&buf ) != 0 ) {
108                sout | "create thread failure, errno:" | errno;
109                exit( EXIT_FAILURE );
110            } // if
111        } // for
112        for ( int i = 0; i < NoOfProds; i += 1 ) {                      //      create producers
113            if ( pthread_create( &prods[i], NULL, producer, (void*)&buf ) != 0 ) {
114                sout | "create thread failure";
115                exit( EXIT_FAILURE );
116            } // if
117        } // for
118
119        void *result;
120        for ( int i = 0; i < NoOfProds; i += 1 ) {                      // wait for producers to end
121            if ( pthread_join( prods[i], &result ) != 0 ) {
122                sout | " producers join thread failure";
123                exit( EXIT_FAILURE );
124            } // if
125            if ( (uint64_t)result != 0 ) {
126                sout | "producers" | prods[i] |" bad return value " | result;
127                exit( EXIT_FAILURE );
128            } // if
129            //sout | "join prods[" | i | "]:" | prods[i] | " result:" | result;
130        } // for
131
132        for ( int i = 0; i < NoOfCons; i += 1 ) {                       // terminate each consumer
133            insert(buf, -1 );
134        } // for
135
136        for ( int i = 0; i < NoOfCons; i += 1 ) {                       // wait for consumer to end
137            if ( pthread_join( cons[i], &result ) != 0 ) {
138                sout| "consumers join thread failure" ;
139                exit( EXIT_FAILURE );
140            } // if
141            if ( (uint64_t)result != 0 ) {
142                sout| "consumers bad return value" | result;
143                exit( EXIT_FAILURE );
144            } // if
145        } // for
146        sout | "producer total value is " | producer_val_total;
147        sout | "consumer total value is " | consumer_val_total;
148    }
149
150       
151
152       
153}
Note: See TracBrowser for help on using the repository browser.