- Timestamp:
  Mar 27, 2021, 6:04:14 PM
- Branches:
  ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children:
  fec3e9a
- Parents:
  2644610 (diff), f8a7fed (diff)

Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Use the (diff) links above to see all the changes relative to each parent.

- Location:
  libcfa/src
- Files:
  8 edited
libcfa/src/concurrency/clib/cfathread.cfa
r2644610 → r1da7397

      this.themain = themain;
      this.arg = arg;
-     ( (thread&)this){"C-thread", cl};
+     (this.self){"C-thread", cl};
      __thrd_start(this, main);
  }
…
      this.init = init;
      this.arg = arg;
-     ( (thread&)this){"Processir Init"};
+     (this.self){"Processir Init"};

      // Don't use __thrd_start! just prep the context manually
…
  ssize_t cfathread_write(int fildes, const void *buf, size_t nbyte) {
-     return cfa_write(fildes, buf, nbyte, CFA_IO_LAZY);
+     // Use send rather then write for socket since it's faster
+     return cfa_send(fildes, buf, nbyte, 0, CFA_IO_LAZY);
  }
…
  ssize_t cfathread_read(int fildes, void *buf, size_t nbyte) {
-     return cfa_read(fildes, buf, nbyte, CFA_IO_LAZY);
+     // Use recv rather then read for socket since it's faster
+     return cfa_recv(fildes, buf, nbyte, 0, CFA_IO_LAZY);
  }

  }
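For context: POSIX specifies that on a connected socket send(fd, buf, n, 0) is equivalent to write(fd, buf, n), and recv(fd, buf, n, 0) to read(fd, buf, n), so rerouting these wrappers through cfa_send/cfa_recv is transparent to callers, provided the descriptor really is a socket (send/recv fail with ENOTSOCK otherwise). The "faster" claim is the changeset's. A minimal plain-C sketch of that equivalence, not part of libcfa (sock_write/sock_read are illustrative names):

    /* Plain-C illustration, not libcfa code: on a connected socket,
     * send(fd, buf, n, 0) behaves like write(fd, buf, n) and
     * recv(fd, buf, n, 0) behaves like read(fd, buf, n). */
    #include <sys/types.h>
    #include <sys/socket.h>

    static ssize_t sock_write(int fd, const void * buf, size_t n) {
        return send(fd, buf, n, 0);   /* same result as write(fd, buf, n) on a socket */
    }

    static ssize_t sock_read(int fd, void * buf, size_t n) {
        return recv(fd, buf, n, 0);   /* same result as read(fd, buf, n) on a socket */
    }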
libcfa/src/concurrency/invoke.h
r2644610 → r1da7397

  static inline $thread * volatile & ?`next ( $thread * this ) __attribute__((const)) {
-     return this->seqable.back;
+     return this->seqable.next;
  }
libcfa/src/concurrency/kernel.hfa
r2644610 → r1da7397

      // Cluster from which to get threads
      struct cluster * cltr;
+
+     // Id within the cluster
+     unsigned cltr_id;

      // Set to true to notify the processor should terminate
libcfa/src/concurrency/kernel/startup.cfa
r2644610 → r1da7397

      // Adjust the ready queue size
-     ready_queue_grow( cltr, target );
+     this.cltr_id = ready_queue_grow( cltr, target );

      // Unlock the RWlock
libcfa/src/concurrency/kernel_private.hfa
r2644610 → r1da7397

  //-----------------------------------------------------------------------
  // Increase the width of the ready queue (number of lanes) by 4
- void     ready_queue_grow(struct cluster * cltr, int target);
+ unsigned ready_queue_grow(struct cluster * cltr, int target);

  //-----------------------------------------------------------------------
libcfa/src/concurrency/locks.hfa
r2644610 → r1da7397

  #include "bits/weakso_locks.hfa"
+ #include "containers/queueLockFree.hfa"
+
+ #include "thread.hfa"

  #include "time_t.hfa"
  #include "time.hfa"
+
+ //-----------------------------------------------------------------------------
+ // Semaphores
+
+ // '0-nary' semaphore
+ // Similar to a counting semaphore except the value of one is never reached
+ // as a consequence, a V() that would bring the value to 1 *spins* until
+ // a P consumes it
+ struct Semaphore0nary {
+     __spinlock_t lock; // needed to protect
+     mpsc_queue($thread) queue;
+ };
+
+ static inline bool P(Semaphore0nary & this, $thread * thrd) __attribute__((artificial));
+ static inline bool P(Semaphore0nary & this, $thread * thrd) {
+     /* paranoid */ verify(!(thrd->seqable.next));
+     /* paranoid */ verify(!(thrd`next));
+
+     push(this.queue, thrd);
+     return true;
+ }
+
+ static inline bool P(Semaphore0nary & this) __attribute__((artificial));
+ static inline bool P(Semaphore0nary & this) {
+     $thread * thrd = active_thread();
+     P(this, thrd);
+     park();
+     return true;
+ }
+
+ static inline $thread * V(Semaphore0nary & this, const bool doUnpark = true) __attribute__((artificial));
+ static inline $thread * V(Semaphore0nary & this, const bool doUnpark = true) {
+     $thread * next;
+     lock(this.lock __cfaabi_dbg_ctx2);
+     for (;;) {
+         next = pop(this.queue);
+         if (next) break;
+         Pause();
+     }
+     unlock(this.lock);
+
+     if (doUnpark) unpark(next);
+     return next;
+ }
+
+ // Wrapper used on top of any sempahore to avoid potential locking
+ struct BinaryBenaphore {
+     volatile ssize_t counter;
+ };
+
+ static inline {
+     void ?{}(BinaryBenaphore & this) { this.counter = 0; }
+     void ?{}(BinaryBenaphore & this, zero_t) { this.counter = 0; }
+     void ?{}(BinaryBenaphore & this, one_t ) { this.counter = 1; }
+
+     // returns true if no blocking needed
+     bool P(BinaryBenaphore & this) { return __atomic_fetch_sub(&this.counter, 1, __ATOMIC_SEQ_CST) > 0; }
+     bool tryP(BinaryBenaphore & this) {
+         ssize_t c = this.counter;
+         return (c >= 1) && __atomic_compare_exchange_n(&this.counter, &c, c-1, false, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED);
+     }
+
+     // returns true if notify needed
+     bool V(BinaryBenaphore & this) {
+         ssize_t c = 0;
+         for () {
+             if (__atomic_compare_exchange_n(&this.counter, &c, c+1, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
+                 if (c == 0) return true;
+                 /* paranoid */ verify(c < 0);
+                 return false;
+             } else {
+                 if (c == 1) return true;
+                 /* paranoid */ verify(c < 1);
+                 Pause();
+             }
+         }
+     }
+ }
+
+ // Binary Semaphore based on the BinaryBenaphore on top of the 0-nary Semaphore
+ struct ThreadBenaphore {
+     BinaryBenaphore ben;
+     Semaphore0nary  sem;
+ };
+
+ static inline void ?{}(ThreadBenaphore & this) {}
+ static inline void ?{}(ThreadBenaphore & this, zero_t) { (this.ben){ 0 }; }
+ static inline void ?{}(ThreadBenaphore & this, one_t ) { (this.ben){ 1 }; }
+
+ static inline bool P(ThreadBenaphore & this)              { return /* P(this.ben) ? false : */ P(this.sem); }
+ static inline bool P(ThreadBenaphore & this, $thread * t) { return /* P(this.ben) ? false : */ P(this.sem, t ); }
+ static inline bool tryP(ThreadBenaphore & this)           { return tryP(this.ben); }
+ static inline bool P(ThreadBenaphore & this, bool wait)   { return wait ? P(this) : tryP(this); }
+
+ static inline $thread * V(ThreadBenaphore & this, const bool doUnpark = true) {
+     // if (V(this.ben)) return 0p;
+     return V(this.sem, doUnpark);
+ }
+
+ //-----------------------------------------------------------------------------
+ // Semaphore
+ struct semaphore {
+     __spinlock_t lock;
+     int count;
+     __queue_t($thread) waiting;
+ };
+
+ void  ?{}(semaphore & this, int count = 1);
+ void ^?{}(semaphore & this);
+ bool   P (semaphore & this);
+ bool   V (semaphore & this);
+ bool   V (semaphore & this, unsigned count);

  //----------
…
  static inline size_t get_recursion_count( owner_lock & this ) { return get_recursion_count( (blocking_lock &)this ); }

+ struct fast_lock {
+     $thread * volatile owner;
+     ThreadBenaphore    sem;
+ };
+
+ static inline bool $try_lock(fast_lock & this, $thread * thrd) {
+     $thread * exp = 0p;
+     return __atomic_compare_exchange_n(&this.owner, &exp, thrd, false, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED);
+ }
+
+ static inline void $lock(fast_lock & this, $thread * thrd) {
+     /* paranoid */ verify(thrd != this.owner);
+
+     for (;;) {
+         if ($try_lock(this, thrd)) return;
+         P(this.sem, thrd);
+     }
+ }
+
+ static inline void lock( fast_lock & this ) {
+     $thread * thrd = active_thread();
+     /* paranoid */ verify(thrd != this.owner);
+
+     for (;;) {
+         if ($try_lock(this, thrd)) return;
+         P(this.sem);
+     }
+ }
+
+ static inline void try_lock ( fast_lock & this ) {
+     $thread * thrd = active_thread();
+     /* paranoid */ verify(thrd != this.owner);
+     return $try_lock(this, thrd);
+ }
+
+ static inline void unlock( fast_lock & this ) {
+     $thread * thrd = active_thread();
+     /* paranoid */ verify(thrd == this.owner);
+     $thread * next = V(this.sem, false); // implicit fence
+     // open 'owner' only after fence
+     this.owner = 0p;
+
+     // Unpark the next person (can be 0p, unpark handles it)
+     unpark(next);
+ }
+
+ static inline void on_wait( fast_lock & this ) {
+     unlock(this);
+     #warning this is broken
+ }
+
+ static inline void on_notify( fast_lock & this, struct $thread * t ) {
+     $lock(this, t);
+     #warning this is broken
+ }
+
+ static inline void   set_recursion_count( fast_lock & this, size_t recursion ) {}
+ static inline size_t get_recursion_count( fast_lock & this ) { return 0; }
+
+ struct mcs_node {
+     mcs_node * volatile next;
+     single_sem sem;
+ };
+
+ static inline void ?{}(mcs_node & this) { this.next = 0p; }
+
+ static inline mcs_node * volatile & ?`next ( mcs_node * node ) {
+     return node->next;
+ }
+
+ struct mcs_lock {
+     mcs_queue(mcs_node) queue;
+ };
+
+ static inline void lock(mcs_lock & l, mcs_node & n) {
+     if(push(l.queue, &n))
+         wait(n.sem);
+ }
+
+ static inline void unlock(mcs_lock & l, mcs_node & n) {
+     mcs_node * next = advance(l.queue, &n);
+     if(next) post(next->sem);
+ }
+
  //-----------------------------------------------------------------------------
  // is_blocking_lock
…
  bool wait( condition_variable(L) & this, L & l, uintptr_t info, Time time );
  }
-
- //-----------------------------------------------------------------------------
- // Semaphore
- struct semaphore {
-     __spinlock_t lock;
-     int count;
-     __queue_t($thread) waiting;
- };
-
- void  ?{}(semaphore & this, int count = 1);
- void ^?{}(semaphore & this);
- bool   P (semaphore & this);
- bool   V (semaphore & this);
- bool   V (semaphore & this, unsigned count);
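The BinaryBenaphore is the classic benaphore pattern: an atomic counter screens a heavier semaphore so the uncontended P/V never touch it, with P reporting whether blocking is needed and V reporting whether a waiter must be notified. (As committed here, ThreadBenaphore's P/V leave that fast path commented out and always go through the 0-nary semaphore.) A minimal plain-C sketch of the general idea, using a POSIX semaphore as the slow path; benaphore_t and its functions are illustrative names, not libcfa's:

    /* Hypothetical plain-C benaphore sketch, not libcfa code: an atomic counter
     * in front of a POSIX semaphore so the uncontended acquire/release never
     * enters the kernel. */
    #include <semaphore.h>
    #include <stdatomic.h>

    typedef struct {
        atomic_long counter;   /* 1 = free, <= 0 = held (with -counter waiters) */
        sem_t       sem;       /* slow path, only used under contention */
    } benaphore_t;

    static void benaphore_init(benaphore_t * b) {
        atomic_store(&b->counter, 1);
        sem_init(&b->sem, 0, 0);
    }

    static void benaphore_wait(benaphore_t * b) {
        /* fast path: counter was positive, no contention, no kernel call */
        if (atomic_fetch_sub(&b->counter, 1) > 0) return;
        sem_wait(&b->sem);     /* slow path: block until a release hands over the slot */
    }

    static void benaphore_signal(benaphore_t * b) {
        /* fast path: nobody was waiting, no kernel call */
        if (atomic_fetch_add(&b->counter, 1) >= 0) return;
        sem_post(&b->sem);     /* slow path: wake one blocked waiter */
    }

fast_lock then layers a CAS-owned mutex on top of such a semaphore: lock first tries to CAS owner from 0p to the current thread and only parks through the benaphore when that fails; unlock pops the next waiter without waking it, clears owner after the fence implied by V, and only then unparks the waiter so it can retry the CAS.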
libcfa/src/concurrency/ready_queue.cfa
r2644610 → r1da7397

  #endif

- #define BIAS 16
+ #define BIAS 4

  // returns the maximum number of processors the RWLock support
…
      preferred =
          //*
-         kernelTLS().this_processor ? kernelTLS().this_processor->id * 4 : -1;
+         kernelTLS().this_processor ? kernelTLS().this_processor->cltr_id : -1;
          /*/
          thrd->link.preferred * 4;
…
      #if defined(BIAS)
          // Don't bother trying locally too much
-         preferred = kernelTLS().this_processor->id * 4;
+         preferred = kernelTLS().this_processor->cltr_id;
      #endif
…
      #if !defined(__CFA_NO_STATISTICS__)
-         if(locali) {
-             __tls_stats()->ready.pick.pop.local++;
-         }
-         if(localj) {
+         if(locali && localj) {
              __tls_stats()->ready.pick.pop.local++;
          }
…
  // Grow the ready queue
- void ready_queue_grow    (struct cluster * cltr, int target) {
+ unsigned ready_queue_grow(struct cluster * cltr, int target) {
+     unsigned preferred;
+     size_t ncount;
+
      /* paranoid */ verify( ready_mutate_islocked() );
      __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");
…
      // Find new count
      // Make sure we always have atleast 1 list
-     size_t ncount = target >= 2 ? target * 4 : 1;
+     if(target >= 2) {
+         ncount = target * 4;
+         preferred = ncount - 4;
+     } else {
+         ncount = 1;
+         preferred = 0;
+     }

      // Allocate new array (uses realloc and memcpies the data)
…
      /* paranoid */ verify( ready_mutate_islocked() );
+     return preferred;
  }
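Taken together with the kernel.hfa and startup.cfa hunks, the grow path now also decides which block of lanes the newly added processor should prefer: each processor gets four lanes, the block appended by this grow is the last one, and its first index is returned to the caller, which stores it as cltr_id and uses it in place of the old this_processor->id * 4 bias. A hypothetical plain-C sketch of just that arithmetic (ready_queue_grow_sketch and out_nlanes are illustrative names, not libcfa's):

    /* Hypothetical sketch of the grow/preferred relationship, not libcfa code.
     * Growing a cluster to 'target' processors gives 4 ready-queue lanes per
     * processor; the caller stores the returned index as its cltr_id and uses
     * it as its preferred (local) lane block when pushing and popping. */
    unsigned ready_queue_grow_sketch(unsigned target, unsigned * out_nlanes) {
        unsigned nlanes, preferred;
        if (target >= 2) {
            nlanes    = target * 4;   /* 4 lanes per processor */
            preferred = nlanes - 4;   /* first lane of the block just appended */
        } else {
            nlanes    = 1;            /* single processor: one shared lane */
            preferred = 0;
        }
        *out_nlanes = nlanes;
        return preferred;             /* stored by the caller as this.cltr_id */
    }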
libcfa/src/containers/queueLockFree.hfa
r2644610 → r1da7397

  // Adds an element to the list
  // Multi-Thread Safe, Lock-Free
- T * push(mcs_queue(T) & this, T & elem) {
-     /* paranoid */ verify(!(&elem)`next);
+ T * push(mcs_queue(T) & this, T * elem) __attribute__((artificial));
+ T * push(mcs_queue(T) & this, T * elem) {
+     /* paranoid */ verify(!(elem`next));
      // Race to add to the tail
-     T * prev = __atomic_exchange_n(&this.tail, &elem, __ATOMIC_SEQ_CST);
+     T * prev = __atomic_exchange_n(&this.tail, elem, __ATOMIC_SEQ_CST);
      // If we aren't the first, we need to tell the person before us
      // No need to
-     if (prev) prev`next = &elem;
+     if (prev) prev`next = elem;
      return prev;
  }
…
  // Passing an element that is not the head is undefined behavior
  // NOT Multi-Thread Safe, concurrent pushes are safe
- T * advance(mcs_queue(T) & this, T & elem) {
-     T * expected = &elem;
+ T * advance(mcs_queue(T) & this, T * elem) __attribute__((artificial));
+ T * advance(mcs_queue(T) & this, T * elem) {
+     T * expected = elem;
      // Check if this is already the last item
      if (__atomic_compare_exchange_n(&this.tail, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) return 0p;

-     // If not wait for next item to show-up
-     // added by push
-     while (!(&elem)`next) Pause();
-     return (&elem)`next;
+     // If not wait for next item to show-up, filled by push
+     while (!(elem`next)) Pause();
+
+     // we need to return if the next link was empty
+     T * ret = elem`next;
+
+     // invalidate link to reset to initial state
+     elem`next = 0p;
+     return ret;
  }
…
  // Added a new element to the queue
  // Multi-Thread Safe, Lock-Free
- T * push(mpsc_queue(T) & this, T & elem) {
+ T * push(mpsc_queue(T) & this, T * elem) __attribute__((artificial));
+ T * push(mpsc_queue(T) & this, T * elem) {
      T * prev = push((mcs_queue(T)&)this, elem);
-     if (!prev) this.head = &elem;
+     if (!prev) this.head = elem;
      return prev;
  }
…
  // next is set to the new head of the queue
  // NOT Multi-Thread Safe
+ T * pop(mpsc_queue(T) & this, T *& next) __attribute__((artificial));
  T * pop(mpsc_queue(T) & this, T *& next) {
      T * elem = this.head;
…
          // force memory sync
          __atomic_thread_fence(__ATOMIC_SEQ_CST);
+
+         // invalidate link to reset to initial state
+         elem`next = 0p;
      }
      // Otherwise, there might be a race where it only looks but someone is enqueuing
…
          // after that point, it could overwrite the write in push
          this.head = 0p;
-         next = advance((mcs_queue(T)&)this, (*elem));
+         next = advance((mcs_queue(T)&)this, elem);

          // Only write to the head if there is a next element
…
          if (next) this.head = next;
      }
-
-     // invalidate link
-     elem`next = 0p;

      // return removed element
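This mpsc_queue is an intrusive multi-producer single-consumer queue in the MCS style: producers race only on an atomic exchange of the tail pointer and then publish themselves through the previous tail's next link, while the single consumer walks next links and uses one compare-and-swap on the tail to detect and close off the last element. A self-contained plain-C sketch of the same technique, assuming exactly one consumer (mpsc_t, mpsc_push, mpsc_pop are illustrative names, not libcfa's):

    /* Hypothetical plain-C sketch of the same intrusive MPSC technique, not
     * libcfa code: producers contend only on an exchange of 'tail'; the single
     * consumer owns 'head' and uses a CAS on 'tail' to detect the last node. */
    #include <stdatomic.h>
    #include <stddef.h>

    typedef struct node {
        struct node * _Atomic next;
    } node_t;

    typedef struct {
        node_t * _Atomic tail;   /* last element pushed, NULL when empty */
        node_t *         head;   /* only touched by the single consumer (and an empty-queue push) */
    } mpsc_t;

    /* Multi-producer push: returns the previous tail (NULL if the queue was empty). */
    static node_t * mpsc_push(mpsc_t * q, node_t * n) {
        atomic_store(&n->next, NULL);
        node_t * prev = atomic_exchange(&q->tail, n);   /* race to become the tail */
        if (prev) atomic_store(&prev->next, n);         /* link behind the previous tail */
        else      q->head = n;                          /* queue was empty: publish as head */
        return prev;
    }

    /* Single-consumer pop: returns NULL if the queue is empty. */
    static node_t * mpsc_pop(mpsc_t * q) {
        node_t * elem = q->head;
        if (!elem) return NULL;

        node_t * next = atomic_load(&elem->next);
        if (next) {                              /* easy case: successor already linked */
            q->head = next;
            atomic_store(&elem->next, NULL);     /* reset the removed node's link */
            return elem;
        }

        /* 'elem' looks like the last element; clear head BEFORE racing on tail so a
         * concurrent push that finds an empty tail can safely publish the new head. */
        q->head = NULL;
        node_t * expected = elem;
        if (atomic_compare_exchange_strong(&q->tail, &expected, NULL)) {
            return elem;                         /* queue is now really empty */
        }

        /* A push replaced the tail after our load: wait for it to link itself. */
        while ((next = atomic_load(&elem->next)) == NULL) { /* spin */ }
        atomic_store(&elem->next, NULL);
        q->head = next;
        return elem;
    }

In the changeset this queue is what Semaphore0nary parks threads on: P pushes the current $thread and parks, while V pops (spinning through the brief window where a pushed element is not yet linked) and unparks it.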