source: libcfa/src/concurrency/channel.hfa @ 3830c84

ADTast-experimental
Last change on this file since 3830c84 was ce44c5f, checked in by caparson <caparson@…>, 17 months ago

Thought of new channel implementation while working on the prevention paper. Resulted in 30%+ greater throughput so impl is now switched to that.

  • Property mode set to 100644
File size: 6.9 KB
Line 
1#include <locks.hfa>
2
3struct no_reacq_lock {
4    inline exp_backoff_then_block_lock;
5};
6
7// have to override these by hand to get around plan 9 inheritance bug where resolver can't find the appropriate routine to call
8static inline void   ?{}( no_reacq_lock & this ) { ((exp_backoff_then_block_lock &)this){}; }
9static inline bool   try_lock(no_reacq_lock & this) { return try_lock(((exp_backoff_then_block_lock &)this)); }
10static inline void   lock(no_reacq_lock & this) { lock(((exp_backoff_then_block_lock &)this)); }
11static inline void   unlock(no_reacq_lock & this) { unlock(((exp_backoff_then_block_lock &)this)); }
12static inline void   on_notify(no_reacq_lock & this, struct thread$ * t ) { on_notify(((exp_backoff_then_block_lock &)this), t); }
13static inline size_t on_wait(no_reacq_lock & this) { return on_wait(((exp_backoff_then_block_lock &)this)); }
14// override wakeup so that we don't reacquire the lock if using a condvar
15static inline void   on_wakeup( no_reacq_lock & this, size_t recursion ) {}
16
17#define __PREVENTION_CHANNEL
18#ifdef __PREVENTION_CHANNEL
19forall( T ) {
20struct channel {
21    size_t size;
22    size_t front, back, count;
23    T * buffer;
24    thread$ * chair;
25    T * chair_elem;
26    exp_backoff_then_block_lock c_lock, p_lock;
27    __spinlock_t mutex_lock;
28};
29
30static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
31    size = _size;
32    front = back = count = 0;
33    buffer = anew( size );
34    chair = 0p;
35    mutex_lock{};
36    c_lock{};
37    p_lock{};
38}
39
40static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
41static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
42static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
43static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
44static inline bool has_waiters( channel(T) & chan ) with(chan) { return chair != 0p; }
45
46static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
47    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
48    count += 1;
49    back++;
50    if ( back == size ) back = 0;
51}
52
53static inline void insert( channel(T) & chan, T elem ) with( chan ) {
54    lock( p_lock );
55    lock( mutex_lock __cfaabi_dbg_ctx2 );
56
57    // have to check for the zero size channel case
58    if ( size == 0 && chair != 0p ) {
59        memcpy((void *)chair_elem, (void *)&elem, sizeof(T));
60        unpark( chair );
61        chair = 0p;
62        unlock( mutex_lock );
63        unlock( p_lock );
64        unlock( c_lock );
65        return;
66    }
67
68    // wait if buffer is full, work will be completed by someone else
69    if ( count == size ) {
70        chair = active_thread();
71        chair_elem = &elem;
72        unlock( mutex_lock );
73        park( );
74        return;
75    } // if
76
77    if ( chair != 0p ) {
78        memcpy((void *)chair_elem, (void *)&elem, sizeof(T));
79        unpark( chair );
80        chair = 0p;
81        unlock( mutex_lock );
82        unlock( p_lock );
83        unlock( c_lock );
84        return;
85    }
86    else insert_( chan, elem );
87
88    unlock( mutex_lock );
89    unlock( p_lock );
90}
91
92static inline T remove( channel(T) & chan ) with(chan) {
93    lock( c_lock );
94    lock( mutex_lock __cfaabi_dbg_ctx2 );
95    T retval;
96
97    // have to check for the zero size channel case
98    if ( size == 0 && chair != 0p ) {
99        memcpy((void *)&retval, (void *)chair_elem, sizeof(T));
100        unpark( chair );
101        chair = 0p;
102        unlock( mutex_lock );
103        unlock( p_lock );
104        unlock( c_lock );
105        return retval;
106    }
107
108    // wait if buffer is empty, work will be completed by someone else
109    if ( count == 0 ) {
110        chair = active_thread();
111        chair_elem = &retval;
112        unlock( mutex_lock );
113        park( );
114        return retval;
115    }
116
117    // Remove from buffer
118    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
119    count -= 1;
120    front = (front + 1) % size;
121
122    if ( chair != 0p ) {
123        insert_( chan, *chair_elem );  // do waiting producer work
124        unpark( chair );
125        chair = 0p;
126        unlock( mutex_lock );
127        unlock( p_lock );
128        unlock( c_lock );
129        return retval;
130    }
131
132    unlock( mutex_lock );
133    unlock( c_lock );
134    return retval;
135}
136
137} // forall( T )
138#endif
139
140#ifndef __PREVENTION_CHANNEL
141forall( T ) {
142struct channel {
143    size_t size;
144    size_t front, back, count;
145    T * buffer;
146    fast_cond_var( no_reacq_lock ) prods, cons;
147    no_reacq_lock mutex_lock;
148};
149
150static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
151    size = _size;
152    front = back = count = 0;
153    buffer = anew( size );
154    prods{};
155    cons{};
156    mutex_lock{};
157}
158
159static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
160static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
161static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
162static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
163static inline bool has_waiters( channel(T) & chan ) with(chan) { return !empty( cons ) || !empty( prods ); }
164static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !empty( cons ); }
165static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !empty( prods ); }
166
167static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
168    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
169    count += 1;
170    back++;
171    if ( back == size ) back = 0;
172}
173
174
175static inline void insert( channel(T) & chan, T elem ) with(chan) {
176    lock( mutex_lock );
177
178    // have to check for the zero size channel case
179    if ( size == 0 && !empty( cons ) ) {
180        memcpy((void *)front( cons ), (void *)&elem, sizeof(T));
181        notify_one( cons );
182        unlock( mutex_lock );
183        return;
184    }
185
186    // wait if buffer is full, work will be completed by someone else
187    if ( count == size ) {
188        wait( prods, mutex_lock, (uintptr_t)&elem );
189        return;
190    } // if
191
192    if ( count == 0 && !empty( cons ) )
193        // do waiting consumer work
194        memcpy((void *)front( cons ), (void *)&elem, sizeof(T));
195    else insert_( chan, elem );
196   
197    notify_one( cons );
198    unlock( mutex_lock );
199}
200
201static inline T remove( channel(T) & chan ) with(chan) {
202    lock( mutex_lock );
203    T retval;
204
205    // have to check for the zero size channel case
206    if ( size == 0 && !empty( prods ) ) {
207        memcpy((void *)&retval, (void *)front( prods ), sizeof(T));
208        notify_one( prods );
209        unlock( mutex_lock );
210        return retval;
211    }
212
213    // wait if buffer is empty, work will be completed by someone else
214    if (count == 0) {
215        wait( cons, mutex_lock, (uintptr_t)&retval );
216        return retval;
217    }
218
219    // Remove from buffer
220    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
221    count -= 1;
222    front = (front + 1) % size;
223
224    if (count == size - 1 && !empty( prods ) )
225        insert_( chan, *((T *)front( prods )) );  // do waiting producer work
226
227    notify_one( prods );
228    unlock( mutex_lock );
229    return retval;
230}
231
232} // forall( T )
233#endif
Note: See TracBrowser for help on using the repository browser.