source: libcfa/src/concurrency/channel.hfa @ 1d245ea

ADTast-experimental
Last change on this file since 1d245ea was 1d245ea, checked in by caparson <caparson@…>, 19 months ago

added padding to channel to prevent false sharing that was occurring

  • Property mode set to 100644
File size: 7.0 KB
Line 
1#pragma once
2
3#include <locks.hfa>
4
// Lock type that embeds an exp_backoff_then_block_lock through a plan 9 style
// inline member. Every operation is forwarded to the embedded lock except
// on_wakeup, which is deliberately a no-op.
struct no_reacq_lock {
    inline exp_backoff_then_block_lock;
};

// have to override these by hand to get around plan 9 inheritance bug where resolver can't find the appropriate routine to call
// Each routine below casts `this` to the embedded lock type and forwards the call unchanged.

// Default constructor: runs the embedded lock's default constructor.
static inline void ?{}( no_reacq_lock & this ) {
    ((exp_backoff_then_block_lock &)this){};
}

// Forwards to try_lock on the embedded lock and returns its result.
static inline bool try_lock( no_reacq_lock & this ) {
    return try_lock( (exp_backoff_then_block_lock &)this );
}

// Forwards to lock on the embedded lock.
static inline void lock( no_reacq_lock & this ) {
    lock( (exp_backoff_then_block_lock &)this );
}

// Forwards to unlock on the embedded lock.
static inline void unlock( no_reacq_lock & this ) {
    unlock( (exp_backoff_then_block_lock &)this );
}

// Forwards to on_notify on the embedded lock, passing the thread through.
static inline void on_notify( no_reacq_lock & this, struct thread$ * t ) {
    on_notify( (exp_backoff_then_block_lock &)this, t );
}

// Forwards to on_wait on the embedded lock and returns its result.
static inline size_t on_wait( no_reacq_lock & this ) {
    return on_wait( (exp_backoff_then_block_lock &)this );
}

// override wakeup so that we don't reacquire the lock if using a condvar
static inline void on_wakeup( no_reacq_lock & this, size_t recursion ) {
    // intentionally empty: neither parameter is used
}
18
19#define __PREVENTION_CHANNEL
20#ifdef __PREVENTION_CHANNEL
21forall( T ) {
// Buffered channel of T (variant selected when __PREVENTION_CHANNEL is defined).
// At most one thread is parked on the channel at a time; it is recorded in
// `chair` together with the address of the element it is waiting to send or
// receive (`chair_elem`).
struct channel {
    size_t size;                          // buffer capacity in elements, set by the constructor
    size_t front;                         // index of the next element remove() copies out
    size_t back;                          // index of the next slot insert_() copies into
    size_t count;                         // number of elements currently stored in buffer
    T * buffer;                           // element storage, allocated by the constructor with anew( size )
    thread$ * chair;                      // thread parked in insert()/remove(), or 0p when nobody is parked
    T * chair_elem;                       // element address published by the parked thread; only read while chair != 0p
    exp_backoff_then_block_lock c_lock;   // acquired first by remove()
    exp_backoff_then_block_lock p_lock;   // acquired first by insert()
    __spinlock_t mutex_lock;              // held while the fields above are inspected or modified
    char __padding[64]; // avoid false sharing in arrays
};
32
// Construct a channel with room for _size buffered elements.
// A _size of 0 is accepted; insert()/remove() have dedicated zero-size paths.
static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
    size = _size;
    front = 0;
    back = 0;
    count = 0;
    buffer = anew( size );
    // nobody is parked on a fresh channel; clear both halves of the "chair"
    // so chair_elem is never left indeterminate
    chair = 0p;
    chair_elem = 0p;
    mutex_lock{};
    c_lock{};
    p_lock{};
}

// Default constructor: delegates to the sized constructor with a capacity of 0.
static inline void ?{}( channel(T) &c ) {
    ((channel(T) &)c){ 0 };
}

// Destructor: releases the element buffer.
// NOTE(review): buffer is obtained with anew() but released with delete(), not
// adelete() -- confirm this pairing is intended for the element types in use.
static inline void ^?{}( channel(T) &c ) with(c) {
    delete( buffer );
}
// Number of elements currently stored in the buffer.
// Reads the field without taking any of the channel's locks.
static inline size_t get_count( channel(T) & chan ) with(chan) {
    return count;
}

// Capacity of the buffer as given to the constructor.
static inline size_t get_size( channel(T) & chan ) with(chan) {
    return size;
}

