source: libcfa/src/concurrency/channel.hfa @ efdd18c

ADTast-experimental
Last change on this file since efdd18c was 75d874a, checked in by caparsons <caparson@…>, 16 months ago

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

  • Property mode set to 100644
File size: 10.4 KB
Line 
1#pragma once
2
3#include <locks.hfa>
4#include <list.hfa>
5
6#define __COOP_CHANNEL
7#ifdef __PREVENTION_CHANNEL
8forall( T ) {
9struct channel {
10    size_t size, count, front, back;
11    T * buffer;
12    thread$ * chair;
13    T * chair_elem;
14    exp_backoff_then_block_lock c_lock, p_lock;
15    __spinlock_t mutex_lock;
16    char __padding[64]; // avoid false sharing in arrays of channels
17};
18
19static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
20    size = _size;
21    front = back = count = 0;
22    buffer = aalloc( size );
23    chair = 0p;
24    mutex_lock{};
25    c_lock{};
26    p_lock{};
27}
28
29static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
30static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
31static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
32static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
33static inline bool has_waiters( channel(T) & chan ) with(chan) { return chair != 0p; }
34
35static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
36    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
37    count += 1;
38    back++;
39    if ( back == size ) back = 0;
40}
41
42static inline void insert( channel(T) & chan, T elem ) with( chan ) {
43    lock( p_lock );
44    lock( mutex_lock __cfaabi_dbg_ctx2 );
45
46    // have to check for the zero size channel case
47    if ( size == 0 && chair != 0p ) {
48        memcpy((void *)chair_elem, (void *)&elem, sizeof(T));
49        unpark( chair );
50        chair = 0p;
51        unlock( mutex_lock );
52        unlock( p_lock );
53        unlock( c_lock );
54        return;
55    }
56
57    // wait if buffer is full, work will be completed by someone else
58    if ( count == size ) {
59        chair = active_thread();
60        chair_elem = &elem;
61        unlock( mutex_lock );
62        park( );
63        return;
64    } // if
65
66    if ( chair != 0p ) {
67        memcpy((void *)chair_elem, (void *)&elem, sizeof(T));
68        unpark( chair );
69        chair = 0p;
70        unlock( mutex_lock );
71        unlock( p_lock );
72        unlock( c_lock );
73        return;
74    }
75    insert_( chan, elem );
76
77    unlock( mutex_lock );
78    unlock( p_lock );
79}
80
81static inline T remove( channel(T) & chan ) with(chan) {
82    lock( c_lock );
83    lock( mutex_lock __cfaabi_dbg_ctx2 );
84    T retval;
85
86    // have to check for the zero size channel case
87    if ( size == 0 && chair != 0p ) {
88        memcpy((void *)&retval, (void *)chair_elem, sizeof(T));
89        unpark( chair );
90        chair = 0p;
91        unlock( mutex_lock );
92        unlock( p_lock );
93        unlock( c_lock );
94        return retval;
95    }
96
97    // wait if buffer is empty, work will be completed by someone else
98    if ( count == 0 ) {
99        chair = active_thread();
100        chair_elem = &retval;
101        unlock( mutex_lock );
102        park( );
103        return retval;
104    }
105
106    // Remove from buffer
107    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
108    count -= 1;
109    front++;
110    if ( front == size ) front = 0;
111
112    if ( chair != 0p ) {
113        insert_( chan, *chair_elem );  // do waiting producer work
114        unpark( chair );
115        chair = 0p;
116        unlock( mutex_lock );
117        unlock( p_lock );
118        unlock( c_lock );
119        return retval;
120    }
121
122    unlock( mutex_lock );
123    unlock( c_lock );
124    return retval;
125}
126
127} // forall( T )
128#endif
129
130#ifdef __COOP_CHANNEL
131
132// link field used for threads waiting on channel
133struct wait_link {
134    // used to put wait_link on a dl queue
135    inline dlink(wait_link);
136
137    // waiting thread
138    struct thread$ * t;
139
140    // shadow field
141    void * elem;
142};
143P9_EMBEDDED( wait_link, dlink(wait_link) )
144
145static inline void ?{}( wait_link & this, thread$ * t, void * elem ) {
146    this.t = t;
147    this.elem = elem;
148}
149
150forall( T ) {
151
152struct channel {
153    size_t size;
154    size_t front, back, count;
155    T * buffer;
156    dlist( wait_link ) prods, cons;
157    exp_backoff_then_block_lock mutex_lock;
158};
159
160static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
161    size = _size;
162    front = back = count = 0;
163    buffer = aalloc( size );
164    prods{};
165    cons{};
166    mutex_lock{};
167}
168
169static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
170static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
171static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
172static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
173static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; }
174static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; }
175static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !prods`isEmpty; }
176
177static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
178    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
179    count += 1;
180    back++;
181    if ( back == size ) back = 0;
182}
183
184static inline void wake_one( dlist( wait_link ) & queue ) {
185    wait_link & popped = try_pop_front( queue );
186    unpark( popped.t );
187}
188
189static inline void block( dlist( wait_link ) & queue, void * elem_ptr, exp_backoff_then_block_lock & lock ) {
190    wait_link w{ active_thread(), elem_ptr };
191    insert_last( queue, w );
192    unlock( lock );
193    park();
194}
195
196static inline void insert( channel(T) & chan, T elem ) with(chan) {
197    lock( mutex_lock );
198
199    // have to check for the zero size channel case
200    if ( size == 0 && !cons`isEmpty ) {
201        memcpy(cons`first.elem, (void *)&elem, sizeof(T));
202        wake_one( cons );
203        unlock( mutex_lock );
204        return;
205    }
206
207    // wait if buffer is full, work will be completed by someone else
208    if ( count == size ) {
209        block( prods, &elem, mutex_lock );
210        return;
211    } // if
212
213    if ( count == 0 && !cons`isEmpty ) {
214        memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
215        wake_one( cons );
216    } else insert_( chan, elem );
217   
218    unlock( mutex_lock );
219}
220
221static inline T remove( channel(T) & chan ) with(chan) {
222    lock( mutex_lock );
223    T retval;
224
225    // have to check for the zero size channel case
226    if ( size == 0 && !prods`isEmpty ) {
227        memcpy((void *)&retval, (void *)prods`first.elem, sizeof(T));
228        wake_one( prods );
229        unlock( mutex_lock );
230        return retval;
231    }
232
233    // wait if buffer is empty, work will be completed by someone else
234    if (count == 0) {
235        block( cons, &retval, mutex_lock );
236        return retval;
237    }
238
239    // Remove from buffer
240    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
241    count -= 1;
242    front = (front + 1) % size;
243
244    if (count == size - 1 && !prods`isEmpty ) {
245        insert_( chan, *(T *)prods`first.elem );  // do waiting producer work
246        wake_one( prods );
247    }
248
249    unlock( mutex_lock );
250    return retval;
251}
252} // forall( T )
253#endif
254
255#ifdef __BARGE_CHANNEL
256forall( T ) {
257struct channel {
258    size_t size;
259    size_t front, back, count;
260    T * buffer;
261    fast_cond_var( exp_backoff_then_block_lock ) prods, cons;
262    exp_backoff_then_block_lock mutex_lock;
263};
264
265static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
266    size = _size;
267    front = back = count = 0;
268    buffer = aalloc( size );
269    prods{};
270    cons{};
271    mutex_lock{};
272}
273
274static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
275static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
276static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
277static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
278static inline bool has_waiters( channel(T) & chan ) with(chan) { return !empty( cons ) || !empty( prods ); }
279static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !empty( cons ); }
280static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !empty( prods ); }
281
282static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
283    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
284    count += 1;
285    back++;
286    if ( back == size ) back = 0;
287}
288
289
290static inline void insert( channel(T) & chan, T elem ) with(chan) {
291    lock( mutex_lock );
292
293    while ( count == size ) {
294        wait( prods, mutex_lock );
295    } // if
296
297    insert_( chan, elem );
298   
299    if ( !notify_one( cons ) && count < size )
300        notify_one( prods );
301
302    unlock( mutex_lock );
303}
304
305static inline T remove( channel(T) & chan ) with(chan) {
306    lock( mutex_lock );
307    T retval;
308
309    while (count == 0) {
310        wait( cons, mutex_lock );
311    }
312
313    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
314    count -= 1;
315    front = (front + 1) % size;
316
317    if ( !notify_one( prods ) && count > 0 )
318        notify_one( cons );
319
320    unlock( mutex_lock );
321    return retval;
322}
323
324} // forall( T )
325#endif
326
327#ifdef __NO_WAIT_CHANNEL
328forall( T ) {
329struct channel {
330    size_t size;
331    size_t front, back, count;
332    T * buffer;
333    thread$ * chair;
334    T * chair_elem;
335    exp_backoff_then_block_lock c_lock, p_lock;
336    __spinlock_t mutex_lock;
337};
338
339static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
340    size = _size;
341    front = back = count = 0;
342    buffer = aalloc( size );
343    chair = 0p;
344    mutex_lock{};
345    c_lock{};
346    p_lock{};
347    lock( c_lock );
348}
349
350static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
351static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
352static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
353static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
354static inline bool has_waiters( channel(T) & chan ) with(chan) { return c_lock.lock_value != 0; }
355
356static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
357    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
358    count += 1;
359    back++;
360    if ( back == size ) back = 0;
361}
362
363static inline void insert( channel(T) & chan, T elem ) with( chan ) {
364    lock( p_lock );
365    lock( mutex_lock __cfaabi_dbg_ctx2 );
366
367    insert_( chan, elem );
368
369    if ( count != size )
370        unlock( p_lock );
371
372    if ( count == 1 )
373        unlock( c_lock );
374       
375    unlock( mutex_lock );
376}
377
378static inline T remove( channel(T) & chan ) with(chan) {
379    lock( c_lock );
380    lock( mutex_lock __cfaabi_dbg_ctx2 );
381    T retval;
382
383    // Remove from buffer
384    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
385    count -= 1;
386    front = (front + 1) % size;
387
388    if ( count != 0 )
389        unlock( c_lock );
390
391    if ( count == size - 1 )
392        unlock( p_lock );
393       
394    unlock( mutex_lock );
395    return retval;
396}
397
398} // forall( T )
399#endif
Note: See TracBrowser for help on using the repository browser.