// // Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo // // The contents of this file are covered under the licence agreement in the // file "LICENCE" distributed with Cforall. // // locks.cfa -- LIBCFATHREAD // Runtime locks that used with the runtime thread system. // // Author : Colby Alexander Parsons // Created On : Thu Jan 21 19:46:50 2021 // Last Modified By : // Last Modified On : // Update Count : // #define __cforall_thread__ #include "locks.hfa" #include "kernel/private.hfa" #include #include #pragma GCC visibility push(default) //----------------------------------------------------------------------------- // info_thread forall(L & | is_blocking_lock(L)) { struct info_thread { // used to put info_thread on a dl queue inline dlink(info_thread(L)); // waiting thread struct thread$ * t; // shadow field uintptr_t info; // lock that is passed to wait() (if one is passed) L * lock; // true when signalled and false when timeout wakes thread bool signalled; }; P9_EMBEDDED( info_thread(L), dlink(info_thread(L)) ) void ?{}( info_thread(L) & this, thread$ * t, uintptr_t info, L * l ) { this.t = t; this.info = info; this.lock = l; } void ^?{}( info_thread(L) & this ) {} } //----------------------------------------------------------------------------- // Blocking Locks void ?{}( blocking_lock & this, bool multi_acquisition, bool strict_owner ) { this.lock{}; this.blocked_threads{}; this.wait_count = 0; this.multi_acquisition = multi_acquisition; this.strict_owner = strict_owner; this.owner = 0p; this.recursion_count = 0; } void ^?{}( blocking_lock & this ) {} void lock( blocking_lock & this ) with( this ) { lock( lock __cfaabi_dbg_ctx2 ); thread$ * thrd = active_thread(); // single acquisition lock is held by current thread /* paranoid */ verifyf( owner != thrd || multi_acquisition, "Single acquisition lock holder (%p) attempted to reacquire the lock %p resulting in a deadlock.", owner, &this ); // lock is held by some other thread if ( owner != 0p && owner != thrd ) { select_node node; insert_last( blocked_threads, node ); wait_count++; unlock( lock ); park( ); return; } else if ( owner == thrd && multi_acquisition ) { // multi acquisition lock is held by current thread recursion_count++; } else { // lock isn't held owner = thrd; recursion_count = 1; } unlock( lock ); } bool try_lock( blocking_lock & this ) with( this ) { bool ret = false; lock( lock __cfaabi_dbg_ctx2 ); // lock isn't held if ( owner == 0p ) { owner = active_thread(); recursion_count = 1; ret = true; } // multi acquisition lock is held by current thread else if ( owner == active_thread() && multi_acquisition ) { recursion_count++; ret = true; } unlock( lock ); return ret; } static inline void pop_node( blocking_lock & this ) with( this ) { __handle_waituntil_OR( blocked_threads ); select_node * node = &try_pop_front( blocked_threads ); if ( node ) { wait_count--; owner = node->blocked_thread; recursion_count = 1; // if ( !node->clause_status || __make_select_node_available( *node ) ) unpark( node->blocked_thread ); wake_one( blocked_threads, *node ); } else { owner = 0p; recursion_count = 0; } } void unlock( blocking_lock & this ) with( this ) { lock( lock __cfaabi_dbg_ctx2 ); /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this ); /* paranoid */ verifyf( owner == active_thread() || !strict_owner , "Thread %p other than the owner %p attempted to release owner lock %p", owner, active_thread(), &this ); /* paranoid */ verifyf( recursion_count == 1 || multi_acquisition, "Thread %p attempted to release owner lock %p which is not recursive but has a recursive count of %zu", active_thread(), &this, recursion_count ); // if recursion count is zero release lock and set new owner if one is waiting recursion_count--; if ( recursion_count == 0 ) { pop_node( this ); } unlock( lock ); } size_t wait_count( blocking_lock & this ) with( this ) { return wait_count; } void on_notify( blocking_lock & this, thread$ * t ) with( this ) { lock( lock __cfaabi_dbg_ctx2 ); // lock held if ( owner != 0p ) { insert_last( blocked_threads, *(select_node *)t->link_node ); wait_count++; } // lock not held else { owner = t; recursion_count = 1; unpark( t ); } unlock( lock ); } size_t on_wait( blocking_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) with( this ) { lock( lock __cfaabi_dbg_ctx2 ); /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this ); /* paranoid */ verifyf( owner == active_thread() || !strict_owner, "Thread %p other than the owner %p attempted to release owner lock %p", owner, active_thread(), &this ); size_t ret = recursion_count; pop_node( this ); select_node node; active_thread()->link_node = (void *)&node; unlock( lock ); pre_park_then_park( pp_fn, pp_datum ); return ret; } void on_wakeup( blocking_lock & this, size_t recursion ) with( this ) { recursion_count = recursion; } // waituntil() support bool register_select( blocking_lock & this, select_node & node ) with(this) { lock( lock __cfaabi_dbg_ctx2 ); thread$ * thrd = active_thread(); // single acquisition lock is held by current thread /* paranoid */ verifyf( owner != thrd || multi_acquisition, "Single acquisition lock holder (%p) attempted to reacquire the lock %p resulting in a deadlock.", owner, &this ); if ( !node.park_counter && ( (owner == thrd && multi_acquisition) || owner == 0p ) ) { // OR special case if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering unlock( lock ); return false; } } // lock is held by some other thread if ( owner != 0p && owner != thrd ) { insert_last( blocked_threads, node ); wait_count++; unlock( lock ); return false; } else if ( owner == thrd && multi_acquisition ) { // multi acquisition lock is held by current thread recursion_count++; } else { // lock isn't held owner = thrd; recursion_count = 1; } if ( node.park_counter ) __make_select_node_available( node ); unlock( lock ); return true; } bool unregister_select( blocking_lock & this, select_node & node ) with(this) { lock( lock __cfaabi_dbg_ctx2 ); if ( node`isListed ) { remove( node ); wait_count--; unlock( lock ); return false; } if ( owner == active_thread() ) { /* paranoid */ verifyf( recursion_count == 1 || multi_acquisition, "Thread %p attempted to unlock owner lock %p in waituntil unregister, which is not recursive but has a recursive count of %zu", active_thread(), &this, recursion_count ); // if recursion count is zero release lock and set new owner if one is waiting recursion_count--; if ( recursion_count == 0 ) { pop_node( this ); } } unlock( lock ); return false; } bool on_selected( blocking_lock & this, select_node & node ) { return true; } //----------------------------------------------------------------------------- // alarm node wrapper forall(L & | is_blocking_lock(L)) { struct alarm_node_wrap { alarm_node_t alarm_node; condition_variable(L) * cond; info_thread(L) * info_thd; }; void ?{}( alarm_node_wrap(L) & this, Duration alarm, Duration period, Alarm_Callback callback, condition_variable(L) * c, info_thread(L) * i ) { this.alarm_node{ callback, alarm, period }; this.cond = c; this.info_thd = i; } void ^?{}( alarm_node_wrap(L) & this ) { } static void timeout_handler ( alarm_node_wrap(L) & this ) with( this ) { // This condition_variable member is called from the kernel, and therefore, cannot block, but it can spin. lock( cond->lock __cfaabi_dbg_ctx2 ); // this check is necessary to avoid a race condition since this timeout handler // may still be called after a thread has been removed from the queue but // before the alarm is unregistered if ( (*info_thd)`isListed ) { // is thread on queue info_thd->signalled = false; // remove this thread O(1) remove( *info_thd ); cond->count--; if( info_thd->lock ) { // call lock's on_notify if a lock was passed on_notify(*info_thd->lock, info_thd->t); } else { // otherwise wake thread unpark( info_thd->t ); } } unlock( cond->lock ); } // this casts the alarm node to our wrapped type since we used type erasure static void alarm_node_wrap_cast( alarm_node_t & a ) { timeout_handler( (alarm_node_wrap(L) &)a ); } struct pthread_alarm_node_wrap { alarm_node_t alarm_node; pthread_cond_var(L) * cond; info_thread(L) * info_thd; }; void ?{}( pthread_alarm_node_wrap(L) & this, Duration alarm, Duration period, Alarm_Callback callback, pthread_cond_var(L) * c, info_thread(L) * i ) { this.alarm_node{ callback, alarm, period }; this.cond = c; this.info_thd = i; } void ^?{}( pthread_alarm_node_wrap(L) & this ) { } static void timeout_handler ( pthread_alarm_node_wrap(L) & this ) with( this ) { // This pthread_cond_var member is called from the kernel, and therefore, cannot block, but it can spin. lock( cond->lock __cfaabi_dbg_ctx2 ); // this check is necessary to avoid a race condition since this timeout handler // may still be called after a thread has been removed from the queue but // before the alarm is unregistered if ( (*info_thd)`isListed ) { // is thread on queue info_thd->signalled = false; // remove this thread O(1) remove( *info_thd ); on_notify(*info_thd->lock, info_thd->t); } unlock( cond->lock ); } // this casts the alarm node to our wrapped type since we used type erasure static void pthread_alarm_node_wrap_cast( alarm_node_t & a ) { timeout_handler( (pthread_alarm_node_wrap(L) &)a ); } } //----------------------------------------------------------------------------- // Synchronization Locks forall(L & | is_blocking_lock(L)) { //----------------------------------------------------------------------------- // condition variable void ?{}( condition_variable(L) & this ){ this.lock{}; this.blocked_threads{}; this.count = 0; } void ^?{}( condition_variable(L) & this ){ } static void process_popped( condition_variable(L) & this, info_thread(L) & popped ) with( this ) { if(&popped != 0p) { popped.signalled = true; count--; if (popped.lock) { // if lock passed call on_notify on_notify(*popped.lock, popped.t); } else { // otherwise wake thread unpark(popped.t); } } } bool notify_one( condition_variable(L) & this ) with( this ) { lock( lock __cfaabi_dbg_ctx2 ); bool ret = ! blocked_threads`isEmpty; process_popped(this, try_pop_front( blocked_threads )); unlock( lock ); return ret; } bool notify_all( condition_variable(L) & this ) with(this) { lock( lock __cfaabi_dbg_ctx2 ); bool ret = ! blocked_threads`isEmpty; while( ! blocked_threads`isEmpty ) { process_popped(this, try_pop_front( blocked_threads )); } unlock( lock ); return ret; } uintptr_t front( condition_variable(L) & this ) with(this) { return blocked_threads`isEmpty ? NULL : blocked_threads`first.info; } bool empty( condition_variable(L) & this ) with(this) { lock( lock __cfaabi_dbg_ctx2 ); bool ret = blocked_threads`isEmpty; unlock( lock ); return ret; } int counter( condition_variable(L) & this ) with(this) { return count; } static void enqueue_thread( condition_variable(L) & this, info_thread(L) * i ) with(this) { // add info_thread to waiting queue insert_last( blocked_threads, *i ); count++; } static size_t block_and_get_recursion( info_thread(L) & i, __cfa_pre_park pp_fn, void * pp_datum ) { size_t recursion_count = 0; if ( i.lock ) // if lock was passed get recursion count to reset to after waking thread recursion_count = on_wait( *i.lock, pp_fn, pp_datum ); // this call blocks else pre_park_then_park( pp_fn, pp_datum ); return recursion_count; } static size_t block_and_get_recursion( info_thread(L) & i ) { return block_and_get_recursion( i, pre_park_noop, 0p ); } // helper for wait()'s' with no timeout static void queue_info_thread( condition_variable(L) & this, info_thread(L) & i ) with(this) { lock( lock __cfaabi_dbg_ctx2 ); enqueue_thread( this, &i ); unlock( lock ); // blocks here size_t recursion_count = block_and_get_recursion( i ); // resets recursion count here after waking if ( i.lock ) on_wakeup( *i.lock, recursion_count ); } #define WAIT( u, l ) \ info_thread( L ) i = { active_thread(), u, l }; \ queue_info_thread( this, i ); static void cond_alarm_register( void * node_ptr ) { register_self( (alarm_node_t *)node_ptr ); } // helper for wait()'s' with a timeout static void queue_info_thread_timeout( condition_variable(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) { lock( lock __cfaabi_dbg_ctx2 ); enqueue_thread( this, &info ); alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info }; unlock( lock ); // blocks here and registers alarm node before blocking after releasing locks to avoid deadlock size_t recursion_count = block_and_get_recursion( info, cond_alarm_register, (void *)(&node_wrap.alarm_node) ); // park(); // unregisters alarm so it doesn't go off if this happens first unregister_self( &node_wrap.alarm_node ); // resets recursion count here after waking if ( info.lock ) on_wakeup( *info.lock, recursion_count ); } #define WAIT_TIME( u, l, t ) \ info_thread( L ) i = { active_thread(), u, l }; \ queue_info_thread_timeout(this, i, t, alarm_node_wrap_cast ); \ return i.signalled; void wait( condition_variable(L) & this ) with(this) { WAIT( 0, 0p ) } void wait( condition_variable(L) & this, uintptr_t info ) with(this) { WAIT( info, 0p ) } void wait( condition_variable(L) & this, L & l ) with(this) { WAIT( 0, &l ) } void wait( condition_variable(L) & this, L & l, uintptr_t info ) with(this) { WAIT( info, &l ) } bool wait( condition_variable(L) & this, Duration duration ) with(this) { WAIT_TIME( 0 , 0p , duration ) } bool wait( condition_variable(L) & this, uintptr_t info, Duration duration ) with(this) { WAIT_TIME( info, 0p , duration ) } bool wait( condition_variable(L) & this, L & l, Duration duration ) with(this) { WAIT_TIME( 0 , &l , duration ) } bool wait( condition_variable(L) & this, L & l, uintptr_t info, Duration duration ) with(this) { WAIT_TIME( info, &l , duration ) } //----------------------------------------------------------------------------- // fast_cond_var void ?{}( fast_cond_var(L) & this ){ this.blocked_threads{}; #ifdef __CFA_DEBUG__ this.lock_used = 0p; #endif } void ^?{}( fast_cond_var(L) & this ){ } bool notify_one( fast_cond_var(L) & this ) with(this) { bool ret = ! blocked_threads`isEmpty; if ( ret ) { info_thread(L) & popped = try_pop_front( blocked_threads ); on_notify(*popped.lock, popped.t); } return ret; } bool notify_all( fast_cond_var(L) & this ) with(this) { bool ret = ! blocked_threads`isEmpty; while( ! blocked_threads`isEmpty ) { info_thread(L) & popped = try_pop_front( blocked_threads ); on_notify(*popped.lock, popped.t); } return ret; } uintptr_t front( fast_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty ? NULL : blocked_threads`first.info; } bool empty ( fast_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty; } void wait( fast_cond_var(L) & this, L & l ) { wait( this, l, 0 ); } void wait( fast_cond_var(L) & this, L & l, uintptr_t info ) with(this) { // brand cond lock with lock #ifdef __CFA_DEBUG__ if ( lock_used == 0p ) lock_used = &l; else assert(lock_used == &l); #endif info_thread( L ) i = { active_thread(), info, &l }; insert_last( blocked_threads, i ); size_t recursion_count = on_wait( *i.lock, pre_park_noop, 0p ); // blocks here // park( ); on_wakeup(*i.lock, recursion_count); } //----------------------------------------------------------------------------- // pthread_cond_var void ?{}( pthread_cond_var(L) & this ) with(this) { blocked_threads{}; lock{}; } void ^?{}( pthread_cond_var(L) & this ) { } bool notify_one( pthread_cond_var(L) & this ) with(this) { lock( lock __cfaabi_dbg_ctx2 ); bool ret = ! blocked_threads`isEmpty; if ( ret ) { info_thread(L) & popped = try_pop_front( blocked_threads ); popped.signalled = true; on_notify(*popped.lock, popped.t); } unlock( lock ); return ret; } bool notify_all( pthread_cond_var(L) & this ) with(this) { lock( lock __cfaabi_dbg_ctx2 ); bool ret = ! blocked_threads`isEmpty; while( ! blocked_threads`isEmpty ) { info_thread(L) & popped = try_pop_front( blocked_threads ); popped.signalled = true; on_notify(*popped.lock, popped.t); } unlock( lock ); return ret; } uintptr_t front( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty ? NULL : blocked_threads`first.info; } bool empty ( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty; } static void queue_info_thread_timeout( pthread_cond_var(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) { lock( lock __cfaabi_dbg_ctx2 ); insert_last( blocked_threads, info ); pthread_alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info }; unlock( lock ); // blocks here and registers alarm node before blocking after releasing locks to avoid deadlock size_t recursion_count = block_and_get_recursion( info, cond_alarm_register, (void *)(&node_wrap.alarm_node) ); // unregisters alarm so it doesn't go off if signal happens first unregister_self( &node_wrap.alarm_node ); // resets recursion count here after waking if ( info.lock ) on_wakeup( *info.lock, recursion_count ); } void wait( pthread_cond_var(L) & this, L & l ) with(this) { wait( this, l, 0 ); } void wait( pthread_cond_var(L) & this, L & l, uintptr_t info ) with(this) { lock( lock __cfaabi_dbg_ctx2 ); info_thread( L ) i = { active_thread(), info, &l }; insert_last( blocked_threads, i ); unlock( lock ); // blocks here size_t recursion_count = block_and_get_recursion( i ); on_wakeup( *i.lock, recursion_count ); } #define PTHREAD_WAIT_TIME( u, l, t ) \ info_thread( L ) i = { active_thread(), u, l }; \ queue_info_thread_timeout(this, i, t, pthread_alarm_node_wrap_cast ); \ return i.signalled; Duration getDuration(timespec t) { timespec currTime; clock_gettime(CLOCK_REALTIME, &currTime); Duration waitUntil = { t }; Duration currDur = { currTime }; if ( currDur >= waitUntil ) return currDur - waitUntil; Duration zero = { 0 }; return zero; } bool wait( pthread_cond_var(L) & this, L & l, timespec t ) { PTHREAD_WAIT_TIME( 0, &l , getDuration( t ) ) } bool wait( pthread_cond_var(L) & this, L & l, uintptr_t info, timespec t ) { PTHREAD_WAIT_TIME( info, &l , getDuration( t ) ) } } //----------------------------------------------------------------------------- // Semaphore void ?{}( semaphore & this, int count = 1 ) { (this.lock){}; this.count = count; (this.waiting){}; } void ^?{}(semaphore & this) {} bool P(semaphore & this) with( this ){ lock( lock __cfaabi_dbg_ctx2 ); count -= 1; if ( count < 0 ) { // queue current task append( waiting, active_thread() ); // atomically release spin lock and block unlock( lock ); park(); return true; } else { unlock( lock ); return false; } } thread$ * V (semaphore & this, const bool doUnpark ) with( this ) { thread$ * thrd = 0p; lock( lock __cfaabi_dbg_ctx2 ); count += 1; if ( count <= 0 ) { // remove task at head of waiting list thrd = pop_head( waiting ); } unlock( lock ); // make new owner if( doUnpark ) unpark( thrd ); return thrd; } bool V(semaphore & this) with( this ) { thread$ * thrd = V(this, true); return thrd != 0p; } bool V(semaphore & this, unsigned diff) with( this ) { thread$ * thrd = 0p; lock( lock __cfaabi_dbg_ctx2 ); int release = max(-count, (int)diff); count += diff; for(release) { unpark( pop_head( waiting ) ); } unlock( lock ); return thrd != 0p; }