// True when a thread is currently recorded as parked on the channel.
// Reads `chair` without taking any of the channel's locks.
static inline bool has_waiters( channel(T) & chan ) with(chan) {
    return chair != 0p;
}
48
// Copy elem bitwise into the slot at `back`, bump the element count and
// advance `back`, wrapping to 0 when it reaches `size`.
// Performs no locking and no capacity check; both are left to the caller.
static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
    T * slot = &buffer[back];
    memcpy( (void *)slot, (void *)&elem, sizeof(T) );
    count += 1;
    back += 1;
    if ( back == size ) {
        back = 0;
    }
}
55
// Send elem into the channel.
//
// Takes p_lock, then mutex_lock, and follows one of four paths:
//   1. zero-size channel with a parked thread: copy elem to the address that
//      thread published and wake it;
//   2. buffer full (always the case for a zero-size channel with nobody
//      parked): publish &elem in the chair and park; whoever wakes this
//      thread has already consumed elem;
//   3. buffer not full and a thread is parked: copy elem straight to it and
//      wake it;
//   4. otherwise: append elem to the buffer.
//
// Lock hand-off: a thread that parks here keeps p_lock, and a thread that
// parks in remove() keeps c_lock. The thread that later unparks it releases
// that lock on its behalf, which is why paths 1 and 3 unlock c_lock even
// though this routine never acquired it.
static inline void insert( channel(T) & chan, T elem ) with( chan ) {
    lock( p_lock );
    lock( mutex_lock __cfaabi_dbg_ctx2 );

    // path 1: have to check for the zero size channel case
    if ( size == 0 && chair != 0p ) {
        memcpy( (void *)chair_elem, (void *)&elem, sizeof(T) );
        unpark( chair );
        chair = 0p;
        unlock( mutex_lock );
        unlock( p_lock );
        unlock( c_lock );   // released on behalf of the thread just woken
        return;
    }

    // path 2: wait if buffer is full, work will be completed by someone else
    if ( count == size ) {
        chair = active_thread();
        chair_elem = &elem;
        unlock( mutex_lock );
        park( );            // p_lock is intentionally still held here
        return;
    }

    // path 3: do the parked thread's work by writing directly into its element
    if ( chair != 0p ) {
        memcpy( (void *)chair_elem, (void *)&elem, sizeof(T) );
        unpark( chair );
        chair = 0p;
        unlock( mutex_lock );
        unlock( p_lock );
        unlock( c_lock );   // released on behalf of the thread just woken
        return;
    }

    // path 4: plain buffered insert
    insert_( chan, elem );
    unlock( mutex_lock );
    unlock( p_lock );
}
94
// Receive one element from the channel and return it.
//
// Takes c_lock, then mutex_lock, and follows one of four paths:
//   1. zero-size channel with a parked thread: copy the element from the
//      address that thread published and wake it;
//   2. buffer empty: publish &retval in the chair and park; whoever wakes
//      this thread has already filled retval;
//   3. buffer non-empty and a thread is parked: take the front element, then
//      insert the parked thread's element into the buffer and wake it;
//   4. otherwise: take the front element.
//
// Lock hand-off: a thread that parks here keeps c_lock, and a thread that
// parks in insert() keeps p_lock. The thread that later unparks it releases
// that lock on its behalf, which is why paths 1 and 3 unlock p_lock even
// though this routine never acquired it.
static inline T remove( channel(T) & chan ) with(chan) {
    lock( c_lock );
    lock( mutex_lock __cfaabi_dbg_ctx2 );
    T retval;

    // path 1: have to check for the zero size channel case
    if ( size == 0 && chair != 0p ) {
        memcpy( (void *)&retval, (void *)chair_elem, sizeof(T) );
        unpark( chair );
        chair = 0p;
        unlock( mutex_lock );
        unlock( p_lock );   // released on behalf of the thread just woken
        unlock( c_lock );
        return retval;
    }

    // path 2: wait if buffer is empty, work will be completed by someone else
    if ( count == 0 ) {
        chair = active_thread();
        chair_elem = &retval;
        unlock( mutex_lock );
        park( );            // c_lock is intentionally still held here
        return retval;
    }

    // paths 3 and 4: Remove from buffer
    T * slot = &buffer[front];
    memcpy( (void *)&retval, (void *)slot, sizeof(T) );
    count -= 1;
    front = (front + 1) % size;

    // path 3: a slot just opened up, so finish the parked thread's insert for it
    if ( chair != 0p ) {
        insert_( chan, *chair_elem );  // do waiting producer work
        unpark( chair );
        chair = 0p;
        unlock( mutex_lock );
        unlock( p_lock );   // released on behalf of the thread just woken
        unlock( c_lock );
        return retval;
    }

    unlock( mutex_lock );
    unlock( c_lock );
    return retval;
}
139
140} // forall( T )
141#endif
142
143#ifndef __PREVENTION_CHANNEL
144forall( T ) {
// Buffered channel of T (variant selected when __PREVENTION_CHANNEL is not
// defined). Blocked threads wait on one of two condition variables, each
// carrying the address of the element the thread wants to send or receive.
struct channel {
    size_t size;                            // buffer capacity in elements, set by the constructor
    size_t front;                           // index of the next element remove() copies out
    size_t back;                            // index of the next slot insert_() copies into
    size_t count;                           // number of elements currently stored in buffer
    T * buffer;                             // element storage, allocated by the constructor with anew( size )
    fast_cond_var( no_reacq_lock ) prods;   // insert() waits here when the buffer is full
    fast_cond_var( no_reacq_lock ) cons;    // remove() waits here when the buffer is empty
    no_reacq_lock mutex_lock;               // held while the fields above are inspected or modified
};
152
// Construct a channel with room for _size buffered elements.
// A _size of 0 is accepted; insert()/remove() have dedicated zero-size paths.
static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
    size = _size;
    front = 0;
    back = 0;
    count = 0;
    buffer = anew( size );
    prods{};
    cons{};
    mutex_lock{};
}

