source: libcfa/src/concurrency/channel.hfa@ 056bee8

ADT ast-experimental
Last change on this file since 056bee8 was 1d245ea, checked in by caparson <caparson@…>, 3 years ago

added padding to channel to prevent false sharing that was occurring

  • Property mode set to 100644
File size: 7.0 KB
Line 
1#pragma once
2
3#include <locks.hfa>
4
5struct no_reacq_lock {
6 inline exp_backoff_then_block_lock;
7};
8
9// have to override these by hand to get around plan 9 inheritance bug where resolver can't find the appropriate routine to call
10static inline void ?{}( no_reacq_lock & this ) { ((exp_backoff_then_block_lock &)this){}; }
11static inline bool try_lock(no_reacq_lock & this) { return try_lock(((exp_backoff_then_block_lock &)this)); }
12static inline void lock(no_reacq_lock & this) { lock(((exp_backoff_then_block_lock &)this)); }
13static inline void unlock(no_reacq_lock & this) { unlock(((exp_backoff_then_block_lock &)this)); }
14static inline void on_notify(no_reacq_lock & this, struct thread$ * t ) { on_notify(((exp_backoff_then_block_lock &)this), t); }
15static inline size_t on_wait(no_reacq_lock & this) { return on_wait(((exp_backoff_then_block_lock &)this)); }
16// override wakeup so that we don't reacquire the lock if using a condvar
17static inline void on_wakeup( no_reacq_lock & this, size_t recursion ) {}
18
19#define __PREVENTION_CHANNEL
20#ifdef __PREVENTION_CHANNEL
21forall( T ) {
22struct channel {
23 size_t size;
24 size_t front, back, count;
25 T * buffer;
26 thread$ * chair;
27 T * chair_elem;
28 exp_backoff_then_block_lock c_lock, p_lock;
29 __spinlock_t mutex_lock;
30 char __padding[64]; // avoid false sharing in arrays
31};
32
33static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
34 size = _size;
35 front = back = count = 0;
36 buffer = anew( size );
37 chair = 0p;
38 mutex_lock{};
39 c_lock{};
40 p_lock{};
41}
42
43static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
44static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
45static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
46static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
47static inline bool has_waiters( channel(T) & chan ) with(chan) { return chair != 0p; }
48
49static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
50 memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
51 count += 1;
52 back++;
53 if ( back == size ) back = 0;
54}
55
56static inline void insert( channel(T) & chan, T elem ) with( chan ) {
57 lock( p_lock );
58 lock( mutex_lock __cfaabi_dbg_ctx2 );
59
60 // have to check for the zero size channel case
61 if ( size == 0 && chair != 0p ) {
62 memcpy((void *)chair_elem, (void *)&elem, sizeof(T));
63 unpark( chair );
64 chair = 0p;
65 unlock( mutex_lock );
66 unlock( p_lock );
67 unlock( c_lock );
68 return;
69 }
70
71 // wait if buffer is full, work will be completed by someone else
72 if ( count == size ) {
73 chair = active_thread();
74 chair_elem = &elem;
75 unlock( mutex_lock );
76 park( );
77 return;
78 } // if
79
80 if ( chair != 0p ) {
81 memcpy((void *)chair_elem, (void *)&elem, sizeof(T));
82 unpark( chair );
83 chair = 0p;
84 unlock( mutex_lock );
85 unlock( p_lock );
86 unlock( c_lock );
87 return;
88 }
89 else insert_( chan, elem );
90
91 unlock( mutex_lock );
92 unlock( p_lock );
93}
94
95static inline T remove( channel(T) & chan ) with(chan) {
96 lock( c_lock );
97 lock( mutex_lock __cfaabi_dbg_ctx2 );
98 T retval;
99
100 // have to check for the zero size channel case
101 if ( size == 0 && chair != 0p ) {
102 memcpy((void *)&retval, (void *)chair_elem, sizeof(T));
103 unpark( chair );
104 chair = 0p;
105 unlock( mutex_lock );
106 unlock( p_lock );
107 unlock( c_lock );
108 return retval;
109 }
110
111 // wait if buffer is empty, work will be completed by someone else
112 if ( count == 0 ) {
113 chair = active_thread();
114 chair_elem = &retval;
115 unlock( mutex_lock );
116 park( );
117 return retval;
118 }
119
120 // Remove from buffer
121 memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
122 count -= 1;
123 front = (front + 1) % size;
124
125 if ( chair != 0p ) {
126 insert_( chan, *chair_elem ); // do waiting producer work
127 unpark( chair );
128 chair = 0p;
129 unlock( mutex_lock );
130 unlock( p_lock );
131 unlock( c_lock );
132 return retval;
133 }
134
135 unlock( mutex_lock );
136 unlock( c_lock );
137 return retval;
138}
139
140} // forall( T )
141#endif
142
143#ifndef __PREVENTION_CHANNEL
144forall( T ) {
145struct channel {
146 size_t size;
147 size_t front, back, count;
148 T * buffer;
149 fast_cond_var( no_reacq_lock ) prods, cons;
150 no_reacq_lock mutex_lock;
151};
152
153static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
154 size = _size;
155 front = back = count = 0;
156 buffer = anew( size );
157 prods{};
158 cons{};
159 mutex_lock{};
160}
161
162static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
163static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
164static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
165static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
166static inline bool has_waiters( channel(T) & chan ) with(chan) { return !empty( cons ) || !empty( prods ); }
167static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !empty( cons ); }
168static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !empty( prods ); }
169
170static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
171 memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
172 count += 1;
173 back++;
174 if ( back == size ) back = 0;
175}
176
177
178static inline void insert( channel(T) & chan, T elem ) with(chan) {
179 lock( mutex_lock );
180
181 // have to check for the zero size channel case
182 if ( size == 0 && !empty( cons ) ) {
183 memcpy((void *)front( cons ), (void *)&elem, sizeof(T));
184 notify_one( cons );
185 unlock( mutex_lock );
186 return;
187 }
188
189 // wait if buffer is full, work will be completed by someone else
190 if ( count == size ) {
191 wait( prods, mutex_lock, (uintptr_t)&elem );
192 return;
193 } // if
194
195 if ( count == 0 && !empty( cons ) )
196 // do waiting consumer work
197 memcpy((void *)front( cons ), (void *)&elem, sizeof(T));
198 else insert_( chan, elem );
199
200 notify_one( cons );
201 unlock( mutex_lock );
202}
203
204static inline T remove( channel(T) & chan ) with(chan) {
205 lock( mutex_lock );
206 T retval;
207
208 // have to check for the zero size channel case
209 if ( size == 0 && !empty( prods ) ) {
210 memcpy((void *)&retval, (void *)front( prods ), sizeof(T));
211 notify_one( prods );
212 unlock( mutex_lock );
213 return retval;
214 }
215
216 // wait if buffer is empty, work will be completed by someone else
217 if (count == 0) {
218 wait( cons, mutex_lock, (uintptr_t)&retval );
219 return retval;
220 }
221
222 // Remove from buffer
223 memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
224 count -= 1;
225 front = (front + 1) % size;
226
227 if (count == size - 1 && !empty( prods ) )
228 insert_( chan, *((T *)front( prods )) ); // do waiting producer work
229
230 notify_one( prods );
231 unlock( mutex_lock );
232 return retval;
233}
234
235} // forall( T )
236#endif
Note: See TracBrowser for help on using the repository browser.