source: libcfa/src/concurrency/channel.hfa@ 4eebbcc

ADT ast-experimental
Last change on this file since 4eebbcc was 75d874a, checked in by caparsons <caparson@…>, 3 years ago

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

  • Property mode set to 100644
File size: 10.4 KB
Line 
1#pragma once
2
3#include <locks.hfa>
4#include <list.hfa>
5
6#define __COOP_CHANNEL
7#ifdef __PREVENTION_CHANNEL
forall( T ) {

// Bounded channel variant in which at most one thread at a time is parked on the channel.
// Producers serialize on p_lock and consumers on c_lock; mutex_lock guards the fields.
// A thread that has to wait records itself in `chair` and parks while still holding its
// own p_lock/c_lock; the thread that later completes the parked thread's work releases
// that lock on its behalf.
struct channel {
	size_t size;									// buffer capacity, set by the constructor
	size_t count;									// number of elements currently buffered
	size_t front;									// index of the next element to remove
	size_t back;									// index of the next free slot
	T * buffer;										// circular buffer storage
	thread$ * chair;								// the single parked thread, 0p when nobody waits
	T * chair_elem;									// element storage belonging to the parked thread
	exp_backoff_then_block_lock c_lock, p_lock;		// consumer-side / producer-side locks
	__spinlock_t mutex_lock;						// protects the fields above
	char __padding[64];								// avoid false sharing in arrays of channels
};

// Construct an empty channel whose buffer holds _size elements.
static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
	size = _size;
	count = 0;
	back = 0;
	front = 0;
	buffer = aalloc( size );
	chair = 0p;
	mutex_lock{};
	c_lock{};
	p_lock{};
}

// Default construction delegates to the sized constructor with a capacity of 0.
static inline void ?{}( channel(T) &c ){
	((channel(T) &)c){ 0 };
}

// Release the buffer storage.
static inline void ^?{}( channel(T) &c ) with(c) {
	delete( buffer );
}

// Number of buffered elements; read without taking any lock.
static inline size_t get_count( channel(T) & chan ) with(chan) {
	return count;
}

// Capacity given at construction.
static inline size_t get_size( channel(T) & chan ) with(chan) {
	return size;
}

// True when a thread is currently recorded in the chair; read without taking any lock.
static inline bool has_waiters( channel(T) & chan ) with(chan) {
	return chair != 0p;
}

// Bit-copy elem into the slot at `back` and advance `back`, wrapping at `size`.
// Callers in this file invoke it with mutex_lock held.
static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
	memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) );
	count += 1;
	back += 1;
	if ( back == size ) {
		back = 0;
	}
}

// Put elem into the channel, parking the caller when the buffer is full.
static inline void insert( channel(T) & chan, T elem ) with( chan ) {
	lock( p_lock );
	lock( mutex_lock __cfaabi_dbg_ctx2 );

	if ( size == 0 && chair != 0p ) {
		// zero-size channel: copy straight into the parked thread's storage and wake it;
		// c_lock is released here because the parked thread went to sleep holding it
		memcpy( (void *)chair_elem, (void *)&elem, sizeof(T) );
		unpark( chair );
		chair = 0p;
		unlock( mutex_lock );
		unlock( p_lock );
		unlock( c_lock );
	} else if ( count == size ) {
		// buffer full: take the chair and sleep; p_lock stays held and is released by
		// the consumer that performs this insert on our behalf
		chair = active_thread();
		chair_elem = &elem;
		unlock( mutex_lock );
		park();
	} else if ( chair != 0p ) {
		// a thread is parked waiting for data: hand the element over directly
		memcpy( (void *)chair_elem, (void *)&elem, sizeof(T) );
		unpark( chair );
		chair = 0p;
		unlock( mutex_lock );
		unlock( p_lock );
		unlock( c_lock );
	} else {
		// ordinary buffered insert
		insert_( chan, elem );
		unlock( mutex_lock );
		unlock( p_lock );
	}
}

