Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/collections/lockfree.hfa

    ra6b48f6 r6b33e89  
    1616        };
    1717
    18         static inline void ?{}(mcs_queue(T) & this) { this.tail = 0p; }
    19         static inline bool empty(const mcs_queue(T) & this) { return !this.tail; }
    20 
    21         static inline forall(| { T * volatile & ?`next ( T * ); })
    22         {
     18        static inline void ?{}( mcs_queue(T) & this ) { this.tail = 0p; }
     19        static inline bool empty( const mcs_queue(T) & this ) { return ! this.tail; }
     20
     21        static inline forall( | { T * volatile & next ( T * ); }) {
    2322                // Adds an element to the list
    2423                // Multi-Thread Safe, Lock-Free
    25                 T * push(mcs_queue(T) & this, T * elem) __attribute__((artificial));
    26                 T * push(mcs_queue(T) & this, T * elem) {
    27                         /* paranoid */ verify(!(elem`next));
     24                T * push( mcs_queue(T) & this, T * elem ) __attribute__((artificial));
     25                T * push( mcs_queue(T) & this, T * elem ) {
     26                        /* paranoid */ verify( ! next( elem ) );
    2827                        // Race to add to the tail
    29                         T * prev = __atomic_exchange_n(&this.tail, elem, __ATOMIC_SEQ_CST);
     28                        T * prev_val = __atomic_exchange_n(&this.tail, elem, __ATOMIC_SEQ_CST);
    3029                        // If we aren't the first, we need to tell the person before us
    3130                        // No need to
    32                         if (prev) prev`next = elem;
    33                         return prev;
     31                        if ( prev_val ) next( prev_val ) = elem;
     32                        return prev_val;
    3433                }
    3534
     
    3736                // Passing an element that is not the head is undefined behavior
    3837                // NOT Multi-Thread Safe, concurrent pushes are safe
    39                 T * advance(mcs_queue(T) & this, T * elem) __attribute__((artificial));
    40                 T * advance(mcs_queue(T) & this, T * elem) {
     38                T * advance( mcs_queue(T) & this, T * elem ) __attribute__((artificial));
     39                T * advance( mcs_queue(T) & this, T * elem ) {
    4140                        T * expected = elem;
    4241                        // Check if this is already the last item
     
    4443
    4544                        // If not wait for next item to show-up, filled by push
    46                         while (!(elem`next)) Pause();
     45                        while ( ! next( elem ) ) Pause();
    4746
    4847                        // we need to return if the next link was empty
    49                         T * ret = elem`next;
     48                        T * ret = next( elem );
    5049
    5150                        // invalidate link to reset to initial state
    52                         elem`next = 0p;
     51                        next( elem ) = 0p;
    5352                        return ret;
    5453                }
     
    6564        };
    6665
    67         static inline void ?{}(mpsc_queue(T) & this) {
     66        static inline void ?{}( mpsc_queue(T) & this ) {
    6867                ((mcs_queue(T)&)this){};
    6968                this.head = 0p;
    7069        }
    7170
    72         static inline forall(| { T * volatile & ?`next ( T * ); })
    73         {
     71        static inline forall( | { T * volatile & next ( T * ); }) {
    7472                // Added a new element to the queue
    7573                // Multi-Thread Safe, Lock-Free
    76                 T * push(mpsc_queue(T) & this, T * elem) __attribute__((artificial));
    77                 T * push(mpsc_queue(T) & this, T * elem) {
    78                         T * prev = push((mcs_queue(T)&)this, elem);
    79                         if (!prev) this.head = elem;
    80                         return prev;
     74                T * push( mpsc_queue(T) & this, T * elem ) __attribute__((artificial));
     75                T * push( mpsc_queue(T) & this, T * elem ) {
     76                        T * prev_val = push( (mcs_queue(T)&)this, elem );
     77                        if ( ! prev_val ) this.head = elem;
     78                        return prev_val;
    8179                }
    8280
    8381                // Pop an element from the queue
    8482                // return the element that was removed
    85                 // next is set to the new head of the queue
     83                // head is set to the new head of the queue
    8684                // NOT Multi-Thread Safe
    87                 T * pop(mpsc_queue(T) & this, T *& next) __attribute__((artificial));
    88                 T * pop(mpsc_queue(T) & this, T *& next) {
     85                T * pop( mpsc_queue(T) & this, T *& head ) __attribute__((artificial));
     86                T * pop( mpsc_queue(T) & this, T *& head ) {
    8987                        T * elem = this.head;
    9088                        // If head is empty just return
    91                         if (!elem) return 0p;
     89                        if ( ! elem ) return 0p;
    9290
    9391                        // If there is already someone in the list, then it's easy
    94                         if (elem`next) {
    95                                 this.head = next = elem`next;
     92                        if ( next( elem ) ) {
     93                                this.head = head = next( elem );
    9694                                // force memory sync
    9795                                __atomic_thread_fence(__ATOMIC_SEQ_CST);
    9896
    9997                                // invalidate link to reset to initial state
    100                                 elem`next = 0p;
     98                                next( elem ) = 0p;
    10199                        }
    102100                        // Otherwise, there might be a race where it only looks but someone is enqueuing
     
    106104                                // after that point, it could overwrite the write in push
    107105                                this.head = 0p;
    108                                 next = advance((mcs_queue(T)&)this, elem);
     106                                head = advance( (mcs_queue(T)&)this, elem );
    109107
    110108                                // Only write to the head if there is a next element
    111109                                // it is the only way we can guarantee we are not overwriting
    112110                                // a write made in push
    113                                 if (next) this.head = next;
    114                         }
    115 
     111                                if ( head ) this.head = head;
     112                        }
    116113                        // return removed element
    117114                        return elem;
     
    119116
    120117                // Same as previous function
    121                 T * pop(mpsc_queue(T) & this) {
     118                T * pop( mpsc_queue(T) & this ) {
    122119                        T * _ = 0p;
    123120                        return pop(this, _);
     
    144141        static inline bool is_poisoned( const poison_list(T) & this ) { return 1p == this.head; }
    145142
    146         static inline forall(| { T * volatile & ?`next ( T * ); })
     143        static inline forall( | { T * volatile & next( T * ); })
    147144        {
    148145                // Adds an element to the list
    149146                // Multi-Thread Safe, Lock-Free
    150                 bool push(poison_list(T) & this, T * elem) __attribute__((artificial));
    151                 bool push(poison_list(T) & this, T * elem) {
    152                         /* paranoid */ verify(0p == (elem`next));
    153                         __atomic_store_n( &elem`next, (T*)1p, __ATOMIC_RELAXED );
     147                bool push( poison_list(T) & this, T * elem ) __attribute__((artificial));
     148                bool push( poison_list(T) & this, T * elem ) {
     149                        /* paranoid */ verify( 0p == next( elem ) );
     150                        __atomic_store_n( &next( elem ), (T *)1p, __ATOMIC_RELAXED );
    154151
    155152                        // read the head up-front
     
    164161
    165162                                        // We should never succeed the CAS if it's poisonned and the elem should be 1p.
    166                                         /* paranoid */ verify( expected  != 1p );
    167                                         /* paranoid */ verify( elem`next == 1p );
     163                                        /* paranoid */ verify( expected != 1p );
     164                                        /* paranoid */ verify( next( elem ) == 1p );
    168165
    169166                                        // If we aren't the first, we need to tell the person before us
    170167                                        // No need to
    171                                         elem`next = expected;
     168                                        next( elem ) = expected;
    172169                                        return true;
    173170                                }
     
    178175                // Passing an element that is not the head is undefined behavior
    179176                // NOT Multi-Thread Safe, concurrent pushes are safe
    180                 T * advance(T * elem) __attribute__((artificial));
    181                 T * advance(T * elem) {
     177                T * advance( T * elem ) __attribute__((artificial));
     178                T * advance( T * elem ) {
    182179                        T * ret;
    183180
    184181                        // Wait for next item to show-up, filled by push
    185                         while (1p == (ret = __atomic_load_n(&elem`next, __ATOMIC_RELAXED))) Pause();
     182                        while (1p == (ret = __atomic_load_n( &next( elem ), __ATOMIC_RELAXED ) ) ) Pause();
    186183
    187184                        return ret;
     
    189186
    190187                // Poison the queue, preveting new pushes and returning the head
    191                 T * poison(poison_list(T) & this) __attribute__((artificial));
    192                 T * poison(poison_list(T) & this) {
     188                T * poison( poison_list(T) & this ) __attribute__((artificial));
     189                T * poison( poison_list(T) & this ) {
    193190                        T * ret = __atomic_exchange_n( &this.head, (T*)1p, __ATOMIC_SEQ_CST );
    194191                        /* paranoid */ verifyf( ret != (T*)1p, "Poison list %p poisoned more than once!", &this );
     
    215212}; // Link
    216213
    217 forall( T /*| sized(T)*/ | { Link(T) * ?`next( T * ); } ) {
     214forall( T /*| sized(T)*/ | { Link(T) * next( T * ); } ) {
    218215        struct StackLF {
    219216                Link(T) stack;
     
    226223
    227224                void push( StackLF(T) & this, T & n ) with(this) {
    228                         *( &n )`next = stack;                                           // atomic assignment unnecessary, or use CAA
     225                        *next( &n ) = stack;                                            // atomic assignment unnecessary, or use CAA
    229226                        for () {                                                                        // busy wait
    230                                 if ( __atomic_compare_exchange_n( &stack.atom, &( &n )`next->atom, (Link(T))@{ (LinkData(T))@{ &n, ( &n )`next->data.count + 1} }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) break; // attempt to update top node
     227                                if ( __atomic_compare_exchange_n( &stack.atom, &next( &n )->atom, (Link(T))@{ (LinkData(T))@{ &n, next( &n )->data.count + 1} }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) break; // attempt to update top node
    231228                        } // for
    232229                } // push
     
    236233                        for () {                                                                        // busy wait
    237234                                if ( t.data.top == 0p ) return 0p;              // empty stack ?
    238                                 Link(T) * next = ( t.data.top )`next;
     235                                Link(T) * next = next( t.data.top );
    239236                                if ( __atomic_compare_exchange_n( &stack.atom, &t.atom, (Link(T))@{ (LinkData(T))@{ next->data.top, t.data.count } }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) return t.data.top; // attempt to update top node
    240237                        } // for
     
    246243                                // TODO: Avoiding some problems with double fields access.
    247244                                LinkData(T) * data = &link->data;
    248                                 T * next = (T *)&(*data).top;
    249                                 if ( next == node ) {
    250                                         data->top = ( node )`next->data.top;
     245                                T * ntop = (T *)&(*data).top;
     246                                if ( ntop == node ) {
     247                                        data->top = next( node )->data.top;
    251248                                        return true;
    252249                                }
    253                                 if ( next == 0p ) return false;
    254                                 link = ( next )`next;
     250                                if ( ntop == 0p ) return false;
     251                                link = next( ntop );
    255252                        }
    256253                }
Note: See TracChangeset for help on using the changeset viewer.