Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/collections/lockfree.hfa

    r6b33e89 ra6b48f6  
    1616        };
    1717
    18         static inline void ?{}( mcs_queue(T) & this ) { this.tail = 0p; }
    19         static inline bool empty( const mcs_queue(T) & this ) { return ! this.tail; }
    20 
    21         static inline forall( | { T * volatile & next ( T * ); }) {
     18        static inline void ?{}(mcs_queue(T) & this) { this.tail = 0p; }
     19        static inline bool empty(const mcs_queue(T) & this) { return !this.tail; }
     20
     21        static inline forall(| { T * volatile & ?`next ( T * ); })
     22        {
    2223                // Adds an element to the list
    2324                // Multi-Thread Safe, Lock-Free
    24                 T * push( mcs_queue(T) & this, T * elem ) __attribute__((artificial));
    25                 T * push( mcs_queue(T) & this, T * elem ) {
    26                         /* paranoid */ verify( ! next( elem ) );
     25                T * push(mcs_queue(T) & this, T * elem) __attribute__((artificial));
     26                T * push(mcs_queue(T) & this, T * elem) {
     27                        /* paranoid */ verify(!(elem`next));
    2728                        // Race to add to the tail
    28                         T * prev_val = __atomic_exchange_n(&this.tail, elem, __ATOMIC_SEQ_CST);
     29                        T * prev = __atomic_exchange_n(&this.tail, elem, __ATOMIC_SEQ_CST);
    2930                        // If we aren't the first, we need to tell the person before us
    3031                        // No need to
    31                         if ( prev_val ) next( prev_val ) = elem;
    32                         return prev_val;
     32                        if (prev) prev`next = elem;
     33                        return prev;
    3334                }
    3435
     
    3637                // Passing an element that is not the head is undefined behavior
    3738                // NOT Multi-Thread Safe, concurrent pushes are safe
    38                 T * advance( mcs_queue(T) & this, T * elem ) __attribute__((artificial));
    39                 T * advance( mcs_queue(T) & this, T * elem ) {
     39                T * advance(mcs_queue(T) & this, T * elem) __attribute__((artificial));
     40                T * advance(mcs_queue(T) & this, T * elem) {
    4041                        T * expected = elem;
    4142                        // Check if this is already the last item
     
    4344
    4445                        // If not wait for next item to show-up, filled by push
    45                         while ( ! next( elem ) ) Pause();
     46                        while (!(elem`next)) Pause();
    4647
    4748                        // we need to return if the next link was empty
    48                         T * ret = next( elem );
     49                        T * ret = elem`next;
    4950
    5051                        // invalidate link to reset to initial state
    51                         next( elem ) = 0p;
     52                        elem`next = 0p;
    5253                        return ret;
    5354                }
     
    6465        };
    6566
    66         static inline void ?{}( mpsc_queue(T) & this ) {
     67        static inline void ?{}(mpsc_queue(T) & this) {
    6768                ((mcs_queue(T)&)this){};
    6869                this.head = 0p;
    6970        }
    7071
    71         static inline forall( | { T * volatile & next ( T * ); }) {
     72        static inline forall(| { T * volatile & ?`next ( T * ); })
     73        {
    7274                // Added a new element to the queue
    7375                // Multi-Thread Safe, Lock-Free
    74                 T * push( mpsc_queue(T) & this, T * elem ) __attribute__((artificial));
    75                 T * push( mpsc_queue(T) & this, T * elem ) {
    76                         T * prev_val = push( (mcs_queue(T)&)this, elem );
    77                         if ( ! prev_val ) this.head = elem;
    78                         return prev_val;
     76                T * push(mpsc_queue(T) & this, T * elem) __attribute__((artificial));
     77                T * push(mpsc_queue(T) & this, T * elem) {
     78                        T * prev = push((mcs_queue(T)&)this, elem);
     79                        if (!prev) this.head = elem;
     80                        return prev;
    7981                }
    8082
    8183                // Pop an element from the queue
    8284                // return the element that was removed
    83                 // head is set to the new head of the queue
     85                // next is set to the new head of the queue
    8486                // NOT Multi-Thread Safe
    85                 T * pop( mpsc_queue(T) & this, T *& head ) __attribute__((artificial));
    86                 T * pop( mpsc_queue(T) & this, T *& head ) {
     87                T * pop(mpsc_queue(T) & this, T *& next) __attribute__((artificial));
     88                T * pop(mpsc_queue(T) & this, T *& next) {
    8789                        T * elem = this.head;
    8890                        // If head is empty just return
    89                         if ( ! elem ) return 0p;
     91                        if (!elem) return 0p;
    9092
    9193                        // If there is already someone in the list, then it's easy
    92                         if ( next( elem ) ) {
    93                                 this.head = head = next( elem );
     94                        if (elem`next) {
     95                                this.head = next = elem`next;
    9496                                // force memory sync
    9597                                __atomic_thread_fence(__ATOMIC_SEQ_CST);
    9698
    9799                                // invalidate link to reset to initial state
    98                                 next( elem ) = 0p;
     100                                elem`next = 0p;
    99101                        }
    100102                        // Otherwise, there might be a race where it only looks but someone is enqueuing
     
    104106                                // after that point, it could overwrite the write in push
    105107                                this.head = 0p;
    106                                 head = advance( (mcs_queue(T)&)this, elem );
     108                                next = advance((mcs_queue(T)&)this, elem);
    107109
    108110                                // Only write to the head if there is a next element
    109111                                // it is the only way we can guarantee we are not overwriting
    110112                                // a write made in push
    111                                 if ( head ) this.head = head;
    112                         }
     113                                if (next) this.head = next;
     114                        }
     115
    113116                        // return removed element
    114117                        return elem;
     
    116119
    117120                // Same as previous function
    118                 T * pop( mpsc_queue(T) & this ) {
     121                T * pop(mpsc_queue(T) & this) {
    119122                        T * _ = 0p;
    120123                        return pop(this, _);
     
    141144        static inline bool is_poisoned( const poison_list(T) & this ) { return 1p == this.head; }
    142145
    143         static inline forall( | { T * volatile & next( T * ); })
     146        static inline forall(| { T * volatile & ?`next ( T * ); })
    144147        {
    145148                // Adds an element to the list
    146149                // Multi-Thread Safe, Lock-Free
    147                 bool push( poison_list(T) & this, T * elem ) __attribute__((artificial));
    148                 bool push( poison_list(T) & this, T * elem ) {
    149                         /* paranoid */ verify( 0p == next( elem ) );
    150                         __atomic_store_n( &next( elem ), (T *)1p, __ATOMIC_RELAXED );
     150                bool push(poison_list(T) & this, T * elem) __attribute__((artificial));
     151                bool push(poison_list(T) & this, T * elem) {
     152                        /* paranoid */ verify(0p == (elem`next));
     153                        __atomic_store_n( &elem`next, (T*)1p, __ATOMIC_RELAXED );
    151154
    152155                        // read the head up-front
     
    161164
    162165                                        // We should never succeed the CAS if it's poisonned and the elem should be 1p.
    163                                         /* paranoid */ verify( expected != 1p );
    164                                         /* paranoid */ verify( next( elem ) == 1p );
     166                                        /* paranoid */ verify( expected  != 1p );
     167                                        /* paranoid */ verify( elem`next == 1p );
    165168
    166169                                        // If we aren't the first, we need to tell the person before us
    167170                                        // No need to
    168                                         next( elem ) = expected;
     171                                        elem`next = expected;
    169172                                        return true;
    170173                                }
     
    175178                // Passing an element that is not the head is undefined behavior
    176179                // NOT Multi-Thread Safe, concurrent pushes are safe
    177                 T * advance( T * elem ) __attribute__((artificial));
    178                 T * advance( T * elem ) {
     180                T * advance(T * elem) __attribute__((artificial));
     181                T * advance(T * elem) {
    179182                        T * ret;
    180183
    181184                        // Wait for next item to show-up, filled by push
    182                         while (1p == (ret = __atomic_load_n( &next( elem ), __ATOMIC_RELAXED ) ) ) Pause();
     185                        while (1p == (ret = __atomic_load_n(&elem`next, __ATOMIC_RELAXED))) Pause();
    183186
    184187                        return ret;
     
    186189
    187190                // Poison the queue, preveting new pushes and returning the head
    188                 T * poison( poison_list(T) & this ) __attribute__((artificial));
    189                 T * poison( poison_list(T) & this ) {
     191                T * poison(poison_list(T) & this) __attribute__((artificial));
     192                T * poison(poison_list(T) & this) {
    190193                        T * ret = __atomic_exchange_n( &this.head, (T*)1p, __ATOMIC_SEQ_CST );
    191194                        /* paranoid */ verifyf( ret != (T*)1p, "Poison list %p poisoned more than once!", &this );
     
    212215}; // Link
    213216
    214 forall( T /*| sized(T)*/ | { Link(T) * next( T * ); } ) {
     217forall( T /*| sized(T)*/ | { Link(T) * ?`next( T * ); } ) {
    215218        struct StackLF {
    216219                Link(T) stack;
     
    223226
    224227                void push( StackLF(T) & this, T & n ) with(this) {
    225                         *next( &n ) = stack;                                            // atomic assignment unnecessary, or use CAA
     228                        *( &n )`next = stack;                                           // atomic assignment unnecessary, or use CAA
    226229                        for () {                                                                        // busy wait
    227                                 if ( __atomic_compare_exchange_n( &stack.atom, &next( &n )->atom, (Link(T))@{ (LinkData(T))@{ &n, next( &n )->data.count + 1} }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) break; // attempt to update top node
     230                                if ( __atomic_compare_exchange_n( &stack.atom, &( &n )`next->atom, (Link(T))@{ (LinkData(T))@{ &n, ( &n )`next->data.count + 1} }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) break; // attempt to update top node
    228231                        } // for
    229232                } // push
     
    233236                        for () {                                                                        // busy wait
    234237                                if ( t.data.top == 0p ) return 0p;              // empty stack ?
    235                                 Link(T) * next = next( t.data.top );
     238                                Link(T) * next = ( t.data.top )`next;
    236239                                if ( __atomic_compare_exchange_n( &stack.atom, &t.atom, (Link(T))@{ (LinkData(T))@{ next->data.top, t.data.count } }.atom, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) return t.data.top; // attempt to update top node
    237240                        } // for
     
    243246                                // TODO: Avoiding some problems with double fields access.
    244247                                LinkData(T) * data = &link->data;
    245                                 T * ntop = (T *)&(*data).top;
    246                                 if ( ntop == node ) {
    247                                         data->top = next( node )->data.top;
     248                                T * next = (T *)&(*data).top;
     249                                if ( next == node ) {
     250                                        data->top = ( node )`next->data.top;
    248251                                        return true;
    249252                                }
    250                                 if ( ntop == 0p ) return false;
    251                                 link = next( ntop );
     253                                if ( next == 0p ) return false;
     254                                link = ( next )`next;
    252255                        }
    253256                }
Note: See TracChangeset for help on using the changeset viewer.