Changeset 0e16a2d
- Timestamp:
- Mar 24, 2023, 4:49:57 PM (19 months ago)
- Branches:
- ADT, ast-experimental, master
- Children:
- 75d874a
- Parents:
- 12b006c
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/channel.hfa
r12b006c r0e16a2d 28 28 exp_backoff_then_block_lock c_lock, p_lock; 29 29 __spinlock_t mutex_lock; 30 char __padding[64]; // avoid false sharing in arrays of channels 30 31 }; 31 32 … … 33 34 size = _size; 34 35 front = back = count = 0; 35 buffer = a new( size );36 buffer = aalloc( size ); 36 37 chair = 0p; 37 38 mutex_lock{}; … … 140 141 #endif 141 142 142 #if ndef __PREVENTION_CHANNEL143 #ifdef __COOP_CHANNEL 143 144 forall( T ) { 144 145 struct channel { … … 153 154 size = _size; 154 155 front = back = count = 0; 155 buffer = a new( size );156 buffer = aalloc( size ); 156 157 prods{}; 157 158 cons{}; … … 234 235 } // forall( T ) 235 236 #endif 237 238 #ifdef __BARGE_CHANNEL 239 forall( T ) { 240 struct channel { 241 size_t size; 242 size_t front, back, count; 243 T * buffer; 244 fast_cond_var( exp_backoff_then_block_lock ) prods, cons; 245 exp_backoff_then_block_lock mutex_lock; 246 }; 247 248 static inline void ?{}( channel(T) &c, size_t _size ) with(c) { 249 size = _size; 250 front = back = count = 0; 251 buffer = aalloc( size ); 252 prods{}; 253 cons{}; 254 mutex_lock{}; 255 } 256 257 static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; } 258 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); } 259 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; } 260 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; } 261 static inline bool has_waiters( channel(T) & chan ) with(chan) { return !empty( cons ) || !empty( prods ); } 262 static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !empty( cons ); } 263 static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !empty( prods ); } 264 265 static inline void insert_( channel(T) & chan, T & elem ) with(chan) { 266 memcpy((void *)&buffer[back], (void *)&elem, sizeof(T)); 267 count += 1; 268 back++; 269 if ( back == size ) back = 0; 270 } 271 272 273 static inline void insert( channel(T) & chan, T elem ) with(chan) { 274 lock( mutex_lock ); 275 276 while ( count == size ) { 277 wait( prods, mutex_lock ); 278 } // if 279 280 insert_( chan, elem ); 281 282 if ( !notify_one( cons ) && count < size ) 283 notify_one( prods ); 284 285 unlock( mutex_lock ); 286 } 287 288 static inline T remove( channel(T) & chan ) with(chan) { 289 lock( mutex_lock ); 290 T retval; 291 292 while (count == 0) { 293 wait( cons, mutex_lock ); 294 } 295 296 memcpy((void *)&retval, (void *)&buffer[front], sizeof(T)); 297 count -= 1; 298 front = (front + 1) % size; 299 300 if ( !notify_one( prods ) && count > 0 ) 301 notify_one( cons ); 302 303 unlock( mutex_lock ); 304 return retval; 305 } 306 307 } // forall( T ) 308 #endif 309 310 #ifdef __NO_WAIT_CHANNEL 311 forall( T ) { 312 struct channel { 313 size_t size; 314 size_t front, back, count; 315 T * buffer; 316 thread$ * chair; 317 T * chair_elem; 318 exp_backoff_then_block_lock c_lock, p_lock; 319 __spinlock_t mutex_lock; 320 }; 321 322 static inline void ?{}( channel(T) &c, size_t _size ) with(c) { 323 size = _size; 324 front = back = count = 0; 325 buffer = aalloc( size ); 326 chair = 0p; 327 mutex_lock{}; 328 c_lock{}; 329 p_lock{}; 330 lock( c_lock ); 331 } 332 333 static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; } 334 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); } 335 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; } 336 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; } 337 static inline bool has_waiters( channel(T) & chan ) with(chan) { return c_lock.lock_value != 0; } 338 339 static inline void insert_( channel(T) & chan, T & elem ) with(chan) { 340 memcpy((void *)&buffer[back], (void *)&elem, sizeof(T)); 341 count += 1; 342 back++; 343 if ( back == size ) back = 0; 344 } 345 346 static inline void insert( channel(T) & chan, T elem ) with( chan ) { 347 lock( p_lock ); 348 lock( mutex_lock __cfaabi_dbg_ctx2 ); 349 350 insert_( chan, elem ); 351 352 if ( count != size ) 353 unlock( p_lock ); 354 355 if ( count == 1 ) 356 unlock( c_lock ); 357 358 unlock( mutex_lock ); 359 } 360 361 static inline T remove( channel(T) & chan ) with(chan) { 362 lock( c_lock ); 363 lock( mutex_lock __cfaabi_dbg_ctx2 ); 364 T retval; 365 366 // Remove from buffer 367 memcpy((void *)&retval, (void *)&buffer[front], sizeof(T)); 368 count -= 1; 369 front = (front + 1) % size; 370 371 if ( count != 0 ) 372 unlock( c_lock ); 373 374 if ( count == size - 1 ) 375 unlock( p_lock ); 376 377 unlock( mutex_lock ); 378 return retval; 379 } 380 381 } // forall( T ) 382 #endif
Note: See TracChangeset
for help on using the changeset viewer.