- Timestamp: May 29, 2023, 11:44:29 AM (3 years ago)
- Branches: ADT
- Children: fa2c005
- Parents: 3a513d89 (diff), 2b78949 (diff)

Note: this is a merge changeset; the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.

- Location: libcfa/src
- Files: 1 added, 12 edited
- Makefile.am (modified) (2 diffs)
- bits/weakso_locks.cfa (modified) (2 diffs)
- bits/weakso_locks.hfa (modified) (4 diffs)
- concurrency/channel.hfa (modified) (21 diffs)
- concurrency/future.hfa (modified) (8 diffs)
- concurrency/invoke.h (modified) (1 diff)
- concurrency/locks.cfa (modified) (15 diffs)
- concurrency/locks.hfa (modified) (39 diffs)
- concurrency/mutex_stmt.hfa (modified) (2 diffs)
- concurrency/preemption.cfa (modified) (1 diff)
- concurrency/select.cfa (added)
- concurrency/select.hfa (modified) (1 diff)
- concurrency/thread.cfa (modified) (3 diffs)
Legend:
- Unmodified
- Added
- Removed
- libcfa/src/Makefile.am
r3a513d89 r044ae62 115 115 concurrency/kernel/fwd.hfa \ 116 116 concurrency/mutex_stmt.hfa \ 117 concurrency/select.hfa \118 117 concurrency/channel.hfa \ 119 118 concurrency/actor.hfa … … 128 127 concurrency/monitor.hfa \ 129 128 concurrency/mutex.hfa \ 129 concurrency/select.hfa \ 130 130 concurrency/thread.hfa 131 131 -
libcfa/src/bits/weakso_locks.cfa
r3a513d89 r044ae62 15 15 // Update Count : 16 16 // 17 18 17 #include "bits/weakso_locks.hfa" 19 20 18 #pragma GCC visibility push(default) 21 19 … … 27 25 void unlock( blocking_lock & ) {} 28 26 void on_notify( blocking_lock &, struct thread$ * ) {} 29 size_t on_wait( blocking_lock & ) { return 0; }27 size_t on_wait( blocking_lock &, void (*pp_fn)( void * ), void * pp_datum ) { return 0; } 30 28 void on_wakeup( blocking_lock &, size_t ) {} 31 29 size_t wait_count( blocking_lock & ) { return 0; } 30 bool register_select( blocking_lock & this, select_node & node ) { return false; } 31 bool unregister_select( blocking_lock & this, select_node & node ) { return false; } 32 bool on_selected( blocking_lock & this, select_node & node ) { return true; } 33 -
libcfa/src/bits/weakso_locks.hfa
r3a513d89 r044ae62 23 23 #include "containers/list.hfa" 24 24 25 struct thread$;25 struct select_node; 26 26 27 27 //----------------------------------------------------------------------------- … … 32 32 33 33 // List of blocked threads 34 dlist( thread$) blocked_threads;34 dlist( select_node ) blocked_threads; 35 35 36 36 // Count of current blocked threads … … 57 57 void unlock( blocking_lock & this ) OPTIONAL_THREAD; 58 58 void on_notify( blocking_lock & this, struct thread$ * t ) OPTIONAL_THREAD; 59 size_t on_wait( blocking_lock & this ) OPTIONAL_THREAD;59 size_t on_wait( blocking_lock & this, void (*pp_fn)( void * ), void * pp_datum ) OPTIONAL_THREAD; 60 60 void on_wakeup( blocking_lock & this, size_t ) OPTIONAL_THREAD; 61 61 size_t wait_count( blocking_lock & this ) OPTIONAL_THREAD; 62 bool register_select( blocking_lock & this, select_node & node ) OPTIONAL_THREAD; 63 bool unregister_select( blocking_lock & this, select_node & node ) OPTIONAL_THREAD; 64 bool on_selected( blocking_lock & this, select_node & node ) OPTIONAL_THREAD; 62 65 63 66 //---------- … … 72 75 static inline bool try_lock ( multiple_acquisition_lock & this ) { return try_lock( (blocking_lock &)this ); } 73 76 static inline void unlock ( multiple_acquisition_lock & this ) { unlock ( (blocking_lock &)this ); } 74 static inline size_t on_wait ( multiple_acquisition_lock & this ) { return on_wait ( (blocking_lock &)this); }77 static inline size_t on_wait ( multiple_acquisition_lock & this, void (*pp_fn)( void * ), void * pp_datum ) { return on_wait ( (blocking_lock &)this, pp_fn, pp_datum ); } 75 78 static inline void on_wakeup( multiple_acquisition_lock & this, size_t v ) { on_wakeup ( (blocking_lock &)this, v ); } 76 79 static inline void on_notify( multiple_acquisition_lock & this, struct thread$ * t ){ on_notify( (blocking_lock &)this, t ); } 80 static inline bool register_select( multiple_acquisition_lock & this, select_node & node ) { return register_select( (blocking_lock &)this, node ); } 81 static inline bool unregister_select( multiple_acquisition_lock & this, select_node & node ) { return unregister_select( (blocking_lock &)this, node ); } 82 static inline bool on_selected( multiple_acquisition_lock & this, select_node & node ) { return on_selected( (blocking_lock &)this, node ); } -
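The weakso_locks changes above convert the blocked-thread list from dlist(thread$) to dlist(select_node) and extend blocking_lock (and, via forwarding, multiple_acquisition_lock) with the hooks the new waituntil machinery calls on each clause; on_wait also gains a pre-park callback, discussed with the locks.cfa changes below. For reference, the clause protocol implied by this changeset is (comments are interpretive, not part of the source):

    // register a clause on the lock: returns true if the clause is already satisfied
    // (e.g. the lock was acquired immediately), false if the select_node was enqueued
    bool register_select( blocking_lock & this, select_node & node );
    // withdraw a clause that was registered but never selected
    bool unregister_select( blocking_lock & this, select_node & node );
    // runs when the clause wins, just before the waituntil clause body executes
    bool on_selected( blocking_lock & this, select_node & node );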
libcfa/src/concurrency/channel.hfa
r3a513d89 r044ae62 1 // 2 // Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo 3 // 4 // The contents of this file are covered under the licence agreement in the 5 // file "LICENCE" distributed with Cforall. 6 // 7 // channel.hfa -- LIBCFATHREAD 8 // Runtime locks that used with the runtime thread system. 9 // 10 // Author : Colby Alexander Parsons 11 // Created On : Thu Jan 21 19:46:50 2022 12 // Last Modified By : 13 // Last Modified On : 14 // Update Count : 15 // 16 1 17 #pragma once 2 18 3 19 #include <locks.hfa> 4 20 #include <list.hfa> 5 #include <mutex_stmt.hfa> 6 7 // link field used for threads waiting on channel 8 struct wait_link { 9 // used to put wait_link on a dl queue 10 inline dlink(wait_link); 11 12 // waiting thread 13 struct thread$ * t; 14 15 // shadow field 16 void * elem; 17 }; 18 P9_EMBEDDED( wait_link, dlink(wait_link) ) 19 20 static inline void ?{}( wait_link & this, thread$ * t, void * elem ) { 21 this.t = t; 22 this.elem = elem; 23 } 24 25 // wake one thread from the list 26 static inline void wake_one( dlist( wait_link ) & queue ) { 27 wait_link & popped = try_pop_front( queue ); 28 unpark( popped.t ); 29 } 21 #include "select.hfa" 30 22 31 23 // returns true if woken due to shutdown 32 24 // blocks thread on list and releases passed lock 33 static inline bool block( dlist( wait_link) & queue, void * elem_ptr, go_mutex & lock ) {34 wait_link w{ active_thread(), elem_ptr };35 insert_last( queue, w);25 static inline bool block( dlist( select_node ) & queue, void * elem_ptr, go_mutex & lock ) { 26 select_node sn{ active_thread(), elem_ptr }; 27 insert_last( queue, sn ); 36 28 unlock( lock ); 37 29 park(); 38 return w.elem == 0p; 30 return sn.extra == 0p; 31 } 32 33 // Waituntil support (un)register_select helper routine 34 // Sets select node avail if not special OR case and then unlocks 35 static inline void __set_avail_then_unlock( select_node & node, go_mutex & mutex_lock ) { 36 if ( node.park_counter ) __make_select_node_available( node ); 37 unlock( mutex_lock ); 39 38 } 40 39 … … 59 58 size_t size, front, back, count; 60 59 T * buffer; 61 dlist( wait_link) prods, cons; // lists of blocked threads62 go_mutex mutex_lock; // MX lock63 bool closed; // indicates channel close/open64 #ifdef CHAN_STATS 65 size_t blocks, operations; // counts total ops and ops resulting in a blocked thd60 dlist( select_node ) prods, cons; // lists of blocked threads 61 go_mutex mutex_lock; // MX lock 62 bool closed; // indicates channel close/open 63 #ifdef CHAN_STATS 64 size_t p_blocks, p_ops, c_blocks, c_ops; // counts total ops and ops resulting in a blocked thd 66 65 #endif 67 66 }; … … 70 69 size = _size; 71 70 front = back = count = 0; 72 buffer = aalloc( size );71 if ( size != 0 ) buffer = aalloc( size ); 73 72 prods{}; 74 73 cons{}; … … 76 75 closed = false; 77 76 #ifdef CHAN_STATS 78 blocks = 0; 79 operations = 0; 77 p_blocks = 0; 78 p_ops = 0; 79 c_blocks = 0; 80 c_ops = 0; 80 81 #endif 81 82 } … … 84 85 static inline void ^?{}( channel(T) &c ) with(c) { 85 86 #ifdef CHAN_STATS 86 printf("Channel %p Blocks: %lu, Operations: %lu, %.2f%% of ops blocked\n", &c, blocks, operations, ((double)blocks)/operations * 100); 87 #endif 88 verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" ); 89 delete( buffer ); 90 } 91 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; } 92 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; } 87 printf("Channel %p Blocks: 
%lu,\t\tOperations: %lu,\t%.2f%% of ops blocked\n", &c, p_blocks + c_blocks, p_ops + c_ops, ((double)p_blocks + c_blocks)/(p_ops + c_ops) * 100); 88 printf("Channel %p Consumer Blocks: %lu,\tConsumer Ops: %lu,\t%.2f%% of Consumer ops blocked\n", &c, p_blocks, p_ops, ((double)p_blocks)/p_ops * 100); 89 printf("Channel %p Producer Blocks: %lu,\tProducer Ops: %lu,\t%.2f%% of Producer ops blocked\n", &c, c_blocks, c_ops, ((double)c_blocks)/c_ops * 100); 90 #endif 91 verifyf( __handle_waituntil_OR( cons ) || __handle_waituntil_OR( prods ) || cons`isEmpty && prods`isEmpty, 92 "Attempted to delete channel with waiting threads (Deadlock).\n" ); 93 if ( size != 0 ) delete( buffer ); 94 } 95 static inline size_t get_count( channel(T) & chan ) with(chan) { return __atomic_load_n( &count, __ATOMIC_RELAXED ); } 96 static inline size_t get_size( channel(T) & chan ) with(chan) { return __atomic_load_n( &size, __ATOMIC_RELAXED ); } 93 97 static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; } 94 98 static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; } … … 102 106 // flush waiting consumers and producers 103 107 while ( has_waiting_consumers( chan ) ) { 104 cons`first.elem = 0p; 108 if( !__handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race 109 break; // if __handle_waituntil_OR returns false cons is empty so break 110 cons`first.extra = 0p; 105 111 wake_one( cons ); 106 112 } 107 113 while ( has_waiting_producers( chan ) ) { 108 prods`first.elem = 0p; 114 if( !__handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race 115 break; // if __handle_waituntil_OR returns false prods is empty so break 116 prods`first.extra = 0p; 109 117 wake_one( prods ); 110 118 } … … 114 122 static inline void is_closed( channel(T) & chan ) with(chan) { return closed; } 115 123 124 // used to hand an element to a blocked consumer and signal it 125 static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) { 126 memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work 127 wake_one( cons ); 128 } 129 130 // used to hand an element to a blocked producer and signal it 131 static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) { 132 memcpy( (void *)&retval, prods`first.extra, sizeof(T) ); 133 wake_one( prods ); 134 } 135 116 136 static inline void flush( channel(T) & chan, T elem ) with(chan) { 117 137 lock( mutex_lock ); 118 138 while ( count == 0 && !cons`isEmpty ) { 119 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work 120 wake_one( cons ); 139 __cons_handoff( chan, elem ); 121 140 } 122 141 unlock( mutex_lock ); … … 125 144 // handles buffer insert 126 145 static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) { 127 memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T));146 memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) ); 128 147 count += 1; 129 148 back++; … … 131 150 } 132 151 133 // does the buffer insert or hands elem directly to consumer if one is waiting134 static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) {135 if ( count == 0 && !cons`isEmpty ) {136 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work137 wake_one( cons );138 } else __buf_insert( chan, elem );139 }140 141 152 // needed to avoid an extra copy in closed case 142 153 static inline bool 
__internal_try_insert( channel(T) & chan, T & elem ) with(chan) { 143 154 lock( mutex_lock ); 144 155 #ifdef CHAN_STATS 145 operations++; 146 #endif 156 p_ops++; 157 #endif 158 159 ConsEmpty: if ( !cons`isEmpty ) { 160 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty; 161 __cons_handoff( chan, elem ); 162 unlock( mutex_lock ); 163 return true; 164 } 165 147 166 if ( count == size ) { unlock( mutex_lock ); return false; } 148 __do_insert( chan, elem ); 167 168 __buf_insert( chan, elem ); 149 169 unlock( mutex_lock ); 150 170 return true; … … 157 177 // handles closed case of insert routine 158 178 static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) { 159 channel_closed except{ &channel_closed_vt, &elem, &chan };179 channel_closed except{ &channel_closed_vt, &elem, &chan }; 160 180 throwResume except; // throw closed resumption 161 181 if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination … … 172 192 173 193 #ifdef CHAN_STATS 174 if ( !closed ) operations++;194 if ( !closed ) p_ops++; 175 195 #endif 176 196 … … 182 202 } 183 203 184 // have to check for the zero size channel case185 if ( size == 0 &&!cons`isEmpty ) {186 memcpy(cons`first.elem, (void *)&elem, sizeof(T));187 wake_one( cons);188 unlock( mutex_lock ); 189 return true;204 // buffer count must be zero if cons are blocked (also handles zero-size case) 205 ConsEmpty: if ( !cons`isEmpty ) { 206 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty; 207 __cons_handoff( chan, elem ); 208 unlock( mutex_lock ); 209 return; 190 210 } 191 211 … … 193 213 if ( count == size ) { 194 214 #ifdef CHAN_STATS 195 blocks++;215 p_blocks++; 196 216 #endif 197 217 … … 202 222 } // if 203 223 204 if ( count == 0 && !cons`isEmpty ) { 205 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work 206 wake_one( cons ); 207 } else __buf_insert( chan, elem ); 208 209 unlock( mutex_lock ); 210 return; 211 } 212 213 // handles buffer remove 214 static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) { 215 memcpy((void *)&retval, (void *)&buffer[front], sizeof(T)); 224 __buf_insert( chan, elem ); 225 unlock( mutex_lock ); 226 } 227 228 // does the buffer remove and potentially does waiting producer work 229 static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) { 230 memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) ); 216 231 count -= 1; 217 232 front = (front + 1) % size; 218 }219 220 // does the buffer remove and potentially does waiting producer work221 static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {222 __buf_remove( chan, retval );223 233 if (count == size - 1 && !prods`isEmpty ) { 224 __buf_insert( chan, *(T *)prods`first.elem ); // do waiting producer work 234 if ( !__handle_waituntil_OR( prods ) ) return; 235 __buf_insert( chan, *(T *)prods`first.extra ); // do waiting producer work 225 236 wake_one( prods ); 226 237 } … … 231 242 lock( mutex_lock ); 232 243 #ifdef CHAN_STATS 233 operations++; 234 #endif 244 c_ops++; 245 #endif 246 247 ZeroSize: if ( size == 0 && !prods`isEmpty ) { 248 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize; 249 __prods_handoff( chan, retval ); 250 unlock( mutex_lock ); 251 return true; 252 } 253 235 254 if ( count == 0 ) { unlock( mutex_lock ); return false; } 255 236 256 __do_remove( chan, retval ); 237 257 unlock( mutex_lock ); … … 244 264 static inline [T, bool] try_remove( channel(T) & chan ) { 245 265 T retval; 246 return [ 
retval, __internal_try_remove( chan, retval ) ]; 247 } 248 249 static inline T try_remove( channel(T) & chan, T elem ) { 266 bool success = __internal_try_remove( chan, retval ); 267 return [ retval, success ]; 268 } 269 270 static inline T try_remove( channel(T) & chan ) { 250 271 T retval; 251 272 __internal_try_remove( chan, retval ); … … 255 276 // handles closed case of insert routine 256 277 static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) { 257 channel_closed except{ &channel_closed_vt, 0p, &chan };278 channel_closed except{ &channel_closed_vt, 0p, &chan }; 258 279 throwResume except; // throw resumption 259 280 if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination … … 269 290 270 291 #ifdef CHAN_STATS 271 if ( !closed ) operations++;292 if ( !closed ) c_ops++; 272 293 #endif 273 294 … … 279 300 280 301 // have to check for the zero size channel case 281 if ( size == 0 && !prods`isEmpty ) {282 memcpy((void *)&retval, (void *)prods`first.elem, sizeof(T));283 wake_one( prods);302 ZeroSize: if ( size == 0 && !prods`isEmpty ) { 303 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize; 304 __prods_handoff( chan, retval ); 284 305 unlock( mutex_lock ); 285 306 return retval; … … 287 308 288 309 // wait if buffer is empty, work will be completed by someone else 289 if ( count == 0) {310 if ( count == 0 ) { 290 311 #ifdef CHAN_STATS 291 blocks++;312 c_blocks++; 292 313 #endif 293 314 // check for if woken due to close … … 299 320 // Remove from buffer 300 321 __do_remove( chan, retval ); 301 302 322 unlock( mutex_lock ); 303 323 return retval; 304 324 } 325 326 /////////////////////////////////////////////////////////////////////////////////////////// 327 // The following is support for waituntil (select) statements 328 /////////////////////////////////////////////////////////////////////////////////////////// 329 static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) { 330 if ( !node`isListed && !node.park_counter ) return false; // handle special OR case 331 lock( mutex_lock ); 332 if ( node`isListed ) { // op wasn't performed 333 remove( node ); 334 unlock( mutex_lock ); 335 return false; 336 } 337 unlock( mutex_lock ); 338 339 // only return true when not special OR case, not exceptional calse and status is SAT 340 return ( node.extra == 0p || !node.park_counter ) ? 
false : *node.clause_status == __SELECT_SAT; 341 } 342 343 // type used by select statement to capture a chan read as the selected operation 344 struct chan_read { 345 T & ret; 346 channel(T) & chan; 347 }; 348 349 static inline void ?{}( chan_read(T) & cr, channel(T) & chan, T & ret ) { 350 &cr.chan = &chan; 351 &cr.ret = &ret; 352 } 353 static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ chan, ret }; return cr; } 354 355 static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(this.chan, this) { 356 __closed_remove( chan, ret ); 357 // if we get here then the insert succeeded 358 __make_select_node_available( node ); 359 } 360 361 static inline bool register_select( chan_read(T) & this, select_node & node ) with(this.chan, this) { 362 lock( mutex_lock ); 363 node.extra = &ret; // set .extra so that if it == 0p later in on_selected it is due to channel close 364 365 #ifdef CHAN_STATS 366 if ( !closed ) c_ops++; 367 #endif 368 369 if ( !node.park_counter ) { 370 // are we special case OR and front of cons is also special case OR 371 if ( !unlikely(closed) && !prods`isEmpty && prods`first.clause_status && !prods`first.park_counter ) { 372 if ( !__make_select_node_pending( node ) ) { 373 unlock( mutex_lock ); 374 return false; 375 } 376 377 if ( __handle_waituntil_OR( prods ) ) { 378 __prods_handoff( chan, ret ); 379 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node 380 unlock( mutex_lock ); 381 return true; 382 } 383 __make_select_node_unsat( node ); 384 } 385 // check if we can complete operation. If so race to establish winner in special OR case 386 if ( count != 0 || !prods`isEmpty || unlikely(closed) ) { 387 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering 388 unlock( mutex_lock ); 389 return false; 390 } 391 } 392 } 393 394 if ( unlikely(closed) ) { 395 unlock( mutex_lock ); 396 __handle_select_closed_read( this, node ); 397 return true; 398 } 399 400 // have to check for the zero size channel case 401 ZeroSize: if ( size == 0 && !prods`isEmpty ) { 402 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize; 403 __prods_handoff( chan, ret ); 404 __set_avail_then_unlock( node, mutex_lock ); 405 return true; 406 } 407 408 // wait if buffer is empty, work will be completed by someone else 409 if ( count == 0 ) { 410 #ifdef CHAN_STATS 411 c_blocks++; 412 #endif 413 414 insert_last( cons, node ); 415 unlock( mutex_lock ); 416 return false; 417 } 418 419 // Remove from buffer 420 __do_remove( chan, ret ); 421 __set_avail_then_unlock( node, mutex_lock ); 422 return true; 423 } 424 static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); } 425 static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) { 426 if ( node.extra == 0p ) // check if woken up due to closed channel 427 __closed_remove( chan, ret ); 428 // This is only reachable if not closed or closed exception was handled 429 return true; 430 } 431 432 // type used by select statement to capture a chan write as the selected operation 433 struct chan_write { 434 T elem; 435 channel(T) & chan; 436 }; 437 438 static inline void ?{}( chan_write(T) & cw, channel(T) & chan, T elem ) { 439 &cw.chan = &chan; 440 memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) ); 441 } 442 static inline chan_write(T) ?>>?( T elem, channel(T) & chan ) { 
chan_write(T) cw{ chan, elem }; return cw; } 443 444 static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(this.chan, this) { 445 __closed_insert( chan, elem ); 446 // if we get here then the insert succeeded 447 __make_select_node_available( node ); 448 } 449 450 static inline bool register_select( chan_write(T) & this, select_node & node ) with(this.chan, this) { 451 lock( mutex_lock ); 452 node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close 453 454 #ifdef CHAN_STATS 455 if ( !closed ) p_ops++; 456 #endif 457 458 // special OR case handling 459 if ( !node.park_counter ) { 460 // are we special case OR and front of cons is also special case OR 461 if ( !unlikely(closed) && !cons`isEmpty && cons`first.clause_status && !cons`first.park_counter ) { 462 if ( !__make_select_node_pending( node ) ) { 463 unlock( mutex_lock ); 464 return false; 465 } 466 467 if ( __handle_waituntil_OR( cons ) ) { 468 __cons_handoff( chan, elem ); 469 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node 470 unlock( mutex_lock ); 471 return true; 472 } 473 __make_select_node_unsat( node ); 474 } 475 // check if we can complete operation. If so race to establish winner in special OR case 476 if ( count != size || !cons`isEmpty || unlikely(closed) ) { 477 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering 478 unlock( mutex_lock ); 479 return false; 480 } 481 } 482 } 483 484 // if closed handle 485 if ( unlikely(closed) ) { 486 unlock( mutex_lock ); 487 __handle_select_closed_write( this, node ); 488 return true; 489 } 490 491 // handle blocked consumer case via handoff (buffer is implicitly empty) 492 ConsEmpty: if ( !cons`isEmpty ) { 493 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty; 494 __cons_handoff( chan, elem ); 495 __set_avail_then_unlock( node, mutex_lock ); 496 return true; 497 } 498 499 // insert node in list if buffer is full, work will be completed by someone else 500 if ( count == size ) { 501 #ifdef CHAN_STATS 502 p_blocks++; 503 #endif 504 505 insert_last( prods, node ); 506 unlock( mutex_lock ); 507 return false; 508 } // if 509 510 // otherwise carry out write either via normal insert 511 __buf_insert( chan, elem ); 512 __set_avail_then_unlock( node, mutex_lock ); 513 return true; 514 } 515 static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); } 516 517 static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) { 518 if ( node.extra == 0p ) // check if woken up due to closed channel 519 __closed_insert( chan, elem ); 520 521 // This is only reachable if not closed or closed exception was handled 522 return true; 523 } 524 305 525 } // forall( T ) 526 527 -
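The channel changes above split the statistics into producer/consumer counters, make the handoff paths explicit (__cons_handoff / __prods_handoff), and add waituntil support: chan_read and chan_write clause types built by the ?<<? and ?>>? operators, plus register_select / unregister_select / on_selected for each. A rough usage sketch, assuming the or-combined clause form of the waituntil statement introduced alongside select.hfa (variable names are hypothetical; the exact surface syntax is defined by the waituntil feature, not this header):

    channel( int ) A{ 4 };
    channel( int ) B{ 4 };
    int in;
    int out = 7;
    // in << A builds a chan_read clause; out >> B builds a chan_write clause
    waituntil( in << A )    { /* a value was removed from A into in */ }
    or waituntil( out >> B ) { /* out was inserted into B */ }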
libcfa/src/concurrency/future.hfa
r3a513d89 r044ae62 19 19 #include "monitor.hfa" 20 20 #include "select.hfa" 21 #include "locks.hfa" 21 22 22 23 //---------------------------------------------------------------------------- … … 26 27 // future_t is lockfree and uses atomics which aren't needed given we use locks here 27 28 forall( T ) { 28 // enum (int){ FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 }; // Enums seem to be broken so feel free to add this back afterwards29 // enum { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 }; // Enums seem to be broken so feel free to add this back afterwards 29 30 30 31 // temporary enum replacement … … 44 45 }; 45 46 46 // C_TODO: perhaps allow exceptions to be inserted like uC++?47 48 47 static inline { 49 48 … … 53 52 } 54 53 55 void ?{}( future(T) & this) {54 void ?{}( future(T) & this ) { 56 55 this.waiters{}; 57 56 this.state = FUTURE_EMPTY; … … 60 59 61 60 // Reset future back to original state 62 void reset( future(T) & this) with(this)61 void reset( future(T) & this ) with(this) 63 62 { 64 63 lock( lock ); … … 82 81 void _internal_flush( future(T) & this ) with(this) { 83 82 while( ! waiters`isEmpty ) { 83 if ( !__handle_waituntil_OR( waiters ) ) // handle special waituntil OR case 84 break; // if handle_OR returns false then waiters is empty so break 84 85 select_node &s = try_pop_front( waiters ); 85 86 86 if ( s.race_flag == 0p ) 87 // poke in result so that woken threads do not need to reacquire any locks 88 // *(((future_node(T) &)s).my_result) = result; 87 if ( s.clause_status == 0p ) // poke in result so that woken threads do not need to reacquire any locks 89 88 copy_T( result, *(((future_node(T) &)s).my_result) ); 90 else if ( !install_select_winner( s, &this ) ) continue;91 89 92 // only unpark if future is not selected 93 // or if it is selected we only unpark if we win the race 94 unpark( s.blocked_thread ); 90 wake_one( waiters, s ); 95 91 } 96 92 } 97 93 98 94 // Fulfil the future, returns whether or not someone was unblocked 99 bool fulfil( future(T) & this, T &val ) with(this) {95 bool fulfil( future(T) & this, T val ) with(this) { 100 96 lock( lock ); 101 97 if( state != FUTURE_EMPTY ) … … 153 149 } 154 150 155 void * register_select( future(T) & this, select_node & s ) with(this) { 156 lock( lock ); 157 158 // future not ready -> insert select node and return 0p 151 bool register_select( future(T) & this, select_node & s ) with(this) { 152 lock( lock ); 153 154 // check if we can complete operation. If so race to establish winner in special OR case 155 if ( !s.park_counter && state != FUTURE_EMPTY ) { 156 if ( !__make_select_node_available( s ) ) { // we didn't win the race so give up on registering 157 unlock( lock ); 158 return false; 159 } 160 } 161 162 // future not ready -> insert select node and return 159 163 if( state == FUTURE_EMPTY ) { 160 164 insert_last( waiters, s ); 161 165 unlock( lock ); 162 return 0p; 163 } 164 165 // future ready and we won race to install it as the select winner return 1p 166 if ( install_select_winner( s, &this ) ) { 167 unlock( lock ); 168 return 1p; 169 } 170 171 unlock( lock ); 172 // future ready and we lost race to install it as the select winner 173 return 2p; 174 } 175 176 void unregister_select( future(T) & this, select_node & s ) with(this) { 166 return false; 167 } 168 169 __make_select_node_available( s ); 170 unlock( lock ); 171 return true; 172 } 173 174 bool unregister_select( future(T) & this, select_node & s ) with(this) { 175 if ( ! 
s`isListed ) return false; 177 176 lock( lock ); 178 177 if ( s`isListed ) remove( s ); 179 178 unlock( lock ); 179 return false; 180 180 } 181 181 182 bool on_selected( future(T) & this, select_node & node ) { return true; } 182 183 } 183 184 } … … 186 187 // These futures below do not support select statements so they may not be as useful as 'future' 187 188 // however the 'single_future' is cheap and cheerful and is most likely more performant than 'future' 188 // since it uses raw atomics and no locks afaik189 // since it uses raw atomics and no locks 189 190 // 190 191 // As far as 'multi_future' goes I can't see many use cases as it will be less performant than 'future' -
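The future changes above align future(T) with the same clause protocol: register_select now returns a bool and races via __make_select_node_available in the special OR case, unregister_select returns a bool, on_selected is added, wakeups go through wake_one, and fulfil takes its argument by value. A rough sketch of the intended flow, assuming a future can appear directly as a waituntil clause (as its register_select routine suggests) and that some other thread performs the fulfil:

    future( int ) f;
    // producer side (e.g. another thread): wakes blocked waiters and any registered clauses
    fulfil( f, 42 );
    // consumer side: the clause body runs once the future is FULFILLED
    waituntil( f ) { /* the result is available at this point */ }
    reset( f );    // return the future to FUTURE_EMPTY so it can be reused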
libcfa/src/concurrency/invoke.h
r3a513d89 r044ae62 217 217 struct __thread_user_link cltr_link; 218 218 219 // used to point to this thd's current clh node220 volatile bool * clh_node;221 222 219 struct processor * last_proc; 220 221 // ptr used during handover between blocking lists to allow for stack allocation of intrusive nodes 222 // main use case is wait-morphing to allow a different node to be used to block on condvar vs lock 223 void * link_node; 223 224 224 225 PRNG_STATE_T random_state; // fast random numbers -
libcfa/src/concurrency/locks.cfa
r3a513d89 r044ae62 5 5 // file "LICENCE" distributed with Cforall. 6 6 // 7 // locks. hfa -- LIBCFATHREAD7 // locks.cfa -- LIBCFATHREAD 8 8 // Runtime locks that used with the runtime thread system. 9 9 // … … 79 79 // lock is held by some other thread 80 80 if ( owner != 0p && owner != thrd ) { 81 insert_last( blocked_threads, *thrd ); 81 select_node node; 82 insert_last( blocked_threads, node ); 82 83 wait_count++; 83 84 unlock( lock ); 84 85 park( ); 85 } 86 // multi acquisition lock is held by current thread 87 else if ( owner == thrd && multi_acquisition ) { 86 return; 87 } else if ( owner == thrd && multi_acquisition ) { // multi acquisition lock is held by current thread 88 88 recursion_count++; 89 unlock( lock ); 90 } 91 // lock isn't held 92 else { 89 } else { // lock isn't held 93 90 owner = thrd; 94 91 recursion_count = 1; 95 unlock( lock );96 } 92 } 93 unlock( lock ); 97 94 } 98 95 … … 117 114 } 118 115 119 static void pop_and_set_new_owner( blocking_lock & this ) with( this ) { 120 thread$ * t = &try_pop_front( blocked_threads ); 121 owner = t; 122 recursion_count = ( t ? 1 : 0 ); 123 if ( t ) wait_count--; 124 unpark( t ); 116 static inline void pop_node( blocking_lock & this ) with( this ) { 117 __handle_waituntil_OR( blocked_threads ); 118 select_node * node = &try_pop_front( blocked_threads ); 119 if ( node ) { 120 wait_count--; 121 owner = node->blocked_thread; 122 recursion_count = 1; 123 // if ( !node->clause_status || __make_select_node_available( *node ) ) unpark( node->blocked_thread ); 124 wake_one( blocked_threads, *node ); 125 } else { 126 owner = 0p; 127 recursion_count = 0; 128 } 125 129 } 126 130 … … 134 138 recursion_count--; 135 139 if ( recursion_count == 0 ) { 136 pop_ and_set_new_owner( this );140 pop_node( this ); 137 141 } 138 142 unlock( lock ); … … 147 151 // lock held 148 152 if ( owner != 0p ) { 149 insert_last( blocked_threads, * t);153 insert_last( blocked_threads, *(select_node *)t->link_node ); 150 154 wait_count++; 151 unlock( lock );152 155 } 153 156 // lock not held … … 156 159 recursion_count = 1; 157 160 unpark( t ); 158 unlock( lock );159 } 160 } 161 162 size_t on_wait( blocking_lock & this ) with( this ) {161 } 162 unlock( lock ); 163 } 164 165 size_t on_wait( blocking_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) with( this ) { 163 166 lock( lock __cfaabi_dbg_ctx2 ); 164 167 /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this ); … … 167 170 size_t ret = recursion_count; 168 171 169 pop_and_set_new_owner( this ); 172 pop_node( this ); 173 174 select_node node; 175 active_thread()->link_node = (void *)&node; 170 176 unlock( lock ); 177 178 pre_park_then_park( pp_fn, pp_datum ); 179 171 180 return ret; 172 181 } … … 175 184 recursion_count = recursion; 176 185 } 186 187 // waituntil() support 188 bool register_select( blocking_lock & this, select_node & node ) with(this) { 189 lock( lock __cfaabi_dbg_ctx2 ); 190 thread$ * thrd = active_thread(); 191 192 // single acquisition lock is held by current thread 193 /* paranoid */ verifyf( owner != thrd || multi_acquisition, "Single acquisition lock holder (%p) attempted to reacquire the lock %p resulting in a deadlock.", owner, &this ); 194 195 if ( !node.park_counter && ( (owner == thrd && multi_acquisition) || owner == 0p ) ) { // OR special case 196 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering 197 unlock( lock ); 198 return false; 199 } 200 } 201 202 // lock is held by some other thread 203 if ( 
owner != 0p && owner != thrd ) { 204 insert_last( blocked_threads, node ); 205 wait_count++; 206 unlock( lock ); 207 return false; 208 } else if ( owner == thrd && multi_acquisition ) { // multi acquisition lock is held by current thread 209 recursion_count++; 210 } else { // lock isn't held 211 owner = thrd; 212 recursion_count = 1; 213 } 214 215 if ( node.park_counter ) __make_select_node_available( node ); 216 unlock( lock ); 217 return true; 218 } 219 220 bool unregister_select( blocking_lock & this, select_node & node ) with(this) { 221 lock( lock __cfaabi_dbg_ctx2 ); 222 if ( node`isListed ) { 223 remove( node ); 224 wait_count--; 225 unlock( lock ); 226 return false; 227 } 228 229 if ( owner == active_thread() ) { 230 /* paranoid */ verifyf( recursion_count == 1 || multi_acquisition, "Thread %p attempted to unlock owner lock %p in waituntil unregister, which is not recursive but has a recursive count of %zu", active_thread(), &this, recursion_count ); 231 // if recursion count is zero release lock and set new owner if one is waiting 232 recursion_count--; 233 if ( recursion_count == 0 ) { 234 pop_node( this ); 235 } 236 } 237 unlock( lock ); 238 return false; 239 } 240 241 bool on_selected( blocking_lock & this, select_node & node ) { return true; } 177 242 178 243 //----------------------------------------------------------------------------- … … 311 376 int counter( condition_variable(L) & this ) with(this) { return count; } 312 377 313 static size_t queue_and_get_recursion( condition_variable(L) & this, info_thread(L) * i ) with(this) {378 static void enqueue_thread( condition_variable(L) & this, info_thread(L) * i ) with(this) { 314 379 // add info_thread to waiting queue 315 380 insert_last( blocked_threads, *i ); 316 381 count++; 317 size_t recursion_count = 0; 318 if (i->lock) { 319 // if lock was passed get recursion count to reset to after waking thread 320 recursion_count = on_wait( *i->lock ); 321 } 322 return recursion_count; 323 } 382 } 383 384 static size_t block_and_get_recursion( info_thread(L) & i, __cfa_pre_park pp_fn, void * pp_datum ) { 385 size_t recursion_count = 0; 386 if ( i.lock ) // if lock was passed get recursion count to reset to after waking thread 387 recursion_count = on_wait( *i.lock, pp_fn, pp_datum ); // this call blocks 388 else 389 pre_park_then_park( pp_fn, pp_datum ); 390 return recursion_count; 391 } 392 static size_t block_and_get_recursion( info_thread(L) & i ) { return block_and_get_recursion( i, pre_park_noop, 0p ); } 324 393 325 394 // helper for wait()'s' with no timeout 326 395 static void queue_info_thread( condition_variable(L) & this, info_thread(L) & i ) with(this) { 327 396 lock( lock __cfaabi_dbg_ctx2 ); 328 size_t recursion_count = queue_and_get_recursion(this, &i);397 enqueue_thread( this, &i ); 329 398 unlock( lock ); 330 399 331 400 // blocks here 332 park();401 size_t recursion_count = block_and_get_recursion( i ); 333 402 334 403 // resets recursion count here after waking 335 if ( i.lock) on_wakeup(*i.lock, recursion_count);404 if ( i.lock ) on_wakeup( *i.lock, recursion_count ); 336 405 } 337 406 … … 340 409 queue_info_thread( this, i ); 341 410 411 static void cond_alarm_register( void * node_ptr ) { register_self( (alarm_node_t *)node_ptr ); } 412 342 413 // helper for wait()'s' with a timeout 343 414 static void queue_info_thread_timeout( condition_variable(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) { 344 415 lock( lock __cfaabi_dbg_ctx2 ); 345 size_t recursion_count = 
queue_and_get_recursion(this, &info);416 enqueue_thread( this, &info ); 346 417 alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info }; 347 418 unlock( lock ); 348 419 349 // registers alarm outside cond lock to avoid deadlock 350 register_self( &node_wrap.alarm_node ); 351 352 // blocks here 353 park(); 420 // blocks here and registers alarm node before blocking after releasing locks to avoid deadlock 421 size_t recursion_count = block_and_get_recursion( info, cond_alarm_register, (void *)(&node_wrap.alarm_node) ); 422 // park(); 354 423 355 424 // unregisters alarm so it doesn't go off if this happens first … … 357 426 358 427 // resets recursion count here after waking 359 if ( info.lock) on_wakeup(*info.lock, recursion_count);428 if ( info.lock ) on_wakeup( *info.lock, recursion_count ); 360 429 } 361 430 … … 417 486 info_thread( L ) i = { active_thread(), info, &l }; 418 487 insert_last( blocked_threads, i ); 419 size_t recursion_count = on_wait( *i.lock );420 park( );488 size_t recursion_count = on_wait( *i.lock, pre_park_noop, 0p ); // blocks here 489 // park( ); 421 490 on_wakeup(*i.lock, recursion_count); 422 491 } … … 459 528 bool empty ( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty; } 460 529 461 static size_t queue_and_get_recursion( pthread_cond_var(L) & this, info_thread(L) * i ) with(this) {462 // add info_thread to waiting queue463 insert_last( blocked_threads, *i );464 size_t recursion_count = 0;465 recursion_count = on_wait( *i->lock );466 return recursion_count;467 }468 469 530 static void queue_info_thread_timeout( pthread_cond_var(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) { 470 531 lock( lock __cfaabi_dbg_ctx2 ); 471 size_t recursion_count = queue_and_get_recursion(this, &info);532 insert_last( blocked_threads, info ); 472 533 pthread_alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info }; 473 534 unlock( lock ); 474 535 475 // registers alarm outside cond lock to avoid deadlock 476 register_self( &node_wrap.alarm_node ); 477 478 // blocks here 479 park(); 480 481 // unregisters alarm so it doesn't go off if this happens first 536 // blocks here and registers alarm node before blocking after releasing locks to avoid deadlock 537 size_t recursion_count = block_and_get_recursion( info, cond_alarm_register, (void *)(&node_wrap.alarm_node) ); 538 539 // unregisters alarm so it doesn't go off if signal happens first 482 540 unregister_self( &node_wrap.alarm_node ); 483 541 484 542 // resets recursion count here after waking 485 if ( info.lock) on_wakeup(*info.lock, recursion_count);543 if ( info.lock ) on_wakeup( *info.lock, recursion_count ); 486 544 } 487 545 … … 493 551 lock( lock __cfaabi_dbg_ctx2 ); 494 552 info_thread( L ) i = { active_thread(), info, &l }; 495 size_t recursion_count = queue_and_get_recursion(this, &i); 496 unlock( lock ); 497 park( ); 498 on_wakeup(*i.lock, recursion_count); 553 insert_last( blocked_threads, i ); 554 unlock( lock ); 555 556 // blocks here 557 size_t recursion_count = block_and_get_recursion( i ); 558 559 on_wakeup( *i.lock, recursion_count ); 499 560 } 500 561 … … 584 645 return thrd != 0p; 585 646 } 647 -
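The locks.cfa changes above do two related things. First, on_wait (and the condition-variable helpers built on it) now takes a __cfa_pre_park callback that runs after all internal locks are released but immediately before park(); the timed waits use this to register their alarm node, closing the deadlock window the old register-then-park ordering left open. Second, waiters stack-allocate a select_node and publish it through thread$.link_node, so wait-morphing can block a different node on the lock than the one used on the condition variable. A minimal sketch of the callback protocol, mirroring cond_alarm_register from the changeset (my_lock and my_pre_park are hypothetical names):

    static void my_pre_park( void * datum ) {
        // runs with no internal locks held, immediately before the thread parks
        register_self( (alarm_node_t *)datum );
    }
    // later, on the waiting path:
    size_t rec = on_wait( my_lock, my_pre_park, (void *)&node_wrap.alarm_node ); // release, callback, park
    on_wakeup( my_lock, rec ); // restore the recursion count after being woken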
libcfa/src/concurrency/locks.hfa
r3a513d89 r044ae62 30 30 #include "time.hfa" 31 31 32 #include "select.hfa" 33 32 34 #include <fstream.hfa> 33 35 … … 37 39 #include <unistd.h> 38 40 39 // C_TODO: cleanup this and locks.cfa 40 // - appropriate separation of interface and impl 41 // - clean up unused/unneeded locks 42 // - change messy big blocking lock from inheritance to composition to remove need for flags 41 typedef void (*__cfa_pre_park)( void * ); 42 43 static inline void pre_park_noop( void * ) {} 44 45 //----------------------------------------------------------------------------- 46 // is_blocking_lock 47 forall( L & | sized(L) ) 48 trait is_blocking_lock { 49 // For synchronization locks to use when acquiring 50 void on_notify( L &, struct thread$ * ); 51 52 // For synchronization locks to use when releasing 53 size_t on_wait( L &, __cfa_pre_park pp_fn, void * pp_datum ); 54 55 // to set recursion count after getting signalled; 56 void on_wakeup( L &, size_t recursion ); 57 }; 58 59 static inline void pre_park_then_park( __cfa_pre_park pp_fn, void * pp_datum ) { 60 pp_fn( pp_datum ); 61 park(); 62 } 63 64 // macros for default routine impls for is_blocking_lock trait that do not wait-morph 65 66 #define DEFAULT_ON_NOTIFY( lock_type ) \ 67 static inline void on_notify( lock_type & this, thread$ * t ){ unpark(t); } 68 69 #define DEFAULT_ON_WAIT( lock_type ) \ 70 static inline size_t on_wait( lock_type & this, __cfa_pre_park pp_fn, void * pp_datum ) { \ 71 unlock( this ); \ 72 pre_park_then_park( pp_fn, pp_datum ); \ 73 return 0; \ 74 } 75 76 // on_wakeup impl if lock should be reacquired after waking up 77 #define DEFAULT_ON_WAKEUP_REACQ( lock_type ) \ 78 static inline void on_wakeup( lock_type & this, size_t recursion ) { lock( this ); } 79 80 // on_wakeup impl if lock will not be reacquired after waking up 81 #define DEFAULT_ON_WAKEUP_NO_REACQ( lock_type ) \ 82 static inline void on_wakeup( lock_type & this, size_t recursion ) {} 83 84 43 85 44 86 //----------------------------------------------------------------------------- … … 67 109 static inline bool try_lock ( single_acquisition_lock & this ) { return try_lock( (blocking_lock &)this ); } 68 110 static inline void unlock ( single_acquisition_lock & this ) { unlock ( (blocking_lock &)this ); } 69 static inline size_t on_wait ( single_acquisition_lock & this ) { return on_wait ( (blocking_lock &)this); }111 static inline size_t on_wait ( single_acquisition_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) { return on_wait ( (blocking_lock &)this, pp_fn, pp_datum ); } 70 112 static inline void on_wakeup( single_acquisition_lock & this, size_t v ) { on_wakeup ( (blocking_lock &)this, v ); } 71 113 static inline void on_notify( single_acquisition_lock & this, struct thread$ * t ) { on_notify( (blocking_lock &)this, t ); } 114 static inline bool register_select( single_acquisition_lock & this, select_node & node ) { return register_select( (blocking_lock &)this, node ); } 115 static inline bool unregister_select( single_acquisition_lock & this, select_node & node ) { return unregister_select( (blocking_lock &)this, node ); } 116 static inline bool on_selected( single_acquisition_lock & this, select_node & node ) { return on_selected( (blocking_lock &)this, node ); } 72 117 73 118 //---------- … … 81 126 static inline bool try_lock ( owner_lock & this ) { return try_lock( (blocking_lock &)this ); } 82 127 static inline void unlock ( owner_lock & this ) { unlock ( (blocking_lock &)this ); } 83 static inline size_t on_wait ( owner_lock & this ) { return on_wait ( 
(blocking_lock &)this); }128 static inline size_t on_wait ( owner_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) { return on_wait ( (blocking_lock &)this, pp_fn, pp_datum ); } 84 129 static inline void on_wakeup( owner_lock & this, size_t v ) { on_wakeup ( (blocking_lock &)this, v ); } 85 130 static inline void on_notify( owner_lock & this, struct thread$ * t ) { on_notify( (blocking_lock &)this, t ); } 131 static inline bool register_select( owner_lock & this, select_node & node ) { return register_select( (blocking_lock &)this, node ); } 132 static inline bool unregister_select( owner_lock & this, select_node & node ) { return unregister_select( (blocking_lock &)this, node ); } 133 static inline bool on_selected( owner_lock & this, select_node & node ) { return on_selected( (blocking_lock &)this, node ); } 86 134 87 135 //----------------------------------------------------------------------------- … … 128 176 static inline void ?{}(mcs_spin_node & this) { this.next = 0p; this.locked = true; } 129 177 130 static inline mcs_spin_node * volatile & ?`next ( mcs_spin_node * node ) {131 return node->next;132 }133 134 178 struct mcs_spin_lock { 135 179 mcs_spin_queue queue; … … 137 181 138 182 static inline void lock(mcs_spin_lock & l, mcs_spin_node & n) { 183 n.locked = true; 139 184 mcs_spin_node * prev = __atomic_exchange_n(&l.queue.tail, &n, __ATOMIC_SEQ_CST); 140 n.locked = true; 141 if(prev == 0p) return; 185 if( prev == 0p ) return; 142 186 prev->next = &n; 143 while( __atomic_load_n(&n.locked, __ATOMIC_RELAXED)) Pause();187 while( __atomic_load_n(&n.locked, __ATOMIC_RELAXED) ) Pause(); 144 188 } 145 189 … … 147 191 mcs_spin_node * n_ptr = &n; 148 192 if (__atomic_compare_exchange_n(&l.queue.tail, &n_ptr, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) return; 149 while (__atomic_load_n(&n.next, __ATOMIC_RELAXED) == 0p) {}193 while (__atomic_load_n(&n.next, __ATOMIC_RELAXED) == 0p) Pause(); 150 194 n.next->locked = false; 151 195 } … … 156 200 // - Kernel thd blocking alternative to the spinlock 157 201 // - No ownership (will deadlock on reacq) 202 // - no reacq on wakeup 158 203 struct futex_mutex { 159 204 // lock state any state other than UNLOCKED is locked … … 169 214 } 170 215 171 static inline void ?{}( futex_mutex & this ) with(this) { val = 0; }172 173 static inline bool internal_try_lock( futex_mutex & this, int & compare_val) with(this) {216 static inline void ?{}( futex_mutex & this ) with(this) { val = 0; } 217 218 static inline bool internal_try_lock( futex_mutex & this, int & compare_val) with(this) { 174 219 return __atomic_compare_exchange_n((int*)&val, (int*)&compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE); 175 220 } 176 221 177 static inline int internal_exchange( futex_mutex & this) with(this) {222 static inline int internal_exchange( futex_mutex & this ) with(this) { 178 223 return __atomic_exchange_n((int*)&val, 2, __ATOMIC_ACQUIRE); 179 224 } 180 225 181 226 // if this is called recursively IT WILL DEADLOCK!!!!! 
182 static inline void lock( futex_mutex & this) with(this) {227 static inline void lock( futex_mutex & this ) with(this) { 183 228 int state; 184 229 … … 190 235 for (int i = 0; i < spin; i++) Pause(); 191 236 } 192 193 // // no contention try to acquire194 // if (internal_try_lock(this, state)) return;195 237 196 238 // if not in contended state, set to be in contended state … … 212 254 } 213 255 214 static inline void on_notify( futex_mutex & f, thread$ * t){ unpark(t); } 215 static inline size_t on_wait( futex_mutex & f ) {unlock(f); return 0;} 216 217 // to set recursion count after getting signalled; 218 static inline void on_wakeup( futex_mutex & f, size_t recursion ) {} 256 DEFAULT_ON_NOTIFY( futex_mutex ) 257 DEFAULT_ON_WAIT( futex_mutex ) 258 DEFAULT_ON_WAKEUP_NO_REACQ( futex_mutex ) 219 259 220 260 //----------------------------------------------------------------------------- … … 232 272 int val; 233 273 }; 234 235 274 static inline void ?{}( go_mutex & this ) with(this) { val = 0; } 275 // static inline void ?{}( go_mutex & this, go_mutex this2 ) = void; // these don't compile correctly at the moment so they should be omitted 276 // static inline void ?=?( go_mutex & this, go_mutex this2 ) = void; 236 277 237 278 static inline bool internal_try_lock(go_mutex & this, int & compare_val, int new_val ) with(this) { … … 244 285 245 286 // if this is called recursively IT WILL DEADLOCK!!!!! 246 static inline void lock( go_mutex & this) with(this) {287 static inline void lock( go_mutex & this ) with( this ) { 247 288 int state, init_state; 248 289 … … 255 296 while( !val ) { // lock unlocked 256 297 state = 0; 257 if ( internal_try_lock(this, state, init_state)) return;298 if ( internal_try_lock( this, state, init_state ) ) return; 258 299 } 259 300 for (int i = 0; i < 30; i++) Pause(); … … 262 303 while( !val ) { // lock unlocked 263 304 state = 0; 264 if ( internal_try_lock(this, state, init_state)) return;305 if ( internal_try_lock( this, state, init_state ) ) return; 265 306 } 266 307 sched_yield(); 267 308 268 309 // if not in contended state, set to be in contended state 269 state = internal_exchange( this, 2);310 state = internal_exchange( this, 2 ); 270 311 if ( !state ) return; // state == 0 271 312 init_state = 2; 272 futex( (int*)&val, FUTEX_WAIT, 2); // if val is not 2 this returns with EWOULDBLOCK313 futex( (int*)&val, FUTEX_WAIT, 2 ); // if val is not 2 this returns with EWOULDBLOCK 273 314 } 274 315 } … … 276 317 static inline void unlock( go_mutex & this ) with(this) { 277 318 // if uncontended do atomic unlock and then return 278 if ( __atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1) return;319 if ( __atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1 ) return; 279 320 280 321 // otherwise threads are blocked so we must wake one 281 futex((int *)&val, FUTEX_WAKE, 1); 282 } 283 284 static inline void on_notify( go_mutex & f, thread$ * t){ unpark(t); } 285 static inline size_t on_wait( go_mutex & f ) {unlock(f); return 0;} 286 static inline void on_wakeup( go_mutex & f, size_t recursion ) {} 287 288 //----------------------------------------------------------------------------- 289 // CLH Spinlock 290 // - No recursive acquisition 291 // - Needs to be released by owner 292 293 struct clh_lock { 294 volatile bool * volatile tail; 295 volatile bool * volatile head; 296 }; 297 298 static inline void ?{}( clh_lock & this ) { this.tail = malloc(); *this.tail = true; } 299 static inline void ^?{}( clh_lock & this ) { free(this.tail); } 300 301 static inline void 
lock(clh_lock & l) { 302 thread$ * curr_thd = active_thread(); 303 *(curr_thd->clh_node) = false; 304 volatile bool * prev = __atomic_exchange_n((bool **)(&l.tail), (bool *)(curr_thd->clh_node), __ATOMIC_SEQ_CST); 305 while(!__atomic_load_n(prev, __ATOMIC_SEQ_CST)) Pause(); 306 __atomic_store_n((bool **)(&l.head), (bool *)curr_thd->clh_node, __ATOMIC_SEQ_CST); 307 curr_thd->clh_node = prev; 308 } 309 310 static inline void unlock(clh_lock & l) { 311 __atomic_store_n((bool *)(l.head), true, __ATOMIC_SEQ_CST); 312 } 313 314 static inline void on_notify(clh_lock & this, struct thread$ * t ) { unpark(t); } 315 static inline size_t on_wait(clh_lock & this) { unlock(this); return 0; } 316 static inline void on_wakeup(clh_lock & this, size_t recursion ) { lock(this); } 322 futex( (int *)&val, FUTEX_WAKE, 1 ); 323 } 324 325 DEFAULT_ON_NOTIFY( go_mutex ) 326 DEFAULT_ON_WAIT( go_mutex ) 327 DEFAULT_ON_WAKEUP_NO_REACQ( go_mutex ) 317 328 318 329 //----------------------------------------------------------------------------- … … 334 345 this.lock_value = 0; 335 346 } 347 static inline void ?{}( exp_backoff_then_block_lock & this, exp_backoff_then_block_lock this2 ) = void; 348 static inline void ?=?( exp_backoff_then_block_lock & this, exp_backoff_then_block_lock this2 ) = void; 336 349 337 350 static inline void ^?{}( exp_backoff_then_block_lock & this ){} 338 351 339 static inline bool internal_try_lock( exp_backoff_then_block_lock & this, size_t & compare_val) with(this) {352 static inline bool internal_try_lock( exp_backoff_then_block_lock & this, size_t & compare_val ) with(this) { 340 353 return __atomic_compare_exchange_n(&lock_value, &compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED); 341 354 } 342 355 343 static inline bool try_lock( exp_backoff_then_block_lock & this) { size_t compare_val = 0; return internal_try_lock(this, compare_val); }344 345 static inline bool try_lock_contention( exp_backoff_then_block_lock & this) with(this) {346 return !__atomic_exchange_n( &lock_value, 2, __ATOMIC_ACQUIRE);347 } 348 349 static inline bool block( exp_backoff_then_block_lock & this) with(this) {356 static inline bool try_lock( exp_backoff_then_block_lock & this ) { size_t compare_val = 0; return internal_try_lock( this, compare_val ); } 357 358 static inline bool try_lock_contention( exp_backoff_then_block_lock & this ) with(this) { 359 return !__atomic_exchange_n( &lock_value, 2, __ATOMIC_ACQUIRE ); 360 } 361 362 static inline bool block( exp_backoff_then_block_lock & this ) with(this) { 350 363 lock( spinlock __cfaabi_dbg_ctx2 ); 351 364 if (__atomic_load_n( &lock_value, __ATOMIC_SEQ_CST) != 2) { … … 359 372 } 360 373 361 static inline void lock( exp_backoff_then_block_lock & this) with(this) {374 static inline void lock( exp_backoff_then_block_lock & this ) with(this) { 362 375 size_t compare_val = 0; 363 376 int spin = 4; … … 378 391 } 379 392 380 static inline void unlock( exp_backoff_then_block_lock & this) with(this) {393 static inline void unlock( exp_backoff_then_block_lock & this ) with(this) { 381 394 if (__atomic_exchange_n(&lock_value, 0, __ATOMIC_RELEASE) == 1) return; 382 395 lock( spinlock __cfaabi_dbg_ctx2 ); … … 386 399 } 387 400 388 static inline void on_notify(exp_backoff_then_block_lock & this, struct thread$ * t ) { unpark(t); } 389 static inline size_t on_wait(exp_backoff_then_block_lock & this) { unlock(this); return 0; } 390 static inline void on_wakeup(exp_backoff_then_block_lock & this, size_t recursion ) { lock(this); } 401 DEFAULT_ON_NOTIFY( 
exp_backoff_then_block_lock ) 402 DEFAULT_ON_WAIT( exp_backoff_then_block_lock ) 403 DEFAULT_ON_WAKEUP_REACQ( exp_backoff_then_block_lock ) 391 404 392 405 //----------------------------------------------------------------------------- … … 418 431 419 432 // if this is called recursively IT WILL DEADLOCK!!!!! 420 static inline void lock( fast_block_lock & this) with(this) {433 static inline void lock( fast_block_lock & this ) with(this) { 421 434 lock( lock __cfaabi_dbg_ctx2 ); 422 435 if ( held ) { … … 430 443 } 431 444 432 static inline void unlock( fast_block_lock & this) with(this) {445 static inline void unlock( fast_block_lock & this ) with(this) { 433 446 lock( lock __cfaabi_dbg_ctx2 ); 434 447 /* paranoid */ verifyf( held != false, "Attempt to release lock %p that isn't held", &this ); … … 439 452 } 440 453 441 static inline void on_notify( fast_block_lock & this, struct thread$ * t ) with(this) {454 static inline void on_notify( fast_block_lock & this, struct thread$ * t ) with(this) { 442 455 lock( lock __cfaabi_dbg_ctx2 ); 443 456 insert_last( blocked_threads, *t ); 444 457 unlock( lock ); 445 458 } 446 static inline size_t on_wait(fast_block_lock & this) { unlock(this); return 0; } 447 static inline void on_wakeup(fast_block_lock & this, size_t recursion ) { } 459 DEFAULT_ON_WAIT( fast_block_lock ) 460 DEFAULT_ON_WAKEUP_NO_REACQ( fast_block_lock ) 448 461 449 462 //----------------------------------------------------------------------------- … … 456 469 struct simple_owner_lock { 457 470 // List of blocked threads 458 dlist( thread$) blocked_threads;471 dlist( select_node ) blocked_threads; 459 472 460 473 // Spin lock used for mutual exclusion … … 477 490 static inline void ?=?( simple_owner_lock & this, simple_owner_lock this2 ) = void; 478 491 479 static inline void lock( simple_owner_lock & this) with(this) {480 if ( owner == active_thread()) {492 static inline void lock( simple_owner_lock & this ) with(this) { 493 if ( owner == active_thread() ) { 481 494 recursion_count++; 482 495 return; … … 484 497 lock( lock __cfaabi_dbg_ctx2 ); 485 498 486 if (owner != 0p) { 487 insert_last( blocked_threads, *active_thread() ); 499 if ( owner != 0p ) { 500 select_node node; 501 insert_last( blocked_threads, node ); 488 502 unlock( lock ); 489 503 park( ); … … 495 509 } 496 510 497 // TODO: fix duplicate def issue and bring this back 498 // void pop_and_set_new_owner( simple_owner_lock & this ) with( this ) { 499 // thread$ * t = &try_pop_front( blocked_threads ); 500 // owner = t; 501 // recursion_count = ( t ? 
1 : 0 ); 502 // unpark( t ); 503 // } 504 505 static inline void unlock(simple_owner_lock & this) with(this) { 511 static inline void pop_node( simple_owner_lock & this ) with(this) { 512 __handle_waituntil_OR( blocked_threads ); 513 select_node * node = &try_pop_front( blocked_threads ); 514 if ( node ) { 515 owner = node->blocked_thread; 516 recursion_count = 1; 517 // if ( !node->clause_status || __make_select_node_available( *node ) ) unpark( node->blocked_thread ); 518 wake_one( blocked_threads, *node ); 519 } else { 520 owner = 0p; 521 recursion_count = 0; 522 } 523 } 524 525 static inline void unlock( simple_owner_lock & this ) with(this) { 506 526 lock( lock __cfaabi_dbg_ctx2 ); 507 527 /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this ); … … 510 530 recursion_count--; 511 531 if ( recursion_count == 0 ) { 512 // pop_and_set_new_owner( this ); 513 thread$ * t = &try_pop_front( blocked_threads ); 514 owner = t; 515 recursion_count = ( t ? 1 : 0 ); 516 unpark( t ); 532 pop_node( this ); 517 533 } 518 534 unlock( lock ); 519 535 } 520 536 521 static inline void on_notify( simple_owner_lock & this, structthread$ * t ) with(this) {537 static inline void on_notify( simple_owner_lock & this, thread$ * t ) with(this) { 522 538 lock( lock __cfaabi_dbg_ctx2 ); 523 539 // lock held 524 540 if ( owner != 0p ) { 525 insert_last( blocked_threads, * t);541 insert_last( blocked_threads, *(select_node *)t->link_node ); 526 542 } 527 543 // lock not held … … 534 550 } 535 551 536 static inline size_t on_wait( simple_owner_lock & this) with(this) {552 static inline size_t on_wait( simple_owner_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) with(this) { 537 553 lock( lock __cfaabi_dbg_ctx2 ); 538 554 /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this ); … … 541 557 size_t ret = recursion_count; 542 558 543 // pop_and_set_new_owner( this ); 544 545 thread$ * t = &try_pop_front( blocked_threads ); 546 owner = t; 547 recursion_count = ( t ? 1 : 0 ); 548 unpark( t ); 549 559 pop_node( this ); 560 561 select_node node; 562 active_thread()->link_node = (void *)&node; 550 563 unlock( lock ); 564 565 pre_park_then_park( pp_fn, pp_datum ); 566 551 567 return ret; 552 568 } 553 569 554 static inline void on_wakeup(simple_owner_lock & this, size_t recursion ) with(this) { recursion_count = recursion; } 570 static inline void on_wakeup( simple_owner_lock & this, size_t recursion ) with(this) { recursion_count = recursion; } 571 572 // waituntil() support 573 static inline bool register_select( simple_owner_lock & this, select_node & node ) with(this) { 574 lock( lock __cfaabi_dbg_ctx2 ); 575 576 // check if we can complete operation. 
If so race to establish winner in special OR case 577 if ( !node.park_counter && ( owner == active_thread() || owner == 0p ) ) { 578 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering 579 unlock( lock ); 580 return false; 581 } 582 } 583 584 if ( owner == active_thread() ) { 585 recursion_count++; 586 if ( node.park_counter ) __make_select_node_available( node ); 587 unlock( lock ); 588 return true; 589 } 590 591 if ( owner != 0p ) { 592 insert_last( blocked_threads, node ); 593 unlock( lock ); 594 return false; 595 } 596 597 owner = active_thread(); 598 recursion_count = 1; 599 600 if ( node.park_counter ) __make_select_node_available( node ); 601 unlock( lock ); 602 return true; 603 } 604 605 static inline bool unregister_select( simple_owner_lock & this, select_node & node ) with(this) { 606 lock( lock __cfaabi_dbg_ctx2 ); 607 if ( node`isListed ) { 608 remove( node ); 609 unlock( lock ); 610 return false; 611 } 612 613 if ( owner == active_thread() ) { 614 recursion_count--; 615 if ( recursion_count == 0 ) { 616 pop_node( this ); 617 } 618 } 619 unlock( lock ); 620 return false; 621 } 622 623 static inline bool on_selected( simple_owner_lock & this, select_node & node ) { return true; } 624 555 625 556 626 //----------------------------------------------------------------------------- … … 578 648 579 649 // if this is called recursively IT WILL DEADLOCK! 580 static inline void lock( spin_queue_lock & this) with(this) {650 static inline void lock( spin_queue_lock & this ) with(this) { 581 651 mcs_spin_node node; 582 652 lock( lock, node ); … … 586 656 } 587 657 588 static inline void unlock( spin_queue_lock & this) with(this) {658 static inline void unlock( spin_queue_lock & this ) with(this) { 589 659 __atomic_store_n(&held, false, __ATOMIC_RELEASE); 590 660 } 591 661 592 static inline void on_notify(spin_queue_lock & this, struct thread$ * t ) { 593 unpark(t); 594 } 595 static inline size_t on_wait(spin_queue_lock & this) { unlock(this); return 0; } 596 static inline void on_wakeup(spin_queue_lock & this, size_t recursion ) { lock(this); } 597 662 DEFAULT_ON_NOTIFY( spin_queue_lock ) 663 DEFAULT_ON_WAIT( spin_queue_lock ) 664 DEFAULT_ON_WAKEUP_REACQ( spin_queue_lock ) 598 665 599 666 //----------------------------------------------------------------------------- … … 621 688 622 689 // if this is called recursively IT WILL DEADLOCK!!!!! 623 static inline void lock( mcs_block_spin_lock & this) with(this) {690 static inline void lock( mcs_block_spin_lock & this ) with(this) { 624 691 mcs_node node; 625 692 lock( lock, node ); … … 633 700 } 634 701 635 static inline void on_notify(mcs_block_spin_lock & this, struct thread$ * t ) { unpark(t); } 636 static inline size_t on_wait(mcs_block_spin_lock & this) { unlock(this); return 0; } 637 static inline void on_wakeup(mcs_block_spin_lock & this, size_t recursion ) {lock(this); } 702 DEFAULT_ON_NOTIFY( mcs_block_spin_lock ) 703 DEFAULT_ON_WAIT( mcs_block_spin_lock ) 704 DEFAULT_ON_WAKEUP_REACQ( mcs_block_spin_lock ) 638 705 639 706 //----------------------------------------------------------------------------- … … 661 728 662 729 // if this is called recursively IT WILL DEADLOCK!!!!! 
663 static inline void lock( block_spin_lock & this) with(this) {730 static inline void lock( block_spin_lock & this ) with(this) { 664 731 lock( lock ); 665 732 while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause(); … … 668 735 } 669 736 670 static inline void unlock( block_spin_lock & this) with(this) {737 static inline void unlock( block_spin_lock & this ) with(this) { 671 738 __atomic_store_n(&held, false, __ATOMIC_RELEASE); 672 739 } 673 740 674 static inline void on_notify( block_spin_lock & this, struct thread$ * t ) with(this.lock) {741 static inline void on_notify( block_spin_lock & this, struct thread$ * t ) with(this.lock) { 675 742 // first we acquire internal fast_block_lock 676 743 lock( lock __cfaabi_dbg_ctx2 ); … … 686 753 unpark(t); 687 754 } 688 static inline size_t on_wait(block_spin_lock & this) { unlock(this); return 0; } 689 static inline void on_wakeup( block_spin_lock & this, size_t recursion ) with(this) {755 DEFAULT_ON_WAIT( block_spin_lock ) 756 static inline void on_wakeup( block_spin_lock & this, size_t recursion ) with(this) { 690 757 // now we acquire the entire block_spin_lock upon waking up 691 758 while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause(); … … 693 760 unlock( lock ); // Now we release the internal fast_spin_lock 694 761 } 695 696 //-----------------------------------------------------------------------------697 // is_blocking_lock698 forall( L & | sized(L) )699 trait is_blocking_lock {700 // For synchronization locks to use when acquiring701 void on_notify( L &, struct thread$ * );702 703 // For synchronization locks to use when releasing704 size_t on_wait( L & );705 706 // to set recursion count after getting signalled;707 void on_wakeup( L &, size_t recursion );708 };709 762 710 763 //----------------------------------------------------------------------------- … … 714 767 forall(L & | is_blocking_lock(L)) { 715 768 struct info_thread; 716 717 // // for use by sequence718 // info_thread(L) *& Back( info_thread(L) * this );719 // info_thread(L) *& Next( info_thread(L) * this );720 769 } 721 770 -
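The locks.hfa changes above fold the per-lock on_wait/on_wakeup/on_notify boilerplate into the DEFAULT_ON_* macros and switch simple_owner_lock's wait list from thread$ to select_node, so one wake-up path (pop_node, wake_one) serves both plain blocking and waituntil clauses. The lock's external interface is unchanged; below is a minimal usage sketch (the Worker thread, counter, and processor count are illustrative and not part of the changeset):

    #include <locks.hfa>
    #include <thread.hfa>

    simple_owner_lock l;            // owner lock from the hunk above
    int counter = 0;                // illustrative shared state

    thread Worker {};
    void main( Worker & this ) {
        lock( l );                  // first acquire: owner set, recursion_count = 1
        lock( l );                  // recursive acquire: recursion_count = 2
        counter += 1;
        unlock( l );
        unlock( l );                // count hits 0: pop_node() hands the lock to the next waiter
    }

    int main() {
        processor p;                // extra kernel thread for the sketch
        Worker w[2];                // threads start at construction ...
    }                               // ... and are joined at destruction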
libcfa/src/concurrency/mutex_stmt.hfa
r3a513d89 r044ae62 15 15 }; 16 16 17 18 17 struct __mutex_stmt_lock_guard { 19 18 void ** lockarr; … … 30 29 31 30 forall(L & | is_lock(L)) { 32 33 struct scoped_lock { 34 L * internal_lock; 35 }; 36 37 static inline void ?{}( scoped_lock(L) & this, L & internal_lock ) { 38 this.internal_lock = &internal_lock; 39 lock(internal_lock); 40 } 41 42 static inline void ^?{}( scoped_lock(L) & this ) with(this) { 43 unlock(*internal_lock); 44 } 45 46 static inline void * __get_mutexstmt_lock_ptr( L & this ) { 47 return &this; 48 } 49 50 static inline L __get_mutexstmt_lock_type( L & this ); 51 52 static inline L __get_mutexstmt_lock_type( L * this ); 31 static inline void * __get_mutexstmt_lock_ptr( L & this ) { return &this; } 32 static inline L __get_mutexstmt_lock_type( L & this ) {} 33 static inline L __get_mutexstmt_lock_type( L * this ) {} 53 34 } -
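mutex_stmt.hfa drops the RAII scoped_lock helper; the compiler-facing surface is now just __get_mutexstmt_lock_ptr/__get_mutexstmt_lock_type plus the __mutex_stmt_lock_guard that a mutex statement lowers to. A usage sketch of that statement follows (the surface syntax and the lock/variable names are assumptions for illustration; the lowering itself is generated by the compiler):

    simple_owner_lock a, b;
    int balance = 0;

    void deposit( int amount ) {
        mutex( a, b ) {             // all listed locks held for the block
            balance += amount;
        }                           // released automatically at block exit
    }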
libcfa/src/concurrency/preemption.cfa
r3a513d89 r044ae62 117 117 __cfadbg_print_buffer_decl( preemption, " KERNEL: preemption tick %lu\n", currtime.tn); 118 118 Duration period = node->period; 119 if( period == 0 ) {119 if( period == 0 ) { 120 120 node->set = false; // Node is one-shot, just mark it as not pending 121 121 } -
libcfa/src/concurrency/select.hfa
r3a513d89 r044ae62 1 // 2 // Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo 3 // 4 // The contents of this file are covered under the licence agreement in the 5 // file "LICENCE" distributed with Cforall. 6 // 7 // channel.hfa -- LIBCFATHREAD 8 // Runtime locks that used with the runtime thread system. 9 // 10 // Author : Colby Alexander Parsons 11 // Created On : Thu Jan 21 19:46:50 2023 12 // Last Modified By : 13 // Last Modified On : 14 // Update Count : 15 // 16 1 17 #pragma once 2 18 3 19 #include "containers/list.hfa" 4 #include <stdint.h>5 #include <kernel.hfa>6 #include <locks.hfa>20 #include "alarm.hfa" 21 #include "kernel.hfa" 22 #include "time.hfa" 7 23 24 struct select_node; 25 26 // node status 27 static const unsigned long int __SELECT_UNSAT = 0; 28 static const unsigned long int __SELECT_PENDING = 1; // used only by special OR case 29 static const unsigned long int __SELECT_SAT = 2; 30 static const unsigned long int __SELECT_RUN = 3; 31 32 // these are used inside the compiler to aid in code generation 33 static inline bool __CFA_has_clause_run( unsigned long int status ) { return status == __SELECT_RUN; } 34 static inline void __CFA_maybe_park( int * park_counter ) { 35 if ( __atomic_sub_fetch( park_counter, 1, __ATOMIC_SEQ_CST) < 0 ) 36 park(); 37 } 38 39 // node used for coordinating waituntil synchronization 8 40 struct select_node { 41 int * park_counter; // If this is 0p then the node is in a special OR case waituntil 42 unsigned long int * clause_status; // needs to point at ptr sized location, if this is 0p then node is not part of a waituntil 43 44 void * extra; // used to store arbitrary data needed by some primitives 45 9 46 thread$ * blocked_thread; 10 void ** race_flag;11 47 inline dlink(select_node); 12 48 }; 13 49 P9_EMBEDDED( select_node, dlink(select_node) ) 14 50 15 void ?{}( select_node & this ) { 16 this.blocked_thread = 0p; 17 this.race_flag = 0p; 51 static inline void ?{}( select_node & this ) { 52 this.blocked_thread = active_thread(); 53 this.clause_status = 0p; 54 this.park_counter = 0p; 55 this.extra = 0p; 18 56 } 19 57 20 void ?{}( select_node & this, thread$ * blocked_thread ) {58 static inline void ?{}( select_node & this, thread$ * blocked_thread ) { 21 59 this.blocked_thread = blocked_thread; 22 this.race_flag = 0p; 60 this.clause_status = 0p; 61 this.park_counter = 0p; 62 this.extra = 0p; 23 63 } 24 64 25 void ?{}( select_node & this, thread$ * blocked_thread, void ** race_flag) {65 static inline void ?{}( select_node & this, thread$ * blocked_thread, void * extra ) { 26 66 this.blocked_thread = blocked_thread; 27 this.race_flag = race_flag; 67 this.clause_status = 0p; 68 this.park_counter = 0p; 69 this.extra = extra; 28 70 } 71 static inline void ^?{}( select_node & this ) {} 29 72 30 void ^?{}( select_node & this ) {} 73 // this is used inside the compiler to aid in code generation 74 static inline unsigned long int * __get_clause_status( select_node & s ) { return s.clause_status; } 31 75 76 // this is used inside the compiler to attempt to establish an else clause as a winner in the OR special case race 77 static inline bool __select_node_else_race( select_node & this ) with( this ) { 78 unsigned long int cmp_status = __SELECT_UNSAT; 79 return *clause_status == 0 80 && __atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ); 81 } 32 82 33 83 //----------------------------------------------------------------------------- 34 84 // is_selectable 35 trait 
is_selectable(T & | sized(T)) { 36 // For registering a select on a selectable concurrency primitive 37 // return 0p if primitive not accessible yet 38 // return 1p if primitive gets acquired 39 // return 2p if primitive is accessible but some other primitive won the race 40 // C_TODO: add enum for return values 41 void * register_select( T &, select_node & ); 85 forall(T & | sized(T)) 86 trait is_selectable { 87 // For registering a select stmt on a selectable concurrency primitive 88 // Returns bool that indicates if operation is already SAT 89 bool register_select( T &, select_node & ); 42 90 43 void unregister_select( T &, select_node & ); 91 // For unregistering a select stmt on a selectable concurrency primitive 92 // If true is returned then the corresponding code block is run (only in non-special OR case and only if node status is not RUN) 93 bool unregister_select( T &, select_node & ); 94 95 // This routine is run on the selecting thread prior to executing the statement corresponding to the select_node 96 // passed as an arg to this routine 97 // If on_selected returns false, the statement is not run, if it returns true it is run. 98 bool on_selected( T &, select_node & ); 44 99 }; 45 100 46 static inline bool install_select_winner( select_node & this, void * primitive_ptr ) with(this) { 47 // temporary needed for atomic instruction 48 void * cmp_flag = 0p; 49 50 // if we dont win the selector race we need to potentially 51 // ignore this node and move to the next one so we return accordingly 52 if ( *race_flag != 0p || 53 !__atomic_compare_exchange_n( 54 race_flag, 55 &cmp_flag, 56 primitive_ptr, 57 false, 58 __ATOMIC_SEQ_CST, 59 __ATOMIC_SEQ_CST 60 ) 61 ) return false; // lost race and some other node triggered select 62 return true; // won race so this node is what the select proceeds with 101 //============================================================================================= 102 // Waituntil Helpers 103 //============================================================================================= 104 105 // used for the 2-stage avail needed by the special OR case 106 static inline bool __mark_select_node( select_node & this, unsigned long int val ) with( this ) { 107 /* paranoid */ verify( park_counter == 0p ); 108 /* paranoid */ verify( clause_status != 0p ); 109 110 unsigned long int cmp_status = __SELECT_UNSAT; 111 while( !__atomic_compare_exchange_n( clause_status, &cmp_status, val, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) { 112 if ( cmp_status != __SELECT_PENDING ) return false; 113 cmp_status = __SELECT_UNSAT; 114 } 115 return true; 63 116 } 117 118 static inline void __make_select_node_unsat( select_node & this ) with( this ) { 119 __atomic_store_n( clause_status, __SELECT_UNSAT, __ATOMIC_SEQ_CST ); 120 } 121 static inline void __make_select_node_sat( select_node & this ) with( this ) { 122 __atomic_store_n( clause_status, __SELECT_SAT, __ATOMIC_SEQ_CST ); 123 } 124 125 static inline bool __make_select_node_pending( select_node & this ) with( this ) { 126 return __mark_select_node( this, __SELECT_PENDING ); 127 } 128 129 // when a primitive becomes available it calls the following routine on it's node to update the select state: 130 // return true if we want to unpark the thd 131 static inline bool __make_select_node_available( select_node & this ) with( this ) { 132 /* paranoid */ verify( clause_status != 0p ); 133 if( !park_counter ) 134 return __mark_select_node( this, (unsigned long int)&this ); 135 136 unsigned long int cmp_status = 
__SELECT_UNSAT; 137 138 return *clause_status == 0 // C_TODO might not need a cmp_xchg in non special OR case 139 && __atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) // can maybe just use atomic write 140 && !__atomic_add_fetch( park_counter, 1, __ATOMIC_SEQ_CST); 141 } 142 143 // Handles the special OR case of the waituntil statement 144 // Since only one select node can win in the OR case, we need to race to set the node available BEFORE 145 // performing the operation since if we lose the race the operation should not be performed as it will be lost 146 // Returns true if execution can continue normally and false if the queue has now been drained 147 static inline bool __handle_waituntil_OR( dlist( select_node ) & queue ) { 148 if ( queue`isEmpty ) return false; 149 if ( queue`first.clause_status && !queue`first.park_counter ) { 150 while ( !queue`isEmpty ) { 151 // if node not a special OR case or if we win the special OR case race break 152 if ( !queue`first.clause_status || queue`first.park_counter || __make_select_node_available( queue`first ) ) 153 return true; 154 // otherwise we lost the special OR race so discard node 155 try_pop_front( queue ); 156 } 157 return false; 158 } 159 return true; 160 } 161 162 // wake one thread from the list 163 static inline void wake_one( dlist( select_node ) & queue, select_node & popped ) { 164 if ( !popped.clause_status // normal case, node is not a select node 165 || ( popped.clause_status && !popped.park_counter ) // If popped link is special case OR selecting unpark but don't call __make_select_node_available 166 || __make_select_node_available( popped ) ) // check if popped link belongs to a selecting thread 167 unpark( popped.blocked_thread ); 168 } 169 170 static inline void wake_one( dlist( select_node ) & queue ) { wake_one( queue, try_pop_front( queue ) ); } 171 172 static inline void setup_clause( select_node & this, unsigned long int * clause_status, int * park_counter ) { 173 this.blocked_thread = active_thread(); 174 this.clause_status = clause_status; 175 this.park_counter = park_counter; 176 } 177 178 // waituntil ( timeout( ... ) ) support 179 struct select_timeout_node { 180 alarm_node_t a_node; 181 select_node * s_node; 182 }; 183 void ?{}( select_timeout_node & this, Duration duration, Alarm_Callback callback ); 184 void ^?{}( select_timeout_node & this ); 185 void timeout_handler_select_cast( alarm_node_t & node ); 186 187 // Selectable trait routines 188 bool register_select( select_timeout_node & this, select_node & node ); 189 bool unregister_select( select_timeout_node & this, select_node & node ); 190 bool on_selected( select_timeout_node & this, select_node & node ); 191 192 // Gateway routines to waituntil on duration 193 select_timeout_node timeout( Duration duration ); 194 select_timeout_node sleep( Duration duration ); -
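The new select.hfa gathers the waituntil plumbing in one place: a clause is represented by a select_node, a primitive opts in by implementing the is_selectable trait, and the helpers at the end (__handle_waituntil_OR, wake_one, __make_select_node_available) are what a primitive's release path calls to satisfy a queued clause; select_timeout_node adds the same hooks for waituntil on a Duration. The sketch below shows that release-side pattern for a hypothetical primitive keeping its waiters in a dlist(select_node); it condenses what pop_node() does for simple_owner_lock in the locks.hfa hunk above:

    // 'blocked' is the hypothetical primitive's wait list; only the helpers
    // come from the header above.
    static inline void release_one( dlist( select_node ) & blocked ) {
        // discard clauses that lost the special OR race, if any
        __handle_waituntil_OR( blocked );
        if ( !blocked`isEmpty )
            // marks the popped clause available and unparks its thread when needed
            wake_one( blocked );
    }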
libcfa/src/concurrency/thread.cfa
r3a513d89 r044ae62 53 53 preferred = ready_queue_new_preferred(); 54 54 last_proc = 0p; 55 link_node = 0p; 55 56 PRNG_SET_SEED( random_state, __global_random_mask ? __global_random_prime : __global_random_prime ^ rdtscl() ); 56 57 #if defined( __CFA_WITH_VERIFY__ ) … … 59 60 #endif 60 61 61 clh_node = malloc( );62 *clh_node = false;63 64 62 doregister(curr_cluster, this); 65 63 monitors{ &self_mon_p, 1, (fptr_t)0 }; … … 70 68 canary = 0xDEADDEADDEADDEADp; 71 69 #endif 72 free(clh_node);73 70 unregister(curr_cluster, this); 74 71 ^self_cor{};
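thread.cfa replaces the heap-allocated clh_node with the new link_node pointer, initialized to 0p at thread construction. Its use is visible in the locks.hfa hunk: on_wait() of simple_owner_lock stashes the address of a stack-allocated select_node in the waiter's link_node before parking, and on_notify() on the signalling side re-queues exactly that node. Condensed from the two routines above, with nothing new added:

    // waiter, inside on_wait( simple_owner_lock &, ... ):
    select_node node;                               // lives on the waiter's stack while parked
    active_thread()->link_node = (void *)&node;
    // ... then park via pre_park_then_park( pp_fn, pp_datum ) ...

    // signaller, inside on_notify( simple_owner_lock & this, thread$ * t ):
    insert_last( blocked_threads, *(select_node *)t->link_node );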