- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/locks.cfa
rb93bf85 r6b33e89 79 79 // lock is held by some other thread 80 80 if ( owner != 0p && owner != thrd ) { 81 81 select_node node; 82 82 insert_last( blocked_threads, node ); 83 83 wait_count++; 84 84 unlock( lock ); 85 85 park( ); 86 86 return; 87 87 } else if ( owner == thrd && multi_acquisition ) { // multi acquisition lock is held by current thread 88 88 recursion_count++; … … 91 91 recursion_count = 1; 92 92 } 93 93 unlock( lock ); 94 94 } 95 95 … … 115 115 116 116 static inline void pop_node( blocking_lock & this ) with( this ) { 117 118 select_node * node = &try_pop_front( blocked_threads );119 120 121 122 123 124 125 126 127 128 117 __handle_waituntil_OR( blocked_threads ); 118 select_node * node = &remove_first( blocked_threads ); 119 if ( node ) { 120 wait_count--; 121 owner = node->blocked_thread; 122 recursion_count = 1; 123 // if ( !node->clause_status || __make_select_node_available( *node ) ) unpark( node->blocked_thread ); 124 wake_one( blocked_threads, *node ); 125 } else { 126 owner = 0p; 127 recursion_count = 0; 128 } 129 129 } 130 130 … … 160 160 unpark( t ); 161 161 } 162 162 unlock( lock ); 163 163 } 164 164 … … 172 172 pop_node( this ); 173 173 174 175 176 unlock( lock ); 177 178 174 select_node node; 175 active_thread()->link_node = (void *)&node; 176 unlock( lock ); 177 178 pre_park_then_park( pp_fn, pp_datum ); 179 179 180 180 return ret; … … 187 187 // waituntil() support 188 188 bool register_select( blocking_lock & this, select_node & node ) with(this) { 189 189 lock( lock __cfaabi_dbg_ctx2 ); 190 190 thread$ * thrd = active_thread(); 191 191 … … 193 193 /* paranoid */ verifyf( owner != thrd || multi_acquisition, "Single acquisition lock holder (%p) attempted to reacquire the lock %p resulting in a deadlock.", owner, &this ); 194 194 195 196 197 198 199 200 195 if ( !node.park_counter && ( (owner == thrd && multi_acquisition) || owner == 0p ) ) { // OR special case 196 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering 197 unlock( lock ); 198 return false; 199 } 200 } 201 201 202 202 // lock is held by some other thread … … 205 205 wait_count++; 206 206 unlock( lock ); 207 207 return false; 208 208 } else if ( owner == thrd && multi_acquisition ) { // multi acquisition lock is held by current thread 209 209 recursion_count++; … … 213 213 } 214 214 215 216 217 215 if ( node.park_counter ) __make_select_node_available( node ); 216 unlock( lock ); 217 return true; 218 218 } 219 219 220 220 bool unregister_select( blocking_lock & this, select_node & node ) with(this) { 221 222 if ( node`isListed) {223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 unlock( lock ); 238 221 lock( lock __cfaabi_dbg_ctx2 ); 222 if ( isListed( node ) ) { 223 remove( node ); 224 wait_count--; 225 unlock( lock ); 226 return false; 227 } 228 229 if ( owner == active_thread() ) { 230 /* paranoid */ verifyf( recursion_count == 1 || multi_acquisition, "Thread %p attempted to unlock owner lock %p in waituntil unregister, which is not recursive but has a recursive count of %zu", active_thread(), &this, recursion_count ); 231 // if recursion count is zero release lock and set new owner if one is waiting 232 recursion_count--; 233 if ( recursion_count == 0 ) { 234 pop_node( this ); 235 } 236 } 237 unlock( lock ); 238 return false; 239 239 } 240 240 … … 265 265 // may still be called after a thread has been removed from the queue but 266 266 // before the alarm is unregistered 267 if ( (*info_thd)`isListed ) {// is thread on queue267 if ( isListed( *info_thd ) ) { // is thread on queue 268 268 info_thd->signalled = false; 269 269 // remove this thread O(1) 270 270 remove( *info_thd ); 271 271 cond->count--; 272 if ( info_thd->lock ) {272 if ( info_thd->lock ) { 273 273 // call lock's on_notify if a lock was passed 274 274 on_notify(*info_thd->lock, info_thd->t); … … 304 304 // may still be called after a thread has been removed from the queue but 305 305 // before the alarm is unregistered 306 if ( (*info_thd)`isListed ) {// is thread on queue306 if ( isListed( *info_thd ) ) { // is thread on queue 307 307 info_thd->signalled = false; 308 308 // remove this thread O(1) … … 332 332 333 333 static void process_popped( condition_variable(L) & this, info_thread(L) & popped ) with( this ) { 334 if (&popped != 0p) {334 if (&popped != 0p) { 335 335 popped.signalled = true; 336 336 count--; … … 347 347 bool notify_one( condition_variable(L) & this ) with( this ) { 348 348 lock( lock __cfaabi_dbg_ctx2 ); 349 bool ret = ! blocked_threads`isEmpty;350 process_popped(this, try_pop_front( blocked_threads ));349 bool ret = ! isEmpty( blocked_threads ); 350 process_popped(this, remove_first( blocked_threads )); 351 351 unlock( lock ); 352 352 return ret; … … 355 355 bool notify_all( condition_variable(L) & this ) with(this) { 356 356 lock( lock __cfaabi_dbg_ctx2 ); 357 bool ret = ! blocked_threads`isEmpty;358 while( ! blocked_threads`isEmpty) {359 process_popped(this, try_pop_front( blocked_threads ));357 bool ret = ! isEmpty( blocked_threads ); 358 while( ! isEmpty( blocked_threads ) ) { 359 process_popped(this, remove_first( blocked_threads )); 360 360 } 361 361 unlock( lock ); … … 364 364 365 365 uintptr_t front( condition_variable(L) & this ) with(this) { 366 return blocked_threads`isEmpty ? NULL : blocked_threads`first.info;366 return isEmpty( blocked_threads ) ? NULL : first( blocked_threads ).info; 367 367 } 368 368 369 369 bool empty( condition_variable(L) & this ) with(this) { 370 370 lock( lock __cfaabi_dbg_ctx2 ); 371 bool ret = blocked_threads`isEmpty;371 bool ret = isEmpty( blocked_threads ); 372 372 unlock( lock ); 373 373 return ret; … … 382 382 } 383 383 384 385 384 static size_t block_and_get_recursion( info_thread(L) & i, __cfa_pre_park pp_fn, void * pp_datum ) { 385 size_t recursion_count = 0; 386 386 if ( i.lock ) // if lock was passed get recursion count to reset to after waking thread 387 387 recursion_count = on_wait( *i.lock, pp_fn, pp_datum ); // this call blocks 388 388 else 389 390 391 392 389 pre_park_then_park( pp_fn, pp_datum ); 390 return recursion_count; 391 } 392 static size_t block_and_get_recursion( info_thread(L) & i ) { return block_and_get_recursion( i, pre_park_noop, 0p ); } 393 393 394 394 // helper for wait()'s' with no timeout 395 395 static void queue_info_thread( condition_variable(L) & this, info_thread(L) & i ) with(this) { 396 396 lock( lock __cfaabi_dbg_ctx2 ); 397 397 enqueue_thread( this, &i ); 398 398 unlock( lock ); 399 399 400 400 // blocks here 401 401 size_t recursion_count = block_and_get_recursion( i ); 402 402 403 403 // resets recursion count here after waking … … 409 409 queue_info_thread( this, i ); 410 410 411 411 static void cond_alarm_register( void * node_ptr ) { register_self( (alarm_node_t *)node_ptr ); } 412 412 413 413 // helper for wait()'s' with a timeout 414 414 static void queue_info_thread_timeout( condition_variable(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) { 415 415 lock( lock __cfaabi_dbg_ctx2 ); 416 416 enqueue_thread( this, &info ); 417 417 alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info }; 418 418 unlock( lock ); 419 419 420 420 // blocks here and registers alarm node before blocking after releasing locks to avoid deadlock 421 421 size_t recursion_count = block_and_get_recursion( info, cond_alarm_register, (void *)(&node_wrap.alarm_node) ); 422 422 // park(); 423 423 … … 434 434 return i.signalled; 435 435 436 void wait( condition_variable(L) & this ) with(this) { WAIT( 0, 0p) }437 void wait( condition_variable(L) & this, uintptr_t info 438 void wait( condition_variable(L) & this, L & l ) with(this) { WAIT( 0, &l) }436 void wait( condition_variable(L) & this ) with(this) { WAIT( 0, 0p ) } 437 void wait( condition_variable(L) & this, uintptr_t info ) with(this) { WAIT( info, 0p ) } 438 void wait( condition_variable(L) & this, L & l ) with(this) { WAIT( 0, &l ) } 439 439 void wait( condition_variable(L) & this, L & l, uintptr_t info ) with(this) { WAIT( info, &l ) } 440 440 441 bool wait( condition_variable(L) & this, Duration duration ) with(this) { WAIT_TIME( 0, 0p , duration ) }442 bool wait( condition_variable(L) & this, uintptr_t info, Duration duration 443 bool wait( condition_variable(L) & this, L & l, Duration duration ) with(this) { WAIT_TIME( 0, &l , duration ) }441 bool wait( condition_variable(L) & this, Duration duration ) with(this) { WAIT_TIME( 0 , 0p , duration ) } 442 bool wait( condition_variable(L) & this, uintptr_t info, Duration duration ) with(this) { WAIT_TIME( info, 0p , duration ) } 443 bool wait( condition_variable(L) & this, L & l, Duration duration ) with(this) { WAIT_TIME( 0 , &l , duration ) } 444 444 bool wait( condition_variable(L) & this, L & l, uintptr_t info, Duration duration ) with(this) { WAIT_TIME( info, &l , duration ) } 445 445 446 446 //----------------------------------------------------------------------------- 447 447 // fast_cond_var 448 void 448 void ?{}( fast_cond_var(L) & this ){ 449 449 this.blocked_threads{}; 450 450 #ifdef __CFA_DEBUG__ … … 455 455 456 456 bool notify_one( fast_cond_var(L) & this ) with(this) { 457 bool ret = ! blocked_threads`isEmpty;457 bool ret = ! isEmpty( blocked_threads ); 458 458 if ( ret ) { 459 info_thread(L) & popped = try_pop_front( blocked_threads );459 info_thread(L) & popped = remove_first( blocked_threads ); 460 460 on_notify(*popped.lock, popped.t); 461 461 } … … 463 463 } 464 464 bool notify_all( fast_cond_var(L) & this ) with(this) { 465 bool ret = ! blocked_threads`isEmpty;466 while( ! blocked_threads`isEmpty) {467 info_thread(L) & popped = try_pop_front( blocked_threads );465 bool ret = ! isEmpty( blocked_threads ); 466 while( ! isEmpty( blocked_threads ) ) { 467 info_thread(L) & popped = remove_first( blocked_threads ); 468 468 on_notify(*popped.lock, popped.t); 469 469 } … … 471 471 } 472 472 473 uintptr_t front( fast_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty ? NULL : blocked_threads`first.info; }474 bool empty ( fast_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty; }473 uintptr_t front( fast_cond_var(L) & this ) with(this) { return isEmpty( blocked_threads ) ? NULL : first( blocked_threads ).info; } 474 bool empty ( fast_cond_var(L) & this ) with(this) { return isEmpty( blocked_threads ); } 475 475 476 476 void wait( fast_cond_var(L) & this, L & l ) { … … 494 494 // pthread_cond_var 495 495 496 void 496 void ?{}( pthread_cond_var(L) & this ) with(this) { 497 497 blocked_threads{}; 498 498 lock{}; … … 503 503 bool notify_one( pthread_cond_var(L) & this ) with(this) { 504 504 lock( lock __cfaabi_dbg_ctx2 ); 505 bool ret = ! blocked_threads`isEmpty;505 bool ret = ! isEmpty( blocked_threads ); 506 506 if ( ret ) { 507 info_thread(L) & popped = try_pop_front( blocked_threads );507 info_thread(L) & popped = remove_first( blocked_threads ); 508 508 popped.signalled = true; 509 509 on_notify(*popped.lock, popped.t); … … 515 515 bool notify_all( pthread_cond_var(L) & this ) with(this) { 516 516 lock( lock __cfaabi_dbg_ctx2 ); 517 bool ret = ! blocked_threads`isEmpty;518 while( ! blocked_threads`isEmpty) {519 info_thread(L) & popped = try_pop_front( blocked_threads );517 bool ret = ! isEmpty( blocked_threads ); 518 while( ! isEmpty( blocked_threads ) ) { 519 info_thread(L) & popped = remove_first( blocked_threads ); 520 520 popped.signalled = true; 521 521 on_notify(*popped.lock, popped.t); … … 525 525 } 526 526 527 uintptr_t front( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty ? NULL : blocked_threads`first.info; }528 bool empty ( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty; }527 uintptr_t front( pthread_cond_var(L) & this ) with(this) { return isEmpty( blocked_threads ) ? NULL : first( blocked_threads ).info; } 528 bool empty ( pthread_cond_var(L) & this ) with(this) { return isEmpty( blocked_threads ); } 529 529 530 530 static void queue_info_thread_timeout( pthread_cond_var(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) { 531 531 lock( lock __cfaabi_dbg_ctx2 ); 532 532 insert_last( blocked_threads, info ); 533 533 pthread_alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info }; 534 534 unlock( lock ); 535 535 536 536 // blocks here and registers alarm node before blocking after releasing locks to avoid deadlock 537 537 size_t recursion_count = block_and_get_recursion( info, cond_alarm_register, (void *)(&node_wrap.alarm_node) ); 538 538 539 539 // unregisters alarm so it doesn't go off if signal happens first … … 551 551 lock( lock __cfaabi_dbg_ctx2 ); 552 552 info_thread( L ) i = { active_thread(), info, &l }; 553 554 unlock( lock ); 555 556 553 insert_last( blocked_threads, i ); 554 unlock( lock ); 555 556 // blocks here 557 557 size_t recursion_count = block_and_get_recursion( i ); 558 558 … … 579 579 } 580 580 581 bool wait( pthread_cond_var(L) & this, L & l, uintptr_t info, timespec t 581 bool wait( pthread_cond_var(L) & this, L & l, uintptr_t info, timespec t ) { 582 582 PTHREAD_WAIT_TIME( info, &l , getDuration( t ) ) 583 583 } … … 585 585 //----------------------------------------------------------------------------- 586 586 // Semaphore 587 void 587 void ?{}( semaphore & this, int count = 1 ) { 588 588 (this.lock){}; 589 589 this.count = count; … … 603 603 park(); 604 604 return true; 605 } 606 else { 607 unlock( lock ); 608 return false; 605 } else { 606 unlock( lock ); 607 return false; 609 608 } 610 609 } … … 622 621 623 622 // make new owner 624 if ( doUnpark ) unpark( thrd );623 if ( doUnpark ) unpark( thrd ); 625 624 626 625 return thrd;
Note:
See TracChangeset
for help on using the changeset viewer.