// Take the next element from the channel, parking the caller when nothing is available.
static inline T remove( channel(T) & chan ) with(chan) {
	lock( c_lock );
	lock( mutex_lock __cfaabi_dbg_ctx2 );
	T retval;

	if ( size == 0 && chair != 0p ) {
		// zero-size channel: copy directly out of the parked thread's element and wake it;
		// p_lock is released here because the parked thread went to sleep holding it
		memcpy( (void *)&retval, (void *)chair_elem, sizeof(T) );
		unpark( chair );
		chair = 0p;
		unlock( mutex_lock );
		unlock( p_lock );
		unlock( c_lock );
		return retval;
	}

	if ( count == 0 ) {
		// nothing buffered: take the chair and sleep; whoever wakes us has already
		// filled retval and releases c_lock for us
		chair = active_thread();
		chair_elem = &retval;
		unlock( mutex_lock );
		park();
		return retval;
	}

	// copy the oldest element out of the buffer
	memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) );
	count -= 1;
	front += 1;
	if ( front == size ) {
		front = 0;
	}

	if ( chair != 0p ) {
		// a thread is parked waiting for space: do its insert, wake it, release its p_lock
		insert_( chan, *chair_elem );
		unpark( chair );
		chair = 0p;
		unlock( mutex_lock );
		unlock( p_lock );
	} else {
		unlock( mutex_lock );
	}
	unlock( c_lock );
	return retval;
}

} // forall( T )
128#endif
129
130#ifdef __COOP_CHANNEL
131
// Queue node describing one thread blocked on a channel.
struct wait_link {
	// intrusive link so a wait_link can sit on a dlist
	inline dlink(wait_link);

	// the blocked thread
	struct thread$ * t;

	// untyped pointer to the blocked thread's element storage; channel code in this
	// file copies through it with memcpy
	void * elem;
};
P9_EMBEDDED( wait_link, dlink(wait_link) )
144
// Construct a wait_link for thread t whose element storage is at elem.
static inline void ?{}( wait_link & this, thread$ * t, void * elem ) {
	this.elem = elem;
	this.t = t;
}
149
forall( T ) {

// Bounded channel variant with cooperative hand-off: a thread that cannot proceed queues
// a wait_link and parks, and the thread that later makes progress possible performs the
// parked thread's copy for it before waking it.
struct channel {
	size_t size;								// buffer capacity, set by the constructor
	size_t front, back, count;					// remove index, insert index, buffered element count
	T * buffer;									// circular buffer storage
	dlist( wait_link ) prods, cons;				// blocked producers / blocked consumers
	exp_backoff_then_block_lock mutex_lock;		// protects every field above
};

// Construct an empty channel whose buffer holds _size elements.
static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
	size = _size;
	count = 0;
	back = 0;
	front = 0;
	buffer = aalloc( size );
	prods{};
	cons{};
	mutex_lock{};
}

// Default construction delegates to the sized constructor with a capacity of 0.
static inline void ?{}( channel(T) &c ){
	((channel(T) &)c){ 0 };
}

// Release the buffer storage.
static inline void ^?{}( channel(T) &c ) with(c) {
	delete( buffer );
}

// Number of buffered elements; read without taking the lock.
static inline size_t get_count( channel(T) & chan ) with(chan) {
	return count;
}

// Capacity given at construction.
static inline size_t get_size( channel(T) & chan ) with(chan) {
	return size;
}

// True when either wait queue is non-empty; read without taking the lock.
static inline bool has_waiters( channel(T) & chan ) with(chan) {
	return !cons`isEmpty || !prods`isEmpty;
}

// True when at least one consumer is queued; read without taking the lock.
static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) {
	return !cons`isEmpty;
}

// True when at least one producer is queued; read without taking the lock.
static inline bool has_waiting_producers( channel(T) & chan ) with(chan) {
	return !prods`isEmpty;
}

// Bit-copy elem into the slot at `back` and advance `back`, wrapping at `size`.
// Callers in this file invoke it with mutex_lock held.
static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
	memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) );
	count += 1;
	back += 1;
	if ( back == size ) {
		back = 0;
	}
}

// Pop the first waiter off queue and unpark its thread.
// Every call site in this file checks that the queue is non-empty first.
static inline void wake_one( dlist( wait_link ) & queue ) {
	wait_link & waiter = try_pop_front( queue );
	unpark( waiter.t );
}

// Append the calling thread to queue with elem_ptr as its element storage, release lock,
// and park. The wait_link lives on this stack frame for the duration of the park.
static inline void block( dlist( wait_link ) & queue, void * elem_ptr, exp_backoff_then_block_lock & lock ) {
	wait_link self{ active_thread(), elem_ptr };
	insert_last( queue, self );
	unlock( lock );
	park();
}

