Changes in / [f8a7fed:08e75215]


Ignore:
Files:
6 deleted
4 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/invoke.h

    rf8a7fed r08e75215  
    225225
    226226                static inline $thread * volatile & ?`next ( $thread * this )  __attribute__((const)) {
    227                         return this->seqable.next;
     227                        return this->seqable.back;
    228228                }
    229229
  • libcfa/src/concurrency/locks.hfa

    rf8a7fed r08e75215  
    2020
    2121#include "bits/weakso_locks.hfa"
    22 #include "containers/queueLockFree.hfa"
    23 
    24 #include "thread.hfa"
    2522
    2623#include "time_t.hfa"
    2724#include "time.hfa"
    28 
    29 //-----------------------------------------------------------------------------
    30 // Semaphores
    31 
    32 // '0-nary' semaphore
    33 // Similar to a counting semaphore except the value of one is never reached
    34 // as a consequence, a V() that would bring the value to 1 *spins* until
    35 // a P consumes it
    36 struct Semaphore0nary {
    37         __spinlock_t lock; // needed to protect
    38         mpsc_queue($thread) queue;
    39 };
    40 
    41 static inline bool P(Semaphore0nary & this, $thread * thrd) __attribute__((artificial));
    42 static inline bool P(Semaphore0nary & this, $thread * thrd) {
    43         /* paranoid */ verify(!(thrd->seqable.next));
    44         /* paranoid */ verify(!(thrd`next));
    45 
    46         push(this.queue, thrd);
    47         return true;
    48 }
    49 
    50 static inline bool P(Semaphore0nary & this) __attribute__((artificial));
    51 static inline bool P(Semaphore0nary & this) {
    52     $thread * thrd = active_thread();
    53     P(this, thrd);
    54     park();
    55     return true;
    56 }
    57 
    58 static inline $thread * V(Semaphore0nary & this, const bool doUnpark = true) __attribute__((artificial));
    59 static inline $thread * V(Semaphore0nary & this, const bool doUnpark = true) {
    60         $thread * next;
    61         lock(this.lock __cfaabi_dbg_ctx2);
    62                 for (;;) {
    63                         next = pop(this.queue);
    64                         if (next) break;
    65                         Pause();
    66                 }
    67         unlock(this.lock);
    68 
    69         if (doUnpark) unpark(next);
    70         return next;
    71 }
    72 
    73 // Wrapper used on top of any sempahore to avoid potential locking
    74 struct BinaryBenaphore {
    75         volatile ssize_t counter;
    76 };
    77 
    78 static inline {
    79         void ?{}(BinaryBenaphore & this) { this.counter = 0; }
    80         void ?{}(BinaryBenaphore & this, zero_t) { this.counter = 0; }
    81         void ?{}(BinaryBenaphore & this, one_t ) { this.counter = 1; }
    82 
    83         // returns true if no blocking needed
    84         bool P(BinaryBenaphore & this) { return __atomic_fetch_sub(&this.counter, 1, __ATOMIC_SEQ_CST) > 0; }
    85         bool tryP(BinaryBenaphore & this) {
    86                 ssize_t c = this.counter;
    87                 return (c >= 1) && __atomic_compare_exchange_n(&this.counter, &c, c-1, false, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED);
    88         }
    89 
    90         // returns true if notify needed
    91         bool V(BinaryBenaphore & this) {
    92                 ssize_t c = 0;
    93                 for () {
    94                         if (__atomic_compare_exchange_n(&this.counter, &c, c+1, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
    95                                 if (c == 0) return true;
    96                                 /* paranoid */ verify(c < 0);
    97                                 return false;
    98                         } else {
    99                                 if (c == 1) return true;
    100                                 /* paranoid */ verify(c < 1);
    101                                 Pause();
    102                         }
    103                 }
    104         }
    105 }
    106 
    107 // Binary Semaphore based on the BinaryBenaphore on top of the 0-nary Semaphore
    108 struct ThreadBenaphore {
    109         BinaryBenaphore ben;
    110         Semaphore0nary  sem;
    111 };
    112 
    113 static inline void ?{}(ThreadBenaphore & this) {}
    114 static inline void ?{}(ThreadBenaphore & this, zero_t) { (this.ben){ 0 }; }
    115 static inline void ?{}(ThreadBenaphore & this, one_t ) { (this.ben){ 1 }; }
    116 
    117 static inline bool P(ThreadBenaphore & this)              { return /* P(this.ben) ? false : */ P(this.sem); }
    118 static inline bool P(ThreadBenaphore & this, $thread * t) { return /* P(this.ben) ? false : */ P(this.sem, t ); }
    119 static inline bool tryP(ThreadBenaphore & this)           { return tryP(this.ben); }
    120 static inline bool P(ThreadBenaphore & this, bool wait)   { return wait ? P(this) : tryP(this); }
    121 
    122 static inline $thread * V(ThreadBenaphore & this, const bool doUnpark = true) {
    123         // if (V(this.ben)) return 0p;
    124         return V(this.sem, doUnpark);
    125 }
    126 
    127 //-----------------------------------------------------------------------------
    128 // Semaphore
    129 struct semaphore {
    130         __spinlock_t lock;
    131         int count;
    132         __queue_t($thread) waiting;
    133 };
    134 
    135 void  ?{}(semaphore & this, int count = 1);
    136 void ^?{}(semaphore & this);
    137 bool   P (semaphore & this);
    138 bool   V (semaphore & this);
    139 bool   V (semaphore & this, unsigned count);
    14025
    14126//----------
     
    16853static inline void   set_recursion_count( owner_lock & this, size_t recursion ) { set_recursion_count( (blocking_lock &)this, recursion ); }
    16954static inline size_t get_recursion_count( owner_lock & this ) { return get_recursion_count( (blocking_lock &)this ); }
    170 
    171 struct fast_lock {
    172         $thread * volatile owner;
    173         ThreadBenaphore sem;
    174 };
    175 
    176 static inline bool $try_lock(fast_lock & this, $thread * thrd) {
    177     $thread * exp = 0p;
    178     return __atomic_compare_exchange_n(&this.owner, &exp, thrd, false, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED);
    179 }
    180 
    181 static inline void $lock(fast_lock & this, $thread * thrd) {
    182         /* paranoid */verify(thrd != this.owner);
    183 
    184         for (;;) {
    185                 if ($try_lock(this, thrd)) return;
    186                 P(this.sem, thrd);
    187         }
    188 }
    189 
    190 static inline void lock( fast_lock & this ) {
    191         $thread * thrd = active_thread();
    192         /* paranoid */verify(thrd != this.owner);
    193 
    194         for (;;) {
    195                 if ($try_lock(this, thrd)) return;
    196                 P(this.sem);
    197         }
    198 }
    199 
    200 static inline void try_lock ( fast_lock & this ) {
    201         $thread * thrd = active_thread();
    202         /* paranoid */ verify(thrd != this.owner);
    203         return $try_lock(this, thrd);
    204 }
    205 
    206 static inline void unlock( fast_lock & this ) {
    207         $thread * thrd = active_thread();
    208         /* paranoid */ verify(thrd == this.owner);
    209         $thread * next = V(this.sem, false); // implicit fence
    210         // open 'owner' only after fence
    211         this.owner = 0p;
    212 
    213         // Unpark the next person (can be 0p, unpark handles it)
    214         unpark(next);
    215 }
    216 
    217 static inline void on_wait( fast_lock & this ) {
    218         unlock(this);
    219         #warning this is broken
    220 }
    221 
    222 static inline void on_notify( fast_lock & this, struct $thread * t ) {
    223         $lock(this, t);
    224         #warning this is broken
    225 }
    226 
    227 static inline void   set_recursion_count( fast_lock & this, size_t recursion ) {}
    228 static inline size_t get_recursion_count( fast_lock & this ) { return 0; }
    229 
    230 struct mcs_node {
    231         mcs_node * volatile next;
    232         single_sem sem;
    233 };
    234 
    235 static inline void ?{}(mcs_node & this) { this.next = 0p; }
    236 
    237 static inline mcs_node * volatile & ?`next ( mcs_node * node ) {
    238         return node->next;
    239 }
    240 
    241 struct mcs_lock {
    242         mcs_queue(mcs_node) queue;
    243 };
    244 
    245 static inline void lock(mcs_lock & l, mcs_node & n) {
    246         if(push(l.queue, &n))
    247                 wait(n.sem);
    248 }
    249 
    250 static inline void unlock(mcs_lock & l, mcs_node & n) {
    251         mcs_node * next = advance(l.queue, &n);
    252         if(next) post(next->sem);
    253 }
    25455
    25556//-----------------------------------------------------------------------------
     
    320121        bool wait( condition_variable(L) & this, L & l, uintptr_t info, Time time );
    321122}
     123
     124//-----------------------------------------------------------------------------
     125// Semaphore
     126struct semaphore {
     127        __spinlock_t lock;
     128        int count;
     129        __queue_t($thread) waiting;
     130};
     131
     132void  ?{}(semaphore & this, int count = 1);
     133void ^?{}(semaphore & this);
     134bool   P (semaphore & this);
     135bool   V (semaphore & this);
     136bool   V (semaphore & this, unsigned count);
  • libcfa/src/containers/queueLockFree.hfa

    rf8a7fed r08e75215  
    2020                // Adds an element to the list
    2121                // Multi-Thread Safe, Lock-Free
    22                 T * push(mcs_queue(T) & this, T * elem) __attribute__((artificial));
    23                 T * push(mcs_queue(T) & this, T * elem) {
    24                         /* paranoid */ verify(!(elem`next));
     22                T * push(mcs_queue(T) & this, T & elem) {
     23                        /* paranoid */ verify(!(&elem)`next);
    2524                        // Race to add to the tail
    26                         T * prev = __atomic_exchange_n(&this.tail, elem, __ATOMIC_SEQ_CST);
     25                        T * prev = __atomic_exchange_n(&this.tail, &elem, __ATOMIC_SEQ_CST);
    2726                        // If we aren't the first, we need to tell the person before us
    2827                        // No need to
    29                         if (prev) prev`next = elem;
     28                        if (prev) prev`next = &elem;
    3029                        return prev;
    3130                }
     
    3433                // Passing an element that is not the head is undefined behavior
    3534                // NOT Multi-Thread Safe, concurrent pushes are safe
    36                 T * advance(mcs_queue(T) & this, T * elem) __attribute__((artificial));
    37                 T * advance(mcs_queue(T) & this, T * elem) {
    38                         T * expected = elem;
     35                T * advance(mcs_queue(T) & this, T & elem) {
     36                        T * expected = &elem;
    3937                        // Check if this is already the last item
    4038                        if (__atomic_compare_exchange_n(&this.tail, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) return 0p;
    4139
    42                         // If not wait for next item to show-up, filled by push
    43                         while (!(elem`next)) Pause();
    44 
    45                         // we need to return if the next link was empty
    46                         T * ret = elem`next;
    47 
    48                         // invalidate link to reset to initial state
    49                         elem`next = 0p;
    50                         return ret;
     40                        // If not wait for next item to show-up
     41                        // added by push
     42                        while (!(&elem)`next) Pause();
     43                        return (&elem)`next;
    5144                }
    5245        }
     
    7164                // Added a new element to the queue
    7265                // Multi-Thread Safe, Lock-Free
    73                 T * push(mpsc_queue(T) & this, T * elem) __attribute__((artificial));
    74                 T * push(mpsc_queue(T) & this, T * elem) {
     66                T * push(mpsc_queue(T) & this, T & elem) {
    7567                        T * prev = push((mcs_queue(T)&)this, elem);
    76                         if (!prev) this.head = elem;
     68                        if (!prev) this.head = &elem;
    7769                        return prev;
    7870                }
     
    8274                // next is set to the new head of the queue
    8375                // NOT Multi-Thread Safe
    84                 T * pop(mpsc_queue(T) & this, T *& next) __attribute__((artificial));
    8576                T * pop(mpsc_queue(T) & this, T *& next) {
    8677                        T * elem = this.head;
     
    9384                                // force memory sync
    9485                                __atomic_thread_fence(__ATOMIC_SEQ_CST);
    95 
    96                                 // invalidate link to reset to initial state
    97                                 elem`next = 0p;
    9886                        }
    9987                        // Otherwise, there might be a race where it only looks but someone is enqueuing
     
    10391                                // after that point, it could overwrite the write in push
    10492                                this.head = 0p;
    105                                 next = advance((mcs_queue(T)&)this, elem);
     93                                next = advance((mcs_queue(T)&)this, (*elem));
    10694
    10795                                // Only write to the head if there is a next element
     
    11098                                if (next) this.head = next;
    11199                        }
     100
     101                        // invalidate link
     102                        elem`next = 0p;
    112103
    113104                        // return removed element
  • tests/Makefile.am

    rf8a7fed r08e75215  
    7575        pybin/tools.py \
    7676        long_tests.hfa \
     77        .in/io.data \
    7778        io/.in/io.data \
    78         io/.in/many_read.data \
    7979        avltree/avl.h \
    8080        avltree/avl-private.h \
    8181        concurrent/clib.c \
    82         concurrent/clib_tls.c \
    8382        exceptions/with-threads.hfa \
    8483        exceptions/except-io.hfa
Note: See TracChangeset for help on using the changeset viewer.