Changeset 6b33e89 for libcfa/src/concurrency/channel.hfa
- Timestamp:
- Apr 25, 2025, 7:39:09 AM (5 months ago)
- Branches:
- master
- Children:
- 65bd3c2
- Parents:
- b195498
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/channel.hfa
rb195498 r6b33e89 57 57 58 58 forall( T ) { 59 60 struct __attribute__((aligned(128))) channel { 61 size_t size, front, back, count; 62 T * buffer; 63 dlist( select_node ) prods, cons; // lists of blocked threads 64 go_mutex mutex_lock; // MX lock 65 bool closed; // indicates channel close/open 66 #ifdef CHAN_STATS 67 size_t p_blocks, p_ops, c_blocks, c_ops; // counts total ops and ops resulting in a blocked thd 68 #endif 69 }; 70 static inline void ?{}( channel(T) & this, channel(T) this2 ) = void; 71 static inline void ?=?( channel(T) & this, channel(T) this2 ) = void; 72 73 static inline void ?{}( channel(T) &c, size_t _size ) with(c) { 74 size = _size; 75 front = back = count = 0; 76 if ( size != 0 ) buffer = aalloc( size ); 77 prods{}; 78 cons{}; 79 mutex_lock{}; 80 closed = false; 81 #ifdef CHAN_STATS 82 p_blocks = 0; 83 p_ops = 0; 84 c_blocks = 0; 85 c_ops = 0; 86 #endif 87 } 88 89 static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; } 90 static inline void ^?{}( channel(T) &c ) with(c) { 91 #ifdef CHAN_STATS 92 printf("Channel %p Blocks: %lu,\t\tOperations: %lu,\t%.2f%% of ops blocked\n", &c, p_blocks + c_blocks, p_ops + c_ops, ((double)p_blocks + c_blocks)/(p_ops + c_ops) * 100); 93 printf("Channel %p Consumer Blocks: %lu,\tConsumer Ops: %lu,\t%.2f%% of Consumer ops blocked\n", &c, p_blocks, p_ops, ((double)p_blocks)/p_ops * 100); 94 printf("Channel %p Producer Blocks: %lu,\tProducer Ops: %lu,\t%.2f%% of Producer ops blocked\n", &c, c_blocks, c_ops, ((double)c_blocks)/c_ops * 100); 95 #endif 96 verifyf( __handle_waituntil_OR( cons ) || __handle_waituntil_OR( prods ) || cons`isEmpty && prods`isEmpty, 97 "Attempted to delete channel with waiting threads (Deadlock).\n" ); 98 if ( size != 0 ) delete( buffer ); 99 } 100 static inline size_t get_count( channel(T) & chan ) with(chan) { return __atomic_load_n( &count, __ATOMIC_RELAXED ); } 101 static inline size_t get_size( channel(T) & chan ) with(chan) { return __atomic_load_n( &size, __ATOMIC_RELAXED ); } 102 static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; } 103 static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; } 104 static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !prods`isEmpty; } 105 106 // closes the channel and notifies all blocked threads 107 static inline void close( channel(T) & chan ) with(chan) { 108 lock( mutex_lock ); 109 closed = true; 110 111 // flush waiting consumers and producers 112 while ( has_waiting_consumers( chan ) ) { 113 if( !__handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race 114 break; // if __handle_waituntil_OR returns false cons is empty so break 115 cons`first.extra = 0p; 116 wake_one( cons ); 117 } 118 while ( has_waiting_producers( chan ) ) { 119 if( !__handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race 120 break; // if __handle_waituntil_OR returns false prods is empty so break 121 prods`first.extra = 0p; 122 wake_one( prods ); 123 } 124 unlock(mutex_lock); 125 } 126 127 static inline void is_closed( channel(T) & chan ) with(chan) { return closed; } 128 129 // used to hand an element to a blocked consumer and signal it 130 static inline void __cons_handoff( channel(T) & chan, T & elem ) with(chan) { 131 memcpy( cons`first.extra, (void *)&elem, sizeof(T) ); // do waiting consumer work 132 wake_one( cons ); 133 } 134 135 // used to hand an element to a blocked producer and signal it 136 static inline void __prods_handoff( channel(T) & chan, T & retval ) with(chan) { 137 memcpy( (void *)&retval, prods`first.extra, sizeof(T) ); 138 wake_one( prods ); 139 } 140 141 static inline void flush( channel(T) & chan, T elem ) with(chan) { 142 lock( mutex_lock ); 143 while ( count == 0 && !cons`isEmpty ) { 144 __cons_handoff( chan, elem ); 145 } 146 unlock( mutex_lock ); 147 } 148 149 // handles buffer insert 150 static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) { 151 memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) ); 152 count += 1; 153 back++; 154 if ( back == size ) back = 0; 155 } 156 157 // needed to avoid an extra copy in closed case 158 static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) { 159 lock( mutex_lock ); 160 #ifdef CHAN_STATS 161 p_ops++; 162 #endif 163 164 ConsEmpty: if ( !cons`isEmpty ) { 165 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty; 166 __cons_handoff( chan, elem ); 167 unlock( mutex_lock ); 168 return true; 169 } 170 171 if ( count == size ) { unlock( mutex_lock ); return false; } 172 173 __buf_insert( chan, elem ); 174 unlock( mutex_lock ); 175 return true; 176 } 177 178 // attempts a nonblocking insert 179 // returns true if insert was successful, false otherwise 180 static inline bool try_insert( channel(T) & chan, T elem ) { return __internal_try_insert( chan, elem ); } 181 182 // handles closed case of insert routine 183 static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) { 184 channel_closed except{ &channel_closed_vt, &elem, &chan }; 185 throwResume except; // throw closed resumption 186 if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination 187 } 188 189 static inline void insert( channel(T) & chan, T elem ) with(chan) { 190 // check for close before acquire mx 191 if ( unlikely(closed) ) { 192 __closed_insert( chan, elem ); 193 return; 194 } 195 196 lock( mutex_lock ); 197 198 #ifdef CHAN_STATS 199 if ( !closed ) p_ops++; 200 #endif 201 202 // if closed handle 203 if ( unlikely(closed) ) { 204 unlock( mutex_lock ); 205 __closed_insert( chan, elem ); 206 return; 207 } 208 209 // buffer count must be zero if cons are blocked (also handles zero-size case) 210 ConsEmpty: if ( !cons`isEmpty ) { 211 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty; 212 __cons_handoff( chan, elem ); 213 unlock( mutex_lock ); 214 return; 215 } 216 217 // wait if buffer is full, work will be completed by someone else 218 if ( count == size ) { 219 #ifdef CHAN_STATS 220 p_blocks++; 221 #endif 222 223 // check for if woken due to close 224 if ( unlikely( block( prods, &elem, mutex_lock ) ) ) 225 __closed_insert( chan, elem ); 226 return; 227 } // if 228 229 __buf_insert( chan, elem ); 230 unlock( mutex_lock ); 231 } 232 233 // does the buffer remove and potentially does waiting producer work 234 static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) { 235 memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) ); 236 count -= 1; 237 front = (front + 1) % size; 238 if (count == size - 1 && !prods`isEmpty ) { 239 if ( !__handle_waituntil_OR( prods ) ) return; 240 __buf_insert( chan, *(T *)prods`first.extra ); // do waiting producer work 241 wake_one( prods ); 242 } 243 } 244 245 // needed to avoid an extra copy in closed case and single return val case 246 static inline bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) { 247 lock( mutex_lock ); 248 #ifdef CHAN_STATS 249 c_ops++; 250 #endif 251 252 ZeroSize: if ( size == 0 && !prods`isEmpty ) { 253 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize; 254 __prods_handoff( chan, retval ); 255 unlock( mutex_lock ); 256 return true; 257 } 258 259 if ( count == 0 ) { unlock( mutex_lock ); return false; } 260 261 __do_remove( chan, retval ); 262 unlock( mutex_lock ); 263 return true; 264 } 265 266 // attempts a nonblocking remove 267 // returns [T, true] if insert was successful 268 // returns [T, false] if insert was successful (T uninit) 269 static inline [T, bool] try_remove( channel(T) & chan ) { 270 T retval; 271 bool success = __internal_try_remove( chan, retval ); 272 return [ retval, success ]; 273 } 274 275 static inline T try_remove( channel(T) & chan ) { 276 T retval; 277 __internal_try_remove( chan, retval ); 278 return retval; 279 } 280 281 // handles closed case of insert routine 282 static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) { 283 channel_closed except{ &channel_closed_vt, 0p, &chan }; 284 throwResume except; // throw resumption 285 if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination 286 } 287 288 static inline T remove( channel(T) & chan ) with(chan) { 289 T retval; 290 if ( unlikely(closed) ) { 291 __closed_remove( chan, retval ); 292 return retval; 293 } 294 lock( mutex_lock ); 295 296 #ifdef CHAN_STATS 297 if ( !closed ) c_ops++; 298 #endif 299 300 if ( unlikely(closed) ) { 301 unlock( mutex_lock ); 302 __closed_remove( chan, retval ); 303 return retval; 304 } 305 306 // have to check for the zero size channel case 307 ZeroSize: if ( size == 0 && !prods`isEmpty ) { 308 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize; 309 __prods_handoff( chan, retval ); 310 unlock( mutex_lock ); 311 return retval; 312 } 313 314 // wait if buffer is empty, work will be completed by someone else 315 if ( count == 0 ) { 316 #ifdef CHAN_STATS 317 c_blocks++; 318 #endif 319 // check for if woken due to close 320 if ( unlikely( block( cons, &retval, mutex_lock ) ) ) 321 __closed_remove( chan, retval ); 322 return retval; 323 } 324 325 // Remove from buffer 326 __do_remove( chan, retval ); 327 unlock( mutex_lock ); 328 return retval; 329 } 330 static inline void remove( channel(T) & chan ) { T elem = (T)remove( chan ); } 331 332 333 /////////////////////////////////////////////////////////////////////////////////////////// 334 // The following is Go-style operator support for channels 335 /////////////////////////////////////////////////////////////////////////////////////////// 336 337 static inline void ?<<?( channel(T) & chan, T elem ) { insert( chan, elem ); } 338 static inline void ?<<?( T & ret, channel(T) & chan ) { ret = remove( chan ); } 339 340 /////////////////////////////////////////////////////////////////////////////////////////// 341 // The following is support for waituntil (select) statements 342 /////////////////////////////////////////////////////////////////////////////////////////// 343 static inline bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) { 344 if ( !node`isListed && !node.park_counter ) return false; // handle special OR case 345 lock( mutex_lock ); 346 if ( node`isListed ) { // op wasn't performed 347 remove( node ); 348 unlock( mutex_lock ); 349 return false; 350 } 351 unlock( mutex_lock ); 352 353 // only return true when not special OR case and status is SAT 354 return !node.park_counter ? false : *node.clause_status == __SELECT_SAT; 355 } 356 357 // special case of __handle_waituntil_OR, that does some work to avoid starvation/deadlock case 358 static inline bool __handle_pending( dlist( select_node ) & queue, select_node & mine ) { 359 while ( !queue`isEmpty ) { 360 // if node not a special OR case or if we win the special OR case race break 361 if ( !queue`first.clause_status || queue`first.park_counter || __pending_set_other( queue`first, mine, ((unsigned long int)(&(queue`first))) ) ) 362 return true; 59 struct __attribute__((aligned(128))) channel { 60 size_t size, front, back, count; 61 T * buffer; 62 dlist( select_node ) prods, cons; // lists of blocked threads 63 go_mutex mutex_lock; // MX lock 64 bool closed; // indicates channel close/open 65 #ifdef CHAN_STATS 66 size_t p_blocks, p_ops, c_blocks, c_ops; // counts total ops and ops resulting in a blocked thd 67 #endif 68 }; 69 70 // type used by select statement to capture a chan read as the selected operation 71 struct chan_read { 72 T * ret; 73 channel(T) * chan; 74 }; 75 __CFA_SELECT_GET_TYPE( chan_read(T) ); 76 77 // type used by select statement to capture a chan read as the selected operation that doesn't have a param to read to 78 struct chan_read_no_ret { 79 T retval; 80 chan_read( T ) c_read; 81 }; 82 __CFA_SELECT_GET_TYPE( chan_read_no_ret(T) ); 83 84 // type used by select statement to capture a chan write as the selected operation 85 struct chan_write { 86 T elem; 87 channel(T) * chan; 88 }; 89 __CFA_SELECT_GET_TYPE( chan_write(T) ); 90 } // distribution 91 92 static inline forall( T ) { 93 void ?{}( channel(T) & this, channel(T) this2 ) = void; 94 void ?=?( channel(T) & this, channel(T) this2 ) = void; 95 96 void ?{}( channel(T) &c, size_t _size ) with(c) { 97 size = _size; 98 front = back = count = 0; 99 if ( size != 0 ) buffer = aalloc( size ); 100 prods{}; 101 cons{}; 102 mutex_lock{}; 103 closed = false; 104 #ifdef CHAN_STATS 105 p_blocks = 0; 106 p_ops = 0; 107 c_blocks = 0; 108 c_ops = 0; 109 #endif 110 } 111 112 void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; } 113 void ^?{}( channel(T) &c ) with(c) { 114 #ifdef CHAN_STATS 115 printf("Channel %p Blocks: %lu,\t\tOperations: %lu,\t%.2f%% of ops blocked\n", &c, p_blocks + c_blocks, p_ops + c_ops, ((double)p_blocks + c_blocks)/(p_ops + c_ops) * 100); 116 printf("Channel %p Consumer Blocks: %lu,\tConsumer Ops: %lu,\t%.2f%% of Consumer ops blocked\n", &c, p_blocks, p_ops, ((double)p_blocks)/p_ops * 100); 117 printf("Channel %p Producer Blocks: %lu,\tProducer Ops: %lu,\t%.2f%% of Producer ops blocked\n", &c, c_blocks, c_ops, ((double)c_blocks)/c_ops * 100); 118 #endif 119 verifyf( __handle_waituntil_OR( cons ) || __handle_waituntil_OR( prods ) || isEmpty( cons ) && isEmpty( prods ), 120 "Attempted to delete channel with waiting threads (Deadlock).\n" ); 121 if ( size != 0 ) delete( buffer ); 122 } 123 size_t get_count( channel(T) & chan ) with(chan) { return __atomic_load_n( &count, __ATOMIC_RELAXED ); } 124 size_t get_size( channel(T) & chan ) with(chan) { return __atomic_load_n( &size, __ATOMIC_RELAXED ); } 125 bool has_waiters( channel(T) & chan ) with(chan) { return ! isEmpty( cons ) || ! isEmpty( prods ); } 126 bool has_waiting_consumers( channel(T) & chan ) with(chan) { return ! isEmpty( cons ); } 127 bool has_waiting_producers( channel(T) & chan ) with(chan) { return ! isEmpty( prods ); } 128 129 // closes the channel and notifies all blocked threads 130 void close( channel(T) & chan ) with(chan) { 131 lock( mutex_lock ); 132 closed = true; 133 134 // flush waiting consumers and producers 135 while ( has_waiting_consumers( chan ) ) { 136 if( ! __handle_waituntil_OR( cons ) ) // ensure we only signal special OR case threads when they win the race 137 break; // if __handle_waituntil_OR returns false cons is empty so break 138 first( cons ).extra = 0p; 139 wake_one( cons ); 140 } 141 while ( has_waiting_producers( chan ) ) { 142 if( ! __handle_waituntil_OR( prods ) ) // ensure we only signal special OR case threads when they win the race 143 break; // if __handle_waituntil_OR returns false prods is empty so break 144 first( prods ).extra = 0p; 145 wake_one( prods ); 146 } 147 unlock(mutex_lock); 148 } 149 150 void is_closed( channel(T) & chan ) with(chan) { return closed; } 151 152 // used to hand an element to a blocked consumer and signal it 153 void __cons_handoff( channel(T) & chan, T & elem ) with(chan) { 154 memcpy( first( cons ).extra, (void *)&elem, sizeof(T) ); // do waiting consumer work 155 wake_one( cons ); 156 } 157 158 // used to hand an element to a blocked producer and signal it 159 void __prods_handoff( channel(T) & chan, T & retval ) with(chan) { 160 memcpy( (void *)&retval, first( prods ).extra, sizeof(T) ); 161 wake_one( prods ); 162 } 163 164 void flush( channel(T) & chan, T elem ) with(chan) { 165 lock( mutex_lock ); 166 while ( count == 0 && ! isEmpty( cons ) ) { 167 __cons_handoff( chan, elem ); 168 } 169 unlock( mutex_lock ); 170 } 171 172 // handles buffer insert 173 void __buf_insert( channel(T) & chan, T & elem ) with(chan) { 174 memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) ); 175 count += 1; 176 back++; 177 if ( back == size ) back = 0; 178 } 179 180 // needed to avoid an extra copy in closed case 181 bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) { 182 lock( mutex_lock ); 183 #ifdef CHAN_STATS 184 p_ops++; 185 #endif 186 187 ConsEmpty: 188 if ( ! isEmpty( cons ) ) { 189 if ( ! __handle_waituntil_OR( cons ) ) break ConsEmpty; 190 __cons_handoff( chan, elem ); 191 unlock( mutex_lock ); 192 return true; 193 } 194 195 if ( count == size ) { unlock( mutex_lock ); return false; } 196 197 __buf_insert( chan, elem ); 198 unlock( mutex_lock ); 199 return true; 200 } 201 202 // attempts a nonblocking insert 203 // returns true if insert was successful, false otherwise 204 bool try_insert( channel(T) & chan, T elem ) { return __internal_try_insert( chan, elem ); } 205 206 // handles closed case of insert routine 207 void __closed_insert( channel(T) & chan, T & elem ) with(chan) { 208 channel_closed except{ &channel_closed_vt, &elem, &chan }; 209 throwResume except; // throw closed resumption 210 if ( ! __internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination 211 } 212 213 void insert( channel(T) & chan, T elem ) with(chan) { 214 // check for close before acquire mx 215 if ( unlikely(closed) ) { 216 __closed_insert( chan, elem ); 217 return; 218 } 219 220 lock( mutex_lock ); 221 222 #ifdef CHAN_STATS 223 if ( ! closed ) p_ops++; 224 #endif 225 226 // if closed handle 227 if ( unlikely(closed) ) { 228 unlock( mutex_lock ); 229 __closed_insert( chan, elem ); 230 return; 231 } 232 233 // buffer count must be zero if cons are blocked (also handles zero-size case) 234 ConsEmpty: 235 if ( ! isEmpty( cons ) ) { 236 if ( ! __handle_waituntil_OR( cons ) ) break ConsEmpty; 237 __cons_handoff( chan, elem ); 238 unlock( mutex_lock ); 239 return; 240 } 241 242 // wait if buffer is full, work will be completed by someone else 243 if ( count == size ) { 244 #ifdef CHAN_STATS 245 p_blocks++; 246 #endif 247 248 // check for if woken due to close 249 if ( unlikely( block( prods, &elem, mutex_lock ) ) ) 250 __closed_insert( chan, elem ); 251 return; 252 } // if 253 254 __buf_insert( chan, elem ); 255 unlock( mutex_lock ); 256 } 257 258 // does the buffer remove and potentially does waiting producer work 259 void __do_remove( channel(T) & chan, T & retval ) with(chan) { 260 memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) ); 261 count -= 1; 262 front = (front + 1) % size; 263 if (count == size - 1 && ! isEmpty( prods ) ) { 264 if ( ! __handle_waituntil_OR( prods ) ) return; 265 __buf_insert( chan, *(T *)first( prods ).extra ); // do waiting producer work 266 wake_one( prods ); 267 } 268 } 269 270 // needed to avoid an extra copy in closed case and single return val case 271 bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) { 272 lock( mutex_lock ); 273 #ifdef CHAN_STATS 274 c_ops++; 275 #endif 276 277 ZeroSize: 278 if ( size == 0 && ! isEmpty( prods ) ) { 279 if ( ! __handle_waituntil_OR( prods ) ) break ZeroSize; 280 __prods_handoff( chan, retval ); 281 unlock( mutex_lock ); 282 return true; 283 } 284 285 if ( count == 0 ) { unlock( mutex_lock ); return false; } 286 287 __do_remove( chan, retval ); 288 unlock( mutex_lock ); 289 return true; 290 } 291 292 // attempts a nonblocking remove 293 // returns [T, true] if insert was successful 294 // returns [T, false] if insert was successful (T uninit) 295 [T, bool] try_remove( channel(T) & chan ) { 296 T retval; 297 bool success = __internal_try_remove( chan, retval ); 298 return [ retval, success ]; 299 } 300 301 T try_remove( channel(T) & chan ) { 302 T retval; 303 __internal_try_remove( chan, retval ); 304 return retval; 305 } 306 307 // handles closed case of insert routine 308 void __closed_remove( channel(T) & chan, T & retval ) with(chan) { 309 channel_closed except{ &channel_closed_vt, 0p, &chan }; 310 throwResume except; // throw resumption 311 if ( ! __internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination 312 } 313 314 T remove( channel(T) & chan ) with(chan) { 315 T retval; 316 if ( unlikely(closed) ) { 317 __closed_remove( chan, retval ); 318 return retval; 319 } 320 lock( mutex_lock ); 321 322 #ifdef CHAN_STATS 323 if ( ! closed ) c_ops++; 324 #endif 325 326 if ( unlikely(closed) ) { 327 unlock( mutex_lock ); 328 __closed_remove( chan, retval ); 329 return retval; 330 } 331 332 // have to check for the zero size channel case 333 ZeroSize: 334 if ( size == 0 && ! isEmpty( prods ) ) { 335 if ( ! __handle_waituntil_OR( prods ) ) break ZeroSize; 336 __prods_handoff( chan, retval ); 337 unlock( mutex_lock ); 338 return retval; 339 } 340 341 // wait if buffer is empty, work will be completed by someone else 342 if ( count == 0 ) { 343 #ifdef CHAN_STATS 344 c_blocks++; 345 #endif 346 // check for if woken due to close 347 if ( unlikely( block( cons, &retval, mutex_lock ) ) ) 348 __closed_remove( chan, retval ); 349 return retval; 350 } 351 352 // Remove from buffer 353 __do_remove( chan, retval ); 354 unlock( mutex_lock ); 355 return retval; 356 } 357 void remove( channel(T) & chan ) { T elem = (T)remove( chan ); } 358 359 360 /////////////////////////////////////////////////////////////////////////////////////////// 361 // The following is Go-style operator support for channels 362 /////////////////////////////////////////////////////////////////////////////////////////// 363 364 void ?<<?( channel(T) & chan, T elem ) { insert( chan, elem ); } 365 void ?<<?( T & ret, channel(T) & chan ) { ret = remove( chan ); } 366 367 /////////////////////////////////////////////////////////////////////////////////////////// 368 // The following is support for waituntil (select) statements 369 /////////////////////////////////////////////////////////////////////////////////////////// 370 bool unregister_chan( channel(T) & chan, select_node & node ) with(chan) { 371 if ( ! isListed( node ) && ! node.park_counter ) return false; // handle special OR case 372 lock( mutex_lock ); 373 if ( isListed( node ) ) { // op wasn't performed 374 remove( node ); 375 unlock( mutex_lock ); 376 return false; 377 } 378 unlock( mutex_lock ); 379 380 // only return true when not special OR case and status is SAT 381 return ! node.park_counter ? false : *node.clause_status == __SELECT_SAT; 382 } 383 384 // special case of __handle_waituntil_OR, that does some work to avoid starvation/deadlock case 385 bool __handle_pending( dlist( select_node ) & queue, select_node & mine ) { 386 while ( ! isEmpty( queue ) ) { 387 // if node not a special OR case or if we win the special OR case race break 388 if ( ! first( queue ).clause_status || first( queue ).park_counter || __pending_set_other( first( queue ), mine, ((unsigned long int)(&(first( queue )))) ) ) 389 return true; 363 390 364 // our node lost the race when toggling in __pending_set_other 365 if ( *mine.clause_status != __SELECT_PENDING ) 366 return false; 367 368 // otherwise we lost the special OR race so discard node 369 try_pop_front( queue ); 370 } 371 return false; 372 } 373 374 // type used by select statement to capture a chan read as the selected operation 375 struct chan_read { 376 T * ret; 377 channel(T) * chan; 378 }; 379 __CFA_SELECT_GET_TYPE( chan_read(T) ); 380 381 static inline void ?{}( chan_read(T) & cr, channel(T) * chan, T * ret ) { 382 cr.chan = chan; 383 cr.ret = ret; 384 } 385 static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ &chan, &ret }; return cr; } 386 387 static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(*this.chan, this) { 388 __closed_remove( *chan, *ret ); 389 // if we get here then the insert succeeded 390 __make_select_node_available( node ); 391 } 392 393 static inline bool register_select( chan_read(T) & this, select_node & node ) with(*this.chan, this) { 394 lock( mutex_lock ); 395 node.extra = ret; // set .extra so that if it == 0p later in on_selected it is due to channel close 396 397 #ifdef CHAN_STATS 398 if ( !closed ) c_ops++; 399 #endif 400 401 if ( !node.park_counter ) { 402 // are we special case OR and front of cons is also special case OR 403 if ( !unlikely(closed) && !prods`isEmpty && prods`first.clause_status && !prods`first.park_counter ) { 404 if ( !__make_select_node_pending( node ) ) { 405 unlock( mutex_lock ); 406 return false; 407 } 408 409 if ( __handle_pending( prods, node ) ) { 410 __prods_handoff( *chan, *ret ); 411 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node 412 unlock( mutex_lock ); 413 return true; 414 } 415 if ( *node.clause_status == __SELECT_PENDING ) 416 __make_select_node_unsat( node ); 417 } 418 // check if we can complete operation. If so race to establish winner in special OR case 419 if ( count != 0 || !prods`isEmpty || unlikely(closed) ) { 420 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering 421 unlock( mutex_lock ); 422 return false; 423 } 424 } 425 } 426 427 if ( unlikely(closed) ) { 428 unlock( mutex_lock ); 429 __handle_select_closed_read( this, node ); 430 return true; 431 } 432 433 // have to check for the zero size channel case 434 ZeroSize: if ( size == 0 && !prods`isEmpty ) { 435 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize; 436 __prods_handoff( *chan, *ret ); 437 __set_avail_then_unlock( node, mutex_lock ); 438 return true; 439 } 440 441 // wait if buffer is empty, work will be completed by someone else 442 if ( count == 0 ) { 443 #ifdef CHAN_STATS 444 c_blocks++; 445 #endif 391 // our node lost the race when toggling in __pending_set_other 392 if ( *mine.clause_status != __SELECT_PENDING ) 393 return false; 394 395 // otherwise we lost the special OR race so discard node 396 remove_first( queue ); 397 } 398 return false; 399 } 400 401 void ?{}( chan_read(T) & cr, channel(T) * chan, T * ret ) { 402 cr.chan = chan; 403 cr.ret = ret; 404 } 405 chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ &chan, &ret }; return cr; } 406 407 void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(*this.chan, this) { 408 __closed_remove( *chan, *ret ); 409 // if we get here then the insert succeeded 410 __make_select_node_available( node ); 411 } 412 413 bool register_select( chan_read(T) & this, select_node & node ) with(*this.chan, this) { 414 lock( mutex_lock ); 415 node.extra = ret; // set .extra so that if it == 0p later in on_selected it is due to channel close 416 417 #ifdef CHAN_STATS 418 if ( ! closed ) c_ops++; 419 #endif 420 421 if ( ! node.park_counter ) { 422 // are we special case OR and front of cons is also special case OR 423 if ( ! unlikely(closed) && ! isEmpty( prods ) && first( prods ).clause_status && ! first( prods ).park_counter ) { 424 if ( ! __make_select_node_pending( node ) ) { 425 unlock( mutex_lock ); 426 return false; 427 } 428 429 if ( __handle_pending( prods, node ) ) { 430 __prods_handoff( *chan, *ret ); 431 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node 432 unlock( mutex_lock ); 433 return true; 434 } 435 if ( *node.clause_status == __SELECT_PENDING ) 436 __make_select_node_unsat( node ); 437 } 438 // check if we can complete operation. If so race to establish winner in special OR case 439 if ( count != 0 || ! isEmpty( prods ) || unlikely(closed) ) { 440 if ( ! __make_select_node_available( node ) ) { // we didn't win the race so give up on registering 441 unlock( mutex_lock ); 442 return false; 443 } 444 } 445 } 446 447 if ( unlikely(closed) ) { 448 unlock( mutex_lock ); 449 __handle_select_closed_read( this, node ); 450 return true; 451 } 452 453 // have to check for the zero size channel case 454 ZeroSize: 455 if ( size == 0 && ! isEmpty( prods ) ) { 456 if ( ! __handle_waituntil_OR( prods ) ) break ZeroSize; 457 __prods_handoff( *chan, *ret ); 458 __set_avail_then_unlock( node, mutex_lock ); 459 return true; 460 } 461 462 // wait if buffer is empty, work will be completed by someone else 463 if ( count == 0 ) { 464 #ifdef CHAN_STATS 465 c_blocks++; 466 #endif 446 467 447 insert_last( cons, node ); 448 unlock( mutex_lock ); 449 return false; 450 } 451 452 // Remove from buffer 453 __do_remove( *chan, *ret ); 454 __set_avail_then_unlock( node, mutex_lock ); 455 return true; 456 } 457 static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( *this.chan, node ); } 458 static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) { 459 if ( unlikely(node.extra == 0p) ) { 460 if ( !exception_in_flight() ) __closed_remove( *chan, *ret ); // check if woken up due to closed channel 461 else return false; 462 } 463 // This is only reachable if not closed or closed exception was handled 464 return true; 465 } 466 467 // type used by select statement to capture a chan read as the selected operation that doesn't have a param to read to 468 struct chan_read_no_ret { 469 T retval; 470 chan_read( T ) c_read; 471 }; 472 __CFA_SELECT_GET_TYPE( chan_read_no_ret(T) ); 473 474 static inline void ?{}( chan_read_no_ret(T) & this, channel(T) & chan ) { 475 this.c_read{ &chan, &this.retval }; 476 } 477 478 static inline chan_read_no_ret(T) remove( channel(T) & chan ) { chan_read_no_ret(T) c_read{ chan }; return c_read; } 479 static inline bool register_select( chan_read_no_ret(T) & this, select_node & node ) { 480 this.c_read.ret = &this.retval; 481 return register_select( this.c_read, node ); 482 } 483 static inline bool unregister_select( chan_read_no_ret(T) & this, select_node & node ) { return unregister_select( this.c_read, node ); } 484 static inline bool on_selected( chan_read_no_ret(T) & this, select_node & node ) { return on_selected( this.c_read, node ); } 485 486 // type used by select statement to capture a chan write as the selected operation 487 struct chan_write { 488 T elem; 489 channel(T) * chan; 490 }; 491 __CFA_SELECT_GET_TYPE( chan_write(T) ); 492 493 static inline void ?{}( chan_write(T) & cw, channel(T) * chan, T elem ) { 494 cw.chan = chan; 495 memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) ); 496 } 497 static inline chan_write(T) ?<<?( channel(T) & chan, T elem ) { chan_write(T) cw{ &chan, elem }; return cw; } 498 static inline chan_write(T) insert( T elem, channel(T) & chan) { chan_write(T) cw{ &chan, elem }; return cw; } 499 500 static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(*this.chan, this) { 501 __closed_insert( *chan, elem ); 502 // if we get here then the insert succeeded 503 __make_select_node_available( node ); 504 } 505 506 static inline bool register_select( chan_write(T) & this, select_node & node ) with(*this.chan, this) { 507 lock( mutex_lock ); 508 node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close 509 510 #ifdef CHAN_STATS 511 if ( !closed ) p_ops++; 512 #endif 513 514 // special OR case handling 515 if ( !node.park_counter ) { 516 // are we special case OR and front of cons is also special case OR 517 if ( !unlikely(closed) && !cons`isEmpty && cons`first.clause_status && !cons`first.park_counter ) { 518 if ( !__make_select_node_pending( node ) ) { 519 unlock( mutex_lock ); 520 return false; 521 } 522 523 if ( __handle_pending( cons, node ) ) { 524 __cons_handoff( *chan, elem ); 525 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node 526 unlock( mutex_lock ); 527 return true; 528 } 529 if ( *node.clause_status == __SELECT_PENDING ) 530 __make_select_node_unsat( node ); 531 } 532 // check if we can complete operation. If so race to establish winner in special OR case 533 if ( count != size || !cons`isEmpty || unlikely(closed) ) { 534 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering 535 unlock( mutex_lock ); 536 return false; 537 } 538 } 539 } 540 541 // if closed handle 542 if ( unlikely(closed) ) { 543 unlock( mutex_lock ); 544 __handle_select_closed_write( this, node ); 545 return true; 546 } 547 548 // handle blocked consumer case via handoff (buffer is implicitly empty) 549 ConsEmpty: if ( !cons`isEmpty ) { 550 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty; 551 __cons_handoff( *chan, elem ); 552 __set_avail_then_unlock( node, mutex_lock ); 553 return true; 554 } 555 556 // insert node in list if buffer is full, work will be completed by someone else 557 if ( count == size ) { 558 #ifdef CHAN_STATS 559 p_blocks++; 560 #endif 561 562 insert_last( prods, node ); 563 unlock( mutex_lock ); 564 return false; 565 } // if 566 567 // otherwise carry out write either via normal insert 568 __buf_insert( *chan, elem ); 569 __set_avail_then_unlock( node, mutex_lock ); 570 return true; 571 } 572 static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( *this.chan, node ); } 573 574 static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) { 575 if ( unlikely(node.extra == 0p) ) { 576 if ( !exception_in_flight() ) __closed_insert( *chan, elem ); // check if woken up due to closed channel 577 else return false; 578 } 579 // This is only reachable if not closed or closed exception was handled 580 return true; 581 } 582 583 } // forall( T ) 584 585 468 insert_last( cons, node ); 469 unlock( mutex_lock ); 470 return false; 471 } 472 473 // Remove from buffer 474 __do_remove( *chan, *ret ); 475 __set_avail_then_unlock( node, mutex_lock ); 476 return true; 477 } 478 bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( *this.chan, node ); } 479 bool on_selected( chan_read(T) & this, select_node & node ) with(this) { 480 if ( unlikely(node.extra == 0p) ) { 481 if ( ! exception_in_flight() ) __closed_remove( *chan, *ret ); // check if woken up due to closed channel 482 else return false; 483 } 484 // This is only reachable if not closed or closed exception was handled 485 return true; 486 } 487 488 void ?{}( chan_read_no_ret(T) & this, channel(T) & chan ) { 489 this.c_read{ &chan, &this.retval }; 490 } 491 492 chan_read_no_ret(T) remove( channel(T) & chan ) { chan_read_no_ret(T) c_read{ chan }; return c_read; } 493 bool register_select( chan_read_no_ret(T) & this, select_node & node ) { 494 this.c_read.ret = &this.retval; 495 return register_select( this.c_read, node ); 496 } 497 bool unregister_select( chan_read_no_ret(T) & this, select_node & node ) { return unregister_select( this.c_read, node ); } 498 bool on_selected( chan_read_no_ret(T) & this, select_node & node ) { return on_selected( this.c_read, node ); } 499 500 void ?{}( chan_write(T) & cw, channel(T) * chan, T elem ) { 501 cw.chan = chan; 502 memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) ); 503 } 504 chan_write(T) ?<<?( channel(T) & chan, T elem ) { chan_write(T) cw{ &chan, elem }; return cw; } 505 chan_write(T) insert( T elem, channel(T) & chan) { chan_write(T) cw{ &chan, elem }; return cw; } 506 507 void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(*this.chan, this) { 508 __closed_insert( *chan, elem ); 509 // if we get here then the insert succeeded 510 __make_select_node_available( node ); 511 } 512 513 bool register_select( chan_write(T) & this, select_node & node ) with(*this.chan, this) { 514 lock( mutex_lock ); 515 node.extra = &elem; // set .extra so that if it == 0p later in on_selected it is due to channel close 516 517 #ifdef CHAN_STATS 518 if ( ! closed ) p_ops++; 519 #endif 520 521 // special OR case handling 522 if ( ! node.park_counter ) { 523 // are we special case OR and front of cons is also special case OR 524 if ( ! unlikely(closed) && ! isEmpty( cons ) && first( cons ).clause_status && ! first( cons ).park_counter ) { 525 if ( ! __make_select_node_pending( node ) ) { 526 unlock( mutex_lock ); 527 return false; 528 } 529 if ( __handle_pending( cons, node ) ) { 530 __cons_handoff( *chan, elem ); 531 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node 532 unlock( mutex_lock ); 533 return true; 534 } 535 if ( *node.clause_status == __SELECT_PENDING ) 536 __make_select_node_unsat( node ); 537 } 538 // check if we can complete operation. If so race to establish winner in special OR case 539 if ( count != size || ! isEmpty( cons ) || unlikely(closed) ) { 540 if ( ! __make_select_node_available( node ) ) { // we didn't win the race so give up on registering 541 unlock( mutex_lock ); 542 return false; 543 } 544 } 545 } 546 547 // if closed handle 548 if ( unlikely(closed) ) { 549 unlock( mutex_lock ); 550 __handle_select_closed_write( this, node ); 551 return true; 552 } 553 554 // handle blocked consumer case via handoff (buffer is implicitly empty) 555 ConsEmpty: 556 if ( ! isEmpty( cons ) ) { 557 if ( ! __handle_waituntil_OR( cons ) ) break ConsEmpty; 558 __cons_handoff( *chan, elem ); 559 __set_avail_then_unlock( node, mutex_lock ); 560 return true; 561 } 562 563 // insert node in list if buffer is full, work will be completed by someone else 564 if ( count == size ) { 565 #ifdef CHAN_STATS 566 p_blocks++; 567 #endif 568 569 insert_last( prods, node ); 570 unlock( mutex_lock ); 571 return false; 572 } // if 573 574 // otherwise carry out write either via normal insert 575 __buf_insert( *chan, elem ); 576 __set_avail_then_unlock( node, mutex_lock ); 577 return true; 578 } 579 bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( *this.chan, node ); } 580 581 bool on_selected( chan_write(T) & this, select_node & node ) with(this) { 582 if ( unlikely(node.extra == 0p) ) { 583 if ( ! exception_in_flight() ) __closed_insert( *chan, elem ); // check if woken up due to closed channel 584 else return false; 585 } 586 // This is only reachable if not closed or closed exception was handled 587 return true; 588 } 589 } // distribution 590 591
Note:
See TracChangeset
for help on using the changeset viewer.