source: libcfa/src/concurrency/channel.hfa @ d30e3eb

ADTast-experimental
Last change on this file since d30e3eb was d30e3eb, checked in by caparson <caparson@…>, 14 months ago

cleaned up exp_backoff lock and rewrote parts of channels to improve performance

  • Property mode set to 100644
File size: 6.7 KB
Line 
1#pragma once
2
3#include <locks.hfa>
4#include <list.hfa>
5
6// #define __PREVENTION_CHANNEL
7#ifdef __PREVENTION_CHANNEL
8forall( T ) {
9struct channel {
10    size_t size, count, front, back;
11    T * buffer;
12    thread$ * chair;
13    T * chair_elem;
14    exp_backoff_then_block_lock c_lock, p_lock;
15    __spinlock_t mutex_lock;
16    char __padding[64]; // avoid false sharing in arrays
17};
18
19static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
20    size = _size;
21    front = back = count = 0;
22    buffer = anew( size );
23    chair = 0p;
24    mutex_lock{};
25    c_lock{};
26    p_lock{};
27}
28
29static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
30static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
31static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
32static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
33static inline bool has_waiters( channel(T) & chan ) with(chan) { return chair != 0p; }
34
35static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
36    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
37    count += 1;
38    back++;
39    if ( back == size ) back = 0;
40}
41
42static inline void insert( channel(T) & chan, T elem ) with( chan ) {
43    lock( p_lock );
44    lock( mutex_lock __cfaabi_dbg_ctx2 );
45
46    // have to check for the zero size channel case
47    if ( size == 0 && chair != 0p ) {
48        memcpy((void *)chair_elem, (void *)&elem, sizeof(T));
49        unpark( chair );
50        chair = 0p;
51        unlock( mutex_lock );
52        unlock( p_lock );
53        unlock( c_lock );
54        return;
55    }
56
57    // wait if buffer is full, work will be completed by someone else
58    if ( count == size ) {
59        chair = active_thread();
60        chair_elem = &elem;
61        unlock( mutex_lock );
62        park( );
63        return;
64    } // if
65
66    if ( chair != 0p ) {
67        memcpy((void *)chair_elem, (void *)&elem, sizeof(T));
68        unpark( chair );
69        chair = 0p;
70        unlock( mutex_lock );
71        unlock( p_lock );
72        unlock( c_lock );
73        return;
74    }
75    insert_( chan, elem );
76
77    unlock( mutex_lock );
78    unlock( p_lock );
79}
80
81static inline T remove( channel(T) & chan ) with(chan) {
82    lock( c_lock );
83    lock( mutex_lock __cfaabi_dbg_ctx2 );
84    T retval;
85
86    // have to check for the zero size channel case
87    if ( size == 0 && chair != 0p ) {
88        memcpy((void *)&retval, (void *)chair_elem, sizeof(T));
89        unpark( chair );
90        chair = 0p;
91        unlock( mutex_lock );
92        unlock( p_lock );
93        unlock( c_lock );
94        return retval;
95    }
96
97    // wait if buffer is empty, work will be completed by someone else
98    if ( count == 0 ) {
99        chair = active_thread();
100        chair_elem = &retval;
101        unlock( mutex_lock );
102        park( );
103        return retval;
104    }
105
106    // Remove from buffer
107    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
108    count -= 1;
109    front++;
110    if ( front == size ) front = 0;
111
112    if ( chair != 0p ) {
113        insert_( chan, *chair_elem );  // do waiting producer work
114        unpark( chair );
115        chair = 0p;
116        unlock( mutex_lock );
117        unlock( p_lock );
118        unlock( c_lock );
119        return retval;
120    }
121
122    unlock( mutex_lock );
123    unlock( c_lock );
124    return retval;
125}
126
127} // forall( T )
128#endif
129
130#ifndef __PREVENTION_CHANNEL
131
132// link field used for threads waiting on channel
133struct wait_link {
134    // used to put wait_link on a dl queue
135    inline dlink(wait_link);
136
137    // waiting thread
138    struct thread$ * t;
139
140    // shadow field
141    void * elem;
142};
143P9_EMBEDDED( wait_link, dlink(wait_link) )
144
145static inline void ?{}( wait_link & this, thread$ * t, void * elem ) {
146    this.t = t;
147    this.elem = elem;
148}
149
150forall( T ) {
151
152struct channel {
153    size_t size;
154    size_t front, back, count;
155    T * buffer;
156    dlist( wait_link ) prods, cons;
157    exp_backoff_then_block_lock mutex_lock;
158};
159
160static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
161    size = _size;
162    front = back = count = 0;
163    buffer = anew( size );
164    prods{};
165    cons{};
166    mutex_lock{};
167}
168
169static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
170static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
171static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
172static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
173static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; }
174static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; }
175static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !prods`isEmpty; }
176
177static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
178    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
179    count += 1;
180    back++;
181    if ( back == size ) back = 0;
182}
183
184static inline void wake_one( dlist( wait_link ) & queue ) {
185    wait_link & popped = try_pop_front( queue );
186    unpark( popped.t );
187}
188
189static inline void block( dlist( wait_link ) & queue, void * elem_ptr, exp_backoff_then_block_lock & lock ) {
190    wait_link w{ active_thread(), elem_ptr };
191    insert_last( queue, w );
192    unlock( lock );
193    park();
194}
195
196static inline void insert( channel(T) & chan, T elem ) with(chan) {
197    lock( mutex_lock );
198
199    // have to check for the zero size channel case
200    if ( size == 0 && !cons`isEmpty ) {
201        memcpy(cons`first.elem, (void *)&elem, sizeof(T));
202        wake_one( cons );
203        unlock( mutex_lock );
204        return;
205    }
206
207    // wait if buffer is full, work will be completed by someone else
208    if ( count == size ) {
209        block( prods, &elem, mutex_lock );
210        return;
211    } // if
212
213    if ( count == 0 && !cons`isEmpty ) {
214        memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
215        wake_one( cons );
216    } else insert_( chan, elem );
217   
218    unlock( mutex_lock );
219}
220
221static inline T remove( channel(T) & chan ) with(chan) {
222    lock( mutex_lock );
223    T retval;
224
225    // have to check for the zero size channel case
226    if ( size == 0 && !prods`isEmpty ) {
227        memcpy((void *)&retval, (void *)prods`first.elem, sizeof(T));
228        wake_one( prods );
229        unlock( mutex_lock );
230        return retval;
231    }
232
233    // wait if buffer is empty, work will be completed by someone else
234    if (count == 0) {
235        block( cons, &retval, mutex_lock );
236        return retval;
237    }
238
239    // Remove from buffer
240    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
241    count -= 1;
242    front = (front + 1) % size;
243
244    if (count == size - 1 && !prods`isEmpty ) {
245        insert_( chan, *(T *)prods`first.elem );  // do waiting producer work
246        wake_one( prods );
247    }
248
249    unlock( mutex_lock );
250    return retval;
251}
252} // forall( T )
253#endif
Note: See TracBrowser for help on using the repository browser.