Changes in / [f8a7fed:08e75215]
- Files:
-
- 6 deleted
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/invoke.h
rf8a7fed r08e75215 225 225 226 226 static inline $thread * volatile & ?`next ( $thread * this ) __attribute__((const)) { 227 return this->seqable. next;227 return this->seqable.back; 228 228 } 229 229 -
libcfa/src/concurrency/locks.hfa
rf8a7fed r08e75215 20 20 21 21 #include "bits/weakso_locks.hfa" 22 #include "containers/queueLockFree.hfa"23 24 #include "thread.hfa"25 22 26 23 #include "time_t.hfa" 27 24 #include "time.hfa" 28 29 //-----------------------------------------------------------------------------30 // Semaphores31 32 // '0-nary' semaphore33 // Similar to a counting semaphore except the value of one is never reached34 // as a consequence, a V() that would bring the value to 1 *spins* until35 // a P consumes it36 struct Semaphore0nary {37 __spinlock_t lock; // needed to protect38 mpsc_queue($thread) queue;39 };40 41 static inline bool P(Semaphore0nary & this, $thread * thrd) __attribute__((artificial));42 static inline bool P(Semaphore0nary & this, $thread * thrd) {43 /* paranoid */ verify(!(thrd->seqable.next));44 /* paranoid */ verify(!(thrd`next));45 46 push(this.queue, thrd);47 return true;48 }49 50 static inline bool P(Semaphore0nary & this) __attribute__((artificial));51 static inline bool P(Semaphore0nary & this) {52 $thread * thrd = active_thread();53 P(this, thrd);54 park();55 return true;56 }57 58 static inline $thread * V(Semaphore0nary & this, const bool doUnpark = true) __attribute__((artificial));59 static inline $thread * V(Semaphore0nary & this, const bool doUnpark = true) {60 $thread * next;61 lock(this.lock __cfaabi_dbg_ctx2);62 for (;;) {63 next = pop(this.queue);64 if (next) break;65 Pause();66 }67 unlock(this.lock);68 69 if (doUnpark) unpark(next);70 return next;71 }72 73 // Wrapper used on top of any sempahore to avoid potential locking74 struct BinaryBenaphore {75 volatile ssize_t counter;76 };77 78 static inline {79 void ?{}(BinaryBenaphore & this) { this.counter = 0; }80 void ?{}(BinaryBenaphore & this, zero_t) { this.counter = 0; }81 void ?{}(BinaryBenaphore & this, one_t ) { this.counter = 1; }82 83 // returns true if no blocking needed84 bool P(BinaryBenaphore & this) { return __atomic_fetch_sub(&this.counter, 1, __ATOMIC_SEQ_CST) > 0; }85 bool tryP(BinaryBenaphore & this) {86 ssize_t c = this.counter;87 return (c >= 1) && __atomic_compare_exchange_n(&this.counter, &c, c-1, false, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED);88 }89 90 // returns true if notify needed91 bool V(BinaryBenaphore & this) {92 ssize_t c = 0;93 for () {94 if (__atomic_compare_exchange_n(&this.counter, &c, c+1, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {95 if (c == 0) return true;96 /* paranoid */ verify(c < 0);97 return false;98 } else {99 if (c == 1) return true;100 /* paranoid */ verify(c < 1);101 Pause();102 }103 }104 }105 }106 107 // Binary Semaphore based on the BinaryBenaphore on top of the 0-nary Semaphore108 struct ThreadBenaphore {109 BinaryBenaphore ben;110 Semaphore0nary sem;111 };112 113 static inline void ?{}(ThreadBenaphore & this) {}114 static inline void ?{}(ThreadBenaphore & this, zero_t) { (this.ben){ 0 }; }115 static inline void ?{}(ThreadBenaphore & this, one_t ) { (this.ben){ 1 }; }116 117 static inline bool P(ThreadBenaphore & this) { return /* P(this.ben) ? false : */ P(this.sem); }118 static inline bool P(ThreadBenaphore & this, $thread * t) { return /* P(this.ben) ? false : */ P(this.sem, t ); }119 static inline bool tryP(ThreadBenaphore & this) { return tryP(this.ben); }120 static inline bool P(ThreadBenaphore & this, bool wait) { return wait ? P(this) : tryP(this); }121 122 static inline $thread * V(ThreadBenaphore & this, const bool doUnpark = true) {123 // if (V(this.ben)) return 0p;124 return V(this.sem, doUnpark);125 }126 127 //-----------------------------------------------------------------------------128 // Semaphore129 struct semaphore {130 __spinlock_t lock;131 int count;132 __queue_t($thread) waiting;133 };134 135 void ?{}(semaphore & this, int count = 1);136 void ^?{}(semaphore & this);137 bool P (semaphore & this);138 bool V (semaphore & this);139 bool V (semaphore & this, unsigned count);140 25 141 26 //---------- … … 168 53 static inline void set_recursion_count( owner_lock & this, size_t recursion ) { set_recursion_count( (blocking_lock &)this, recursion ); } 169 54 static inline size_t get_recursion_count( owner_lock & this ) { return get_recursion_count( (blocking_lock &)this ); } 170 171 struct fast_lock {172 $thread * volatile owner;173 ThreadBenaphore sem;174 };175 176 static inline bool $try_lock(fast_lock & this, $thread * thrd) {177 $thread * exp = 0p;178 return __atomic_compare_exchange_n(&this.owner, &exp, thrd, false, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED);179 }180 181 static inline void $lock(fast_lock & this, $thread * thrd) {182 /* paranoid */verify(thrd != this.owner);183 184 for (;;) {185 if ($try_lock(this, thrd)) return;186 P(this.sem, thrd);187 }188 }189 190 static inline void lock( fast_lock & this ) {191 $thread * thrd = active_thread();192 /* paranoid */verify(thrd != this.owner);193 194 for (;;) {195 if ($try_lock(this, thrd)) return;196 P(this.sem);197 }198 }199 200 static inline void try_lock ( fast_lock & this ) {201 $thread * thrd = active_thread();202 /* paranoid */ verify(thrd != this.owner);203 return $try_lock(this, thrd);204 }205 206 static inline void unlock( fast_lock & this ) {207 $thread * thrd = active_thread();208 /* paranoid */ verify(thrd == this.owner);209 $thread * next = V(this.sem, false); // implicit fence210 // open 'owner' only after fence211 this.owner = 0p;212 213 // Unpark the next person (can be 0p, unpark handles it)214 unpark(next);215 }216 217 static inline void on_wait( fast_lock & this ) {218 unlock(this);219 #warning this is broken220 }221 222 static inline void on_notify( fast_lock & this, struct $thread * t ) {223 $lock(this, t);224 #warning this is broken225 }226 227 static inline void set_recursion_count( fast_lock & this, size_t recursion ) {}228 static inline size_t get_recursion_count( fast_lock & this ) { return 0; }229 230 struct mcs_node {231 mcs_node * volatile next;232 single_sem sem;233 };234 235 static inline void ?{}(mcs_node & this) { this.next = 0p; }236 237 static inline mcs_node * volatile & ?`next ( mcs_node * node ) {238 return node->next;239 }240 241 struct mcs_lock {242 mcs_queue(mcs_node) queue;243 };244 245 static inline void lock(mcs_lock & l, mcs_node & n) {246 if(push(l.queue, &n))247 wait(n.sem);248 }249 250 static inline void unlock(mcs_lock & l, mcs_node & n) {251 mcs_node * next = advance(l.queue, &n);252 if(next) post(next->sem);253 }254 55 255 56 //----------------------------------------------------------------------------- … … 320 121 bool wait( condition_variable(L) & this, L & l, uintptr_t info, Time time ); 321 122 } 123 124 //----------------------------------------------------------------------------- 125 // Semaphore 126 struct semaphore { 127 __spinlock_t lock; 128 int count; 129 __queue_t($thread) waiting; 130 }; 131 132 void ?{}(semaphore & this, int count = 1); 133 void ^?{}(semaphore & this); 134 bool P (semaphore & this); 135 bool V (semaphore & this); 136 bool V (semaphore & this, unsigned count); -
libcfa/src/containers/queueLockFree.hfa
rf8a7fed r08e75215 20 20 // Adds an element to the list 21 21 // Multi-Thread Safe, Lock-Free 22 T * push(mcs_queue(T) & this, T * elem) __attribute__((artificial)); 23 T * push(mcs_queue(T) & this, T * elem) { 24 /* paranoid */ verify(!(elem`next)); 22 T * push(mcs_queue(T) & this, T & elem) { 23 /* paranoid */ verify(!(&elem)`next); 25 24 // Race to add to the tail 26 T * prev = __atomic_exchange_n(&this.tail, elem, __ATOMIC_SEQ_CST);25 T * prev = __atomic_exchange_n(&this.tail, &elem, __ATOMIC_SEQ_CST); 27 26 // If we aren't the first, we need to tell the person before us 28 27 // No need to 29 if (prev) prev`next = elem;28 if (prev) prev`next = &elem; 30 29 return prev; 31 30 } … … 34 33 // Passing an element that is not the head is undefined behavior 35 34 // NOT Multi-Thread Safe, concurrent pushes are safe 36 T * advance(mcs_queue(T) & this, T * elem) __attribute__((artificial)); 37 T * advance(mcs_queue(T) & this, T * elem) { 38 T * expected = elem; 35 T * advance(mcs_queue(T) & this, T & elem) { 36 T * expected = &elem; 39 37 // Check if this is already the last item 40 38 if (__atomic_compare_exchange_n(&this.tail, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) return 0p; 41 39 42 // If not wait for next item to show-up, filled by push 43 while (!(elem`next)) Pause(); 44 45 // we need to return if the next link was empty 46 T * ret = elem`next; 47 48 // invalidate link to reset to initial state 49 elem`next = 0p; 50 return ret; 40 // If not wait for next item to show-up 41 // added by push 42 while (!(&elem)`next) Pause(); 43 return (&elem)`next; 51 44 } 52 45 } … … 71 64 // Added a new element to the queue 72 65 // Multi-Thread Safe, Lock-Free 73 T * push(mpsc_queue(T) & this, T * elem) __attribute__((artificial)); 74 T * push(mpsc_queue(T) & this, T * elem) { 66 T * push(mpsc_queue(T) & this, T & elem) { 75 67 T * prev = push((mcs_queue(T)&)this, elem); 76 if (!prev) this.head = elem;68 if (!prev) this.head = &elem; 77 69 return prev; 78 70 } … … 82 74 // next is set to the new head of the queue 83 75 // NOT Multi-Thread Safe 84 T * pop(mpsc_queue(T) & this, T *& next) __attribute__((artificial));85 76 T * pop(mpsc_queue(T) & this, T *& next) { 86 77 T * elem = this.head; … … 93 84 // force memory sync 94 85 __atomic_thread_fence(__ATOMIC_SEQ_CST); 95 96 // invalidate link to reset to initial state97 elem`next = 0p;98 86 } 99 87 // Otherwise, there might be a race where it only looks but someone is enqueuing … … 103 91 // after that point, it could overwrite the write in push 104 92 this.head = 0p; 105 next = advance((mcs_queue(T)&)this, elem);93 next = advance((mcs_queue(T)&)this, (*elem)); 106 94 107 95 // Only write to the head if there is a next element … … 110 98 if (next) this.head = next; 111 99 } 100 101 // invalidate link 102 elem`next = 0p; 112 103 113 104 // return removed element -
tests/Makefile.am
rf8a7fed r08e75215 75 75 pybin/tools.py \ 76 76 long_tests.hfa \ 77 .in/io.data \ 77 78 io/.in/io.data \ 78 io/.in/many_read.data \79 79 avltree/avl.h \ 80 80 avltree/avl-private.h \ 81 81 concurrent/clib.c \ 82 concurrent/clib_tls.c \83 82 exceptions/with-threads.hfa \ 84 83 exceptions/except-io.hfa
Note: See TracChangeset
for help on using the changeset viewer.