source: libcfa/src/concurrency/channel.hfa @ 3e94a23

ADTast-experimental
Last change on this file since 3e94a23 was a45e21c, checked in by caparson <caparson@…>, 20 months ago

cleaned up channel, added safety/productivity features to channels. added go style spin/block lock. small cleanup in locks.hfa

  • Property mode set to 100644
File size: 9.2 KB
Line 
1#pragma once
2
3#include <locks.hfa>
4#include <list.hfa>
5#include <mutex_stmt.hfa>
6
// Queue node describing one thread blocked on a channel.
// Nodes are created on the blocked thread's stack by block() and linked into
// either the producer or the consumer wait list of a channel.
struct wait_link {
    // intrusive doubly-linked-list fields, so a wait_link can sit on a dlist
    inline dlink(wait_link);

    // the thread that is parked while this node is queued
    struct thread$ * t;

    // Address of the blocked thread's element:
    //  - blocked producer: the value waiting to be inserted
    //  - blocked consumer: the location the removed value is copied into
    // close() overwrites this with 0p to tell the woken thread that it was
    // woken because of shutdown.
    void * elem;
};
// NOTE(review): presumably generates the glue that lets dlist(wait_link) locate
// the embedded dlink -- confirm against list.hfa
P9_EMBEDDED( wait_link, dlink(wait_link) )
19
// Constructor: records the waiting thread and the address of its element.
// The dlink fields are not touched here.
static inline void ?{}( wait_link & this, thread$ * t, void * elem ) {
    this.elem = elem;
    this.t = t;
}
24
// Dequeues the node at the head of the wait list and unparks its thread.
// The popped node is used without an emptiness check; every caller in this
// file tests that the list is non-empty before calling.
static inline void wake_one( dlist( wait_link ) & queue ) {
    wait_link & head = try_pop_front( queue );
    thread$ * sleeper = head.t;
    unpark( sleeper );
}
30
// Parks the calling thread on the given wait list.
//  queue    - wait list to append the caller to
//  elem_ptr - address of the caller's element, stored in the queued node
//  lock     - lock held by the caller; released after the node is queued and
//             before parking (it is not re-acquired here)
// Returns true when the node's elem pointer is 0p after waking, which is how
// close() marks a wake-up caused by shutdown.
static inline bool block( dlist( wait_link ) & queue, void * elem_ptr, go_mutex & lock ) {
    wait_link node{ active_thread(), elem_ptr };
    insert_last( queue, node );
    unlock( lock );
    park();
    const bool shutdown = ( 0p == node.elem );
    return shutdown;
}
40
// Exception raised when insert or remove is attempted on a closed channel.
// Its members are void * because exceptions do not currently work with
// parametric polymorphism.
exception channel_closed {
    // Failed insert: address of the element that was being inserted. After a
    // resumption handler returns, the insert is retried with this element, so
    // the handler may modify it.
    // Failed remove: 0p.
    void * elem;

    // address of the channel that is closed
    void * closed_chan;
};
vtable(channel_closed) channel_closed_vt;
53
54// #define CHAN_STATS // define this to get channel stats printed in dtor
55
56forall( T ) {
57
// Buffered channel of T backed by a circular array, with wait lists for
// blocked producers and consumers. The structure is aligned to 128 bytes.
struct __attribute__((aligned(128))) channel {
    size_t size;                    // capacity of buffer (0 is allowed)
    size_t front, back;             // index of next element to remove / next free slot
    size_t count;                   // number of elements currently buffered
    T * buffer;                     // circular element storage
    dlist( wait_link ) prods, cons; // blocked producers / blocked consumers
    go_mutex mutex_lock;            // mutual-exclusion lock taken by the channel operations
    bool closed;                    // true once close() has been called
    #ifdef CHAN_STATS
    size_t blocks, operations;      // ops that blocked a thread / total ops
    #endif
};
68
// Constructs an open, empty channel with room for _size buffered elements.
static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
    // circular buffer starts empty
    size = _size;
    buffer = aalloc( _size );
    count = 0;
    front = 0;
    back = 0;

    // no blocked threads, lock in its default-constructed state, channel open
    prods{};
    cons{};
    mutex_lock{};
    closed = false;

    #ifdef CHAN_STATS
    operations = 0;
    blocks = 0;
    #endif
}
82
// Default constructor: delegates to the sized constructor with a size of 0.
static inline void ?{}( channel(T) &c ) {
    ((channel(T) &)c){ 0 };
}
// Destructor: optionally prints usage statistics, checks that no thread is
// still blocked on the channel, then releases the buffer.
static inline void ^?{}( channel(T) &c ) with(c) {
    #ifdef CHAN_STATS
    // a channel that was never used has operations == 0; avoid printing 0/0
    double blocked_pct = operations == 0 ? 0.0 : ((double)blocks) / operations * 100;
    // blocks and operations are size_t, so they are printed with %zu
    printf("Channel %p Blocks: %zu, Operations: %zu, %.2f%% of ops blocked\n", (void *)&c, blocks, operations, blocked_pct);
    #endif
    // threads left on either wait list would reference a destroyed channel
    verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" );
    delete( buffer );
}
// Returns the number of elements currently buffered. The field is read
// without taking the channel lock.
static inline size_t get_count( channel(T) & chan ) {
    return chan.count;
}
// Returns the capacity the channel was constructed with.
static inline size_t get_size( channel(T) & chan ) {
    return chan.size;
}
// True when at least one producer or consumer is queued on the channel.
// The wait lists are inspected without taking the channel lock.
static inline bool has_waiters( channel(T) & chan ) with(chan) {
    return !( cons`isEmpty && prods`isEmpty );
}
// True when at least one consumer is queued on the channel.
// The wait list is inspected without taking the channel lock.
static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) {
    bool none = cons`isEmpty;
    return !none;
}
// True when at least one producer is queued on the channel.
// The wait list is inspected without taking the channel lock.
static inline bool has_waiting_producers( channel(T) & chan ) with(chan) {
    bool none = prods`isEmpty;
    return !none;
}
96
// Marks the channel closed and wakes every thread blocked on it.
// Each queued node has its elem pointer set to 0p before its thread is
// unparked; block() reports that to the woken thread as "woken by shutdown".
static inline void close( channel(T) & chan ) with(chan) {
    lock( mutex_lock );
    closed = true;

    // drain the consumer wait list first ...
    while ( !cons`isEmpty ) {
        cons`first.elem = 0p;
        wake_one( cons );
    }

    // ... then the producer wait list
    while ( !prods`isEmpty ) {
        prods`first.elem = 0p;
        wake_one( prods );
    }

    unlock( mutex_lock );
}
113
// Returns true once close() has been called on the channel.
// The flag is read without taking the channel lock.
// (The return type is bool; it was previously declared void while returning a value.)
static inline bool is_closed( channel(T) & chan ) with(chan) { return closed; }
115
// Hands a copy of elem to every consumer currently blocked on the channel.
// Consumers are only served while the buffer is empty; each one has elem
// copied into its destination and is then unparked.
static inline void flush( channel(T) & chan, T elem ) with(chan) {
    lock( mutex_lock );
    for ( ;; ) {
        if ( count != 0 || cons`isEmpty ) break;
        // complete the blocked consumer's remove on its behalf
        memcpy( cons`first.elem, (void *)&elem, sizeof(T) );
        wake_one( cons );
    }
    unlock( mutex_lock );
}
124
// Copies elem into the slot at the back of the circular buffer and advances
// the back index, wrapping to 0 at the end of the array. No locking and no
// capacity check are done here.
static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
    T * slot = &buffer[back];
    memcpy( (void *)slot, (void *)&elem, sizeof(T) );
    back = ( back + 1 == size ) ? 0 : back + 1;
    count++;
}
132
// Performs one insert: the element goes into the buffer unless the buffer is
// empty and a consumer is blocked, in which case it is copied straight into
// that consumer's destination and the consumer is unparked.
static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) {
    if ( count != 0 || cons`isEmpty ) {
        __buf_insert( chan, elem );
        return;
    }
    // complete the blocked consumer's remove on its behalf
    memcpy( cons`first.elem, (void *)&elem, sizeof(T) );
    wake_one( cons );
}
140
// Non-blocking insert taking the element by reference, so the closed-channel
// path can reuse it without an extra copy.
// Returns false, leaving the channel unchanged, when the buffer is full.
// The closed flag is not consulted here.
static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
    lock( mutex_lock );
    #ifdef CHAN_STATS
    operations++;
    #endif
    const bool has_room = ( count != size );
    if ( has_room ) __do_insert( chan, elem );
    unlock( mutex_lock );
    return has_room;
}
152
// Non-blocking insert of elem.
// Returns true when the element was inserted, false when the buffer was full.
static inline bool try_insert( channel(T) & chan, T elem ) {
    bool inserted = __internal_try_insert( chan, elem );
    return inserted;
}
156
// Insert path used once the channel is closed.
// First raises channel_closed as a resumption, carrying the element's address
// and the channel. If a handler resumes, one non-blocking insert is attempted;
// if that insert would block, the same exception is raised as a termination.
static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
    channel_closed closed_ex{ &channel_closed_vt, &elem, &chan };
    throwResume closed_ex;
    if ( __internal_try_insert( chan, elem ) ) return;
    throw closed_ex;
}
163
// Blocking insert of elem into the channel.
//  - closed channel: handled by __closed_insert (raises channel_closed)
//  - zero-size channel with a blocked consumer: elem is copied straight to it
//  - full buffer: the caller parks on the producer wait list; the thread that
//    wakes it has already moved elem on the caller's behalf
//  - otherwise: elem is buffered, or handed to a blocked consumer
static inline void insert( channel(T) & chan, T elem ) with(chan) {
    // unlocked check so a closed channel is detected without taking the lock
    if ( unlikely(closed) ) {
        __closed_insert( chan, elem );
        return;
    }

    lock( mutex_lock );

    #ifdef CHAN_STATS
    if ( !closed ) operations++;
    #endif

    // re-check under the lock: the channel may have been closed in between
    if ( unlikely(closed) ) {
        unlock( mutex_lock );
        __closed_insert( chan, elem );
        return;
    }

    // zero-size channel: count == size always holds, so a blocked consumer
    // must be served before the "buffer is full" test below
    if ( size == 0 && !cons`isEmpty ) {
        memcpy( cons`first.elem, (void *)&elem, sizeof(T) );
        wake_one( cons );
        unlock( mutex_lock );
        return;                     // void routine: no value is returned
    }

    // wait if buffer is full, work will be completed by someone else
    if ( count == size ) {
        #ifdef CHAN_STATS
        blocks++;
        #endif

        // block() releases mutex_lock; true means woken because of close()
        if ( unlikely( block( prods, &elem, mutex_lock ) ) )
            __closed_insert( chan, elem );
        return;
    } // if

    // buffer the element, or hand it to a blocked consumer if the buffer is empty
    __do_insert( chan, elem );

    unlock( mutex_lock );
}
212
// Copies the element at the front of the circular buffer into retval and
// advances the front index, wrapping to 0 at the end of the array. No locking
// and no emptiness check are done here.
static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) {
    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
    count -= 1;
    // wrap with a compare, mirroring __buf_insert; avoids a modulo by size
    front++;
    if ( front == size ) front = 0;
}
219
// Performs one remove from the buffer into retval. If that remove took the
// buffer from full to one free slot and a producer is blocked, the producer's
// pending element is moved into the buffer and the producer is unparked.
static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
    __buf_remove( chan, retval );
    if ( count != size - 1 || prods`isEmpty ) return;

    // complete the blocked producer's insert on its behalf
    T * pending = (T *)prods`first.elem;
    __buf_insert( chan, *pending );
    wake_one( prods );
}
228
// Non-blocking remove writing through a reference, so the closed-channel path
// and the single-return-value overload avoid an extra copy.
// Returns false, leaving retval untouched, when the buffer is empty.
// The closed flag is not consulted here.
static inline bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) {
    lock( mutex_lock );
    #ifdef CHAN_STATS
    operations++;
    #endif
    const bool has_item = ( count != 0 );
    if ( has_item ) __do_remove( chan, retval );
    unlock( mutex_lock );
    return has_item;
}
240
// attempts a nonblocking remove
// returns [T, true] if remove was successful
// returns [T, false] if remove was unsuccessful (T uninit)
static inline [T, bool] try_remove( channel(T) & chan ) {
    T retval;
    // do the remove before building the tuple, so retval is filled in before
    // it is copied into the result
    bool success = __internal_try_remove( chan, retval );
    return [ retval, success ];
}
248
// Non-blocking remove returning only the element.
// The outcome of the attempt is discarded: when the buffer is empty the
// returned value is uninitialized.
// NOTE(review): parameter elem is never read -- confirm whether it is only
// there to distinguish this overload.
static inline T try_remove( channel(T) & chan, T elem ) {
    T result;
    __internal_try_remove( chan, result );
    return result;
}
254
// Remove path used once the channel is closed.
// First raises channel_closed as a resumption, with elem set to 0p and the
// channel's address. If a handler resumes, one non-blocking remove is
// attempted; if that remove would block, the same exception is raised as a
// termination.
static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
    channel_closed closed_ex{ &channel_closed_vt, 0p, &chan };
    throwResume closed_ex;
    if ( __internal_try_remove( chan, retval ) ) return;
    throw closed_ex;
}
261
// Blocking remove of one element from the channel.
//  - closed channel: handled by __closed_remove (raises channel_closed)
//  - zero-size channel with a blocked producer: its element is copied directly
//  - empty buffer: the caller parks on the consumer wait list; the thread that
//    wakes it has already written the element into the caller's result
//  - otherwise: the element comes from the buffer
static inline T remove( channel(T) & chan ) with(chan) {
    T result;

    // unlocked check so a closed channel is detected without taking the lock
    if ( unlikely(closed) ) {
        __closed_remove( chan, result );
        return result;
    }

    lock( mutex_lock );

    #ifdef CHAN_STATS
    if ( !closed ) operations++;
    #endif

    // re-check under the lock: the channel may have been closed in between
    if ( unlikely(closed) ) {
        unlock( mutex_lock );
        __closed_remove( chan, result );
        return result;
    }

    // zero-size channel: take the element straight from a blocked producer
    if ( size == 0 && !prods`isEmpty ) {
        memcpy( (void *)&result, (void *)prods`first.elem, sizeof(T) );
        wake_one( prods );
        unlock( mutex_lock );
        return result;
    }

    // something is buffered: take it (may also serve a blocked producer)
    if ( count != 0 ) {
        __do_remove( chan, result );
        unlock( mutex_lock );
        return result;
    }

    // buffer is empty: wait, the work will be completed by someone else
    #ifdef CHAN_STATS
    blocks++;
    #endif
    // block() releases mutex_lock; true means woken because of close()
    const bool woken_by_close = block( cons, &result, mutex_lock );
    if ( unlikely( woken_by_close ) )
        __closed_remove( chan, result );
    return result;
}
305} // forall( T )
Note: See TracBrowser for help on using the repository browser.