Changeset beeff61e for libcfa/src
- Timestamp: May 1, 2023, 4:00:06 PM
- Branches: ADT, ast-experimental, master
- Children: 73bf7ddc
- Parents: bb7422a
- Location: libcfa/src/concurrency
- Files: 8 edited
libcfa/src/concurrency/channel.hfa
rbb7422a rbeeff61e 4 4 #include <list.hfa> 5 5 #include <mutex_stmt.hfa> 6 7 // link field used for threads waiting on channel 8 struct wait_link { 9 // used to put wait_link on a dl queue 10 inline dlink(wait_link); 11 12 // waiting thread 13 struct thread$ * t; 14 15 // shadow field 16 void * elem; 17 }; 18 P9_EMBEDDED( wait_link, dlink(wait_link) ) 19 20 static inline void ?{}( wait_link & this, thread$ * t, void * elem ) { 21 this.t = t; 22 this.elem = elem; 23 } 24 25 // wake one thread from the list 26 static inline void wake_one( dlist( wait_link ) & queue ) { 27 wait_link & popped = try_pop_front( queue ); 28 unpark( popped.t ); 29 } 6 #include "select.hfa" 30 7 31 8 // returns true if woken due to shutdown 32 9 // blocks thread on list and releases passed lock 33 static inline bool block( dlist( wait_link) & queue, void * elem_ptr, go_mutex & lock ) {34 wait_link w{ active_thread(), elem_ptr };35 insert_last( queue, w);10 static inline bool block( dlist( select_node ) & queue, void * elem_ptr, go_mutex & lock ) { 11 select_node sn{ active_thread(), elem_ptr }; 12 insert_last( queue, sn ); 36 13 unlock( lock ); 37 14 park(); 38 return w.elem == 0p; 15 return sn.extra == 0p; 16 } 17 18 // Waituntil support (un)register_select helper routine 19 // Sets select node avail if not special OR case and then unlocks 20 static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) { 21 if ( node.park_counter ) __make_select_node_available( node ); 22 unlock( mutex_lock ); 39 23 } 40 24 … … 59 43 size_t size, front, back, count; 60 44 T * buffer; 61 dlist( wait_link) prods, cons; // lists of blocked threads45 dlist( select_node ) prods, cons; // lists of blocked threads 62 46 go_mutex mutex_lock; // MX lock 63 47 bool closed; // indicates channel close/open … … 70 54 size = _size; 71 55 front = back = count = 0; 72 buffer = aalloc( size );56 if ( size != 0 ) buffer = aalloc( size ); 73 57 prods{}; 74 58 cons{}; … … 87 71 #endif 88 72 verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" ); 89 delete( buffer );73 if ( size != 0 ) delete( buffer ); 90 74 } 91 75 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; } … … 102 86 // flush waiting consumers and producers 103 87 while ( has_waiting_consumers( chan ) ) { 104 cons`first.elem = 0p; 88 if( !__handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race 89 break; // if __handle_waituntil_OR returns false cons is empty so break 90 cons`first.extra = 0p; 105 91 wake_one( cons ); 106 92 } 107 93 while ( has_waiting_producers( chan ) ) { 108 prods`first.elem = 0p; 94 if( !__handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race 95 break; // if __handle_waituntil_OR returns false prods is empty so break 96 prods`first.extra = 0p; 109 97 wake_one( prods ); 110 98 } … … 114 102 static inline void is_closed( channel(T) & chan ) with(chan) { return closed; } 115 103 104 // used to hand an element to a blocked consumer and signal it 105 static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) { 106 memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work 107 wake_one( cons ); 108 } 109 110 // used to hand an element to a blocked producer and signal it 111 static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) { 112 memcpy( (void *)&retval, prods`first.extra, sizeof(T) ); 113 wake_one( prods 
); 114 } 115 116 116 static inline void flush( channel(T) & chan, T elem ) with(chan) { 117 117 lock( mutex_lock ); 118 118 while ( count == 0 && !cons`isEmpty ) { 119 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work 120 wake_one( cons ); 119 __cons_handoff( chan, elem ); 121 120 } 122 121 unlock( mutex_lock ); … … 125 124 // handles buffer insert 126 125 static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) { 127 memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T));126 memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) ); 128 127 count += 1; 129 128 back++; … … 131 130 } 132 131 133 // does the buffer insert or hands elem directly to consumer if one is waiting134 static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) {135 if ( count == 0 && !cons`isEmpty ) {136 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work137 wake_one( cons );138 } else __buf_insert( chan, elem );139 }140 141 132 // needed to avoid an extra copy in closed case 142 133 static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) { … … 145 136 operations++; 146 137 #endif 138 139 ConsEmpty: if ( !cons`isEmpty ) { 140 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty; 141 __cons_handoff( chan, elem ); 142 unlock( mutex_lock ); 143 return true; 144 } 145 147 146 if ( count == size ) { unlock( mutex_lock ); return false; } 148 __do_insert( chan, elem ); 147 148 __buf_insert( chan, elem ); 149 149 unlock( mutex_lock ); 150 150 return true; … … 157 157 // handles closed case of insert routine 158 158 static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) { 159 channel_closed except{ &channel_closed_vt, &elem, &chan };159 channel_closed except{ &channel_closed_vt, &elem, &chan }; 160 160 throwResume except; // throw closed resumption 161 161 if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination … … 182 182 } 183 183 184 // have to check for the zero size channel case185 if ( size == 0 &&!cons`isEmpty ) {186 memcpy(cons`first.elem, (void *)&elem, sizeof(T));187 wake_one( cons);188 unlock( mutex_lock ); 189 return true;184 // buffer count must be zero if cons are blocked (also handles zero-size case) 185 ConsEmpty: if ( !cons`isEmpty ) { 186 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty; 187 __cons_handoff( chan, elem ); 188 unlock( mutex_lock ); 189 return; 190 190 } 191 191 … … 202 202 } // if 203 203 204 if ( count == 0 && !cons`isEmpty ) { 205 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work 206 wake_one( cons ); 207 } else __buf_insert( chan, elem ); 208 209 unlock( mutex_lock ); 210 return; 211 } 212 213 // handles buffer remove 214 static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) { 215 memcpy((void *)&retval, (void *)&buffer[front], sizeof(T)); 204 __buf_insert( chan, elem ); 205 unlock( mutex_lock ); 206 } 207 208 // does the buffer remove and potentially does waiting producer work 209 static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) { 210 memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) ); 216 211 count -= 1; 217 212 front = (front + 1) % size; 218 }219 220 // does the buffer remove and potentially does waiting producer work221 static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {222 __buf_remove( chan, retval );223 213 if (count == size - 1 && !prods`isEmpty ) { 224 __buf_insert( 
chan, *(T *)prods`first.elem ); // do waiting producer work 214 if ( !__handle_waituntil_OR( prods ) ) return; 215 __buf_insert( chan, *(T *)prods`first.extra ); // do waiting producer work 225 216 wake_one( prods ); 226 217 } … … 233 224 operations++; 234 225 #endif 226 227 ZeroSize: if ( size == 0 && !prods`isEmpty ) { 228 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize; 229 __prods_handoff( chan, retval ); 230 unlock( mutex_lock ); 231 return true; 232 } 233 235 234 if ( count == 0 ) { unlock( mutex_lock ); return false; } 235 236 236 __do_remove( chan, retval ); 237 237 unlock( mutex_lock ); … … 244 244 static inline [T, bool] try_remove( channel(T) & chan ) { 245 245 T retval; 246 return [ retval, __internal_try_remove( chan, retval ) ]; 247 } 248 249 static inline T try_remove( channel(T) & chan, T elem ) { 246 bool success = __internal_try_remove( chan, retval ); 247 return [ retval, success ]; 248 } 249 250 static inline T try_remove( channel(T) & chan ) { 250 251 T retval; 251 252 __internal_try_remove( chan, retval ); … … 255 256 // handles closed case of insert routine 256 257 static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) { 257 channel_closed except{ &channel_closed_vt, 0p, &chan };258 channel_closed except{ &channel_closed_vt, 0p, &chan }; 258 259 throwResume except; // throw resumption 259 260 if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination … … 279 280 280 281 // have to check for the zero size channel case 281 if ( size == 0 && !prods`isEmpty ) {282 memcpy((void *)&retval, (void *)prods`first.elem, sizeof(T));283 wake_one( prods);282 ZeroSize: if ( size == 0 && !prods`isEmpty ) { 283 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize; 284 __prods_handoff( chan, retval ); 284 285 unlock( mutex_lock ); 285 286 return retval; … … 287 288 288 289 // wait if buffer is empty, work will be completed by someone else 289 if ( count == 0) {290 if ( count == 0 ) { 290 291 #ifdef CHAN_STATS 291 292 blocks++; … … 299 300 // Remove from buffer 300 301 __do_remove( chan, retval ); 301 302 302 unlock( mutex_lock ); 303 303 return retval; 304 304 } 305 306 /////////////////////////////////////////////////////////////////////////////////////////// 307 // The following is support for waituntil (select) statements 308 /////////////////////////////////////////////////////////////////////////////////////////// 309 static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) { 310 if ( !node`isListed && !node.park_counter ) return false; // handle special OR case 311 lock( mutex_lock ); 312 if ( node`isListed ) { // op wasn't performed 313 #ifdef CHAN_STATS 314 operations--; 315 #endif 316 remove( node ); 317 unlock( mutex_lock ); 318 return false; 319 } 320 unlock( mutex_lock ); 321 322 // only return true when not special OR case, not exceptional calse and status is SAT 323 return ( node.extra == 0p || !node.park_counter ) ? 
false : *node.clause_status == __SELECT_SAT; 324 } 325 326 // type used by select statement to capture a chan read as the selected operation 327 struct chan_read { 328 channel(T) & chan; 329 T & ret; 330 }; 331 332 static inline void ?{}( chan_read(T) & cr, channel(T) & chan, T & ret ) { 333 &cr.chan = &chan; 334 &cr.ret = &ret; 335 } 336 static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ chan, ret }; return cr; } 337 338 static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(this.chan, this) { 339 __closed_remove( chan, ret ); 340 // if we get here then the insert succeeded 341 __make_select_node_available( node ); 342 } 343 344 static inline bool register_select( chan_read(T) & this, select_node & node ) with(this.chan, this) { 345 // mutex(sout) sout | "register_read"; 346 lock( mutex_lock ); 347 node.extra = &ret; // set .extra so that if it == 0p later in on_selected it is due to channel close 348 349 #ifdef CHAN_STATS 350 if ( !closed ) operations++; 351 #endif 352 353 // check if we can complete operation. If so race to establish winner in special OR case 354 if ( !node.park_counter && ( count != 0 || !prods`isEmpty || unlikely(closed) ) ) { 355 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering 356 unlock( mutex_lock ); 357 return false; 358 } 359 } 360 361 if ( unlikely(closed) ) { 362 unlock( mutex_lock ); 363 __handle_select_closed_read( this, node ); 364 return true; 365 } 366 367 // have to check for the zero size channel case 368 ZeroSize: if ( size == 0 && !prods`isEmpty ) { 369 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize; 370 __prods_handoff( chan, ret ); 371 __set_avail_then_unlock( node, mutex_lock ); 372 return true; 373 } 374 375 // wait if buffer is empty, work will be completed by someone else 376 if ( count == 0 ) { 377 #ifdef CHAN_STATS 378 blocks++; 379 #endif 380 381 insert_last( cons, node ); 382 unlock( mutex_lock ); 383 return false; 384 } 385 386 // Remove from buffer 387 __do_remove( chan, ret ); 388 __set_avail_then_unlock( node, mutex_lock ); 389 return true; 390 } 391 static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); } 392 static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) { 393 if ( node.extra == 0p ) // check if woken up due to closed channel 394 __closed_remove( chan, ret ); 395 // This is only reachable if not closed or closed exception was handled 396 return true; 397 } 398 399 // type used by select statement to capture a chan write as the selected operation 400 struct chan_write { 401 channel(T) & chan; 402 T elem; 403 }; 404 405 static inline void ?{}( chan_write(T) & cw, channel(T) & chan, T elem ) { 406 &cw.chan = &chan; 407 memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) ); 408 } 409 static inline chan_write(T) ?>>?( T elem, channel(T) & chan ) { chan_write(T) cw{ chan, elem }; return cw; } 410 411 static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(this.chan, this) { 412 __closed_insert( chan, elem ); 413 // if we get here then the insert succeeded 414 __make_select_node_available( node ); 415 } 416 417 static inline bool register_select( chan_write(T) & this, select_node & node ) with(this.chan, this) { 418 // mutex(sout) sout | "register_write"; 419 lock( mutex_lock ); 420 node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to 
channel close 421 422 #ifdef CHAN_STATS 423 if ( !closed ) operations++; 424 #endif 425 426 // check if we can complete operation. If so race to establish winner in special OR case 427 if ( !node.park_counter && ( count != size || !cons`isEmpty || unlikely(closed) ) ) { 428 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering 429 unlock( mutex_lock ); 430 return false; 431 } 432 } 433 434 // if closed handle 435 if ( unlikely(closed) ) { 436 unlock( mutex_lock ); 437 __handle_select_closed_write( this, node ); 438 return true; 439 } 440 441 // handle blocked consumer case via handoff (buffer is implicitly empty) 442 ConsEmpty: if ( !cons`isEmpty ) { 443 if ( !__handle_waituntil_OR( cons ) ) { 444 // mutex(sout) sout | "empty"; 445 break ConsEmpty; 446 } 447 // mutex(sout) sout | "signal"; 448 __cons_handoff( chan, elem ); 449 __set_avail_then_unlock( node, mutex_lock ); 450 return true; 451 } 452 453 // insert node in list if buffer is full, work will be completed by someone else 454 if ( count == size ) { 455 #ifdef CHAN_STATS 456 blocks++; 457 #endif 458 459 insert_last( prods, node ); 460 unlock( mutex_lock ); 461 return false; 462 } // if 463 464 // otherwise carry out write either via normal insert 465 __buf_insert( chan, elem ); 466 __set_avail_then_unlock( node, mutex_lock ); 467 return true; 468 } 469 static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); } 470 471 static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) { 472 if ( node.extra == 0p ) // check if woken up due to closed channel 473 __closed_insert( chan, elem ); 474 475 // This is only reachable if not closed or closed exception was handled 476 return true; 477 } 478 479 305 480 } // forall( T ) 481 482 483 -
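The heart of the waituntil support added to channel.hfa is the race in __make_select_node_available: a select node starts with *clause_status == __SELECT_UNSAT, and the first signaller to CAS the status forward owns the wakeup, which is how the special OR case guarantees exactly one clause wins. Below is a minimal sketch of that claim protocol in plain C with GCC atomics and pthreads, simplified to a single status word; select_node, try_claim, and racer are illustrative names, not the CFA API.

#include <pthread.h>
#include <stdio.h>

#define SELECT_UNSAT 0UL
#define SELECT_SAT   1UL

struct select_node { unsigned long clause_status; };   // shared status word of one waituntil clause

static int try_claim( struct select_node * n ) {
    unsigned long expected = SELECT_UNSAT;
    // first signaller to move UNSAT -> SAT wins the right to perform the handoff
    return __atomic_compare_exchange_n( &n->clause_status, &expected, SELECT_SAT,
                                        0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST );
}

static struct select_node node;
static int wins;

static void * racer( void * arg ) {
    (void)arg;
    if ( try_claim( &node ) ) __atomic_fetch_add( &wins, 1, __ATOMIC_SEQ_CST );
    return NULL;
}

int main(void) {
    pthread_t a, b;
    pthread_create( &a, NULL, racer, NULL );
    pthread_create( &b, NULL, racer, NULL );
    pthread_join( a, NULL );
    pthread_join( b, NULL );
    printf( "winners: %d\n", wins );   // always 1: exactly one signaller claims the node
    return 0;
}

This is why __handle_waituntil_OR runs the race before any handoff in the channel code above: a node whose race was already lost is simply popped and discarded, since performing the operation for it would lose the element.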
libcfa/src/concurrency/future.hfa
rbb7422a rbeeff61e 19 19 #include "monitor.hfa" 20 20 #include "select.hfa" 21 #include "locks.hfa" 21 22 22 23 //---------------------------------------------------------------------------- … … 26 27 // future_t is lockfree and uses atomics which aren't needed given we use locks here 27 28 forall( T ) { 28 // enum (int){ FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 }; // Enums seem to be broken so feel free to add this back afterwards29 // enum { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 }; // Enums seem to be broken so feel free to add this back afterwards 29 30 30 31 // temporary enum replacement … … 44 45 }; 45 46 46 // C_TODO: perhaps allow exceptions to be inserted like uC++?47 48 47 static inline { 49 48 … … 82 81 void _internal_flush( future(T) & this ) with(this) { 83 82 while( ! waiters`isEmpty ) { 83 if ( !__handle_waituntil_OR( waiters ) ) // handle special waituntil OR case 84 break; // if handle_OR returns false then waiters is empty so break 84 85 select_node &s = try_pop_front( waiters ); 85 86 86 if ( s. race_flag== 0p )87 if ( s.clause_status == 0p ) 87 88 // poke in result so that woken threads do not need to reacquire any locks 88 // *(((future_node(T) &)s).my_result) = result;89 89 copy_T( result, *(((future_node(T) &)s).my_result) ); 90 else if ( ! install_select_winner( s, &this ) ) continue;90 else if ( !__make_select_node_available( s ) ) continue; 91 91 92 92 // only unpark if future is not selected … … 97 97 98 98 // Fulfil the future, returns whether or not someone was unblocked 99 bool fulfil( future(T) & this, T &val ) with(this) {99 bool fulfil( future(T) & this, T val ) with(this) { 100 100 lock( lock ); 101 101 if( state != FUTURE_EMPTY ) … … 153 153 } 154 154 155 void * register_select( future(T) & this, select_node & s ) with(this) { 156 lock( lock ); 157 158 // future not ready -> insert select node and return 0p 155 bool register_select( future(T) & this, select_node & s ) with(this) { 156 lock( lock ); 157 158 // check if we can complete operation. If so race to establish winner in special OR case 159 if ( !s.park_counter && state != FUTURE_EMPTY ) { 160 if ( !__make_select_node_available( s ) ) { // we didn't win the race so give up on registering 161 unlock( lock ); 162 return false; 163 } 164 } 165 166 // future not ready -> insert select node and return 159 167 if( state == FUTURE_EMPTY ) { 160 168 insert_last( waiters, s ); 161 169 unlock( lock ); 162 return 0p; 163 } 164 165 // future ready and we won race to install it as the select winner return 1p 166 if ( install_select_winner( s, &this ) ) { 167 unlock( lock ); 168 return 1p; 169 } 170 171 unlock( lock ); 172 // future ready and we lost race to install it as the select winner 173 return 2p; 174 } 175 176 void unregister_select( future(T) & this, select_node & s ) with(this) { 170 return false; 171 } 172 173 __make_select_node_available( s ); 174 unlock( lock ); 175 return true; 176 } 177 178 bool unregister_select( future(T) & this, select_node & s ) with(this) { 179 if ( ! 
s`isListed ) return false; 177 180 lock( lock ); 178 181 if ( s`isListed ) remove( s ); 179 182 unlock( lock ); 183 return false; 180 184 } 181 185 186 bool on_selected( future(T) & this, select_node & node ) { return true; } 182 187 } 183 188 } … … 186 191 // These futures below do not support select statements so they may not be as useful as 'future' 187 192 // however the 'single_future' is cheap and cheerful and is most likely more performant than 'future' 188 // since it uses raw atomics and no locks afaik193 // since it uses raw atomics and no locks 189 194 // 190 195 // As far as 'multi_future' goes I can't see many use cases as it will be less performant than 'future' -
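For readers following the future.hfa changes: _internal_flush copies the result directly into each blocked waiter's slot (copy_T( result, *my_result )) before unparking, so woken threads return without reacquiring any locks. A rough plain-C analogue of the basic fulfil/wait handshake, using a pthread condition variable in place of CFA's park/unpark; future_int, fulfil, and get are hypothetical names, and this omits the select-node handling shown above.

#include <pthread.h>
#include <stdio.h>

struct future_int {
    pthread_mutex_t lock;
    pthread_cond_t  cv;
    int fulfilled;   // FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1
    int result;
};

static void fulfil( struct future_int * f, int val ) {
    pthread_mutex_lock( &f->lock );
    f->result = val;                    // result is poked in under the lock
    f->fulfilled = 1;
    pthread_cond_broadcast( &f->cv );   // flush all waiters
    pthread_mutex_unlock( &f->lock );
}

static int get( struct future_int * f ) {
    pthread_mutex_lock( &f->lock );
    while ( !f->fulfilled ) pthread_cond_wait( &f->cv, &f->lock );
    int r = f->result;
    pthread_mutex_unlock( &f->lock );
    return r;
}

int main(void) {
    struct future_int f = { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, 0 };
    fulfil( &f, 42 );
    printf( "%d\n", get( &f ) );
    return 0;
}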
libcfa/src/concurrency/invoke.h
rbb7422a rbeeff61e 217 217 struct __thread_user_link cltr_link; 218 218 219 // used to point to this thd's current clh node220 volatile bool * clh_node;221 222 219 struct processor * last_proc; 220 221 // ptr used during handover between blocking lists to allow for stack allocation of intrusive nodes 222 // main use case is wait-morphing to allow a different node to be used to block on condvar vs lock 223 void * link_node; 223 224 224 225 PRNG_STATE_T random_state; // fast random numbers -
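The new link_node field in invoke.h replaces the CLH-specific clh_node with a generic pointer: a thread about to block stack-allocates an intrusive node and publishes it in its own descriptor, so wait-morphing can queue a different node on a condition variable than on the lock it will reacquire. A small C sketch of the idea; wait_node, thread_desc, and enqueue_current are illustrative names.

#include <stdio.h>
#include <stddef.h>

struct wait_node { struct wait_node * next; int tid; };
struct thread_desc { int tid; void * link_node; };   // mirrors the new field above

static _Thread_local struct thread_desc self;

struct queue { struct wait_node * head, * tail; };

static void enqueue_current( struct queue * q ) {
    // the primitive queues the node the thread published, not the thread itself
    struct wait_node * n = self.link_node;
    n->next = NULL; n->tid = self.tid;
    if ( q->tail ) q->tail->next = n; else q->head = n;
    q->tail = n;
}

int main(void) {
    struct queue q = { NULL, NULL };
    struct wait_node n;          // stack allocated: no heap traffic on the block path
    self.tid = 1;
    self.link_node = &n;
    enqueue_current( &q );
    printf( "queued tid %d\n", q.head->tid );
    return 0;
}

This is exactly the pattern used in locks.cfa above, where on_wait stack-allocates a select_node, stores its address in active_thread()->link_node, and only then parks.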
libcfa/src/concurrency/locks.cfa
rbb7422a rbeeff61e 79 79 // lock is held by some other thread 80 80 if ( owner != 0p && owner != thrd ) { 81 insert_last( blocked_threads, *thrd ); 81 select_node node; 82 insert_last( blocked_threads, node ); 82 83 wait_count++; 83 84 unlock( lock ); 84 85 park( ); 85 } 86 // multi acquisition lock is held by current thread 87 else if ( owner == thrd && multi_acquisition ) { 86 return; 87 } else if ( owner == thrd && multi_acquisition ) { // multi acquisition lock is held by current thread 88 88 recursion_count++; 89 unlock( lock ); 90 } 91 // lock isn't held 92 else { 89 } else { // lock isn't held 93 90 owner = thrd; 94 91 recursion_count = 1; 95 unlock( lock );96 } 92 } 93 unlock( lock ); 97 94 } 98 95 … … 117 114 } 118 115 119 static void pop_and_set_new_owner( blocking_lock & this ) with( this ) { 120 thread$ * t = &try_pop_front( blocked_threads ); 121 owner = t; 122 recursion_count = ( t ? 1 : 0 ); 123 if ( t ) wait_count--; 124 unpark( t ); 116 // static void pop_and_set_new_owner( blocking_lock & this ) with( this ) { 117 // thread$ * t = &try_pop_front( blocked_threads ); 118 // owner = t; 119 // recursion_count = ( t ? 1 : 0 ); 120 // if ( t ) wait_count--; 121 // unpark( t ); 122 // } 123 124 static inline void pop_node( blocking_lock & this ) with( this ) { 125 __handle_waituntil_OR( blocked_threads ); 126 select_node * node = &try_pop_front( blocked_threads ); 127 if ( node ) { 128 wait_count--; 129 owner = node->blocked_thread; 130 recursion_count = 1; 131 // if ( !node->clause_status || __make_select_node_available( *node ) ) unpark( node->blocked_thread ); 132 wake_one( blocked_threads, *node ); 133 } else { 134 owner = 0p; 135 recursion_count = 0; 136 } 125 137 } 126 138 … … 134 146 recursion_count--; 135 147 if ( recursion_count == 0 ) { 136 pop_ and_set_new_owner( this );148 pop_node( this ); 137 149 } 138 150 unlock( lock ); … … 147 159 // lock held 148 160 if ( owner != 0p ) { 149 insert_last( blocked_threads, * t);161 insert_last( blocked_threads, *(select_node *)t->link_node ); 150 162 wait_count++; 151 unlock( lock );152 163 } 153 164 // lock not held … … 156 167 recursion_count = 1; 157 168 unpark( t ); 158 unlock( lock );159 } 169 } 170 unlock( lock ); 160 171 } 161 172 … … 167 178 size_t ret = recursion_count; 168 179 169 pop_and_set_new_owner( this ); 180 pop_node( this ); 181 182 select_node node; 183 active_thread()->link_node = (void *)&node; 170 184 unlock( lock ); 185 186 park(); 187 171 188 return ret; 172 189 } … … 175 192 recursion_count = recursion; 176 193 } 194 195 // waituntil() support 196 bool register_select( blocking_lock & this, select_node & node ) with(this) { 197 lock( lock __cfaabi_dbg_ctx2 ); 198 thread$ * thrd = active_thread(); 199 200 // single acquisition lock is held by current thread 201 /* paranoid */ verifyf( owner != thrd || multi_acquisition, "Single acquisition lock holder (%p) attempted to reacquire the lock %p resulting in a deadlock.", owner, &this ); 202 203 if ( !node.park_counter && ( (owner == thrd && multi_acquisition) || owner == 0p ) ) { // OR special case 204 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering 205 unlock( lock ); 206 return false; 207 } 208 } 209 210 // lock is held by some other thread 211 if ( owner != 0p && owner != thrd ) { 212 insert_last( blocked_threads, node ); 213 wait_count++; 214 unlock( lock ); 215 return false; 216 } else if ( owner == thrd && multi_acquisition ) { // multi acquisition lock is held by current thread 217 recursion_count++; 
218 } else { // lock isn't held 219 owner = thrd; 220 recursion_count = 1; 221 } 222 223 if ( node.park_counter ) __make_select_node_available( node ); 224 unlock( lock ); 225 return true; 226 } 227 228 bool unregister_select( blocking_lock & this, select_node & node ) with(this) { 229 lock( lock __cfaabi_dbg_ctx2 ); 230 if ( node`isListed ) { 231 remove( node ); 232 wait_count--; 233 unlock( lock ); 234 return false; 235 } 236 237 if ( owner == active_thread() ) { 238 /* paranoid */ verifyf( recursion_count == 1 || multi_acquisition, "Thread %p attempted to unlock owner lock %p in waituntil unregister, which is not recursive but has a recursive count of %zu", active_thread(), &this, recursion_count ); 239 // if recursion count is zero release lock and set new owner if one is waiting 240 recursion_count--; 241 if ( recursion_count == 0 ) { 242 pop_node( this ); 243 } 244 } 245 unlock( lock ); 246 return false; 247 } 248 249 bool on_selected( blocking_lock & this, select_node & node ) { return true; } 177 250 178 251 //----------------------------------------------------------------------------- … … 311 384 int counter( condition_variable(L) & this ) with(this) { return count; } 312 385 313 static size_t queue_and_get_recursion( condition_variable(L) & this, info_thread(L) * i ) with(this) {386 static void enqueue_thread( condition_variable(L) & this, info_thread(L) * i ) with(this) { 314 387 // add info_thread to waiting queue 315 388 insert_last( blocked_threads, *i ); 316 389 count++; 317 size_t recursion_count = 0; 318 if (i->lock) { 390 // size_t recursion_count = 0; 391 // if (i->lock) { 392 // // if lock was passed get recursion count to reset to after waking thread 393 // recursion_count = on_wait( *i->lock ); 394 // } 395 // return recursion_count; 396 } 397 398 static size_t block_and_get_recursion( info_thread(L) & i ) { 399 size_t recursion_count = 0; 400 if ( i.lock ) { 319 401 // if lock was passed get recursion count to reset to after waking thread 320 recursion_count = on_wait( *i ->lock );321 } 322 323 402 recursion_count = on_wait( *i.lock ); // this call blocks 403 } else park( ); 404 return recursion_count; 405 } 324 406 325 407 // helper for wait()'s' with no timeout 326 408 static void queue_info_thread( condition_variable(L) & this, info_thread(L) & i ) with(this) { 327 409 lock( lock __cfaabi_dbg_ctx2 ); 328 size_t recursion_count = queue_and_get_recursion(this, &i); 410 enqueue_thread( this, &i ); 411 // size_t recursion_count = queue_and_get_recursion( this, &i ); 329 412 unlock( lock ); 330 413 331 414 // blocks here 332 park( ); 415 size_t recursion_count = block_and_get_recursion( i ); 416 // park( ); 333 417 334 418 // resets recursion count here after waking 335 if ( i.lock) on_wakeup(*i.lock, recursion_count);419 if ( i.lock ) on_wakeup( *i.lock, recursion_count ); 336 420 } 337 421 … … 343 427 static void queue_info_thread_timeout( condition_variable(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) { 344 428 lock( lock __cfaabi_dbg_ctx2 ); 345 size_t recursion_count = queue_and_get_recursion(this, &info); 429 enqueue_thread( this, &info ); 430 // size_t recursion_count = queue_and_get_recursion( this, &info ); 346 431 alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info }; 347 432 unlock( lock ); … … 351 436 352 437 // blocks here 353 park(); 438 size_t recursion_count = block_and_get_recursion( info ); 439 // park(); 354 440 355 441 // unregisters alarm so it doesn't go off if this happens first … … 357 443 358 
444 // resets recursion count here after waking 359 if ( info.lock) on_wakeup(*info.lock, recursion_count);445 if ( info.lock ) on_wakeup( *info.lock, recursion_count ); 360 446 } 361 447 … … 417 503 info_thread( L ) i = { active_thread(), info, &l }; 418 504 insert_last( blocked_threads, i ); 419 size_t recursion_count = on_wait( *i.lock ); 420 park( );505 size_t recursion_count = on_wait( *i.lock ); // blocks here 506 // park( ); 421 507 on_wakeup(*i.lock, recursion_count); 422 508 } … … 459 545 bool empty ( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty; } 460 546 461 static size_t queue_and_get_recursion( pthread_cond_var(L) & this, info_thread(L) * i ) with(this) { 462 // add info_thread to waiting queue 463 insert_last( blocked_threads, *i ); 464 size_t recursion_count = 0; 465 recursion_count = on_wait( *i->lock ); 466 return recursion_count; 467 } 547 // static size_t queue_and_get_recursion( pthread_cond_var(L) & this, info_thread(L) * i ) with(this) { 548 // // add info_thread to waiting queue 549 // insert_last( blocked_threads, *i ); 550 // size_t recursion_count = 0; 551 // recursion_count = on_wait( *i->lock ); 552 // return recursion_count; 553 // } 554 468 555 469 556 static void queue_info_thread_timeout( pthread_cond_var(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) { 470 557 lock( lock __cfaabi_dbg_ctx2 ); 471 size_t recursion_count = queue_and_get_recursion(this, &info); 558 // size_t recursion_count = queue_and_get_recursion(this, &info); 559 insert_last( blocked_threads, info ); 472 560 pthread_alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info }; 473 561 unlock( lock ); … … 477 565 478 566 // blocks here 479 park(); 567 size_t recursion_count = block_and_get_recursion( info ); 568 // park(); 480 569 481 570 // unregisters alarm so it doesn't go off if this happens first … … 483 572 484 573 // resets recursion count here after waking 485 if ( info.lock) on_wakeup(*info.lock, recursion_count);574 if ( info.lock ) on_wakeup( *info.lock, recursion_count ); 486 575 } 487 576 … … 493 582 lock( lock __cfaabi_dbg_ctx2 ); 494 583 info_thread( L ) i = { active_thread(), info, &l }; 495 size_t recursion_count = queue_and_get_recursion(this, &i); 496 unlock( lock ); 497 park( ); 498 on_wakeup(*i.lock, recursion_count); 584 insert_last( blocked_threads, i ); 585 // size_t recursion_count = queue_and_get_recursion( this, &i ); 586 unlock( lock ); 587 588 // blocks here 589 size_t recursion_count = block_and_get_recursion( i ); 590 // park(); 591 on_wakeup( *i.lock, recursion_count ); 499 592 } 500 593 -
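The pop_node routine that replaces pop_and_set_new_owner in locks.cfa hands the lock directly to the popped waiter: owner and recursion_count are set on the waiter's behalf before it is unparked, so the woken thread never re-races for the lock, and __handle_waituntil_OR first discards any select nodes that already lost their OR race. A single-threaded C illustration of the hand-off bookkeeping; blocking_lock and waiter here are toy structs, not the CFA types.

#include <stdio.h>
#include <stddef.h>

struct waiter { struct waiter * next; int tid; };

struct blocking_lock {       // toy model: owner tracked by id, 0 means unowned
    int owner_tid;
    int recursion_count;
    struct waiter * head;
};

static void pop_node( struct blocking_lock * l ) {
    struct waiter * w = l->head;
    if ( w ) {
        l->head = w->next;
        l->owner_tid = w->tid;    // ownership transfers before the waiter runs...
        l->recursion_count = 1;   // ...so it never competes for the lock on wakeup
        // a real implementation would unpark(w) here, as wake_one does above
    } else {
        l->owner_tid = 0;
        l->recursion_count = 0;
    }
}

int main(void) {
    struct waiter w = { NULL, 7 };
    struct blocking_lock l = { 1, 1, &w };             // held by tid 1, tid 7 queued
    if ( --l.recursion_count == 0 ) pop_node( &l );    // unlock by tid 1
    printf( "new owner: %d\n", l.owner_tid );          // prints 7
    return 0;
}

The companion change, moving park() inside on_wait via block_and_get_recursion, keeps the stack-allocated node alive on the blocked thread's stack for the full duration of the wait.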
libcfa/src/concurrency/locks.hfa
rbb7422a rbeeff61e 30 30 #include "time.hfa" 31 31 32 #include "select.hfa" 33 32 34 #include <fstream.hfa> 33 35 … … 70 72 static inline void on_wakeup( single_acquisition_lock & this, size_t v ) { on_wakeup ( (blocking_lock &)this, v ); } 71 73 static inline void on_notify( single_acquisition_lock & this, struct thread$ * t ) { on_notify( (blocking_lock &)this, t ); } 74 static inline bool register_select( single_acquisition_lock & this, select_node & node ) { return register_select( (blocking_lock &)this, node ); } 75 static inline bool unregister_select( single_acquisition_lock & this, select_node & node ) { return unregister_select( (blocking_lock &)this, node ); } 76 static inline bool on_selected( single_acquisition_lock & this, select_node & node ) { return on_selected( (blocking_lock &)this, node ); } 72 77 73 78 //---------- … … 84 89 static inline void on_wakeup( owner_lock & this, size_t v ) { on_wakeup ( (blocking_lock &)this, v ); } 85 90 static inline void on_notify( owner_lock & this, struct thread$ * t ) { on_notify( (blocking_lock &)this, t ); } 91 static inline bool register_select( owner_lock & this, select_node & node ) { return register_select( (blocking_lock &)this, node ); } 92 static inline bool unregister_select( owner_lock & this, select_node & node ) { return unregister_select( (blocking_lock &)this, node ); } 93 static inline bool on_selected( owner_lock & this, select_node & node ) { return on_selected( (blocking_lock &)this, node ); } 86 94 87 95 //----------------------------------------------------------------------------- … … 180 188 181 189 // if this is called recursively IT WILL DEADLOCK!!!!! 182 static inline void lock( futex_mutex & this) with(this) {190 static inline void lock( futex_mutex & this ) with(this) { 183 191 int state; 184 192 … … 190 198 for (int i = 0; i < spin; i++) Pause(); 191 199 } 192 193 // // no contention try to acquire194 // if (internal_try_lock(this, state)) return;195 200 196 201 // if not in contended state, set to be in contended state … … 213 218 214 219 static inline void on_notify( futex_mutex & f, thread$ * t){ unpark(t); } 215 static inline size_t on_wait( futex_mutex & f ) { unlock(f); return 0;}220 static inline size_t on_wait( futex_mutex & f ) { unlock(f); park(); return 0; } 216 221 217 222 // to set recursion count after getting signalled; … … 244 249 245 250 // if this is called recursively IT WILL DEADLOCK!!!!! 
246 static inline void lock( go_mutex & this) with(this) {251 static inline void lock( go_mutex & this ) with( this ) { 247 252 int state, init_state; 248 253 … … 255 260 while( !val ) { // lock unlocked 256 261 state = 0; 257 if ( internal_try_lock(this, state, init_state)) return;262 if ( internal_try_lock( this, state, init_state ) ) return; 258 263 } 259 264 for (int i = 0; i < 30; i++) Pause(); … … 262 267 while( !val ) { // lock unlocked 263 268 state = 0; 264 if ( internal_try_lock(this, state, init_state)) return;269 if ( internal_try_lock( this, state, init_state ) ) return; 265 270 } 266 271 sched_yield(); 267 272 268 273 // if not in contended state, set to be in contended state 269 state = internal_exchange( this, 2);274 state = internal_exchange( this, 2 ); 270 275 if ( !state ) return; // state == 0 271 276 init_state = 2; 272 futex( (int*)&val, FUTEX_WAIT, 2); // if val is not 2 this returns with EWOULDBLOCK277 futex( (int*)&val, FUTEX_WAIT, 2 ); // if val is not 2 this returns with EWOULDBLOCK 273 278 } 274 279 } … … 276 281 static inline void unlock( go_mutex & this ) with(this) { 277 282 // if uncontended do atomic unlock and then return 278 if ( __atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1) return;283 if ( __atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1 ) return; 279 284 280 285 // otherwise threads are blocked so we must wake one 281 futex( (int *)&val, FUTEX_WAKE, 1);282 } 283 284 static inline void on_notify( go_mutex & f, thread$ * t){ unpark( t); }285 static inline size_t on_wait( go_mutex & f ) { unlock(f); return 0;}286 futex( (int *)&val, FUTEX_WAKE, 1 ); 287 } 288 289 static inline void on_notify( go_mutex & f, thread$ * t){ unpark( t ); } 290 static inline size_t on_wait( go_mutex & f ) { unlock( f ); park(); return 0; } 286 291 static inline void on_wakeup( go_mutex & f, size_t recursion ) {} 287 288 //-----------------------------------------------------------------------------289 // CLH Spinlock290 // - No recursive acquisition291 // - Needs to be released by owner292 293 struct clh_lock {294 volatile bool * volatile tail;295 volatile bool * volatile head;296 };297 298 static inline void ?{}( clh_lock & this ) { this.tail = malloc(); *this.tail = true; }299 static inline void ^?{}( clh_lock & this ) { free(this.tail); }300 301 static inline void lock(clh_lock & l) {302 thread$ * curr_thd = active_thread();303 *(curr_thd->clh_node) = false;304 volatile bool * prev = __atomic_exchange_n((bool **)(&l.tail), (bool *)(curr_thd->clh_node), __ATOMIC_SEQ_CST);305 while(!__atomic_load_n(prev, __ATOMIC_SEQ_CST)) Pause();306 __atomic_store_n((bool **)(&l.head), (bool *)curr_thd->clh_node, __ATOMIC_SEQ_CST);307 curr_thd->clh_node = prev;308 }309 310 static inline void unlock(clh_lock & l) {311 __atomic_store_n((bool *)(l.head), true, __ATOMIC_SEQ_CST);312 }313 314 static inline void on_notify(clh_lock & this, struct thread$ * t ) { unpark(t); }315 static inline size_t on_wait(clh_lock & this) { unlock(this); return 0; }316 static inline void on_wakeup(clh_lock & this, size_t recursion ) { lock(this); }317 292 318 293 //----------------------------------------------------------------------------- … … 337 312 static inline void ^?{}( exp_backoff_then_block_lock & this ){} 338 313 339 static inline bool internal_try_lock( exp_backoff_then_block_lock & this, size_t & compare_val) with(this) {314 static inline bool internal_try_lock( exp_backoff_then_block_lock & this, size_t & compare_val ) with(this) { 340 315 return __atomic_compare_exchange_n(&lock_value, 
&compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED); 341 316 } 342 317 343 static inline bool try_lock( exp_backoff_then_block_lock & this) { size_t compare_val = 0; return internal_try_lock(this, compare_val); }344 345 static inline bool try_lock_contention( exp_backoff_then_block_lock & this) with(this) {346 return !__atomic_exchange_n( &lock_value, 2, __ATOMIC_ACQUIRE);347 } 348 349 static inline bool block( exp_backoff_then_block_lock & this) with(this) {318 static inline bool try_lock( exp_backoff_then_block_lock & this ) { size_t compare_val = 0; return internal_try_lock( this, compare_val ); } 319 320 static inline bool try_lock_contention( exp_backoff_then_block_lock & this ) with(this) { 321 return !__atomic_exchange_n( &lock_value, 2, __ATOMIC_ACQUIRE ); 322 } 323 324 static inline bool block( exp_backoff_then_block_lock & this ) with(this) { 350 325 lock( spinlock __cfaabi_dbg_ctx2 ); 351 326 if (__atomic_load_n( &lock_value, __ATOMIC_SEQ_CST) != 2) { … … 359 334 } 360 335 361 static inline void lock( exp_backoff_then_block_lock & this) with(this) {336 static inline void lock( exp_backoff_then_block_lock & this ) with(this) { 362 337 size_t compare_val = 0; 363 338 int spin = 4; … … 378 353 } 379 354 380 static inline void unlock( exp_backoff_then_block_lock & this) with(this) {355 static inline void unlock( exp_backoff_then_block_lock & this ) with(this) { 381 356 if (__atomic_exchange_n(&lock_value, 0, __ATOMIC_RELEASE) == 1) return; 382 357 lock( spinlock __cfaabi_dbg_ctx2 ); … … 386 361 } 387 362 388 static inline void on_notify( exp_backoff_then_block_lock & this, struct thread$ * t ) { unpark(t); }389 static inline size_t on_wait( exp_backoff_then_block_lock & this) { unlock(this); return 0; }390 static inline void on_wakeup( exp_backoff_then_block_lock & this, size_t recursion ) { lock(this); }363 static inline void on_notify( exp_backoff_then_block_lock & this, struct thread$ * t ) { unpark( t ); } 364 static inline size_t on_wait( exp_backoff_then_block_lock & this ) { unlock( this ); park(); return 0; } 365 static inline void on_wakeup( exp_backoff_then_block_lock & this, size_t recursion ) { lock( this ); } 391 366 392 367 //----------------------------------------------------------------------------- … … 418 393 419 394 // if this is called recursively IT WILL DEADLOCK!!!!! 
420 static inline void lock( fast_block_lock & this) with(this) {395 static inline void lock( fast_block_lock & this ) with(this) { 421 396 lock( lock __cfaabi_dbg_ctx2 ); 422 397 if ( held ) { … … 430 405 } 431 406 432 static inline void unlock( fast_block_lock & this) with(this) {407 static inline void unlock( fast_block_lock & this ) with(this) { 433 408 lock( lock __cfaabi_dbg_ctx2 ); 434 409 /* paranoid */ verifyf( held != false, "Attempt to release lock %p that isn't held", &this ); … … 439 414 } 440 415 441 static inline void on_notify( fast_block_lock & this, struct thread$ * t ) with(this) {416 static inline void on_notify( fast_block_lock & this, struct thread$ * t ) with(this) { 442 417 lock( lock __cfaabi_dbg_ctx2 ); 443 418 insert_last( blocked_threads, *t ); 444 419 unlock( lock ); 445 420 } 446 static inline size_t on_wait( fast_block_lock & this) { unlock(this); return 0; }447 static inline void on_wakeup( fast_block_lock & this, size_t recursion ) { }421 static inline size_t on_wait( fast_block_lock & this) { unlock(this); park(); return 0; } 422 static inline void on_wakeup( fast_block_lock & this, size_t recursion ) { } 448 423 449 424 //----------------------------------------------------------------------------- … … 456 431 struct simple_owner_lock { 457 432 // List of blocked threads 458 dlist( thread$) blocked_threads;433 dlist( select_node ) blocked_threads; 459 434 460 435 // Spin lock used for mutual exclusion … … 477 452 static inline void ?=?( simple_owner_lock & this, simple_owner_lock this2 ) = void; 478 453 479 static inline void lock( simple_owner_lock & this) with(this) {480 if ( owner == active_thread()) {454 static inline void lock( simple_owner_lock & this ) with(this) { 455 if ( owner == active_thread() ) { 481 456 recursion_count++; 482 457 return; … … 484 459 lock( lock __cfaabi_dbg_ctx2 ); 485 460 486 if (owner != 0p) { 487 insert_last( blocked_threads, *active_thread() ); 461 if ( owner != 0p ) { 462 select_node node; 463 insert_last( blocked_threads, node ); 488 464 unlock( lock ); 489 465 park( ); … … 495 471 } 496 472 497 // TODO: fix duplicate def issue and bring this back 498 // void pop_and_set_new_owner( simple_owner_lock & this ) with( this ) { 499 // thread$ * t = &try_pop_front( blocked_threads ); 500 // owner = t; 501 // recursion_count = ( t ? 1 : 0 ); 502 // unpark( t ); 503 // } 504 505 static inline void unlock(simple_owner_lock & this) with(this) { 473 static inline void pop_node( simple_owner_lock & this ) with(this) { 474 __handle_waituntil_OR( blocked_threads ); 475 select_node * node = &try_pop_front( blocked_threads ); 476 if ( node ) { 477 owner = node->blocked_thread; 478 recursion_count = 1; 479 // if ( !node->clause_status || __make_select_node_available( *node ) ) unpark( node->blocked_thread ); 480 wake_one( blocked_threads, *node ); 481 } else { 482 owner = 0p; 483 recursion_count = 0; 484 } 485 } 486 487 static inline void unlock( simple_owner_lock & this ) with(this) { 506 488 lock( lock __cfaabi_dbg_ctx2 ); 507 489 /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this ); … … 510 492 recursion_count--; 511 493 if ( recursion_count == 0 ) { 512 // pop_and_set_new_owner( this ); 513 thread$ * t = &try_pop_front( blocked_threads ); 514 owner = t; 515 recursion_count = ( t ? 
1 : 0 ); 516 unpark( t ); 494 pop_node( this ); 517 495 } 518 496 unlock( lock ); 519 497 } 520 498 521 static inline void on_notify(simple_owner_lock & this, structthread$ * t ) with(this) {499 static inline void on_notify(simple_owner_lock & this, thread$ * t ) with(this) { 522 500 lock( lock __cfaabi_dbg_ctx2 ); 523 501 // lock held 524 502 if ( owner != 0p ) { 525 insert_last( blocked_threads, * t);503 insert_last( blocked_threads, *(select_node *)t->link_node ); 526 504 } 527 505 // lock not held … … 534 512 } 535 513 536 static inline size_t on_wait( simple_owner_lock & this) with(this) {514 static inline size_t on_wait( simple_owner_lock & this ) with(this) { 537 515 lock( lock __cfaabi_dbg_ctx2 ); 538 516 /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this ); … … 541 519 size_t ret = recursion_count; 542 520 543 // pop_and_set_new_owner( this ); 544 545 thread$ * t = &try_pop_front( blocked_threads ); 546 owner = t; 547 recursion_count = ( t ? 1 : 0 ); 548 unpark( t ); 549 521 pop_node( this ); 522 523 select_node node; 524 active_thread()->link_node = (void *)&node; 550 525 unlock( lock ); 526 park(); 527 551 528 return ret; 552 529 } 553 530 554 static inline void on_wakeup(simple_owner_lock & this, size_t recursion ) with(this) { recursion_count = recursion; } 531 static inline void on_wakeup( simple_owner_lock & this, size_t recursion ) with(this) { recursion_count = recursion; } 532 533 // waituntil() support 534 static inline bool register_select( simple_owner_lock & this, select_node & node ) with(this) { 535 lock( lock __cfaabi_dbg_ctx2 ); 536 537 // check if we can complete operation. If so race to establish winner in special OR case 538 if ( !node.park_counter && ( owner == active_thread() || owner == 0p ) ) { 539 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering 540 unlock( lock ); 541 return false; 542 } 543 } 544 545 if ( owner == active_thread() ) { 546 recursion_count++; 547 if ( node.park_counter ) __make_select_node_available( node ); 548 unlock( lock ); 549 return true; 550 } 551 552 if ( owner != 0p ) { 553 insert_last( blocked_threads, node ); 554 unlock( lock ); 555 return false; 556 } 557 558 owner = active_thread(); 559 recursion_count = 1; 560 561 if ( node.park_counter ) __make_select_node_available( node ); 562 unlock( lock ); 563 return true; 564 } 565 566 static inline bool unregister_select( simple_owner_lock & this, select_node & node ) with(this) { 567 lock( lock __cfaabi_dbg_ctx2 ); 568 if ( node`isListed ) { 569 remove( node ); 570 unlock( lock ); 571 return false; 572 } 573 574 if ( owner == active_thread() ) { 575 recursion_count--; 576 if ( recursion_count == 0 ) { 577 pop_node( this ); 578 } 579 } 580 unlock( lock ); 581 return false; 582 } 583 584 static inline bool on_selected( simple_owner_lock & this, select_node & node ) { return true; } 585 555 586 556 587 //----------------------------------------------------------------------------- … … 578 609 579 610 // if this is called recursively IT WILL DEADLOCK! 
580 static inline void lock( spin_queue_lock & this) with(this) {611 static inline void lock( spin_queue_lock & this ) with(this) { 581 612 mcs_spin_node node; 582 613 lock( lock, node ); … … 586 617 } 587 618 588 static inline void unlock( spin_queue_lock & this) with(this) {619 static inline void unlock( spin_queue_lock & this ) with(this) { 589 620 __atomic_store_n(&held, false, __ATOMIC_RELEASE); 590 621 } 591 622 592 static inline void on_notify( spin_queue_lock & this, struct thread$ * t ) {623 static inline void on_notify( spin_queue_lock & this, struct thread$ * t ) { 593 624 unpark(t); 594 625 } 595 static inline size_t on_wait( spin_queue_lock & this) { unlock(this); return 0; }596 static inline void on_wakeup( spin_queue_lock & this, size_t recursion ) { lock(this); }626 static inline size_t on_wait( spin_queue_lock & this ) { unlock( this ); park(); return 0; } 627 static inline void on_wakeup( spin_queue_lock & this, size_t recursion ) { lock( this ); } 597 628 598 629 … … 621 652 622 653 // if this is called recursively IT WILL DEADLOCK!!!!! 623 static inline void lock( mcs_block_spin_lock & this) with(this) {654 static inline void lock( mcs_block_spin_lock & this ) with(this) { 624 655 mcs_node node; 625 656 lock( lock, node ); … … 633 664 } 634 665 635 static inline void on_notify( mcs_block_spin_lock & this, struct thread$ * t ) { unpark(t); }636 static inline size_t on_wait( mcs_block_spin_lock & this) { unlock(this); return 0; }637 static inline void on_wakeup( mcs_block_spin_lock & this, size_t recursion ) {lock(this); }666 static inline void on_notify( mcs_block_spin_lock & this, struct thread$ * t ) { unpark( t ); } 667 static inline size_t on_wait( mcs_block_spin_lock & this) { unlock( this ); park(); return 0; } 668 static inline void on_wakeup( mcs_block_spin_lock & this, size_t recursion ) {lock( this ); } 638 669 639 670 //----------------------------------------------------------------------------- … … 661 692 662 693 // if this is called recursively IT WILL DEADLOCK!!!!! 663 static inline void lock( block_spin_lock & this) with(this) {694 static inline void lock( block_spin_lock & this ) with(this) { 664 695 lock( lock ); 665 696 while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause(); … … 668 699 } 669 700 670 static inline void unlock( block_spin_lock & this) with(this) {701 static inline void unlock( block_spin_lock & this ) with(this) { 671 702 __atomic_store_n(&held, false, __ATOMIC_RELEASE); 672 703 } 673 704 674 static inline void on_notify( block_spin_lock & this, struct thread$ * t ) with(this.lock) {705 static inline void on_notify( block_spin_lock & this, struct thread$ * t ) with(this.lock) { 675 706 // first we acquire internal fast_block_lock 676 707 lock( lock __cfaabi_dbg_ctx2 ); … … 686 717 unpark(t); 687 718 } 688 static inline size_t on_wait( block_spin_lock & this) { unlock(this); return 0; }689 static inline void on_wakeup( block_spin_lock & this, size_t recursion ) with(this) {719 static inline size_t on_wait( block_spin_lock & this ) { unlock( this ); park(); return 0; } 720 static inline void on_wakeup( block_spin_lock & this, size_t recursion ) with(this) { 690 721 // now we acquire the entire block_spin_lock upon waking up 691 722 while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause(); … … 714 745 forall(L & | is_blocking_lock(L)) { 715 746 struct info_thread; 716 717 // // for use by sequence718 // info_thread(L) *& Back( info_thread(L) * this );719 // info_thread(L) *& Next( info_thread(L) * this );720 747 } 721 748 -
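For reference, the go_mutex touched above follows the classic three-state futex protocol (0 = unlocked, 1 = locked, 2 = locked with waiters): unlock issues a FUTEX_WAKE only when it observes state 2. A compact Linux C version of that protocol, without the CFA version's spin and sched_yield phases; this is a sketch, not the libcfa implementation.

#include <linux/futex.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <pthread.h>
#include <stdio.h>

static long futex( int * addr, int op, int val ) {
    return syscall( SYS_futex, addr, op, val, NULL, NULL, 0 );
}

static void mutex_lock( int * m ) {
    int zero = 0;
    if ( __atomic_compare_exchange_n( m, &zero, 1, 0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED ) )
        return;                             // 0 -> 1: uncontended fast path
    // contended: advertise a waiter (state 2), then sleep until the word changes
    while ( __atomic_exchange_n( m, 2, __ATOMIC_ACQUIRE ) != 0 )
        futex( m, FUTEX_WAIT, 2 );          // returns EWOULDBLOCK if *m != 2, as noted above
}

static void mutex_unlock( int * m ) {
    if ( __atomic_exchange_n( m, 0, __ATOMIC_RELEASE ) == 1 )
        return;                             // state was 1: nobody waiting
    futex( m, FUTEX_WAKE, 1 );              // state was 2: wake one waiter
}

static int mtx;
static int shared;

static void * worker( void * arg ) {
    (void)arg;
    for ( int i = 0; i < 100000; i++ ) { mutex_lock( &mtx ); shared++; mutex_unlock( &mtx ); }
    return NULL;
}

int main(void) {
    pthread_t a, b;
    pthread_create( &a, NULL, worker, NULL );
    pthread_create( &b, NULL, worker, NULL );
    pthread_join( a, NULL ); pthread_join( b, NULL );
    printf( "%d\n", shared );   // 200000
    return 0;
}

The same changeset also makes on_wait for these spin/futex locks park inside the routine (unlock, then park), matching the new blocking contract used by the condition variables.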
libcfa/src/concurrency/mutex_stmt.hfa
rbb7422a rbeeff61e 15 15 }; 16 16 17 18 17 struct __mutex_stmt_lock_guard { 19 18 void ** lockarr; … … 30 29 31 30 forall(L & | is_lock(L)) { 32 33 struct scoped_lock { 34 L * internal_lock; 35 }; 36 37 static inline void ?{}( scoped_lock(L) & this, L & internal_lock ) { 38 this.internal_lock = &internal_lock; 39 lock(internal_lock); 40 } 41 42 static inline void ^?{}( scoped_lock(L) & this ) with(this) { 43 unlock(*internal_lock); 44 } 45 46 static inline void * __get_mutexstmt_lock_ptr( L & this ) { 47 return &this; 48 } 49 50 static inline L __get_mutexstmt_lock_type( L & this ); 51 52 static inline L __get_mutexstmt_lock_type( L * this ); 31 static inline void * __get_mutexstmt_lock_ptr( L & this ) { return &this; } 32 static inline L __get_mutexstmt_lock_type( L & this ) {} 33 static inline L __get_mutexstmt_lock_type( L * this ) {} 53 34 } -
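The scoped_lock deleted from mutex_stmt.hfa was a small RAII guard: lock in the constructor, unlock in the destructor, now unused by the mutex statement. For readers unfamiliar with the pattern, a rough plain-C approximation using GCC's cleanup attribute; SCOPED_LOCK and guard_release are invented names, for illustration only.

#include <pthread.h>
#include <stdio.h>

static void guard_release( pthread_mutex_t ** m ) { pthread_mutex_unlock( *m ); }

// lock now, unlock automatically when the enclosing scope exits
#define SCOPED_LOCK( m ) \
    pthread_mutex_t * _guard __attribute__((cleanup(guard_release))) = \
        ( pthread_mutex_lock( m ), (m) )

static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static int counter;

static void bump( void ) {
    SCOPED_LOCK( &mtx );
    counter++;
}   // guard_release runs here, like scoped_lock's ^?{} destructor did

int main(void) {
    bump();
    printf( "%d\n", counter );
    return 0;
}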
libcfa/src/concurrency/select.hfa
rbb7422a rbeeff61e 2 2 3 3 #include "containers/list.hfa" 4 #include <stdint.h> 5 #include <kernel.hfa> 6 #include <locks.hfa> 4 #include "stdint.h" 5 #include "kernel.hfa" 7 6 7 struct select_node; 8 9 // node status 10 static const unsigned long int __SELECT_UNSAT = 0; 11 static const unsigned long int __SELECT_SAT = 1; 12 static const unsigned long int __SELECT_RUN = 2; 13 14 static inline bool __CFA_has_clause_run( unsigned long int status ) { return status == __SELECT_RUN; } 15 static inline void __CFA_maybe_park( int * park_counter ) { 16 if ( __atomic_sub_fetch( park_counter, 1, __ATOMIC_SEQ_CST) < 0 ) 17 park(); 18 } 19 20 // node used for coordinating waituntil synchronization 8 21 struct select_node { 22 int * park_counter; // If this is 0p then the node is in a special OR case waituntil 23 unsigned long int * clause_status; // needs to point at ptr sized location, if this is 0p then node is not part of a waituntil 24 25 void * extra; // used to store arbitrary data needed by some primitives 26 9 27 thread$ * blocked_thread; 10 void ** race_flag;11 28 inline dlink(select_node); 12 29 }; 13 30 P9_EMBEDDED( select_node, dlink(select_node) ) 14 31 15 void ?{}( select_node & this ) { 16 this.blocked_thread = 0p; 17 this.race_flag = 0p; 32 static inline void ?{}( select_node & this ) { 33 this.blocked_thread = active_thread(); 34 this.clause_status = 0p; 35 this.park_counter = 0p; 36 this.extra = 0p; 18 37 } 19 38 20 void ?{}( select_node & this, thread$ * blocked_thread ) {39 static inline void ?{}( select_node & this, thread$ * blocked_thread ) { 21 40 this.blocked_thread = blocked_thread; 22 this.race_flag = 0p; 41 this.clause_status = 0p; 42 this.park_counter = 0p; 43 this.extra = 0p; 23 44 } 24 45 25 void ?{}( select_node & this, thread$ * blocked_thread, void ** race_flag) {46 static inline void ?{}( select_node & this, thread$ * blocked_thread, void * extra ) { 26 47 this.blocked_thread = blocked_thread; 27 this.race_flag = race_flag; 48 this.clause_status = 0p; 49 this.park_counter = 0p; 50 this.extra = extra; 28 51 } 29 52 30 void ^?{}( select_node & this ) {}53 static inline void ^?{}( select_node & this ) {} 31 54 55 static inline unsigned long int * __get_clause_status( select_node & s ) { return s.clause_status; } 32 56 33 57 //----------------------------------------------------------------------------- 34 58 // is_selectable 35 trait is_selectable(T & | sized(T)) { 36 // For registering a select on a selectable concurrency primitive 37 // return 0p if primitive not accessible yet 38 // return 1p if primitive gets acquired 39 // return 2p if primitive is accessible but some other primitive won the race 40 // C_TODO: add enum for return values 41 void * register_select( T &, select_node & ); 59 forall(T & | sized(T)) 60 trait is_selectable { 61 // For registering a select stmt on a selectable concurrency primitive 62 // Returns bool that indicates if operation is already SAT 63 bool register_select( T &, select_node & ); 42 64 43 void unregister_select( T &, select_node & ); 65 // For unregistering a select stmt on a selectable concurrency primitive 66 // If true is returned then the corresponding code block is run (only in non-special OR case and only if node status is not RUN) 67 bool unregister_select( T &, select_node & ); 68 69 // This routine is run on the selecting thread prior to executing the statement corresponding to the select_node 70 // passed as an arg to this routine 71 // If on_selected returns false, the statement is not run, if it returns true it is run. 
72 bool on_selected( T &, select_node & ); 44 73 }; 45 74 46 static inline bool install_select_winner( select_node & this, void * primitive_ptr ) with(this) { 47 // temporary needed for atomic instruction 48 void * cmp_flag = 0p; 49 50 // if we dont win the selector race we need to potentially 51 // ignore this node and move to the next one so we return accordingly 52 if ( *race_flag != 0p || 53 !__atomic_compare_exchange_n( 54 race_flag, 55 &cmp_flag, 56 primitive_ptr, 57 false, 58 __ATOMIC_SEQ_CST, 59 __ATOMIC_SEQ_CST 60 ) 61 ) return false; // lost race and some other node triggered select 62 return true; // won race so this node is what the select proceeds with 75 // this is used inside the compiler to attempt to establish an else clause as a winner in the OR special case race 76 static inline bool __select_node_else_race( select_node & this ) with( this ) { 77 unsigned long int cmp_status = __SELECT_UNSAT; 78 return *clause_status == 0 79 && __atomic_compare_exchange_n( clause_status, &cmp_status, 1, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ); 63 80 } 81 82 // when a primitive becomes available it calls the following routine on it's node to update the select state: 83 // return true if we want to unpark the thd 84 static inline bool __make_select_node_available( select_node & this ) with( this ) { 85 unsigned long int cmp_status = __SELECT_UNSAT; 86 87 if( !park_counter ) 88 return *clause_status == 0 89 && __atomic_compare_exchange_n( clause_status, &cmp_status, (unsigned long int)&this, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ); // OR specific case where race was won 90 91 return *clause_status == 0 92 && __atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) // can maybe just use atomic write 93 && !__atomic_add_fetch( park_counter, 1, __ATOMIC_SEQ_CST); 94 } 95 96 // Handles the special OR case of the waituntil statement 97 // Since only one select node can win in the OR case, we need to race to set the node available BEFORE 98 // performing the operation since if we lose the race the operation should not be performed as it will be lost 99 // Returns true if execution can continue normally and false if the queue has now been drained 100 static inline bool __handle_waituntil_OR( dlist( select_node ) & queue ) { 101 if ( queue`isEmpty ) return false; 102 if ( queue`first.clause_status && !queue`first.park_counter ) { 103 while ( !queue`isEmpty ) { 104 // if node not a special OR case or if we win the special OR case race break 105 if ( !queue`first.clause_status || queue`first.park_counter || __make_select_node_available( queue`first ) ) { return true; } 106 // otherwise we lost the special OR race so discard node 107 try_pop_front( queue ); 108 } 109 return false; 110 } 111 return true; 112 } 113 114 // wake one thread from the list 115 static inline void wake_one( dlist( select_node ) & queue, select_node & popped ) { 116 if ( !popped.clause_status // normal case, node is not a select node 117 || ( popped.clause_status && !popped.park_counter ) // If popped link is special case OR selecting unpark but don't call __make_select_node_available 118 || __make_select_node_available( popped ) ) // check if popped link belongs to a selecting thread 119 unpark( popped.blocked_thread ); 120 } 121 122 static inline void wake_one( dlist( select_node ) & queue ) { wake_one( queue, try_pop_front( queue ) ); } 123 124 static inline void setup_clause( select_node & this, unsigned long int * clause_status, int * park_counter ) { 125 
this.blocked_thread = active_thread(); 126 this.clause_status = clause_status; 127 this.park_counter = park_counter; 128 } 129 -
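The park_counter protocol in select.hfa is worth spelling out: __CFA_maybe_park decrements the counter and parks only when the result is negative, while __make_select_node_available increments it and reports that a wakeup is needed only when the increment brings the counter back to zero, meaning the selector had already committed to sleeping. A runnable C emulation with a semaphore standing in for park/unpark, reduced to one clause; maybe_park, make_available, and signaller are illustrative names.

#include <semaphore.h>
#include <pthread.h>
#include <stdio.h>

static int park_counter;   // starts at 0, as in setup_clause
static sem_t parked;       // stands in for CFA's park()/unpark()

static void maybe_park( void ) {       // selector side, mirrors __CFA_maybe_park
    if ( __atomic_sub_fetch( &park_counter, 1, __ATOMIC_SEQ_CST ) < 0 )
        sem_wait( &parked );           // no clause was ready: sleep
}

static void make_available( void ) {   // signaller side, mirrors the add_fetch above
    if ( __atomic_add_fetch( &park_counter, 1, __ATOMIC_SEQ_CST ) == 0 )
        sem_post( &parked );           // selector already committed to sleeping: wake it
}

static void * signaller( void * arg ) { (void)arg; make_available(); return NULL; }

int main(void) {
    sem_init( &parked, 0, 0 );
    pthread_t t;
    pthread_create( &t, NULL, signaller, NULL );
    maybe_park();    // either sees the increment and skips sleeping, or parks and is woken
    pthread_join( t, NULL );
    puts( "woke" );
    return 0;
}

Nodes in the special OR case instead carry a null park_counter and race on clause_status directly, which is why wake_one checks park_counter before deciding how to signal.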
libcfa/src/concurrency/thread.cfa
rbb7422a rbeeff61e 53 53 preferred = ready_queue_new_preferred(); 54 54 last_proc = 0p; 55 link_node = 0p; 55 56 PRNG_SET_SEED( random_state, __global_random_mask ? __global_random_prime : __global_random_prime ^ rdtscl() ); 56 57 #if defined( __CFA_WITH_VERIFY__ ) … … 59 60 #endif 60 61 61 clh_node = malloc( );62 *clh_node = false;63 64 62 doregister(curr_cluster, this); 65 63 monitors{ &self_mon_p, 1, (fptr_t)0 }; … … 70 68 canary = 0xDEADDEADDEADDEADp; 71 69 #endif 72 free(clh_node);73 70 unregister(curr_cluster, this); 74 71 ^self_cor{};
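The constructor and destructor above no longer allocate and free clh_node because the CLH spinlock was dropped from locks.hfa in this changeset. For context, a CLH lock spins on its predecessor's node and then adopts that node for its own next acquisition, which is why every thread had to carry one around. The sketch below is a plain-C transliteration of the deleted CFA code, single-threaded for demonstration; the thread-local clh_node stands in for the removed thread$ field.

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>

struct clh_lock { bool * volatile tail; bool * volatile head; };

static _Thread_local bool * clh_node;   // per-thread node, like the removed field

static void clh_acquire( struct clh_lock * l ) {
    *clh_node = false;                  // my node: "not yet released"
    bool * prev = __atomic_exchange_n( &l->tail, clh_node, __ATOMIC_SEQ_CST );
    while ( !__atomic_load_n( prev, __ATOMIC_SEQ_CST ) ) ;   // spin on predecessor's node
    __atomic_store_n( &l->head, clh_node, __ATOMIC_SEQ_CST );
    clh_node = prev;                    // adopt the predecessor's node for next time
}

static void clh_release( struct clh_lock * l ) {
    __atomic_store_n( l->head, true, __ATOMIC_SEQ_CST );     // release my successor
}

int main(void) {
    struct clh_lock l;
    l.tail = malloc( sizeof(bool) );    // dummy node, initially "released"
    *l.tail = true;
    clh_node = malloc( sizeof(bool) );  // the thread ctor above used to do this
    clh_acquire( &l );
    puts( "held" );
    clh_release( &l );
    free( (bool *)l.head );             // the dtor above used to free the node
    free( clh_node );
    return 0;
}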