#include "locks.hfa" #include "kernel_private.hfa" #include #include #include #include #include /////////////////////////////////////////////////////////////////// //// info_thread /////////////////////////////////////////////////////////////////// forall(dtype L | is_blocking_lock(L)) { void ?{}( info_thread(L) & this, $thread * t ) { this.t = t; this.lock = 0p; this.listed = false; } void ?{}( info_thread(L) & this, $thread * t, uintptr_t info ) { this.t = t; this.info = info; this.lock = 0p; this.listed = false; } void ^?{}( info_thread(L) & this ){ // default } info_thread(L) *& get_next( info_thread(L) & this ) { return this.next; } } /////////////////////////////////////////////////////////////////// //// Blocking Locks /////////////////////////////////////////////////////////////////// void ?{}( blocking_lock & this, bool multi_acquisition, bool strict_owner ) { this.lock{}; this.blocked_threads{}; this.wait_count = 0; this.multi_acquisition = multi_acquisition; this.strict_owner = strict_owner; this.owner = 0p; this.recursion_count = 0; } void ^?{}( blocking_lock & this ) { // default } void ?{}( single_acquisition_lock & this ) { ((blocking_lock &)this){ false, false }; } void ^?{}( single_acquisition_lock & this ) { // default } void ?{}( owner_lock & this ) { ((blocking_lock &)this){ true, true }; } void ^?{}( owner_lock & this ) { // default } void ?{}( multiple_acquisition_lock & this ) { ((blocking_lock &)this){ true, false }; } void ^?{}( multiple_acquisition_lock & this ) { // default } void lock( blocking_lock & this ) with( this ) { lock( lock __cfaabi_dbg_ctx2 ); if ( owner == active_thread() && !multi_acquisition) { fprintf(stderr, "A single acquisition lock holder attempted to reacquire the lock resulting in a deadlock."); // Possibly throw instead exit(EXIT_FAILURE); } else if ( owner != 0p && owner != active_thread() ) { append( blocked_threads, active_thread() ); wait_count++; unlock( lock ); park( ); } else if ( owner == active_thread() && multi_acquisition ) { recursion_count++; unlock( lock ); } else { owner = active_thread(); recursion_count = 1; unlock( lock ); } } bool try_lock( blocking_lock & this ) with( this ) { bool ret = false; lock( lock __cfaabi_dbg_ctx2 ); if ( owner == 0p ) { owner = active_thread(); recursion_count = 1; ret = true; } else if ( owner == active_thread() && multi_acquisition ) { recursion_count++; ret = true; } unlock( lock ); return ret; } void unlock( blocking_lock & this ) with( this ) { lock( lock __cfaabi_dbg_ctx2 ); if ( owner == 0p ){ // no owner implies lock isn't held fprintf( stderr, "There was an attempt to release a lock that isn't held" ); return; } else if ( strict_owner && owner != active_thread() ) { fprintf( stderr, "A thread other than the owner attempted to release an owner lock" ); return; } recursion_count--; if ( recursion_count == 0 ) { $thread * thrd = pop_head( blocked_threads ); owner = thrd; recursion_count = ( thrd ? 1 : 0 ); wait_count--; unpark( thrd ); } unlock( lock ); } size_t wait_count( blocking_lock & this ) with( this ) { return wait_count; } void set_recursion_count( blocking_lock & this, size_t recursion ) with( this ) { recursion_count = recursion; } size_t get_recursion_count( blocking_lock & this ) with( this ) { return recursion_count; } void add_( blocking_lock & this, $thread * t ) with( this ) { lock( lock __cfaabi_dbg_ctx2 ); if ( owner != 0p ) { append( blocked_threads, t ); wait_count++; unlock( lock ); } else { owner = t; recursion_count = 1; #if !defined( __CFA_NO_STATISTICS__ ) //kernelTLS.this_stats = t->curr_cluster->stats; #endif unpark( t ); unlock( lock ); } } void remove_( blocking_lock & this ) with( this ) { lock( lock __cfaabi_dbg_ctx2 ); if ( owner == 0p ){ // no owner implies lock isn't held fprintf( stderr, "A lock that is not held was passed to a synchronization lock" ); } else if ( strict_owner && owner != active_thread() ) { fprintf( stderr, "A thread other than the owner of a lock passed it to a synchronization lock" ); } else { $thread * thrd = pop_head( blocked_threads ); owner = thrd; recursion_count = ( thrd ? 1 : 0 ); wait_count--; unpark( thrd ); } unlock( lock ); } /////////////////////////////////////////////////////////////////// //// Overloaded routines for traits /////////////////////////////////////////////////////////////////// // This is temporary until an inheritance bug is fixed void lock( single_acquisition_lock & this ){ lock( (blocking_lock &)this ); } void unlock( single_acquisition_lock & this ){ unlock( (blocking_lock &)this ); } void add_( single_acquisition_lock & this, struct $thread * t ){ add_( (blocking_lock &)this, t ); } void remove_( single_acquisition_lock & this ){ remove_( (blocking_lock &)this ); } void set_recursion_count( single_acquisition_lock & this, size_t recursion ){ set_recursion_count( (blocking_lock &)this, recursion ); } size_t get_recursion_count( single_acquisition_lock & this ){ return get_recursion_count( (blocking_lock &)this ); } void lock( owner_lock & this ){ lock( (blocking_lock &)this ); } void unlock( owner_lock & this ){ unlock( (blocking_lock &)this ); } void add_( owner_lock & this, struct $thread * t ){ add_( (blocking_lock &)this, t ); } void remove_( owner_lock & this ){ remove_( (blocking_lock &)this ); } void set_recursion_count( owner_lock & this, size_t recursion ){ set_recursion_count( (blocking_lock &)this, recursion ); } size_t get_recursion_count( owner_lock & this ){ return get_recursion_count( (blocking_lock &)this ); } void lock( multiple_acquisition_lock & this ){ lock( (blocking_lock &)this ); } void unlock( multiple_acquisition_lock & this ){ unlock( (blocking_lock &)this ); } void add_( multiple_acquisition_lock & this, struct $thread * t ){ add_( (blocking_lock &)this, t ); } void remove_( multiple_acquisition_lock & this ){ remove_( (blocking_lock &)this ); } void set_recursion_count( multiple_acquisition_lock & this, size_t recursion ){ set_recursion_count( (blocking_lock &)this, recursion ); } size_t get_recursion_count( multiple_acquisition_lock & this ){ return get_recursion_count( (blocking_lock &)this ); } /////////////////////////////////////////////////////////////////// //// condition variable /////////////////////////////////////////////////////////////////// forall(dtype L | is_blocking_lock(L)) { void timeout_handler ( alarm_node_wrap(L) & this ) with( this ) { // This condition_variable member is called from the kernel, and therefore, cannot block, but it can spin. lock( cond->lock __cfaabi_dbg_ctx2 ); if ( (*i)->listed ) { // is thread on queue info_thread(L) * copy = *i; remove( cond->blocked_threads, i ); //remove this thread O(1) cond->count--; if( !copy->lock ) { #if !defined( __CFA_NO_STATISTICS__ ) //kernelTLS.this_stats = copy->t->curr_cluster->stats; #endif unpark( copy->t ); } else { add_(*copy->lock, copy->t); // call lock's add_ } } unlock( cond->lock ); } void alarm_node_wrap_cast( alarm_node_t & a ) { timeout_handler( (alarm_node_wrap(L) &)a ); } void ?{}( condition_variable(L) & this ){ this.lock{}; this.blocked_threads{}; this.count = 0; } void ^?{}( condition_variable(L) & this ){ // default } void ?{}( alarm_node_wrap(L) & this, $thread * thrd, Time alarm, Duration period, Alarm_Callback callback ) { this.alarm_node{ thrd, alarm, period, callback }; } void ^?{}( alarm_node_wrap(L) & this ) { // default } bool notify_one( condition_variable(L) & this ) with( this ) { lock( lock __cfaabi_dbg_ctx2 ); bool ret = !!blocked_threads; info_thread(L) * popped = pop_head( blocked_threads ); if(popped != 0p) { popped->listed = false; count--; if (popped->lock) { add_(*popped->lock, popped->t); } else { unpark(popped->t); } } unlock( lock ); return ret; } bool notify_all( condition_variable(L) & this ) with(this) { lock( lock __cfaabi_dbg_ctx2 ); bool ret = blocked_threads ? true : false; while( blocked_threads ) { info_thread(L) * popped = pop_head( blocked_threads ); if(popped != 0p){ popped->listed = false; count--; if (popped->lock) { add_(*popped->lock, popped->t); } else { unpark(popped->t); } } } unlock( lock ); return ret; } uintptr_t front( condition_variable(L) & this ) with(this) { if(!blocked_threads) return NULL; return peek(blocked_threads)->info; } bool empty( condition_variable(L) & this ) with(this) { return blocked_threads ? false : true; } int counter( condition_variable(L) & this ) with(this) { return count; } // helper for wait()'s' without a timeout void queue_info_thread( condition_variable(L) & this, info_thread(L) & i ) with(this) { lock( lock __cfaabi_dbg_ctx2 ); append( this.blocked_threads, &i ); count++; i.listed = true; size_t recursion_count; if (i.lock) { recursion_count = get_recursion_count(*i.lock); remove_( *i.lock ); } unlock( lock ); park( ); // blocks here if (i.lock) set_recursion_count(*i.lock, recursion_count); // resets recursion count here after waking } // helper for wait()'s' with a timeout void queue_info_thread_timeout( condition_variable(L) & this, info_thread(L) & info, Time t ) with(this) { lock( lock __cfaabi_dbg_ctx2 ); info_thread(L) * queue_ptr = &info; alarm_node_wrap(L) node_wrap = { info.t, t, 0`s, alarm_node_wrap_cast }; node_wrap.cond = &this; node_wrap.i = &queue_ptr; register_self( &node_wrap.alarm_node ); append( blocked_threads, queue_ptr ); info.listed = true; count++; size_t recursion_count; if (info.lock) { recursion_count = get_recursion_count(*info.lock); remove_( *info.lock ); } unlock( lock ); park(); if (info.lock) set_recursion_count(*info.lock, recursion_count); } void wait( condition_variable(L) & this ) with(this) { info_thread( L ) i = { active_thread() }; queue_info_thread( this, i ); } void wait( condition_variable(L) & this, uintptr_t info ) with(this) { info_thread( L ) i = { active_thread(), info }; queue_info_thread( this, i ); } void wait( condition_variable(L) & this, Duration duration ) with(this) { info_thread( L ) i = { active_thread() }; queue_info_thread_timeout(this, i, __kernel_get_time() + duration ); } void wait( condition_variable(L) & this, uintptr_t info, Duration duration ) with(this) { info_thread( L ) i = { active_thread(), info }; queue_info_thread_timeout(this, i, __kernel_get_time() + duration ); } void wait( condition_variable(L) & this, Time time ) with(this) { info_thread( L ) i = { active_thread() }; queue_info_thread_timeout(this, i, time); } void wait( condition_variable(L) & this, uintptr_t info, Time time ) with(this) { info_thread( L ) i = { active_thread(), info }; queue_info_thread_timeout(this, i, time); } void wait( condition_variable(L) & this, L & l ) with(this) { info_thread(L) i = { active_thread() }; i.lock = &l; queue_info_thread( this, i ); } void wait( condition_variable(L) & this, L & l, uintptr_t info ) with(this) { info_thread(L) i = { active_thread(), info }; i.lock = &l; queue_info_thread( this, i ); } void wait( condition_variable(L) & this, L & l, Duration duration ) with(this) { info_thread(L) i = { active_thread() }; i.lock = &l; queue_info_thread_timeout(this, i, __kernel_get_time() + duration ); } void wait( condition_variable(L) & this, L & l, uintptr_t info, Duration duration ) with(this) { info_thread(L) i = { active_thread(), info }; i.lock = &l; queue_info_thread_timeout(this, i, __kernel_get_time() + duration ); } void wait( condition_variable(L) & this, L & l, Time time ) with(this) { info_thread(L) i = { active_thread() }; i.lock = &l; queue_info_thread_timeout(this, i, time ); } void wait( condition_variable(L) & this, L & l, uintptr_t info, Time time ) with(this) { info_thread(L) i = { active_thread(), info }; i.lock = &l; queue_info_thread_timeout(this, i, time ); } } // thread T1 {}; // thread T2 {}; // multiple_acquisition_lock m; // condition_variable( multiple_acquisition_lock ) c; // void main( T1 & this ) { // printf("T1 start\n"); // lock(m); // printf("%d\n", counter(c)); // if(empty(c)) { // printf("T1 wait\n"); // wait(c,m,12); // }else{ // printf("%d\n", front(c)); // notify_one(c); // } // unlock(m); // printf("curr thd in main %p \n", active_thread()); // printf("T1 waits for 2s\n"); // lock(m); // wait( c, m, 2`s ); // unlock(m); // printf("T1 wakes\n"); // printf("T1 done\n"); // } // void main( T2 & this ) { // printf("T2 start\n"); // lock(m); // printf("%d\n", counter(c)); // if(empty(c)) { // printf("T2 wait\n"); // wait(c,m,12); // }else{ // printf("%d\n", front(c)); // notify_one(c); // } // unlock(m); // printf("T2 done\n"); // } // int main() { // printf("start\n"); // processor p[2]; // { // T1 t1; // T2 t2; // } // printf("done\n"); // }