Changeset 81ab5eb for libcfa/src/concurrency
- Timestamp:
- Mar 29, 2026, 9:52:51 PM (9 days ago)
- Branches:
- master
- Children:
- e6e250d
- Parents:
- 00675ed4
- Location:
- libcfa/src/concurrency
- Files:
-
- 6 edited
-
channel.hfa (modified) (15 diffs)
-
cofor.hfa (modified) (1 diff)
-
future.hfa (modified) (3 diffs)
-
io.cfa (modified) (4 diffs)
-
kernel.cfa (modified) (2 diffs)
-
select.hfa (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/channel.hfa
r00675ed4 r81ab5eb 117 117 printf("Channel %p Producer Blocks: %lu,\tProducer Ops: %lu,\t%.2f%% of Producer ops blocked\n", &c, c_blocks, c_ops, ((double)c_blocks)/c_ops * 100); 118 118 #endif 119 verifyf( __handle_waituntil_OR( cons ) || __handle_waituntil_OR( prods ) || isEmpty( cons ) && isEmpty( prods ),119 verifyf( __handle_waituntil_OR( cons ) || __handle_waituntil_OR( prods ) || empty( cons ) && empty( prods ), 120 120 "Attempted to delete channel with waiting threads (Deadlock).\n" ); 121 121 if ( size != 0 ) delete( buffer ); … … 123 123 size_t get_count( channel(T) & chan ) with(chan) { return __atomic_load_n( &count, __ATOMIC_RELAXED ); } 124 124 size_t get_size( channel(T) & chan ) with(chan) { return __atomic_load_n( &size, __ATOMIC_RELAXED ); } 125 bool has_waiters( channel(T) & chan ) with(chan) { return ! isEmpty( cons ) || ! isEmpty( prods ); }126 bool has_waiting_consumers( channel(T) & chan ) with(chan) { return ! isEmpty( cons ); }127 bool has_waiting_producers( channel(T) & chan ) with(chan) { return ! isEmpty( prods ); }125 bool has_waiters( channel(T) & chan ) with(chan) { return ! empty( cons ) || ! empty( prods ); } 126 bool has_waiting_consumers( channel(T) & chan ) with(chan) { return ! empty( cons ); } 127 bool has_waiting_producers( channel(T) & chan ) with(chan) { return ! empty( prods ); } 128 128 129 129 // closes the channel and notifies all blocked threads … … 164 164 void flush( channel(T) & chan, T elem ) with(chan) { 165 165 lock( mutex_lock ); 166 while ( count == 0 && ! isEmpty( cons ) ) {166 while ( count == 0 && ! empty( cons ) ) { 167 167 __cons_handoff( chan, elem ); 168 168 } … … 186 186 187 187 ConsEmpty: 188 if ( ! isEmpty( cons ) ) {188 if ( ! empty( cons ) ) { 189 189 if ( ! __handle_waituntil_OR( cons ) ) break ConsEmpty; 190 190 __cons_handoff( chan, elem ); … … 233 233 // buffer count must be zero if cons are blocked (also handles zero-size case) 234 234 ConsEmpty: 235 if ( ! isEmpty( cons ) ) {235 if ( ! empty( cons ) ) { 236 236 if ( ! __handle_waituntil_OR( cons ) ) break ConsEmpty; 237 237 __cons_handoff( chan, elem ); … … 261 261 count -= 1; 262 262 front = (front + 1) % size; 263 if (count == size - 1 && ! isEmpty( prods ) ) {263 if (count == size - 1 && ! empty( prods ) ) { 264 264 if ( ! __handle_waituntil_OR( prods ) ) return; 265 265 __buf_insert( chan, *(T *)first( prods ).extra ); // do waiting producer work … … 276 276 277 277 ZeroSize: 278 if ( size == 0 && ! isEmpty( prods ) ) {278 if ( size == 0 && ! empty( prods ) ) { 279 279 if ( ! __handle_waituntil_OR( prods ) ) break ZeroSize; 280 280 __prods_handoff( chan, retval ); … … 332 332 // have to check for the zero size channel case 333 333 ZeroSize: 334 if ( size == 0 && ! isEmpty( prods ) ) {334 if ( size == 0 && ! empty( prods ) ) { 335 335 if ( ! __handle_waituntil_OR( prods ) ) break ZeroSize; 336 336 __prods_handoff( chan, retval ); … … 384 384 // special case of __handle_waituntil_OR, that does some work to avoid starvation/deadlock case 385 385 bool __handle_pending( dlist( select_node ) & queue, select_node & mine ) { 386 while ( ! isEmpty( queue ) ) {386 while ( ! empty( queue ) ) { 387 387 // if node not a special OR case or if we win the special OR case race break 388 388 if ( ! first( queue ).clause_status || first( queue ).park_counter || __pending_set_other( first( queue ), mine, ((unsigned long int)(&(first( queue )))) ) ) … … 421 421 if ( ! node.park_counter ) { 422 422 // are we special case OR and front of cons is also special case OR 423 if ( ! unlikely(closed) && ! isEmpty( prods ) && first( prods ).clause_status && ! first( prods ).park_counter ) {423 if ( ! unlikely(closed) && ! empty( prods ) && first( prods ).clause_status && ! first( prods ).park_counter ) { 424 424 if ( ! __make_select_node_pending( node ) ) { 425 425 unlock( mutex_lock ); … … 437 437 } 438 438 // check if we can complete operation. If so race to establish winner in special OR case 439 if ( count != 0 || ! isEmpty( prods ) || unlikely(closed) ) {439 if ( count != 0 || ! empty( prods ) || unlikely(closed) ) { 440 440 if ( ! __make_select_node_available( node ) ) { // we didn't win the race so give up on registering 441 441 unlock( mutex_lock ); … … 453 453 // have to check for the zero size channel case 454 454 ZeroSize: 455 if ( size == 0 && ! isEmpty( prods ) ) {455 if ( size == 0 && ! empty( prods ) ) { 456 456 if ( ! __handle_waituntil_OR( prods ) ) break ZeroSize; 457 457 __prods_handoff( *chan, *ret ); … … 522 522 if ( ! node.park_counter ) { 523 523 // are we special case OR and front of cons is also special case OR 524 if ( ! unlikely(closed) && ! isEmpty( cons ) && first( cons ).clause_status && ! first( cons ).park_counter ) {524 if ( ! unlikely(closed) && ! empty( cons ) && first( cons ).clause_status && ! first( cons ).park_counter ) { 525 525 if ( ! __make_select_node_pending( node ) ) { 526 526 unlock( mutex_lock ); … … 537 537 } 538 538 // check if we can complete operation. If so race to establish winner in special OR case 539 if ( count != size || ! isEmpty( cons ) || unlikely(closed) ) {539 if ( count != size || ! empty( cons ) || unlikely(closed) ) { 540 540 if ( ! __make_select_node_available( node ) ) { // we didn't win the race so give up on registering 541 541 unlock( mutex_lock ); … … 554 554 // handle blocked consumer case via handoff (buffer is implicitly empty) 555 555 ConsEmpty: 556 if ( ! isEmpty( cons ) ) {556 if ( ! empty( cons ) ) { 557 557 if ( ! __handle_waituntil_OR( cons ) ) break ConsEmpty; 558 558 __cons_handoff( *chan, elem ); -
libcfa/src/concurrency/cofor.hfa
r00675ed4 r81ab5eb 33 33 34 34 void main( cofor_runner & this ) with(this) { 35 while ( ! done || ! isEmpty( items ) ) {35 while ( ! done || ! empty( items ) ) { 36 36 lock( mutex_lock ); 37 37 runner_node * node = &remove_first( items ); -
libcfa/src/concurrency/future.hfa
r00675ed4 r81ab5eb 10 10 // Created On : Wed Jan 06 17:33:18 2021 11 11 // Last Modified By : Peter A. Buhr 12 // Last Modified On : Mon Nov 24 16:08:52 202513 // Update Count : 22 212 // Last Modified On : Sun Mar 29 21:13:04 2026 13 // Update Count : 223 14 14 // 15 15 … … 173 173 174 174 bool fulfil$( future(T) & fut ) with( fut ) { // helper 175 bool ret_val = ! isEmpty( waiters );175 bool ret_val = ! empty( waiters ); 176 176 state = FUTURE_FULFILLED$; 177 while ( ! isEmpty( waiters ) ) {177 while ( ! empty( waiters ) ) { 178 178 if ( !__handle_waituntil_OR( waiters ) ) // handle special waituntil OR case 179 179 break; // if handle_OR returns false then waiters is empty so break … … 211 211 void reset( future(T) & fut ) with( fut ) { // mark future as empty (for reuse) 212 212 lock( lock ); 213 if ( ! isEmpty( waiters ) ) abort( "Attempting to reset a future with blocked waiters" );213 if ( ! empty( waiters ) ) abort( "Attempting to reset a future with blocked waiters" ); 214 214 state = FUTURE_EMPTY$; 215 215 free( except ); -
libcfa/src/concurrency/io.cfa
r00675ed4 r81ab5eb 594 594 lock( queue.lock __cfaabi_dbg_ctx2 ); 595 595 { 596 was_empty = isEmpty( queue.queue );596 was_empty = empty( queue.queue ); 597 597 598 598 // Add our request to the list … … 632 632 // notify the arbiter that new allocations are available 633 633 static void __ioarbiter_notify( io_arbiter$ & this, io_context$ * ctx ) { 634 /* paranoid */ verify( ! isEmpty( this.pending.queue ) );634 /* paranoid */ verify( ! empty( this.pending.queue ) ); 635 635 /* paranoid */ verify( __preemption_enabled() ); 636 636 … … 642 642 // as long as there are pending allocations try to satisfy them 643 643 // for simplicity do it in FIFO order 644 while( ! isEmpty( this.pending.queue ) ) {644 while( ! empty( this.pending.queue ) ) { 645 645 // get first pending allocs 646 646 __u32 have = ctx->sq.free_ring.tail - ctx->sq.free_ring.head; … … 727 727 // pop each operation one at a time. 728 728 // There is no wait morphing because of the io sq ring 729 while( ! isEmpty( ctx.ext_sq.queue ) ) {729 while( ! empty( ctx.ext_sq.queue ) ) { 730 730 // drop the element from the queue 731 731 __external_io & ei = (__external_io&)remove_first( ctx.ext_sq.queue ); -
libcfa/src/concurrency/kernel.cfa
r00675ed4 r81ab5eb 10 10 // Created On : Tue Jan 17 12:27:26 2017 11 11 // Last Modified By : Peter A. Buhr 12 // Last Modified On : Fri Apr 25 07:02:42 202513 // Update Count : 8 212 // Last Modified On : Sun Mar 29 21:20:54 2026 13 // Update Count : 83 14 14 // 15 15 … … 785 785 // update the pointer to the head wait context 786 786 struct __fd_waitctx * wctx = 0; 787 if ( ! isEmpty( this.idles )) wctx = &first( this. idles ).idle_wctx;787 if ( ! empty( this.idles )) wctx = &first( this. idles ).idle_wctx; 788 788 __atomic_store_n(&this.fdw, wctx, __ATOMIC_SEQ_CST); 789 789 } -
libcfa/src/concurrency/select.hfa
r00675ed4 r81ab5eb 11 11 // Created On : Thu Jan 21 19:46:50 2023 12 12 // Last Modified By : Peter A. Buhr 13 // Last Modified On : Sun Nov 23 22:38:36 202514 // Update Count : 813 // Last Modified On : Sun Mar 29 21:12:52 2026 14 // Update Count : 9 15 15 // 16 16 … … 169 169 // Returns true if execution can continue normally and false if the queue has now been drained 170 170 static inline bool __handle_waituntil_OR( dlist( select_node ) & queue ) { 171 if ( isEmpty( queue ) ) return false;171 if ( empty( queue ) ) return false; 172 172 if ( first( queue ).clause_status && ! first( queue ).park_counter ) { 173 while ( ! isEmpty( queue ) ) {173 while ( ! empty( queue ) ) { 174 174 // if node not a special OR case or if we win the special OR case race break 175 175 if ( ! first( queue ).clause_status || first( queue ).park_counter || __make_select_node_available( first( queue ) ) )
Note:
See TracChangeset
for help on using the changeset viewer.