- Timestamp: May 17, 2023, 1:35:09 AM
- Branches: ADT, master
- Children: f11010e
- Parents: 6e4c44d (diff), 8db4708 (diff)
Note: this is a merge changeset; the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
- Location:
- libcfa/src
- Files:
- 1 added
- 12 edited
-
libcfa/src/Makefile.am
r6e4c44d r3982384 115 115 concurrency/kernel/fwd.hfa \ 116 116 concurrency/mutex_stmt.hfa \ 117 concurrency/select.hfa \118 117 concurrency/channel.hfa \ 119 118 concurrency/actor.hfa … … 128 127 concurrency/monitor.hfa \ 129 128 concurrency/mutex.hfa \ 129 concurrency/select.hfa \ 130 130 concurrency/thread.hfa 131 131 -
libcfa/src/bits/weakso_locks.cfa
r6e4c44d r3982384 15 15 // Update Count : 16 16 // 17 18 17 #include "bits/weakso_locks.hfa" 19 20 18 #pragma GCC visibility push(default) 21 19 … … 27 25 void unlock( blocking_lock & ) {} 28 26 void on_notify( blocking_lock &, struct thread$ * ) {} 29 size_t on_wait( blocking_lock & ) { return 0; }27 size_t on_wait( blocking_lock &, void (*pp_fn)( void * ), void * pp_datum ) { return 0; } 30 28 void on_wakeup( blocking_lock &, size_t ) {} 31 29 size_t wait_count( blocking_lock & ) { return 0; } 30 bool register_select( blocking_lock & this, select_node & node ) { return false; } 31 bool unregister_select( blocking_lock & this, select_node & node ) { return false; } 32 bool on_selected( blocking_lock & this, select_node & node ) { return true; } 33 -
libcfa/src/bits/weakso_locks.hfa
r6e4c44d r3982384 23 23 #include "containers/list.hfa" 24 24 25 struct thread$;25 struct select_node; 26 26 27 27 //----------------------------------------------------------------------------- … … 32 32 33 33 // List of blocked threads 34 dlist( thread$) blocked_threads;34 dlist( select_node ) blocked_threads; 35 35 36 36 // Count of current blocked threads … … 57 57 void unlock( blocking_lock & this ) OPTIONAL_THREAD; 58 58 void on_notify( blocking_lock & this, struct thread$ * t ) OPTIONAL_THREAD; 59 size_t on_wait( blocking_lock & this ) OPTIONAL_THREAD;59 size_t on_wait( blocking_lock & this, void (*pp_fn)( void * ), void * pp_datum ) OPTIONAL_THREAD; 60 60 void on_wakeup( blocking_lock & this, size_t ) OPTIONAL_THREAD; 61 61 size_t wait_count( blocking_lock & this ) OPTIONAL_THREAD; 62 bool register_select( blocking_lock & this, select_node & node ) OPTIONAL_THREAD; 63 bool unregister_select( blocking_lock & this, select_node & node ) OPTIONAL_THREAD; 64 bool on_selected( blocking_lock & this, select_node & node ) OPTIONAL_THREAD; 62 65 63 66 //---------- … … 72 75 static inline bool try_lock ( multiple_acquisition_lock & this ) { return try_lock( (blocking_lock &)this ); } 73 76 static inline void unlock ( multiple_acquisition_lock & this ) { unlock ( (blocking_lock &)this ); } 74 static inline size_t on_wait ( multiple_acquisition_lock & this ) { return on_wait ( (blocking_lock &)this); }77 static inline size_t on_wait ( multiple_acquisition_lock & this, void (*pp_fn)( void * ), void * pp_datum ) { return on_wait ( (blocking_lock &)this, pp_fn, pp_datum ); } 75 78 static inline void on_wakeup( multiple_acquisition_lock & this, size_t v ) { on_wakeup ( (blocking_lock &)this, v ); } 76 79 static inline void on_notify( multiple_acquisition_lock & this, struct thread$ * t ){ on_notify( (blocking_lock &)this, t ); } 80 static inline bool register_select( multiple_acquisition_lock & this, select_node & node ) { return register_select( (blocking_lock &)this, node ); } 81 static inline bool unregister_select( multiple_acquisition_lock & this, select_node & node ) { return unregister_select( (blocking_lock &)this, node ); } 82 static inline bool on_selected( multiple_acquisition_lock & this, select_node & node ) { return on_selected( (blocking_lock &)this, node ); } -
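The register_select, unregister_select, and on_selected routines added above are what let a blocking_lock (and therefore multiple_acquisition_lock) participate as a clause of a waituntil statement. A rough usage sketch follows, assuming the waituntil clause syntax this select support targets and that multiple_acquisition_lock is reachable through locks.hfa; the lock names, grab_either, and the clause bodies are illustrative only, not part of this changeset.

    #include <locks.hfa>

    multiple_acquisition_lock a, b;

    // Hypothetical: block until either lock can be acquired.  The selected clause
    // runs while holding that lock, so it releases the lock before finishing.
    void grab_either() {
        waituntil( a ) { /* ... critical section under a ... */ unlock( a ); }
        or waituntil( b ) { /* ... critical section under b ... */ unlock( b ); }
    }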
libcfa/src/concurrency/channel.hfa
r6e4c44d r3982384 1 // 2 // Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo 3 // 4 // The contents of this file are covered under the licence agreement in the 5 // file "LICENCE" distributed with Cforall. 6 // 7 // channel.hfa -- LIBCFATHREAD 8 // Runtime locks that used with the runtime thread system. 9 // 10 // Author : Colby Alexander Parsons 11 // Created On : Thu Jan 21 19:46:50 2022 12 // Last Modified By : 13 // Last Modified On : 14 // Update Count : 15 // 16 1 17 #pragma once 2 18 3 19 #include <locks.hfa> 4 20 #include <list.hfa> 5 #include <mutex_stmt.hfa> 6 7 // link field used for threads waiting on channel 8 struct wait_link { 9 // used to put wait_link on a dl queue 10 inline dlink(wait_link); 11 12 // waiting thread 13 struct thread$ * t; 14 15 // shadow field 16 void * elem; 17 }; 18 P9_EMBEDDED( wait_link, dlink(wait_link) ) 19 20 static inline void ?{}( wait_link & this, thread$ * t, void * elem ) { 21 this.t = t; 22 this.elem = elem; 23 } 24 25 // wake one thread from the list 26 static inline void wake_one( dlist( wait_link ) & queue ) { 27 wait_link & popped = try_pop_front( queue ); 28 unpark( popped.t ); 29 } 21 #include "select.hfa" 30 22 31 23 // returns true if woken due to shutdown 32 24 // blocks thread on list and releases passed lock 33 static inline bool block( dlist( wait_link) & queue, void * elem_ptr, go_mutex & lock ) {34 wait_link w{ active_thread(), elem_ptr };35 insert_last( queue, w);25 static inline bool block( dlist( select_node ) & queue, void * elem_ptr, go_mutex & lock ) { 26 select_node sn{ active_thread(), elem_ptr }; 27 insert_last( queue, sn ); 36 28 unlock( lock ); 37 29 park(); 38 return w.elem == 0p; 30 return sn.extra == 0p; 31 } 32 33 // Waituntil support (un)register_select helper routine 34 // Sets select node avail if not special OR case and then unlocks 35 static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) { 36 if ( node.park_counter ) __make_select_node_available( node ); 37 unlock( mutex_lock ); 39 38 } 40 39 … … 59 58 size_t size, front, back, count; 60 59 T * buffer; 61 dlist( wait_link) prods, cons; // lists of blocked threads62 go_mutex mutex_lock; // MX lock63 bool closed; // indicates channel close/open60 dlist( select_node ) prods, cons; // lists of blocked threads 61 go_mutex mutex_lock; // MX lock 62 bool closed; // indicates channel close/open 64 63 #ifdef CHAN_STATS 65 64 size_t blocks, operations; // counts total ops and ops resulting in a blocked thd … … 70 69 size = _size; 71 70 front = back = count = 0; 72 buffer = aalloc( size );71 if ( size != 0 ) buffer = aalloc( size ); 73 72 prods{}; 74 73 cons{}; … … 87 86 #endif 88 87 verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" ); 89 delete( buffer );90 } 91 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }92 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }88 if ( size != 0 ) delete( buffer ); 89 } 90 static inline size_t get_count( channel(T) & chan ) with(chan) { return __atomic_load_n( &count, __ATOMIC_RELAXED ); } 91 static inline size_t get_size( channel(T) & chan ) with(chan) { return __atomic_load_n( &size, __ATOMIC_RELAXED ); } 93 92 static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; } 94 93 static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; } … … 102 101 // flush waiting consumers and producers 
103 102 while ( has_waiting_consumers( chan ) ) { 104 cons`first.elem = 0p; 103 if( !__handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race 104 break; // if __handle_waituntil_OR returns false cons is empty so break 105 cons`first.extra = 0p; 105 106 wake_one( cons ); 106 107 } 107 108 while ( has_waiting_producers( chan ) ) { 108 prods`first.elem = 0p; 109 if( !__handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race 110 break; // if __handle_waituntil_OR returns false prods is empty so break 111 prods`first.extra = 0p; 109 112 wake_one( prods ); 110 113 } … … 114 117 static inline void is_closed( channel(T) & chan ) with(chan) { return closed; } 115 118 119 // used to hand an element to a blocked consumer and signal it 120 static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) { 121 memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work 122 wake_one( cons ); 123 } 124 125 // used to hand an element to a blocked producer and signal it 126 static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) { 127 memcpy( (void *)&retval, prods`first.extra, sizeof(T) ); 128 wake_one( prods ); 129 } 130 116 131 static inline void flush( channel(T) & chan, T elem ) with(chan) { 117 132 lock( mutex_lock ); 118 133 while ( count == 0 && !cons`isEmpty ) { 119 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work 120 wake_one( cons ); 134 __cons_handoff( chan, elem ); 121 135 } 122 136 unlock( mutex_lock ); … … 125 139 // handles buffer insert 126 140 static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) { 127 memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T));141 memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) ); 128 142 count += 1; 129 143 back++; … … 131 145 } 132 146 133 // does the buffer insert or hands elem directly to consumer if one is waiting134 static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) {135 if ( count == 0 && !cons`isEmpty ) {136 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work137 wake_one( cons );138 } else __buf_insert( chan, elem );139 }140 141 147 // needed to avoid an extra copy in closed case 142 148 static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) { … … 145 151 operations++; 146 152 #endif 153 154 ConsEmpty: if ( !cons`isEmpty ) { 155 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty; 156 __cons_handoff( chan, elem ); 157 unlock( mutex_lock ); 158 return true; 159 } 160 147 161 if ( count == size ) { unlock( mutex_lock ); return false; } 148 __do_insert( chan, elem ); 162 163 __buf_insert( chan, elem ); 149 164 unlock( mutex_lock ); 150 165 return true; … … 157 172 // handles closed case of insert routine 158 173 static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) { 159 channel_closed except{ &channel_closed_vt, &elem, &chan };174 channel_closed except{ &channel_closed_vt, &elem, &chan }; 160 175 throwResume except; // throw closed resumption 161 176 if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination … … 182 197 } 183 198 184 // have to check for the zero size channel case185 if ( size == 0 &&!cons`isEmpty ) {186 memcpy(cons`first.elem, (void *)&elem, sizeof(T));187 wake_one( cons);188 unlock( mutex_lock ); 189 return true;199 // buffer count must be zero if cons are blocked (also 
handles zero-size case) 200 ConsEmpty: if ( !cons`isEmpty ) { 201 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty; 202 __cons_handoff( chan, elem ); 203 unlock( mutex_lock ); 204 return; 190 205 } 191 206 … … 202 217 } // if 203 218 204 if ( count == 0 && !cons`isEmpty ) { 205 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work 206 wake_one( cons ); 207 } else __buf_insert( chan, elem ); 208 209 unlock( mutex_lock ); 210 return; 211 } 212 213 // handles buffer remove 214 static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) { 215 memcpy((void *)&retval, (void *)&buffer[front], sizeof(T)); 219 __buf_insert( chan, elem ); 220 unlock( mutex_lock ); 221 } 222 223 // does the buffer remove and potentially does waiting producer work 224 static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) { 225 memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) ); 216 226 count -= 1; 217 227 front = (front + 1) % size; 218 }219 220 // does the buffer remove and potentially does waiting producer work221 static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {222 __buf_remove( chan, retval );223 228 if (count == size - 1 && !prods`isEmpty ) { 224 __buf_insert( chan, *(T *)prods`first.elem ); // do waiting producer work 229 if ( !__handle_waituntil_OR( prods ) ) return; 230 __buf_insert( chan, *(T *)prods`first.extra ); // do waiting producer work 225 231 wake_one( prods ); 226 232 } … … 233 239 operations++; 234 240 #endif 241 242 ZeroSize: if ( size == 0 && !prods`isEmpty ) { 243 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize; 244 __prods_handoff( chan, retval ); 245 unlock( mutex_lock ); 246 return true; 247 } 248 235 249 if ( count == 0 ) { unlock( mutex_lock ); return false; } 250 236 251 __do_remove( chan, retval ); 237 252 unlock( mutex_lock ); … … 244 259 static inline [T, bool] try_remove( channel(T) & chan ) { 245 260 T retval; 246 return [ retval, __internal_try_remove( chan, retval ) ]; 247 } 248 249 static inline T try_remove( channel(T) & chan, T elem ) { 261 bool success = __internal_try_remove( chan, retval ); 262 return [ retval, success ]; 263 } 264 265 static inline T try_remove( channel(T) & chan ) { 250 266 T retval; 251 267 __internal_try_remove( chan, retval ); … … 255 271 // handles closed case of insert routine 256 272 static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) { 257 channel_closed except{ &channel_closed_vt, 0p, &chan };273 channel_closed except{ &channel_closed_vt, 0p, &chan }; 258 274 throwResume except; // throw resumption 259 275 if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination … … 279 295 280 296 // have to check for the zero size channel case 281 if ( size == 0 && !prods`isEmpty ) {282 memcpy((void *)&retval, (void *)prods`first.elem, sizeof(T));283 wake_one( prods);297 ZeroSize: if ( size == 0 && !prods`isEmpty ) { 298 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize; 299 __prods_handoff( chan, retval ); 284 300 unlock( mutex_lock ); 285 301 return retval; … … 287 303 288 304 // wait if buffer is empty, work will be completed by someone else 289 if ( count == 0) {305 if ( count == 0 ) { 290 306 #ifdef CHAN_STATS 291 307 blocks++; … … 299 315 // Remove from buffer 300 316 __do_remove( chan, retval ); 301 302 317 unlock( mutex_lock ); 303 318 return retval; 304 319 } 320 321 /////////////////////////////////////////////////////////////////////////////////////////// 
322 // The following is support for waituntil (select) statements 323 /////////////////////////////////////////////////////////////////////////////////////////// 324 static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) { 325 // if ( !node`isListed && !node.park_counter ) return false; // handle special OR case C_TODO: try adding this back 326 lock( mutex_lock ); 327 if ( node`isListed ) { // op wasn't performed 328 #ifdef CHAN_STATS 329 operations--; 330 #endif 331 remove( node ); 332 unlock( mutex_lock ); 333 return false; 334 } 335 unlock( mutex_lock ); 336 337 // only return true when not special OR case, not exceptional calse and status is SAT 338 return ( node.extra == 0p || !node.park_counter ) ? false : *node.clause_status == __SELECT_SAT; 339 } 340 341 // type used by select statement to capture a chan read as the selected operation 342 struct chan_read { 343 T & ret; 344 channel(T) & chan; 345 }; 346 347 static inline void ?{}( chan_read(T) & cr, channel(T) & chan, T & ret ) { 348 &cr.chan = &chan; 349 &cr.ret = &ret; 350 } 351 static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ chan, ret }; return cr; } 352 353 static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(this.chan, this) { 354 __closed_remove( chan, ret ); 355 // if we get here then the insert succeeded 356 __make_select_node_available( node ); 357 } 358 359 static inline bool register_select( chan_read(T) & this, select_node & node ) with(this.chan, this) { 360 lock( mutex_lock ); 361 node.extra = &ret; // set .extra so that if it == 0p later in on_selected it is due to channel close 362 363 #ifdef CHAN_STATS 364 if ( !closed ) operations++; 365 #endif 366 367 if ( !node.park_counter ) { 368 // are we special case OR and front of cons is also special case OR 369 if ( !unlikely(closed) && !prods`isEmpty && prods`first.clause_status && !prods`first.park_counter ) { 370 if ( !__make_select_node_pending( node ) ) { 371 unlock( mutex_lock ); 372 return false; 373 } 374 375 if ( __handle_waituntil_OR( prods ) ) { 376 __prods_handoff( chan, ret ); 377 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node 378 unlock( mutex_lock ); 379 return true; 380 } 381 __make_select_node_unsat( node ); 382 } 383 // check if we can complete operation. 
If so race to establish winner in special OR case 384 if ( count != 0 || !prods`isEmpty || unlikely(closed) ) { 385 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering 386 unlock( mutex_lock ); 387 return false; 388 } 389 } 390 } 391 392 if ( unlikely(closed) ) { 393 unlock( mutex_lock ); 394 __handle_select_closed_read( this, node ); 395 return true; 396 } 397 398 // have to check for the zero size channel case 399 ZeroSize: if ( size == 0 && !prods`isEmpty ) { 400 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize; 401 __prods_handoff( chan, ret ); 402 __set_avail_then_unlock( node, mutex_lock ); 403 return true; 404 } 405 406 // wait if buffer is empty, work will be completed by someone else 407 if ( count == 0 ) { 408 #ifdef CHAN_STATS 409 blocks++; 410 #endif 411 412 insert_last( cons, node ); 413 unlock( mutex_lock ); 414 return false; 415 } 416 417 // Remove from buffer 418 __do_remove( chan, ret ); 419 __set_avail_then_unlock( node, mutex_lock ); 420 return true; 421 } 422 static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); } 423 static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) { 424 if ( node.extra == 0p ) // check if woken up due to closed channel 425 __closed_remove( chan, ret ); 426 // This is only reachable if not closed or closed exception was handled 427 return true; 428 } 429 430 // type used by select statement to capture a chan write as the selected operation 431 struct chan_write { 432 T elem; 433 channel(T) & chan; 434 }; 435 436 static inline void ?{}( chan_write(T) & cw, channel(T) & chan, T elem ) { 437 &cw.chan = &chan; 438 memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) ); 439 } 440 static inline chan_write(T) ?>>?( T elem, channel(T) & chan ) { chan_write(T) cw{ chan, elem }; return cw; } 441 442 static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(this.chan, this) { 443 __closed_insert( chan, elem ); 444 // if we get here then the insert succeeded 445 __make_select_node_available( node ); 446 } 447 448 static inline bool register_select( chan_write(T) & this, select_node & node ) with(this.chan, this) { 449 lock( mutex_lock ); 450 node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close 451 452 #ifdef CHAN_STATS 453 if ( !closed ) operations++; 454 #endif 455 456 // special OR case handling 457 if ( !node.park_counter ) { 458 // are we special case OR and front of cons is also special case OR 459 if ( !unlikely(closed) && !cons`isEmpty && cons`first.clause_status && !cons`first.park_counter ) { 460 if ( !__make_select_node_pending( node ) ) { 461 unlock( mutex_lock ); 462 return false; 463 } 464 465 if ( __handle_waituntil_OR( cons ) ) { 466 __cons_handoff( chan, elem ); 467 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node 468 unlock( mutex_lock ); 469 return true; 470 } 471 __make_select_node_unsat( node ); 472 } 473 // check if we can complete operation. 
If so race to establish winner in special OR case 474 if ( count != size || !cons`isEmpty || unlikely(closed) ) { 475 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering 476 unlock( mutex_lock ); 477 return false; 478 } 479 } 480 } 481 482 // if closed handle 483 if ( unlikely(closed) ) { 484 unlock( mutex_lock ); 485 __handle_select_closed_write( this, node ); 486 return true; 487 } 488 489 // handle blocked consumer case via handoff (buffer is implicitly empty) 490 ConsEmpty: if ( !cons`isEmpty ) { 491 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty; 492 __cons_handoff( chan, elem ); 493 __set_avail_then_unlock( node, mutex_lock ); 494 return true; 495 } 496 497 // insert node in list if buffer is full, work will be completed by someone else 498 if ( count == size ) { 499 #ifdef CHAN_STATS 500 blocks++; 501 #endif 502 503 insert_last( prods, node ); 504 unlock( mutex_lock ); 505 return false; 506 } // if 507 508 // otherwise carry out write either via normal insert 509 __buf_insert( chan, elem ); 510 __set_avail_then_unlock( node, mutex_lock ); 511 return true; 512 } 513 static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); } 514 515 static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) { 516 if ( node.extra == 0p ) // check if woken up due to closed channel 517 __closed_insert( chan, elem ); 518 519 // This is only reachable if not closed or closed exception was handled 520 return true; 521 } 522 305 523 } // forall( T ) 524 525 -
libcfa/src/concurrency/future.hfa
r6e4c44d r3982384 19 19 #include "monitor.hfa" 20 20 #include "select.hfa" 21 #include "locks.hfa" 21 22 22 23 //---------------------------------------------------------------------------- … … 26 27 // future_t is lockfree and uses atomics which aren't needed given we use locks here 27 28 forall( T ) { 28 // enum (int){ FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 }; // Enums seem to be broken so feel free to add this back afterwards29 // enum { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 }; // Enums seem to be broken so feel free to add this back afterwards 29 30 30 31 // temporary enum replacement … … 44 45 }; 45 46 46 // C_TODO: perhaps allow exceptions to be inserted like uC++?47 48 47 static inline { 49 48 … … 53 52 } 54 53 55 void ?{}( future(T) & this) {54 void ?{}( future(T) & this ) { 56 55 this.waiters{}; 57 56 this.state = FUTURE_EMPTY; … … 60 59 61 60 // Reset future back to original state 62 void reset( future(T) & this) with(this)61 void reset( future(T) & this ) with(this) 63 62 { 64 63 lock( lock ); … … 82 81 void _internal_flush( future(T) & this ) with(this) { 83 82 while( ! waiters`isEmpty ) { 83 if ( !__handle_waituntil_OR( waiters ) ) // handle special waituntil OR case 84 break; // if handle_OR returns false then waiters is empty so break 84 85 select_node &s = try_pop_front( waiters ); 85 86 86 if ( s.race_flag == 0p ) 87 // poke in result so that woken threads do not need to reacquire any locks 88 // *(((future_node(T) &)s).my_result) = result; 87 if ( s.clause_status == 0p ) // poke in result so that woken threads do not need to reacquire any locks 89 88 copy_T( result, *(((future_node(T) &)s).my_result) ); 90 else if ( !install_select_winner( s, &this ) ) continue;91 89 92 // only unpark if future is not selected 93 // or if it is selected we only unpark if we win the race 94 unpark( s.blocked_thread ); 90 wake_one( waiters, s ); 95 91 } 96 92 } 97 93 98 94 // Fulfil the future, returns whether or not someone was unblocked 99 bool fulfil( future(T) & this, T &val ) with(this) {95 bool fulfil( future(T) & this, T val ) with(this) { 100 96 lock( lock ); 101 97 if( state != FUTURE_EMPTY ) … … 153 149 } 154 150 155 void * register_select( future(T) & this, select_node & s ) with(this) { 156 lock( lock ); 157 158 // future not ready -> insert select node and return 0p 151 bool register_select( future(T) & this, select_node & s ) with(this) { 152 lock( lock ); 153 154 // check if we can complete operation. If so race to establish winner in special OR case 155 if ( !s.park_counter && state != FUTURE_EMPTY ) { 156 if ( !__make_select_node_available( s ) ) { // we didn't win the race so give up on registering 157 unlock( lock ); 158 return false; 159 } 160 } 161 162 // future not ready -> insert select node and return 159 163 if( state == FUTURE_EMPTY ) { 160 164 insert_last( waiters, s ); 161 165 unlock( lock ); 162 return 0p; 163 } 164 165 // future ready and we won race to install it as the select winner return 1p 166 if ( install_select_winner( s, &this ) ) { 167 unlock( lock ); 168 return 1p; 169 } 170 171 unlock( lock ); 172 // future ready and we lost race to install it as the select winner 173 return 2p; 174 } 175 176 void unregister_select( future(T) & this, select_node & s ) with(this) { 166 return false; 167 } 168 169 __make_select_node_available( s ); 170 unlock( lock ); 171 return true; 172 } 173 174 bool unregister_select( future(T) & this, select_node & s ) with(this) { 175 if ( ! 
s`isListed ) return false; 177 176 lock( lock ); 178 177 if ( s`isListed ) remove( s ); 179 178 unlock( lock ); 179 return false; 180 180 } 181 181 182 bool on_selected( future(T) & this, select_node & node ) { return true; } 182 183 } 183 184 } … … 186 187 // These futures below do not support select statements so they may not be as useful as 'future' 187 188 // however the 'single_future' is cheap and cheerful and is most likely more performant than 'future' 188 // since it uses raw atomics and no locks afaik189 // since it uses raw atomics and no locks 189 190 // 190 191 // As far as 'multi_future' goes I can't see many use cases as it will be less performant than 'future' -
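register_select on future(T) now reports an already-fulfilled future by returning true, rather than the old 0p/1p/2p pointer encoding, which is what allows a future to sit directly in a waituntil clause. A sketch of that use, assuming a future is a valid clause expression and the usual libcfa thread interface; the Waiter thread and the clause body are illustrative only.

    #include <future.hfa>
    #include <thread.hfa>

    future( int ) f;

    thread Waiter {};
    void main( Waiter & ) {
        // registers a select_node on f via register_select; parks if f is still empty
        waituntil( f ) { /* f has been fulfilled by this point */ }
    }

    int main() {
        Waiter w;
        fulfil( f, 42 );    // flushes the waiter list and wakes the blocked clause
    }                       // w is joined on destruction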
libcfa/src/concurrency/invoke.h
r6e4c44d r3982384 217 217 struct __thread_user_link cltr_link; 218 218 219 // used to point to this thd's current clh node220 volatile bool * clh_node;221 222 219 struct processor * last_proc; 220 221 // ptr used during handover between blocking lists to allow for stack allocation of intrusive nodes 222 // main use case is wait-morphing to allow a different node to be used to block on condvar vs lock 223 void * link_node; 223 224 224 225 PRNG_STATE_T random_state; // fast random numbers -
libcfa/src/concurrency/locks.cfa
r6e4c44d r3982384 5 5 // file "LICENCE" distributed with Cforall. 6 6 // 7 // locks. hfa -- LIBCFATHREAD7 // locks.cfa -- LIBCFATHREAD 8 8 // Runtime locks that used with the runtime thread system. 9 9 // … … 79 79 // lock is held by some other thread 80 80 if ( owner != 0p && owner != thrd ) { 81 insert_last( blocked_threads, *thrd ); 81 select_node node; 82 insert_last( blocked_threads, node ); 82 83 wait_count++; 83 84 unlock( lock ); 84 85 park( ); 85 } 86 // multi acquisition lock is held by current thread 87 else if ( owner == thrd && multi_acquisition ) { 86 return; 87 } else if ( owner == thrd && multi_acquisition ) { // multi acquisition lock is held by current thread 88 88 recursion_count++; 89 unlock( lock ); 90 } 91 // lock isn't held 92 else { 89 } else { // lock isn't held 93 90 owner = thrd; 94 91 recursion_count = 1; 95 unlock( lock );96 } 92 } 93 unlock( lock ); 97 94 } 98 95 … … 117 114 } 118 115 119 static void pop_and_set_new_owner( blocking_lock & this ) with( this ) { 120 thread$ * t = &try_pop_front( blocked_threads ); 121 owner = t; 122 recursion_count = ( t ? 1 : 0 ); 123 if ( t ) wait_count--; 124 unpark( t ); 116 static inline void pop_node( blocking_lock & this ) with( this ) { 117 __handle_waituntil_OR( blocked_threads ); 118 select_node * node = &try_pop_front( blocked_threads ); 119 if ( node ) { 120 wait_count--; 121 owner = node->blocked_thread; 122 recursion_count = 1; 123 // if ( !node->clause_status || __make_select_node_available( *node ) ) unpark( node->blocked_thread ); 124 wake_one( blocked_threads, *node ); 125 } else { 126 owner = 0p; 127 recursion_count = 0; 128 } 125 129 } 126 130 … … 134 138 recursion_count--; 135 139 if ( recursion_count == 0 ) { 136 pop_ and_set_new_owner( this );140 pop_node( this ); 137 141 } 138 142 unlock( lock ); … … 147 151 // lock held 148 152 if ( owner != 0p ) { 149 insert_last( blocked_threads, * t);153 insert_last( blocked_threads, *(select_node *)t->link_node ); 150 154 wait_count++; 151 unlock( lock );152 155 } 153 156 // lock not held … … 156 159 recursion_count = 1; 157 160 unpark( t ); 158 unlock( lock );159 } 160 } 161 162 size_t on_wait( blocking_lock & this ) with( this ) {161 } 162 unlock( lock ); 163 } 164 165 size_t on_wait( blocking_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) with( this ) { 163 166 lock( lock __cfaabi_dbg_ctx2 ); 164 167 /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this ); … … 167 170 size_t ret = recursion_count; 168 171 169 pop_and_set_new_owner( this ); 172 pop_node( this ); 173 174 select_node node; 175 active_thread()->link_node = (void *)&node; 170 176 unlock( lock ); 177 178 pre_park_then_park( pp_fn, pp_datum ); 179 171 180 return ret; 172 181 } … … 175 184 recursion_count = recursion; 176 185 } 186 187 // waituntil() support 188 bool register_select( blocking_lock & this, select_node & node ) with(this) { 189 lock( lock __cfaabi_dbg_ctx2 ); 190 thread$ * thrd = active_thread(); 191 192 // single acquisition lock is held by current thread 193 /* paranoid */ verifyf( owner != thrd || multi_acquisition, "Single acquisition lock holder (%p) attempted to reacquire the lock %p resulting in a deadlock.", owner, &this ); 194 195 if ( !node.park_counter && ( (owner == thrd && multi_acquisition) || owner == 0p ) ) { // OR special case 196 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering 197 unlock( lock ); 198 return false; 199 } 200 } 201 202 // lock is held by some other thread 203 if ( 
owner != 0p && owner != thrd ) { 204 insert_last( blocked_threads, node ); 205 wait_count++; 206 unlock( lock ); 207 return false; 208 } else if ( owner == thrd && multi_acquisition ) { // multi acquisition lock is held by current thread 209 recursion_count++; 210 } else { // lock isn't held 211 owner = thrd; 212 recursion_count = 1; 213 } 214 215 if ( node.park_counter ) __make_select_node_available( node ); 216 unlock( lock ); 217 return true; 218 } 219 220 bool unregister_select( blocking_lock & this, select_node & node ) with(this) { 221 lock( lock __cfaabi_dbg_ctx2 ); 222 if ( node`isListed ) { 223 remove( node ); 224 wait_count--; 225 unlock( lock ); 226 return false; 227 } 228 229 if ( owner == active_thread() ) { 230 /* paranoid */ verifyf( recursion_count == 1 || multi_acquisition, "Thread %p attempted to unlock owner lock %p in waituntil unregister, which is not recursive but has a recursive count of %zu", active_thread(), &this, recursion_count ); 231 // if recursion count is zero release lock and set new owner if one is waiting 232 recursion_count--; 233 if ( recursion_count == 0 ) { 234 pop_node( this ); 235 } 236 } 237 unlock( lock ); 238 return false; 239 } 240 241 bool on_selected( blocking_lock & this, select_node & node ) { return true; } 177 242 178 243 //----------------------------------------------------------------------------- … … 311 376 int counter( condition_variable(L) & this ) with(this) { return count; } 312 377 313 static size_t queue_and_get_recursion( condition_variable(L) & this, info_thread(L) * i ) with(this) {378 static void enqueue_thread( condition_variable(L) & this, info_thread(L) * i ) with(this) { 314 379 // add info_thread to waiting queue 315 380 insert_last( blocked_threads, *i ); 316 381 count++; 317 size_t recursion_count = 0; 318 if (i->lock) { 319 // if lock was passed get recursion count to reset to after waking thread 320 recursion_count = on_wait( *i->lock ); 321 } 322 return recursion_count; 323 } 382 } 383 384 static size_t block_and_get_recursion( info_thread(L) & i, __cfa_pre_park pp_fn, void * pp_datum ) { 385 size_t recursion_count = 0; 386 if ( i.lock ) // if lock was passed get recursion count to reset to after waking thread 387 recursion_count = on_wait( *i.lock, pp_fn, pp_datum ); // this call blocks 388 else 389 pre_park_then_park( pp_fn, pp_datum ); 390 return recursion_count; 391 } 392 static size_t block_and_get_recursion( info_thread(L) & i ) { return block_and_get_recursion( i, pre_park_noop, 0p ); } 324 393 325 394 // helper for wait()'s' with no timeout 326 395 static void queue_info_thread( condition_variable(L) & this, info_thread(L) & i ) with(this) { 327 396 lock( lock __cfaabi_dbg_ctx2 ); 328 size_t recursion_count = queue_and_get_recursion(this, &i);397 enqueue_thread( this, &i ); 329 398 unlock( lock ); 330 399 331 400 // blocks here 332 park();401 size_t recursion_count = block_and_get_recursion( i ); 333 402 334 403 // resets recursion count here after waking 335 if ( i.lock) on_wakeup(*i.lock, recursion_count);404 if ( i.lock ) on_wakeup( *i.lock, recursion_count ); 336 405 } 337 406 … … 340 409 queue_info_thread( this, i ); 341 410 411 static void cond_alarm_register( void * node_ptr ) { register_self( (alarm_node_t *)node_ptr ); } 412 342 413 // helper for wait()'s' with a timeout 343 414 static void queue_info_thread_timeout( condition_variable(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) { 344 415 lock( lock __cfaabi_dbg_ctx2 ); 345 size_t recursion_count = 
queue_and_get_recursion(this, &info);416 enqueue_thread( this, &info ); 346 417 alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info }; 347 418 unlock( lock ); 348 419 349 // registers alarm outside cond lock to avoid deadlock 350 register_self( &node_wrap.alarm_node ); 351 352 // blocks here 353 park(); 420 // blocks here and registers alarm node before blocking after releasing locks to avoid deadlock 421 size_t recursion_count = block_and_get_recursion( info, cond_alarm_register, (void *)(&node_wrap.alarm_node) ); 422 // park(); 354 423 355 424 // unregisters alarm so it doesn't go off if this happens first … … 357 426 358 427 // resets recursion count here after waking 359 if ( info.lock) on_wakeup(*info.lock, recursion_count);428 if ( info.lock ) on_wakeup( *info.lock, recursion_count ); 360 429 } 361 430 … … 417 486 info_thread( L ) i = { active_thread(), info, &l }; 418 487 insert_last( blocked_threads, i ); 419 size_t recursion_count = on_wait( *i.lock );420 park( );488 size_t recursion_count = on_wait( *i.lock, pre_park_noop, 0p ); // blocks here 489 // park( ); 421 490 on_wakeup(*i.lock, recursion_count); 422 491 } … … 459 528 bool empty ( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty; } 460 529 461 static size_t queue_and_get_recursion( pthread_cond_var(L) & this, info_thread(L) * i ) with(this) {462 // add info_thread to waiting queue463 insert_last( blocked_threads, *i );464 size_t recursion_count = 0;465 recursion_count = on_wait( *i->lock );466 return recursion_count;467 }468 469 530 static void queue_info_thread_timeout( pthread_cond_var(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) { 470 531 lock( lock __cfaabi_dbg_ctx2 ); 471 size_t recursion_count = queue_and_get_recursion(this, &info);532 insert_last( blocked_threads, info ); 472 533 pthread_alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info }; 473 534 unlock( lock ); 474 535 475 // registers alarm outside cond lock to avoid deadlock 476 register_self( &node_wrap.alarm_node ); 477 478 // blocks here 479 park(); 480 481 // unregisters alarm so it doesn't go off if this happens first 536 // blocks here and registers alarm node before blocking after releasing locks to avoid deadlock 537 size_t recursion_count = block_and_get_recursion( info, cond_alarm_register, (void *)(&node_wrap.alarm_node) ); 538 539 // unregisters alarm so it doesn't go off if signal happens first 482 540 unregister_self( &node_wrap.alarm_node ); 483 541 484 542 // resets recursion count here after waking 485 if ( info.lock) on_wakeup(*info.lock, recursion_count);543 if ( info.lock ) on_wakeup( *info.lock, recursion_count ); 486 544 } 487 545 … … 493 551 lock( lock __cfaabi_dbg_ctx2 ); 494 552 info_thread( L ) i = { active_thread(), info, &l }; 495 size_t recursion_count = queue_and_get_recursion(this, &i); 496 unlock( lock ); 497 park( ); 498 on_wakeup(*i.lock, recursion_count); 553 insert_last( blocked_threads, i ); 554 unlock( lock ); 555 556 // blocks here 557 size_t recursion_count = block_and_get_recursion( i ); 558 559 on_wakeup( *i.lock, recursion_count ); 499 560 } 500 561 … … 584 645 return thrd != 0p; 585 646 } 647 -
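The reworked on_wait above threads a pre-park callback through the lock release, so work such as alarm registration happens only after every internal lock is dropped but before the thread parks; the condition variables use this (cond_alarm_register) to close the old register-then-park deadlock window. A stripped-down sketch of that ordering for any lock satisfying is_blocking_lock; arm_timeout and timed_block are hypothetical names, not part of this changeset.

    #include <locks.hfa>

    // Runs with no locks held, immediately before the calling thread parks,
    // e.g. it could call register_self( (alarm_node_t *)datum ).
    static void arm_timeout( void * datum ) {}

    forall( L & | is_blocking_lock( L ) )
    static size_t timed_block( L & l, void * alarm ) {
        size_t recursion = on_wait( l, arm_timeout, alarm );  // releases l, runs callback, parks
        on_wakeup( l, recursion );                             // restore recursion count after signal
        return recursion;
    }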
libcfa/src/concurrency/locks.hfa
r6e4c44d r3982384 30 30 #include "time.hfa" 31 31 32 #include "select.hfa" 33 32 34 #include <fstream.hfa> 33 35 … … 37 39 #include <unistd.h> 38 40 39 // C_TODO: cleanup this and locks.cfa 40 // - appropriate separation of interface and impl 41 // - clean up unused/unneeded locks 42 // - change messy big blocking lock from inheritance to composition to remove need for flags 41 typedef void (*__cfa_pre_park)( void * ); 42 43 static inline void pre_park_noop( void * ) {} 44 45 //----------------------------------------------------------------------------- 46 // is_blocking_lock 47 forall( L & | sized(L) ) 48 trait is_blocking_lock { 49 // For synchronization locks to use when acquiring 50 void on_notify( L &, struct thread$ * ); 51 52 // For synchronization locks to use when releasing 53 size_t on_wait( L &, __cfa_pre_park pp_fn, void * pp_datum ); 54 55 // to set recursion count after getting signalled; 56 void on_wakeup( L &, size_t recursion ); 57 }; 58 59 static inline void pre_park_then_park( __cfa_pre_park pp_fn, void * pp_datum ) { 60 pp_fn( pp_datum ); 61 park(); 62 } 63 64 // macros for default routine impls for is_blocking_lock trait that do not wait-morph 65 66 #define DEFAULT_ON_NOTIFY( lock_type ) \ 67 static inline void on_notify( lock_type & this, thread$ * t ){ unpark(t); } 68 69 #define DEFAULT_ON_WAIT( lock_type ) \ 70 static inline size_t on_wait( lock_type & this, __cfa_pre_park pp_fn, void * pp_datum ) { \ 71 unlock( this ); \ 72 pre_park_then_park( pp_fn, pp_datum ); \ 73 return 0; \ 74 } 75 76 // on_wakeup impl if lock should be reacquired after waking up 77 #define DEFAULT_ON_WAKEUP_REACQ( lock_type ) \ 78 static inline void on_wakeup( lock_type & this, size_t recursion ) { lock( this ); } 79 80 // on_wakeup impl if lock will not be reacquired after waking up 81 #define DEFAULT_ON_WAKEUP_NO_REACQ( lock_type ) \ 82 static inline void on_wakeup( lock_type & this, size_t recursion ) {} 83 84 43 85 44 86 //----------------------------------------------------------------------------- … … 67 109 static inline bool try_lock ( single_acquisition_lock & this ) { return try_lock( (blocking_lock &)this ); } 68 110 static inline void unlock ( single_acquisition_lock & this ) { unlock ( (blocking_lock &)this ); } 69 static inline size_t on_wait ( single_acquisition_lock & this ) { return on_wait ( (blocking_lock &)this); }111 static inline size_t on_wait ( single_acquisition_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) { return on_wait ( (blocking_lock &)this, pp_fn, pp_datum ); } 70 112 static inline void on_wakeup( single_acquisition_lock & this, size_t v ) { on_wakeup ( (blocking_lock &)this, v ); } 71 113 static inline void on_notify( single_acquisition_lock & this, struct thread$ * t ) { on_notify( (blocking_lock &)this, t ); } 114 static inline bool register_select( single_acquisition_lock & this, select_node & node ) { return register_select( (blocking_lock &)this, node ); } 115 static inline bool unregister_select( single_acquisition_lock & this, select_node & node ) { return unregister_select( (blocking_lock &)this, node ); } 116 static inline bool on_selected( single_acquisition_lock & this, select_node & node ) { return on_selected( (blocking_lock &)this, node ); } 72 117 73 118 //---------- … … 81 126 static inline bool try_lock ( owner_lock & this ) { return try_lock( (blocking_lock &)this ); } 82 127 static inline void unlock ( owner_lock & this ) { unlock ( (blocking_lock &)this ); } 83 static inline size_t on_wait ( owner_lock & this ) { return on_wait ( 
(blocking_lock &)this); }128 static inline size_t on_wait ( owner_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) { return on_wait ( (blocking_lock &)this, pp_fn, pp_datum ); } 84 129 static inline void on_wakeup( owner_lock & this, size_t v ) { on_wakeup ( (blocking_lock &)this, v ); } 85 130 static inline void on_notify( owner_lock & this, struct thread$ * t ) { on_notify( (blocking_lock &)this, t ); } 131 static inline bool register_select( owner_lock & this, select_node & node ) { return register_select( (blocking_lock &)this, node ); } 132 static inline bool unregister_select( owner_lock & this, select_node & node ) { return unregister_select( (blocking_lock &)this, node ); } 133 static inline bool on_selected( owner_lock & this, select_node & node ) { return on_selected( (blocking_lock &)this, node ); } 86 134 87 135 //----------------------------------------------------------------------------- … … 156 204 // - Kernel thd blocking alternative to the spinlock 157 205 // - No ownership (will deadlock on reacq) 206 // - no reacq on wakeup 158 207 struct futex_mutex { 159 208 // lock state any state other than UNLOCKED is locked … … 169 218 } 170 219 171 static inline void 172 173 static inline bool internal_try_lock( futex_mutex & this, int & compare_val) with(this) {220 static inline void ?{}( futex_mutex & this ) with(this) { val = 0; } 221 222 static inline bool internal_try_lock( futex_mutex & this, int & compare_val) with(this) { 174 223 return __atomic_compare_exchange_n((int*)&val, (int*)&compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE); 175 224 } 176 225 177 static inline int internal_exchange( futex_mutex & this) with(this) {226 static inline int internal_exchange( futex_mutex & this ) with(this) { 178 227 return __atomic_exchange_n((int*)&val, 2, __ATOMIC_ACQUIRE); 179 228 } 180 229 181 230 // if this is called recursively IT WILL DEADLOCK!!!!! 182 static inline void lock( futex_mutex & this) with(this) {231 static inline void lock( futex_mutex & this ) with(this) { 183 232 int state; 184 233 … … 190 239 for (int i = 0; i < spin; i++) Pause(); 191 240 } 192 193 // // no contention try to acquire194 // if (internal_try_lock(this, state)) return;195 241 196 242 // if not in contended state, set to be in contended state … … 212 258 } 213 259 214 static inline void on_notify( futex_mutex & f, thread$ * t){ unpark(t); } 215 static inline size_t on_wait( futex_mutex & f ) {unlock(f); return 0;} 216 217 // to set recursion count after getting signalled; 218 static inline void on_wakeup( futex_mutex & f, size_t recursion ) {} 260 DEFAULT_ON_NOTIFY( futex_mutex ) 261 DEFAULT_ON_WAIT( futex_mutex ) 262 DEFAULT_ON_WAKEUP_NO_REACQ( futex_mutex ) 219 263 220 264 //----------------------------------------------------------------------------- … … 232 276 int val; 233 277 }; 234 235 278 static inline void ?{}( go_mutex & this ) with(this) { val = 0; } 279 // static inline void ?{}( go_mutex & this, go_mutex this2 ) = void; // these don't compile correctly at the moment so they should be omitted 280 // static inline void ?=?( go_mutex & this, go_mutex this2 ) = void; 236 281 237 282 static inline bool internal_try_lock(go_mutex & this, int & compare_val, int new_val ) with(this) { … … 244 289 245 290 // if this is called recursively IT WILL DEADLOCK!!!!! 
246 static inline void lock( go_mutex & this) with(this) {291 static inline void lock( go_mutex & this ) with( this ) { 247 292 int state, init_state; 248 293 … … 255 300 while( !val ) { // lock unlocked 256 301 state = 0; 257 if ( internal_try_lock(this, state, init_state)) return;302 if ( internal_try_lock( this, state, init_state ) ) return; 258 303 } 259 304 for (int i = 0; i < 30; i++) Pause(); … … 262 307 while( !val ) { // lock unlocked 263 308 state = 0; 264 if ( internal_try_lock(this, state, init_state)) return;309 if ( internal_try_lock( this, state, init_state ) ) return; 265 310 } 266 311 sched_yield(); 267 312 268 313 // if not in contended state, set to be in contended state 269 state = internal_exchange( this, 2);314 state = internal_exchange( this, 2 ); 270 315 if ( !state ) return; // state == 0 271 316 init_state = 2; 272 futex( (int*)&val, FUTEX_WAIT, 2); // if val is not 2 this returns with EWOULDBLOCK317 futex( (int*)&val, FUTEX_WAIT, 2 ); // if val is not 2 this returns with EWOULDBLOCK 273 318 } 274 319 } … … 276 321 static inline void unlock( go_mutex & this ) with(this) { 277 322 // if uncontended do atomic unlock and then return 278 if ( __atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1) return;323 if ( __atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1 ) return; 279 324 280 325 // otherwise threads are blocked so we must wake one 281 futex((int *)&val, FUTEX_WAKE, 1); 282 } 283 284 static inline void on_notify( go_mutex & f, thread$ * t){ unpark(t); } 285 static inline size_t on_wait( go_mutex & f ) {unlock(f); return 0;} 286 static inline void on_wakeup( go_mutex & f, size_t recursion ) {} 287 288 //----------------------------------------------------------------------------- 289 // CLH Spinlock 290 // - No recursive acquisition 291 // - Needs to be released by owner 292 293 struct clh_lock { 294 volatile bool * volatile tail; 295 volatile bool * volatile head; 296 }; 297 298 static inline void ?{}( clh_lock & this ) { this.tail = malloc(); *this.tail = true; } 299 static inline void ^?{}( clh_lock & this ) { free(this.tail); } 300 301 static inline void lock(clh_lock & l) { 302 thread$ * curr_thd = active_thread(); 303 *(curr_thd->clh_node) = false; 304 volatile bool * prev = __atomic_exchange_n((bool **)(&l.tail), (bool *)(curr_thd->clh_node), __ATOMIC_SEQ_CST); 305 while(!__atomic_load_n(prev, __ATOMIC_SEQ_CST)) Pause(); 306 __atomic_store_n((bool **)(&l.head), (bool *)curr_thd->clh_node, __ATOMIC_SEQ_CST); 307 curr_thd->clh_node = prev; 308 } 309 310 static inline void unlock(clh_lock & l) { 311 __atomic_store_n((bool *)(l.head), true, __ATOMIC_SEQ_CST); 312 } 313 314 static inline void on_notify(clh_lock & this, struct thread$ * t ) { unpark(t); } 315 static inline size_t on_wait(clh_lock & this) { unlock(this); return 0; } 316 static inline void on_wakeup(clh_lock & this, size_t recursion ) { lock(this); } 326 futex( (int *)&val, FUTEX_WAKE, 1 ); 327 } 328 329 DEFAULT_ON_NOTIFY( go_mutex ) 330 DEFAULT_ON_WAIT( go_mutex ) 331 DEFAULT_ON_WAKEUP_NO_REACQ( go_mutex ) 317 332 318 333 //----------------------------------------------------------------------------- … … 334 349 this.lock_value = 0; 335 350 } 351 static inline void ?{}( exp_backoff_then_block_lock & this, exp_backoff_then_block_lock this2 ) = void; 352 static inline void ?=?( exp_backoff_then_block_lock & this, exp_backoff_then_block_lock this2 ) = void; 336 353 337 354 static inline void ^?{}( exp_backoff_then_block_lock & this ){} 338 355 339 static inline bool internal_try_lock( 
exp_backoff_then_block_lock & this, size_t & compare_val) with(this) {356 static inline bool internal_try_lock( exp_backoff_then_block_lock & this, size_t & compare_val ) with(this) { 340 357 return __atomic_compare_exchange_n(&lock_value, &compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED); 341 358 } 342 359 343 static inline bool try_lock( exp_backoff_then_block_lock & this) { size_t compare_val = 0; return internal_try_lock(this, compare_val); }344 345 static inline bool try_lock_contention( exp_backoff_then_block_lock & this) with(this) {346 return !__atomic_exchange_n( &lock_value, 2, __ATOMIC_ACQUIRE);347 } 348 349 static inline bool block( exp_backoff_then_block_lock & this) with(this) {360 static inline bool try_lock( exp_backoff_then_block_lock & this ) { size_t compare_val = 0; return internal_try_lock( this, compare_val ); } 361 362 static inline bool try_lock_contention( exp_backoff_then_block_lock & this ) with(this) { 363 return !__atomic_exchange_n( &lock_value, 2, __ATOMIC_ACQUIRE ); 364 } 365 366 static inline bool block( exp_backoff_then_block_lock & this ) with(this) { 350 367 lock( spinlock __cfaabi_dbg_ctx2 ); 351 368 if (__atomic_load_n( &lock_value, __ATOMIC_SEQ_CST) != 2) { … … 359 376 } 360 377 361 static inline void lock( exp_backoff_then_block_lock & this) with(this) {378 static inline void lock( exp_backoff_then_block_lock & this ) with(this) { 362 379 size_t compare_val = 0; 363 380 int spin = 4; … … 378 395 } 379 396 380 static inline void unlock( exp_backoff_then_block_lock & this) with(this) {397 static inline void unlock( exp_backoff_then_block_lock & this ) with(this) { 381 398 if (__atomic_exchange_n(&lock_value, 0, __ATOMIC_RELEASE) == 1) return; 382 399 lock( spinlock __cfaabi_dbg_ctx2 ); … … 386 403 } 387 404 388 static inline void on_notify(exp_backoff_then_block_lock & this, struct thread$ * t ) { unpark(t); } 389 static inline size_t on_wait(exp_backoff_then_block_lock & this) { unlock(this); return 0; } 390 static inline void on_wakeup(exp_backoff_then_block_lock & this, size_t recursion ) { lock(this); } 405 DEFAULT_ON_NOTIFY( exp_backoff_then_block_lock ) 406 DEFAULT_ON_WAIT( exp_backoff_then_block_lock ) 407 DEFAULT_ON_WAKEUP_REACQ( exp_backoff_then_block_lock ) 391 408 392 409 //----------------------------------------------------------------------------- … … 418 435 419 436 // if this is called recursively IT WILL DEADLOCK!!!!! 
420 static inline void lock( fast_block_lock & this) with(this) {437 static inline void lock( fast_block_lock & this ) with(this) { 421 438 lock( lock __cfaabi_dbg_ctx2 ); 422 439 if ( held ) { … … 430 447 } 431 448 432 static inline void unlock( fast_block_lock & this) with(this) {449 static inline void unlock( fast_block_lock & this ) with(this) { 433 450 lock( lock __cfaabi_dbg_ctx2 ); 434 451 /* paranoid */ verifyf( held != false, "Attempt to release lock %p that isn't held", &this ); … … 439 456 } 440 457 441 static inline void on_notify( fast_block_lock & this, struct thread$ * t ) with(this) {458 static inline void on_notify( fast_block_lock & this, struct thread$ * t ) with(this) { 442 459 lock( lock __cfaabi_dbg_ctx2 ); 443 460 insert_last( blocked_threads, *t ); 444 461 unlock( lock ); 445 462 } 446 static inline size_t on_wait(fast_block_lock & this) { unlock(this); return 0; } 447 static inline void on_wakeup(fast_block_lock & this, size_t recursion ) { } 463 DEFAULT_ON_WAIT( fast_block_lock ) 464 DEFAULT_ON_WAKEUP_NO_REACQ( fast_block_lock ) 448 465 449 466 //----------------------------------------------------------------------------- … … 456 473 struct simple_owner_lock { 457 474 // List of blocked threads 458 dlist( thread$) blocked_threads;475 dlist( select_node ) blocked_threads; 459 476 460 477 // Spin lock used for mutual exclusion … … 477 494 static inline void ?=?( simple_owner_lock & this, simple_owner_lock this2 ) = void; 478 495 479 static inline void lock( simple_owner_lock & this) with(this) {480 if ( owner == active_thread()) {496 static inline void lock( simple_owner_lock & this ) with(this) { 497 if ( owner == active_thread() ) { 481 498 recursion_count++; 482 499 return; … … 484 501 lock( lock __cfaabi_dbg_ctx2 ); 485 502 486 if (owner != 0p) { 487 insert_last( blocked_threads, *active_thread() ); 503 if ( owner != 0p ) { 504 select_node node; 505 insert_last( blocked_threads, node ); 488 506 unlock( lock ); 489 507 park( ); … … 495 513 } 496 514 497 // TODO: fix duplicate def issue and bring this back 498 // void pop_and_set_new_owner( simple_owner_lock & this ) with( this ) { 499 // thread$ * t = &try_pop_front( blocked_threads ); 500 // owner = t; 501 // recursion_count = ( t ? 1 : 0 ); 502 // unpark( t ); 503 // } 504 505 static inline void unlock(simple_owner_lock & this) with(this) { 515 static inline void pop_node( simple_owner_lock & this ) with(this) { 516 __handle_waituntil_OR( blocked_threads ); 517 select_node * node = &try_pop_front( blocked_threads ); 518 if ( node ) { 519 owner = node->blocked_thread; 520 recursion_count = 1; 521 // if ( !node->clause_status || __make_select_node_available( *node ) ) unpark( node->blocked_thread ); 522 wake_one( blocked_threads, *node ); 523 } else { 524 owner = 0p; 525 recursion_count = 0; 526 } 527 } 528 529 static inline void unlock( simple_owner_lock & this ) with(this) { 506 530 lock( lock __cfaabi_dbg_ctx2 ); 507 531 /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this ); … … 510 534 recursion_count--; 511 535 if ( recursion_count == 0 ) { 512 // pop_and_set_new_owner( this ); 513 thread$ * t = &try_pop_front( blocked_threads ); 514 owner = t; 515 recursion_count = ( t ? 
1 : 0 );
-       unpark( t );
+       pop_node( this );
  }
        unlock( lock );
  }

- static inline void on_notify( simple_owner_lock & this, struct thread$ * t ) with(this) {
+ static inline void on_notify( simple_owner_lock & this, thread$ * t ) with(this) {
        lock( lock __cfaabi_dbg_ctx2 );
        // lock held
        if ( owner != 0p ) {
-               insert_last( blocked_threads, *t );
+               insert_last( blocked_threads, *(select_node *)t->link_node );
        }
        // lock not held
…
  }

- static inline size_t on_wait( simple_owner_lock & this ) with(this) {
+ static inline size_t on_wait( simple_owner_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) with(this) {
        lock( lock __cfaabi_dbg_ctx2 );
        /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
…
        size_t ret = recursion_count;

-       // pop_and_set_new_owner( this );
-
-       thread$ * t = &try_pop_front( blocked_threads );
-       owner = t;
-       recursion_count = ( t ? 1 : 0 );
-       unpark( t );
-
+       pop_node( this );
+
+       select_node node;
+       active_thread()->link_node = (void *)&node;
        unlock( lock );
+
+       pre_park_then_park( pp_fn, pp_datum );
+
        return ret;
  }

- static inline void on_wakeup(simple_owner_lock & this, size_t recursion ) with(this) { recursion_count = recursion; }
+ static inline void on_wakeup( simple_owner_lock & this, size_t recursion ) with(this) { recursion_count = recursion; }
+
+ // waituntil() support
+ static inline bool register_select( simple_owner_lock & this, select_node & node ) with(this) {
+       lock( lock __cfaabi_dbg_ctx2 );
+
+       // check if we can complete operation. If so race to establish winner in special OR case
+       if ( !node.park_counter && ( owner == active_thread() || owner == 0p ) ) {
+               if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
+                       unlock( lock );
+                       return false;
+               }
+       }
+
+       if ( owner == active_thread() ) {
+               recursion_count++;
+               if ( node.park_counter ) __make_select_node_available( node );
+               unlock( lock );
+               return true;
+       }
+
+       if ( owner != 0p ) {
+               insert_last( blocked_threads, node );
+               unlock( lock );
+               return false;
+       }
+
+       owner = active_thread();
+       recursion_count = 1;
+
+       if ( node.park_counter ) __make_select_node_available( node );
+       unlock( lock );
+       return true;
+ }
+
+ static inline bool unregister_select( simple_owner_lock & this, select_node & node ) with(this) {
+       lock( lock __cfaabi_dbg_ctx2 );
+       if ( node`isListed ) {
+               remove( node );
+               unlock( lock );
+               return false;
+       }
+
+       if ( owner == active_thread() ) {
+               recursion_count--;
+               if ( recursion_count == 0 ) {
+                       pop_node( this );
+               }
+       }
+       unlock( lock );
+       return false;
+ }
+
+ static inline bool on_selected( simple_owner_lock & this, select_node & node ) { return true; }
+

  //-----------------------------------------------------------------------------
…

  // if this is called recursively IT WILL DEADLOCK!
- static inline void lock( spin_queue_lock & this) with(this) {
+ static inline void lock( spin_queue_lock & this ) with(this) {
        mcs_spin_node node;
        lock( lock, node );
…
  }

- static inline void unlock( spin_queue_lock & this) with(this) {
+ static inline void unlock( spin_queue_lock & this ) with(this) {
        __atomic_store_n(&held, false, __ATOMIC_RELEASE);
  }

- static inline void on_notify(spin_queue_lock & this, struct thread$ * t ) {
-       unpark(t);
- }
- static inline size_t on_wait(spin_queue_lock & this) { unlock(this); return 0; }
- static inline void on_wakeup(spin_queue_lock & this, size_t recursion ) { lock(this); }
-
+ DEFAULT_ON_NOTIFY( spin_queue_lock )
+ DEFAULT_ON_WAIT( spin_queue_lock )
+ DEFAULT_ON_WAKEUP_REACQ( spin_queue_lock )

  //-----------------------------------------------------------------------------
…

  // if this is called recursively IT WILL DEADLOCK!!!!!
- static inline void lock( mcs_block_spin_lock & this) with(this) {
+ static inline void lock( mcs_block_spin_lock & this ) with(this) {
        mcs_node node;
        lock( lock, node );
…
  }

- static inline void on_notify(mcs_block_spin_lock & this, struct thread$ * t ) { unpark(t); }
- static inline size_t on_wait(mcs_block_spin_lock & this) { unlock(this); return 0; }
- static inline void on_wakeup(mcs_block_spin_lock & this, size_t recursion ) {lock(this); }
+ DEFAULT_ON_NOTIFY( mcs_block_spin_lock )
+ DEFAULT_ON_WAIT( mcs_block_spin_lock )
+ DEFAULT_ON_WAKEUP_REACQ( mcs_block_spin_lock )

  //-----------------------------------------------------------------------------
…

  // if this is called recursively IT WILL DEADLOCK!!!!!
- static inline void lock( block_spin_lock & this) with(this) {
+ static inline void lock( block_spin_lock & this ) with(this) {
        lock( lock );
        while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
…
  }

- static inline void unlock( block_spin_lock & this) with(this) {
+ static inline void unlock( block_spin_lock & this ) with(this) {
        __atomic_store_n(&held, false, __ATOMIC_RELEASE);
  }

- static inline void on_notify( block_spin_lock & this, struct thread$ * t ) with(this.lock) {
+ static inline void on_notify( block_spin_lock & this, struct thread$ * t ) with(this.lock) {
        // first we acquire internal fast_block_lock
        lock( lock __cfaabi_dbg_ctx2 );
…
        unpark(t);
  }
- static inline size_t on_wait(block_spin_lock & this) { unlock(this); return 0; }
- static inline void on_wakeup( block_spin_lock & this, size_t recursion ) with(this) {
+ DEFAULT_ON_WAIT( block_spin_lock )
+ static inline void on_wakeup( block_spin_lock & this, size_t recursion ) with(this) {
        // now we acquire the entire block_spin_lock upon waking up
        while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
…
        unlock( lock ); // Now we release the internal fast_spin_lock
  }
-
- //-----------------------------------------------------------------------------
- // is_blocking_lock
- forall( L & | sized(L) )
- trait is_blocking_lock {
-       // For synchronization locks to use when acquiring
-       void on_notify( L &, struct thread$ * );
-
-       // For synchronization locks to use when releasing
-       size_t on_wait( L & );
-
-       // to set recursion count after getting signalled;
-       void on_wakeup( L &, size_t recursion );
- };

  //-----------------------------------------------------------------------------
…
  forall(L & | is_blocking_lock(L)) {
        struct info_thread;
-
-       // // for use by sequence
-       // info_thread(L) *& Back( info_thread(L) * this );
-       // info_thread(L) *& Next( info_thread(L) * this );

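The register_select / unregister_select / on_selected routines added above for simple_owner_lock are the hooks the new waituntil machinery calls on a selectable primitive. As a rough illustration only (not part of this changeset), a hypothetical always-ready primitive could provide the same three hooks as follows; the toy_ready type and the include path are assumptions made for the example:

// Illustrative sketch, not part of the changeset: a made-up primitive that is
// always available, showing the shape of the three selectable hooks.
#include <select.hfa>

struct toy_ready {};                        // hypothetical primitive, always "ready"

static inline bool register_select( toy_ready & this, select_node & node ) {
        // special OR case (no park counter): must win the race to claim the node
        if ( !node.park_counter ) return __make_select_node_available( node );
        // normal case: mark the clause satisfied; returning true reports it as already SAT
        __make_select_node_available( node );
        return true;
}

// nothing was ever queued, so there is nothing to withdraw; returning false
// means the clause body is not re-run from the unregister path
static inline bool unregister_select( toy_ready & this, select_node & node ) { return false; }

// no extra work is needed on the selecting thread before the clause body runs
static inline bool on_selected( toy_ready & this, select_node & node ) { return true; }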
libcfa/src/concurrency/mutex_stmt.hfa
r6e4c44d r3982384
  };

-
  struct __mutex_stmt_lock_guard {
        void ** lockarr;
…

  forall(L & | is_lock(L)) {
-
-       struct scoped_lock {
-               L * internal_lock;
-       };
-
-       static inline void ?{}( scoped_lock(L) & this, L & internal_lock ) {
-               this.internal_lock = &internal_lock;
-               lock(internal_lock);
-       }
-
-       static inline void ^?{}( scoped_lock(L) & this ) with(this) {
-               unlock(*internal_lock);
-       }
-
-       static inline void * __get_mutexstmt_lock_ptr( L & this ) {
-               return &this;
-       }
-
-       static inline L __get_mutexstmt_lock_type( L & this );
-
-       static inline L __get_mutexstmt_lock_type( L * this );
+       static inline void * __get_mutexstmt_lock_ptr( L & this ) { return &this; }
+       static inline L __get_mutexstmt_lock_type( L & this ) {}
+       static inline L __get_mutexstmt_lock_type( L * this ) {}
  }
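The trimmed __get_mutexstmt_lock_ptr / __get_mutexstmt_lock_type helpers above are what the compiler-generated code for a mutex statement calls to collect the lock addresses and their types. A hedged usage sketch (not part of this changeset; the include and the choice of simple_owner_lock are assumptions for the example):

// Illustrative sketch, not part of the changeset: the mutex statement acquires
// every listed lock for the duration of the block and releases them on exit.
#include <locks.hfa>

simple_owner_lock a, b;
int shared_total = 0;

void deposit( int amount ) {
        mutex( a, b ) {                 // both locks are held inside this block
                shared_total += amount;
        }                               // both locks are released on scope exit
}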
libcfa/src/concurrency/preemption.cfa
r6e4c44d r3982384
                __cfadbg_print_buffer_decl( preemption, " KERNEL: preemption tick %lu\n", currtime.tn);
                Duration period = node->period;
-               if( period == 0 ) {
+               if( period == 0 ) {
                        node->set = false; // Node is one-shot, just mark it as not pending
                }
libcfa/src/concurrency/select.hfa
r6e4c44d r3982384
+ //
+ // Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
+ //
+ // The contents of this file are covered under the licence agreement in the
+ // file "LICENCE" distributed with Cforall.
+ //
+ // channel.hfa -- LIBCFATHREAD
+ // Runtime locks that used with the runtime thread system.
+ //
+ // Author           : Colby Alexander Parsons
+ // Created On       : Thu Jan 21 19:46:50 2023
+ // Last Modified By :
+ // Last Modified On :
+ // Update Count     :
+ //
+
  #pragma once

  #include "containers/list.hfa"
- #include <stdint.h>
- #include <kernel.hfa>
- #include <locks.hfa>
+ #include "alarm.hfa"
+ #include "kernel.hfa"
+ #include "time.hfa"

+ struct select_node;
+
+ // node status
+ static const unsigned long int __SELECT_UNSAT = 0;
+ static const unsigned long int __SELECT_PENDING = 1; // used only by special OR case
+ static const unsigned long int __SELECT_SAT = 2;
+ static const unsigned long int __SELECT_RUN = 3;
+
+ // these are used inside the compiler to aid in code generation
+ static inline bool __CFA_has_clause_run( unsigned long int status ) { return status == __SELECT_RUN; }
+ static inline void __CFA_maybe_park( int * park_counter ) {
+       if ( __atomic_sub_fetch( park_counter, 1, __ATOMIC_SEQ_CST) < 0 )
+               park();
+ }
+
+ // node used for coordinating waituntil synchronization
  struct select_node {
+       int * park_counter;                // If this is 0p then the node is in a special OR case waituntil
+       unsigned long int * clause_status; // needs to point at ptr sized location, if this is 0p then node is not part of a waituntil
+
+       void * extra;                      // used to store arbitrary data needed by some primitives
+
        thread$ * blocked_thread;
-       void ** race_flag;
        inline dlink(select_node);
  };
  P9_EMBEDDED( select_node, dlink(select_node) )

- void ?{}( select_node & this ) {
-       this.blocked_thread = 0p;
-       this.race_flag = 0p;
+ static inline void ?{}( select_node & this ) {
+       this.blocked_thread = active_thread();
+       this.clause_status = 0p;
+       this.park_counter = 0p;
+       this.extra = 0p;
  }

- void ?{}( select_node & this, thread$ * blocked_thread ) {
+ static inline void ?{}( select_node & this, thread$ * blocked_thread ) {
        this.blocked_thread = blocked_thread;
-       this.race_flag = 0p;
+       this.clause_status = 0p;
+       this.park_counter = 0p;
+       this.extra = 0p;
  }

- void ?{}( select_node & this, thread$ * blocked_thread, void ** race_flag) {
+ static inline void ?{}( select_node & this, thread$ * blocked_thread, void * extra ) {
        this.blocked_thread = blocked_thread;
-       this.race_flag = race_flag;
+       this.clause_status = 0p;
+       this.park_counter = 0p;
+       this.extra = extra;
  }
+ static inline void ^?{}( select_node & this ) {}

- void ^?{}( select_node & this ) {}
+ // this is used inside the compiler to aid in code generation
+ static inline unsigned long int * __get_clause_status( select_node & s ) { return s.clause_status; }

+ // this is used inside the compiler to attempt to establish an else clause as a winner in the OR special case race
+ static inline bool __select_node_else_race( select_node & this ) with( this ) {
+       unsigned long int cmp_status = __SELECT_UNSAT;
+       return *clause_status == 0
+               && __atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST );
+ }

  //-----------------------------------------------------------------------------
  // is_selectable
- trait is_selectable(T & | sized(T)) {
-       // For registering a select on a selectable concurrency primitive
-       // return 0p if primitive not accessible yet
-       // return 1p if primitive gets acquired
-       // return 2p if primitive is accessible but some other primitive won the race
-       // C_TODO: add enum for return values
-       void * register_select( T &, select_node & );
+ forall(T & | sized(T))
+ trait is_selectable {
+       // For registering a select stmt on a selectable concurrency primitive
+       // Returns bool that indicates if operation is already SAT
+       bool register_select( T &, select_node & );

-       void unregister_select( T &, select_node & );
+       // For unregistering a select stmt on a selectable concurrency primitive
+       // If true is returned then the corresponding code block is run (only in non-special OR case and only if node status is not RUN)
+       bool unregister_select( T &, select_node & );
+
+       // This routine is run on the selecting thread prior to executing the statement corresponding to the select_node
+       // passed as an arg to this routine
+       // If on_selected returns false, the statement is not run, if it returns true it is run.
+       bool on_selected( T &, select_node & );
  };

- static inline bool install_select_winner( select_node & this, void * primitive_ptr ) with(this) {
-       // temporary needed for atomic instruction
-       void * cmp_flag = 0p;
-
-       // if we dont win the selector race we need to potentially
-       // ignore this node and move to the next one so we return accordingly
-       if ( *race_flag != 0p ||
-               !__atomic_compare_exchange_n(
-                       race_flag,
-                       &cmp_flag,
-                       primitive_ptr,
-                       false,
-                       __ATOMIC_SEQ_CST,
-                       __ATOMIC_SEQ_CST
-               )
-       ) return false; // lost race and some other node triggered select
-       return true; // won race so this node is what the select proceeds with
+ //=============================================================================================
+ // Waituntil Helpers
+ //=============================================================================================
+
+ // used for the 2-stage avail needed by the special OR case
+ static inline bool __mark_select_node( select_node & this, unsigned long int val ) with( this ) {
+       /* paranoid */ verify( park_counter == 0p );
+       /* paranoid */ verify( clause_status != 0p );
+
+       unsigned long int cmp_status = __SELECT_UNSAT;
+       while( !__atomic_compare_exchange_n( clause_status, &cmp_status, val, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
+               if ( cmp_status != __SELECT_PENDING ) return false;
+               cmp_status = __SELECT_UNSAT;
+       }
+       return true;
  }
+
+ static inline void __make_select_node_unsat( select_node & this ) with( this ) {
+       __atomic_store_n( clause_status, __SELECT_UNSAT, __ATOMIC_SEQ_CST );
+ }
+ static inline void __make_select_node_sat( select_node & this ) with( this ) {
+       __atomic_store_n( clause_status, __SELECT_SAT, __ATOMIC_SEQ_CST );
+ }
+
+ static inline bool __make_select_node_pending( select_node & this ) with( this ) {
+       return __mark_select_node( this, __SELECT_PENDING );
+ }
+
+ // when a primitive becomes available it calls the following routine on it's node to update the select state:
+ // return true if we want to unpark the thd
+ static inline bool __make_select_node_available( select_node & this ) with( this ) {
+       /* paranoid */ verify( clause_status != 0p );
+       if( !park_counter )
+               return __mark_select_node( this, (unsigned long int)&this );
+
+       unsigned long int cmp_status = __SELECT_UNSAT;
+
+       return *clause_status == 0 // C_TODO might not need a cmp_xchg in non special OR case
+               && __atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) // can maybe just use atomic write
+               && !__atomic_add_fetch( park_counter, 1, __ATOMIC_SEQ_CST);
+ }
+
+ // Handles the special OR case of the waituntil statement
+ // Since only one select node can win in the OR case, we need to race to set the node available BEFORE
+ // performing the operation since if we lose the race the operation should not be performed as it will be lost
+ // Returns true if execution can continue normally and false if the queue has now been drained
+ static inline bool __handle_waituntil_OR( dlist( select_node ) & queue ) {
+       if ( queue`isEmpty ) return false;
+       if ( queue`first.clause_status && !queue`first.park_counter ) {
+               while ( !queue`isEmpty ) {
+                       // if node not a special OR case or if we win the special OR case race break
+                       if ( !queue`first.clause_status || queue`first.park_counter || __make_select_node_available( queue`first ) )
+                               return true;
+                       // otherwise we lost the special OR race so discard node
+                       try_pop_front( queue );
+               }
+               return false;
+       }
+       return true;
+ }
+
+ // wake one thread from the list
+ static inline void wake_one( dlist( select_node ) & queue, select_node & popped ) {
+       if ( !popped.clause_status                              // normal case, node is not a select node
+               || ( popped.clause_status && !popped.park_counter ) // If popped link is special case OR selecting unpark but don't call __make_select_node_available
+               || __make_select_node_available( popped ) )     // check if popped link belongs to a selecting thread
+               unpark( popped.blocked_thread );
+ }
+
+ static inline void wake_one( dlist( select_node ) & queue ) { wake_one( queue, try_pop_front( queue ) ); }
+
+ static inline void setup_clause( select_node & this, unsigned long int * clause_status, int * park_counter ) {
+       this.blocked_thread = active_thread();
+       this.clause_status = clause_status;
+       this.park_counter = park_counter;
+ }
+
+ // waituntil ( timeout( ... ) ) support
+ struct select_timeout_node {
+       alarm_node_t a_node;
+       select_node * s_node;
+ };
+ void ?{}( select_timeout_node & this, Duration duration, Alarm_Callback callback );
+ void ^?{}( select_timeout_node & this );
+ void timeout_handler_select_cast( alarm_node_t & node );
+
+ // Selectable trait routines
+ bool register_select( select_timeout_node & this, select_node & node );
+ bool unregister_select( select_timeout_node & this, select_node & node );
+ bool on_selected( select_timeout_node & this, select_node & node );
+
+ // Gateway routines to waituntil on duration
+ select_timeout_node timeout( Duration duration );
+ select_timeout_node sleep( Duration duration );
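The timeout() gateway declared above is what lets a waituntil clause wait on a Duration, alongside clauses on other selectable primitives such as the simple_owner_lock shown earlier in this changeset. A rough usage sketch (not part of this changeset; the waituntil clause syntax, the duration literal, and the assumption that the clause body releases the acquired lock are taken from the CFA waituntil proposal rather than from this diff):

// Illustrative sketch, not part of the changeset: run one of the two clause
// bodies, whichever of the lock or the half-second timeout is satisfied first.
#include <locks.hfa>
#include <select.hfa>

simple_owner_lock l;

void grab_or_give_up() {
        waituntil( l ) {                        // assumed clause form for a lock
                // ... critical section, lock held here ...
                unlock( l );
        } or waituntil( timeout( 500`ms ) ) {   // timeout() from this header
                // ... fallback path after 500 ms ...
        }
}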
libcfa/src/concurrency/thread.cfa
r6e4c44d r3982384
        preferred = ready_queue_new_preferred();
        last_proc = 0p;
+       link_node = 0p;
        PRNG_SET_SEED( random_state, __global_random_mask ? __global_random_prime : __global_random_prime ^ rdtscl() );
        #if defined( __CFA_WITH_VERIFY__ )
…
        #endif

-       clh_node = malloc( );
-       *clh_node = false;
-
        doregister(curr_cluster, this);
        monitors{ &self_mon_p, 1, (fptr_t)0 };
…
                canary = 0xDEADDEADDEADDEADp;
        #endif
-       free(clh_node);
        unregister(curr_cluster, this);
        ^self_cor{};