Monitor Examples


Mutual Exclusion

Mutual Exclusion provides exclusive access to a shared resource.


Synchronization

Synchronization provides a timing relationship among threads.

Bounded Buffer

A concurrent generic bounded buffer blocks producers when the buffer is full and consumers when the buffer is empty, and handles multiple producers/consumers inserting/removing simultaneously.

Internal Scheduling / External Scheduling
#include <thread.hfa>
enum { BufferSize = 5 };	// number of element slots in each Buffer

// Generic bounded buffer written as a monitor with internal scheduling: a caller
// that cannot proceed waits on a condition variable inside the monitor and is
// restarted by a signal from the opposite operation.
forall( T ) {
	monitor Buffer {
		// full:  remove waits here, insert signals it
		// empty: insert waits here, remove signals it
		condition full, empty;
		int front, back, count;		// next index to remove, next index to insert, elements stored
		T elements[BufferSize];		// storage, indexed circularly via % BufferSize
	};
	// Constructor: front, back and count all start at 0.
	void ?{}( Buffer(T) & buffer ) with( buffer ) {
		[front, back, count] = 0;
	}
	// read-only, no mutual exclusion
	int query( Buffer(T) & buffer ) { return buffer.count; }

	// Store elem at index back, then signal `full`.
	// NOTE(review): fullness is tested once (if, not while), so this assumes a
	// restarted waiter always finds a free slot -- confirm against the monitor's
	// signalling semantics.
	void insert( Buffer(T) & mutex buffer,
				 T elem ) with( buffer ) {
		if ( count == BufferSize ) wait( empty );	// no free slot: block until remove signals
		elements[back] = elem;
		back = ( back + 1 ) % BufferSize;			// advance with wrap-around
		count += 1;
		signal( full );								// an element is now available
	}

	// Take the element at index front, signal `empty`, and return the element.
	// NOTE(review): emptiness is tested once (if, not while); same assumption as insert.
	T remove( Buffer(T) & mutex buffer ) with( buffer ) {
		if ( count == 0 ) wait( full );				// nothing stored: block until insert signals
		T elem = elements[front];
		front = ( front + 1 ) % BufferSize;			// advance with wrap-around
		count -= 1;
		signal( empty );							// a slot is now free
		return elem;
	}
}
#include <thread.hfa>
enum { BufferSize = 5 };	// number of element slots in each Buffer

// Generic bounded buffer written as a monitor with external scheduling: no
// condition variables are used; a caller that cannot proceed uses waitfor to
// block until a call to the opposite operation on the same buffer is accepted.
forall( T ) {
	monitor Buffer {
		int front, back, count;		// next index to remove, next index to insert, elements stored
		T elements[BufferSize];		// storage, indexed circularly via % BufferSize
	};
	// Constructor: front, back and count all start at 0.
	void ?{}( Buffer(T) & buffer ) with( buffer ) {
		[front, back, count] = 0;
	}
	// read-only, no mutual exclusion
	int query( Buffer(T) & buffer ) { return buffer.count; }

	// forward declaration: insert names remove in its waitfor before remove is defined
	T remove( Buffer(T) & mutex buffer );

	// Store elem at index back.  If every slot is in use, first wait for a
	// call to remove on this buffer.
	void insert( Buffer(T) & mutex buffer,
				 T elem ) with( buffer ) {
		if ( count == BufferSize ) waitfor( remove : buffer );
		elements[back] = elem;
		back = ( back + 1 ) % BufferSize;			// advance with wrap-around
		count += 1;
	}

	// Take and return the element at index front.  If nothing is stored,
	// first wait for a call to insert on this buffer.
	T remove( Buffer(T) & mutex buffer ) with( buffer ) {
		if ( count == 0 ) waitfor( insert : buffer );
		T elem = elements[front];
		front = ( front + 1 ) % BufferSize;			// advance with wrap-around
		count -= 1;
		return elem;
	}
}

Dating Service

Girl/boy threads with matching compatibility codes exchange phone numbers.

signal (Explicit Exchange) / signal_block (Implicit Exchange)
#include <thread.hfa>
enum { CompCodes = 20 };			// number of compatibility codes