// Put elem into the channel, parking the caller when the buffer is full.
static inline void insert( channel(T) & chan, T elem ) with(chan) {
	lock( mutex_lock );

	if ( size == 0 && !cons`isEmpty ) {
		// zero-size channel: copy straight into the first waiting consumer and wake it
		memcpy( cons`first.elem, (void *)&elem, sizeof(T) );
		wake_one( cons );
	} else if ( count == size ) {
		// buffer full: queue up and sleep; block() releases mutex_lock, and the consumer
		// that wakes us has already copied elem out
		block( prods, &elem, mutex_lock );
		return;
	} else if ( count == 0 && !cons`isEmpty ) {
		// buffer empty with a consumer waiting: do the waiting consumer's work
		memcpy( cons`first.elem, (void *)&elem, sizeof(T) );
		wake_one( cons );
	} else {
		insert_( chan, elem );
	}

	unlock( mutex_lock );
}

// Take the next element from the channel, parking the caller when nothing is available.
static inline T remove( channel(T) & chan ) with(chan) {
	lock( mutex_lock );
	T retval;

	if ( size == 0 && !prods`isEmpty ) {
		// zero-size channel: copy straight out of the first waiting producer and wake it
		memcpy( (void *)&retval, (void *)prods`first.elem, sizeof(T) );
		wake_one( prods );
	} else if ( count == 0 ) {
		// nothing buffered: queue up and sleep; block() releases mutex_lock, and the
		// producer that wakes us has already filled retval
		block( cons, &retval, mutex_lock );
		return retval;
	} else {
		// copy the oldest element out of the buffer
		memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) );
		count -= 1;
		front = (front + 1) % size;

		if ( count == size - 1 && !prods`isEmpty ) {
			// the buffer was full and a producer is waiting: do the waiting producer's work
			insert_( chan, *(T *)prods`first.elem );
			wake_one( prods );
		}
	}

	unlock( mutex_lock );
	return retval;
}
} // forall( T )
253#endif
254
255#ifdef __BARGE_CHANNEL
forall( T ) {

// Bounded channel variant built on condition variables: a thread that cannot proceed
// waits on prods/cons and re-checks its condition in a loop after every wake-up.
struct channel {
	size_t size;													// buffer capacity, set by the constructor
	size_t front, back, count;										// remove index, insert index, buffered element count
	T * buffer;														// circular buffer storage
	fast_cond_var( exp_backoff_then_block_lock ) prods, cons;		// waiting producers / waiting consumers
	exp_backoff_then_block_lock mutex_lock;							// protects every field above
};

// Construct an empty channel whose buffer holds _size elements.
static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
	size = _size;
	count = 0;
	back = 0;
	front = 0;
	buffer = aalloc( size );
	prods{};
	cons{};
	mutex_lock{};
}

// Default construction delegates to the sized constructor with a capacity of 0.
static inline void ?{}( channel(T) &c ){
	((channel(T) &)c){ 0 };
}

// Release the buffer storage.
static inline void ^?{}( channel(T) &c ) with(c) {
	delete( buffer );
}

// Number of buffered elements; read without taking the lock.
static inline size_t get_count( channel(T) & chan ) with(chan) {
	return count;
}

// Capacity given at construction.
static inline size_t get_size( channel(T) & chan ) with(chan) {
	return size;
}

// True when either condition variable has a waiter; read without taking the lock.
static inline bool has_waiters( channel(T) & chan ) with(chan) {
	return !empty( cons ) || !empty( prods );
}

// True when at least one consumer is waiting; read without taking the lock.
static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) {
	return !empty( cons );
}

// True when at least one producer is waiting; read without taking the lock.
static inline bool has_waiting_producers( channel(T) & chan ) with(chan) {
	return !empty( prods );
}

// Bit-copy elem into the slot at `back` and advance `back`, wrapping at `size`.
// Callers in this file invoke it with mutex_lock held.
static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
	memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) );
	count += 1;
	back += 1;
	if ( back == size ) {
		back = 0;
	}
}


// Put elem into the channel, waiting on prods while the buffer is full.
static inline void insert( channel(T) & chan, T elem ) with(chan) {
	lock( mutex_lock );

	// the condition is re-tested after every wake-up
	while ( count == size ) {
		wait( prods, mutex_lock );
	} // while

	insert_( chan, elem );

	// prefer waking a consumer; only when none was notified and space remains,
	// pass the wake-up on to another producer
	if ( !notify_one( cons ) ) {
		if ( count < size ) {
			notify_one( prods );
		}
	}

	unlock( mutex_lock );
}

