Changes in / [08e75215:f8a7fed]


Ignore:
Files:
6 added
4 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/invoke.h

    r08e75215 rf8a7fed  
    225225
    226226                static inline $thread * volatile & ?`next ( $thread * this )  __attribute__((const)) {
    227                         return this->seqable.back;
     227                        return this->seqable.next;
    228228                }
    229229
  • libcfa/src/concurrency/locks.hfa

    r08e75215 rf8a7fed  
    2020
    2121#include "bits/weakso_locks.hfa"
     22#include "containers/queueLockFree.hfa"
     23
     24#include "thread.hfa"
    2225
    2326#include "time_t.hfa"
    2427#include "time.hfa"
     28
     29//-----------------------------------------------------------------------------
     30// Semaphores
     31
     32// '0-nary' semaphore
     33// Similar to a counting semaphore except the value of one is never reached
     34// as a consequence, a V() that would bring the value to 1 *spins* until
     35// a P consumes it
     36struct Semaphore0nary {
     37        __spinlock_t lock; // needed to protect
     38        mpsc_queue($thread) queue;
     39};
     40
     41static inline bool P(Semaphore0nary & this, $thread * thrd) __attribute__((artificial));
     42static inline bool P(Semaphore0nary & this, $thread * thrd) {
     43        /* paranoid */ verify(!(thrd->seqable.next));
     44        /* paranoid */ verify(!(thrd`next));
     45
     46        push(this.queue, thrd);
     47        return true;
     48}
     49
     50static inline bool P(Semaphore0nary & this) __attribute__((artificial));
     51static inline bool P(Semaphore0nary & this) {
     52    $thread * thrd = active_thread();
     53    P(this, thrd);
     54    park();
     55    return true;
     56}
     57
     58static inline $thread * V(Semaphore0nary & this, const bool doUnpark = true) __attribute__((artificial));
     59static inline $thread * V(Semaphore0nary & this, const bool doUnpark = true) {
     60        $thread * next;
     61        lock(this.lock __cfaabi_dbg_ctx2);
     62                for (;;) {
     63                        next = pop(this.queue);
     64                        if (next) break;
     65                        Pause();
     66                }
     67        unlock(this.lock);
     68
     69        if (doUnpark) unpark(next);
     70        return next;
     71}
     72
     73// Wrapper used on top of any sempahore to avoid potential locking
     74struct BinaryBenaphore {
     75        volatile ssize_t counter;
     76};
     77
     78static inline {
     79        void ?{}(BinaryBenaphore & this) { this.counter = 0; }
     80        void ?{}(BinaryBenaphore & this, zero_t) { this.counter = 0; }
     81        void ?{}(BinaryBenaphore & this, one_t ) { this.counter = 1; }
     82
     83        // returns true if no blocking needed
     84        bool P(BinaryBenaphore & this) { return __atomic_fetch_sub(&this.counter, 1, __ATOMIC_SEQ_CST) > 0; }
     85        bool tryP(BinaryBenaphore & this) {
     86                ssize_t c = this.counter;
     87                return (c >= 1) && __atomic_compare_exchange_n(&this.counter, &c, c-1, false, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED);
     88        }
     89
     90        // returns true if notify needed
     91        bool V(BinaryBenaphore & this) {
     92                ssize_t c = 0;
     93                for () {
     94                        if (__atomic_compare_exchange_n(&this.counter, &c, c+1, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     95                                if (c == 0) return true;
     96                                /* paranoid */ verify(c < 0);
     97                                return false;
     98                        } else {
     99                                if (c == 1) return true;
     100                                /* paranoid */ verify(c < 1);
     101                                Pause();
     102                        }
     103                }
     104        }
     105}
     106
     107// Binary Semaphore based on the BinaryBenaphore on top of the 0-nary Semaphore
     108struct ThreadBenaphore {
     109        BinaryBenaphore ben;
     110        Semaphore0nary  sem;
     111};
     112
     113static inline void ?{}(ThreadBenaphore & this) {}
     114static inline void ?{}(ThreadBenaphore & this, zero_t) { (this.ben){ 0 }; }
     115static inline void ?{}(ThreadBenaphore & this, one_t ) { (this.ben){ 1 }; }
     116
     117static inline bool P(ThreadBenaphore & this)              { return /* P(this.ben) ? false : */ P(this.sem); }
     118static inline bool P(ThreadBenaphore & this, $thread * t) { return /* P(this.ben) ? false : */ P(this.sem, t ); }
     119static inline bool tryP(ThreadBenaphore & this)           { return tryP(this.ben); }
     120static inline bool P(ThreadBenaphore & this, bool wait)   { return wait ? P(this) : tryP(this); }
     121
     122static inline $thread * V(ThreadBenaphore & this, const bool doUnpark = true) {
     123        // if (V(this.ben)) return 0p;
     124        return V(this.sem, doUnpark);
     125}
     126
     127//-----------------------------------------------------------------------------
     128// Semaphore
     129struct semaphore {
     130        __spinlock_t lock;
     131        int count;
     132        __queue_t($thread) waiting;
     133};
     134
     135void  ?{}(semaphore & this, int count = 1);
     136void ^?{}(semaphore & this);
     137bool   P (semaphore & this);
     138bool   V (semaphore & this);
     139bool   V (semaphore & this, unsigned count);
    25140
    26141//----------
     
    54169static inline size_t get_recursion_count( owner_lock & this ) { return get_recursion_count( (blocking_lock &)this ); }
    55170
     171struct fast_lock {
     172        $thread * volatile owner;
     173        ThreadBenaphore sem;
     174};
     175
     176static inline bool $try_lock(fast_lock & this, $thread * thrd) {
     177    $thread * exp = 0p;
     178    return __atomic_compare_exchange_n(&this.owner, &exp, thrd, false, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED);
     179}
     180
     181static inline void $lock(fast_lock & this, $thread * thrd) {
     182        /* paranoid */verify(thrd != this.owner);
     183
     184        for (;;) {
     185                if ($try_lock(this, thrd)) return;
     186                P(this.sem, thrd);
     187        }
     188}
     189
     190static inline void lock( fast_lock & this ) {
     191        $thread * thrd = active_thread();
     192        /* paranoid */verify(thrd != this.owner);
     193
     194        for (;;) {
     195                if ($try_lock(this, thrd)) return;
     196                P(this.sem);
     197        }
     198}
     199
     200static inline void try_lock ( fast_lock & this ) {
     201        $thread * thrd = active_thread();
     202        /* paranoid */ verify(thrd != this.owner);
     203        return $try_lock(this, thrd);
     204}
     205
     206static inline void unlock( fast_lock & this ) {
     207        $thread * thrd = active_thread();
     208        /* paranoid */ verify(thrd == this.owner);
     209        $thread * next = V(this.sem, false); // implicit fence
     210        // open 'owner' only after fence
     211        this.owner = 0p;
     212
     213        // Unpark the next person (can be 0p, unpark handles it)
     214        unpark(next);
     215}
     216
     217static inline void on_wait( fast_lock & this ) {
     218        unlock(this);
     219        #warning this is broken
     220}
     221
     222static inline void on_notify( fast_lock & this, struct $thread * t ) {
     223        $lock(this, t);
     224        #warning this is broken
     225}
     226
     227static inline void   set_recursion_count( fast_lock & this, size_t recursion ) {}
     228static inline size_t get_recursion_count( fast_lock & this ) { return 0; }
     229
     230struct mcs_node {
     231        mcs_node * volatile next;
     232        single_sem sem;
     233};
     234
     235static inline void ?{}(mcs_node & this) { this.next = 0p; }
     236
     237static inline mcs_node * volatile & ?`next ( mcs_node * node ) {
     238        return node->next;
     239}
     240
     241struct mcs_lock {
     242        mcs_queue(mcs_node) queue;
     243};
     244
     245static inline void lock(mcs_lock & l, mcs_node & n) {
     246        if(push(l.queue, &n))
     247                wait(n.sem);
     248}
     249
     250static inline void unlock(mcs_lock & l, mcs_node & n) {
     251        mcs_node * next = advance(l.queue, &n);
     252        if(next) post(next->sem);
     253}
     254
    56255//-----------------------------------------------------------------------------
    57256// is_blocking_lock
     
    121320        bool wait( condition_variable(L) & this, L & l, uintptr_t info, Time time );
    122321}
    123 
    124 //-----------------------------------------------------------------------------
    125 // Semaphore
    126 struct semaphore {
    127         __spinlock_t lock;
    128         int count;
    129         __queue_t($thread) waiting;
    130 };
    131 
    132 void  ?{}(semaphore & this, int count = 1);
    133 void ^?{}(semaphore & this);
    134 bool   P (semaphore & this);
    135 bool   V (semaphore & this);
    136 bool   V (semaphore & this, unsigned count);
  • libcfa/src/containers/queueLockFree.hfa

    r08e75215 rf8a7fed  
    2020                // Adds an element to the list
    2121                // Multi-Thread Safe, Lock-Free
    22                 T * push(mcs_queue(T) & this, T & elem) {
    23                         /* paranoid */ verify(!(&elem)`next);
     22                T * push(mcs_queue(T) & this, T * elem) __attribute__((artificial));
     23                T * push(mcs_queue(T) & this, T * elem) {
     24                        /* paranoid */ verify(!(elem`next));
    2425                        // Race to add to the tail
    25                         T * prev = __atomic_exchange_n(&this.tail, &elem, __ATOMIC_SEQ_CST);
     26                        T * prev = __atomic_exchange_n(&this.tail, elem, __ATOMIC_SEQ_CST);
    2627                        // If we aren't the first, we need to tell the person before us
    2728                        // No need to
    28                         if (prev) prev`next = &elem;
     29                        if (prev) prev`next = elem;
    2930                        return prev;
    3031                }
     
    3334                // Passing an element that is not the head is undefined behavior
    3435                // NOT Multi-Thread Safe, concurrent pushes are safe
    35                 T * advance(mcs_queue(T) & this, T & elem) {
    36                         T * expected = &elem;
     36                T * advance(mcs_queue(T) & this, T * elem) __attribute__((artificial));
     37                T * advance(mcs_queue(T) & this, T * elem) {
     38                        T * expected = elem;
    3739                        // Check if this is already the last item
    3840                        if (__atomic_compare_exchange_n(&this.tail, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) return 0p;
    3941
    40                         // If not wait for next item to show-up
    41                         // added by push
    42                         while (!(&elem)`next) Pause();
    43                         return (&elem)`next;
     42                        // If not wait for next item to show-up, filled by push
     43                        while (!(elem`next)) Pause();
     44
     45                        // we need to return if the next link was empty
     46                        T * ret = elem`next;
     47
     48                        // invalidate link to reset to initial state
     49                        elem`next = 0p;
     50                        return ret;
    4451                }
    4552        }
     
    6471                // Added a new element to the queue
    6572                // Multi-Thread Safe, Lock-Free
    66                 T * push(mpsc_queue(T) & this, T & elem) {
     73                T * push(mpsc_queue(T) & this, T * elem) __attribute__((artificial));
     74                T * push(mpsc_queue(T) & this, T * elem) {
    6775                        T * prev = push((mcs_queue(T)&)this, elem);
    68                         if (!prev) this.head = &elem;
     76                        if (!prev) this.head = elem;
    6977                        return prev;
    7078                }
     
    7482                // next is set to the new head of the queue
    7583                // NOT Multi-Thread Safe
     84                T * pop(mpsc_queue(T) & this, T *& next) __attribute__((artificial));
    7685                T * pop(mpsc_queue(T) & this, T *& next) {
    7786                        T * elem = this.head;
     
    8493                                // force memory sync
    8594                                __atomic_thread_fence(__ATOMIC_SEQ_CST);
     95
     96                                // invalidate link to reset to initial state
     97                                elem`next = 0p;
    8698                        }
    8799                        // Otherwise, there might be a race where it only looks but someone is enqueuing
     
    91103                                // after that point, it could overwrite the write in push
    92104                                this.head = 0p;
    93                                 next = advance((mcs_queue(T)&)this, (*elem));
     105                                next = advance((mcs_queue(T)&)this, elem);
    94106
    95107                                // Only write to the head if there is a next element
     
    98110                                if (next) this.head = next;
    99111                        }
    100 
    101                         // invalidate link
    102                         elem`next = 0p;
    103112
    104113                        // return removed element
  • tests/Makefile.am

    r08e75215 rf8a7fed  
    7575        pybin/tools.py \
    7676        long_tests.hfa \
    77         .in/io.data \
    7877        io/.in/io.data \
     78        io/.in/many_read.data \
    7979        avltree/avl.h \
    8080        avltree/avl-private.h \
    8181        concurrent/clib.c \
     82        concurrent/clib_tls.c \
    8283        exceptions/with-threads.hfa \
    8384        exceptions/except-io.hfa
Note: See TracChangeset for help on using the changeset viewer.