// Monitor state shared by the girl and boy routines (signal version).
monitor DatingService {
	unsigned int GirlPhoneNo;			// number posted by a girl, returned by boy
	unsigned int BoyPhoneNo;			// number posted by a boy, returned by girl
	condition Girls[CompCodes];			// waiting girls, one condition per compatibility code
	condition Boys[CompCodes];			// waiting boys, one condition per compatibility code
	condition exchange;					// where the second arrival waits for the partner's number
};
// Girl side of the exchange: posts PhoneNo in the monitor and returns BoyPhoneNo.
// ccode indexes the Girls/Boys condition arrays; it is not range-checked here,
// so callers must pass a value below CompCodes.
unsigned int girl( DatingService & mutex ds,
			unsigned int PhoneNo,
			unsigned int ccode ) with( ds ) {
	if ( is_empty( Boys[ccode] ) ) {	// no boy waiting with this code
		wait( Girls[ccode] );			// block until a boy signals Girls[ccode]
		GirlPhoneNo = PhoneNo;			// post number after being restarted
		signal( exchange );				// restart the boy waiting on exchange
	} else {
		GirlPhoneNo = PhoneNo;			// post number before restarting the boy
		signal( Boys[ccode] );			// restart a waiting boy
		wait( exchange );				// block until that boy signals exchange
	}
	return BoyPhoneNo;
}
// Boy side of the exchange: posts PhoneNo in the monitor and returns GirlPhoneNo.
// ccode indexes the Girls/Boys condition arrays; it is not range-checked here,
// so callers must pass a value below CompCodes.
unsigned int boy( DatingService & mutex ds,
			unsigned int PhoneNo,
			unsigned int ccode ) with( ds ) {
	if ( is_empty( Girls[ccode] ) ) {	// no girl waiting with this code
		wait( Boys[ccode] );			// block until a girl signals Boys[ccode]
		BoyPhoneNo = PhoneNo;			// post number after being restarted
		signal( exchange );				// restart the girl waiting on exchange
	} else {
		BoyPhoneNo = PhoneNo;			// post number before restarting the girl
		signal( Girls[ccode] );			// restart a waiting girl
		wait( exchange );				// block until that girl signals exchange
	}
	return GirlPhoneNo;
}
#include <thread.hfa>				// condition type
enum { CompCodes = 20 };			// number of compatibility codes

// Monitor state shared by the girl and boy routines (signal_block version).
monitor DatingService {
	unsigned int GirlPhoneNo;			// number posted by a girl, returned by boy
	unsigned int BoyPhoneNo;			// number posted by a boy, returned by girl
	condition Girls[CompCodes];			// waiting girls, one condition per compatibility code
	condition Boys[CompCodes];			// waiting boys, one condition per compatibility code
};
// Girl side of the exchange (signal_block version): posts PhoneNo in the
// monitor and returns BoyPhoneNo.  ccode indexes the Girls/Boys condition
// arrays; it is not range-checked here, so callers must pass a value below
// CompCodes.
unsigned int girl( DatingService & mutex ds,
			unsigned int PhoneNo,
			unsigned int ccode ) with( ds ) {
	if ( is_empty( Boys[ccode] ) ) {// no compatible boy ?
		wait( Girls[ccode] );		// wait for boy
		GirlPhoneNo = PhoneNo;		// make phone number available

	} else {
		GirlPhoneNo = PhoneNo;		// make phone number available
		// NOTE(review): relies on signal_block letting the restarted boy run
		// before this routine continues to the return -- confirm.
		signal_block( Boys[ccode] );// restart boy to set phone number

	}
	return BoyPhoneNo;
}
// Boy side of the exchange (signal_block version): posts PhoneNo in the
// monitor and returns GirlPhoneNo.  ccode indexes the Girls/Boys condition
// arrays; it is not range-checked here, so callers must pass a value below
// CompCodes.
unsigned int boy( DatingService & mutex ds,
			unsigned int PhoneNo,
			unsigned int ccode ) with( ds ) {
	if ( is_empty( Girls[ccode] ) ) {// no compatible girl ?
		wait( Boys[ccode] );		// wait for girl
		BoyPhoneNo = PhoneNo;		// make phone number available

	} else {
		BoyPhoneNo = PhoneNo;		// make phone number available
		// NOTE(review): relies on signal_block letting the restarted girl run
		// before this routine continues to the return -- confirm.
		signal_block( Girls[ccode] );// restart girl to set phone number

	}
	return GirlPhoneNo;
}

