- File:
-
- 1 edited
-
libcfa/src/collections/lockfree.hfa (modified) (14 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/collections/lockfree.hfa
ra6b48f6 r6b33e89 16 16 }; 17 17 18 static inline void ?{}(mcs_queue(T) & this) { this.tail = 0p; } 19 static inline bool empty(const mcs_queue(T) & this) { return !this.tail; } 20 21 static inline forall(| { T * volatile & ?`next ( T * ); }) 22 { 18 static inline void ?{}( mcs_queue(T) & this ) { this.tail = 0p; } 19 static inline bool empty( const mcs_queue(T) & this ) { return ! this.tail; } 20 21 static inline forall( | { T * volatile & next ( T * ); }) { 23 22 // Adds an element to the list 24 23 // Multi-Thread Safe, Lock-Free 25 T * push( mcs_queue(T) & this, T * elem) __attribute__((artificial));26 T * push( mcs_queue(T) & this, T * elem) {27 /* paranoid */ verify( !(elem`next));24 T * push( mcs_queue(T) & this, T * elem ) __attribute__((artificial)); 25 T * push( mcs_queue(T) & this, T * elem ) { 26 /* paranoid */ verify( ! next( elem ) ); 28 27 // Race to add to the tail 29 T * prev = __atomic_exchange_n(&this.tail, elem, __ATOMIC_SEQ_CST);28 T * prev_val = __atomic_exchange_n(&this.tail, elem, __ATOMIC_SEQ_CST); 30 29 // If we aren't the first, we need to tell the person before us 31 30 // No need to 32 if ( prev) prev`next= elem;33 return prev ;31 if ( prev_val ) next( prev_val ) = elem; 32 return prev_val; 34 33 } 35 34 … … 37 36 // Passing an element that is not the head is undefined behavior 38 37 // NOT Multi-Thread Safe, concurrent pushes are safe 39 T * advance( mcs_queue(T) & this, T * elem) __attribute__((artificial));40 T * advance( mcs_queue(T) & this, T * elem) {38 T * advance( mcs_queue(T) & this, T * elem ) __attribute__((artificial)); 39 T * advance( mcs_queue(T) & this, T * elem ) { 41 40 T * expected = elem; 42 41 // Check if this is already the last item … … 44 43 45 44 // If not wait for next item to show-up, filled by push 46 while ( !(elem`next)) Pause();45 while ( ! next( elem ) ) Pause(); 47 46 48 47 // we need to return if the next link was empty 49 T * ret = elem`next;48 T * ret = next( elem ); 50 49 51 50 // invalidate link to reset to initial state 52 elem`next= 0p;51 next( elem ) = 0p; 53 52 return ret; 54 53 } … … 65 64 }; 66 65 67 static inline void ?{}( mpsc_queue(T) & this) {66 static inline void ?{}( mpsc_queue(T) & this ) { 68 67 ((mcs_queue(T)&)this){}; 69 68 this.head = 0p; 70 69 } 71 70 72 static inline forall(| { T * volatile & ?`next ( T * ); }) 73 { 71 static inline forall( | { T * volatile & next ( T * ); }) { 74 72 // Added a new element to the queue 75 73 // Multi-Thread Safe, Lock-Free 76 T * push( mpsc_queue(T) & this, T * elem) __attribute__((artificial));77 T * push( mpsc_queue(T) & this, T * elem) {78 T * prev = push((mcs_queue(T)&)this, elem);79 if ( !prev) this.head = elem;80 return prev ;74 T * push( mpsc_queue(T) & this, T * elem ) __attribute__((artificial)); 75 T * push( mpsc_queue(T) & this, T * elem ) { 76 T * prev_val = push( (mcs_queue(T)&)this, elem ); 77 if ( ! prev_val ) this.head = elem; 78 return prev_val; 81 79 } 82 80 83 81 // Pop an element from the queue 84 82 // return the element that was removed 85 // nextis set to the new head of the queue83 // head is set to the new head of the queue 86 84 // NOT Multi-Thread Safe 87 T * pop( mpsc_queue(T) & this, T *& next) __attribute__((artificial));88 T * pop( mpsc_queue(T) & this, T *& next) {85 T * pop( mpsc_queue(T) & this, T *& head ) __attribute__((artificial)); 86 T * pop( mpsc_queue(T) & this, T *& head ) { 89 87 T * elem = this.head; 90 88 // If head is empty just return 91 if ( !elem) return 0p;89 if ( ! elem ) return 0p; 92 90 93 91 // If there is already someone in the list, then it's easy 94 if ( elem`next) {95 this.head = next = elem`next;92 if ( next( elem ) ) { 93 this.head = head = next( elem ); 96 94 // force memory sync 97 95 __atomic_thread_fence(__ATOMIC_SEQ_CST); 98 96 99 97 // invalidate link to reset to initial state 100 elem`next= 0p;98 next( elem ) = 0p; 101 99 } 102 100 // Otherwise, there might be a race where it only looks but someone is enqueuing … … 106 104 // after that point, it could overwrite the write in push 107 105 this.head = 0p; 108 next = advance((mcs_queue(T)&)this, elem);106 head = advance( (mcs_queue(T)&)this, elem ); 109 107 110 108 // Only write to the head if there is a next element 111 109 // it is the only way we can guarantee we are not overwriting 112 110 // a write made in push 113 if (next) this.head = next; 114 } 115 111 if ( head ) this.head = head; 112 } 116 113 // return removed element 117 114 return elem; … … 119 116 120 117 // Same as previous function 121 T * pop( mpsc_queue(T) & this) {118 T * pop( mpsc_queue(T) & this ) { 122 119 T * _ = 0p; 123 120 return pop(this, _); … … 144 141 static inline bool is_poisoned( const poison_list(T) & this ) { return 1p == this.head; } 145 142 146 static inline forall( | { T * volatile & ?`next( T * ); })143 static inline forall( | { T * volatile & next( T * ); }) 147 144 { 148 145 // Adds an element to the list 149 146 // Multi-Thread Safe, Lock-Free 150 bool push( poison_list(T) & this, T * elem) __attribute__((artificial));151 bool push( poison_list(T) & this, T * elem) {152 /* paranoid */ verify( 0p == (elem`next));153 __atomic_store_n( & elem`next, (T*)1p, __ATOMIC_RELAXED );147 bool push( poison_list(T) & this, T * elem ) __attribute__((artificial)); 148 bool push( poison_list(T) & this, T * elem ) { 149 /* paranoid */ verify( 0p == next( elem ) ); 150 __atomic_store_n( &next( elem ), (T *)1p, __ATOMIC_RELAXED ); 154 151 155 152 // read the head up-front … … 164 161 165 162 // We should never succeed the CAS if it's poisonned and the elem should be 1p. 166 /* paranoid */ verify( expected != 1p );167 /* paranoid */ verify( elem`next== 1p );163 /* paranoid */ verify( expected != 1p ); 164 /* paranoid */ verify( next( elem ) == 1p ); 168 165 169 166 // If we aren't the first, we need to tell the person before us 170 167 // No need to 171 elem`next= expected;168 next( elem ) = expected; 172 169 return true; 173 170 } … … 178 175 // Passing an element that is not the head is undefined behavior 179 176 // NOT Multi-Thread Safe, concurrent pushes are safe 180 T * advance( T * elem) __attribute__((artificial));181 T * advance( T * elem) {177 T * advance( T * elem ) __attribute__((artificial)); 178 T * advance( T * elem ) { 182 179 T * ret; 183 180 184 181 // Wait for next item to show-up, filled by push 185 while (1p == (ret = __atomic_load_n( &elem`next, __ATOMIC_RELAXED))) Pause();182 while (1p == (ret = __atomic_load_n( &next( elem ), __ATOMIC_RELAXED ) ) ) Pause(); 186 183 187 184 return ret; … … 189 186 190 187 // Poison the queue, preveting new pushes and returning the head 191 T * poison( poison_list(T) & this) __attribute__((artificial));192 T * poison( poison_list(T) & this) {188 T * poison( poison_list(T) & this ) __attribute__((artificial)); 189 T * poison( poison_list(T) & this ) { 193 190 T * ret = __atomic_exchange_n( &this.head, (T*)1p, __ATOMIC_SEQ_CST ); 194 191 /* paranoid */ verifyf( ret != (T*)1p, "Poison list %p poisoned more than once!", &this ); … … 215 212 }; // Link 216 213 217 forall( T /*| sized(T)*/ | { Link(T) * ?`next( T * ); } ) {214 forall( T /*| sized(T)*/ | { Link(T) * next( T * ); } ) { 218 215 struct StackLF { 219 216 Link(T) stack; … … 226 223 227 224 void push( StackLF(T) & this, T & n ) with(this) { 228 * ( &n )`next= stack; // atomic assignment unnecessary, or use CAA225 *next( &n ) = stack; // atomic assignment unnecessary, or use CAA 229 226 for () { // busy wait 230 if ( __atomic_compare_exchange_n( &stack.atom, & ( &n )`next->atom, (Link(T))@{ (LinkData(T))@{ &n, ( &n )`next->data.count + 1} }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) break; // attempt to update top node227 if ( __atomic_compare_exchange_n( &stack.atom, &next( &n )->atom, (Link(T))@{ (LinkData(T))@{ &n, next( &n )->data.count + 1} }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) break; // attempt to update top node 231 228 } // for 232 229 } // push … … 236 233 for () { // busy wait 237 234 if ( t.data.top == 0p ) return 0p; // empty stack ? 238 Link(T) * next = ( t.data.top )`next;235 Link(T) * next = next( t.data.top ); 239 236 if ( __atomic_compare_exchange_n( &stack.atom, &t.atom, (Link(T))@{ (LinkData(T))@{ next->data.top, t.data.count } }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) return t.data.top; // attempt to update top node 240 237 } // for … … 246 243 // TODO: Avoiding some problems with double fields access. 247 244 LinkData(T) * data = &link->data; 248 T * n ext= (T *)&(*data).top;249 if ( n ext== node ) {250 data->top = ( node )`next->data.top;245 T * ntop = (T *)&(*data).top; 246 if ( ntop == node ) { 247 data->top = next( node )->data.top; 251 248 return true; 252 249 } 253 if ( n ext== 0p ) return false;254 link = ( next )`next;250 if ( ntop == 0p ) return false; 251 link = next( ntop ); 255 252 } 256 253 }
Note:
See TracChangeset
for help on using the changeset viewer.