source: libcfa/src/concurrency/channel.hfa @ dbae916

ADTast-experimental
Last change on this file since dbae916 was 5e4a830, checked in by Peter A. Buhr <pabuhr@…>, 16 months ago

add #pragma once to .h and .hfa files

  • Property mode set to 100644
File size: 6.9 KB
Line 
1#pragma once
2
3#include <locks.hfa>
4
5struct no_reacq_lock {
6    inline exp_backoff_then_block_lock;
7};
8
9// have to override these by hand to get around plan 9 inheritance bug where resolver can't find the appropriate routine to call
10static inline void   ?{}( no_reacq_lock & this ) { ((exp_backoff_then_block_lock &)this){}; }
11static inline bool   try_lock(no_reacq_lock & this) { return try_lock(((exp_backoff_then_block_lock &)this)); }
12static inline void   lock(no_reacq_lock & this) { lock(((exp_backoff_then_block_lock &)this)); }
13static inline void   unlock(no_reacq_lock & this) { unlock(((exp_backoff_then_block_lock &)this)); }
14static inline void   on_notify(no_reacq_lock & this, struct thread$ * t ) { on_notify(((exp_backoff_then_block_lock &)this), t); }
15static inline size_t on_wait(no_reacq_lock & this) { return on_wait(((exp_backoff_then_block_lock &)this)); }
16// override wakeup so that we don't reacquire the lock if using a condvar
17static inline void   on_wakeup( no_reacq_lock & this, size_t recursion ) {}
18
19#define __PREVENTION_CHANNEL
20#ifdef __PREVENTION_CHANNEL
21forall( T ) {
22struct channel {
23    size_t size;
24    size_t front, back, count;
25    T * buffer;
26    thread$ * chair;
27    T * chair_elem;
28    exp_backoff_then_block_lock c_lock, p_lock;
29    __spinlock_t mutex_lock;
30};
31
32static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
33    size = _size;
34    front = back = count = 0;
35    buffer = anew( size );
36    chair = 0p;
37    mutex_lock{};
38    c_lock{};
39    p_lock{};
40}
41
42static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
43static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
44static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
45static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
46static inline bool has_waiters( channel(T) & chan ) with(chan) { return chair != 0p; }
47
48static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
49    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
50    count += 1;
51    back++;
52    if ( back == size ) back = 0;
53}
54
55static inline void insert( channel(T) & chan, T elem ) with( chan ) {
56    lock( p_lock );
57    lock( mutex_lock __cfaabi_dbg_ctx2 );
58
59    // have to check for the zero size channel case
60    if ( size == 0 && chair != 0p ) {
61        memcpy((void *)chair_elem, (void *)&elem, sizeof(T));
62        unpark( chair );
63        chair = 0p;
64        unlock( mutex_lock );
65        unlock( p_lock );
66        unlock( c_lock );
67        return;
68    }
69
70    // wait if buffer is full, work will be completed by someone else
71    if ( count == size ) {
72        chair = active_thread();
73        chair_elem = &elem;
74        unlock( mutex_lock );
75        park( );
76        return;
77    } // if
78
79    if ( chair != 0p ) {
80        memcpy((void *)chair_elem, (void *)&elem, sizeof(T));
81        unpark( chair );
82        chair = 0p;
83        unlock( mutex_lock );
84        unlock( p_lock );
85        unlock( c_lock );
86        return;
87    }
88    else insert_( chan, elem );
89
90    unlock( mutex_lock );
91    unlock( p_lock );
92}
93
94static inline T remove( channel(T) & chan ) with(chan) {
95    lock( c_lock );
96    lock( mutex_lock __cfaabi_dbg_ctx2 );
97    T retval;
98
99    // have to check for the zero size channel case
100    if ( size == 0 && chair != 0p ) {
101        memcpy((void *)&retval, (void *)chair_elem, sizeof(T));
102        unpark( chair );
103        chair = 0p;
104        unlock( mutex_lock );
105        unlock( p_lock );
106        unlock( c_lock );
107        return retval;
108    }
109
110    // wait if buffer is empty, work will be completed by someone else
111    if ( count == 0 ) {
112        chair = active_thread();
113        chair_elem = &retval;
114        unlock( mutex_lock );
115        park( );
116        return retval;
117    }
118
119    // Remove from buffer
120    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
121    count -= 1;
122    front = (front + 1) % size;
123
124    if ( chair != 0p ) {
125        insert_( chan, *chair_elem );  // do waiting producer work
126        unpark( chair );
127        chair = 0p;
128        unlock( mutex_lock );
129        unlock( p_lock );
130        unlock( c_lock );
131        return retval;
132    }
133
134    unlock( mutex_lock );
135    unlock( c_lock );
136    return retval;
137}
138
139} // forall( T )
140#endif
141
142#ifndef __PREVENTION_CHANNEL
143forall( T ) {
144struct channel {
145    size_t size;
146    size_t front, back, count;
147    T * buffer;
148    fast_cond_var( no_reacq_lock ) prods, cons;
149    no_reacq_lock mutex_lock;
150};
151
152static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
153    size = _size;
154    front = back = count = 0;
155    buffer = anew( size );
156    prods{};
157    cons{};
158    mutex_lock{};
159}
160
161static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
162static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
163static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
164static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
165static inline bool has_waiters( channel(T) & chan ) with(chan) { return !empty( cons ) || !empty( prods ); }
166static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !empty( cons ); }
167static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !empty( prods ); }
168
169static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
170    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
171    count += 1;
172    back++;
173    if ( back == size ) back = 0;
174}
175
176
177static inline void insert( channel(T) & chan, T elem ) with(chan) {
178    lock( mutex_lock );
179
180    // have to check for the zero size channel case
181    if ( size == 0 && !empty( cons ) ) {
182        memcpy((void *)front( cons ), (void *)&elem, sizeof(T));
183        notify_one( cons );
184        unlock( mutex_lock );
185        return;
186    }
187
188    // wait if buffer is full, work will be completed by someone else
189    if ( count == size ) {
190        wait( prods, mutex_lock, (uintptr_t)&elem );
191        return;
192    } // if
193
194    if ( count == 0 && !empty( cons ) )
195        // do waiting consumer work
196        memcpy((void *)front( cons ), (void *)&elem, sizeof(T));
197    else insert_( chan, elem );
198   
199    notify_one( cons );
200    unlock( mutex_lock );
201}
202
203static inline T remove( channel(T) & chan ) with(chan) {
204    lock( mutex_lock );
205    T retval;
206
207    // have to check for the zero size channel case
208    if ( size == 0 && !empty( prods ) ) {
209        memcpy((void *)&retval, (void *)front( prods ), sizeof(T));
210        notify_one( prods );
211        unlock( mutex_lock );
212        return retval;
213    }
214
215    // wait if buffer is empty, work will be completed by someone else
216    if (count == 0) {
217        wait( cons, mutex_lock, (uintptr_t)&retval );
218        return retval;
219    }
220
221    // Remove from buffer
222    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
223    count -= 1;
224    front = (front + 1) % size;
225
226    if (count == size - 1 && !empty( prods ) )
227        insert_( chan, *((T *)front( prods )) );  // do waiting producer work
228
229    notify_one( prods );
230    unlock( mutex_lock );
231    return retval;
232}
233
234} // forall( T )
235#endif
Note: See TracBrowser for help on using the repository browser.