Changes in / [1633e04:75d874a]
- File:
-
- 1 edited
-
libcfa/src/concurrency/channel.hfa (modified) (6 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/channel.hfa
r1633e04 r75d874a 4 4 #include <list.hfa> 5 5 6 // #define __PREVENTION_CHANNEL6 #define __COOP_CHANNEL 7 7 #ifdef __PREVENTION_CHANNEL 8 8 forall( T ) { … … 14 14 exp_backoff_then_block_lock c_lock, p_lock; 15 15 __spinlock_t mutex_lock; 16 char __padding[64]; // avoid false sharing in arrays 16 char __padding[64]; // avoid false sharing in arrays of channels 17 17 }; 18 18 … … 20 20 size = _size; 21 21 front = back = count = 0; 22 buffer = a new( size );22 buffer = aalloc( size ); 23 23 chair = 0p; 24 24 mutex_lock{}; … … 128 128 #endif 129 129 130 #if ndef __PREVENTION_CHANNEL130 #ifdef __COOP_CHANNEL 131 131 132 132 // link field used for threads waiting on channel … … 161 161 size = _size; 162 162 front = back = count = 0; 163 buffer = a new( size );163 buffer = aalloc( size ); 164 164 prods{}; 165 165 cons{}; … … 252 252 } // forall( T ) 253 253 #endif 254 255 #ifdef __BARGE_CHANNEL 256 forall( T ) { 257 struct channel { 258 size_t size; 259 size_t front, back, count; 260 T * buffer; 261 fast_cond_var( exp_backoff_then_block_lock ) prods, cons; 262 exp_backoff_then_block_lock mutex_lock; 263 }; 264 265 static inline void ?{}( channel(T) &c, size_t _size ) with(c) { 266 size = _size; 267 front = back = count = 0; 268 buffer = aalloc( size ); 269 prods{}; 270 cons{}; 271 mutex_lock{}; 272 } 273 274 static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; } 275 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); } 276 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; } 277 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; } 278 static inline bool has_waiters( channel(T) & chan ) with(chan) { return !empty( cons ) || !empty( prods ); } 279 static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !empty( cons ); } 280 static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !empty( prods ); } 281 282 static inline void insert_( channel(T) & chan, T & elem ) with(chan) { 283 memcpy((void *)&buffer[back], (void *)&elem, sizeof(T)); 284 count += 1; 285 back++; 286 if ( back == size ) back = 0; 287 } 288 289 290 static inline void insert( channel(T) & chan, T elem ) with(chan) { 291 lock( mutex_lock ); 292 293 while ( count == size ) { 294 wait( prods, mutex_lock ); 295 } // if 296 297 insert_( chan, elem ); 298 299 if ( !notify_one( cons ) && count < size ) 300 notify_one( prods ); 301 302 unlock( mutex_lock ); 303 } 304 305 static inline T remove( channel(T) & chan ) with(chan) { 306 lock( mutex_lock ); 307 T retval; 308 309 while (count == 0) { 310 wait( cons, mutex_lock ); 311 } 312 313 memcpy((void *)&retval, (void *)&buffer[front], sizeof(T)); 314 count -= 1; 315 front = (front + 1) % size; 316 317 if ( !notify_one( prods ) && count > 0 ) 318 notify_one( cons ); 319 320 unlock( mutex_lock ); 321 return retval; 322 } 323 324 } // forall( T ) 325 #endif 326 327 #ifdef __NO_WAIT_CHANNEL 328 forall( T ) { 329 struct channel { 330 size_t size; 331 size_t front, back, count; 332 T * buffer; 333 thread$ * chair; 334 T * chair_elem; 335 exp_backoff_then_block_lock c_lock, p_lock; 336 __spinlock_t mutex_lock; 337 }; 338 339 static inline void ?{}( channel(T) &c, size_t _size ) with(c) { 340 size = _size; 341 front = back = count = 0; 342 buffer = aalloc( size ); 343 chair = 0p; 344 mutex_lock{}; 345 c_lock{}; 346 p_lock{}; 347 lock( c_lock ); 348 } 349 350 static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; } 351 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); } 352 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; } 353 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; } 354 static inline bool has_waiters( channel(T) & chan ) with(chan) { return c_lock.lock_value != 0; } 355 356 static inline void insert_( channel(T) & chan, T & elem ) with(chan) { 357 memcpy((void *)&buffer[back], (void *)&elem, sizeof(T)); 358 count += 1; 359 back++; 360 if ( back == size ) back = 0; 361 } 362 363 static inline void insert( channel(T) & chan, T elem ) with( chan ) { 364 lock( p_lock ); 365 lock( mutex_lock __cfaabi_dbg_ctx2 ); 366 367 insert_( chan, elem ); 368 369 if ( count != size ) 370 unlock( p_lock ); 371 372 if ( count == 1 ) 373 unlock( c_lock ); 374 375 unlock( mutex_lock ); 376 } 377 378 static inline T remove( channel(T) & chan ) with(chan) { 379 lock( c_lock ); 380 lock( mutex_lock __cfaabi_dbg_ctx2 ); 381 T retval; 382 383 // Remove from buffer 384 memcpy((void *)&retval, (void *)&buffer[front], sizeof(T)); 385 count -= 1; 386 front = (front + 1) % size; 387 388 if ( count != 0 ) 389 unlock( c_lock ); 390 391 if ( count == size - 1 ) 392 unlock( p_lock ); 393 394 unlock( mutex_lock ); 395 return retval; 396 } 397 398 } // forall( T ) 399 #endif
Note:
See TracChangeset
for help on using the changeset viewer.