Changes in / [75d874a:1633e04]
- File:
-
- 1 edited
-
libcfa/src/concurrency/channel.hfa (modified) (6 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/channel.hfa
r75d874a r1633e04 4 4 #include <list.hfa> 5 5 6 #define __COOP_CHANNEL6 // #define __PREVENTION_CHANNEL 7 7 #ifdef __PREVENTION_CHANNEL 8 8 forall( T ) { … … 14 14 exp_backoff_then_block_lock c_lock, p_lock; 15 15 __spinlock_t mutex_lock; 16 char __padding[64]; // avoid false sharing in arrays of channels16 char __padding[64]; // avoid false sharing in arrays 17 17 }; 18 18 … … 20 20 size = _size; 21 21 front = back = count = 0; 22 buffer = a alloc( size );22 buffer = anew( size ); 23 23 chair = 0p; 24 24 mutex_lock{}; … … 128 128 #endif 129 129 130 #if def __COOP_CHANNEL130 #ifndef __PREVENTION_CHANNEL 131 131 132 132 // link field used for threads waiting on channel … … 161 161 size = _size; 162 162 front = back = count = 0; 163 buffer = a alloc( size );163 buffer = anew( size ); 164 164 prods{}; 165 165 cons{}; … … 252 252 } // forall( T ) 253 253 #endif 254 255 #ifdef __BARGE_CHANNEL256 forall( T ) {257 struct channel {258 size_t size;259 size_t front, back, count;260 T * buffer;261 fast_cond_var( exp_backoff_then_block_lock ) prods, cons;262 exp_backoff_then_block_lock mutex_lock;263 };264 265 static inline void ?{}( channel(T) &c, size_t _size ) with(c) {266 size = _size;267 front = back = count = 0;268 buffer = aalloc( size );269 prods{};270 cons{};271 mutex_lock{};272 }273 274 static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }275 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }276 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }277 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }278 static inline bool has_waiters( channel(T) & chan ) with(chan) { return !empty( cons ) || !empty( prods ); }279 static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !empty( cons ); }280 static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !empty( prods ); }281 282 static inline void insert_( channel(T) & chan, T & elem ) with(chan) {283 memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));284 count += 1;285 back++;286 if ( back == size ) back = 0;287 }288 289 290 static inline void insert( channel(T) & chan, T elem ) with(chan) {291 lock( mutex_lock );292 293 while ( count == size ) {294 wait( prods, mutex_lock );295 } // if296 297 insert_( chan, elem );298 299 if ( !notify_one( cons ) && count < size )300 notify_one( prods );301 302 unlock( mutex_lock );303 }304 305 static inline T remove( channel(T) & chan ) with(chan) {306 lock( mutex_lock );307 T retval;308 309 while (count == 0) {310 wait( cons, mutex_lock );311 }312 313 memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));314 count -= 1;315 front = (front + 1) % size;316 317 if ( !notify_one( prods ) && count > 0 )318 notify_one( cons );319 320 unlock( mutex_lock );321 return retval;322 }323 324 } // forall( T )325 #endif326 327 #ifdef __NO_WAIT_CHANNEL328 forall( T ) {329 struct channel {330 size_t size;331 size_t front, back, count;332 T * buffer;333 thread$ * chair;334 T * chair_elem;335 exp_backoff_then_block_lock c_lock, p_lock;336 __spinlock_t mutex_lock;337 };338 339 static inline void ?{}( channel(T) &c, size_t _size ) with(c) {340 size = _size;341 front = back = count = 0;342 buffer = aalloc( size );343 chair = 0p;344 mutex_lock{};345 c_lock{};346 p_lock{};347 lock( c_lock );348 }349 350 static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }351 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }352 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }353 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }354 static inline bool has_waiters( channel(T) & chan ) with(chan) { return c_lock.lock_value != 0; }355 356 static inline void insert_( channel(T) & chan, T & elem ) with(chan) {357 memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));358 count += 1;359 back++;360 if ( back == size ) back = 0;361 }362 363 static inline void insert( channel(T) & chan, T elem ) with( chan ) {364 lock( p_lock );365 lock( mutex_lock __cfaabi_dbg_ctx2 );366 367 insert_( chan, elem );368 369 if ( count != size )370 unlock( p_lock );371 372 if ( count == 1 )373 unlock( c_lock );374 375 unlock( mutex_lock );376 }377 378 static inline T remove( channel(T) & chan ) with(chan) {379 lock( c_lock );380 lock( mutex_lock __cfaabi_dbg_ctx2 );381 T retval;382 383 // Remove from buffer384 memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));385 count -= 1;386 front = (front + 1) % size;387 388 if ( count != 0 )389 unlock( c_lock );390 391 if ( count == size - 1 )392 unlock( p_lock );393 394 unlock( mutex_lock );395 return retval;396 }397 398 } // forall( T )399 #endif
Note:
See TracChangeset
for help on using the changeset viewer.