Mutual Exclusion
Mutual Exclusion provides exclusive access to a shared resource.
Synchronization
Synchronization provides a timing relationship among threads.
Bounded Buffer
Concurrent generic bounded-buffer blocks producers when buffer full, consumers when buffer empty, and handles multiple producers/consumers inserting/removing simultaneously.
Internal Scheduling | External Scheduling |
#include <thread.hfa>
enum { BufferSize = 5 };
forall( T ) {
monitor Buffer {
condition full, empty;
int front, back, count;
T elements[BufferSize];
};
void ?{}( Buffer(T) & buffer ) with( buffer ) {
[front, back, count] = 0;
}
// read-only, no mutual exclusion
int query( Buffer(T) & buffer ) { return buffer.count; }
void insert( Buffer(T) & mutex buffer,
T elem ) with( buffer ) {
if ( count == BufferSize ) wait( empty );
elements[back] = elem;
back = ( back + 1 ) % BufferSize;
count += 1;
signal( full );
}
T remove( Buffer(T) & mutex buffer ) with( buffer ) {
if ( count == 0 ) wait( full );
T elem = elements[front];
front = ( front + 1 ) % BufferSize;
count -= 1;
signal( empty );
return elem;
}
}
|
#include <thread.hfa>
enum { BufferSize = 5 };
forall( T ) {
monitor Buffer {
int front, back, count;
T elements[BufferSize];
};
void ?{}( Buffer(T) & buffer ) with( buffer ) {
[front, back, count] = 0;
}
// read-only, no mutual exclusion
int query( Buffer(T) & buffer ) { return buffer.count; }
void insert( Buffer(T) & mutex buffer,
T elem ) with( buffer ) {
if ( count == BufferSize ) waitfor( remove : buffer );
elements[back] = elem;
back = ( back + 1 ) % BufferSize;
count += 1;
}
T remove( Buffer(T) & mutex buffer ) with( buffer ) {
if ( count == 0 ) waitfor( insert : buffer );
T elem = elements[front];
front = ( front + 1 ) % BufferSize;
count -= 1;
return elem;
}
}
|
Dating Service
Girl/boy threads with matching compatibility codes exchange phone numbers.
signal (Explict Exchange) | signal_block (Implicit Exchange) |
#include <thread.hfa>
enum { CompCodes = 20 };
monitor DatingService {
unsigned int GirlPhoneNo, BoyPhoneNo;
condition Girls[CompCodes], Boys[CompCodes];
condition exchange;
};
unsigned int girl( DatingService & mutex ds,
unsigned int PhoneNo,
unsigned int ccode ) with( ds ) {
if ( is_empty( Boys[ccode] ) ) {
wait( Girls[ccode] );
GirlPhoneNo = PhoneNo;
signal( exchange );
} else {
GirlPhoneNo = PhoneNo;
signal( Boys[ccode] );
wait( exchange );
}
return BoyPhoneNo;
}
unsigned int boy( DatingService & mutex ds,
unsigned int PhoneNo,
unsigned int ccode ) with( ds ) {
if ( is_empty( Girls[ccode] ) ) {
wait( Boys[ccode] );
BoyPhoneNo = PhoneNo;
signal( exchange );
} else {
BoyPhoneNo = PhoneNo;
signal( Girls[ccode] );
wait( exchange );
}
return GirlPhoneNo;
}
|
#include <thread.hfa> // condition type
enum { CompCodes = 20 }; // number of compatibility codes
monitor DatingService {
unsigned int GirlPhoneNo, BoyPhoneNo;
condition Girls[CompCodes], Boys[CompCodes];
};
unsigned int girl( DatingService & mutex ds,
unsigned int PhoneNo,
unsigned int ccode ) with( ds ) {
if ( is_empty( Boys[ccode] ) ) {// no compatible boy ?
wait( Girls[ccode] ); // wait for boy
GirlPhoneNo = PhoneNo; // make phone number available
} else {
GirlPhoneNo = PhoneNo; // make phone number available
signal_block( Boys[ccode] );// restart boy to set phone number
}
return BoyPhoneNo;
}
unsigned int boy( DatingService & mutex ds,
unsigned int PhoneNo,
unsigned int ccode ) with( ds ) {
if ( is_empty( Girls[ccode] ) ) {// no compatible girl ?
wait( Boys[ccode] ); // wait for girl
BoyPhoneNo = PhoneNo; // make phone number available
} else {
BoyPhoneNo = PhoneNo; // make phone number available
signal_block( Girls[ccode] );// restart girl to set phone number
}
return GirlPhoneNo;
}
|
Readers Writer Lock
Allow simultaneous readers, only one writer, and maintain temporal order preventing stale or fresh reads.
#include <thread.hfa> // condition type
monitor ReadersWriter {
int rcnt, wcnt; // number of readers/writer using resource
};
void EndRead( ReadersWriter & mutex rw ) with(rw) {
rcnt -= 1;
}
void EndWrite( ReadersWriter & mutex rw ) with(rw) {
wcnt = 0;
}
void StartRead( ReadersWriter & mutex rw ) with(rw) {
if ( wcnt > 0 ) waitfor( EndWrite : rw );
rcnt += 1;
}
void StartWrite( ReadersWriter & mutex rw ) with(rw) {
if ( wcnt > 0 ) waitfor( EndWrite : rw );
else while ( rcnt > 0 ) waitfor( EndRead : rw );
wcnt = 1;
}
void ?{}( ReadersWriter & rw ) with(rw) { rcnt = wcnt = 0; }
int readers( ReadersWriter & rw ) { return rw.rcnt; }
void Read( ReadersWriter & rw ) { // compress two step protocol to one
StartRead( rw );
sout | "Reader:" | active_thread() | ", shared:" | SharedRW | " with:" | readers( rw ) | " readers";
yield( 3 );
EndRead( rw );
}
void Write( ReadersWriter & rw ) { // compress two step protocol to one
StartWrite( rw );
SharedRW += 1;
sout | "Writer:" | active_thread() | ", wrote:" | SharedRW;
yield( 1 );
EndWrite( rw );
}
thread Worker {
ReadersWriter &rw;
};
void ?{}( Worker & w, ReadersWriter & rw ) { &w.rw = &rw; }
void main( Worker & w ) with(w) {
for ( 10 ) {
if ( rand() % 100 < 70 ) { // decide to be a reader or writer
Read( rw );
} else {
Write( rw );
} // if
} // for
}
int main() {
enum { MaxTask = 5 };
ReadersWriter rw;
Worker * workers[MaxTask];
for ( i; MaxTask ) workers[i] = new( rw );
for ( i; MaxTask ) delete( workers[i] );
sout | "successful completion";
} // main