source: libcfa/src/concurrency/channel.hfa@ cd477ca

ADT ast-experimental
Last change on this file since cd477ca was 5e4a830, checked in by Peter A. Buhr <pabuhr@…>, 3 years ago

add #pragma once to .h and .hfa files

  • Property mode set to 100644
File size: 6.9 KB
Line 
1#pragma once
2
3#include <locks.hfa>
4
// Lock type whose only member is an anonymous (inline) exp_backoff_then_block_lock.
struct no_reacq_lock { inline exp_backoff_then_block_lock; };
8
// The routines for no_reacq_lock are overridden by hand to get around a plan 9 inheritance
// bug where the resolver cannot find the appropriate routine to call.

// Default constructor: default-constructs the embedded exp_backoff_then_block_lock.
static inline void ?{}( no_reacq_lock & this ) {
	((exp_backoff_then_block_lock &)this){};
}
// Forward try_lock to the embedded exp_backoff_then_block_lock and return its result.
static inline bool try_lock( no_reacq_lock & this ) {
	exp_backoff_then_block_lock & base = (exp_backoff_then_block_lock &)this;
	return try_lock( base );
}
// Forward lock to the embedded exp_backoff_then_block_lock.
static inline void lock( no_reacq_lock & this ) {
	exp_backoff_then_block_lock & base = (exp_backoff_then_block_lock &)this;
	lock( base );
}
// Forward unlock to the embedded exp_backoff_then_block_lock.
static inline void unlock( no_reacq_lock & this ) {
	exp_backoff_then_block_lock & base = (exp_backoff_then_block_lock &)this;
	unlock( base );
}
// Forward on_notify, with the same thread argument, to the embedded exp_backoff_then_block_lock.
static inline void on_notify( no_reacq_lock & this, struct thread$ * t ) {
	exp_backoff_then_block_lock & base = (exp_backoff_then_block_lock &)this;
	on_notify( base, t );
}
// Forward on_wait to the embedded exp_backoff_then_block_lock and return its result.
static inline size_t on_wait( no_reacq_lock & this ) {
	exp_backoff_then_block_lock & base = (exp_backoff_then_block_lock &)this;
	return on_wait( base );
}
// Wake-up hook overridden with an empty body so that the lock is not reacquired
// when it is used with a condition variable. Both parameters are ignored.
static inline void on_wakeup( no_reacq_lock & this, size_t recursion ) {
	// intentionally empty
}
18
19#define __PREVENTION_CHANNEL
20#ifdef __PREVENTION_CHANNEL
21forall( T ) {
// Channel state for the implementation selected by __PREVENTION_CHANNEL.
struct channel {
	size_t size;							// capacity passed to the constructor
	size_t front;							// buffer index remove() reads from next
	size_t back;							// buffer index insert_() writes to next
	size_t count;							// number of elements currently stored in buffer
	T * buffer;								// element storage; front and back wrap around at size
	thread$ * chair;						// thread that parked inside insert()/remove(), or 0p
	T * chair_elem;							// address of the parked thread's element; only read while chair is set
	exp_backoff_then_block_lock c_lock;		// acquired at the start of remove()
	exp_backoff_then_block_lock p_lock;		// acquired at the start of insert()
	__spinlock_t mutex_lock;				// held by insert()/remove() while they read or update the fields above
};
31
// Construct a channel with capacity _size.
// The buffer starts empty (front, back and count are zero) and no thread is recorded in chair.
static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
	size = _size;
	count = 0;
	back = 0;
	front = 0;
	buffer = anew( size );
	chair = 0p;							// chair_elem is only read while chair is set, so it is not assigned here
	mutex_lock{};
	c_lock{};
	p_lock{};
}
41
// Default constructor: delegates to the sized constructor with a capacity of 0.
static inline void ?{}( channel(T) &c ) {
	((channel(T) &)c){ 0 };
}
// Destructor: releases the element buffer with delete.
// NOTE(review): the sized constructor obtains buffer from anew( size ); confirm that delete
// (rather than an array-aware release) is the intended counterpart.
static inline void ^?{}( channel(T) &c ) {
	delete( c.buffer );
}
// Return the channel's count field. No lock is taken for the read.
static inline size_t get_count( channel(T) & chan ) {
	return chan.count;
}
// Return the channel's size field, i.e. the capacity given at construction.
static inline size_t get_size( channel(T) & chan ) {
	return chan.size;
}
// True when a thread is currently recorded in chair. No lock is taken for the read.
static inline bool has_waiters( channel(T) & chan ) {
	return chan.chair != 0p;
}
47
// Copy elem bytewise into the slot at back, then advance back (wrapping to 0 at size)
// and bump count. Performs no locking and no full-buffer check itself.
static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
	T * slot = &buffer[back];
	memcpy( (void *)slot, (void *)&elem, sizeof(T) );
	count += 1;
	back += 1;
	if ( back == size ) {
		back = 0;
	}
}
54
// Insert elem into the channel (producer side).
//
// Acquires p_lock, then the mutex_lock spinlock, and takes exactly one of these paths:
//  - chair is set and either size == 0 or the buffer is not full: elem is copied to
//    chair_elem, the chair thread is unparked, chair is cleared, and mutex_lock, p_lock and
//    c_lock are released. c_lock is not acquired by this routine; remove() parks on an
//    empty channel while still holding it.
//  - count == size: the caller records itself in chair and the address of its elem in
//    chair_elem, releases only mutex_lock, and parks. It returns straight after park()
//    without copying elem or releasing p_lock; remove() reads *chair_elem, unparks the
//    chair thread and then calls unlock( p_lock ).
//  - otherwise: elem is appended with insert_() and mutex_lock and p_lock are released.
55static inline void insert( channel(T) & chan, T elem ) with( chan ) {
56 lock( p_lock );
57 lock( mutex_lock __cfaabi_dbg_ctx2 );
58
59 // zero-size channel: must be tested before the count == size check below, which would otherwise park this thread and overwrite chair
60 if ( size == 0 && chair != 0p ) {
61 memcpy((void *)chair_elem, (void *)&elem, sizeof(T)); // copy before unpark, while the destination is still published
62 unpark( chair );
63 chair = 0p;
64 unlock( mutex_lock );
65 unlock( p_lock );
66 unlock( c_lock ); // not taken here: remove() left it locked when it parked
67 return;
68 }
69
70 // wait if buffer is full, work will be completed by someone else
71 if ( count == size ) {
72 chair = active_thread();
73 chair_elem = &elem; // elem is this call's by-value parameter
74 unlock( mutex_lock );
75 park( ); // p_lock is still held here
76 return; // no unlock on this path; remove() calls unlock( p_lock ) on its hand-off paths
77 } // if
78
79 if ( chair != 0p ) { // buffer not full and a thread is waiting: hand elem over directly
80 memcpy((void *)chair_elem, (void *)&elem, sizeof(T));
81 unpark( chair );
82 chair = 0p;
83 unlock( mutex_lock );
84 unlock( p_lock );
85 unlock( c_lock ); // not taken here: remove() left it locked when it parked
86 return;
87 }
88 else insert_( chan, elem );
89
90 unlock( mutex_lock );
91 unlock( p_lock );
92}
93
// Remove and return one element from the channel (consumer side).
//
// Acquires c_lock, then the mutex_lock spinlock, and takes exactly one of these paths:
//  - size == 0 and chair is set: the value at chair_elem is copied into retval, the chair
//    thread is unparked, chair is cleared, and mutex_lock, p_lock and c_lock are released.
//  - count == 0: the caller records itself in chair and the address of retval in
//    chair_elem, releases only mutex_lock, and parks; it returns retval after park()
//    without releasing c_lock. insert() copies into *chair_elem, unparks the chair thread
//    and then calls unlock( c_lock ).
//  - otherwise: the element at buffer[front] is copied out. If chair is set, *chair_elem is
//    also appended with insert_(), the chair thread is unparked, and mutex_lock, p_lock and
//    c_lock are released; if not, only mutex_lock and c_lock are released.
// p_lock is never acquired here; insert() parks on a full channel while still holding it.
94static inline T remove( channel(T) & chan ) with(chan) {
95 lock( c_lock );
96 lock( mutex_lock __cfaabi_dbg_ctx2 );
97 T retval;
98
99 // zero-size channel: must be tested before the count == 0 check below, which would otherwise park this thread and overwrite chair
100 if ( size == 0 && chair != 0p ) {
101 memcpy((void *)&retval, (void *)chair_elem, sizeof(T)); // copy before unpark, while the source is still published
102 unpark( chair );
103 chair = 0p;
104 unlock( mutex_lock );
105 unlock( p_lock ); // not taken here: insert() left it locked when it parked
106 unlock( c_lock );
107 return retval;
108 }
109
110 // wait if buffer is empty, work will be completed by someone else
111 if ( count == 0 ) {
112 chair = active_thread();
113 chair_elem = &retval; // insert() copies the element to this address
114 unlock( mutex_lock );
115 park( ); // c_lock is still held here
116 return retval; // no unlock on this path; insert() calls unlock( c_lock ) on its hand-off paths
117 }
118
119 // Remove from buffer
120 memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
121 count -= 1;
122 front = (front + 1) % size; // only reached when count was non-zero
123
124 if ( chair != 0p ) {
125 insert_( chan, *chair_elem ); // do waiting producer work
126 unpark( chair );
127 chair = 0p;
128 unlock( mutex_lock );
129 unlock( p_lock ); // not taken here: insert() left it locked when it parked
130 unlock( c_lock );
131 return retval;
132 }
133
134 unlock( mutex_lock );
135 unlock( c_lock );
136 return retval;
137}
138
139} // forall( T )
140#endif
141
142#ifndef __PREVENTION_CHANNEL
143forall( T ) {
// Channel state for the implementation used when __PREVENTION_CHANNEL is not defined.
struct channel {
	size_t size;								// capacity passed to the constructor
	size_t front;								// buffer index remove() reads from next
	size_t back;								// buffer index insert_() writes to next
	size_t count;								// number of elements currently stored in buffer
	T * buffer;									// element storage; front and back wrap around at size
	fast_cond_var( no_reacq_lock ) prods;		// insert() waits here when count == size
	fast_cond_var( no_reacq_lock ) cons;		// remove() waits here when count == 0
	no_reacq_lock mutex_lock;					// acquired at the start of insert() and remove()
};
151
// Construct a channel with capacity _size.
// The buffer starts empty (front, back and count are zero); both condition variables and
// the lock are default-constructed.
static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
	size = _size;
	count = 0;
	back = 0;
	front = 0;
	buffer = anew( size );
	prods{};
	cons{};
	mutex_lock{};
}
160
// Default constructor: delegates to the sized constructor with a capacity of 0.
static inline void ?{}( channel(T) &c ) {
	((channel(T) &)c){ 0 };
}
// Destructor: releases the element buffer with delete.
// NOTE(review): the sized constructor obtains buffer from anew( size ); confirm that delete
// (rather than an array-aware release) is the intended counterpart.
static inline void ^?{}( channel(T) &c ) {
	delete( c.buffer );
}
// Return the channel's count field. No lock is taken for the read.
static inline size_t get_count( channel(T) & chan ) {
	return chan.count;
}
// Return the channel's size field, i.e. the capacity given at construction.
static inline size_t get_size( channel(T) & chan ) {
	return chan.size;
}
// True when at least one of the two condition variables (cons, prods) is not empty.
// cons is examined first; prods is only examined when cons is empty.
static inline bool has_waiters( channel(T) & chan ) {
	return !( empty( chan.cons ) && empty( chan.prods ) );
}
// True when the cons condition variable is not empty.
static inline bool has_waiting_consumers( channel(T) & chan ) {
	return !empty( chan.cons );
}
// True when the prods condition variable is not empty.
static inline bool has_waiting_producers( channel(T) & chan ) {
	return !empty( chan.prods );
}
168
// Copy elem bytewise into the slot at back, then advance back (wrapping to 0 at size)
// and bump count. Performs no locking and no full-buffer check itself.
static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
	T * slot = &buffer[back];
	memcpy( (void *)slot, (void *)&elem, sizeof(T) );
	count += 1;
	back += 1;
	if ( back == size ) {
		back = 0;
	}
}
175
176
// Insert elem into the channel (producer side, condition-variable implementation).
//
// Acquires mutex_lock and takes exactly one of these paths:
//  - size == 0 and cons is not empty: elem is copied to the address obtained from
//    front( cons ), notify_one( cons ) is called, and mutex_lock is released.
//  - count == size: the caller waits on prods, passing the address of its elem, and returns
//    as soon as wait() returns; there is no unlock and no copy of elem on this path.
//  - otherwise: if the buffer is empty and cons is not empty, elem is copied to the address
//    obtained from front( cons ); if not, it is appended with insert_(). In both cases
//    notify_one( cons ) is called and mutex_lock is released.
// NOTE(review): presumably front( cons ) yields the value the first waiter passed to wait()
// (remove() passes the address of its retval), and wait() releases mutex_lock while
// blocking - confirm against fast_cond_var.
177static inline void insert( channel(T) & chan, T elem ) with(chan) {
178 lock( mutex_lock );
179
180 // zero-size channel: must be tested before the count == size check below, which would otherwise make this thread wait
181 if ( size == 0 && !empty( cons ) ) {
182 memcpy((void *)front( cons ), (void *)&elem, sizeof(T));
183 notify_one( cons );
184 unlock( mutex_lock );
185 return;
186 }
187
188 // wait if buffer is full, work will be completed by someone else
189 if ( count == size ) {
190 wait( prods, mutex_lock, (uintptr_t)&elem ); // remove() reads elem through front( prods )
191 return; // no unlock here: on_wakeup for no_reacq_lock is empty, so the lock is not reacquired by it
192 } // if
193
194 if ( count == 0 && !empty( cons ) )
195 // do waiting consumer work
196 memcpy((void *)front( cons ), (void *)&elem, sizeof(T));
197 else insert_( chan, elem );
198
199 notify_one( cons ); // called on both branches above, whether or not cons is empty
200 unlock( mutex_lock );
201}
202
// Remove and return one element from the channel (consumer side, condition-variable
// implementation).
//
// Acquires mutex_lock and takes exactly one of these paths:
//  - size == 0 and prods is not empty: the value at the address obtained from
//    front( prods ) is copied into retval, notify_one( prods ) is called, and mutex_lock is
//    released.
//  - count == 0: the caller waits on cons, passing the address of retval, and returns retval
//    as soon as wait() returns; there is no unlock on this path.
//  - otherwise: the element at buffer[front] is copied out. If that removal took the buffer
//    from full to one free slot and prods is not empty, the value at front( prods ) is
//    appended with insert_(). Then notify_one( prods ) is called and mutex_lock is released.
// NOTE(review): presumably front( prods ) yields the value the first waiter passed to wait()
// (insert() passes the address of its elem), and wait() releases mutex_lock while
// blocking - confirm against fast_cond_var.
203static inline T remove( channel(T) & chan ) with(chan) {
204 lock( mutex_lock );
205 T retval;
206
207 // zero-size channel: must be tested before the count == 0 check below, which would otherwise make this thread wait
208 if ( size == 0 && !empty( prods ) ) {
209 memcpy((void *)&retval, (void *)front( prods ), sizeof(T));
210 notify_one( prods );
211 unlock( mutex_lock );
212 return retval;
213 }
214
215 // wait if buffer is empty, work will be completed by someone else
216 if (count == 0) {
217 wait( cons, mutex_lock, (uintptr_t)&retval ); // insert() writes retval through front( cons )
218 return retval; // no unlock here: on_wakeup for no_reacq_lock is empty, so the lock is not reacquired by it
219 }
220
221 // Remove from buffer
222 memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
223 count -= 1;
224 front = (front + 1) % size; // only reached when count was non-zero
225
226 if (count == size - 1 && !empty( prods ) ) // buffer was full before this removal
227 insert_( chan, *((T *)front( prods )) ); // do waiting producer work
228
229 notify_one( prods ); // called whether or not prods is empty
230 unlock( mutex_lock );
231 return retval;
232}
233
234} // forall( T )
235#endif
Note: See TracBrowser for help on using the repository browser.