source: libcfa/src/concurrency/channel.hfa@ 32d6fdc

ADT ast-experimental
Last change on this file since 32d6fdc was a45e21c, checked in by caparson <caparson@…>, 2 years ago

cleaned up channel, added safety/productivity features to channels. added go style spin/block lock. small cleanup in locks.hfa

  • Property mode set to 100644
File size: 9.2 KB
Line 
1#pragma once
2
3#include <locks.hfa>
4#include <list.hfa>
5#include <mutex_stmt.hfa>
6
7// link field used for threads waiting on channel
8struct wait_link {
9 // used to put wait_link on a dl queue
10 inline dlink(wait_link);
11
12 // waiting thread
13 struct thread$ * t;
14
15 // shadow field
16 void * elem;
17};
18P9_EMBEDDED( wait_link, dlink(wait_link) )
19
20static inline void ?{}( wait_link & this, thread$ * t, void * elem ) {
21 this.t = t;
22 this.elem = elem;
23}
24
25// wake one thread from the list
26static inline void wake_one( dlist( wait_link ) & queue ) {
27 wait_link & popped = try_pop_front( queue );
28 unpark( popped.t );
29}
30
31// returns true if woken due to shutdown
32// blocks thread on list and releases passed lock
33static inline bool block( dlist( wait_link ) & queue, void * elem_ptr, go_mutex & lock ) {
34 wait_link w{ active_thread(), elem_ptr };
35 insert_last( queue, w );
36 unlock( lock );
37 park();
38 return w.elem == 0p;
39}
40
41// void * used for some fields since exceptions don't work with parametric polymorphism currently
42exception channel_closed {
43 // on failed insert elem is a ptr to the element attempting to be inserted
44 // on failed remove elem ptr is 0p
45 // on resumption of a failed insert this elem will be inserted
46 // so a user may modify it in the resumption handler
47 void * elem;
48
49 // pointer to chan that is closed
50 void * closed_chan;
51};
52vtable(channel_closed) channel_closed_vt;
53
54// #define CHAN_STATS // define this to get channel stats printed in dtor
55
56forall( T ) {
57
58struct __attribute__((aligned(128))) channel {
59 size_t size, front, back, count;
60 T * buffer;
61 dlist( wait_link ) prods, cons; // lists of blocked threads
62 go_mutex mutex_lock; // MX lock
63 bool closed; // indicates channel close/open
64 #ifdef CHAN_STATS
65 size_t blocks, operations; // counts total ops and ops resulting in a blocked thd
66 #endif
67};
68
69static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
70 size = _size;
71 front = back = count = 0;
72 buffer = aalloc( size );
73 prods{};
74 cons{};
75 mutex_lock{};
76 closed = false;
77 #ifdef CHAN_STATS
78 blocks = 0;
79 operations = 0;
80 #endif
81}
82
83static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
84static inline void ^?{}( channel(T) &c ) with(c) {
85 #ifdef CHAN_STATS
86 printf("Channel %p Blocks: %lu, Operations: %lu, %.2f%% of ops blocked\n", &c, blocks, operations, ((double)blocks)/operations * 100);
87 #endif
88 verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" );
89 delete( buffer );
90}
91static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
92static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
93static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; }
94static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; }
95static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !prods`isEmpty; }
96
97// closes the channel and notifies all blocked threads
98static inline void close( channel(T) & chan ) with(chan) {
99 lock( mutex_lock );
100 closed = true;
101
102 // flush waiting consumers and producers
103 while ( has_waiting_consumers( chan ) ) {
104 cons`first.elem = 0p;
105 wake_one( cons );
106 }
107 while ( has_waiting_producers( chan ) ) {
108 prods`first.elem = 0p;
109 wake_one( prods );
110 }
111 unlock(mutex_lock);
112}
113
114static inline void is_closed( channel(T) & chan ) with(chan) { return closed; }
115
116static inline void flush( channel(T) & chan, T elem ) with(chan) {
117 lock( mutex_lock );
118 while ( count == 0 && !cons`isEmpty ) {
119 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
120 wake_one( cons );
121 }
122 unlock( mutex_lock );
123}
124
125// handles buffer insert
126static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
127 memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
128 count += 1;
129 back++;
130 if ( back == size ) back = 0;
131}
132
133// does the buffer insert or hands elem directly to consumer if one is waiting
134static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) {
135 if ( count == 0 && !cons`isEmpty ) {
136 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
137 wake_one( cons );
138 } else __buf_insert( chan, elem );
139}
140
141// needed to avoid an extra copy in closed case
142static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
143 lock( mutex_lock );
144 #ifdef CHAN_STATS
145 operations++;
146 #endif
147 if ( count == size ) { unlock( mutex_lock ); return false; }
148 __do_insert( chan, elem );
149 unlock( mutex_lock );
150 return true;
151}
152
153// attempts a nonblocking insert
154// returns true if insert was successful, false otherwise
155static inline bool try_insert( channel(T) & chan, T elem ) { return __internal_try_insert( chan, elem ); }
156
157// handles closed case of insert routine
158static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
159 channel_closed except{&channel_closed_vt, &elem, &chan };
160 throwResume except; // throw closed resumption
161 if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination
162}
163
164static inline void insert( channel(T) & chan, T elem ) with(chan) {
165 // check for close before acquire mx
166 if ( unlikely(closed) ) {
167 __closed_insert( chan, elem );
168 return;
169 }
170
171 lock( mutex_lock );
172
173 #ifdef CHAN_STATS
174 if ( !closed ) operations++;
175 #endif
176
177 // if closed handle
178 if ( unlikely(closed) ) {
179 unlock( mutex_lock );
180 __closed_insert( chan, elem );
181 return;
182 }
183
184 // have to check for the zero size channel case
185 if ( size == 0 && !cons`isEmpty ) {
186 memcpy(cons`first.elem, (void *)&elem, sizeof(T));
187 wake_one( cons );
188 unlock( mutex_lock );
189 return true;
190 }
191
192 // wait if buffer is full, work will be completed by someone else
193 if ( count == size ) {
194 #ifdef CHAN_STATS
195 blocks++;
196 #endif
197
198 // check for if woken due to close
199 if ( unlikely( block( prods, &elem, mutex_lock ) ) )
200 __closed_insert( chan, elem );
201 return;
202 } // if
203
204 if ( count == 0 && !cons`isEmpty ) {
205 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
206 wake_one( cons );
207 } else __buf_insert( chan, elem );
208
209 unlock( mutex_lock );
210 return;
211}
212
213// handles buffer remove
214static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) {
215 memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
216 count -= 1;
217 front = (front + 1) % size;
218}
219
220// does the buffer remove and potentially does waiting producer work
221static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
222 __buf_remove( chan, retval );
223 if (count == size - 1 && !prods`isEmpty ) {
224 __buf_insert( chan, *(T *)prods`first.elem ); // do waiting producer work
225 wake_one( prods );
226 }
227}
228
229// needed to avoid an extra copy in closed case and single return val case
230static inline bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) {
231 lock( mutex_lock );
232 #ifdef CHAN_STATS
233 operations++;
234 #endif
235 if ( count == 0 ) { unlock( mutex_lock ); return false; }
236 __do_remove( chan, retval );
237 unlock( mutex_lock );
238 return true;
239}
240
241// attempts a nonblocking remove
242// returns [T, true] if insert was successful
243// returns [T, false] if insert was successful (T uninit)
244static inline [T, bool] try_remove( channel(T) & chan ) {
245 T retval;
246 return [ retval, __internal_try_remove( chan, retval ) ];
247}
248
249static inline T try_remove( channel(T) & chan, T elem ) {
250 T retval;
251 __internal_try_remove( chan, retval );
252 return retval;
253}
254
255// handles closed case of insert routine
256static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
257 channel_closed except{&channel_closed_vt, 0p, &chan };
258 throwResume except; // throw resumption
259 if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination
260}
261
262static inline T remove( channel(T) & chan ) with(chan) {
263 T retval;
264 if ( unlikely(closed) ) {
265 __closed_remove( chan, retval );
266 return retval;
267 }
268 lock( mutex_lock );
269
270 #ifdef CHAN_STATS
271 if ( !closed ) operations++;
272 #endif
273
274 if ( unlikely(closed) ) {
275 unlock( mutex_lock );
276 __closed_remove( chan, retval );
277 return retval;
278 }
279
280 // have to check for the zero size channel case
281 if ( size == 0 && !prods`isEmpty ) {
282 memcpy((void *)&retval, (void *)prods`first.elem, sizeof(T));
283 wake_one( prods );
284 unlock( mutex_lock );
285 return retval;
286 }
287
288 // wait if buffer is empty, work will be completed by someone else
289 if (count == 0) {
290 #ifdef CHAN_STATS
291 blocks++;
292 #endif
293 // check for if woken due to close
294 if ( unlikely( block( cons, &retval, mutex_lock ) ) )
295 __closed_remove( chan, retval );
296 return retval;
297 }
298
299 // Remove from buffer
300 __do_remove( chan, retval );
301
302 unlock( mutex_lock );
303 return retval;
304}
305} // forall( T )
Note: See TracBrowser for help on using the repository browser.