- File:
-
- 1 edited
-
libcfa/src/collections/lockfree.hfa (modified) (14 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/collections/lockfree.hfa
r6b33e89 ra6b48f6 16 16 }; 17 17 18 static inline void ?{}( mcs_queue(T) & this ) { this.tail = 0p; } 19 static inline bool empty( const mcs_queue(T) & this ) { return ! this.tail; } 20 21 static inline forall( | { T * volatile & next ( T * ); }) { 18 static inline void ?{}(mcs_queue(T) & this) { this.tail = 0p; } 19 static inline bool empty(const mcs_queue(T) & this) { return !this.tail; } 20 21 static inline forall(| { T * volatile & ?`next ( T * ); }) 22 { 22 23 // Adds an element to the list 23 24 // Multi-Thread Safe, Lock-Free 24 T * push( mcs_queue(T) & this, T * elem) __attribute__((artificial));25 T * push( mcs_queue(T) & this, T * elem) {26 /* paranoid */ verify( ! next( elem ));25 T * push(mcs_queue(T) & this, T * elem) __attribute__((artificial)); 26 T * push(mcs_queue(T) & this, T * elem) { 27 /* paranoid */ verify(!(elem`next)); 27 28 // Race to add to the tail 28 T * prev _val= __atomic_exchange_n(&this.tail, elem, __ATOMIC_SEQ_CST);29 T * prev = __atomic_exchange_n(&this.tail, elem, __ATOMIC_SEQ_CST); 29 30 // If we aren't the first, we need to tell the person before us 30 31 // No need to 31 if ( prev_val ) next( prev_val )= elem;32 return prev _val;32 if (prev) prev`next = elem; 33 return prev; 33 34 } 34 35 … … 36 37 // Passing an element that is not the head is undefined behavior 37 38 // NOT Multi-Thread Safe, concurrent pushes are safe 38 T * advance( mcs_queue(T) & this, T * elem) __attribute__((artificial));39 T * advance( mcs_queue(T) & this, T * elem) {39 T * advance(mcs_queue(T) & this, T * elem) __attribute__((artificial)); 40 T * advance(mcs_queue(T) & this, T * elem) { 40 41 T * expected = elem; 41 42 // Check if this is already the last item … … 43 44 44 45 // If not wait for next item to show-up, filled by push 45 while ( ! next( elem )) Pause();46 while (!(elem`next)) Pause(); 46 47 47 48 // we need to return if the next link was empty 48 T * ret = next( elem );49 T * ret = elem`next; 49 50 50 51 // invalidate link to reset to initial state 51 next( elem )= 0p;52 elem`next = 0p; 52 53 return ret; 53 54 } … … 64 65 }; 65 66 66 static inline void ?{}( mpsc_queue(T) & this) {67 static inline void ?{}(mpsc_queue(T) & this) { 67 68 ((mcs_queue(T)&)this){}; 68 69 this.head = 0p; 69 70 } 70 71 71 static inline forall( | { T * volatile & next ( T * ); }) { 72 static inline forall(| { T * volatile & ?`next ( T * ); }) 73 { 72 74 // Added a new element to the queue 73 75 // Multi-Thread Safe, Lock-Free 74 T * push( mpsc_queue(T) & this, T * elem) __attribute__((artificial));75 T * push( mpsc_queue(T) & this, T * elem) {76 T * prev _val = push( (mcs_queue(T)&)this, elem);77 if ( ! prev_val) this.head = elem;78 return prev _val;76 T * push(mpsc_queue(T) & this, T * elem) __attribute__((artificial)); 77 T * push(mpsc_queue(T) & this, T * elem) { 78 T * prev = push((mcs_queue(T)&)this, elem); 79 if (!prev) this.head = elem; 80 return prev; 79 81 } 80 82 81 83 // Pop an element from the queue 82 84 // return the element that was removed 83 // headis set to the new head of the queue85 // next is set to the new head of the queue 84 86 // NOT Multi-Thread Safe 85 T * pop( mpsc_queue(T) & this, T *& head) __attribute__((artificial));86 T * pop( mpsc_queue(T) & this, T *& head) {87 T * pop(mpsc_queue(T) & this, T *& next) __attribute__((artificial)); 88 T * pop(mpsc_queue(T) & this, T *& next) { 87 89 T * elem = this.head; 88 90 // If head is empty just return 89 if ( ! elem) return 0p;91 if (!elem) return 0p; 90 92 91 93 // If there is already someone in the list, then it's easy 92 if ( next( elem )) {93 this.head = head = next( elem );94 if (elem`next) { 95 this.head = next = elem`next; 94 96 // force memory sync 95 97 __atomic_thread_fence(__ATOMIC_SEQ_CST); 96 98 97 99 // invalidate link to reset to initial state 98 next( elem )= 0p;100 elem`next = 0p; 99 101 } 100 102 // Otherwise, there might be a race where it only looks but someone is enqueuing … … 104 106 // after that point, it could overwrite the write in push 105 107 this.head = 0p; 106 head = advance( (mcs_queue(T)&)this, elem);108 next = advance((mcs_queue(T)&)this, elem); 107 109 108 110 // Only write to the head if there is a next element 109 111 // it is the only way we can guarantee we are not overwriting 110 112 // a write made in push 111 if ( head ) this.head = head; 112 } 113 if (next) this.head = next; 114 } 115 113 116 // return removed element 114 117 return elem; … … 116 119 117 120 // Same as previous function 118 T * pop( mpsc_queue(T) & this) {121 T * pop(mpsc_queue(T) & this) { 119 122 T * _ = 0p; 120 123 return pop(this, _); … … 141 144 static inline bool is_poisoned( const poison_list(T) & this ) { return 1p == this.head; } 142 145 143 static inline forall( | { T * volatile & next( T * ); })146 static inline forall(| { T * volatile & ?`next ( T * ); }) 144 147 { 145 148 // Adds an element to the list 146 149 // Multi-Thread Safe, Lock-Free 147 bool push( poison_list(T) & this, T * elem) __attribute__((artificial));148 bool push( poison_list(T) & this, T * elem) {149 /* paranoid */ verify( 0p == next( elem ));150 __atomic_store_n( & next( elem ), (T*)1p, __ATOMIC_RELAXED );150 bool push(poison_list(T) & this, T * elem) __attribute__((artificial)); 151 bool push(poison_list(T) & this, T * elem) { 152 /* paranoid */ verify(0p == (elem`next)); 153 __atomic_store_n( &elem`next, (T*)1p, __ATOMIC_RELAXED ); 151 154 152 155 // read the head up-front … … 161 164 162 165 // We should never succeed the CAS if it's poisonned and the elem should be 1p. 163 /* paranoid */ verify( expected != 1p );164 /* paranoid */ verify( next( elem )== 1p );166 /* paranoid */ verify( expected != 1p ); 167 /* paranoid */ verify( elem`next == 1p ); 165 168 166 169 // If we aren't the first, we need to tell the person before us 167 170 // No need to 168 next( elem )= expected;171 elem`next = expected; 169 172 return true; 170 173 } … … 175 178 // Passing an element that is not the head is undefined behavior 176 179 // NOT Multi-Thread Safe, concurrent pushes are safe 177 T * advance( T * elem) __attribute__((artificial));178 T * advance( T * elem) {180 T * advance(T * elem) __attribute__((artificial)); 181 T * advance(T * elem) { 179 182 T * ret; 180 183 181 184 // Wait for next item to show-up, filled by push 182 while (1p == (ret = __atomic_load_n( &next( elem ), __ATOMIC_RELAXED ) )) Pause();185 while (1p == (ret = __atomic_load_n(&elem`next, __ATOMIC_RELAXED))) Pause(); 183 186 184 187 return ret; … … 186 189 187 190 // Poison the queue, preveting new pushes and returning the head 188 T * poison( poison_list(T) & this) __attribute__((artificial));189 T * poison( poison_list(T) & this) {191 T * poison(poison_list(T) & this) __attribute__((artificial)); 192 T * poison(poison_list(T) & this) { 190 193 T * ret = __atomic_exchange_n( &this.head, (T*)1p, __ATOMIC_SEQ_CST ); 191 194 /* paranoid */ verifyf( ret != (T*)1p, "Poison list %p poisoned more than once!", &this ); … … 212 215 }; // Link 213 216 214 forall( T /*| sized(T)*/ | { Link(T) * next( T * ); } ) {217 forall( T /*| sized(T)*/ | { Link(T) * ?`next( T * ); } ) { 215 218 struct StackLF { 216 219 Link(T) stack; … … 223 226 224 227 void push( StackLF(T) & this, T & n ) with(this) { 225 * next( &n )= stack; // atomic assignment unnecessary, or use CAA228 *( &n )`next = stack; // atomic assignment unnecessary, or use CAA 226 229 for () { // busy wait 227 if ( __atomic_compare_exchange_n( &stack.atom, & next( &n )->atom, (Link(T))@{ (LinkData(T))@{ &n, next( &n )->data.count + 1} }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) break; // attempt to update top node230 if ( __atomic_compare_exchange_n( &stack.atom, &( &n )`next->atom, (Link(T))@{ (LinkData(T))@{ &n, ( &n )`next->data.count + 1} }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) break; // attempt to update top node 228 231 } // for 229 232 } // push … … 233 236 for () { // busy wait 234 237 if ( t.data.top == 0p ) return 0p; // empty stack ? 235 Link(T) * next = next( t.data.top );238 Link(T) * next = ( t.data.top )`next; 236 239 if ( __atomic_compare_exchange_n( &stack.atom, &t.atom, (Link(T))@{ (LinkData(T))@{ next->data.top, t.data.count } }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) return t.data.top; // attempt to update top node 237 240 } // for … … 243 246 // TODO: Avoiding some problems with double fields access. 244 247 LinkData(T) * data = &link->data; 245 T * n top= (T *)&(*data).top;246 if ( n top== node ) {247 data->top = next( node )->data.top;248 T * next = (T *)&(*data).top; 249 if ( next == node ) { 250 data->top = ( node )`next->data.top; 248 251 return true; 249 252 } 250 if ( n top== 0p ) return false;251 link = next( ntop );253 if ( next == 0p ) return false; 254 link = ( next )`next; 252 255 } 253 256 }
Note:
See TracChangeset
for help on using the changeset viewer.