source: libcfa/src/concurrency/channel.hfa@ 99fb52c

ADT ast-experimental
Last change on this file since 99fb52c was ce44c5f, checked in by caparson <caparson@…>, 3 years ago

Thought of new channel implementation while working on the prevention paper. Resulted in 30%+ greater throughput so impl is now switched to that.

  • Property mode set to 100644
File size: 6.9 KB
Line 
1#include <locks.hfa>
2
3struct no_reacq_lock {
4 inline exp_backoff_then_block_lock;
5};
6
7// have to override these by hand to get around plan 9 inheritance bug where resolver can't find the appropriate routine to call
8static inline void ?{}( no_reacq_lock & this ) { ((exp_backoff_then_block_lock &)this){}; }
9static inline bool try_lock(no_reacq_lock & this) { return try_lock(((exp_backoff_then_block_lock &)this)); }
10static inline void lock(no_reacq_lock & this) { lock(((exp_backoff_then_block_lock &)this)); }
11static inline void unlock(no_reacq_lock & this) { unlock(((exp_backoff_then_block_lock &)this)); }
12static inline void on_notify(no_reacq_lock & this, struct thread$ * t ) { on_notify(((exp_backoff_then_block_lock &)this), t); }
13static inline size_t on_wait(no_reacq_lock & this) { return on_wait(((exp_backoff_then_block_lock &)this)); }
14// override wakeup so that we don't reacquire the lock if using a condvar
15static inline void on_wakeup( no_reacq_lock & this, size_t recursion ) {}
16
17#define __PREVENTION_CHANNEL
18#ifdef __PREVENTION_CHANNEL
19forall( T ) {
20struct channel {
21 size_t size;
22 size_t front, back, count;
23 T * buffer;
24 thread$ * chair;
25 T * chair_elem;
26 exp_backoff_then_block_lock c_lock, p_lock;
27 __spinlock_t mutex_lock;
28};
29
30static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
31 size = _size;
32 front = back = count = 0;
33 buffer = anew( size );
34 chair = 0p;
35 mutex_lock{};
36 c_lock{};
37 p_lock{};
38}
39
40static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
41static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
42static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
43static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
44static inline bool has_waiters( channel(T) & chan ) with(chan) { return chair != 0p; }
45
46static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
47 memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
48 count += 1;
49 back++;
50 if ( back == size ) back = 0;
51}
52
53static inline void insert( channel(T) & chan, T elem ) with( chan ) {
54 lock( p_lock );
55 lock( mutex_lock __cfaabi_dbg_ctx2 );
56
57 // have to check for the zero size channel case
58 if ( size == 0 && chair != 0p ) {
59 memcpy((void *)chair_elem, (void *)&elem, sizeof(T));
60 unpark( chair );
61 chair = 0p;
62 unlock( mutex_lock );
63 unlock( p_lock );
64 unlock( c_lock );
65 return;
66 }
67
68 // wait if buffer is full, work will be completed by someone else
69 if ( count == size ) {
70 chair = active_thread();
71 chair_elem = &elem;
72 unlock( mutex_lock );
73 park( );
74 return;
75 } // if
76
77 if ( chair != 0p ) {
78 memcpy((void *)chair_elem, (void *)&elem, sizeof(T));
79 unpark( chair );
80 chair = 0p;
81 unlock( mutex_lock );
82 unlock( p_lock );
83 unlock( c_lock );
84 return;
85 }
86 else insert_( chan, elem );
87
88 unlock( mutex_lock );
89 unlock( p_lock );
90}
91
92static inline T remove( channel(T) & chan ) with(chan) {
93 lock( c_lock );
94 lock( mutex_lock __cfaabi_dbg_ctx2 );
95 T retval;
96
97 // have to check for the zero size channel case
98 if ( size == 0 && chair != 0p ) {
99 memcpy((void *)&retval, (void *)chair_elem, sizeof(T));
100 unpark( chair );
101 chair = 0p;
102 unlock( mutex_lock );
103 unlock( p_lock );
104 unlock( c_lock );
105 return retval;
106 }
107
108 // wait if buffer is empty, work will be completed by someone else
109 if ( count == 0 ) {
110 chair = active_thread();
111 chair_elem = &retval;
112 unlock( mutex_lock );
113 park( );
114 return retval;
115 }
116
117 // Remove from buffer
118 memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
119 count -= 1;
120 front = (front + 1) % size;
121
122 if ( chair != 0p ) {
123 insert_( chan, *chair_elem ); // do waiting producer work
124 unpark( chair );
125 chair = 0p;
126 unlock( mutex_lock );
127 unlock( p_lock );
128 unlock( c_lock );
129 return retval;
130 }
131
132 unlock( mutex_lock );
133 unlock( c_lock );
134 return retval;
135}
136
137} // forall( T )
138#endif
139
140#ifndef __PREVENTION_CHANNEL
141forall( T ) {
142struct channel {
143 size_t size;
144 size_t front, back, count;
145 T * buffer;
146 fast_cond_var( no_reacq_lock ) prods, cons;
147 no_reacq_lock mutex_lock;
148};
149
150static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
151 size = _size;
152 front = back = count = 0;
153 buffer = anew( size );
154 prods{};
155 cons{};
156 mutex_lock{};
157}
158
159static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
160static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
161static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
162static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
163static inline bool has_waiters( channel(T) & chan ) with(chan) { return !empty( cons ) || !empty( prods ); }
164static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !empty( cons ); }
165static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !empty( prods ); }
166
167static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
168 memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
169 count += 1;
170 back++;
171 if ( back == size ) back = 0;
172}
173
174
175static inline void insert( channel(T) & chan, T elem ) with(chan) {
176 lock( mutex_lock );
177
178 // have to check for the zero size channel case
179 if ( size == 0 && !empty( cons ) ) {
180 memcpy((void *)front( cons ), (void *)&elem, sizeof(T));
181 notify_one( cons );
182 unlock( mutex_lock );
183 return;
184 }
185
186 // wait if buffer is full, work will be completed by someone else
187 if ( count == size ) {
188 wait( prods, mutex_lock, (uintptr_t)&elem );
189 return;
190 } // if
191
192 if ( count == 0 && !empty( cons ) )
193 // do waiting consumer work
194 memcpy((void *)front( cons ), (void *)&elem, sizeof(T));
195 else insert_( chan, elem );
196
197 notify_one( cons );
198 unlock( mutex_lock );
199}
200
201static inline T remove( channel(T) & chan ) with(chan) {
202 lock( mutex_lock );
203 T retval;
204
205 // have to check for the zero size channel case
206 if ( size == 0 && !empty( prods ) ) {
207 memcpy((void *)&retval, (void *)front( prods ), sizeof(T));
208 notify_one( prods );
209 unlock( mutex_lock );
210 return retval;
211 }
212
213 // wait if buffer is empty, work will be completed by someone else
214 if (count == 0) {
215 wait( cons, mutex_lock, (uintptr_t)&retval );
216 return retval;
217 }
218
219 // Remove from buffer
220 memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
221 count -= 1;
222 front = (front + 1) % size;
223
224 if (count == size - 1 && !empty( prods ) )
225 insert_( chan, *((T *)front( prods )) ); // do waiting producer work
226
227 notify_one( prods );
228 unlock( mutex_lock );
229 return retval;
230}
231
232} // forall( T )
233#endif
Note: See TracBrowser for help on using the repository browser.