Readers Writer Lock

Allows simultaneous readers but only one writer at a time, and maintains temporal order to prevent stale or fresh reads.

#include <fstream.hfa>					// sout
#include <stdlib.hfa>					// rand
#include <thread.hfa>					// condition type

// Monitor whose counters record who is currently using the shared resource.
monitor ReadersWriter {
	int rcnt;							// number of readers using resource
	int wcnt;							// number of writers using resource (only ever set to 0 or 1)
};
// Reader exit protocol: one fewer reader is using the resource.
void EndRead( ReadersWriter & mutex rw ) {
	rw.rcnt -= 1;
}
// Writer exit protocol: mark the resource as having no writer.
void EndWrite( ReadersWriter & mutex rw ) {
	rw.wcnt = 0;
}
// Reader entry protocol: if a writer is using the resource, wait for a call to
// EndWrite on rw; then count this caller as a reader.
void StartRead( ReadersWriter & mutex rw ) with(rw) {
	if ( wcnt > 0 ) waitfor( EndWrite : rw );	// writer active: wait for it to finish
	rcnt += 1;
}
// Writer entry protocol: if a writer is using the resource, wait for a call to
// EndWrite on rw; otherwise wait for calls to EndRead on rw until no readers
// remain.  Then mark the resource as having a writer.
void StartWrite( ReadersWriter & mutex rw ) with(rw) {
	if ( wcnt > 0 ) waitfor( EndWrite : rw );			// writer active: wait for it to finish
	else while ( rcnt > 0 ) waitfor( EndRead : rw );	// readers active: wait for each to leave
	wcnt = 1;
}
// Constructor: the resource starts with no writer and no readers.
void ?{}( ReadersWriter & rw ) {
	rw.wcnt = 0;
	rw.rcnt = 0;
}
// Current reader count; rw is not a mutex parameter, so the value is read
// without acquiring the monitor.
int readers( ReadersWriter & rw ) with(rw) {
	return rcnt;
}

int SharedRW = 0;						// shared value read by Read and incremented by Write

// One complete read: run the reader entry protocol, print the shared value and
// the current reader count, yield while still counted as a reader, then run the
// reader exit protocol.
void Read( ReadersWriter & rw ) {		// compress two step protocol to one
	StartRead( rw );
	sout | "Reader:" | active_thread() | ", shared:" | SharedRW | " with:" | readers( rw ) | " readers";
	yield( 3 );							// give other threads a chance to run during the read
	EndRead( rw );
}
// One complete write: run the writer entry protocol, increment and print the
// shared value, yield while still marked as the writer, then run the writer
// exit protocol.
void Write( ReadersWriter & rw ) {
	StartWrite( rw );					// entry protocol
	SharedRW = SharedRW + 1;
	sout | "Writer:" | active_thread() | ",  wrote:" | SharedRW;
	yield( 1 );							// give other threads a chance to run during the write
	EndWrite( rw );						// exit protocol
}
// Thread type whose instances each hold a reference to a readers/writer monitor.
thread Worker {
	ReadersWriter & rw;					// monitor this worker reads and writes through
};
// Constructor: bind the worker's reference field to the given monitor.
void ?{}( Worker & w, ReadersWriter & rw ) {
	&w.rw = &rw;						// assign the reference's address, not the referent
}
// Thread body: performs ten operations; each is a Write when rand() % 100 is
// 70 or more, and a Read otherwise.
void main( Worker & w ) with(w) {
	for ( 10 ) {
		if ( rand() % 100 >= 70 ) {			// decide to be a writer or reader
			Write( rw );
		} else {
			Read( rw );
		}
	}
}
// Program entry: creates MaxTask workers sharing one ReadersWriter monitor,
// deletes them all, then prints a completion message.
int main() {
	enum { MaxTask = 5 };
	ReadersWriter rw;
	Worker * workers[MaxTask];
	for ( int i = 0; i < MaxTask; i += 1 ) {
		workers[i] = new( rw );				// allocate a worker bound to rw
	}
	// NOTE(review): presumably deleting a thread object waits for its main to
	// finish -- confirm.
	for ( int i = 0; i < MaxTask; i += 1 ) {
		delete( workers[i] );
	}
	sout | "successful completion";
} // main