Changes in / [d964c39:2d028003]
- Files:
-
- 2 edited
-
libcfa/src/concurrency/channel.hfa (modified) (3 diffs)
-
tests/concurrent/channels/parallel_harness.hfa (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/channel.hfa
rd964c39 r2d028003 15 15 static inline void on_wakeup( no_reacq_lock & this, size_t recursion ) {} 16 16 17 #define __PREVENTION_CHANNEL18 #ifdef __PREVENTION_CHANNEL19 forall( T ) {20 struct channel {21 size_t size;22 size_t front, back, count;23 T * buffer;24 thread$ * chair;25 T * chair_elem;26 exp_backoff_then_block_lock c_lock, p_lock;27 __spinlock_t mutex_lock;28 };29 30 static inline void ?{}( channel(T) &c, size_t _size ) with(c) {31 size = _size;32 front = back = count = 0;33 buffer = anew( size );34 chair = 0p;35 mutex_lock{};36 c_lock{};37 p_lock{};38 }39 40 static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }41 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }42 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }43 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }44 static inline bool has_waiters( channel(T) & chan ) with(chan) { return chair != 0p; }45 46 static inline void insert_( channel(T) & chan, T & elem ) with(chan) {47 memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));48 count += 1;49 back++;50 if ( back == size ) back = 0;51 }52 53 static inline void insert( channel(T) & chan, T elem ) with( chan ) {54 lock( p_lock );55 lock( mutex_lock __cfaabi_dbg_ctx2 );56 57 // have to check for the zero size channel case58 if ( size == 0 && chair != 0p ) {59 memcpy((void *)chair_elem, (void *)&elem, sizeof(T));60 unpark( chair );61 chair = 0p;62 unlock( mutex_lock );63 unlock( p_lock );64 unlock( c_lock );65 return;66 }67 68 // wait if buffer is full, work will be completed by someone else69 if ( count == size ) {70 chair = active_thread();71 chair_elem = &elem;72 unlock( mutex_lock );73 park( );74 return;75 } // if76 77 if ( chair != 0p ) {78 memcpy((void *)chair_elem, (void *)&elem, sizeof(T));79 unpark( chair );80 chair = 0p;81 unlock( mutex_lock );82 unlock( p_lock );83 unlock( c_lock );84 return;85 }86 else insert_( chan, elem );87 88 unlock( mutex_lock );89 unlock( p_lock );90 }91 92 static inline T remove( channel(T) & chan ) with(chan) {93 lock( c_lock );94 lock( mutex_lock __cfaabi_dbg_ctx2 );95 T retval;96 97 // have to check for the zero size channel case98 if ( size == 0 && chair != 0p ) {99 memcpy((void *)&retval, (void *)chair_elem, sizeof(T));100 unpark( chair );101 chair = 0p;102 unlock( mutex_lock );103 unlock( p_lock );104 unlock( c_lock );105 return retval;106 }107 108 // wait if buffer is empty, work will be completed by someone else109 if ( count == 0 ) {110 chair = active_thread();111 chair_elem = &retval;112 unlock( mutex_lock );113 park( );114 return retval;115 }116 117 // Remove from buffer118 memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));119 count -= 1;120 front = (front + 1) % size;121 122 if ( chair != 0p ) {123 insert_( chan, *chair_elem ); // do waiting producer work124 unpark( chair );125 chair = 0p;126 unlock( mutex_lock );127 unlock( p_lock );128 unlock( c_lock );129 return retval;130 }131 132 unlock( mutex_lock );133 unlock( c_lock );134 return retval;135 }136 137 } // forall( T )138 #endif139 140 #ifndef __PREVENTION_CHANNEL141 17 forall( T ) { 142 18 struct channel { … … 165 41 static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !empty( prods ); } 166 42 167 static inline void insert_( channel(T) & chan, T &elem ) with(chan) {43 static inline void insert_( channel(T) & chan, T elem ) with(chan) { 168 44 memcpy((void *)&buffer[back], (void *)&elem, sizeof(T)); 169 45 count += 1; … … 231 107 232 108 } // forall( T ) 233 #endif -
tests/concurrent/channels/parallel_harness.hfa
rd964c39 r2d028003 139 139 while( cons_done_count != Consumers * Channels ) { 140 140 for ( i; Channels ) { 141 if ( has_wait ers( channels[i] ) ){141 if ( has_waiting_consumers( channels[i] ) ){ 142 142 #ifdef BIG 143 143 bigObject b{0};
Note:
See TracChangeset
for help on using the changeset viewer.