// Default constructor: delegates to the sized constructor with a capacity of 0.
static inline void ?{}( channel(T) &c ) {
    ((channel(T) &)c){ 0 };
}

// Destructor: releases the element buffer.
// NOTE(review): buffer is obtained with anew() but released with delete(), not
// adelete() -- confirm this pairing is intended for the element types in use.
static inline void ^?{}( channel(T) &c ) with(c) {
    delete( buffer );
}
// Number of elements currently stored in the buffer.
// Reads the field without taking mutex_lock.
static inline size_t get_count( channel(T) & chan ) with(chan) {
    return count;
}

// Capacity of the buffer as given to the constructor.
static inline size_t get_size( channel(T) & chan ) with(chan) {
    return size;
}

// True when at least one thread is waiting on either condition variable.
static inline bool has_waiters( channel(T) & chan ) with(chan) {
    return !( empty( cons ) && empty( prods ) );
}

// True when at least one thread is waiting on `cons`.
static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) {
    return !empty( cons );
}

// True when at least one thread is waiting on `prods`.
static inline bool has_waiting_producers( channel(T) & chan ) with(chan) {
    return !empty( prods );
}
169
// Copy elem bitwise into the slot at `back`, bump the element count and
// advance `back`, wrapping to 0 when it reaches `size`.
// Performs no locking and no capacity check; both are left to the caller.
static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
    T * slot = &buffer[back];
    memcpy( (void *)slot, (void *)&elem, sizeof(T) );
    count += 1;
    back += 1;
    if ( back == size ) {
        back = 0;
    }
}
176
177
// Send elem into the channel.
//
// Under mutex_lock, follows one of these paths:
//   1. zero-size channel with a waiting consumer: copy elem to the address
//      stored with the front waiter of `cons` and notify it;
//   2. buffer full: wait on `prods`, passing &elem as the wait info; the
//      routine returns right after the wait without touching mutex_lock again
//      (NOTE(review): relies on wait() releasing mutex_lock and on
//      no_reacq_lock not reacquiring it on wakeup -- confirm against locks.hfa);
//   3. buffer empty with a waiting consumer: copy elem directly to that
//      consumer; otherwise append elem to the buffer.
// Path 3 always ends by notifying one waiter on `cons`.
static inline void insert( channel(T) & chan, T elem ) with(chan) {
    lock( mutex_lock );

    // path 1: have to check for the zero size channel case
    if ( size == 0 && !empty( cons ) ) {
        memcpy( (void *)front( cons ), (void *)&elem, sizeof(T) );
        notify_one( cons );
        unlock( mutex_lock );
        return;
    }

    // path 2: wait if buffer is full, work will be completed by someone else
    if ( count == size ) {
        wait( prods, mutex_lock, (uintptr_t)&elem );
        return;
    }

    // path 3
    if ( count == 0 && !empty( cons ) ) {
        // do waiting consumer work
        memcpy( (void *)front( cons ), (void *)&elem, sizeof(T) );
    } else {
        insert_( chan, elem );
    }

    notify_one( cons );
    unlock( mutex_lock );
}
203
// Receive one element from the channel and return it.
//
// Under mutex_lock, follows one of these paths:
//   1. zero-size channel with a waiting producer: copy the element from the
//      address stored with the front waiter of `prods` and notify it;
//   2. buffer empty: wait on `cons`, passing &retval as the wait info, then
//      return retval without touching mutex_lock again
//      (NOTE(review): relies on wait() releasing mutex_lock and on
//      no_reacq_lock not reacquiring it on wakeup -- confirm against locks.hfa);
//   3. otherwise: take the front element; if that freed the only missing slot
//      and a producer is waiting, insert the producer's element for it.
// Path 3 always ends by notifying one waiter on `prods`.
static inline T remove( channel(T) & chan ) with(chan) {
    lock( mutex_lock );
    T retval;

    // path 1: have to check for the zero size channel case
    if ( size == 0 && !empty( prods ) ) {
        memcpy( (void *)&retval, (void *)front( prods ), sizeof(T) );
        notify_one( prods );
        unlock( mutex_lock );
        return retval;
    }

    // path 2: wait if buffer is empty, work will be completed by someone else
    if ( count == 0 ) {
        wait( cons, mutex_lock, (uintptr_t)&retval );
        return retval;
    }

    // path 3: Remove from buffer
    T * slot = &buffer[front];
    memcpy( (void *)&retval, (void *)slot, sizeof(T) );
    count -= 1;
    front = (front + 1) % size;

    if ( count == size - 1 && !empty( prods ) ) {
        insert_( chan, *((T *)front( prods )) );  // do waiting producer work
    }

    notify_one( prods );
    unlock( mutex_lock );
    return retval;
}
234
235} // forall( T )
236#endif
Note: See TracBrowser for help on using the repository browser.