// Take the next element from the channel, waiting on cons while the buffer is empty.
static inline T remove( channel(T) & chan ) with(chan) {
	lock( mutex_lock );
	T retval;

	// the condition is re-tested after every wake-up
	while ( count == 0 ) {
		wait( cons, mutex_lock );
	} // while

	// copy the oldest element out of the buffer
	memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) );
	count -= 1;
	front = (front + 1) % size;

	// prefer waking a producer; only when none was notified and elements remain,
	// pass the wake-up on to another consumer
	if ( !notify_one( prods ) ) {
		if ( count > 0 ) {
			notify_one( cons );
		}
	}

	unlock( mutex_lock );
	return retval;
}

} // forall( T )
325#endif
326
327#ifdef __NO_WAIT_CHANNEL
forall( T ) {

// Bounded channel variant that uses p_lock and c_lock as gates rather than parking on the
// channel: p_lock is left held while the buffer is full and c_lock is left held while it
// is empty, so the next producer/consumer blocks on the lock itself. The opposite side
// releases the gate once the buffer state changes.
struct channel {
	size_t size;									// buffer capacity, set by the constructor
	size_t front, back, count;						// remove index, insert index, buffered element count
	T * buffer;										// circular buffer storage
	thread$ * chair;								// set to 0p by the constructor; not otherwise used in this variant
	T * chair_elem;									// not used by any routine in this variant
	exp_backoff_then_block_lock c_lock, p_lock;		// consumer gate / producer gate
	__spinlock_t mutex_lock;						// protects the buffer fields
};

// Construct an empty channel whose buffer holds _size elements. c_lock is acquired here
// so that consumers block until a producer has inserted something.
static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
	size = _size;
	count = 0;
	back = 0;
	front = 0;
	buffer = aalloc( size );
	chair = 0p;
	mutex_lock{};
	c_lock{};
	p_lock{};
	lock( c_lock );
}

// Default construction delegates to the sized constructor with a capacity of 0.
static inline void ?{}( channel(T) &c ){
	((channel(T) &)c){ 0 };
}

// Release the buffer storage.
static inline void ^?{}( channel(T) &c ) with(c) {
	delete( buffer );
}

// Number of buffered elements; read without taking any lock.
static inline size_t get_count( channel(T) & chan ) with(chan) {
	return count;
}

// Capacity given at construction.
static inline size_t get_size( channel(T) & chan ) with(chan) {
	return size;
}

// Reports whether c_lock.lock_value is non-zero.
// NOTE(review): the meaning of lock_value is defined by exp_backoff_then_block_lock and
// is not visible here -- confirm that non-zero really corresponds to waiting threads.
static inline bool has_waiters( channel(T) & chan ) with(chan) {
	return c_lock.lock_value != 0;
}

// Bit-copy elem into the slot at `back` and advance `back`, wrapping at `size`.
// Callers in this file invoke it with mutex_lock held.
static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
	memcpy( (void *)&buffer[back], (void *)&elem, sizeof(T) );
	count += 1;
	back += 1;
	if ( back == size ) {
		back = 0;
	}
}

// Put elem into the channel; the caller blocks on p_lock while the buffer is full.
static inline void insert( channel(T) & chan, T elem ) with( chan ) {
	lock( p_lock );
	lock( mutex_lock __cfaabi_dbg_ctx2 );

	insert_( chan, elem );

	// keep p_lock held when this insert filled the buffer; a later remove releases it
	if ( count != size ) {
		unlock( p_lock );
	}

	// the buffer just went from empty to non-empty: open the consumer gate
	if ( count == 1 ) {
		unlock( c_lock );
	}

	unlock( mutex_lock );
}

// Take the next element from the channel; the caller blocks on c_lock while it is empty.
static inline T remove( channel(T) & chan ) with(chan) {
	lock( c_lock );
	lock( mutex_lock __cfaabi_dbg_ctx2 );
	T retval;

	// copy the oldest element out of the buffer
	memcpy( (void *)&retval, (void *)&buffer[front], sizeof(T) );
	count -= 1;
	front = (front + 1) % size;

	// keep c_lock held when this remove emptied the buffer; a later insert releases it
	if ( count != 0 ) {
		unlock( c_lock );
	}

	// the buffer just went from full to not-full: open the producer gate
	if ( count == size - 1 ) {
		unlock( p_lock );
	}

	unlock( mutex_lock );
	return retval;
}

} // forall( T )
399#endif
Note: See TracBrowser for help on using the repository browser.