Changeset d95969a for libcfa/src/concurrency
- Timestamp: Jan 25, 2021, 3:45:42 PM (5 years ago)
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: c292244
- Parents: b6a8b31 (diff), 7158202 (diff)

Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Use the (diff) links above to see all the changes relative to each parent.

- Location: libcfa/src/concurrency
- Files: 20 edited
- coroutine.cfa (modified) (4 diffs)
- coroutine.hfa (modified) (5 diffs)
- future.hfa (modified) (2 diffs)
- io.cfa (modified) (8 diffs)
- io/setup.cfa (modified) (8 diffs)
- io/types.hfa (modified) (3 diffs)
- kernel.cfa (modified) (6 diffs)
- kernel.hfa (modified) (3 diffs)
- kernel/fwd.hfa (modified) (2 diffs)
- kernel/startup.cfa (modified) (3 diffs)
- locks.cfa (modified) (6 diffs)
- locks.hfa (modified) (4 diffs)
- monitor.cfa (modified) (2 diffs)
- monitor.hfa (modified) (2 diffs)
- mutex.cfa (modified) (3 diffs)
- mutex.hfa (modified) (3 diffs)
- preemption.cfa (modified) (1 diff)
- stats.hfa (modified) (2 diffs)
- thread.cfa (modified) (8 diffs)
- thread.hfa (modified) (8 diffs)
Legend:
- Unmodified: context lines, shown without a prefix
- Added: lines prefixed with "+" (larger insertions are introduced with "Added:")
- Removed: lines prefixed with "-" (larger deletions are introduced with "Removed:")
- "..." marks unchanged code elided between hunks
libcfa/src/concurrency/coroutine.cfa
  //-----------------------------------------------------------------------------
- FORALL_DATA_INSTANCE(CoroutineCancelled, ( dtype coroutine_t ), (coroutine_t))
+ FORALL_DATA_INSTANCE(CoroutineCancelled, (coroutine_t &), (coroutine_t))

- forall( dtype T )
+ forall(T &)
  void mark_exception(CoroutineCancelled(T) *) {}

- forall( dtype T )
+ forall(T &)
  void copy(CoroutineCancelled(T) * dst, CoroutineCancelled(T) * src) {
      dst->virtual_table = src->virtual_table;
      ...
  }

- forall( dtype T )
+ forall(T &)
  const char * msg(CoroutineCancelled(T) *) {
      return "CoroutineCancelled(...)";
  }
  ...
  // This code should not be inlined. It is the error path on resume.
- forall( dtype T | is_coroutine(T))
+ forall(T & | is_coroutine(T))
  void __cfaehm_cancelled_coroutine( T & cor, $coroutine * desc ) {
      verify( desc->cancellation );
      ...
  }
  ...
  // Part of the Public API
  // Not inline since only ever called once per coroutine
- forall( dtype T | is_coroutine(T))
+ forall(T & | is_coroutine(T))
  void prime(T & cor) {
      $coroutine * this = get_coroutine(cor);
      ...
libcfa/src/concurrency/coroutine.hfa
  //-----------------------------------------------------------------------------
  // Exception thrown from resume when a coroutine stack is cancelled.
- FORALL_DATA_EXCEPTION(CoroutineCancelled, ( dtype coroutine_t ), (coroutine_t)) (
+ FORALL_DATA_EXCEPTION(CoroutineCancelled, (coroutine_t &), (coroutine_t)) (
      coroutine_t * the_coroutine;
      exception_t * the_exception;
  );

- forall( dtype T )
+ forall(T &)
  void copy(CoroutineCancelled(T) * dst, CoroutineCancelled(T) * src);

- forall( dtype T )
+ forall(T &)
  const char * msg(CoroutineCancelled(T) *);
  ...
  // Anything that implements this trait can be resumed.
  // Anything that is resumed is a coroutine.
- trait is_coroutine( dtype T | IS_RESUMPTION_EXCEPTION(CoroutineCancelled, (T))) {
+ trait is_coroutine(T & | IS_RESUMPTION_EXCEPTION(CoroutineCancelled, (T))) {
      void main(T & this);
      $coroutine * get_coroutine(T & this);
  };
  ...
  //-----------------------------------------------------------------------------
  // Public coroutine API
- forall( dtype T | is_coroutine(T))
+ forall(T & | is_coroutine(T))
  void prime(T & cor);
  ...
  void __cfactx_invoke_coroutine(void (*main)(void *), void * this);

- forall( dtype T )
+ forall(T &)
  void __cfactx_start(void (*main)(T &), struct $coroutine * cor, T & this, void (*invoke)(void (*main)(void *), void *));
  ...
- forall( dtype T | is_coroutine(T))
+ forall(T & | is_coroutine(T))
  void __cfaehm_cancelled_coroutine( T & cor, $coroutine * desc );

  // Resume implementation inlined for performance
- forall( dtype T | is_coroutine(T))
+ forall(T & | is_coroutine(T))
  static inline T & resume(T & cor) {
      // optimization : read TLS once and reuse it
      ...
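The updated trait reads naturally with the new reference syntax. As a rough illustration only (this example is not part of the changeset, and the type name is made up), a type declared with the coroutine keyword satisfies is_coroutine automatically:

  // Illustrative sketch, not from this changeset: a coroutine whose main()
  // counts up, yielding back to its resumer after every increment.
  coroutine Counter { int value; };

  void main( Counter & this ) {       // required by is_coroutine(Counter)
      for () {
          this.value += 1;
          suspend;                    // return control to the last resumer
      }
  }

  // resume() returns its argument (see the inlined resume above), so calls chain:
  //   Counter c;
  //   resume( resume( c ) );         // c.value == 2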
libcfa/src/concurrency/future.hfa
  #include "monitor.hfa"

- forall( otype T ) {
+ forall( T ) {
      struct future {
          inline future_t;
          ...
  ...
- forall( otype T ) {
+ forall( T ) {
      monitor multi_future {
          inline future_t;
          ...
libcfa/src/concurrency/io.cfa
  #include "io/types.hfa"

- static const char * opcodes[] = {
+ __attribute__((unused)) static const char * opcodes[] = {
      "OP_NOP",
      "OP_READV",
      ...
      __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ring.fd, to_submit, flags);
      ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8);
+     __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING %d returned %d\n", ring.fd, ret);
+
      if( ret < 0 ) {
          switch((int)errno) {
          case EAGAIN:
          case EINTR:
+         case EBUSY:
              ret = -1;
              break;
  ...
      __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.ring->fd, &this);
+
+     __ioctx_unregister( this );
  }
  ...
      block++;
-
-     abort( "Kernel I/O : all submit queue entries used, yielding\n" );

      yield();
  ...
          sqe->flags,
          sqe->ioprio,
-         sqe->off,
-         sqe->addr,
+         (void*)sqe->off,
+         (void*)sqe->addr,
          sqe->len,
          sqe->accept_flags,
  ...
      else if( ring.eager_submits ) {
-         __u32 picked = __submit_to_ready_array( ring, idx, mask );
+         __attribute__((unused)) __u32 picked = __submit_to_ready_array( ring, idx, mask );

          #if defined(LEADER_LOCK)
  ...
          sqe->flags,
          sqe->ioprio,
-         sqe->off,
-         sqe->addr,
+         (void*)sqe->off,
+         (void*)sqe->addr,
          sqe->len,
          sqe->accept_flags,
  ...
      __atomic_thread_fence( __ATOMIC_SEQ_CST );
      // Release the consumed SQEs
+
      __release_consumed_submission( ring );
      // ring.submit_q.sqes[idx].user_data = 3ul64;
libcfa/src/concurrency/io/setup.cfa
  void ^?{}(io_context & this, bool cluster_context) {}

+ void register_fixed_files( io_context &, int *, unsigned ) {}
+ void register_fixed_files( cluster &, int *, unsigned ) {}
+
  #else
      #include <errno.h>
  ...
  static struct {
      pthread_t thrd;           // pthread handle to io poller thread
      void * stack;             // pthread stack for io poller thread
      int epollfd;              // file descriptor to the epoll instance
      volatile bool run;        // Whether or not to continue
+     volatile bool stopped;    // Whether the poller has finished running
+     volatile uint64_t epoch;  // Epoch used for memory reclamation
  } iopoll;
  ...
      __cfadbg_print_safe(io_core, "Kernel : Starting io poller thread\n" );

-     iopoll.run = true;
-     iopoll.stack = __create_pthread( &iopoll.thrd, iopoll_loop, 0p );
+     iopoll.stack = __create_pthread( &iopoll.thrd, iopoll_loop, 0p );
+     iopoll.run = true;
+     iopoll.stopped = false;
+     iopoll.epoch = 0;
  ...
      while( iopoll.run ) {
          __cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n");
+
+         // increment the epoch to notify any deleters we are starting a new cycle
+         __atomic_fetch_add(&iopoll.epoch, 1, __ATOMIC_SEQ_CST);

          // Wait for events
  ...
      }
+
+     __atomic_store_n(&iopoll.stopped, true, __ATOMIC_SEQ_CST);

      __cfadbg_print_safe(io_core, "Kernel : IO poller thread stopping\n" );
  ...
  //=============================================================================================
  // I/O Context Sleep
  //=============================================================================================
- #define IOEVENTS EPOLLIN | EPOLLONESHOT
-
  static inline void __ioctx_epoll_ctl($io_ctx_thread & ctx, int op, const char * error) {
      struct epoll_event ev;
-     ev.events = IOEVENTS;
+     ev.events = EPOLLIN | EPOLLONESHOT;
      ev.data.u64 = (__u64)&ctx;
      int ret = epoll_ctl(iopoll.epollfd, op, ctx.ring->efd, &ev);
  ...
+ void __ioctx_unregister($io_ctx_thread & ctx) {
+     // Read the current epoch so we know when to stop
+     size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST);
+
+     // Remove the fd from the iopoller
+     __ioctx_epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE");
+
+     // Notify the io poller thread of the shutdown
+     iopoll.run = false;
+     sigval val = { 1 };
+     pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
+
+     // Make sure all this is done
+     __atomic_thread_fence(__ATOMIC_SEQ_CST);
+
+     // Wait for the next epoch
+     while(curr == iopoll.epoch && !iopoll.stopped) Pause();
+ }
+
  //=============================================================================================
  // I/O Context Misc Setup
  //=============================================================================================
      int ret = syscall( __NR_io_uring_register, ctx.thrd.ring->fd, IORING_REGISTER_FILES, files, count );
      if( ret < 0 ) {
-         abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
+         abort( "KERNEL ERROR: IO_URING REGISTER - (%d) %s\n", (int)errno, strerror(errno) );
      }
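The new unregister path is easiest to read as a handshake. The outline below is a paraphrase of the code above, not additional API:

  // Shutdown handshake between __ioctx_unregister and iopoll_loop (sketch):
  //   1. snapshot iopoll.epoch                     (deleter)
  //   2. EPOLL_CTL_DEL the ring's eventfd          (deleter)
  //   3. iopoll.run = false; signal the poller     (deleter)
  //   4. on each pass, iopoll.epoch += 1           (poller)
  //   5. spin while the epoch is unchanged and the poller has not stopped (deleter)
  // After step 5 the poller has either begun a fresh epoll_wait cycle or exited,
  // so it can no longer hold a stale reference to the unregistered context.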
libcfa/src/concurrency/io/types.hfa
  // file "LICENCE" distributed with Cforall.
  //
- // io/types.hfa --
+ // io/types.hfa -- PRIVATE
+ // Types used by the I/O subsystem
  //
  // Author : Thierry Delisle
  ...
  #include "bits/locks.hfa"
+ #include "kernel/fwd.hfa"

  #if defined(CFA_HAVE_LINUX_IO_URING_H)
  ...
  struct $io_ctx_thread;
  void __ioctx_register($io_ctx_thread & ctx);
+ void __ioctx_unregister($io_ctx_thread & ctx);
  void __ioctx_prepare_block($io_ctx_thread & ctx);
  void __sqe_clean( volatile struct io_uring_sqe * sqe );
libcfa/src/concurrency/kernel.cfa
      preemption_scope scope = { this };

+     #if !defined(__CFA_NO_STATISTICS__)
+         unsigned long long last_tally = rdtscl();
+     #endif
+
      __cfadbg_print_safe(runtime_core, "Kernel : core %p started\n", this);
  ...
          // Are we done?
          if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP;
+
+         #if !defined(__CFA_NO_STATISTICS__)
+             unsigned long long curr = rdtscl();
+             if(curr > (last_tally + 500000000)) {
+                 __tally_stats(this->cltr->stats, __cfaabi_tls.this_stats);
+                 last_tally = curr;
+             }
+         #endif
      }
  ...
-     V( this->terminated );
+     post( this->terminated );

      if(this == mainProcessor) {
  ...
  //=============================================================================================
  // Unexpected Terminating logic
  //=============================================================================================
Removed:
  static __spinlock_t kernel_abort_lock;
  static bool kernel_abort_called = false;

  void * kernel_abort(void) __attribute__ ((__nothrow__)) {
      // abort cannot be recursively entered by the same or different processors because all signal handlers return when
      // the globalAbort flag is true.
      lock( kernel_abort_lock __cfaabi_dbg_ctx2 );

      // disable interrupts, it no longer makes sense to try to interrupt this processor
      disable_interrupts();

      // first task to abort ?
      if ( kernel_abort_called ) {        // not first task to abort ?
          unlock( kernel_abort_lock );

          sigset_t mask;
          sigemptyset( &mask );
          sigaddset( &mask, SIGALRM );    // block SIGALRM signals
          sigaddset( &mask, SIGUSR1 );    // block SIGUSR1 signals
          sigsuspend( &mask );            // block the processor to prevent further damage during abort
          _exit( EXIT_FAILURE );          // if processor unblocks before it is killed, terminate it
      }
      else {
          kernel_abort_called = true;
          unlock( kernel_abort_lock );
      }

      return __cfaabi_tls.this_thread;
  }

- void kernel_abort_msg( void * kernel_data, char * abort_text, int abort_text_size ) {
-     $thread * thrd = ( $thread * ) kernel_data;
+ void __kernel_abort_msg( char * abort_text, int abort_text_size ) {
+     $thread * thrd = __cfaabi_tls.this_thread;

      if(thrd) {
  ...
- int kernel_abort_lastframe( void ) __attribute__ ((__nothrow__)) {
-     return get_coroutine(kernelTLS().this_thread) == get_coroutine(mainThread) ? 4 : 2;
+ int __kernel_abort_lastframe( void ) __attribute__ ((__nothrow__)) {
+     return get_coroutine(__cfaabi_tls.this_thread) == get_coroutine(mainThread) ? 4 : 2;
  }
  ...
  //=============================================================================================
  // Kernel Utilities
  //=============================================================================================
Removed (the semaphore implementation; it reappears, verbatim, in locks.cfa below):
  //-----------------------------------------------------------------------------
  // Locks
  void ?{}( semaphore & this, int count = 1 ) {
      (this.lock){};
      this.count = count;
      (this.waiting){};
  }
  void ^?{}(semaphore & this) {}

  bool P(semaphore & this) with( this ) { ... }

  bool V(semaphore & this) with( this ) { ... }

  bool V(semaphore & this, unsigned diff) with( this ) { ... }

  //-----------------------------------------------------------------------------
  // Debug
libcfa/src/concurrency/kernel.hfa
  // file "LICENCE" distributed with Cforall.
  //
- // kernel --
+ // kernel -- Header containing the core of the kernel API
  //
  // Author : Thierry Delisle
  ...
  extern "C" {
      #include <bits/pthreadtypes.h>
+     #include <pthread.h>
      #include <linux/types.h>
  }

  //-----------------------------------------------------------------------------
- // Locks
- struct semaphore {
-     __spinlock_t lock;
-     int count;
-     __queue_t($thread) waiting;
- };
-
- void ?{}(semaphore & this, int count = 1);
- void ^?{}(semaphore & this);
- bool P (semaphore & this);
- bool V (semaphore & this);
- bool V (semaphore & this, unsigned count);
+ // Underlying Locks
Added:
  #ifdef __CFA_WITH_VERIFY__
      extern bool __cfaabi_dbg_in_kernel();
  #endif

  extern "C" {
      char * strerror(int);
  }
  #define CHECKED(x) { int err = x; if( err != 0 ) abort("KERNEL ERROR: Operation \"" #x "\" return error %d - %s\n", err, strerror(err)); }

  struct __bin_sem_t {
      pthread_mutex_t lock;
      pthread_cond_t  cond;
      int             val;
  };

  static inline void ?{}(__bin_sem_t & this) with( this ) {
      // Create the mutex with error checking
      pthread_mutexattr_t mattr;
      pthread_mutexattr_init( &mattr );
      pthread_mutexattr_settype( &mattr, PTHREAD_MUTEX_ERRORCHECK_NP);
      pthread_mutex_init(&lock, &mattr);

      pthread_cond_init(&cond, (const pthread_condattr_t *)0p); // workaround trac#208: cast should not be required
      val = 0;
  }

  static inline void ^?{}(__bin_sem_t & this) with( this ) {
      CHECKED( pthread_mutex_destroy(&lock) );
      CHECKED( pthread_cond_destroy (&cond) );
  }

  static inline void wait(__bin_sem_t & this) with( this ) {
      verify(__cfaabi_dbg_in_kernel());
      CHECKED( pthread_mutex_lock(&lock) );
      while(val < 1) {
          pthread_cond_wait(&cond, &lock);
      }
      val -= 1;
      CHECKED( pthread_mutex_unlock(&lock) );
  }

  static inline bool post(__bin_sem_t & this) with( this ) {
      bool needs_signal = false;

      CHECKED( pthread_mutex_lock(&lock) );
      if(val < 1) {
          val += 1;
          pthread_cond_signal(&cond);
          needs_signal = true;
      }
      CHECKED( pthread_mutex_unlock(&lock) );

      return needs_signal;
  }

  #undef CHECKED
  ...
      // Termination synchronisation (user semaphore)
-     semaphore terminated;
+     oneshot terminated;

      // pthread Stack
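A minimal sketch of the intended use (hypothetical caller, not part of the changeset): one kernel-level pthread parks on the binary semaphore while another wakes it.

  // Hypothetical sketch: park/wake between two kernel-level pthreads.
  __bin_sem_t idle;           // constructor leaves val == 0

  // parking side (must run in kernel context, per the verify in wait()):
  wait( idle );               // blocks on the condition variable until val >= 1

  // waking side:
  post( idle );               // returns true iff a sleeper needed a signal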
libcfa/src/concurrency/kernel/fwd.hfa
  // file "LICENCE" distributed with Cforall.
  //
- // kernel/fwd.hfa --
+ // kernel/fwd.hfa -- PUBLIC
+ // Fundamental code needed to implement threading M.E.S. algorithms.
  //
  // Author : Thierry Delisle
  ...
  extern uint64_t thread_rand();

Added:
  // Semaphore which only supports a single thread
  struct single_sem {
      struct $thread * volatile ptr;
  };

  static inline {
      void ?{}(single_sem & this) {
          this.ptr = 0p;
      }

      void ^?{}(single_sem &) {}

      bool wait(single_sem & this) {
          for() {
              struct $thread * expected = this.ptr;
              if(expected == 1p) {
                  if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
                      return false;
                  }
              }
              else {
                  /* paranoid */ verify( expected == 0p );
                  if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
                      park();
                      return true;
                  }
              }
          }
      }

      bool post(single_sem & this) {
          for() {
              struct $thread * expected = this.ptr;
              if(expected == 1p) return false;
              if(expected == 0p) {
                  if(__atomic_compare_exchange_n(&this.ptr, &expected, 1p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
                      return false;
                  }
              }
              else {
                  if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
                      unpark( expected );
                      return true;
                  }
              }
          }
      }
  }

  // Synchronization primitive which only supports a single thread and one post
  // Similar to a binary semaphore with a 'one shot' semantic
  // It is expected to be discarded after each party calls their side
  struct oneshot {
      // Internal state :
      //   0p         : initial state (wait will block)
      //   1p         : fulfilled (wait won't block)
      //   any thread : a thread is currently waiting
      struct $thread * volatile ptr;
  };

  static inline {
      void ?{}(oneshot & this) {
          this.ptr = 0p;
      }

      void ^?{}(oneshot &) {}

      // Wait for the post, return immediately if it already happened.
      // return true if the thread was parked
      bool wait(oneshot & this) {
          for() {
              struct $thread * expected = this.ptr;
              if(expected == 1p) return false;
              /* paranoid */ verify( expected == 0p );
              if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
                  park();
                  /* paranoid */ verify( this.ptr == 1p );
                  return true;
              }
          }
      }

      // Mark as fulfilled, wake thread if needed
      // return true if a thread was unparked
      bool post(oneshot & this) {
          struct $thread * got = __atomic_exchange_n( &this.ptr, 1p, __ATOMIC_SEQ_CST);
          if( got == 0p ) return false;
          unpark( got );
          return true;
      }
  }

  // base type for futures to build upon
  // It is based on the 'oneshot' type to allow multiple futures
  // to block on the same instance, permitting users to block a single
  // thread on "any of" [a given set of] futures.
  // It does not support multiple threads waiting on the same future.
  struct future_t {
      // Internal state :
      //   0p          : initial state (wait will block)
      //   1p          : fulfilled (wait won't block)
      //   2p          : in progress
      //   3p          : abandoned, server should delete
      //   any oneshot : a context has been set up to wait, a thread could wait on it
      struct oneshot * volatile ptr;
  };

  static inline {
      void ?{}(future_t & this) {
          this.ptr = 0p;
      }

      void ^?{}(future_t &) {}

      void reset(future_t & this) {
          // needs to be in 0p or 1p
          __atomic_exchange_n( &this.ptr, 0p, __ATOMIC_SEQ_CST);
      }

      // check if the future is available
      bool available( future_t & this ) {
          return this.ptr == 1p;
      }

      // Prepare the future to be waited on
      // intended to be used by wait, wait_any, waitfor, etc. rather than used directly
      bool setup( future_t & this, oneshot & wait_ctx ) {
          /* paranoid */ verify( wait_ctx.ptr == 0p );
          // The future needs to set the wait context
          for() {
              struct oneshot * expected = this.ptr;
              // Is the future already fulfilled?
              if(expected == 1p) return false; // Yes, just return false (didn't block)

              // The future is not fulfilled, try to set up the wait context
              /* paranoid */ verify( expected == 0p );
              if(__atomic_compare_exchange_n(&this.ptr, &expected, &wait_ctx, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
                  return true;
              }
          }
      }

      // Stop waiting on a future
      // When multiple futures are waited for together in the "any of" pattern,
      // futures that weren't fulfilled before the thread woke up
      // should retract the wait ctx
      // intended to be used by wait, wait_any, waitfor, etc. rather than used directly
      void retract( future_t & this, oneshot & wait_ctx ) {
          // Remove the wait context
          struct oneshot * got = __atomic_exchange_n( &this.ptr, 0p, __ATOMIC_SEQ_CST);

          // got == 0p: future was never actually set up, just return
          if( got == 0p ) return;

          // got == wait_ctx: since fulfil does an atomic_swap,
          // if we got back the original then no one else saw the context
          // It is safe to delete (which could happen after the return)
          if( got == &wait_ctx ) return;

          // got == 1p: the future is ready and the context was fully consumed
          // the server won't use the pointer again
          // It is safe to delete (which could happen after the return)
          if( got == 1p ) return;

          // got == 2p: the future is ready but the context hasn't fully been consumed
          // spin until it is safe to move on
          if( got == 2p ) {
              while( this.ptr != 1p ) Pause();
              return;
          }

          // got == anything else: something went wrong here, abort
          abort("Future in unexpected state");
      }

      // Mark the future as abandoned, meaning it will be deleted by the server
      bool abandon( future_t & this ) {
          /* paranoid */ verify( this.ptr != 3p );

          // Mark the future as abandoned
          struct oneshot * got = __atomic_exchange_n( &this.ptr, 3p, __ATOMIC_SEQ_CST);

          // If the future isn't already fulfilled, let the server delete it
          if( got == 0p ) return false;

          // got == 2p: the future is ready but the context hasn't fully been consumed
          // spin until it is safe to move on
          if( got == 2p ) {
              while( this.ptr != 1p ) Pause();
              got = 1p;
          }

          // The future is completed, delete it now
          /* paranoid */ verify( this.ptr != 1p );
          free( &this );
          return true;
      }

      // From the server side, mark the future as fulfilled
      // delete it if needed
      bool fulfil( future_t & this ) {
          for() {
              struct oneshot * expected = this.ptr;
              // was this abandoned?
              #if defined(__GNUC__) && __GNUC__ >= 7
                  #pragma GCC diagnostic push
                  #pragma GCC diagnostic ignored "-Wfree-nonheap-object"
              #endif
                  if( expected == 3p ) { free( &this ); return false; }
              #if defined(__GNUC__) && __GNUC__ >= 7
                  #pragma GCC diagnostic pop
              #endif

              /* paranoid */ verify( expected != 1p ); // Future is already fulfilled, should not happen
              /* paranoid */ verify( expected != 2p ); // Future is being fulfilled by someone else, this is even less supported than the previous case.

              // If there is a wait context, we need to consume it and mark it as consumed after
              // If there is no context then we can skip the in-progress phase
              struct oneshot * want = expected == 0p ? 1p : 2p;
              if(__atomic_compare_exchange_n(&this.ptr, &expected, want, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
                  if( expected == 0p ) { /* paranoid */ verify( this.ptr == 1p); return false; }
                  bool ret = post( *expected );
                  __atomic_store_n( &this.ptr, 1p, __ATOMIC_SEQ_CST);
                  return ret;
              }
          }
      }

      // Wait for the future to be fulfilled
      bool wait( future_t & this ) {
          oneshot temp;
          if( !setup(this, temp) ) return false;

          // Wait context is set up, just wait on it
          bool ret = wait( temp );

          // Wait for any in-progress fulfil to finish
          while( this.ptr == 2p ) Pause();
          // Make sure the state makes sense
          // Should be fulfilled, could be in progress but it's out of date if so
          // since if that is the case, the oneshot was fulfilled (unparking this thread)
          // and the oneshot should not be needed any more
          __attribute__((unused)) struct oneshot * was = this.ptr;
          /* paranoid */ verifyf( was == 1p, "Expected this.ptr to be 1p, was %p\n", was );

          // Mark the future as fulfilled, to be consistent
          // with potential calls to avail
          // this.ptr = 1p;
          return ret;
      }
  }

  //-----------------------------------------------------------------------
  // Statics call at the end of each thread to register statistics
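To make the state machine concrete, here is a hedged sketch of a client/server handoff built on these primitives. Nothing below is from the changeset; the split into "client side" and "server side" is illustrative, and alloc() is the CFA stdlib allocator assumed to be available:

  // Hypothetical sketch: one client waits on a server-owned future.
  future_t * f = alloc();      // server-owned; freed by abandon()/fulfil()
  (*f){};                      // state 0p: not yet fulfilled

  // client side:
  if( wait( *f ) ) {           // sets up a oneshot and parks until fulfil()
      // woken by the server's fulfil()
  }

  // server side:
  fulfil( *f );                // wakes the waiter, or frees f if abandoned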
libcfa/src/concurrency/kernel/startup.cfa
  void ?{}(processor & this) with( this ) {
      ( this.idle ){};
-     ( this.terminated ){ 0 };
+     ( this.terminated ){};
      ( this.runner ){};
      init( this, "Main Processor", *mainCluster );
  ...
  void ?{}(processor & this, const char name[], cluster & _cltr) {
      ( this.idle ){};
-     ( this.terminated ){ 0 };
+     ( this.terminated ){};
      ( this.runner ){};
  ...
      __wake_proc( &this );

-     P( terminated );
+     wait( terminated );
      /* paranoid */ verify( active_processor() != &this);
libcfa/src/concurrency/locks.cfa
Added:
  //
  // Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
  //
  // The contents of this file are covered under the licence agreement in the
  // file "LICENCE" distributed with Cforall.
  //
  // locks.cfa -- LIBCFATHREAD
  // Runtime locks that are used with the runtime thread system.
  //
  // Author           : Colby Alexander Parsons
  // Created On       : Thu Jan 21 19:46:50 2021
  // Last Modified By :
  // Last Modified On :
  // Update Count     :
  //

  #define __cforall_thread__

  #include "locks.hfa"
  #include "kernel_private.hfa"
  ...
  //-----------------------------------------------------------------------------
  // info_thread
- forall( dtype L | is_blocking_lock(L)) {
+ forall(L & | is_blocking_lock(L)) {
      struct info_thread {
          // used to put info_thread on a dl queue (aka sequence)
  ...
  void ^?{}( blocking_lock & this ) {}
- void ?{}( single_acquisition_lock & this ) {((blocking_lock &)this){ false, false };}
- void ^?{}( single_acquisition_lock & this ) {}
- void ?{}( owner_lock & this ) {((blocking_lock &)this){ true, true };}
- void ^?{}( owner_lock & this ) {}
- void ?{}( multiple_acquisition_lock & this ) {((blocking_lock &)this){ true, false };}
- void ^?{}( multiple_acquisition_lock & this ) {}

  void lock( blocking_lock & this ) with( this ) {
  ...
  //-----------------------------------------------------------------------------
Removed (the forwarding routines move, as static inlines, to locks.hfa below):
  // Overloaded routines for traits
  // These routines are temporary until an inheritance bug is fixed
  void lock                 ( single_acquisition_lock & this ) { lock   ( (blocking_lock &)this ); }
  void unlock               ( single_acquisition_lock & this ) { unlock ( (blocking_lock &)this ); }
  void on_wait              ( single_acquisition_lock & this ) { on_wait( (blocking_lock &)this ); }
  void on_notify            ( single_acquisition_lock & this, struct $thread * t ) { on_notify( (blocking_lock &)this, t ); }
  void set_recursion_count  ( single_acquisition_lock & this, size_t recursion ) { set_recursion_count( (blocking_lock &)this, recursion ); }
  size_t get_recursion_count( single_acquisition_lock & this ) { return get_recursion_count( (blocking_lock &)this ); }

  (the same six forwarding routines are removed for owner_lock and multiple_acquisition_lock)

  //-----------------------------------------------------------------------------
  // alarm node wrapper
- forall( dtype L | is_blocking_lock(L)) {
+ forall(L & | is_blocking_lock(L)) {
      struct alarm_node_wrap {
          alarm_node_t alarm_node;
  ...
  //-----------------------------------------------------------------------------
  // condition variable
- forall( dtype L | is_blocking_lock(L)) {
+ forall(L & | is_blocking_lock(L)) {

      void ?{}( condition_variable(L) & this ){
  ...
      bool wait( condition_variable(L) & this, L & l, uintptr_t info, Time time ) with(this) { WAIT_TIME( info, &l , time ) }
  }

Added (the semaphore implementation moved here from kernel.cfa):
  //-----------------------------------------------------------------------------
  // Semaphore
  void ?{}( semaphore & this, int count = 1 ) {
      (this.lock){};
      this.count = count;
      (this.waiting){};
  }
  void ^?{}(semaphore & this) {}

  bool P(semaphore & this) with( this ) {
      lock( lock __cfaabi_dbg_ctx2 );
      count -= 1;
      if ( count < 0 ) {
          // queue current task
          append( waiting, active_thread() );

          // atomically release spin lock and block
          unlock( lock );
          park();
          return true;
      }
      else {
          unlock( lock );
          return false;
      }
  }

  bool V(semaphore & this) with( this ) {
      $thread * thrd = 0p;
      lock( lock __cfaabi_dbg_ctx2 );
      count += 1;
      if ( count <= 0 ) {
          // remove task at head of waiting list
          thrd = pop_head( waiting );
      }

      unlock( lock );

      // make new owner
      unpark( thrd );

      return thrd != 0p;
  }

  bool V(semaphore & this, unsigned diff) with( this ) {
      $thread * thrd = 0p;
      lock( lock __cfaabi_dbg_ctx2 );
      int release = max(-count, (int)diff);
      count += diff;
      for(release) {
          unpark( pop_head( waiting ) );
      }

      unlock( lock );

      return thrd != 0p;
  }
libcfa/src/concurrency/locks.hfa
Added:
  //
  // Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
  //
  // The contents of this file are covered under the licence agreement in the
  // file "LICENCE" distributed with Cforall.
  //
  // locks.hfa -- PUBLIC
  // Runtime locks that are used with the runtime thread system.
  //
  // Author           : Colby Alexander Parsons
  // Created On       : Thu Jan 21 19:46:50 2021
  // Last Modified By :
  // Last Modified On :
  // Update Count     :
  //

  #pragma once

  #include <stdbool.h>

- #include "bits/locks.hfa"
- #include "bits/sequence.hfa"
-
- #include "invoke.h"
+ #include "bits/weakso_locks.hfa"

  #include "time_t.hfa"
  #include "time.hfa"

Added:
  //----------
  struct single_acquisition_lock {
      inline blocking_lock;
  };

  static inline void  ?{}( single_acquisition_lock & this ) {((blocking_lock &)this){ false, false };}
  static inline void ^?{}( single_acquisition_lock & this ) {}
  static inline void   lock     ( single_acquisition_lock & this ) { lock   ( (blocking_lock &)this ); }
  static inline void   unlock   ( single_acquisition_lock & this ) { unlock ( (blocking_lock &)this ); }
  static inline void   on_wait  ( single_acquisition_lock & this ) { on_wait( (blocking_lock &)this ); }
  static inline void   on_notify( single_acquisition_lock & this, struct $thread * t ) { on_notify( (blocking_lock &)this, t ); }
  static inline void   set_recursion_count( single_acquisition_lock & this, size_t recursion ) { set_recursion_count( (blocking_lock &)this, recursion ); }
  static inline size_t get_recursion_count( single_acquisition_lock & this ) { return get_recursion_count( (blocking_lock &)this ); }

  //----------
  struct owner_lock {
      inline blocking_lock;
  };

  static inline void  ?{}( owner_lock & this ) {((blocking_lock &)this){ true, true };}
  static inline void ^?{}( owner_lock & this ) {}
  static inline void   lock     ( owner_lock & this ) { lock   ( (blocking_lock &)this ); }
  static inline void   unlock   ( owner_lock & this ) { unlock ( (blocking_lock &)this ); }
  static inline void   on_wait  ( owner_lock & this ) { on_wait( (blocking_lock &)this ); }
  static inline void   on_notify( owner_lock & this, struct $thread * t ) { on_notify( (blocking_lock &)this, t ); }
  static inline void   set_recursion_count( owner_lock & this, size_t recursion ) { set_recursion_count( (blocking_lock &)this, recursion ); }
  static inline size_t get_recursion_count( owner_lock & this ) { return get_recursion_count( (blocking_lock &)this ); }

  //-----------------------------------------------------------------------------
  // is_blocking_lock
- trait is_blocking_lock( dtype L | sized(L)) {
+ trait is_blocking_lock(L & | sized(L)) {
      // For synchronization locks to use when acquiring
      void on_notify( L &, struct $thread * );
  ...
  // the info thread is a wrapper around a thread used
  // to store extra data for use in the condition variable
- forall( dtype L | is_blocking_lock(L)) {
+ forall(L & | is_blocking_lock(L)) {
      struct info_thread;
  ...
  //-----------------------------------------------------------------------------
Removed (blocking_lock and its declarations move to bits/weakso_locks.hfa; the forwarding routines appear above as static inlines):
  // Blocking Locks
  struct blocking_lock {
      // Spin lock used for mutual exclusion
      __spinlock_t lock;

      // List of blocked threads
      Sequence( $thread ) blocked_threads;

      // Count of current blocked threads
      size_t wait_count;

      // Flag if the lock allows multiple acquisition
      bool multi_acquisition;

      // Flag if lock can be released by non-owner
      bool strict_owner;

      // Current thread owning the lock
      struct $thread * owner;

      // Number of recursion levels
      size_t recursion_count;
  };

  struct single_acquisition_lock { inline blocking_lock; };
  struct owner_lock              { inline blocking_lock; };
  struct multiple_acquisition_lock { inline blocking_lock; };

  void ?{}( blocking_lock & this, bool multi_acquisition, bool strict_owner );
  void ^?{}( blocking_lock & this );

  void  ?{}( single_acquisition_lock & this );
  void ^?{}( single_acquisition_lock & this );
  void  ?{}( owner_lock & this );
  void ^?{}( owner_lock & this );
  void  ?{}( multiple_acquisition_lock & this );
  void ^?{}( multiple_acquisition_lock & this );

  void lock( blocking_lock & this );
  bool try_lock( blocking_lock & this );
  void unlock( blocking_lock & this );
  void on_notify( blocking_lock & this, struct $thread * t );
  void on_wait( blocking_lock & this );
  size_t wait_count( blocking_lock & this );
  void set_recursion_count( blocking_lock & this, size_t recursion );
  size_t get_recursion_count( blocking_lock & this );

  (the matching lock/unlock/on_notify/on_wait/set_recursion_count/get_recursion_count
   declarations for single_acquisition_lock, owner_lock, and multiple_acquisition_lock
   are removed as well)

  //-----------------------------------------------------------------------------
  // Synchronization Locks
- forall( dtype L | is_blocking_lock(L)) {
+ forall(L & | is_blocking_lock(L)) {
      struct condition_variable {
          // Spin lock used for mutual exclusion
  ...
      bool wait( condition_variable(L) & this, L & l, uintptr_t info, Time time );
  }

+ //-----------------------------------------------------------------------------
+ // Semaphore
+ struct semaphore {
+     __spinlock_t lock;
+     int count;
+     __queue_t($thread) waiting;
+ };
+
+ void  ?{}(semaphore & this, int count = 1);
+ void ^?{}(semaphore & this);
+ bool P (semaphore & this);
+ bool V (semaphore & this);
+ bool V (semaphore & this, unsigned count);
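The relocated counting semaphore keeps its classic P/V interface. A brief illustrative use (not from the diff; the count of 4 is arbitrary):

  // Guard four identical resources with the counting semaphore.
  semaphore pool = { 4 };      // initial count of 4

  P( pool );                   // returns true iff the caller had to block
  // ... use one of the resources ...
  V( pool );                   // wakes one waiter, if any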
libcfa/src/concurrency/monitor.cfa
  static inline [$thread *, int] search_entry_queue( const __waitfor_mask_t &, $monitor * monitors [], __lock_size_t count );

- forall( dtype T | sized( T ))
+ forall(T & | sized( T ))
  static inline __lock_size_t insert_unique( T * array [], __lock_size_t & size, T * val );
  static inline __lock_size_t count_max( const __waitfor_mask_t & mask );
  ...
- forall( dtype T | sized( T ))
+ forall(T & | sized( T ))
  static inline __lock_size_t insert_unique( T * array [], __lock_size_t & size, T * val ) {
      if( !val ) return size;
  ...
libcfa/src/concurrency/monitor.hfa
  #include "stdlib.hfa"

- trait is_monitor( dtype T) {
+ trait is_monitor(T &) {
      $monitor * get_monitor( T & );
      void ^?{}( T & mutex );
  ...
  void ^?{}( monitor_dtor_guard_t & this );

- static inline forall( dtype T | sized(T) | { void ^?{}( T & mutex ); } )
+ static inline forall( T & | sized(T) | { void ^?{}( T & mutex ); } )
  void delete( T * th ) {
      ^(*th){};
  ...
libcfa/src/concurrency/mutex.cfa
- forall( dtype L | is_lock(L))
+ forall(L & | is_lock(L))
  void wait(condition_variable & this, L & l) {
      lock( this.lock __cfaabi_dbg_ctx2 );
  ...
  //-----------------------------------------------------------------------------
  // Scopes
- forall( dtype L | is_lock(L))
+ forall(L & | is_lock(L))
  void lock_all ( L * locks[], size_t count) {
      // Sort locks based on addresses
  ...
- forall( dtype L | is_lock(L))
+ forall(L & | is_lock(L))
  void unlock_all( L * locks[], size_t count) {
      // Lock all
  ...
libcfa/src/concurrency/mutex.hfa
- void ?{}(mutex_lock & this);
- void ^?{}(mutex_lock & this);
- void lock(mutex_lock & this);
- bool try_lock(mutex_lock & this);
- void unlock(mutex_lock & this);
+ void ?{}(mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
+ void ^?{}(mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
+ void lock(mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
+ bool try_lock(mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
+ void unlock(mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));

  // Exclusive lock - recursive
  ...
- void ?{}(recursive_mutex_lock & this);
- void ^?{}(recursive_mutex_lock & this);
- void lock(recursive_mutex_lock & this);
- bool try_lock(recursive_mutex_lock & this);
- void unlock(recursive_mutex_lock & this);
+ void ?{}(recursive_mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
+ void ^?{}(recursive_mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
+ void lock(recursive_mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
+ bool try_lock(recursive_mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
+ void unlock(recursive_mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));

- trait is_lock( dtype L | sized(L)) {
+ trait is_lock(L & | sized(L)) {
      void lock (L &);
      void unlock(L &);
  ...
- void ?{}(condition_variable & this);
- void ^?{}(condition_variable & this);
+ void ?{}(condition_variable & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
+ void ^?{}(condition_variable & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));

- void notify_one(condition_variable & this);
- void notify_all(condition_variable & this);
+ void notify_one(condition_variable & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
+ void notify_all(condition_variable & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));

- void wait(condition_variable & this);
+ void wait(condition_variable & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));

- forall( dtype L | is_lock(L))
- void wait(condition_variable & this, L & l);
+ forall(L & | is_lock(L))
+ void wait(condition_variable & this, L & l) __attribute__((deprecated("use concurrency/locks.hfa instead")));

  //-----------------------------------------------------------------------------
  // Scopes
- forall( dtype L | is_lock(L)) {
+ forall(L & | is_lock(L)) {
      #if !defined( __TUPLE_ARRAYS_EXIST__ )
          void lock ( L * locks [], size_t count);
  ...
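Following the deprecation messages, a migration sketch (illustrative only; the include path is taken from the attribute text, and the replacement type is the one added to locks.hfa above):

  // Before: the deprecated mutex_lock from mutex.hfa.
  // After: single_acquisition_lock keeps the same lock()/unlock() shape.
  #include <concurrency/locks.hfa>

  single_acquisition_lock m;
  lock( m );
  // ... critical section ...
  unlock( m );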
libcfa/src/concurrency/preemption.cfa
  }

+ // Prevent preemption since we are about to start terminating things
+ void __kernel_abort_lock(void) {
+     signal_block( SIGUSR1 );
+ }
+
  // Raii ctor/dtor for the preemption_scope
  // Used by thread to control when they want to receive preemption signals
libcfa/src/concurrency/stats.hfa
  #include <stdint.h>

+ enum {
+     CFA_STATS_READY_Q = 0x01,
+     CFA_STATS_IO      = 0x02,
+ };

  #if defined(__CFA_NO_STATISTICS__)
      ...
      static inline void __print_stats( struct __stats_t *, int, const char *, const char *, void * ) {}
  #else
-     enum {
-         CFA_STATS_READY_Q = 0x01,
-         #if defined(CFA_HAVE_LINUX_IO_URING_H)
-             CFA_STATS_IO = 0x02,
-         #endif
-     };

      struct __attribute__((aligned(64))) __stats_readQ_t {
libcfa/src/concurrency/thread.cfa
- FORALL_DATA_INSTANCE(ThreadCancelled, ( dtype thread_t ), (thread_t))
+ FORALL_DATA_INSTANCE(ThreadCancelled, (thread_t &), (thread_t))

- forall( dtype T )
+ forall(T &)
  void copy(ThreadCancelled(T) * dst, ThreadCancelled(T) * src) {
      dst->virtual_table = src->virtual_table;
  ...
- forall( dtype T )
+ forall(T &)
  const char * msg(ThreadCancelled(T) *) {
      return "ThreadCancelled";
  }

- forall( dtype T )
+ forall(T &)
  static void default_thread_cancel_handler(ThreadCancelled(T) & ) {
      abort( "Unhandled thread cancellation.\n" );
  }

- forall( dtype T | is_thread(T) | IS_EXCEPTION(ThreadCancelled, (T)))
+ forall(T & | is_thread(T) | IS_EXCEPTION(ThreadCancelled, (T)))
  void ?{}( thread_dtor_guard_t & this,
-         T & thrd, void(* defaultResumptionHandler)(ThreadCancelled(T) &)) {
+         T & thrd, void(* cancelHandler)(ThreadCancelled(T) &)) {
      $monitor * m = get_monitor(thrd);
      $thread * desc = get_thread(thrd);

      // Setup the monitor guard
      void (*dtor)(T & mutex this) = ^?{};
-     bool join = defaultResumptionHandler != (void(*)(ThreadCancelled(T)&))0;
+     bool join = cancelHandler != (void(*)(ThreadCancelled(T)&))0;
      (this.mg){&m, (void(*)())dtor, join};
  ...
      desc->state = Cancelled;
-     if (!join) {
-         defaultResumptionHandler = default_thread_cancel_handler;
-     }
+     void(*defaultResumptionHandler)(ThreadCancelled(T) &) =
+         join ? cancelHandler : default_thread_cancel_handler;

      ThreadCancelled(T) except;
  ...
  //-----------------------------------------------------------------------------
  // Starting and stopping threads
- forall( dtype T | is_thread(T) )
+ forall( T & | is_thread(T) )
  void __thrd_start( T & this, void (*main_p)(T &) ) {
      $thread * this_thrd = get_thread(this);
  ...
  //-----------------------------------------------------------------------------
  // Support for threads that don't use the thread keyword
- forall( dtype T | sized(T) | is_thread(T) | { void ?{}(T&); } )
+ forall( T & | sized(T) | is_thread(T) | { void ?{}(T&); } )
  void ?{}( scoped(T)& this ) with( this ) {
      handle{};
  ...
- forall( dtype T, ttype P | sized(T) | is_thread(T) | { void ?{}(T&, P); } )
+ forall( T &, P... | sized(T) | is_thread(T) | { void ?{}(T&, P); } )
  void ?{}( scoped(T)& this, P params ) with( this ) {
      handle{ params };
  ...
- forall( dtype T | sized(T) | is_thread(T) )
+ forall( T & | sized(T) | is_thread(T) )
  void ^?{}( scoped(T)& this ) with( this ) {
      ^handle{};
  }

  //-----------------------------------------------------------------------------
- forall( dtype T | is_thread(T) | IS_RESUMPTION_EXCEPTION(ThreadCancelled, (T)))
+ forall(T & | is_thread(T) | IS_RESUMPTION_EXCEPTION(ThreadCancelled, (T)))
  T & join( T & this ) {
      thread_dtor_guard_t guard = { this, defaultResumptionHandler };
libcfa/src/concurrency/thread.hfa
  //-----------------------------------------------------------------------------
  // thread trait
- trait is_thread( dtype T) {
+ trait is_thread(T &) {
      void ^?{}(T& mutex this);
      void main(T& this);
      ...
  };

- FORALL_DATA_EXCEPTION(ThreadCancelled, ( dtype thread_t ), (thread_t)) (
+ FORALL_DATA_EXCEPTION(ThreadCancelled, (thread_t &), (thread_t)) (
      thread_t * the_thread;
      exception_t * the_exception;
  );

- forall( dtype T )
+ forall(T &)
  void copy(ThreadCancelled(T) * dst, ThreadCancelled(T) * src);

- forall( dtype T )
+ forall(T &)
  const char * msg(ThreadCancelled(T) *);
  ...
  // Inline getters for threads/coroutines/monitors
- forall( dtype T | is_thread(T) )
+ forall( T & | is_thread(T) )
  static inline $coroutine* get_coroutine(T & this) __attribute__((const)) { return &get_thread(this)->self_cor; }

- forall( dtype T | is_thread(T) )
+ forall( T & | is_thread(T) )
  static inline $monitor * get_monitor (T & this) __attribute__((const)) { return &get_thread(this)->self_mon; }
  ...
  extern struct cluster * mainCluster;

- forall( dtype T | is_thread(T) )
+ forall( T & | is_thread(T) )
  void __thrd_start( T & this, void (*)(T &) );
  ...
- forall( dtype T | is_thread(T) | IS_EXCEPTION(ThreadCancelled, (T)) )
+ forall( T & | is_thread(T) | IS_EXCEPTION(ThreadCancelled, (T)) )
  void ?{}( thread_dtor_guard_t & this, T & thrd, void(*)(ThreadCancelled(T) &) );
  void ^?{}( thread_dtor_guard_t & this );
  ...
  //-----------------------------------------------------------------------------
  // thread runner
  // Structure that actually starts and stops threads
- forall( dtype T | sized(T) | is_thread(T) )
+ forall( T & | sized(T) | is_thread(T) )
  struct scoped {
      T handle;
  };

- forall( dtype T | sized(T) | is_thread(T) | { void ?{}(T&); } )
+ forall( T & | sized(T) | is_thread(T) | { void ?{}(T&); } )
  void ?{}( scoped(T)& this );

- forall( dtype T, ttype P | sized(T) | is_thread(T) | { void ?{}(T&, P); } )
+ forall( T &, P... | sized(T) | is_thread(T) | { void ?{}(T&, P); } )
  void ?{}( scoped(T)& this, P params );

- forall( dtype T | sized(T) | is_thread(T) )
+ forall( T & | sized(T) | is_thread(T) )
  void ^?{}( scoped(T)& this );
  ...
  void unpark( $thread * this );

- forall( dtype T | is_thread(T) )
+ forall( T & | is_thread(T) )
  static inline void unpark( T & this ) { if(!&this) return; unpark( get_thread( this ) ); }
  ...
  //----------
  // join
- forall( dtype T | is_thread(T) | IS_RESUMPTION_EXCEPTION(ThreadCancelled, (T)) )
+ forall( T & | is_thread(T) | IS_RESUMPTION_EXCEPTION(ThreadCancelled, (T)) )
  T & join( T & this );
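Since join() is constrained by IS_RESUMPTION_EXCEPTION, a joiner can intercept the cancellation with a resumption handler. A hedged sketch (not from the changeset; the Worker type and empty bodies are placeholders):

  // Illustrative sketch: joining a thread and handling its cancellation.
  thread Worker {};
  void main( Worker & ) { /* ... thread body ... */ }

  int main() {
      Worker w;                               // starts running at declaration
      try {
          join( w );                          // blocks until main(Worker &) returns
      } catchResume( ThreadCancelled(Worker) * e ) {
          // the joiner, not the cancelled thread, handles the exception
      }
  }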