[5e4a830] | 1 | #pragma once |
---|
| 2 | |
---|
[4a962d8] | 3 | #include <locks.hfa> |
---|
[d30e3eb] | 4 | #include <list.hfa> |
---|
[a45e21c] | 5 | #include <mutex_stmt.hfa> |
---|
[d30e3eb] | 6 | |
---|
| 7 | // link field used for threads waiting on channel |
---|
| 8 | struct wait_link { |
---|
| 9 | // used to put wait_link on a dl queue |
---|
| 10 | inline dlink(wait_link); |
---|
| 11 | |
---|
| 12 | // waiting thread |
---|
| 13 | struct thread$ * t; |
---|
| 14 | |
---|
| 15 | // shadow field |
---|
| 16 | void * elem; |
---|
| 17 | }; |
---|
| 18 | P9_EMBEDDED( wait_link, dlink(wait_link) ) |
---|
| 19 | |
---|
| 20 | static inline void ?{}( wait_link & this, thread$ * t, void * elem ) { |
---|
| 21 | this.t = t; |
---|
| 22 | this.elem = elem; |
---|
| 23 | } |
---|
| 24 | |
---|
[a45e21c] | 25 | // wake one thread from the list |
---|
| 26 | static inline void wake_one( dlist( wait_link ) & queue ) { |
---|
| 27 | wait_link & popped = try_pop_front( queue ); |
---|
| 28 | unpark( popped.t ); |
---|
| 29 | } |
---|
| 30 | |
---|
| 31 | // returns true if woken due to shutdown |
---|
| 32 | // blocks thread on list and releases passed lock |
---|
| 33 | static inline bool block( dlist( wait_link ) & queue, void * elem_ptr, go_mutex & lock ) { |
---|
| 34 | wait_link w{ active_thread(), elem_ptr }; |
---|
| 35 | insert_last( queue, w ); |
---|
| 36 | unlock( lock ); |
---|
| 37 | park(); |
---|
| 38 | return w.elem == 0p; |
---|
| 39 | } |
---|
| 40 | |
---|
| 41 | // void * used for some fields since exceptions don't work with parametric polymorphism currently |
---|
| 42 | exception channel_closed { |
---|
| 43 | // on failed insert elem is a ptr to the element attempting to be inserted |
---|
| 44 | // on failed remove elem ptr is 0p |
---|
| 45 | // on resumption of a failed insert this elem will be inserted |
---|
| 46 | // so a user may modify it in the resumption handler |
---|
| 47 | void * elem; |
---|
| 48 | |
---|
| 49 | // pointer to chan that is closed |
---|
| 50 | void * closed_chan; |
---|
| 51 | }; |
---|
| 52 | vtable(channel_closed) channel_closed_vt; |
---|
| 53 | |
---|
| 54 | // #define CHAN_STATS // define this to get channel stats printed in dtor |
---|
| 55 | |
---|
[4a962d8] | 56 | forall( T ) { |
---|
[d30e3eb] | 57 | |
---|
[a45e21c] | 58 | struct __attribute__((aligned(128))) channel { |
---|
| 59 | size_t size, front, back, count; |
---|
[4a962d8] | 60 | T * buffer; |
---|
[a45e21c] | 61 | dlist( wait_link ) prods, cons; // lists of blocked threads |
---|
| 62 | go_mutex mutex_lock; // MX lock |
---|
| 63 | bool closed; // indicates channel close/open |
---|
| 64 | #ifdef CHAN_STATS |
---|
| 65 | size_t blocks, operations; // counts total ops and ops resulting in a blocked thd |
---|
| 66 | #endif |
---|
[4a962d8] | 67 | }; |
---|
| 68 | |
---|
| 69 | static inline void ?{}( channel(T) &c, size_t _size ) with(c) { |
---|
| 70 | size = _size; |
---|
| 71 | front = back = count = 0; |
---|
[0e16a2d] | 72 | buffer = aalloc( size ); |
---|
[4a962d8] | 73 | prods{}; |
---|
| 74 | cons{}; |
---|
| 75 | mutex_lock{}; |
---|
[a45e21c] | 76 | closed = false; |
---|
| 77 | #ifdef CHAN_STATS |
---|
| 78 | blocks = 0; |
---|
| 79 | operations = 0; |
---|
| 80 | #endif |
---|
[4a962d8] | 81 | } |
---|
| 82 | |
---|
| 83 | static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; } |
---|
[a45e21c] | 84 | static inline void ^?{}( channel(T) &c ) with(c) { |
---|
| 85 | #ifdef CHAN_STATS |
---|
| 86 | printf("Channel %p Blocks: %lu, Operations: %lu, %.2f%% of ops blocked\n", &c, blocks, operations, ((double)blocks)/operations * 100); |
---|
| 87 | #endif |
---|
| 88 | verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" ); |
---|
| 89 | delete( buffer ); |
---|
| 90 | } |
---|
[42b739d7] | 91 | static inline size_t get_count( channel(T) & chan ) with(chan) { return count; } |
---|
| 92 | static inline size_t get_size( channel(T) & chan ) with(chan) { return size; } |
---|
[d30e3eb] | 93 | static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; } |
---|
| 94 | static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; } |
---|
| 95 | static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !prods`isEmpty; } |
---|
[4a962d8] | 96 | |
---|
[a45e21c] | 97 | // closes the channel and notifies all blocked threads |
---|
| 98 | static inline void close( channel(T) & chan ) with(chan) { |
---|
| 99 | lock( mutex_lock ); |
---|
| 100 | closed = true; |
---|
| 101 | |
---|
| 102 | // flush waiting consumers and producers |
---|
| 103 | while ( has_waiting_consumers( chan ) ) { |
---|
| 104 | cons`first.elem = 0p; |
---|
| 105 | wake_one( cons ); |
---|
| 106 | } |
---|
| 107 | while ( has_waiting_producers( chan ) ) { |
---|
| 108 | prods`first.elem = 0p; |
---|
| 109 | wake_one( prods ); |
---|
| 110 | } |
---|
| 111 | unlock(mutex_lock); |
---|
| 112 | } |
---|
| 113 | |
---|
| 114 | static inline void is_closed( channel(T) & chan ) with(chan) { return closed; } |
---|
| 115 | |
---|
| 116 | static inline void flush( channel(T) & chan, T elem ) with(chan) { |
---|
| 117 | lock( mutex_lock ); |
---|
| 118 | while ( count == 0 && !cons`isEmpty ) { |
---|
| 119 | memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work |
---|
| 120 | wake_one( cons ); |
---|
| 121 | } |
---|
| 122 | unlock( mutex_lock ); |
---|
| 123 | } |
---|
| 124 | |
---|
| 125 | // handles buffer insert |
---|
| 126 | static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) { |
---|
[4a962d8] | 127 | memcpy((void *)&buffer[back], (void *)&elem, sizeof(T)); |
---|
| 128 | count += 1; |
---|
| 129 | back++; |
---|
| 130 | if ( back == size ) back = 0; |
---|
| 131 | } |
---|
| 132 | |
---|
[a45e21c] | 133 | // does the buffer insert or hands elem directly to consumer if one is waiting |
---|
| 134 | static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) { |
---|
| 135 | if ( count == 0 && !cons`isEmpty ) { |
---|
| 136 | memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work |
---|
| 137 | wake_one( cons ); |
---|
| 138 | } else __buf_insert( chan, elem ); |
---|
[d30e3eb] | 139 | } |
---|
| 140 | |
---|
[a45e21c] | 141 | // needed to avoid an extra copy in closed case |
---|
| 142 | static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) { |
---|
| 143 | lock( mutex_lock ); |
---|
| 144 | #ifdef CHAN_STATS |
---|
| 145 | operations++; |
---|
| 146 | #endif |
---|
| 147 | if ( count == size ) { unlock( mutex_lock ); return false; } |
---|
| 148 | __do_insert( chan, elem ); |
---|
| 149 | unlock( mutex_lock ); |
---|
| 150 | return true; |
---|
| 151 | } |
---|
| 152 | |
---|
| 153 | // attempts a nonblocking insert |
---|
| 154 | // returns true if insert was successful, false otherwise |
---|
| 155 | static inline bool try_insert( channel(T) & chan, T elem ) { return __internal_try_insert( chan, elem ); } |
---|
| 156 | |
---|
| 157 | // handles closed case of insert routine |
---|
| 158 | static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) { |
---|
| 159 | channel_closed except{&channel_closed_vt, &elem, &chan }; |
---|
| 160 | throwResume except; // throw closed resumption |
---|
| 161 | if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination |
---|
[d30e3eb] | 162 | } |
---|
[4a962d8] | 163 | |
---|
[42b739d7] | 164 | static inline void insert( channel(T) & chan, T elem ) with(chan) { |
---|
[a45e21c] | 165 | // check for close before acquire mx |
---|
| 166 | if ( unlikely(closed) ) { |
---|
| 167 | __closed_insert( chan, elem ); |
---|
| 168 | return; |
---|
| 169 | } |
---|
| 170 | |
---|
[4a962d8] | 171 | lock( mutex_lock ); |
---|
| 172 | |
---|
[a45e21c] | 173 | #ifdef CHAN_STATS |
---|
| 174 | if ( !closed ) operations++; |
---|
| 175 | #endif |
---|
| 176 | |
---|
| 177 | // if closed handle |
---|
| 178 | if ( unlikely(closed) ) { |
---|
| 179 | unlock( mutex_lock ); |
---|
| 180 | __closed_insert( chan, elem ); |
---|
| 181 | return; |
---|
| 182 | } |
---|
| 183 | |
---|
[5c931e0] | 184 | // have to check for the zero size channel case |
---|
[d30e3eb] | 185 | if ( size == 0 && !cons`isEmpty ) { |
---|
| 186 | memcpy(cons`first.elem, (void *)&elem, sizeof(T)); |
---|
| 187 | wake_one( cons ); |
---|
[5c931e0] | 188 | unlock( mutex_lock ); |
---|
[a45e21c] | 189 | return true; |
---|
[5c931e0] | 190 | } |
---|
| 191 | |
---|
[4a962d8] | 192 | // wait if buffer is full, work will be completed by someone else |
---|
[d30e3eb] | 193 | if ( count == size ) { |
---|
[a45e21c] | 194 | #ifdef CHAN_STATS |
---|
| 195 | blocks++; |
---|
| 196 | #endif |
---|
| 197 | |
---|
| 198 | // check for if woken due to close |
---|
| 199 | if ( unlikely( block( prods, &elem, mutex_lock ) ) ) |
---|
| 200 | __closed_insert( chan, elem ); |
---|
[4a962d8] | 201 | return; |
---|
| 202 | } // if |
---|
| 203 | |
---|
[d30e3eb] | 204 | if ( count == 0 && !cons`isEmpty ) { |
---|
| 205 | memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work |
---|
| 206 | wake_one( cons ); |
---|
[a45e21c] | 207 | } else __buf_insert( chan, elem ); |
---|
[4a962d8] | 208 | |
---|
| 209 | unlock( mutex_lock ); |
---|
[a45e21c] | 210 | return; |
---|
[4a962d8] | 211 | } |
---|
| 212 | |
---|
[a45e21c] | 213 | // handles buffer remove |
---|
| 214 | static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) { |
---|
[4a962d8] | 215 | memcpy((void *)&retval, (void *)&buffer[front], sizeof(T)); |
---|
| 216 | count -= 1; |
---|
| 217 | front = (front + 1) % size; |
---|
[a45e21c] | 218 | } |
---|
[4a962d8] | 219 | |
---|
[a45e21c] | 220 | // does the buffer remove and potentially does waiting producer work |
---|
| 221 | static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) { |
---|
| 222 | __buf_remove( chan, retval ); |
---|
[d30e3eb] | 223 | if (count == size - 1 && !prods`isEmpty ) { |
---|
[a45e21c] | 224 | __buf_insert( chan, *(T *)prods`first.elem ); // do waiting producer work |
---|
[d30e3eb] | 225 | wake_one( prods ); |
---|
| 226 | } |
---|
[4a962d8] | 227 | } |
---|
[0e16a2d] | 228 | |
---|
[a45e21c] | 229 | // needed to avoid an extra copy in closed case and single return val case |
---|
| 230 | static inline bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) { |
---|
[0e16a2d] | 231 | lock( mutex_lock ); |
---|
[a45e21c] | 232 | #ifdef CHAN_STATS |
---|
| 233 | operations++; |
---|
| 234 | #endif |
---|
| 235 | if ( count == 0 ) { unlock( mutex_lock ); return false; } |
---|
| 236 | __do_remove( chan, retval ); |
---|
[0e16a2d] | 237 | unlock( mutex_lock ); |
---|
[a45e21c] | 238 | return true; |
---|
[0e16a2d] | 239 | } |
---|
| 240 | |
---|
[a45e21c] | 241 | // attempts a nonblocking remove |
---|
| 242 | // returns [T, true] if insert was successful |
---|
| 243 | // returns [T, false] if insert was successful (T uninit) |
---|
| 244 | static inline [T, bool] try_remove( channel(T) & chan ) { |
---|
[0e16a2d] | 245 | T retval; |
---|
[a45e21c] | 246 | return [ retval, __internal_try_remove( chan, retval ) ]; |
---|
| 247 | } |
---|
[0e16a2d] | 248 | |
---|
[a45e21c] | 249 | static inline T try_remove( channel(T) & chan, T elem ) { |
---|
| 250 | T retval; |
---|
| 251 | __internal_try_remove( chan, retval ); |
---|
[0e16a2d] | 252 | return retval; |
---|
| 253 | } |
---|
| 254 | |
---|
[a45e21c] | 255 | // handles closed case of insert routine |
---|
| 256 | static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) { |
---|
| 257 | channel_closed except{&channel_closed_vt, 0p, &chan }; |
---|
| 258 | throwResume except; // throw resumption |
---|
| 259 | if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination |
---|
[0e16a2d] | 260 | } |
---|
| 261 | |
---|
[a45e21c] | 262 | static inline T remove( channel(T) & chan ) with(chan) { |
---|
| 263 | T retval; |
---|
| 264 | if ( unlikely(closed) ) { |
---|
| 265 | __closed_remove( chan, retval ); |
---|
| 266 | return retval; |
---|
| 267 | } |
---|
| 268 | lock( mutex_lock ); |
---|
[0e16a2d] | 269 | |
---|
[a45e21c] | 270 | #ifdef CHAN_STATS |
---|
| 271 | if ( !closed ) operations++; |
---|
| 272 | #endif |
---|
[0e16a2d] | 273 | |
---|
[a45e21c] | 274 | if ( unlikely(closed) ) { |
---|
| 275 | unlock( mutex_lock ); |
---|
| 276 | __closed_remove( chan, retval ); |
---|
| 277 | return retval; |
---|
| 278 | } |
---|
[0e16a2d] | 279 | |
---|
[a45e21c] | 280 | // have to check for the zero size channel case |
---|
| 281 | if ( size == 0 && !prods`isEmpty ) { |
---|
| 282 | memcpy((void *)&retval, (void *)prods`first.elem, sizeof(T)); |
---|
| 283 | wake_one( prods ); |
---|
| 284 | unlock( mutex_lock ); |
---|
| 285 | return retval; |
---|
| 286 | } |
---|
[0e16a2d] | 287 | |
---|
[a45e21c] | 288 | // wait if buffer is empty, work will be completed by someone else |
---|
| 289 | if (count == 0) { |
---|
| 290 | #ifdef CHAN_STATS |
---|
| 291 | blocks++; |
---|
| 292 | #endif |
---|
| 293 | // check for if woken due to close |
---|
| 294 | if ( unlikely( block( cons, &retval, mutex_lock ) ) ) |
---|
| 295 | __closed_remove( chan, retval ); |
---|
| 296 | return retval; |
---|
| 297 | } |
---|
[0e16a2d] | 298 | |
---|
| 299 | // Remove from buffer |
---|
[a45e21c] | 300 | __do_remove( chan, retval ); |
---|
[0e16a2d] | 301 | |
---|
| 302 | unlock( mutex_lock ); |
---|
| 303 | return retval; |
---|
| 304 | } |
---|
| 305 | } // forall( T ) |
---|