Changeset ce44c5f for libcfa/src/concurrency
- Timestamp:
- Feb 25, 2023, 6:45:40 PM (21 months ago)
- Branches:
- ADT, ast-experimental, master
- Children:
- d964c39
- Parents:
- 640b3df
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/channel.hfa
r640b3df rce44c5f 15 15 static inline void on_wakeup( no_reacq_lock & this, size_t recursion ) {} 16 16 17 #define __PREVENTION_CHANNEL 18 #ifdef __PREVENTION_CHANNEL 19 forall( T ) { 20 struct channel { 21 size_t size; 22 size_t front, back, count; 23 T * buffer; 24 thread$ * chair; 25 T * chair_elem; 26 exp_backoff_then_block_lock c_lock, p_lock; 27 __spinlock_t mutex_lock; 28 }; 29 30 static inline void ?{}( channel(T) &c, size_t _size ) with(c) { 31 size = _size; 32 front = back = count = 0; 33 buffer = anew( size ); 34 chair = 0p; 35 mutex_lock{}; 36 c_lock{}; 37 p_lock{}; 38 } 39 40 static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; } 41 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); } 42 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; } 43 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; } 44 static inline bool has_waiters( channel(T) & chan ) with(chan) { return chair != 0p; } 45 46 static inline void insert_( channel(T) & chan, T & elem ) with(chan) { 47 memcpy((void *)&buffer[back], (void *)&elem, sizeof(T)); 48 count += 1; 49 back++; 50 if ( back == size ) back = 0; 51 } 52 53 static inline void insert( channel(T) & chan, T elem ) with( chan ) { 54 lock( p_lock ); 55 lock( mutex_lock __cfaabi_dbg_ctx2 ); 56 57 // have to check for the zero size channel case 58 if ( size == 0 && chair != 0p ) { 59 memcpy((void *)chair_elem, (void *)&elem, sizeof(T)); 60 unpark( chair ); 61 chair = 0p; 62 unlock( mutex_lock ); 63 unlock( p_lock ); 64 unlock( c_lock ); 65 return; 66 } 67 68 // wait if buffer is full, work will be completed by someone else 69 if ( count == size ) { 70 chair = active_thread(); 71 chair_elem = &elem; 72 unlock( mutex_lock ); 73 park( ); 74 return; 75 } // if 76 77 if ( chair != 0p ) { 78 memcpy((void *)chair_elem, (void *)&elem, sizeof(T)); 79 unpark( chair ); 80 chair = 0p; 81 unlock( mutex_lock ); 82 unlock( p_lock ); 83 unlock( c_lock ); 84 return; 85 } 86 else insert_( chan, elem ); 87 88 unlock( mutex_lock ); 89 unlock( p_lock ); 90 } 91 92 static inline T remove( channel(T) & chan ) with(chan) { 93 lock( c_lock ); 94 lock( mutex_lock __cfaabi_dbg_ctx2 ); 95 T retval; 96 97 // have to check for the zero size channel case 98 if ( size == 0 && chair != 0p ) { 99 memcpy((void *)&retval, (void *)chair_elem, sizeof(T)); 100 unpark( chair ); 101 chair = 0p; 102 unlock( mutex_lock ); 103 unlock( p_lock ); 104 unlock( c_lock ); 105 return retval; 106 } 107 108 // wait if buffer is empty, work will be completed by someone else 109 if ( count == 0 ) { 110 chair = active_thread(); 111 chair_elem = &retval; 112 unlock( mutex_lock ); 113 park( ); 114 return retval; 115 } 116 117 // Remove from buffer 118 memcpy((void *)&retval, (void *)&buffer[front], sizeof(T)); 119 count -= 1; 120 front = (front + 1) % size; 121 122 if ( chair != 0p ) { 123 insert_( chan, *chair_elem ); // do waiting producer work 124 unpark( chair ); 125 chair = 0p; 126 unlock( mutex_lock ); 127 unlock( p_lock ); 128 unlock( c_lock ); 129 return retval; 130 } 131 132 unlock( mutex_lock ); 133 unlock( c_lock ); 134 return retval; 135 } 136 137 } // forall( T ) 138 #endif 139 140 #ifndef __PREVENTION_CHANNEL 17 141 forall( T ) { 18 142 struct channel { … … 41 165 static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !empty( prods ); } 42 166 43 static inline void insert_( channel(T) & chan, T elem ) with(chan) {167 static inline void insert_( channel(T) & chan, T & elem ) with(chan) { 44 168 memcpy((void *)&buffer[back], (void *)&elem, sizeof(T)); 45 169 count += 1; … … 107 231 108 232 } // forall( T ) 233 #endif
Note: See TracChangeset
for help on using the changeset viewer.