// NOTE(review): the header names were missing from the include lines as received; the five
// includes below are reconstructed from the names this file uses (exit, sout, processor,
// pthread_*, errno) -- confirm against the project's build before committing.
#include <stdlib.h>										// prototype: rand
#include <fstream.hfa>
#include <thread.hfa>
#include <pthread.h>
#include <errno.h>

// Bounded-buffer producer/consumer test.
// tested pthread mutex related routines, pthread cond related routines
// tested pthread_create/join

enum { BufferSize = 50 };								// capacity of the ring buffer

// Running sums of every value produced / consumed; each is updated only while
// holding the matching lock below.
volatile int producer_val_total;
volatile int consumer_val_total;

pthread_mutex_t producer_cnt_lock, consumer_cnt_lock;

forall( T ) {
	// Fixed-capacity ring buffer guarded by one mutex and two condition variables.
	struct Buffer {
		int front, back, count;							// remove index, insert index, number of stored elements
		T elements[BufferSize];
		pthread_mutex_t _mutex;							// protects front, back, count and elements
		pthread_cond_t Full, Empty;						// waiting consumers & producers
	};

	// Constructor: start empty and initialise the synchronisation objects.
	void ?{}( Buffer(T) & buffer ) with( buffer ) {
		[front, back, count] = 0;
		pthread_mutex_init( &_mutex, NULL );
		pthread_cond_init( &Full, NULL );
		pthread_cond_init( &Empty, NULL );
	}

	// Destructor: release the synchronisation objects.
	void ^?{}( Buffer(T) & buffer ) with( buffer ) {
		pthread_mutex_destroy( &_mutex );
		pthread_cond_destroy( &Full );
		pthread_cond_destroy( &Empty );
	}

	// Return the current element count.
	int query( Buffer(T) & buffer ) { return buffer.count; } // read-only, no mutual exclusion

	// Append elem at the back of the buffer, blocking while the buffer is full.
	void insert( Buffer(T) & buffer, T elem ) with( buffer ) {
		pthread_mutex_lock( &_mutex );
		// Use the declared capacity rather than a separate literal so the full test
		// and the wrap-around always agree with the size of the elements array.
		while ( count == BufferSize ) pthread_cond_wait( &Empty, &_mutex ); // block producer
		elements[back] = elem;
		back = ( back + 1 ) % BufferSize;
		count += 1;
		pthread_cond_signal( &Full );					// unblock consumer
		pthread_mutex_unlock( &_mutex );
	}

	// Remove and return the element at the front of the buffer, blocking while it is empty.
	T remove( Buffer(T) & buffer ) with( buffer ) {
		pthread_mutex_lock( &_mutex );
		while ( count == 0 ) pthread_cond_wait( &Full, &_mutex ); // block consumer
		T elem = elements[front];
		front = ( front + 1 ) % BufferSize;
		count -= 1;
		pthread_cond_signal( &Empty );					// unblock producer
		pthread_mutex_unlock( &_mutex );
		return elem;
	}
}

// Thread body: insert a pseudo-random number of pseudo-random values into the
// Buffer(int) passed through arg, adding each value to producer_val_total.
// Always returns NULL.
void * producer( void * arg ) {
	Buffer(int) & buf = *(Buffer(int) *)arg;
	const int NoOfItems = prng( *active_thread(), 40 );
	int item;
	for ( int i = 1; i <= NoOfItems; i += 1 ) {			// produce a bunch of items
		item = prng( *active_thread(), 1, 101 );		// produce a random number
		//sout | "Producer:" | pthread_self() | " value:" | item;
		insert( buf, item );							// insert element into queue
		pthread_mutex_lock( &producer_cnt_lock );
		producer_val_total += item;
		pthread_mutex_unlock( &producer_cnt_lock );
	} // for
	//sout | "Producer:" | pthread_self() | " is finished";
	return NULL;
} // producer

// Thread body: remove values from the Buffer(int) passed through arg and add
// them to consumer_val_total until the sentinel value -1 is removed.
// Always returns NULL.
void * consumer( void * arg ) {
	Buffer(int) & buf = *(Buffer(int) *)arg;
	int item;
	for ( ;; ) {										// consume until the -1 sentinel appears
		item = remove( buf );							// remove from front of queue
		//sout | "Consumer:" | pthread_self() | " value:" | item;
		if ( item == -1 ) break;
		pthread_mutex_lock( &consumer_cnt_lock );
		consumer_val_total += item;
		pthread_mutex_unlock( &consumer_cnt_lock );
	} // for
	//sout | "Consumer:" | pthread_self() | " is finished";
	return NULL;
} // consumer

// Start the consumers and producers on one shared buffer, wait for the
// producers, send one -1 sentinel per consumer, wait for the consumers, then
// print both totals.
int main() {
	const int NoOfCons = 20, NoOfProds = 30;
	Buffer(int) buf;									// create a buffer monitor
	pthread_t cons[NoOfCons];							// array of consumer thread ids
	pthread_t prods[NoOfProds];							// array of producer thread ids
	pthread_mutex_init( &producer_cnt_lock, NULL );
	pthread_mutex_init( &consumer_cnt_lock, NULL );
	// parallelism
	set_seed( 1003 );

	processor p[5];
	{
		// create/join and mutex/condition test
		//sout | "create/join and mutex/condition test";
		for ( int i = 0; i < NoOfCons; i += 1 ) {		// create consumers
			// pthread_create reports failure through its return value and does not
			// set errno, so print the returned error number.
			int rc = pthread_create( &cons[i], NULL, consumer, (void *)&buf );
			if ( rc != 0 ) {
				sout | "create thread failure, errno:" | rc;
				exit( EXIT_FAILURE );
			} // if
		} // for
		for ( int i = 0; i < NoOfProds; i += 1 ) {		// create producers
			if ( pthread_create( &prods[i], NULL, producer, (void *)&buf ) != 0 ) {
				sout | "create thread failure";
				exit( EXIT_FAILURE );
			} // if
		} // for

		void * result;
		for ( int i = 0; i < NoOfProds; i += 1 ) {		// wait for producers to end
			if ( pthread_join( prods[i], &result ) != 0 ) {
				sout | " producers join thread failure";
				exit( EXIT_FAILURE );
			} // if
			if ( result != 0p ) {
				sout | "producers" | prods[i] |" bad return value " | result;
				exit( EXIT_FAILURE );
			} // if
			//sout | "join prods[" | i | "]:" | prods[i] | " result:" | result;
		} // for

		for ( int i = 0; i < NoOfCons; i += 1 ) {		// terminate each consumer
			insert( buf, -1 );
		} // for

		for ( int i = 0; i < NoOfCons; i += 1 ) {		// wait for consumer to end
			if ( pthread_join( cons[i], &result ) != 0 ) {
				sout| "consumers join thread failure" ;
				exit( EXIT_FAILURE );
			} // if
			if ( result != 0p ) {
				sout| "consumers bad return value" | result;
				exit( EXIT_FAILURE );
			} // if
		} // for
		sout | "producer total value is " | producer_val_total;
		sout | "consumer total value is " | consumer_val_total;
	}

	// Every thread has been joined, so the counter locks are no longer in use.
	pthread_mutex_destroy( &producer_cnt_lock );
	pthread_mutex_destroy( &consumer_cnt_lock );
}