Changeset 24d6572 for libcfa/src/concurrency/locks.cfa
- Timestamp:
- Jun 12, 2023, 2:45:32 PM (2 years ago)
- Branches:
- ast-experimental, master
- Children:
- 62d62db
- Parents:
- 34b4268 (diff), 251ce80 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the(diff)
links above to see all the changes relative to each parent. - File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/locks.cfa
r34b4268 r24d6572 5 5 // file "LICENCE" distributed with Cforall. 6 6 // 7 // locks. hfa -- LIBCFATHREAD7 // locks.cfa -- LIBCFATHREAD 8 8 // Runtime locks that used with the runtime thread system. 9 9 // … … 16 16 17 17 #define __cforall_thread__ 18 #define _GNU_SOURCE19 18 20 19 #include "locks.hfa" … … 80 79 // lock is held by some other thread 81 80 if ( owner != 0p && owner != thrd ) { 82 insert_last( blocked_threads, *thrd ); 81 select_node node; 82 insert_last( blocked_threads, node ); 83 83 wait_count++; 84 84 unlock( lock ); 85 85 park( ); 86 } 87 // multi acquisition lock is held by current thread 88 else if ( owner == thrd && multi_acquisition ) { 86 return; 87 } else if ( owner == thrd && multi_acquisition ) { // multi acquisition lock is held by current thread 89 88 recursion_count++; 90 unlock( lock ); 91 } 92 // lock isn't held 93 else { 89 } else { // lock isn't held 94 90 owner = thrd; 95 91 recursion_count = 1; 96 unlock( lock );97 } 92 } 93 unlock( lock ); 98 94 } 99 95 … … 118 114 } 119 115 120 static void pop_and_set_new_owner( blocking_lock & this ) with( this ) { 121 thread$ * t = &try_pop_front( blocked_threads ); 122 owner = t; 123 recursion_count = ( t ? 1 : 0 ); 124 if ( t ) wait_count--; 125 unpark( t ); 116 static inline void pop_node( blocking_lock & this ) with( this ) { 117 __handle_waituntil_OR( blocked_threads ); 118 select_node * node = &try_pop_front( blocked_threads ); 119 if ( node ) { 120 wait_count--; 121 owner = node->blocked_thread; 122 recursion_count = 1; 123 // if ( !node->clause_status || __make_select_node_available( *node ) ) unpark( node->blocked_thread ); 124 wake_one( blocked_threads, *node ); 125 } else { 126 owner = 0p; 127 recursion_count = 0; 128 } 126 129 } 127 130 … … 135 138 recursion_count--; 136 139 if ( recursion_count == 0 ) { 137 pop_ and_set_new_owner( this );140 pop_node( this ); 138 141 } 139 142 unlock( lock ); … … 148 151 // lock held 149 152 if ( owner != 0p ) { 150 insert_last( blocked_threads, * t);153 insert_last( blocked_threads, *(select_node *)t->link_node ); 151 154 wait_count++; 152 unlock( lock );153 155 } 154 156 // lock not held … … 157 159 recursion_count = 1; 158 160 unpark( t ); 159 unlock( lock );160 } 161 } 162 163 size_t on_wait( blocking_lock & this ) with( this ) {161 } 162 unlock( lock ); 163 } 164 165 size_t on_wait( blocking_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) with( this ) { 164 166 lock( lock __cfaabi_dbg_ctx2 ); 165 167 /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this ); … … 168 170 size_t ret = recursion_count; 169 171 170 pop_and_set_new_owner( this ); 172 pop_node( this ); 173 174 select_node node; 175 active_thread()->link_node = (void *)&node; 171 176 unlock( lock ); 177 178 pre_park_then_park( pp_fn, pp_datum ); 179 172 180 return ret; 173 181 } … … 176 184 recursion_count = recursion; 177 185 } 186 187 // waituntil() support 188 bool register_select( blocking_lock & this, select_node & node ) with(this) { 189 lock( lock __cfaabi_dbg_ctx2 ); 190 thread$ * thrd = active_thread(); 191 192 // single acquisition lock is held by current thread 193 /* paranoid */ verifyf( owner != thrd || multi_acquisition, "Single acquisition lock holder (%p) attempted to reacquire the lock %p resulting in a deadlock.", owner, &this ); 194 195 if ( !node.park_counter && ( (owner == thrd && multi_acquisition) || owner == 0p ) ) { // OR special case 196 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering 197 unlock( lock ); 198 return false; 199 } 200 } 201 202 // lock is held by some other thread 203 if ( owner != 0p && owner != thrd ) { 204 insert_last( blocked_threads, node ); 205 wait_count++; 206 unlock( lock ); 207 return false; 208 } else if ( owner == thrd && multi_acquisition ) { // multi acquisition lock is held by current thread 209 recursion_count++; 210 } else { // lock isn't held 211 owner = thrd; 212 recursion_count = 1; 213 } 214 215 if ( node.park_counter ) __make_select_node_available( node ); 216 unlock( lock ); 217 return true; 218 } 219 220 bool unregister_select( blocking_lock & this, select_node & node ) with(this) { 221 lock( lock __cfaabi_dbg_ctx2 ); 222 if ( node`isListed ) { 223 remove( node ); 224 wait_count--; 225 unlock( lock ); 226 return false; 227 } 228 229 if ( owner == active_thread() ) { 230 /* paranoid */ verifyf( recursion_count == 1 || multi_acquisition, "Thread %p attempted to unlock owner lock %p in waituntil unregister, which is not recursive but has a recursive count of %zu", active_thread(), &this, recursion_count ); 231 // if recursion count is zero release lock and set new owner if one is waiting 232 recursion_count--; 233 if ( recursion_count == 0 ) { 234 pop_node( this ); 235 } 236 } 237 unlock( lock ); 238 return false; 239 } 240 241 void on_selected( blocking_lock & this, select_node & node ) {} 178 242 179 243 //----------------------------------------------------------------------------- … … 312 376 int counter( condition_variable(L) & this ) with(this) { return count; } 313 377 314 static size_t queue_and_get_recursion( condition_variable(L) & this, info_thread(L) * i ) with(this) {378 static void enqueue_thread( condition_variable(L) & this, info_thread(L) * i ) with(this) { 315 379 // add info_thread to waiting queue 316 380 insert_last( blocked_threads, *i ); 317 381 count++; 318 size_t recursion_count = 0; 319 if (i->lock) { 320 // if lock was passed get recursion count to reset to after waking thread 321 recursion_count = on_wait( *i->lock ); 322 } 323 return recursion_count; 324 } 382 } 383 384 static size_t block_and_get_recursion( info_thread(L) & i, __cfa_pre_park pp_fn, void * pp_datum ) { 385 size_t recursion_count = 0; 386 if ( i.lock ) // if lock was passed get recursion count to reset to after waking thread 387 recursion_count = on_wait( *i.lock, pp_fn, pp_datum ); // this call blocks 388 else 389 pre_park_then_park( pp_fn, pp_datum ); 390 return recursion_count; 391 } 392 static size_t block_and_get_recursion( info_thread(L) & i ) { return block_and_get_recursion( i, pre_park_noop, 0p ); } 325 393 326 394 // helper for wait()'s' with no timeout 327 395 static void queue_info_thread( condition_variable(L) & this, info_thread(L) & i ) with(this) { 328 396 lock( lock __cfaabi_dbg_ctx2 ); 329 size_t recursion_count = queue_and_get_recursion(this, &i);397 enqueue_thread( this, &i ); 330 398 unlock( lock ); 331 399 332 400 // blocks here 333 park();401 size_t recursion_count = block_and_get_recursion( i ); 334 402 335 403 // resets recursion count here after waking 336 if ( i.lock) on_wakeup(*i.lock, recursion_count);404 if ( i.lock ) on_wakeup( *i.lock, recursion_count ); 337 405 } 338 406 … … 341 409 queue_info_thread( this, i ); 342 410 411 static void cond_alarm_register( void * node_ptr ) { register_self( (alarm_node_t *)node_ptr ); } 412 343 413 // helper for wait()'s' with a timeout 344 414 static void queue_info_thread_timeout( condition_variable(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) { 345 415 lock( lock __cfaabi_dbg_ctx2 ); 346 size_t recursion_count = queue_and_get_recursion(this, &info);416 enqueue_thread( this, &info ); 347 417 alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info }; 348 418 unlock( lock ); 349 419 350 // registers alarm outside cond lock to avoid deadlock 351 register_self( &node_wrap.alarm_node ); 352 353 // blocks here 354 park(); 420 // blocks here and registers alarm node before blocking after releasing locks to avoid deadlock 421 size_t recursion_count = block_and_get_recursion( info, cond_alarm_register, (void *)(&node_wrap.alarm_node) ); 422 // park(); 355 423 356 424 // unregisters alarm so it doesn't go off if this happens first … … 358 426 359 427 // resets recursion count here after waking 360 if ( info.lock) on_wakeup(*info.lock, recursion_count);428 if ( info.lock ) on_wakeup( *info.lock, recursion_count ); 361 429 } 362 430 … … 418 486 info_thread( L ) i = { active_thread(), info, &l }; 419 487 insert_last( blocked_threads, i ); 420 size_t recursion_count = on_wait( *i.lock );421 park( );488 size_t recursion_count = on_wait( *i.lock, pre_park_noop, 0p ); // blocks here 489 // park( ); 422 490 on_wakeup(*i.lock, recursion_count); 423 491 } … … 460 528 bool empty ( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty; } 461 529 462 static size_t queue_and_get_recursion( pthread_cond_var(L) & this, info_thread(L) * i ) with(this) {463 // add info_thread to waiting queue464 insert_last( blocked_threads, *i );465 size_t recursion_count = 0;466 recursion_count = on_wait( *i->lock );467 return recursion_count;468 }469 470 530 static void queue_info_thread_timeout( pthread_cond_var(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) { 471 531 lock( lock __cfaabi_dbg_ctx2 ); 472 size_t recursion_count = queue_and_get_recursion(this, &info);532 insert_last( blocked_threads, info ); 473 533 pthread_alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info }; 474 534 unlock( lock ); 475 535 476 // registers alarm outside cond lock to avoid deadlock 477 register_self( &node_wrap.alarm_node ); 478 479 // blocks here 480 park(); 481 482 // unregisters alarm so it doesn't go off if this happens first 536 // blocks here and registers alarm node before blocking after releasing locks to avoid deadlock 537 size_t recursion_count = block_and_get_recursion( info, cond_alarm_register, (void *)(&node_wrap.alarm_node) ); 538 539 // unregisters alarm so it doesn't go off if signal happens first 483 540 unregister_self( &node_wrap.alarm_node ); 484 541 485 542 // resets recursion count here after waking 486 if ( info.lock) on_wakeup(*info.lock, recursion_count);543 if ( info.lock ) on_wakeup( *info.lock, recursion_count ); 487 544 } 488 545 … … 494 551 lock( lock __cfaabi_dbg_ctx2 ); 495 552 info_thread( L ) i = { active_thread(), info, &l }; 496 size_t recursion_count = queue_and_get_recursion(this, &i); 497 unlock( lock ); 498 park( ); 499 on_wakeup(*i.lock, recursion_count); 553 insert_last( blocked_threads, i ); 554 unlock( lock ); 555 556 // blocks here 557 size_t recursion_count = block_and_get_recursion( i ); 558 559 on_wakeup( *i.lock, recursion_count ); 500 560 } 501 561 … … 585 645 return thrd != 0p; 586 646 } 647
Note:
See TracChangeset
for help on using the changeset viewer.