Changeset eb5962a for libcfa/src
- Timestamp: Jun 21, 2022, 1:39:24 PM (3 years ago)
- Branches: ADT, ast-experimental, master, pthread-emulation, qualifiedEnum
- Children: b62d1d6
- Parents: 1df492a (diff), 1dbbef6 (diff)
  Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Use the (diff) links above to see all the changes relative to each parent.
- Location: libcfa/src
- Files: 12 edited
libcfa/src/bits/locks.hfa
r1df492a reb5962a 26 26 // Wrap in struct to prevent false sharing with debug info 27 27 volatile bool lock; 28 #ifdef __CFA_DEBUG__29 // previous function to acquire the lock30 const char * prev_name;31 // previous thread to acquire the lock32 void* prev_thrd;33 // keep track of number of times we had to spin, just in case the number is unexpectedly huge34 size_t spin_count;35 #endif36 28 }; 37 29 … … 40 32 extern void disable_interrupts() OPTIONAL_THREAD; 41 33 extern void enable_interrupts( bool poll = true ) OPTIONAL_THREAD; 42 43 #ifdef __CFA_DEBUG__ 44 void __cfaabi_dbg_record_lock(__spinlock_t & this, const char prev_name[]); 45 #else 46 #define __cfaabi_dbg_record_lock(x, y) 47 #endif 34 #define __cfaabi_dbg_record_lock(x, y) 48 35 } 49 36 50 37 static inline void ?{}( __spinlock_t & this ) { 51 38 this.lock = 0; 52 #ifdef __CFA_DEBUG__53 this.spin_count = 0;54 #endif55 39 } 56 40 … … 77 61 for ( unsigned int i = 1;; i += 1 ) { 78 62 if ( (this.lock == 0) && (__atomic_test_and_set( &this.lock, __ATOMIC_ACQUIRE ) == 0) ) break; 79 #ifdef __CFA_DEBUG__80 this.spin_count++;81 #endif82 63 #ifndef NOEXPBACK 83 64 // exponential spin -
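The locks.hfa change strips the __CFA_DEBUG__ bookkeeping (prev_name, prev_thrd, spin_count) out of __spinlock_t, leaving the bare test-and-set lock with exponential backoff. A minimal C sketch of that acquire loop, using the GCC/Clang __atomic builtins; the constants and the x86 pause are illustrative, not the libcfa values.
{{{
#include <stdbool.h>
#include <sched.h>

struct spinlock { volatile bool lock; };

// Acquire: test, then test-and-set, backing off exponentially under contention.
static void spin_lock( struct spinlock * this ) {
    unsigned spin = 4;                                  // initial backoff, illustrative
    for (;;) {
        if ( !this->lock &&
             !__atomic_test_and_set( &this->lock, __ATOMIC_ACQUIRE ) ) return;
        for ( unsigned i = 0; i < spin; i++ )
            __builtin_ia32_pause();                     // x86 PAUSE; substitute a no-op elsewhere
        if ( spin < 1024 ) spin += spin;                // exponential backoff, capped
        else sched_yield();                             // spun too long: yield the CPU
    }
}

static void spin_unlock( struct spinlock * this ) {
    __atomic_clear( &this->lock, __ATOMIC_RELEASE );
}
}}}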
libcfa/src/concurrency/invoke.h
r1df492a reb5962a 195 195 struct __monitor_group_t monitors; 196 196 197 // used to put threads on user data structures198 struct {199 struct thread$ * next;200 struct thread$ * back;201 } seqable;202 203 197 // used to put threads on dlist data structure 204 198 __cfa_dlink(thread$); … … 208 202 struct thread$ * prev; 209 203 } node; 204 205 // used to store state between clh lock/unlock 206 volatile bool * clh_prev; 207 208 // used to point to this thd's current clh node 209 volatile bool * clh_node; 210 210 211 211 struct processor * last_proc; … … 240 240 } 241 241 242 static inline thread$ * volatile & ?`next ( thread$ * this ) __attribute__((const)) {243 return this->seqable.next;244 }245 246 static inline thread$ *& Back( thread$ * this ) __attribute__((const)) {247 return this->seqable.back;248 }249 250 static inline thread$ *& Next( thread$ * this ) __attribute__((const)) {251 return this->seqable.next;252 }253 254 static inline bool listed( thread$ * this ) {255 return this->seqable.next != 0p;256 }257 258 242 static inline void ?{}(__monitor_group_t & this) { 259 243 (this.data){0p}; -
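invoke.h swaps the old seqable links for the two pointers the new CLH lock needs in every thread descriptor: clh_node, the flag this thread publishes for its successor, and clh_prev, the predecessor's flag it spun on, which it adopts as its next node after unlock (thread.cfa below adds the matching malloc/free). A hedged C sketch of that per-thread state; the field names mirror the diff, but the struct is not the real thread$ layout.
{{{
#include <stdbool.h>
#include <stdlib.h>

struct thread_desc {
    volatile bool * clh_prev;   // predecessor flag last spun on (stored by lock())
    volatile bool * clh_node;   // flag our successor will spin on (flipped by unlock())
    /* ... stack, monitors, scheduler links, ... */
};

// One flag is owned per thread at any time: unlock() hands the current flag to
// the successor and recycles the predecessor's flag as this thread's next node.
static void thread_ctor( struct thread_desc * t ) {
    t->clh_node  = malloc( sizeof(bool) );
    *t->clh_node = false;
    t->clh_prev  = NULL;
}

static void thread_dtor( struct thread_desc * t ) {
    free( (void *)t->clh_node );
}
}}}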
libcfa/src/concurrency/io.cfa
r1df492a reb5962a 159 159 160 160 const __u32 mask = *ctx->cq.mask; 161 const __u32 num = *ctx->cq.num; 161 162 unsigned long long ts_prev = ctx->cq.ts; 162 163 // re-read the head and tail in case it already changed. 164 const __u32 head = *ctx->cq.head; 165 const __u32 tail = *ctx->cq.tail; 166 const __u32 count = tail - head; 167 __STATS__( false, io.calls.drain++; io.calls.completed += count; ) 168 169 for(i; count) { 170 unsigned idx = (head + i) & mask; 171 volatile struct io_uring_cqe & cqe = ctx->cq.cqes[idx]; 172 173 /* paranoid */ verify(&cqe); 174 175 struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data; 176 // __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future ); 177 178 __kernel_unpark( fulfil( *future, cqe.res, false ), UNPARK_LOCAL ); 179 } 180 181 unsigned long long ts_next = ctx->cq.ts = rdtscl(); 182 183 // Mark to the kernel that the cqe has been seen 184 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read. 185 __atomic_store_n( ctx->cq.head, head + count, __ATOMIC_SEQ_CST ); 186 ctx->proc->idle_wctx.drain_time = ts_next; 163 unsigned long long ts_next; 164 165 // We might need to do this multiple times if more events completed than can fit in the queue. 166 for() { 167 // re-read the head and tail in case it already changed. 168 const __u32 head = *ctx->cq.head; 169 const __u32 tail = *ctx->cq.tail; 170 const __u32 count = tail - head; 171 __STATS__( false, io.calls.drain++; io.calls.completed += count; ) 172 173 for(i; count) { 174 unsigned idx = (head + i) & mask; 175 volatile struct io_uring_cqe & cqe = ctx->cq.cqes[idx]; 176 177 /* paranoid */ verify(&cqe); 178 179 struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data; 180 // __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future ); 181 182 __kernel_unpark( fulfil( *future, cqe.res, false ), UNPARK_LOCAL ); 183 } 184 185 ts_next = ctx->cq.ts = rdtscl(); 186 187 // Mark to the kernel that the cqe has been seen 188 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read. 189 __atomic_store_n( ctx->cq.head, head + count, __ATOMIC_SEQ_CST ); 190 ctx->proc->idle_wctx.drain_time = ts_next; 191 192 if(likely(count < num)) break; 193 194 ioring_syscsll( *ctx, 0, IORING_ENTER_GETEVENTS); 195 } 187 196 188 197 __cfadbg_print_safe(io, "Kernel I/O : %u completed age %llu\n", count, ts_next); -
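io.cfa turns the completion-queue drain into a loop: after consuming everything between head and tail and publishing the new head, it re-enters the ring with IORING_ENTER_GETEVENTS whenever the batch filled the entire CQ, since more completions than fit in the queue may have arrived. A hedged C sketch of that loop over the mapped ring pointers; the cqe struct, handle_cqe and ring_enter_getevents are simplified stand-ins, not liburing or libcfa calls.
{{{
#include <stdint.h>

struct cqe { uint64_t user_data; int32_t res; };    // simplified stand-in for io_uring_cqe

struct cq_view {                                    // mapped completion-queue pointers
    uint32_t * head, * tail, * mask, * num_entries;
    struct cqe * cqes;
};

void handle_cqe( struct cqe * );                    // placeholder: fulfil the matching future
void ring_enter_getevents( int ring_fd );           // placeholder: io_uring_enter(GETEVENTS)

static void drain( struct cq_view * cq, int ring_fd ) {
    const uint32_t mask = *cq->mask;
    const uint32_t num  = *cq->num_entries;
    for (;;) {
        // re-read head and tail each pass: the kernel may have appended more entries
        const uint32_t head  = *cq->head;
        const uint32_t tail  = __atomic_load_n( cq->tail, __ATOMIC_ACQUIRE );
        const uint32_t count = tail - head;

        for ( uint32_t i = 0; i < count; i++ )
            handle_cqe( &cq->cqes[ (head + i) & mask ] );

        // only let the kernel see the new head after the entries have been read
        __atomic_store_n( cq->head, head + count, __ATOMIC_SEQ_CST );

        if ( count < num ) break;                   // queue was not full: nothing left behind
        ring_enter_getevents( ring_fd );            // queue was full: fetch what did not fit
    }
}
}}}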
libcfa/src/concurrency/io/setup.cfa
r1df492a reb5962a 138 138 __u32 nentries = params_in.num_entries != 0 ? params_in.num_entries : 256; 139 139 if( !is_pow2(nentries) ) { 140 abort("ERROR: I/O setup 'num_entries' must be a power of 2 \n");140 abort("ERROR: I/O setup 'num_entries' must be a power of 2, was %u\n", nentries); 141 141 } 142 142 -
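setup.cfa keeps the power-of-two requirement on num_entries (io_uring ring sizes must be powers of two) but now prints the rejected value in the abort message. The check itself is the usual bit trick, shown here as a small self-contained C example:
{{{
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

// A value is a power of two iff it is non-zero and clearing its lowest
// set bit leaves nothing behind.
static bool is_pow2( unsigned n ) {
    return n != 0 && (n & (n - 1)) == 0;
}

int main(void) {
    unsigned nentries = 200;                        // illustrative value
    if ( !is_pow2(nentries) ) {
        fprintf( stderr, "ERROR: I/O setup 'num_entries' must be a power of 2, was %u\n", nentries );
        exit( EXIT_FAILURE );
    }
    return 0;
}
}}}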
libcfa/src/concurrency/iofwd.hfa
r1df492a reb5962a 76 76 void reset ( io_future_t & this ) { return reset (this.self); } 77 77 bool available( io_future_t & this ) { return available(this.self); } 78 bool setup ( io_future_t & this, oneshot & ctx ) { return setup (this.self, ctx); } 79 bool retract ( io_future_t & this, oneshot & ctx ) { return retract(this.self, ctx); } 78 80 } 79 81 -
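iofwd.hfa only adds two one-line forwarders: io_future_t wraps the generic future_t in its self member, so setup and retract are exposed the same way reset and available already were. The delegation pattern, sketched in C with illustrative names and assumed primitives:
{{{
#include <stdbool.h>

struct oneshot;                        // wait context (opaque here)
struct future { void * state; };       // generic future (sketch)

bool future_setup  ( struct future *, struct oneshot * );   // assumed primitives
bool future_retract( struct future *, struct oneshot * );

struct io_future { struct future self; int result; };

// Thin forwarders: the I/O future exposes the generic operations on its inner future.
static inline bool io_setup  ( struct io_future * f, struct oneshot * ctx ) { return future_setup  ( &f->self, ctx ); }
static inline bool io_retract( struct io_future * f, struct oneshot * ctx ) { return future_retract( &f->self, ctx ); }
}}}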
libcfa/src/concurrency/kernel.cfa
r1df492a reb5962a 834 834 #endif 835 835 836 837 838 //-----------------------------------------------------------------------------839 // Debug840 __cfaabi_dbg_debug_do(841 extern "C" {842 void __cfaabi_dbg_record_lock(__spinlock_t & this, const char prev_name[]) {843 this.prev_name = prev_name;844 this.prev_thrd = kernelTLS().this_thread;845 }846 }847 )848 849 836 //----------------------------------------------------------------------------- 850 837 // Debug -
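kernel.cfa drops __cfaabi_dbg_record_lock, the debug hook that stamped each spinlock with the function name and thread that last acquired it; the matching fields leave bits/locks.hfa above and the weak stub leaves startup.cfa below. For reference, a hedged C sketch of that kind of debug-only owner tracking (DEBUG is an illustrative macro, not the libcfa one):
{{{
#include <stdbool.h>

struct spinlock {
    volatile bool lock;
#ifdef DEBUG
    const char * prev_name;   // function that last acquired the lock
    void *       prev_thrd;   // thread that last acquired the lock
#endif
};

#ifdef DEBUG
// Called immediately after a successful acquire, while the lock is held.
static void dbg_record_lock( struct spinlock * this, const char * caller, void * thrd ) {
    this->prev_name = caller;
    this->prev_thrd = thrd;
}
#else
#define dbg_record_lock( l, caller, thrd ) ((void)0)
#endif
}}}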
libcfa/src/concurrency/kernel/fwd.hfa
r1df492a reb5962a 200 200 struct thread$ * expected = this.ptr; 201 201 if(expected == 1p) return false; 202 /* paranoid */ verify( expected == 0p );203 202 if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 204 203 park(); … … 213 212 thread$ * post(oneshot & this, bool do_unpark = true) { 214 213 struct thread$ * got = __atomic_exchange_n( &this.ptr, 1p, __ATOMIC_SEQ_CST); 215 if( got == 0p ) return 0p;214 if( got == 0p || got == 1p ) return 0p; 216 215 if(do_unpark) unpark( got ); 217 216 return got; … … 263 262 264 263 // The future is not fulfilled, try to setup the wait context 265 /* paranoid */ verify( expected == 0p );266 264 if(__atomic_compare_exchange_n(&this.ptr, &expected, &wait_ctx, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 267 265 return true; … … 275 273 // should retract the wait ctx 276 274 // intented to be use by wait, wait_any, waitfor, etc. rather than used directly 277 void retract( future_t & this, oneshot & wait_ctx ) { 278 // Remove the wait context 279 struct oneshot * got = __atomic_exchange_n( &this.ptr, 0p, __ATOMIC_SEQ_CST); 280 281 // got == 0p: future was never actually setup, just return 282 if( got == 0p ) return; 283 284 // got == wait_ctx: since fulfil does an atomic_swap, 285 // if we got back the original then no one else saw context 286 // It is safe to delete (which could happen after the return) 287 if( got == &wait_ctx ) return; 288 289 // got == 1p: the future is ready and the context was fully consumed 290 // the server won't use the pointer again 291 // It is safe to delete (which could happen after the return) 292 if( got == 1p ) return; 293 294 // got == 2p: the future is ready but the context hasn't fully been consumed 295 // spin until it is safe to move on 296 if( got == 2p ) { 297 while( this.ptr != 1p ) Pause(); 298 return; 299 } 300 301 // got == any thing else, something wen't wrong here, abort 302 abort("Future in unexpected state"); 275 bool retract( future_t & this, oneshot & wait_ctx ) { 276 for() { 277 struct oneshot * expected = this.ptr; 278 279 // expected == 0p: future was never actually setup, just return 280 if( expected == 0p ) return false; 281 282 // expected == 1p: the future is ready and the context was fully consumed 283 // the server won't use the pointer again 284 // It is safe to delete (which could happen after the return) 285 if( expected == 1p ) return true; 286 287 // expected == 2p: the future is ready but the context hasn't fully been consumed 288 // spin until it is safe to move on 289 if( expected == 2p ) { 290 while( this.ptr != 1p ) Pause(); 291 /* paranoid */ verify( this.ptr == 1p ); 292 return true; 293 } 294 295 // expected != wait_ctx: the future was setup with a different context ?!?! 296 // something went wrong here, abort 297 if( expected != &wait_ctx ) abort("Future in unexpected state"); 298 299 // we still have the original context, then no one else saw it 300 // attempt to remove the context so it doesn't get consumed. 
301 if(__atomic_compare_exchange_n( &this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 302 return false; 303 } 304 } 303 305 } 304 306 … … 379 381 return ret; 380 382 } 383 384 // Wait for any future to be fulfilled 385 forall(T& | sized(T) | { bool setup( T&, oneshot & ); bool retract( T&, oneshot & ); }) 386 T & wait_any( T * futures, size_t num_futures ) { 387 oneshot temp; 388 389 // setup all futures 390 // if any are already satisfied return 391 for ( i; num_futures ) { 392 if( !setup(futures[i], temp) ) return futures[i]; 393 } 394 395 // Wait context is setup, just wait on it 396 wait( temp ); 397 398 size_t ret; 399 // attempt to retract all futures 400 for ( i; num_futures ) { 401 if ( retract( futures[i], temp ) ) ret = i; 402 } 403 404 return futures[ret]; 405 } 381 406 } 382 407 -
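In kernel/fwd.hfa the future's pointer field doubles as a state word: 0p means never set up, 1p fulfilled with the wait context fully consumed, 2p fulfilled but the context still being read, and anything else is the registered wait context itself. retract() becomes a CAS loop over those states and now returns whether the future had already fired, which is what the new wait_any() needs to pick the fulfilled future. A hedged C sketch of that state machine, minus the park/unpark machinery:
{{{
#include <stdbool.h>
#include <stdlib.h>

#define UNSET     ((void *)0)   // future never set up
#define FULFILLED ((void *)1)   // fulfilled, wait context fully consumed
#define CONSUMING ((void *)2)   // fulfilled, wait context still being read

struct oneshot { int dummy; };                  // stand-in wait context
struct future  { void * volatile ptr; };        // sentinel or pointer to the wait context

// Try to detach wait_ctx from the future.  Returns true if the future already
// fired (the caller must treat it as completed), false if the context was
// removed before anyone saw it.
static bool future_retract( struct future * f, struct oneshot * wait_ctx ) {
    for (;;) {
        void * expected = f->ptr;
        if ( expected == UNSET )     return false;          // never set up
        if ( expected == FULFILLED ) return true;           // done, context already released
        if ( expected == CONSUMING ) {                       // server still reading the context:
            while ( f->ptr != FULFILLED ) /* spin */ ;       // wait until it is safe to free it
            return true;
        }
        if ( expected != (void *)wait_ctx ) abort();         // some other context registered?!
        // We still own the context: try to take it back before the server uses it.
        if ( __atomic_compare_exchange_n( &f->ptr, &expected, UNSET,
                                          false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
            return false;
        // CAS failed: the future changed state under us, re-examine it.
    }
}
}}}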
libcfa/src/concurrency/locks.cfa
r1df492a reb5962a 219 219 // this casts the alarm node to our wrapped type since we used type erasure 220 220 static void alarm_node_wrap_cast( alarm_node_t & a ) { timeout_handler( (alarm_node_wrap(L) &)a ); } 221 222 struct pthread_alarm_node_wrap { 223 alarm_node_t alarm_node; 224 pthread_cond_var(L) * cond; 225 info_thread(L) * info_thd; 226 }; 227 228 void ?{}( pthread_alarm_node_wrap(L) & this, Duration alarm, Duration period, Alarm_Callback callback, pthread_cond_var(L) * c, info_thread(L) * i ) { 229 this.alarm_node{ callback, alarm, period }; 230 this.cond = c; 231 this.info_thd = i; 232 } 233 234 void ^?{}( pthread_alarm_node_wrap(L) & this ) { } 235 236 static void timeout_handler ( pthread_alarm_node_wrap(L) & this ) with( this ) { 237 // This pthread_cond_var member is called from the kernel, and therefore, cannot block, but it can spin. 238 lock( cond->lock __cfaabi_dbg_ctx2 ); 239 240 // this check is necessary to avoid a race condition since this timeout handler 241 // may still be called after a thread has been removed from the queue but 242 // before the alarm is unregistered 243 if ( (*info_thd)`isListed ) { // is thread on queue 244 info_thd->signalled = false; 245 // remove this thread O(1) 246 remove( *info_thd ); 247 on_notify(*info_thd->lock, info_thd->t); 248 } 249 unlock( cond->lock ); 250 } 251 252 // this casts the alarm node to our wrapped type since we used type erasure 253 static void pthread_alarm_node_wrap_cast( alarm_node_t & a ) { timeout_handler( (pthread_alarm_node_wrap(L) &)a ); } 221 254 } 222 255 … … 388 421 on_wakeup(*i.lock, recursion_count); 389 422 } 390 } 391 423 424 //----------------------------------------------------------------------------- 425 // pthread_cond_var 426 427 void ?{}( pthread_cond_var(L) & this ) with(this) { 428 blocked_threads{}; 429 lock{}; 430 } 431 432 void ^?{}( pthread_cond_var(L) & this ) { } 433 434 bool notify_one( pthread_cond_var(L) & this ) with(this) { 435 lock( lock __cfaabi_dbg_ctx2 ); 436 bool ret = ! blocked_threads`isEmpty; 437 if ( ret ) { 438 info_thread(L) & popped = try_pop_front( blocked_threads ); 439 on_notify(*popped.lock, popped.t); 440 } 441 unlock( lock ); 442 return ret; 443 } 444 445 bool notify_all( pthread_cond_var(L) & this ) with(this) { 446 lock( lock __cfaabi_dbg_ctx2 ); 447 bool ret = ! blocked_threads`isEmpty; 448 while( ! blocked_threads`isEmpty ) { 449 info_thread(L) & popped = try_pop_front( blocked_threads ); 450 on_notify(*popped.lock, popped.t); 451 } 452 unlock( lock ); 453 return ret; 454 } 455 456 uintptr_t front( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty ? 
NULL : blocked_threads`first.info; } 457 bool empty ( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty; } 458 459 static size_t queue_and_get_recursion( pthread_cond_var(L) & this, info_thread(L) * i ) with(this) { 460 // add info_thread to waiting queue 461 insert_last( blocked_threads, *i ); 462 size_t recursion_count = 0; 463 recursion_count = on_wait( *i->lock ); 464 return recursion_count; 465 } 466 467 static void queue_info_thread_timeout( pthread_cond_var(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) { 468 lock( lock __cfaabi_dbg_ctx2 ); 469 size_t recursion_count = queue_and_get_recursion(this, &info); 470 pthread_alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info }; 471 register_self( &node_wrap.alarm_node ); 472 unlock( lock ); 473 474 // blocks here 475 park(); 476 477 // unregisters alarm so it doesn't go off if this happens first 478 unregister_self( &node_wrap.alarm_node ); 479 480 // resets recursion count here after waking 481 if (info.lock) on_wakeup(*info.lock, recursion_count); 482 } 483 484 void wait( pthread_cond_var(L) & this, L & l ) with(this) { 485 wait( this, l, 0 ); 486 } 487 488 void wait( pthread_cond_var(L) & this, L & l, uintptr_t info ) with(this) { 489 lock( lock __cfaabi_dbg_ctx2 ); 490 info_thread( L ) i = { active_thread(), info, &l }; 491 size_t recursion_count = queue_and_get_recursion(this, &i); 492 unlock( lock ); 493 park( ); 494 on_wakeup(*i.lock, recursion_count); 495 } 496 497 #define PTHREAD_WAIT_TIME( u, l, t ) \ 498 info_thread( L ) i = { active_thread(), u, l }; \ 499 queue_info_thread_timeout(this, i, t, pthread_alarm_node_wrap_cast ); \ 500 return i.signalled; 501 502 bool wait( pthread_cond_var(L) & this, L & l, timespec t ) { 503 Duration d = { t }; 504 WAIT_TIME( 0, &l , d ) 505 } 506 507 bool wait( pthread_cond_var(L) & this, L & l, uintptr_t info, timespec t ) { 508 Duration d = { t }; 509 WAIT_TIME( info, &l , d ) 510 } 511 } 392 512 //----------------------------------------------------------------------------- 393 513 // Semaphore -
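locks.cfa adds the pthread_cond_var(L) implementation for pthread emulation. The timed wait queues the thread, registers an alarm, and parks; the timeout handler re-checks that the waiter is still on the queue before removing it, so a racing notify wins and wait() reports "signalled" rather than "timed out". For comparison, the contract being emulated with the real pthread API (standard calls only; the bool mirrors the new wait(..., timespec) return value):
{{{
#include <errno.h>
#include <pthread.h>
#include <stdbool.h>
#include <time.h>

static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  c = PTHREAD_COND_INITIALIZER;
static bool ready = false;

// Wait up to 'secs' seconds; returns true if signalled, false on timeout.
static bool timed_wait( long secs ) {
    struct timespec deadline;
    clock_gettime( CLOCK_REALTIME, &deadline );
    deadline.tv_sec += secs;

    pthread_mutex_lock( &m );
    int rc = 0;
    while ( !ready && rc != ETIMEDOUT )
        rc = pthread_cond_timedwait( &c, &m, &deadline );
    bool signalled = ready;                         // a racing signal beats the timeout
    pthread_mutex_unlock( &m );
    return signalled;
}
}}}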
libcfa/src/concurrency/locks.hfa
r1df492a reb5962a 101 101 102 102 //----------------------------------------------------------------------------- 103 // MCS Spin Lock 104 // - No recursive acquisition 105 // - Needs to be released by owner 106 107 struct mcs_spin_node { 108 mcs_spin_node * volatile next; 109 volatile bool locked; 110 }; 111 112 struct mcs_spin_queue { 113 mcs_spin_node * volatile tail; 114 }; 115 116 static inline void ?{}(mcs_spin_node & this) { this.next = 0p; this.locked = true; } 117 118 static inline mcs_spin_node * volatile & ?`next ( mcs_spin_node * node ) { 119 return node->next; 120 } 121 122 struct mcs_spin_lock { 123 mcs_spin_queue queue; 124 }; 125 126 static inline void lock(mcs_spin_lock & l, mcs_spin_node & n) { 127 mcs_spin_node * prev = __atomic_exchange_n(&l.queue.tail, &n, __ATOMIC_SEQ_CST); 128 n.locked = true; 129 if(prev == 0p) return; 130 prev->next = &n; 131 while(__atomic_load_n(&n.locked, __ATOMIC_RELAXED)) Pause(); 132 } 133 134 static inline void unlock(mcs_spin_lock & l, mcs_spin_node & n) { 135 mcs_spin_node * n_ptr = &n; 136 if (__atomic_compare_exchange_n(&l.queue.tail, &n_ptr, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) return; 137 while (__atomic_load_n(&n.next, __ATOMIC_RELAXED) == 0p) {} 138 n.next->locked = false; 139 } 140 141 //----------------------------------------------------------------------------- 142 // CLH Spinlock 143 // - No recursive acquisition 144 // - Needs to be released by owner 145 146 struct clh_lock { 147 volatile bool * volatile tail; 148 }; 149 150 static inline void ?{}( clh_lock & this ) { this.tail = malloc(); *this.tail = true; } 151 static inline void ^?{}( clh_lock & this ) { free(this.tail); } 152 153 static inline void lock(clh_lock & l) { 154 thread$ * curr_thd = active_thread(); 155 *(curr_thd->clh_node) = false; 156 volatile bool * prev = __atomic_exchange_n((bool **)(&l.tail), (bool *)(curr_thd->clh_node), __ATOMIC_SEQ_CST); 157 while(!__atomic_load_n(prev, __ATOMIC_ACQUIRE)) Pause(); 158 curr_thd->clh_prev = prev; 159 } 160 161 static inline void unlock(clh_lock & l) { 162 thread$ * curr_thd = active_thread(); 163 __atomic_store_n(curr_thd->clh_node, true, __ATOMIC_RELEASE); 164 curr_thd->clh_node = curr_thd->clh_prev; 165 } 166 167 //----------------------------------------------------------------------------- 103 168 // Linear backoff Spinlock 104 169 struct linear_backoff_then_block_lock { … … 205 270 // Fast Block Lock 206 271 207 // High efficiencyminimal blocking lock272 // minimal blocking lock 208 273 // - No reacquire for cond var 209 274 // - No recursive acquisition 210 275 // - No ownership 211 276 struct fast_block_lock { 277 // List of blocked threads 278 dlist( thread$ ) blocked_threads; 279 212 280 // Spin lock used for mutual exclusion 213 281 __spinlock_t lock; 214 282 215 // List of blocked threads 216 dlist( thread$ ) blocked_threads; 217 283 // flag showing if lock is held 218 284 bool held:1; 285 286 #ifdef __CFA_DEBUG__ 287 // for deadlock detection 288 struct thread$ * owner; 289 #endif 219 290 }; 220 291 … … 231 302 static inline void lock(fast_block_lock & this) with(this) { 232 303 lock( lock __cfaabi_dbg_ctx2 ); 304 305 #ifdef __CFA_DEBUG__ 306 assert(!(held && owner == active_thread())); 307 #endif 233 308 if (held) { 234 309 insert_last( blocked_threads, *active_thread() ); … … 238 313 } 239 314 held = true; 315 #ifdef __CFA_DEBUG__ 316 owner = active_thread(); 317 #endif 240 318 unlock( lock ); 241 319 } … … 246 324 thread$ * t = &try_pop_front( blocked_threads ); 247 325 held = ( t ? 
true : false ); 326 #ifdef __CFA_DEBUG__ 327 owner = ( t ? t : 0p ); 328 #endif 248 329 unpark( t ); 249 330 unlock( lock ); … … 253 334 static inline size_t on_wait(fast_block_lock & this) { unlock(this); return 0; } 254 335 static inline void on_wakeup(fast_block_lock & this, size_t recursion ) { } 336 337 //----------------------------------------------------------------------------- 338 // simple_owner_lock 339 340 // pthread owner lock 341 // - reacquire for cond var 342 // - recursive acquisition 343 // - ownership 344 struct simple_owner_lock { 345 // List of blocked threads 346 dlist( thread$ ) blocked_threads; 347 348 // Spin lock used for mutual exclusion 349 __spinlock_t lock; 350 351 // owner showing if lock is held 352 struct thread$ * owner; 353 354 size_t recursion_count; 355 }; 356 357 static inline void ?{}( simple_owner_lock & this ) with(this) { 358 lock{}; 359 blocked_threads{}; 360 owner = 0p; 361 recursion_count = 0; 362 } 363 static inline void ^?{}( simple_owner_lock & this ) {} 364 static inline void ?{}( simple_owner_lock & this, simple_owner_lock this2 ) = void; 365 static inline void ?=?( simple_owner_lock & this, simple_owner_lock this2 ) = void; 366 367 static inline void lock(simple_owner_lock & this) with(this) { 368 if (owner == active_thread()) { 369 recursion_count++; 370 return; 371 } 372 lock( lock __cfaabi_dbg_ctx2 ); 373 374 if (owner != 0p) { 375 insert_last( blocked_threads, *active_thread() ); 376 unlock( lock ); 377 park( ); 378 return; 379 } 380 owner = active_thread(); 381 recursion_count = 1; 382 unlock( lock ); 383 } 384 385 // TODO: fix duplicate def issue and bring this back 386 // void pop_and_set_new_owner( simple_owner_lock & this ) with( this ) { 387 // thread$ * t = &try_pop_front( blocked_threads ); 388 // owner = t; 389 // recursion_count = ( t ? 1 : 0 ); 390 // unpark( t ); 391 // } 392 393 static inline void unlock(simple_owner_lock & this) with(this) { 394 lock( lock __cfaabi_dbg_ctx2 ); 395 /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this ); 396 /* paranoid */ verifyf( owner == active_thread(), "Thread %p other than the owner %p attempted to release owner lock %p", owner, active_thread(), &this ); 397 // if recursion count is zero release lock and set new owner if one is waiting 398 recursion_count--; 399 if ( recursion_count == 0 ) { 400 // pop_and_set_new_owner( this ); 401 thread$ * t = &try_pop_front( blocked_threads ); 402 owner = t; 403 recursion_count = ( t ? 1 : 0 ); 404 unpark( t ); 405 } 406 unlock( lock ); 407 } 408 409 static inline void on_notify(simple_owner_lock & this, struct thread$ * t ) with(this) { 410 lock( lock __cfaabi_dbg_ctx2 ); 411 // lock held 412 if ( owner != 0p ) { 413 insert_last( blocked_threads, *t ); 414 unlock( lock ); 415 } 416 // lock not held 417 else { 418 owner = t; 419 recursion_count = 1; 420 unpark( t ); 421 unlock( lock ); 422 } 423 } 424 425 static inline size_t on_wait(simple_owner_lock & this) with(this) { 426 lock( lock __cfaabi_dbg_ctx2 ); 427 /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this ); 428 /* paranoid */ verifyf( owner == active_thread(), "Thread %p other than the owner %p attempted to release owner lock %p", owner, active_thread(), &this ); 429 430 size_t ret = recursion_count; 431 432 // pop_and_set_new_owner( this ); 433 434 thread$ * t = &try_pop_front( blocked_threads ); 435 owner = t; 436 recursion_count = ( t ? 
1 : 0 ); 437 unpark( t ); 438 439 unlock( lock ); 440 return ret; 441 } 442 443 static inline void on_wakeup(simple_owner_lock & this, size_t recursion ) with(this) { recursion_count = recursion; } 444 445 //----------------------------------------------------------------------------- 446 // Spin Queue Lock 447 448 // - No reacquire for cond var 449 // - No recursive acquisition 450 // - No ownership 451 // - spin lock with no locking/atomics in unlock 452 struct spin_queue_lock { 453 // Spin lock used for mutual exclusion 454 mcs_spin_lock lock; 455 456 // flag showing if lock is held 457 volatile bool held; 458 459 #ifdef __CFA_DEBUG__ 460 // for deadlock detection 461 struct thread$ * owner; 462 #endif 463 }; 464 465 static inline void ?{}( spin_queue_lock & this ) with(this) { 466 lock{}; 467 held = false; 468 } 469 static inline void ^?{}( spin_queue_lock & this ) {} 470 static inline void ?{}( spin_queue_lock & this, spin_queue_lock this2 ) = void; 471 static inline void ?=?( spin_queue_lock & this, spin_queue_lock this2 ) = void; 472 473 // if this is called recursively IT WILL DEADLOCK!!!!! 474 static inline void lock(spin_queue_lock & this) with(this) { 475 mcs_spin_node node; 476 #ifdef __CFA_DEBUG__ 477 assert(!(held && owner == active_thread())); 478 #endif 479 lock( lock, node ); 480 while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause(); 481 __atomic_store_n(&held, true, __ATOMIC_SEQ_CST); 482 unlock( lock, node ); 483 #ifdef __CFA_DEBUG__ 484 owner = active_thread(); 485 #endif 486 } 487 488 static inline void unlock(spin_queue_lock & this) with(this) { 489 #ifdef __CFA_DEBUG__ 490 owner = 0p; 491 #endif 492 __atomic_store_n(&held, false, __ATOMIC_RELEASE); 493 } 494 495 static inline void on_notify(spin_queue_lock & this, struct thread$ * t ) { unpark(t); } 496 static inline size_t on_wait(spin_queue_lock & this) { unlock(this); return 0; } 497 static inline void on_wakeup(spin_queue_lock & this, size_t recursion ) { } 498 499 500 //----------------------------------------------------------------------------- 501 // MCS Block Spin Lock 502 503 // - No reacquire for cond var 504 // - No recursive acquisition 505 // - No ownership 506 // - Blocks but first node spins (like spin queue but blocking for not first thd) 507 struct mcs_block_spin_lock { 508 // Spin lock used for mutual exclusion 509 mcs_lock lock; 510 511 // flag showing if lock is held 512 volatile bool held; 513 514 #ifdef __CFA_DEBUG__ 515 // for deadlock detection 516 struct thread$ * owner; 517 #endif 518 }; 519 520 static inline void ?{}( mcs_block_spin_lock & this ) with(this) { 521 lock{}; 522 held = false; 523 } 524 static inline void ^?{}( mcs_block_spin_lock & this ) {} 525 static inline void ?{}( mcs_block_spin_lock & this, mcs_block_spin_lock this2 ) = void; 526 static inline void ?=?( mcs_block_spin_lock & this, mcs_block_spin_lock this2 ) = void; 527 528 // if this is called recursively IT WILL DEADLOCK!!!!! 
529 static inline void lock(mcs_block_spin_lock & this) with(this) { 530 mcs_node node; 531 #ifdef __CFA_DEBUG__ 532 assert(!(held && owner == active_thread())); 533 #endif 534 lock( lock, node ); 535 while(held) Pause(); 536 held = true; 537 unlock( lock, node ); 538 #ifdef __CFA_DEBUG__ 539 owner = active_thread(); 540 #endif 541 } 542 543 static inline void unlock(mcs_block_spin_lock & this) with(this) { 544 #ifdef __CFA_DEBUG__ 545 owner = 0p; 546 #endif 547 held = false; 548 } 549 550 static inline void on_notify(mcs_block_spin_lock & this, struct thread$ * t ) { unpark(t); } 551 static inline size_t on_wait(mcs_block_spin_lock & this) { unlock(this); return 0; } 552 static inline void on_wakeup(mcs_block_spin_lock & this, size_t recursion ) { } 553 554 //----------------------------------------------------------------------------- 555 // Block Spin Lock 556 557 // - No reacquire for cond var 558 // - No recursive acquisition 559 // - No ownership 560 // - Blocks but first node spins (like spin queue but blocking for not first thd) 561 struct block_spin_lock { 562 // Spin lock used for mutual exclusion 563 fast_block_lock lock; 564 565 // flag showing if lock is held 566 volatile bool held; 567 568 #ifdef __CFA_DEBUG__ 569 // for deadlock detection 570 struct thread$ * owner; 571 #endif 572 }; 573 574 static inline void ?{}( block_spin_lock & this ) with(this) { 575 lock{}; 576 held = false; 577 } 578 static inline void ^?{}( block_spin_lock & this ) {} 579 static inline void ?{}( block_spin_lock & this, block_spin_lock this2 ) = void; 580 static inline void ?=?( block_spin_lock & this, block_spin_lock this2 ) = void; 581 582 // if this is called recursively IT WILL DEADLOCK!!!!! 583 static inline void lock(block_spin_lock & this) with(this) { 584 #ifdef __CFA_DEBUG__ 585 assert(!(held && owner == active_thread())); 586 #endif 587 lock( lock ); 588 while(held) Pause(); 589 held = true; 590 unlock( lock ); 591 #ifdef __CFA_DEBUG__ 592 owner = active_thread(); 593 #endif 594 } 595 596 static inline void unlock(block_spin_lock & this) with(this) { 597 #ifdef __CFA_DEBUG__ 598 owner = 0p; 599 #endif 600 held = false; 601 } 602 603 static inline void on_notify(block_spin_lock & this, struct thread$ * t ) { unpark(t); } 604 static inline size_t on_wait(block_spin_lock & this) { unlock(this); return 0; } 605 static inline void on_wakeup(block_spin_lock & this, size_t recursion ) { } 255 606 256 607 //----------------------------------------------------------------------------- … … 332 683 // - signalling without holding branded lock is UNSAFE! 
333 684 // - only allows usage of one lock, cond var is branded after usage 685 334 686 struct fast_cond_var { 335 687 // List of blocked threads 336 688 dlist( info_thread(L) ) blocked_threads; 337 338 689 #ifdef __CFA_DEBUG__ 339 690 L * lock_used; … … 341 692 }; 342 693 343 344 694 void ?{}( fast_cond_var(L) & this ); 345 695 void ^?{}( fast_cond_var(L) & this ); … … 349 699 350 700 uintptr_t front( fast_cond_var(L) & this ); 351 352 701 bool empty ( fast_cond_var(L) & this ); 353 702 354 703 void wait( fast_cond_var(L) & this, L & l ); 355 704 void wait( fast_cond_var(L) & this, L & l, uintptr_t info ); 356 } 705 706 707 //----------------------------------------------------------------------------- 708 // pthread_cond_var 709 // 710 // - cond var with minimal footprint 711 // - supports operations needed for phthread cond 712 713 struct pthread_cond_var { 714 dlist( info_thread(L) ) blocked_threads; 715 __spinlock_t lock; 716 }; 717 718 void ?{}( pthread_cond_var(L) & this ); 719 void ^?{}( pthread_cond_var(L) & this ); 720 721 bool notify_one( pthread_cond_var(L) & this ); 722 bool notify_all( pthread_cond_var(L) & this ); 723 724 uintptr_t front( pthread_cond_var(L) & this ); 725 bool empty ( pthread_cond_var(L) & this ); 726 727 void wait( pthread_cond_var(L) & this, L & l ); 728 void wait( pthread_cond_var(L) & this, L & l, uintptr_t info ); 729 bool wait( pthread_cond_var(L) & this, L & l, timespec t ); 730 bool wait( pthread_cond_var(L) & this, L & l, uintptr_t info, timespec t ); 731 } -
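The bulk of locks.hfa is new lock types: an MCS spin lock, a CLH spin lock, a recursive simple_owner_lock, the spin/block hybrids built on them, and the pthread_cond_var declarations. The CLH lock is the one that motivated the new thread$ fields: each waiter spins only on its predecessor's flag and recycles that flag as its own next node on unlock. A hedged, self-contained C version using a thread-local node in place of thread$.clh_node/clh_prev; freeing the sentinel and per-thread nodes is left to destructors, as in the real implementation.
{{{
#include <stdbool.h>
#include <stdlib.h>

// CLH queue lock: no recursive acquisition, must be released by the owner.
struct clh_lock { volatile bool * tail; };

// Per-thread node, recycled across acquisitions.
static _Thread_local volatile bool * my_node;
static _Thread_local volatile bool * my_prev;

static void clh_init( struct clh_lock * l ) {
    bool * sentinel = malloc( sizeof(bool) );
    *sentinel = true;                               // "released" marker for the first acquirer
    l->tail = sentinel;
}

static void clh_acquire( struct clh_lock * l ) {
    if ( !my_node ) my_node = malloc( sizeof(bool) );
    *my_node = false;                               // our successor must wait on this
    volatile bool * prev =
        __atomic_exchange_n( &l->tail, my_node, __ATOMIC_SEQ_CST );
    while ( !__atomic_load_n( prev, __ATOMIC_ACQUIRE ) ) ;   // spin on the predecessor only
    my_prev = prev;                                 // remember it: it becomes our next node
}

static void clh_release( struct clh_lock * l ) {
    (void)l;
    __atomic_store_n( my_node, true, __ATOMIC_RELEASE );     // release our successor
    my_node = my_prev;                              // recycle the predecessor's unused node
}
}}}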
libcfa/src/concurrency/thread.cfa
r1df492a reb5962a 53 53 #endif 54 54 55 seqable.next = 0p;56 seqable.back = 0p;57 58 55 node.next = 0p; 59 56 node.prev = 0p; 57 58 clh_node = malloc( ); 59 *clh_node = false; 60 60 61 doregister(curr_cluster, this); 61 62 62 monitors{ &self_mon_p, 1, (fptr_t)0 }; 63 63 } … … 67 67 canary = 0xDEADDEADDEADDEADp; 68 68 #endif 69 free(clh_node); 69 70 unregister(curr_cluster, this); 70 71 ^self_cor{}; -
libcfa/src/containers/queueLockFree.hfa
r1df492a reb5962a 2 2 3 3 #include <assert.h> 4 5 #include <bits/defs.hfa> 4 6 5 7 forall( T &) { -
libcfa/src/startup.cfa
r1df492a reb5962a 63 63 64 64 struct __spinlock_t; 65 extern "C" {66 void __cfaabi_dbg_record_lock(struct __spinlock_t & this, const char prev_name[]) __attribute__(( weak )) libcfa_public {}67 }68 65 69 66 // Local Variables: //
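startup.cfa removes the weak, empty __cfaabi_dbg_record_lock definition now that nothing references the hook. Weak stubs like this let a library ship a no-op default that a strong definition silently overrides when the full machinery is linked in; a small C illustration of that linkage trick, with illustrative names:
{{{
#include <stdio.h>

// Weak definition: used only if no strong debug_hook is linked in.
__attribute__((weak)) void debug_hook( const char * what ) {
    (void)what;                         // default build: do nothing
}

void do_work( void ) {
    debug_hook( "do_work" );            // always safe to call
    puts( "working" );
}
// Linking another object file that defines a strong debug_hook replaces the stub.
}}}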