Changeset 24d6572 for libcfa/src/concurrency
- Timestamp: Jun 12, 2023, 2:45:32 PM
- Branches: ast-experimental, master
- Children: 62d62db
- Parents: 34b4268 (diff), 251ce80 (diff)
Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Use the (diff) links above to see all the changes relative to each parent.
- Location: libcfa/src/concurrency
- Files: 5 added, 28 edited
libcfa/src/concurrency/clib/cfathread.cfa
r34b4268 → r24d6572:
- Includes tidied: #include <string.h> becomes a regular include near the top of the file instead of sitting in an extern "C" block together with <errno.h>; the later #include <iofwd.hfa> and the extern "C" includes of <unistd.h>, <sys/types.h> and <sys/socket.h> ahead of the IO operations are also removed.
- The hand-written extern prototype for accept4 inside extern "C" is dropped; only __cfactx_invoke_thread remains forward declared.
- cfathread_mutex and cfathread_condition switch their implementation from linear_backoff_then_block_lock to exp_backoff_then_block_lock (both the mutex impl field and the condition_variable instantiation).
- The socket wrappers adopt glibc's argument types from <sys/socket.h>: cfathread_bind and cfathread_connect take __CONST_SOCKADDR_ARG address instead of const struct sockaddr *, and cfathread_accept takes __SOCKADDR_ARG address instead of struct sockaddr *restrict.
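A short usage sketch may help readers of the changed signatures. It is a hypothetical C caller written against the prototypes shown in cfathread.h, not code from this changeset, and it assumes the cfathread runtime is already set up; because __CONST_SOCKADDR_ARG and __SOCKADDR_ARG are transparent to C callers, an ordinary struct sockaddr pointer still works:

    /* Hypothetical caller of the cfathread socket wrappers; error handling elided. */
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <string.h>
    #include "cfathread.h"

    int serve_once( unsigned short port ) {
        struct sockaddr_in addr;
        memset( &addr, 0, sizeof(addr) );
        addr.sin_family      = AF_INET;
        addr.sin_port        = htons( port );
        addr.sin_addr.s_addr = htonl( INADDR_ANY );

        int fd = cfathread_socket( AF_INET, SOCK_STREAM, 0 );
        cfathread_bind( fd, (struct sockaddr *)&addr, sizeof(addr) );
        cfathread_listen( fd, 128 );

        socklen_t len = sizeof(addr);
        return cfathread_accept( fd, (struct sockaddr *)&addr, &len );  /* returns the accepted fd */
    }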
libcfa/src/concurrency/clib/cfathread.h
r34b4268 → r24d6572:
- Header comment updated (Last Modified By: Peter A. Buhr, Last Modified On: Mon Mar 13 23:48:40 2023, Update Count: 7).
- #pragma once added, and the system includes <unistd.h>, <errno.h> and <sys/socket.h> moved ahead of the extern "C" block; the old <asm/types.h>, <errno.h> and <unistd.h> includes inside it are dropped.
- Spacing of restrict in the cfathread_mutex_init prototype normalized.
- The struct sockaddr and struct msghdr forward declarations are removed, and the socket prototypes (cfathread_bind, cfathread_accept, cfathread_connect) now use __CONST_SOCKADDR_ARG / __SOCKADDR_ARG.
libcfa/src/concurrency/coroutine.cfa
r34b4268 → r24d6572:
- Header comment updated (Last Modified On: Thu Feb 16 15:34:46 2023, Update Count: 24).
- The redundant #define _GNU_SOURCE following #define __cforall_thread__ is removed.
libcfa/src/concurrency/coroutine.hfa
r34b4268 → r24d6572:
- Header comment updated (Last Modified On: Thu Feb 2 11:31:42 2023, Update Count: 13).
- The is_coroutine trait is rewritten in the newer polymorphic syntax: "trait is_coroutine(T & | IS_RESUMPTION_EXCEPTION(CoroutineCancelled(T))) { ... }" becomes "forall( T & | IS_RESUMPTION_EXCEPTION(CoroutineCancelled(T)) ) trait is_coroutine { ... }"; the main and get_coroutine assertions inside the trait are unchanged.
libcfa/src/concurrency/future.hfa
r34b4268 → r24d6572:
- Header comment corrected (the file previously described itself as io/types.hfa; it is concurrency/future.hfa) and Colby Parsons added as an author; includes of select.hfa and locks.hfa added.
- A new lock-based future(T) type is introduced to support multiple consumers and waituntil/select. Because it needs a lock, it does not reuse the lock-free, atomics-based future_t. It holds an int state (temporary const int FUTURE_EMPTY = 0 / FUTURE_FULFILLED = 1 constants stand in for an enum, which a comment notes is currently broken), the T result, a dlist( select_node ) waiters list, and a futex_mutex lock. A companion future_node inherits select_node and carries a T * my_result so a fulfilled value can be copied straight into the waiter.
- Operations on the new future: reset (aborts if waiters are still blocked), available, a copy_T memcpy wrapper, _internal_flush (wakes waiters, handling the special waituntil-OR case via __handle_waituntil_OR and poking the result into each woken node so it need not reacquire locks), fulfil (aborts on double fulfil, copies the value, flushes waiters, returns whether anyone was unblocked), blocking get in both [T, bool] and T forms, non-blocking try_get, and the select hooks register_select, unregister_select and on_selected.
- The pre-existing atomics-based future is renamed single_future(T); its reset, available, abandon, fulfil and wait wrappers are updated accordingly. A new comment block explains that the futures below do not support select statements, that single_future stays cheap and likely faster than future because it uses raw atomics and no locks, and that multi_future is monitor based and also incompatible with select statements.
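As an illustration of the lock-based pattern the new future follows, here is a minimal C analogue that uses a pthread mutex and condition variable in place of the futex_mutex and the intrusive list of select_nodes; the names (toy_future, toy_fulfil, toy_get) are invented for the sketch and none of this is the CFA implementation:

    /* Minimal C analogue of a lock-based, multi-consumer future. */
    #include <pthread.h>
    #include <stdbool.h>

    typedef struct {
        pthread_mutex_t lock;
        pthread_cond_t  filled;
        bool            fulfilled;   /* FUTURE_EMPTY vs FUTURE_FULFILLED */
        int             result;
    } toy_future;

    void toy_future_init( toy_future * f ) {
        pthread_mutex_init( &f->lock, 0 );
        pthread_cond_init( &f->filled, 0 );
        f->fulfilled = false;
    }

    void toy_fulfil( toy_future * f, int val ) {
        pthread_mutex_lock( &f->lock );
        f->result = val;
        f->fulfilled = true;
        pthread_cond_broadcast( &f->filled );  /* wake every consumer */
        pthread_mutex_unlock( &f->lock );
    }

    int toy_get( toy_future * f ) {
        pthread_mutex_lock( &f->lock );
        while ( ! f->fulfilled )
            pthread_cond_wait( &f->filled, &f->lock );
        int val = f->result;
        pthread_mutex_unlock( &f->lock );
        return val;
    }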
libcfa/src/concurrency/invoke.h
r34b4268 → r24d6572:
- Header comment updated (Last Modified On: Tue Mar 14 13:39:31 2023, Update Count: 59) and a note added explaining that #pragma once is not used because the file is included twice in some places and has its own guard system.
- In the thread descriptor, the CLH-lock bookkeeping fields (volatile bool * clh_prev and volatile bool * clh_node) are removed and replaced by a single void * link_node, a pointer used during handover between blocking lists so intrusive nodes can be stack allocated; the main use case is wait-morphing, which lets a different node be used to block on a condition variable versus the lock.
libcfa/src/concurrency/io.cfa
r34b4268 → r24d6572:
- The redundant #define _GNU_SOURCE is removed, and the file gains extensive comments describing the submission/completion path (stats, bookkeeping, sqe reclamation, timestamp tracking, the helping/topology logic, and so on).
- __ioarbiter_flush now takes a bool kernel flag, and a new __post(oneshot &, bool kernel, unpark_hint) helper posts a oneshot and wakes the thread with either __kernel_unpark or unpark depending on whether the call comes from the kernel side.
- In ioring_syscsll, the submitted count is subtracted from ctx.sq.to_submit with __atomic_fetch_sub(..., __ATOMIC_SEQ_CST) instead of a plain -=.
- The completion-queue spinlock is renamed from cq.lock to cq.try_lock, and try_acquire / __cfa_do_drain are updated and commented: draining reads head and tail, fulfils each cqe's io_future_t through __kernel_unpark, updates the timestamps, loops again if the ring was full, then releases cq.try_lock.
- Flushing is factored into a new __cfa_do_flush( io_context$ &, bool kernel ), which clears sq.last_external, calls __ioarbiter_flush( ctx, kernel ), and performs the io_uring_enter system call if anything is left to submit; __cfa_io_flush now just calls it and then drains, since some operations complete immediately.
- The helping logic in __cfa_io_drain now helps the submission side as well: after picking a target it calls __cfa_do_flush( *io.data[target], true ) before trying to acquire and drain the target's completion queue, and __cfa_io_drain returns true if any completion, local or remote, was drained.
- __submit_only takes an extra bool parameter indicating whether the caller already holds ext_sq.lock; when it does not, __submit_only acquires and releases the lock itself, and sq.to_submit is bumped with __atomic_fetch_add.
- __ioarbiter_notify is commented and now verifies preemption is enabled on exit; __ioarbiter_submit gains io.flush.signal and io.submit.extr statistics.
- __ioarbiter_flush is rewritten: it returns early when there are no external operations, pops each __external_io under ext_sq.lock, submits it with __submit_only(&ctx, ei.idxs, ei.have, true), wakes the waiter through __post(ei.waitctx, kernel, UNPARK_LOCAL), and finally marks ext_sq.empty = true and sq.last_external = true.
- A block of extern "C" gdb helpers is added (__cfagdb_cq_head/tail/mask, __cfagdb_sq_head/tail/mask, and __cfagdb_sq_at, which copies out an sqe) so the io_uring shared rings can be inspected from gdb until proper support exists.
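The system call at the heart of this path is a raw io_uring_enter invoked in a retry loop. The sketch below shows just that pattern in plain C, assuming a kernel and libc that define __NR_io_uring_enter; it omits the CFA bookkeeping (statistics, to_submit accounting, unparking) that ioring_syscsll wraps around the call:

    /* Raw io_uring_enter with retry on interrupt; illustrative only. */
    #define _GNU_SOURCE
    #include <errno.h>
    #include <signal.h>
    #include <sys/syscall.h>
    #include <unistd.h>

    static int ring_enter( int ring_fd, unsigned to_submit, unsigned min_complete, unsigned flags ) {
        for (;;) {
            int ret = syscall( __NR_io_uring_enter, ring_fd, to_submit, min_complete,
                               flags, (sigset_t *)0, _NSIG / 8 );
            if ( ret >= 0 ) return ret;        /* number of sqes consumed */
            if ( errno == EINTR ) continue;    /* interrupted: repeat the call */
            return -errno;                     /* real error: report it */
        }
    }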
libcfa/src/concurrency/io/call.cfa.in
r34b4268 → r24d6572:
- The generator's Prelude now includes <unistd.h>, <errno.h>, <sys/socket.h> and <time.hfa> up front and drops the duplicated includes and forward declarations further down (notably the hand-written accept4 and connect externs and the <asm/types.h> / <sys/syscall.h> includes); pointer declarations are reformatted with a space before *.
- In the generated call table, raw (uintptr_t) casts into the sqe fields are replaced with (typeof(sqe->addr)), (typeof(sqe->off)), (typeof(sqe->addr2)), (typeof(sqe->splice_off_in)) and similar, so the casts track the io_uring structure definitions.
- READV and WRITEV now also forward the flags argument as rw_flags; EPOLL_CTL, FALLOCATE, OPENAT, OPENAT2 and STATX have their field orders tidied; ACCEPT and CONNECT switch to __SOCKADDR_ARG / __CONST_SOCKADDR_ARG and pass &addr for the transparent-union argument; SPLICE uses typeof-based casts for its offset sentinels.
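For readers unfamiliar with the generated code, these table entries map onto fields of struct io_uring_sqe from <linux/io_uring.h>. The hand-written sketch below shows what the READ mapping (fd, addr, len) amounts to; prep_read is an invented helper name, and the explicit casts stand in for the typeof(sqe->...) casts the generator now emits:

    /* Fill an sqe for a read, the way the generated READ wrapper does. */
    #include <string.h>
    #include <stdint.h>
    #include <linux/io_uring.h>

    static void prep_read( struct io_uring_sqe * sqe, int fd, void * buf, size_t count, uint64_t user_data ) {
        memset( sqe, 0, sizeof(*sqe) );
        sqe->opcode    = IORING_OP_READ;
        sqe->fd        = fd;
        sqe->addr      = (__u64)(uintptr_t)buf;  /* generator: (typeof(sqe->addr))buf */
        sqe->len       = (__u32)count;
        sqe->user_data = user_data;              /* CFA stores the io_future_t pointer here */
    }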
libcfa/src/concurrency/io/setup.cfa
r34b4268 → r24d6572:
- The redundant #define _GNU_SOURCE is removed.
- Completion-queue initialization follows the rename: cq.lock = false becomes cq.try_lock = false.
libcfa/src/concurrency/io/types.hfa
r34b4268 → r24d6572:
- The ring structures gain descriptive comments throughout (what each ring represents, why each field exists, TODO notes).
- __sub_ring_t (the submission ring) gains a __spinlock_t lock, needed because remote processors may flush the instance; to_submit becomes volatile; and a bool last_external debug flag records whether the last flush was due to an arbiter flush.
- __cmp_ring_t (the completion ring) renames its volatile bool lock to try_lock, since remote processors can help drain the buffer, and its id and ts fields are documented (helping/topology algorithms, timestamp of the last drain).
- __outstanding_io, __outstanding_io_queue, __external_io, io_context$, ts(), __pending_alloc and io_arbiter$ are all documented: the queue's volatile bool empty avoids taking the lock when there is nothing to flush, ts() returns ULLONG_MAX when there are no pending completions, and the arbiter handles cases where the context tied to the local processor cannot satisfy the io.
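The renamed try_lock fields follow a simple try-acquire protocol: a helper either grabs the boolean with one atomic exchange or gives up immediately instead of spinning. A plain-C sketch of that protocol, with invented helper names (try_acquire_cq, release_cq) rather than the CFA runtime's primitives:

    /* Try-lock on a plain boolean using the GCC atomic builtins. */
    #include <stdbool.h>

    static bool try_acquire_cq( volatile bool * try_lock ) {
        /* true only if we flipped it from false to true */
        return ! __atomic_exchange_n( try_lock, true, __ATOMIC_ACQUIRE );
    }

    static void release_cq( volatile bool * try_lock ) {
        __atomic_store_n( try_lock, false, __ATOMIC_RELEASE );
    }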
libcfa/src/concurrency/iofwd.hfa
r34b4268 → r24d6572:
- Header comment updated (Last Modified By: Peter A. Buhr, Last Modified On: Mon Mar 13 23:54:57 2023, Update Count: 1) and #include <sys/socket.h> added.
- The forward declarations of cluster, io_context$, iovec, msghdr, sockaddr, statx and io_uring_sqe are removed; only struct epoll_event remains forward declared.
- All the cfa_* synchronous wrappers and async_* asynchronous wrappers are reformatted with a space before *, and cfa_accept4 / cfa_connect and async_accept4 / async_connect switch to __SOCKADDR_ARG / __CONST_SOCKADDR_ARG with socklen_t * restrict addrlen.
libcfa/src/concurrency/kernel.cfa
r34b4268 → r24d6572:
- Header comment updated (Last Modified On: Mon Jan 9 08:42:05 2023, Update Count: 77) and the redundant #define _GNU_SOURCE removed.
- On processor shutdown, after the "core %p stopping" debug print and before post( this->terminated ), the processor now calls __cfa_io_flush( this ) followed by __cfa_io_drain( this ) so outstanding io is flushed and drained before the core terminates.
libcfa/src/concurrency/kernel/cluster.cfa
r34b4268 r24d6572 15 15 16 16 #define __cforall_thread__ 17 #define _GNU_SOURCE18 17 19 18 #include "bits/defs.hfa" … … 69 68 return max_cores_l; 70 69 } 71 72 #if defined(CFA_HAVE_LINUX_LIBRSEQ)73 // No forward declaration needed74 #define __kernel_rseq_register rseq_register_current_thread75 #define __kernel_rseq_unregister rseq_unregister_current_thread76 #elif defined(CFA_HAVE_LINUX_RSEQ_H)77 static void __kernel_raw_rseq_register (void);78 static void __kernel_raw_rseq_unregister(void);79 80 #define __kernel_rseq_register __kernel_raw_rseq_register81 #define __kernel_rseq_unregister __kernel_raw_rseq_unregister82 #else83 // No forward declaration needed84 // No initialization needed85 static inline void noop(void) {}86 87 #define __kernel_rseq_register noop88 #define __kernel_rseq_unregister noop89 #endif90 70 91 71 //======================================================================= … … 111 91 // Lock-Free registering/unregistering of threads 112 92 unsigned register_proc_id( void ) with(__scheduler_lock.lock) { 113 __kernel_rseq_register();114 115 93 bool * handle = (bool *)&kernelTLS().sched_lock; 116 94 … … 162 140 163 141 __atomic_store_n(cell, 0p, __ATOMIC_RELEASE); 164 165 __kernel_rseq_unregister();166 142 } 167 143 … … 505 481 /* paranoid */ verify( mock_head(this) == this.l.prev ); 506 482 } 507 508 #if defined(CFA_HAVE_LINUX_LIBRSEQ)509 // No definition needed510 #elif defined(CFA_HAVE_LINUX_RSEQ_H)511 512 #if defined( __x86_64 ) || defined( __i386 )513 #define RSEQ_SIG 0x53053053514 #elif defined( __ARM_ARCH )515 #ifdef __ARMEB__516 #define RSEQ_SIG 0xf3def5e7 /* udf #24035 ; 0x5de3 (ARMv6+) */517 #else518 #define RSEQ_SIG 0xe7f5def3 /* udf #24035 ; 0x5de3 */519 #endif520 #endif521 522 extern void __disable_interrupts_hard();523 extern void __enable_interrupts_hard();524 525 static void __kernel_raw_rseq_register (void) {526 /* paranoid */ verify( __cfaabi_rseq.cpu_id == RSEQ_CPU_ID_UNINITIALIZED );527 528 // int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), 0, (sigset_t *)0p, _NSIG / 8);529 int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), 0, RSEQ_SIG);530 if(ret != 0) {531 int e = errno;532 switch(e) {533 case EINVAL: abort("KERNEL ERROR: rseq register invalid argument");534 case ENOSYS: abort("KERNEL ERROR: rseq register no supported");535 case EFAULT: abort("KERNEL ERROR: rseq register with invalid argument");536 case EBUSY : abort("KERNEL ERROR: rseq register already registered");537 case EPERM : abort("KERNEL ERROR: rseq register sig argument on unregistration does not match the signature received on registration");538 default: abort("KERNEL ERROR: rseq register unexpected return %d", e);539 }540 }541 }542 543 static void __kernel_raw_rseq_unregister(void) {544 /* paranoid */ verify( __cfaabi_rseq.cpu_id >= 0 );545 546 // int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), RSEQ_FLAG_UNREGISTER, (sigset_t *)0p, _NSIG / 8);547 int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), RSEQ_FLAG_UNREGISTER, RSEQ_SIG);548 if(ret != 0) {549 int e = errno;550 switch(e) {551 case EINVAL: abort("KERNEL ERROR: rseq unregister invalid argument");552 case ENOSYS: abort("KERNEL ERROR: rseq unregister no supported");553 case EFAULT: abort("KERNEL ERROR: rseq unregister with invalid argument");554 case EBUSY : abort("KERNEL ERROR: rseq unregister already registered");555 case EPERM : abort("KERNEL ERROR: rseq unregister sig argument on unregistration does not match the signature received on registration");556 default: 
abort("KERNEL ERROR: rseq unregisteunexpected return %d", e);557 }558 }559 }560 #else561 // No definition needed562 #endif -
libcfa/src/concurrency/kernel/cluster.hfa
r34b4268 r24d6572 40 40 41 41 // convert to log2 scale but using double 42 static inline __readyQ_avg_t __to_readyQ_avg(unsigned long long intsc) { if(unlikely(0 == intsc)) return 0.0; else return log2( intsc); }42 static inline __readyQ_avg_t __to_readyQ_avg(unsigned long long intsc) { if(unlikely(0 == intsc)) return 0.0; else return log2((__readyQ_avg_t)intsc); } 43 43 44 44 #define warn_large_before warnf( !strict || old_avg < 35.0, "Suspiciously large previous average: %'lf, %'" PRId64 "ms \n", old_avg, program()`ms ) … … 146 146 } 147 147 148 static struct {149 constunsigned readyq;150 constunsigned io;148 const static struct { 149 unsigned readyq; 150 unsigned io; 151 151 } __shard_factor = { 2, 1 }; 152 152 -
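Note: the explicit cast in __to_readyQ_avg only disambiguates the log2 call; behaviour is unchanged, with averages stored in log2(tsc-tick) units and the empty interval special-cased. A two-line illustration with hypothetical values, not from the changeset:

    __readyQ_avg_t a = __to_readyQ_avg( 0 );     // 0.0 -- special case, avoids log2(0) = -inf
    __readyQ_avg_t b = __to_readyQ_avg( 1024 );  // 10.0 -- log2 of a 1024-tick interval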
libcfa/src/concurrency/kernel/private.hfa
r34b4268 r24d6572 10 10 // Created On : Mon Feb 13 12:27:26 2017 11 11 // Last Modified By : Peter A. Buhr 12 // Last Modified On : Wed Aug 12 08:21:33 202013 // Update Count : 912 // Last Modified On : Thu Mar 2 16:04:46 2023 13 // Update Count : 11 14 14 // 15 15 … … 29 29 30 30 extern "C" { 31 #if defined(CFA_HAVE_LINUX_LIBRSEQ)32 #include <rseq/rseq.h>33 #elif defined(CFA_HAVE_LINUX_RSEQ_H)34 #include <linux/rseq.h>35 #else36 #ifndef _GNU_SOURCE37 #error kernel/private requires gnu_source38 #endif39 31 #include <sched.h> 40 #endif41 32 } 42 33 … … 110 101 // Hardware 111 102 112 #if defined(CFA_HAVE_LINUX_LIBRSEQ)113 // No data needed114 #elif defined(CFA_HAVE_LINUX_RSEQ_H)115 extern "Cforall" {116 extern __attribute__((aligned(64))) __thread volatile struct rseq __cfaabi_rseq;117 }118 #else119 // No data needed120 #endif121 122 103 static inline int __kernel_getcpu() { 123 104 /* paranoid */ verify( ! __preemption_enabled() ); 124 #if defined(CFA_HAVE_LINUX_LIBRSEQ)125 return rseq_current_cpu();126 #elif defined(CFA_HAVE_LINUX_RSEQ_H)127 int r = __cfaabi_rseq.cpu_id;128 /* paranoid */ verify( r >= 0 );129 return r;130 #else131 105 return sched_getcpu(); 132 #endif133 106 } 134 107 -
libcfa/src/concurrency/kernel/startup.cfa
r34b4268 r24d6572 15 15 16 16 #define __cforall_thread__ 17 #define _GNU_SOURCE18 17 19 18 // #define __CFA_DEBUG_PRINT_RUNTIME_CORE__ 20 19 21 20 // C Includes 22 #include <errno.h> // errno21 #include <errno.h> // errno 23 22 #include <signal.h> 24 #include <string.h> // strerror25 #include <unistd.h> // sysconf26 23 #include <string.h> // strerror 24 #include <unistd.h> 25 #include <limits.h> // PTHREAD_STACK_MIN 27 26 extern "C" { 28 #include <limits.h> // PTHREAD_STACK_MIN 29 #include <unistd.h> // syscall 30 #include <sys/eventfd.h> // eventfd 31 #include <sys/mman.h> // mprotect 32 #include <sys/resource.h> // getrlimit 27 #include <sys/eventfd.h> // eventfd 28 #include <sys/mman.h> // mprotect 29 #include <sys/resource.h> // getrlimit 33 30 } 34 31 … … 36 33 #include "kernel/private.hfa" 37 34 #include "iofwd.hfa" 38 #include "startup.hfa" // STARTUP_PRIORITY_XXX35 #include "startup.hfa" // STARTUP_PRIORITY_XXX 39 36 #include "limits.hfa" 40 37 #include "math.hfa" … … 150 147 __scheduler_RWLock_t __scheduler_lock @= { 0 }; 151 148 152 #if defined(CFA_HAVE_LINUX_LIBRSEQ)153 // No data needed154 #elif defined(CFA_HAVE_LINUX_RSEQ_H)155 extern "Cforall" {156 __attribute__((aligned(64))) __thread volatile struct rseq __cfaabi_rseq @= {157 .cpu_id : RSEQ_CPU_ID_UNINITIALIZED,158 };159 }160 #else161 // No data needed162 #endif163 164 149 //----------------------------------------------------------------------------- 165 150 // Struct to steal stack -
libcfa/src/concurrency/locks.cfa
r34b4268 r24d6572 5 5 // file "LICENCE" distributed with Cforall. 6 6 // 7 // locks. hfa -- LIBCFATHREAD7 // locks.cfa -- LIBCFATHREAD 8 8 // Runtime locks that used with the runtime thread system. 9 9 // … … 16 16 17 17 #define __cforall_thread__ 18 #define _GNU_SOURCE19 18 20 19 #include "locks.hfa" … … 80 79 // lock is held by some other thread 81 80 if ( owner != 0p && owner != thrd ) { 82 insert_last( blocked_threads, *thrd ); 81 select_node node; 82 insert_last( blocked_threads, node ); 83 83 wait_count++; 84 84 unlock( lock ); 85 85 park( ); 86 } 87 // multi acquisition lock is held by current thread 88 else if ( owner == thrd && multi_acquisition ) { 86 return; 87 } else if ( owner == thrd && multi_acquisition ) { // multi acquisition lock is held by current thread 89 88 recursion_count++; 90 unlock( lock ); 91 } 92 // lock isn't held 93 else { 89 } else { // lock isn't held 94 90 owner = thrd; 95 91 recursion_count = 1; 96 unlock( lock );97 } 92 } 93 unlock( lock ); 98 94 } 99 95 … … 118 114 } 119 115 120 static void pop_and_set_new_owner( blocking_lock & this ) with( this ) { 121 thread$ * t = &try_pop_front( blocked_threads ); 122 owner = t; 123 recursion_count = ( t ? 1 : 0 ); 124 if ( t ) wait_count--; 125 unpark( t ); 116 static inline void pop_node( blocking_lock & this ) with( this ) { 117 __handle_waituntil_OR( blocked_threads ); 118 select_node * node = &try_pop_front( blocked_threads ); 119 if ( node ) { 120 wait_count--; 121 owner = node->blocked_thread; 122 recursion_count = 1; 123 // if ( !node->clause_status || __make_select_node_available( *node ) ) unpark( node->blocked_thread ); 124 wake_one( blocked_threads, *node ); 125 } else { 126 owner = 0p; 127 recursion_count = 0; 128 } 126 129 } 127 130 … … 135 138 recursion_count--; 136 139 if ( recursion_count == 0 ) { 137 pop_ and_set_new_owner( this );140 pop_node( this ); 138 141 } 139 142 unlock( lock ); … … 148 151 // lock held 149 152 if ( owner != 0p ) { 150 insert_last( blocked_threads, * t);153 insert_last( blocked_threads, *(select_node *)t->link_node ); 151 154 wait_count++; 152 unlock( lock );153 155 } 154 156 // lock not held … … 157 159 recursion_count = 1; 158 160 unpark( t ); 159 unlock( lock );160 } 161 } 162 163 size_t on_wait( blocking_lock & this ) with( this ) {161 } 162 unlock( lock ); 163 } 164 165 size_t on_wait( blocking_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) with( this ) { 164 166 lock( lock __cfaabi_dbg_ctx2 ); 165 167 /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this ); … … 168 170 size_t ret = recursion_count; 169 171 170 pop_and_set_new_owner( this ); 172 pop_node( this ); 173 174 select_node node; 175 active_thread()->link_node = (void *)&node; 171 176 unlock( lock ); 177 178 pre_park_then_park( pp_fn, pp_datum ); 179 172 180 return ret; 173 181 } … … 176 184 recursion_count = recursion; 177 185 } 186 187 // waituntil() support 188 bool register_select( blocking_lock & this, select_node & node ) with(this) { 189 lock( lock __cfaabi_dbg_ctx2 ); 190 thread$ * thrd = active_thread(); 191 192 // single acquisition lock is held by current thread 193 /* paranoid */ verifyf( owner != thrd || multi_acquisition, "Single acquisition lock holder (%p) attempted to reacquire the lock %p resulting in a deadlock.", owner, &this ); 194 195 if ( !node.park_counter && ( (owner == thrd && multi_acquisition) || owner == 0p ) ) { // OR special case 196 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering 197 
unlock( lock ); 198 return false; 199 } 200 } 201 202 // lock is held by some other thread 203 if ( owner != 0p && owner != thrd ) { 204 insert_last( blocked_threads, node ); 205 wait_count++; 206 unlock( lock ); 207 return false; 208 } else if ( owner == thrd && multi_acquisition ) { // multi acquisition lock is held by current thread 209 recursion_count++; 210 } else { // lock isn't held 211 owner = thrd; 212 recursion_count = 1; 213 } 214 215 if ( node.park_counter ) __make_select_node_available( node ); 216 unlock( lock ); 217 return true; 218 } 219 220 bool unregister_select( blocking_lock & this, select_node & node ) with(this) { 221 lock( lock __cfaabi_dbg_ctx2 ); 222 if ( node`isListed ) { 223 remove( node ); 224 wait_count--; 225 unlock( lock ); 226 return false; 227 } 228 229 if ( owner == active_thread() ) { 230 /* paranoid */ verifyf( recursion_count == 1 || multi_acquisition, "Thread %p attempted to unlock owner lock %p in waituntil unregister, which is not recursive but has a recursive count of %zu", active_thread(), &this, recursion_count ); 231 // if recursion count is zero release lock and set new owner if one is waiting 232 recursion_count--; 233 if ( recursion_count == 0 ) { 234 pop_node( this ); 235 } 236 } 237 unlock( lock ); 238 return false; 239 } 240 241 void on_selected( blocking_lock & this, select_node & node ) {} 178 242 179 243 //----------------------------------------------------------------------------- … … 312 376 int counter( condition_variable(L) & this ) with(this) { return count; } 313 377 314 static size_t queue_and_get_recursion( condition_variable(L) & this, info_thread(L) * i ) with(this) {378 static void enqueue_thread( condition_variable(L) & this, info_thread(L) * i ) with(this) { 315 379 // add info_thread to waiting queue 316 380 insert_last( blocked_threads, *i ); 317 381 count++; 318 size_t recursion_count = 0; 319 if (i->lock) { 320 // if lock was passed get recursion count to reset to after waking thread 321 recursion_count = on_wait( *i->lock ); 322 } 323 return recursion_count; 324 } 382 } 383 384 static size_t block_and_get_recursion( info_thread(L) & i, __cfa_pre_park pp_fn, void * pp_datum ) { 385 size_t recursion_count = 0; 386 if ( i.lock ) // if lock was passed get recursion count to reset to after waking thread 387 recursion_count = on_wait( *i.lock, pp_fn, pp_datum ); // this call blocks 388 else 389 pre_park_then_park( pp_fn, pp_datum ); 390 return recursion_count; 391 } 392 static size_t block_and_get_recursion( info_thread(L) & i ) { return block_and_get_recursion( i, pre_park_noop, 0p ); } 325 393 326 394 // helper for wait()'s' with no timeout 327 395 static void queue_info_thread( condition_variable(L) & this, info_thread(L) & i ) with(this) { 328 396 lock( lock __cfaabi_dbg_ctx2 ); 329 size_t recursion_count = queue_and_get_recursion(this, &i);397 enqueue_thread( this, &i ); 330 398 unlock( lock ); 331 399 332 400 // blocks here 333 park();401 size_t recursion_count = block_and_get_recursion( i ); 334 402 335 403 // resets recursion count here after waking 336 if ( i.lock) on_wakeup(*i.lock, recursion_count);404 if ( i.lock ) on_wakeup( *i.lock, recursion_count ); 337 405 } 338 406 … … 341 409 queue_info_thread( this, i ); 342 410 411 static void cond_alarm_register( void * node_ptr ) { register_self( (alarm_node_t *)node_ptr ); } 412 343 413 // helper for wait()'s' with a timeout 344 414 static void queue_info_thread_timeout( condition_variable(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) 
with(this) { 345 415 lock( lock __cfaabi_dbg_ctx2 ); 346 size_t recursion_count = queue_and_get_recursion(this, &info);416 enqueue_thread( this, &info ); 347 417 alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info }; 348 418 unlock( lock ); 349 419 350 // registers alarm outside cond lock to avoid deadlock 351 register_self( &node_wrap.alarm_node ); 352 353 // blocks here 354 park(); 420 // blocks here and registers alarm node before blocking after releasing locks to avoid deadlock 421 size_t recursion_count = block_and_get_recursion( info, cond_alarm_register, (void *)(&node_wrap.alarm_node) ); 422 // park(); 355 423 356 424 // unregisters alarm so it doesn't go off if this happens first … … 358 426 359 427 // resets recursion count here after waking 360 if ( info.lock) on_wakeup(*info.lock, recursion_count);428 if ( info.lock ) on_wakeup( *info.lock, recursion_count ); 361 429 } 362 430 … … 418 486 info_thread( L ) i = { active_thread(), info, &l }; 419 487 insert_last( blocked_threads, i ); 420 size_t recursion_count = on_wait( *i.lock );421 park( );488 size_t recursion_count = on_wait( *i.lock, pre_park_noop, 0p ); // blocks here 489 // park( ); 422 490 on_wakeup(*i.lock, recursion_count); 423 491 } … … 460 528 bool empty ( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty; } 461 529 462 static size_t queue_and_get_recursion( pthread_cond_var(L) & this, info_thread(L) * i ) with(this) {463 // add info_thread to waiting queue464 insert_last( blocked_threads, *i );465 size_t recursion_count = 0;466 recursion_count = on_wait( *i->lock );467 return recursion_count;468 }469 470 530 static void queue_info_thread_timeout( pthread_cond_var(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) { 471 531 lock( lock __cfaabi_dbg_ctx2 ); 472 size_t recursion_count = queue_and_get_recursion(this, &info);532 insert_last( blocked_threads, info ); 473 533 pthread_alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info }; 474 534 unlock( lock ); 475 535 476 // registers alarm outside cond lock to avoid deadlock 477 register_self( &node_wrap.alarm_node ); 478 479 // blocks here 480 park(); 481 482 // unregisters alarm so it doesn't go off if this happens first 536 // blocks here and registers alarm node before blocking after releasing locks to avoid deadlock 537 size_t recursion_count = block_and_get_recursion( info, cond_alarm_register, (void *)(&node_wrap.alarm_node) ); 538 539 // unregisters alarm so it doesn't go off if signal happens first 483 540 unregister_self( &node_wrap.alarm_node ); 484 541 485 542 // resets recursion count here after waking 486 if ( info.lock) on_wakeup(*info.lock, recursion_count);543 if ( info.lock ) on_wakeup( *info.lock, recursion_count ); 487 544 } 488 545 … … 494 551 lock( lock __cfaabi_dbg_ctx2 ); 495 552 info_thread( L ) i = { active_thread(), info, &l }; 496 size_t recursion_count = queue_and_get_recursion(this, &i); 497 unlock( lock ); 498 park( ); 499 on_wakeup(*i.lock, recursion_count); 553 insert_last( blocked_threads, i ); 554 unlock( lock ); 555 556 // blocks here 557 size_t recursion_count = block_and_get_recursion( i ); 558 559 on_wakeup( *i.lock, recursion_count ); 500 560 } 501 561 … … 585 645 return thrd != 0p; 586 646 } 647 -
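Note: the reworked wait path above releases the lock's internal state, runs an optional pre-park callback (used by the timed waits to register their alarm), and only then parks, closing the old deadlock window. From the caller's side the condition variable is used as before; a usage sketch, not part of this changeset, assuming the wait( cond, lock ) and notify_one( cond ) entry points from locks.hfa:

    single_acquisition_lock m;
    condition_variable( single_acquisition_lock ) cv;
    bool ready = false;

    void waiter() {
        lock( m );
        while ( ! ready ) wait( cv, m );  // m is released while parked and held again when wait returns
        unlock( m );
    }

    void signaller() {
        lock( m );
        ready = true;
        notify_one( cv );
        unlock( m );
    }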
libcfa/src/concurrency/locks.hfa
r34b4268 r24d6572 30 30 #include "time.hfa" 31 31 32 #include "select.hfa" 33 32 34 #include <fstream.hfa> 33 34 35 35 36 // futex headers … … 38 39 #include <unistd.h> 39 40 40 // undef to make a number of the locks not reacquire upon waking from a condlock 41 #define REACQ 1 41 typedef void (*__cfa_pre_park)( void * ); 42 43 static inline void pre_park_noop( void * ) {} 44 45 //----------------------------------------------------------------------------- 46 // is_blocking_lock 47 forall( L & | sized(L) ) 48 trait is_blocking_lock { 49 // For synchronization locks to use when acquiring 50 void on_notify( L &, struct thread$ * ); 51 52 // For synchronization locks to use when releasing 53 size_t on_wait( L &, __cfa_pre_park pp_fn, void * pp_datum ); 54 55 // to set recursion count after getting signalled; 56 void on_wakeup( L &, size_t recursion ); 57 }; 58 59 static inline void pre_park_then_park( __cfa_pre_park pp_fn, void * pp_datum ) { 60 pp_fn( pp_datum ); 61 park(); 62 } 63 64 // macros for default routine impls for is_blocking_lock trait that do not wait-morph 65 66 #define DEFAULT_ON_NOTIFY( lock_type ) \ 67 static inline void on_notify( lock_type & this, thread$ * t ){ unpark(t); } 68 69 #define DEFAULT_ON_WAIT( lock_type ) \ 70 static inline size_t on_wait( lock_type & this, __cfa_pre_park pp_fn, void * pp_datum ) { \ 71 unlock( this ); \ 72 pre_park_then_park( pp_fn, pp_datum ); \ 73 return 0; \ 74 } 75 76 // on_wakeup impl if lock should be reacquired after waking up 77 #define DEFAULT_ON_WAKEUP_REACQ( lock_type ) \ 78 static inline void on_wakeup( lock_type & this, size_t recursion ) { lock( this ); } 79 80 // on_wakeup impl if lock will not be reacquired after waking up 81 #define DEFAULT_ON_WAKEUP_NO_REACQ( lock_type ) \ 82 static inline void on_wakeup( lock_type & this, size_t recursion ) {} 83 84 42 85 43 86 //----------------------------------------------------------------------------- … … 66 109 static inline bool try_lock ( single_acquisition_lock & this ) { return try_lock( (blocking_lock &)this ); } 67 110 static inline void unlock ( single_acquisition_lock & this ) { unlock ( (blocking_lock &)this ); } 68 static inline size_t on_wait ( single_acquisition_lock & this ) { return on_wait ( (blocking_lock &)this); }111 static inline size_t on_wait ( single_acquisition_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) { return on_wait ( (blocking_lock &)this, pp_fn, pp_datum ); } 69 112 static inline void on_wakeup( single_acquisition_lock & this, size_t v ) { on_wakeup ( (blocking_lock &)this, v ); } 70 113 static inline void on_notify( single_acquisition_lock & this, struct thread$ * t ) { on_notify( (blocking_lock &)this, t ); } 114 static inline bool register_select( single_acquisition_lock & this, select_node & node ) { return register_select( (blocking_lock &)this, node ); } 115 static inline bool unregister_select( single_acquisition_lock & this, select_node & node ) { return unregister_select( (blocking_lock &)this, node ); } 116 static inline void on_selected( single_acquisition_lock & this, select_node & node ) { on_selected( (blocking_lock &)this, node ); } 71 117 72 118 //---------- … … 80 126 static inline bool try_lock ( owner_lock & this ) { return try_lock( (blocking_lock &)this ); } 81 127 static inline void unlock ( owner_lock & this ) { unlock ( (blocking_lock &)this ); } 82 static inline size_t on_wait ( owner_lock & this ) { return on_wait ( (blocking_lock &)this); }128 static inline size_t on_wait ( owner_lock & this, __cfa_pre_park pp_fn, void * 
pp_datum ) { return on_wait ( (blocking_lock &)this, pp_fn, pp_datum ); } 83 129 static inline void on_wakeup( owner_lock & this, size_t v ) { on_wakeup ( (blocking_lock &)this, v ); } 84 130 static inline void on_notify( owner_lock & this, struct thread$ * t ) { on_notify( (blocking_lock &)this, t ); } 131 static inline bool register_select( owner_lock & this, select_node & node ) { return register_select( (blocking_lock &)this, node ); } 132 static inline bool unregister_select( owner_lock & this, select_node & node ) { return unregister_select( (blocking_lock &)this, node ); } 133 static inline void on_selected( owner_lock & this, select_node & node ) { on_selected( (blocking_lock &)this, node ); } 85 134 86 135 //----------------------------------------------------------------------------- … … 127 176 static inline void ?{}(mcs_spin_node & this) { this.next = 0p; this.locked = true; } 128 177 129 static inline mcs_spin_node * volatile & ?`next ( mcs_spin_node * node ) {130 return node->next;131 }132 133 178 struct mcs_spin_lock { 134 179 mcs_spin_queue queue; … … 136 181 137 182 static inline void lock(mcs_spin_lock & l, mcs_spin_node & n) { 183 n.locked = true; 138 184 mcs_spin_node * prev = __atomic_exchange_n(&l.queue.tail, &n, __ATOMIC_SEQ_CST); 139 n.locked = true; 140 if(prev == 0p) return; 185 if( prev == 0p ) return; 141 186 prev->next = &n; 142 while( __atomic_load_n(&n.locked, __ATOMIC_RELAXED)) Pause();187 while( __atomic_load_n(&n.locked, __ATOMIC_RELAXED) ) Pause(); 143 188 } 144 189 … … 146 191 mcs_spin_node * n_ptr = &n; 147 192 if (__atomic_compare_exchange_n(&l.queue.tail, &n_ptr, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) return; 148 while (__atomic_load_n(&n.next, __ATOMIC_RELAXED) == 0p) {}193 while (__atomic_load_n(&n.next, __ATOMIC_RELAXED) == 0p) Pause(); 149 194 n.next->locked = false; 150 195 } … … 153 198 // futex_mutex 154 199 155 // - No cond var support156 200 // - Kernel thd blocking alternative to the spinlock 157 201 // - No ownership (will deadlock on reacq) 202 // - no reacq on wakeup 158 203 struct futex_mutex { 159 204 // lock state any state other than UNLOCKED is locked … … 169 214 } 170 215 171 static inline void 172 173 static inline bool internal_try_lock( futex_mutex & this, int & compare_val) with(this) {216 static inline void ?{}( futex_mutex & this ) with(this) { val = 0; } 217 218 static inline bool internal_try_lock( futex_mutex & this, int & compare_val) with(this) { 174 219 return __atomic_compare_exchange_n((int*)&val, (int*)&compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE); 175 220 } 176 221 177 static inline int internal_exchange( futex_mutex & this) with(this) {222 static inline int internal_exchange( futex_mutex & this ) with(this) { 178 223 return __atomic_exchange_n((int*)&val, 2, __ATOMIC_ACQUIRE); 179 224 } 180 225 181 226 // if this is called recursively IT WILL DEADLOCK!!!!! 
182 static inline void lock( futex_mutex & this) with(this) {227 static inline void lock( futex_mutex & this ) with(this) { 183 228 int state; 184 229 185 186 // // linear backoff omitted for now 187 // for( int spin = 4; spin < 1024; spin += spin) { 188 // state = 0; 189 // // if unlocked, lock and return 190 // if (internal_try_lock(this, state)) return; 191 // if (2 == state) break; 192 // for (int i = 0; i < spin; i++) Pause(); 193 // } 194 195 // no contention try to acquire 196 if (internal_try_lock(this, state)) return; 230 for( int spin = 4; spin < 1024; spin += spin) { 231 state = 0; 232 // if unlocked, lock and return 233 if (internal_try_lock(this, state)) return; 234 if (2 == state) break; 235 for (int i = 0; i < spin; i++) Pause(); 236 } 197 237 198 238 // if not in contended state, set to be in contended state … … 207 247 208 248 static inline void unlock(futex_mutex & this) with(this) { 209 // if uncontended do atomic eunlock and then return210 if (__atomic_fetch_sub(&val, 1, __ATOMIC_RELEASE) == 1) return; // TODO: try acq/rel 249 // if uncontended do atomic unlock and then return 250 if (__atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1) return; 211 251 212 252 // otherwise threads are blocked so we must wake one 213 __atomic_store_n((int *)&val, 0, __ATOMIC_RELEASE);214 253 futex((int *)&val, FUTEX_WAKE, 1); 215 254 } 216 255 217 static inline void on_notify( futex_mutex & f, thread$ * t){ unpark(t); } 218 static inline size_t on_wait( futex_mutex & f ) {unlock(f); return 0;} 219 220 // to set recursion count after getting signalled; 221 static inline void on_wakeup( futex_mutex & f, size_t recursion ) {} 222 223 //----------------------------------------------------------------------------- 224 // CLH Spinlock 225 // - No recursive acquisition 226 // - Needs to be released by owner 227 228 struct clh_lock { 229 volatile bool * volatile tail; 230 }; 231 232 static inline void ?{}( clh_lock & this ) { this.tail = malloc(); *this.tail = true; } 233 static inline void ^?{}( clh_lock & this ) { free(this.tail); } 234 235 static inline void lock(clh_lock & l) { 236 thread$ * curr_thd = active_thread(); 237 *(curr_thd->clh_node) = false; 238 volatile bool * prev = __atomic_exchange_n((bool **)(&l.tail), (bool *)(curr_thd->clh_node), __ATOMIC_SEQ_CST); 239 while(!__atomic_load_n(prev, __ATOMIC_ACQUIRE)) Pause(); 240 curr_thd->clh_prev = prev; 241 } 242 243 static inline void unlock(clh_lock & l) { 244 thread$ * curr_thd = active_thread(); 245 __atomic_store_n(curr_thd->clh_node, true, __ATOMIC_RELEASE); 246 curr_thd->clh_node = curr_thd->clh_prev; 247 } 248 249 static inline void on_notify(clh_lock & this, struct thread$ * t ) { unpark(t); } 250 static inline size_t on_wait(clh_lock & this) { unlock(this); return 0; } 251 static inline void on_wakeup(clh_lock & this, size_t recursion ) { 252 #ifdef REACQ 253 lock(this); 254 #endif 255 } 256 257 258 //----------------------------------------------------------------------------- 259 // Linear backoff Spinlock 260 struct linear_backoff_then_block_lock { 256 DEFAULT_ON_NOTIFY( futex_mutex ) 257 DEFAULT_ON_WAIT( futex_mutex ) 258 DEFAULT_ON_WAKEUP_NO_REACQ( futex_mutex ) 259 260 //----------------------------------------------------------------------------- 261 // go_mutex 262 263 // - Kernel thd blocking alternative to the spinlock 264 // - No ownership (will deadlock on reacq) 265 // - Golang's flavour of mutex 266 // - Impl taken from Golang: src/runtime/lock_futex.go 267 struct go_mutex { 268 // lock state any state other than 
UNLOCKED is locked 269 // enum LockState { UNLOCKED = 0, LOCKED = 1, SLEEPING = 2 }; 270 271 // stores a lock state 272 int val; 273 }; 274 static inline void ?{}( go_mutex & this ) with(this) { val = 0; } 275 // static inline void ?{}( go_mutex & this, go_mutex this2 ) = void; // these don't compile correctly at the moment so they should be omitted 276 // static inline void ?=?( go_mutex & this, go_mutex this2 ) = void; 277 278 static inline bool internal_try_lock(go_mutex & this, int & compare_val, int new_val ) with(this) { 279 return __atomic_compare_exchange_n((int*)&val, (int*)&compare_val, new_val, false, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE); 280 } 281 282 static inline int internal_exchange(go_mutex & this, int swap ) with(this) { 283 return __atomic_exchange_n((int*)&val, swap, __ATOMIC_ACQUIRE); 284 } 285 286 // if this is called recursively IT WILL DEADLOCK!!!!! 287 static inline void lock( go_mutex & this ) with( this ) { 288 int state, init_state; 289 290 // speculative grab 291 state = internal_exchange(this, 1); 292 if ( !state ) return; // state == 0 293 init_state = state; 294 for (;;) { 295 for( int i = 0; i < 4; i++ ) { 296 while( !val ) { // lock unlocked 297 state = 0; 298 if ( internal_try_lock( this, state, init_state ) ) return; 299 } 300 for (int i = 0; i < 30; i++) Pause(); 301 } 302 303 while( !val ) { // lock unlocked 304 state = 0; 305 if ( internal_try_lock( this, state, init_state ) ) return; 306 } 307 sched_yield(); 308 309 // if not in contended state, set to be in contended state 310 state = internal_exchange( this, 2 ); 311 if ( !state ) return; // state == 0 312 init_state = 2; 313 futex( (int*)&val, FUTEX_WAIT, 2 ); // if val is not 2 this returns with EWOULDBLOCK 314 } 315 } 316 317 static inline void unlock( go_mutex & this ) with(this) { 318 // if uncontended do atomic unlock and then return 319 if ( __atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1 ) return; 320 321 // otherwise threads are blocked so we must wake one 322 futex( (int *)&val, FUTEX_WAKE, 1 ); 323 } 324 325 DEFAULT_ON_NOTIFY( go_mutex ) 326 DEFAULT_ON_WAIT( go_mutex ) 327 DEFAULT_ON_WAKEUP_NO_REACQ( go_mutex ) 328 329 //----------------------------------------------------------------------------- 330 // Exponential backoff then block lock 331 struct exp_backoff_then_block_lock { 261 332 // Spin lock used for mutual exclusion 262 333 __spinlock_t spinlock; … … 269 340 }; 270 341 271 static inline void ?{}( linear_backoff_then_block_lock & this ) {342 static inline void ?{}( exp_backoff_then_block_lock & this ) { 272 343 this.spinlock{}; 273 344 this.blocked_threads{}; 274 345 this.lock_value = 0; 275 346 } 276 static inline void ^?{}( linear_backoff_then_block_lock & this ) {} 277 // static inline void ?{}( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void; 278 // static inline void ?=?( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void; 279 280 static inline bool internal_try_lock(linear_backoff_then_block_lock & this, size_t & compare_val) with(this) { 281 if (__atomic_compare_exchange_n(&lock_value, &compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) { 282 return true; 283 } 284 return false; 285 } 286 287 static inline bool try_lock(linear_backoff_then_block_lock & this) { size_t compare_val = 0; return internal_try_lock(this, compare_val); } 288 289 static inline bool try_lock_contention(linear_backoff_then_block_lock & this) with(this) { 290 if (__atomic_exchange_n(&lock_value, 2, __ATOMIC_ACQUIRE) == 
0) { 291 return true; 292 } 293 return false; 294 } 295 296 static inline bool block(linear_backoff_then_block_lock & this) with(this) { 297 lock( spinlock __cfaabi_dbg_ctx2 ); // TODO change to lockfree queue (MPSC) 298 if (lock_value != 2) { 299 unlock( spinlock ); 300 return true; 301 } 302 insert_last( blocked_threads, *active_thread() ); 303 unlock( spinlock ); 347 static inline void ?{}( exp_backoff_then_block_lock & this, exp_backoff_then_block_lock this2 ) = void; 348 static inline void ?=?( exp_backoff_then_block_lock & this, exp_backoff_then_block_lock this2 ) = void; 349 350 static inline void ^?{}( exp_backoff_then_block_lock & this ){} 351 352 static inline bool internal_try_lock( exp_backoff_then_block_lock & this, size_t & compare_val ) with(this) { 353 return __atomic_compare_exchange_n(&lock_value, &compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED); 354 } 355 356 static inline bool try_lock( exp_backoff_then_block_lock & this ) { size_t compare_val = 0; return internal_try_lock( this, compare_val ); } 357 358 static inline bool try_lock_contention( exp_backoff_then_block_lock & this ) with(this) { 359 return !__atomic_exchange_n( &lock_value, 2, __ATOMIC_ACQUIRE ); 360 } 361 362 static inline bool block( exp_backoff_then_block_lock & this ) with(this) { 363 lock( spinlock __cfaabi_dbg_ctx2 ); 364 if (__atomic_load_n( &lock_value, __ATOMIC_SEQ_CST) != 2) { 365 unlock( spinlock ); 366 return true; 367 } 368 insert_last( blocked_threads, *active_thread() ); 369 unlock( spinlock ); 304 370 park( ); 305 371 return true; 306 372 } 307 373 308 static inline void lock( linear_backoff_then_block_lock & this) with(this) {374 static inline void lock( exp_backoff_then_block_lock & this ) with(this) { 309 375 size_t compare_val = 0; 310 376 int spin = 4; 377 311 378 // linear backoff 312 379 for( ;; ) { … … 324 391 } 325 392 326 static inline void unlock( linear_backoff_then_block_lock & this) with(this) {393 static inline void unlock( exp_backoff_then_block_lock & this ) with(this) { 327 394 if (__atomic_exchange_n(&lock_value, 0, __ATOMIC_RELEASE) == 1) return; 328 lock( spinlock __cfaabi_dbg_ctx2 ); 329 thread$ * t = &try_pop_front( blocked_threads ); 330 unlock( spinlock ); 331 unpark( t ); 332 } 333 334 static inline void on_notify(linear_backoff_then_block_lock & this, struct thread$ * t ) { unpark(t); } 335 static inline size_t on_wait(linear_backoff_then_block_lock & this) { unlock(this); return 0; } 336 static inline void on_wakeup(linear_backoff_then_block_lock & this, size_t recursion ) { 337 #ifdef REACQ 338 lock(this); 339 #endif 340 } 395 lock( spinlock __cfaabi_dbg_ctx2 ); 396 thread$ * t = &try_pop_front( blocked_threads ); 397 unlock( spinlock ); 398 unpark( t ); 399 } 400 401 DEFAULT_ON_NOTIFY( exp_backoff_then_block_lock ) 402 DEFAULT_ON_WAIT( exp_backoff_then_block_lock ) 403 DEFAULT_ON_WAKEUP_REACQ( exp_backoff_then_block_lock ) 341 404 342 405 //----------------------------------------------------------------------------- … … 368 431 369 432 // if this is called recursively IT WILL DEADLOCK!!!!! 
370 static inline void lock( fast_block_lock & this) with(this) {433 static inline void lock( fast_block_lock & this ) with(this) { 371 434 lock( lock __cfaabi_dbg_ctx2 ); 372 435 if ( held ) { … … 380 443 } 381 444 382 static inline void unlock( fast_block_lock & this) with(this) {445 static inline void unlock( fast_block_lock & this ) with(this) { 383 446 lock( lock __cfaabi_dbg_ctx2 ); 384 447 /* paranoid */ verifyf( held != false, "Attempt to release lock %p that isn't held", &this ); … … 389 452 } 390 453 391 static inline void on_notify(fast_block_lock & this, struct thread$ * t ) with(this) { 392 #ifdef REACQ 393 lock( lock __cfaabi_dbg_ctx2 ); 394 insert_last( blocked_threads, *t ); 395 unlock( lock ); 396 #else 397 unpark(t); 398 #endif 399 } 400 static inline size_t on_wait(fast_block_lock & this) { unlock(this); return 0; } 401 static inline void on_wakeup(fast_block_lock & this, size_t recursion ) { } 454 static inline void on_notify( fast_block_lock & this, struct thread$ * t ) with(this) { 455 lock( lock __cfaabi_dbg_ctx2 ); 456 insert_last( blocked_threads, *t ); 457 unlock( lock ); 458 } 459 DEFAULT_ON_WAIT( fast_block_lock ) 460 DEFAULT_ON_WAKEUP_NO_REACQ( fast_block_lock ) 402 461 403 462 //----------------------------------------------------------------------------- … … 410 469 struct simple_owner_lock { 411 470 // List of blocked threads 412 dlist( thread$) blocked_threads;471 dlist( select_node ) blocked_threads; 413 472 414 473 // Spin lock used for mutual exclusion … … 431 490 static inline void ?=?( simple_owner_lock & this, simple_owner_lock this2 ) = void; 432 491 433 static inline void lock( simple_owner_lock & this) with(this) {434 if ( owner == active_thread()) {492 static inline void lock( simple_owner_lock & this ) with(this) { 493 if ( owner == active_thread() ) { 435 494 recursion_count++; 436 495 return; … … 438 497 lock( lock __cfaabi_dbg_ctx2 ); 439 498 440 if (owner != 0p) { 441 insert_last( blocked_threads, *active_thread() ); 499 if ( owner != 0p ) { 500 select_node node; 501 insert_last( blocked_threads, node ); 442 502 unlock( lock ); 443 503 park( ); … … 449 509 } 450 510 451 // TODO: fix duplicate def issue and bring this back 452 // void pop_and_set_new_owner( simple_owner_lock & this ) with( this ) { 453 // thread$ * t = &try_pop_front( blocked_threads ); 454 // owner = t; 455 // recursion_count = ( t ? 1 : 0 ); 456 // unpark( t ); 457 // } 458 459 static inline void unlock(simple_owner_lock & this) with(this) { 511 static inline void pop_node( simple_owner_lock & this ) with(this) { 512 __handle_waituntil_OR( blocked_threads ); 513 select_node * node = &try_pop_front( blocked_threads ); 514 if ( node ) { 515 owner = node->blocked_thread; 516 recursion_count = 1; 517 // if ( !node->clause_status || __make_select_node_available( *node ) ) unpark( node->blocked_thread ); 518 wake_one( blocked_threads, *node ); 519 } else { 520 owner = 0p; 521 recursion_count = 0; 522 } 523 } 524 525 static inline void unlock( simple_owner_lock & this ) with(this) { 460 526 lock( lock __cfaabi_dbg_ctx2 ); 461 527 /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this ); … … 464 530 recursion_count--; 465 531 if ( recursion_count == 0 ) { 466 // pop_and_set_new_owner( this ); 467 thread$ * t = &try_pop_front( blocked_threads ); 468 owner = t; 469 recursion_count = ( t ? 
1 : 0 ); 470 unpark( t ); 532 pop_node( this ); 471 533 } 472 534 unlock( lock ); 473 535 } 474 536 475 static inline void on_notify( simple_owner_lock & this, structthread$ * t ) with(this) {537 static inline void on_notify( simple_owner_lock & this, thread$ * t ) with(this) { 476 538 lock( lock __cfaabi_dbg_ctx2 ); 477 539 // lock held 478 540 if ( owner != 0p ) { 479 insert_last( blocked_threads, * t);541 insert_last( blocked_threads, *(select_node *)t->link_node ); 480 542 } 481 543 // lock not held … … 488 550 } 489 551 490 static inline size_t on_wait( simple_owner_lock & this) with(this) {552 static inline size_t on_wait( simple_owner_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) with(this) { 491 553 lock( lock __cfaabi_dbg_ctx2 ); 492 554 /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this ); … … 495 557 size_t ret = recursion_count; 496 558 497 // pop_and_set_new_owner( this ); 498 499 thread$ * t = &try_pop_front( blocked_threads ); 500 owner = t; 501 recursion_count = ( t ? 1 : 0 ); 502 unpark( t ); 503 559 pop_node( this ); 560 561 select_node node; 562 active_thread()->link_node = (void *)&node; 504 563 unlock( lock ); 564 565 pre_park_then_park( pp_fn, pp_datum ); 566 505 567 return ret; 506 568 } 507 569 508 static inline void on_wakeup(simple_owner_lock & this, size_t recursion ) with(this) { recursion_count = recursion; } 570 static inline void on_wakeup( simple_owner_lock & this, size_t recursion ) with(this) { recursion_count = recursion; } 571 572 // waituntil() support 573 static inline bool register_select( simple_owner_lock & this, select_node & node ) with(this) { 574 lock( lock __cfaabi_dbg_ctx2 ); 575 576 // check if we can complete operation. If so race to establish winner in special OR case 577 if ( !node.park_counter && ( owner == active_thread() || owner == 0p ) ) { 578 if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering 579 unlock( lock ); 580 return false; 581 } 582 } 583 584 if ( owner == active_thread() ) { 585 recursion_count++; 586 if ( node.park_counter ) __make_select_node_available( node ); 587 unlock( lock ); 588 return true; 589 } 590 591 if ( owner != 0p ) { 592 insert_last( blocked_threads, node ); 593 unlock( lock ); 594 return false; 595 } 596 597 owner = active_thread(); 598 recursion_count = 1; 599 600 if ( node.park_counter ) __make_select_node_available( node ); 601 unlock( lock ); 602 return true; 603 } 604 605 static inline bool unregister_select( simple_owner_lock & this, select_node & node ) with(this) { 606 lock( lock __cfaabi_dbg_ctx2 ); 607 if ( node`isListed ) { 608 remove( node ); 609 unlock( lock ); 610 return false; 611 } 612 613 if ( owner == active_thread() ) { 614 recursion_count--; 615 if ( recursion_count == 0 ) { 616 pop_node( this ); 617 } 618 } 619 unlock( lock ); 620 return false; 621 } 622 623 static inline void on_selected( simple_owner_lock & this, select_node & node ) {} 624 509 625 510 626 //----------------------------------------------------------------------------- … … 521 637 // flag showing if lock is held 522 638 volatile bool held; 523 524 #ifdef __CFA_DEBUG__525 // for deadlock detection526 struct thread$ * owner;527 #endif528 639 }; 529 640 … … 536 647 static inline void ?=?( spin_queue_lock & this, spin_queue_lock this2 ) = void; 537 648 538 // if this is called recursively IT WILL DEADLOCK! !!!!539 static inline void lock( spin_queue_lock & this) with(this) {649 // if this is called recursively IT WILL DEADLOCK! 
650 static inline void lock( spin_queue_lock & this ) with(this) { 540 651 mcs_spin_node node; 541 652 lock( lock, node ); … … 545 656 } 546 657 547 static inline void unlock( spin_queue_lock & this) with(this) {658 static inline void unlock( spin_queue_lock & this ) with(this) { 548 659 __atomic_store_n(&held, false, __ATOMIC_RELEASE); 549 660 } 550 661 551 static inline void on_notify(spin_queue_lock & this, struct thread$ * t ) { 552 unpark(t); 553 } 554 static inline size_t on_wait(spin_queue_lock & this) { unlock(this); return 0; } 555 static inline void on_wakeup(spin_queue_lock & this, size_t recursion ) { 556 #ifdef REACQ 557 lock(this); 558 #endif 559 } 560 662 DEFAULT_ON_NOTIFY( spin_queue_lock ) 663 DEFAULT_ON_WAIT( spin_queue_lock ) 664 DEFAULT_ON_WAKEUP_REACQ( spin_queue_lock ) 561 665 562 666 //----------------------------------------------------------------------------- … … 584 688 585 689 // if this is called recursively IT WILL DEADLOCK!!!!! 586 static inline void lock( mcs_block_spin_lock & this) with(this) {690 static inline void lock( mcs_block_spin_lock & this ) with(this) { 587 691 mcs_node node; 588 692 lock( lock, node ); … … 596 700 } 597 701 598 static inline void on_notify(mcs_block_spin_lock & this, struct thread$ * t ) { unpark(t); } 599 static inline size_t on_wait(mcs_block_spin_lock & this) { unlock(this); return 0; } 600 static inline void on_wakeup(mcs_block_spin_lock & this, size_t recursion ) { 601 #ifdef REACQ 602 lock(this); 603 #endif 604 } 702 DEFAULT_ON_NOTIFY( mcs_block_spin_lock ) 703 DEFAULT_ON_WAIT( mcs_block_spin_lock ) 704 DEFAULT_ON_WAKEUP_REACQ( mcs_block_spin_lock ) 605 705 606 706 //----------------------------------------------------------------------------- … … 628 728 629 729 // if this is called recursively IT WILL DEADLOCK!!!!! 
630 static inline void lock( block_spin_lock & this) with(this) {730 static inline void lock( block_spin_lock & this ) with(this) { 631 731 lock( lock ); 632 732 while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause(); … … 635 735 } 636 736 637 static inline void unlock( block_spin_lock & this) with(this) {737 static inline void unlock( block_spin_lock & this ) with(this) { 638 738 __atomic_store_n(&held, false, __ATOMIC_RELEASE); 639 739 } 640 740 641 static inline void on_notify(block_spin_lock & this, struct thread$ * t ) with(this.lock) { 642 #ifdef REACQ 741 static inline void on_notify( block_spin_lock & this, struct thread$ * t ) with(this.lock) { 643 742 // first we acquire internal fast_block_lock 644 743 lock( lock __cfaabi_dbg_ctx2 ); … … 652 751 unlock( lock ); 653 752 654 #endif655 753 unpark(t); 656 657 } 658 static inline size_t on_wait(block_spin_lock & this) { unlock(this); return 0; } 659 static inline void on_wakeup(block_spin_lock & this, size_t recursion ) with(this) { 660 #ifdef REACQ 754 } 755 DEFAULT_ON_WAIT( block_spin_lock ) 756 static inline void on_wakeup( block_spin_lock & this, size_t recursion ) with(this) { 661 757 // now we acquire the entire block_spin_lock upon waking up 662 758 while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause(); 663 759 __atomic_store_n(&held, true, __ATOMIC_RELEASE); 664 760 unlock( lock ); // Now we release the internal fast_spin_lock 665 #endif 666 } 667 668 //----------------------------------------------------------------------------- 669 // is_blocking_lock 670 trait is_blocking_lock(L & | sized(L)) { 671 // For synchronization locks to use when acquiring 672 void on_notify( L &, struct thread$ * ); 673 674 // For synchronization locks to use when releasing 675 size_t on_wait( L & ); 676 677 // to set recursion count after getting signalled; 678 void on_wakeup( L &, size_t recursion ); 679 }; 761 } 680 762 681 763 //----------------------------------------------------------------------------- … … 685 767 forall(L & | is_blocking_lock(L)) { 686 768 struct info_thread; 687 688 // // for use by sequence689 // info_thread(L) *& Back( info_thread(L) * this );690 // info_thread(L) *& Next( info_thread(L) * this );691 769 } 692 770 -
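Note: the DEFAULT_ON_NOTIFY / DEFAULT_ON_WAIT / DEFAULT_ON_WAKEUP_* macros introduced above stamp out the non-wait-morphing is_blocking_lock routines for simple locks. A sketch of how a new lock type could opt in, not part of the changeset (toy_spinlock is hypothetical and assumes the same includes as locks.hfa):

    struct toy_spinlock { volatile bool held; };
    static inline void ?{}( toy_spinlock & this ) { this.held = false; }

    static inline void lock( toy_spinlock & this ) {
        while ( __atomic_exchange_n( &this.held, true, __ATOMIC_ACQUIRE ) ) Pause();
    }
    static inline void unlock( toy_spinlock & this ) {
        __atomic_store_n( &this.held, false, __ATOMIC_RELEASE );
    }

    DEFAULT_ON_NOTIFY( toy_spinlock )        // on_notify: unpark the signalled thread
    DEFAULT_ON_WAIT( toy_spinlock )          // on_wait: unlock, run the pre-park hook, park
    DEFAULT_ON_WAKEUP_REACQ( toy_spinlock )  // on_wakeup: reacquire the lock after waking

With these three routines plus lock/unlock, the type supplies everything the is_blocking_lock trait asks for, so it could instantiate condition_variable( toy_spinlock ).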
libcfa/src/concurrency/monitor.cfa
r34b4268 r24d6572 10 10 // Created On : Thd Feb 23 12:27:26 2017 11 11 // Last Modified By : Peter A. Buhr 12 // Last Modified On : Wed Dec 4 07:55:14 201913 // Update Count : 1 012 // Last Modified On : Sun Feb 19 17:00:59 2023 13 // Update Count : 12 14 14 // 15 15 16 16 #define __cforall_thread__ 17 #define _GNU_SOURCE18 17 19 18 #include "monitor.hfa" -
libcfa/src/concurrency/monitor.hfa
r34b4268 r24d6572 10 10 // Created On : Thd Feb 23 12:27:26 2017 11 11 // Last Modified By : Peter A. Buhr 12 // Last Modified On : Wed Dec 4 07:55:32 201913 // Update Count : 1 112 // Last Modified On : Thu Feb 2 11:29:21 2023 13 // Update Count : 12 14 14 // 15 15 … … 22 22 #include "stdlib.hfa" 23 23 24 trait is_monitor(T &) { 24 forall( T & ) 25 trait is_monitor { 25 26 monitor$ * get_monitor( T & ); 26 27 void ^?{}( T & mutex ); -
libcfa/src/concurrency/mutex.cfa
r34b4268 r24d6572 12 12 // Created On : Fri May 25 01:37:11 2018 13 13 // Last Modified By : Peter A. Buhr 14 // Last Modified On : Wed Dec 4 09:16:39 201915 // Update Count : 114 // Last Modified On : Sun Feb 19 17:01:36 2023 15 // Update Count : 3 16 16 // 17 17 18 18 #define __cforall_thread__ 19 #define _GNU_SOURCE20 19 21 20 #include "mutex.hfa" -
libcfa/src/concurrency/mutex.hfa
r34b4268 r24d6572 12 12 // Created On : Fri May 25 01:24:09 2018 13 13 // Last Modified By : Peter A. Buhr 14 // Last Modified On : Wed Dec 4 09:16:53 201915 // Update Count : 114 // Last Modified On : Thu Feb 2 11:46:08 2023 15 // Update Count : 2 16 16 // 17 17 … … 70 70 void unlock(recursive_mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead"))); 71 71 72 trait is_lock(L & | sized(L)) { 72 forall( L & | sized(L) ) 73 trait is_lock { 73 74 void lock (L &); 74 75 void unlock(L &); -
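Note: the trait now uses the forall-prefixed declaration form; generic code consumes it unchanged. A hypothetical helper, not part of the library, showing how the trait is asserted:

    forall( L & | is_lock( L ) )
    static inline void with_lock( L & l, void (* body)( void ) ) {
        lock( l );    // any type supplying lock()/unlock() satisfies the trait
        body();
        unlock( l );
    }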
libcfa/src/concurrency/mutex_stmt.hfa
r34b4268 r24d6572 1 #pragma once 2 1 3 #include "bits/algorithm.hfa" 2 4 #include "bits/defs.hfa" … … 4 6 //----------------------------------------------------------------------------- 5 7 // is_lock 6 trait is_lock(L & | sized(L)) { 8 forall(L & | sized(L)) 9 trait is_lock { 7 10 // For acquiring a lock 8 11 void lock( L & ); … … 11 14 void unlock( L & ); 12 15 }; 13 14 16 15 17 struct __mutex_stmt_lock_guard { … … 24 26 // Sort locks based on address 25 27 __libcfa_small_sort(this.lockarr, count); 26 27 // acquire locks in order28 // for ( size_t i = 0; i < count; i++ ) {29 // lock(*this.lockarr[i]);30 // }31 }32 33 static inline void ^?{}( __mutex_stmt_lock_guard & this ) with(this) {34 // for ( size_t i = count; i > 0; i-- ) {35 // unlock(*lockarr[i - 1]);36 // }37 28 } 38 29 39 30 forall(L & | is_lock(L)) { 40 41 struct scoped_lock { 42 L * internal_lock; 43 }; 44 45 static inline void ?{}( scoped_lock(L) & this, L & internal_lock ) { 46 this.internal_lock = &internal_lock; 47 lock(internal_lock); 48 } 49 50 static inline void ^?{}( scoped_lock(L) & this ) with(this) { 51 unlock(*internal_lock); 52 } 53 54 static inline void * __get_mutexstmt_lock_ptr( L & this ) { 55 return &this; 56 } 57 58 static inline L __get_mutexstmt_lock_type( L & this ); 59 60 static inline L __get_mutexstmt_lock_type( L * this ); 31 static inline void * __get_mutexstmt_lock_ptr( L & this ) { return &this; } 32 static inline L __get_mutexstmt_lock_type( L & this ) {} 33 static inline L __get_mutexstmt_lock_type( L * this ) {} 61 34 } -
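Note: __mutex_stmt_lock_guard now only sorts the requested locks by address with __libcfa_small_sort; the acquires themselves are presumably emitted by the mutex statement, so every statement over the same locks ends up taking them in one consistent order. A usage sketch, not part of this changeset, assuming the CFA mutex-statement syntax:

    single_acquisition_lock a, b;

    void first () { mutex( a, b ) { /* ... critical section ... */ } }
    void second() { mutex( b, a ) { /* ... critical section ... */ } }
    // opposite textual orders, same acquisition order -- no deadlock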
libcfa/src/concurrency/preemption.cfa
r34b4268 r24d6572 10 10 // Created On : Mon Jun 5 14:20:42 2017 11 11 // Last Modified By : Peter A. Buhr 12 // Last Modified On : Thu Feb 17 11:18:57 202213 // Update Count : 5912 // Last Modified On : Mon Jan 9 08:42:59 2023 13 // Update Count : 60 14 14 // 15 15 16 16 #define __cforall_thread__ 17 #define _GNU_SOURCE18 17 19 18 // #define __CFA_DEBUG_PRINT_PREEMPTION__ … … 118 117 __cfadbg_print_buffer_decl( preemption, " KERNEL: preemption tick %lu\n", currtime.tn); 119 118 Duration period = node->period; 120 if( period == 0 ) {119 if( period == 0 ) { 121 120 node->set = false; // Node is one-shot, just mark it as not pending 122 121 } -
libcfa/src/concurrency/pthread.cfa
r34b4268 r24d6572 15 15 16 16 #define __cforall_thread__ 17 #define _GNU_SOURCE18 17 19 18 #include <signal.h> … … 35 34 struct pthread_values{ 36 35 inline Seqable; 37 void * value;36 void * value; 38 37 bool in_use; 39 38 }; … … 51 50 struct pthread_keys { 52 51 bool in_use; 53 void (* destructor)( void * );52 void (* destructor)( void * ); 54 53 Sequence(pthread_values) threads; 55 54 }; 56 55 57 static void ?{}(pthread_keys& k) {56 static void ?{}(pthread_keys& k) { 58 57 k.threads{}; 59 58 } … … 62 61 static pthread_keys cfa_pthread_keys_storage[PTHREAD_KEYS_MAX] __attribute__((aligned (16))); 63 62 64 static void init_pthread_storage() {65 for ( int i = 0; i < PTHREAD_KEYS_MAX; i++){63 static void init_pthread_storage() { 64 for ( int i = 0; i < PTHREAD_KEYS_MAX; i++ ) { 66 65 cfa_pthread_keys_storage[i]{}; 67 66 } … … 96 95 97 96 /* condvar helper routines */ 98 static void init(pthread_cond_t * pcond){97 static void init(pthread_cond_t * pcond) { 99 98 static_assert(sizeof(pthread_cond_t) >= sizeof(cfa2pthr_cond_var_t),"sizeof(pthread_t) < sizeof(cfa2pthr_cond_var_t)"); 100 cfa2pthr_cond_var_t * _cond = (cfa2pthr_cond_var_t*)pcond;99 cfa2pthr_cond_var_t * _cond = (cfa2pthr_cond_var_t *)pcond; 101 100 ?{}(*_cond); 102 101 } 103 102 104 static cfa2pthr_cond_var_t * get(pthread_cond_t* pcond){103 static cfa2pthr_cond_var_t * get(pthread_cond_t * pcond) { 105 104 static_assert(sizeof(pthread_cond_t) >= sizeof(cfa2pthr_cond_var_t),"sizeof(pthread_t) < sizeof(cfa2pthr_cond_var_t)"); 106 return (cfa2pthr_cond_var_t *)pcond;107 } 108 109 static void destroy(pthread_cond_t * cond){105 return (cfa2pthr_cond_var_t *)pcond; 106 } 107 108 static void destroy(pthread_cond_t * cond) { 110 109 static_assert(sizeof(pthread_cond_t) >= sizeof(cfa2pthr_cond_var_t),"sizeof(pthread_t) < sizeof(cfa2pthr_cond_var_t)"); 111 110 ^?{}(*get(cond)); … … 116 115 117 116 /* mutex helper routines */ 118 static void mutex_check(pthread_mutex_t * t){117 static void mutex_check(pthread_mutex_t * t) { 119 118 // Use double check to improve performance. 
120 119 // Check is safe on x86; volatile prevents compiler reordering 121 volatile pthread_mutex_t * const mutex_ = t;120 volatile pthread_mutex_t * const mutex_ = t; 122 121 123 122 // SKULLDUGGERY: not a portable way to access the kind field, /usr/include/x86_64-linux-gnu/bits/pthreadtypes.h … … 136 135 137 136 138 static void init(pthread_mutex_t * plock){137 static void init(pthread_mutex_t * plock) { 139 138 static_assert(sizeof(pthread_mutex_t) >= sizeof(simple_owner_lock),"sizeof(pthread_mutex_t) < sizeof(simple_owner_lock)"); 140 simple_owner_lock * _lock = (simple_owner_lock*)plock;139 simple_owner_lock * _lock = (simple_owner_lock *)plock; 141 140 ?{}(*_lock); 142 141 } 143 142 144 static simple_owner_lock * get(pthread_mutex_t* plock){143 static simple_owner_lock * get(pthread_mutex_t * plock) { 145 144 static_assert(sizeof(pthread_mutex_t) >= sizeof(simple_owner_lock),"sizeof(pthread_mutex_t) < sizeof(simple_owner_lock)"); 146 return (simple_owner_lock *)plock;147 } 148 149 static void destroy(pthread_mutex_t * plock){145 return (simple_owner_lock *)plock; 146 } 147 148 static void destroy(pthread_mutex_t * plock) { 150 149 static_assert(sizeof(pthread_mutex_t) >= sizeof(simple_owner_lock),"sizeof(pthread_mutex_t) < sizeof(simple_owner_lock)"); 151 150 ^?{}(*get(plock)); … … 153 152 154 153 //######################### Attr helpers ######################### 155 struct cfaPthread_attr_t {// thread attributes154 typedef struct cfaPthread_attr_t { // thread attributes 156 155 int contentionscope; 157 156 int detachstate; 158 157 size_t stacksize; 159 void * stackaddr;158 void * stackaddr; 160 159 int policy; 161 160 int inheritsched; 162 161 struct sched_param param; 163 } typedefcfaPthread_attr_t;164 165 static const cfaPthread_attr_t default_attrs {162 } cfaPthread_attr_t; 163 164 static const cfaPthread_attr_t default_attrs { 166 165 0, 167 166 0, 168 (size_t)65000,169 (void *)NULL,167 65_000, 168 NULL, 170 169 0, 171 170 0, … … 173 172 }; 174 173 175 static cfaPthread_attr_t * get(const pthread_attr_t* attr){176 static_assert(sizeof(pthread_attr_t) >= sizeof(cfaPthread_attr_t), "sizeof(pthread_attr_t) < sizeof(cfaPthread_attr_t)");177 return (cfaPthread_attr_t *)attr;174 static cfaPthread_attr_t * get(const pthread_attr_t * attr) { 175 static_assert(sizeof(pthread_attr_t) >= sizeof(cfaPthread_attr_t), "sizeof(pthread_attr_t) < sizeof(cfaPthread_attr_t)"); 176 return (cfaPthread_attr_t *)attr; 178 177 } 179 178 … … 190 189 191 190 // pthreads return value 192 void * joinval;191 void * joinval; 193 192 194 193 // pthread attributes 195 194 pthread_attr_t pthread_attr; 196 195 197 void *(* start_routine)(void *);198 void * start_arg;196 void *(* start_routine)(void *); 197 void * start_arg; 199 198 200 199 // thread local data 201 pthread_values * pthreadData;200 pthread_values * pthreadData; 202 201 203 202 // flag used for tryjoin … … 207 206 /* thread part routines */ 208 207 // cfaPthread entry point 209 void main(cfaPthread & _thread) with(_thread){210 joinval = 208 void main(cfaPthread & _thread) with(_thread) { 209 joinval = start_routine(start_arg); 211 210 isTerminated = true; 212 211 } 213 212 214 static cfaPthread * lookup( pthread_t p ){215 static_assert(sizeof(pthread_t) >= sizeof(cfaPthread *),"sizeof(pthread_t) < sizeof(cfaPthread*)");216 return (cfaPthread *)p;217 } 218 219 static void pthread_deletespecific_( pthread_values * values ) { // see uMachContext::invokeTask220 pthread_values * value;221 pthread_keys * key;213 static cfaPthread * lookup( pthread_t p ) { 
214 static_assert(sizeof(pthread_t) >= sizeof(cfaPthread *),"sizeof(pthread_t) < sizeof(cfaPthread *)"); 215 return (cfaPthread *)p; 216 } 217 218 static void pthread_deletespecific_( pthread_values * values ) { // see uMachContext::invokeTask 219 pthread_values * value; 220 pthread_keys * key; 222 221 bool destcalled = true; 223 if (values != NULL) {222 if (values != NULL) { 224 223 for ( int attempts = 0; attempts < PTHREAD_DESTRUCTOR_ITERATIONS && destcalled ; attempts += 1 ) { 225 224 destcalled = false; 226 225 lock(key_lock); 227 for ( int i = 0; i < PTHREAD_KEYS_MAX; i++){226 for ( int i = 0; i < PTHREAD_KEYS_MAX; i++ ) { 228 227 // for each valid key 229 if ( values[i].in_use) {228 if ( values[i].in_use) { 230 229 value = &values[i]; 231 230 key = &cfa_pthread_keys[i]; … … 234 233 // if a key value has a non-NULL destructor pointer, and the thread has a non-NULL value associated with that key, 235 234 // the value of the key is set to NULL, and then the function pointed to is called with the previously associated value as its sole argument. 236 if (value->value != NULL && key->destructor != NULL) {235 if (value->value != NULL && key->destructor != NULL) { 237 236 unlock(key_lock); 238 237 key->destructor(value->value); // run destructor … … 249 248 } 250 249 251 static void ^?{}(cfaPthread & mutex t) {250 static void ^?{}(cfaPthread & mutex t) { 252 251 // delete pthread local storage 253 252 pthread_values * values = t.pthreadData; … … 255 254 } 256 255 257 static void ?{}(cfaPthread & t, pthread_t* _thread, const pthread_attr_t * _attr,void *(*start_routine)(void *), void * arg) {258 static_assert(sizeof(pthread_t) >= sizeof(cfaPthread *), "pthread_t too small to hold a pointer: sizeof(pthread_t) < sizeof(cfaPthread*)");256 static void ?{}(cfaPthread & t, pthread_t * _thread, const pthread_attr_t * _attr,void *(* start_routine)(void *), void * arg) { 257 static_assert(sizeof(pthread_t) >= sizeof(cfaPthread *), "pthread_t too small to hold a pointer: sizeof(pthread_t) < sizeof(cfaPthread *)"); 259 258 260 259 // set up user thread stackSize … … 278 277 //######################### Pthread Attrs ######################### 279 278 280 int pthread_attr_init(pthread_attr_t * attr) libcfa_public __THROW {281 cfaPthread_attr_t * _attr = get(attr);279 int pthread_attr_init(pthread_attr_t * attr) libcfa_public __THROW { 280 cfaPthread_attr_t * _attr = get(attr); 282 281 ?{}(*_attr, default_attrs); 283 282 return 0; 284 283 } 285 int pthread_attr_destroy(pthread_attr_t * attr) libcfa_public __THROW {284 int pthread_attr_destroy(pthread_attr_t * attr) libcfa_public __THROW { 286 285 ^?{}(*get(attr)); 287 286 return 0; 288 287 } 289 288 290 int pthread_attr_setscope( pthread_attr_t * attr, int contentionscope ) libcfa_public __THROW {289 int pthread_attr_setscope( pthread_attr_t * attr, int contentionscope ) libcfa_public __THROW { 291 290 get( attr )->contentionscope = contentionscope; 292 291 return 0; 293 292 } // pthread_attr_setscope 294 293 295 int pthread_attr_getscope( const pthread_attr_t * attr, int *contentionscope ) libcfa_public __THROW {294 int pthread_attr_getscope( const pthread_attr_t * attr, int * contentionscope ) libcfa_public __THROW { 296 295 *contentionscope = get( attr )->contentionscope; 297 296 return 0; 298 297 } // pthread_attr_getscope 299 298 300 int pthread_attr_setdetachstate( pthread_attr_t * attr, int detachstate ) libcfa_public __THROW {299 int pthread_attr_setdetachstate( pthread_attr_t * attr, int detachstate ) libcfa_public __THROW { 301 300 get( attr 
)->detachstate = detachstate; 302 301 return 0; 303 302 } // pthread_attr_setdetachstate 304 303 305 int pthread_attr_getdetachstate( const pthread_attr_t * attr, int *detachstate ) libcfa_public __THROW {304 int pthread_attr_getdetachstate( const pthread_attr_t * attr, int * detachstate ) libcfa_public __THROW { 306 305 *detachstate = get( attr )->detachstate; 307 306 return 0; 308 307 } // pthread_attr_getdetachstate 309 308 310 int pthread_attr_setstacksize( pthread_attr_t * attr, size_t stacksize ) libcfa_public __THROW {309 int pthread_attr_setstacksize( pthread_attr_t * attr, size_t stacksize ) libcfa_public __THROW { 311 310 get( attr )->stacksize = stacksize; 312 311 return 0; 313 312 } // pthread_attr_setstacksize 314 313 315 int pthread_attr_getstacksize( const pthread_attr_t * attr, size_t *stacksize ) libcfa_public __THROW {314 int pthread_attr_getstacksize( const pthread_attr_t * attr, size_t * stacksize ) libcfa_public __THROW { 316 315 *stacksize = get( attr )->stacksize; 317 316 return 0; … … 326 325 } // pthread_attr_setguardsize 327 326 328 int pthread_attr_setstackaddr( pthread_attr_t * attr, void *stackaddr ) libcfa_public __THROW {327 int pthread_attr_setstackaddr( pthread_attr_t * attr, void * stackaddr ) libcfa_public __THROW { 329 328 get( attr )->stackaddr = stackaddr; 330 329 return 0; 331 330 } // pthread_attr_setstackaddr 332 331 333 int pthread_attr_getstackaddr( const pthread_attr_t * attr, void **stackaddr ) libcfa_public __THROW {332 int pthread_attr_getstackaddr( const pthread_attr_t * attr, void ** stackaddr ) libcfa_public __THROW { 334 333 *stackaddr = get( attr )->stackaddr; 335 334 return 0; 336 335 } // pthread_attr_getstackaddr 337 336 338 int pthread_attr_setstack( pthread_attr_t * attr, void *stackaddr, size_t stacksize ) libcfa_public __THROW {337 int pthread_attr_setstack( pthread_attr_t * attr, void * stackaddr, size_t stacksize ) libcfa_public __THROW { 339 338 get( attr )->stackaddr = stackaddr; 340 339 get( attr )->stacksize = stacksize; … … 342 341 } // pthread_attr_setstack 343 342 344 int pthread_attr_getstack( const pthread_attr_t * attr, void **stackaddr, size_t *stacksize ) libcfa_public __THROW {343 int pthread_attr_getstack( const pthread_attr_t * attr, void ** stackaddr, size_t * stacksize ) libcfa_public __THROW { 345 344 *stackaddr = get( attr )->stackaddr; 346 345 *stacksize = get( attr )->stacksize; … … 351 350 // already running thread threadID. It shall be called on unitialized attr 352 351 // and destroyed with pthread_attr_destroy when no longer needed. 
353 int pthread_getattr_np( pthread_t threadID, pthread_attr_t * attr ) libcfa_public __THROW { // GNU extension352 int pthread_getattr_np( pthread_t threadID, pthread_attr_t * attr ) libcfa_public __THROW { // GNU extension 354 353 check_nonnull(attr); 355 354 … … 363 362 //######################### Threads ######################### 364 363 365 int pthread_create(pthread_t * _thread, const pthread_attr_t * attr, void *(* start_routine)(void *), void * arg) libcfa_public __THROW {366 cfaPthread * t = alloc();364 int pthread_create(pthread_t * _thread, const pthread_attr_t * attr, void *(* start_routine)(void *), void * arg) libcfa_public __THROW { 365 cfaPthread * t = alloc(); 367 366 (*t){_thread, attr, start_routine, arg}; 368 367 return 0; 369 368 } 370 369 371 372 int pthread_join(pthread_t _thread, void **value_ptr) libcfa_public __THROW { 370 int pthread_join(pthread_t _thread, void ** value_ptr) libcfa_public __THROW { 373 371 // if thread is invalid 374 372 if (_thread == NULL) return EINVAL; … … 376 374 377 375 // get user thr pointer 378 cfaPthread * p = lookup(_thread);376 cfaPthread * p = lookup(_thread); 379 377 try { 380 378 join(*p); … … 389 387 } 390 388 391 int pthread_tryjoin_np(pthread_t _thread, void ** value_ptr) libcfa_public __THROW {389 int pthread_tryjoin_np(pthread_t _thread, void ** value_ptr) libcfa_public __THROW { 392 390 // if thread is invalid 393 391 if (_thread == NULL) return EINVAL; 394 392 if (_thread == pthread_self()) return EDEADLK; 395 393 396 cfaPthread * p = lookup(_thread);394 cfaPthread * p = lookup(_thread); 397 395 398 396 // thread not finished ? … … 412 410 void pthread_exit(void * status) libcfa_public __THROW { 413 411 pthread_t pid = pthread_self(); 414 cfaPthread * _thread = (cfaPthread*)pid;412 cfaPthread * _thread = (cfaPthread *)pid; 415 413 _thread->joinval = status; // set return value 416 414 _thread->isTerminated = 1; // set terminated flag … … 426 424 //######################### Mutex ######################### 427 425 428 int pthread_mutex_init(pthread_mutex_t *_mutex, const pthread_mutexattr_t * attr) libcfa_public __THROW {426 int pthread_mutex_init(pthread_mutex_t *_mutex, const pthread_mutexattr_t * attr) libcfa_public __THROW { 429 427 check_nonnull(_mutex); 430 428 init(_mutex); … … 435 433 int pthread_mutex_destroy(pthread_mutex_t *_mutex) libcfa_public __THROW { 436 434 check_nonnull(_mutex); 437 simple_owner_lock * _lock = get(_mutex);438 if (_lock->owner != NULL) {435 simple_owner_lock * _lock = get(_mutex); 436 if (_lock->owner != NULL) { 439 437 return EBUSY; 440 438 } … … 446 444 check_nonnull(_mutex); 447 445 mutex_check(_mutex); 448 simple_owner_lock * _lock = get(_mutex);446 simple_owner_lock * _lock = get(_mutex); 449 447 lock(*_lock); 450 448 return 0; … … 453 451 int pthread_mutex_unlock(pthread_mutex_t *_mutex) libcfa_public __THROW { 454 452 check_nonnull(_mutex); 455 simple_owner_lock * _lock = get(_mutex);456 if (_lock->owner != active_thread()) {453 simple_owner_lock * _lock = get(_mutex); 454 if (_lock->owner != active_thread()) { 457 455 return EPERM; 458 456 } // current thread does not hold the mutex … … 463 461 int pthread_mutex_trylock(pthread_mutex_t *_mutex) libcfa_public __THROW { 464 462 check_nonnull(_mutex); 465 simple_owner_lock * _lock = get(_mutex);466 if (_lock->owner != active_thread() && _lock->owner != NULL) {463 simple_owner_lock * _lock = get(_mutex); 464 if (_lock->owner != active_thread() && _lock->owner != NULL) { 467 465 return EBUSY; 468 466 } // if mutex is owned … … 474 472 475 
473 /* conditional variable routines */ 476 int pthread_cond_init(pthread_cond_t * cond, const pthread_condattr_t *attr) libcfa_public __THROW {474 int pthread_cond_init(pthread_cond_t * cond, const pthread_condattr_t * attr) libcfa_public __THROW { 477 475 check_nonnull(cond); 478 476 init(cond); … … 480 478 } //pthread_cond_init 481 479 482 int pthread_cond_wait(pthread_cond_t * cond, pthread_mutex_t *_mutex) libcfa_public __THROW {480 int pthread_cond_wait(pthread_cond_t * cond, pthread_mutex_t *_mutex) libcfa_public __THROW { 483 481 check_nonnull(_mutex); 484 482 check_nonnull(cond); … … 494 492 } // pthread_cond_timedwait 495 493 496 int pthread_cond_signal(pthread_cond_t * cond) libcfa_public __THROW {494 int pthread_cond_signal(pthread_cond_t * cond) libcfa_public __THROW { 497 495 check_nonnull(cond); 498 496 return notify_one(*get(cond)); 499 497 } // pthread_cond_signal 500 498 501 int pthread_cond_broadcast(pthread_cond_t * cond) libcfa_public __THROW {499 int pthread_cond_broadcast(pthread_cond_t * cond) libcfa_public __THROW { 502 500 check_nonnull(cond); 503 501 return notify_all(*get(cond)); 504 502 } // pthread_cond_broadcast 505 503 506 int pthread_cond_destroy(pthread_cond_t * cond) libcfa_public __THROW {504 int pthread_cond_destroy(pthread_cond_t * cond) libcfa_public __THROW { 507 505 check_nonnull(cond); 508 506 destroy(cond); … … 514 512 //######################### Local storage ######################### 515 513 516 int pthread_once(pthread_once_t * once_control, void (*init_routine)(void)) libcfa_public __THROW {514 int pthread_once(pthread_once_t * once_control, void (* init_routine)(void)) libcfa_public __THROW { 517 515 static_assert(sizeof(pthread_once_t) >= sizeof(int),"sizeof(pthread_once_t) < sizeof(int)"); 518 516 check_nonnull(once_control); … … 527 525 } // pthread_once 528 526 529 int pthread_key_create( pthread_key_t * key, void (*destructor)( void * ) ) libcfa_public __THROW {527 int pthread_key_create( pthread_key_t * key, void (* destructor)( void * ) ) libcfa_public __THROW { 530 528 lock(key_lock); 531 529 for ( int i = 0; i < PTHREAD_KEYS_MAX; i += 1 ) { … … 562 560 } // pthread_key_delete 563 561 564 int pthread_setspecific( pthread_key_t key, const void * value ) libcfa_public __THROW {562 int pthread_setspecific( pthread_key_t key, const void * value ) libcfa_public __THROW { 565 563 // get current thread 566 cfaPthread * t = lookup(pthread_self());564 cfaPthread * t = lookup(pthread_self()); 567 565 // if current thread's pthreadData is NULL; initialize it 568 pthread_values * values;569 if (t->pthreadData == NULL) {566 pthread_values * values; 567 if (t->pthreadData == NULL) { 570 568 values = anew( PTHREAD_KEYS_MAX); 571 569 t->pthreadData = values; 572 for ( int i = 0;i < PTHREAD_KEYS_MAX; i++){570 for ( int i = 0;i < PTHREAD_KEYS_MAX; i++ ) { 573 571 t->pthreadData[i].in_use = false; 574 572 } // for … … 593 591 } //pthread_setspecific 594 592 595 void * pthread_getspecific(pthread_key_t key) libcfa_public __THROW {593 void * pthread_getspecific(pthread_key_t key) libcfa_public __THROW { 596 594 if (key >= PTHREAD_KEYS_MAX || ! cfa_pthread_keys[key].in_use) return NULL; 597 595 598 596 // get current thread 599 cfaPthread * t = lookup(pthread_self());597 cfaPthread * t = lookup(pthread_self()); 600 598 if (t->pthreadData == NULL) return NULL; 601 599 lock(key_lock); 602 pthread_values & entry = ((pthread_values *)t->pthreadData)[key];600 pthread_values & entry = ((pthread_values *)t->pthreadData)[key]; 603 601 if ( ! 
entry.in_use ) { 604 602 unlock( key_lock ); 605 603 return NULL; 606 604 } // if 607 void * value = entry.value;605 void * value = entry.value; 608 606 unlock(key_lock); 609 607 … … 875 873 //######################### Parallelism ######################### 876 874 877 int pthread_setaffinity_np( pthread_t /* __th */, size_t /* __cpusetsize */, __const cpu_set_t * /* __cpuset */ ) libcfa_public __THROW {878 abort( "pthread_setaffinity_np" );879 } // pthread_setaffinity_np880 881 int pthread_getaffinity_np( pthread_t /* __th */, size_t /* __cpusetsize */, cpu_set_t * /* __cpuset */ ) libcfa_public __THROW {882 abort( "pthread_getaffinity_np" );883 } // pthread_getaffinity_np884 885 int pthread_attr_setaffinity_np( pthread_attr_t * /* __attr */, size_t /* __cpusetsize */, __const cpu_set_t * /* __cpuset */ ) libcfa_public __THROW {886 abort( "pthread_attr_setaffinity_np" );887 } // pthread_attr_setaffinity_np888 889 int pthread_attr_getaffinity_np( __const pthread_attr_t * /* __attr */, size_t /* __cpusetsize */, cpu_set_t * /* __cpuset */ ) libcfa_public __THROW {890 abort( "pthread_attr_getaffinity_np" );891 } // pthread_attr_getaffinity_np875 // int pthread_setaffinity_np( pthread_t /* __th */, size_t /* __cpusetsize */, __const cpu_set_t * /* __cpuset */ ) libcfa_public __THROW { 876 // abort( "pthread_setaffinity_np" ); 877 // } // pthread_setaffinity_np 878 879 // int pthread_getaffinity_np( pthread_t /* __th */, size_t /* __cpusetsize */, cpu_set_t * /* __cpuset */ ) libcfa_public __THROW { 880 // abort( "pthread_getaffinity_np" ); 881 // } // pthread_getaffinity_np 882 883 // int pthread_attr_setaffinity_np( pthread_attr_t * /* __attr */, size_t /* __cpusetsize */, __const cpu_set_t * /* __cpuset */ ) libcfa_public __THROW { 884 // abort( "pthread_attr_setaffinity_np" ); 885 // } // pthread_attr_setaffinity_np 886 887 // int pthread_attr_getaffinity_np( __const pthread_attr_t * /* __attr */, size_t /* __cpusetsize */, cpu_set_t * /* __cpuset */ ) libcfa_public __THROW { 888 // abort( "pthread_attr_getaffinity_np" ); 889 // } // pthread_attr_getaffinity_np 892 890 893 891 //######################### Cancellation ######################### … … 906 904 } // pthread_cancel 907 905 908 int pthread_setcancelstate( int state, int * oldstate ) libcfa_public __THROW {906 int pthread_setcancelstate( int state, int * oldstate ) libcfa_public __THROW { 909 907 abort("pthread_setcancelstate not implemented"); 910 908 return 0; 911 909 } // pthread_setcancelstate 912 910 913 int pthread_setcanceltype( int type, int * oldtype ) libcfa_public __THROW {911 int pthread_setcanceltype( int type, int * oldtype ) libcfa_public __THROW { 914 912 abort("pthread_setcanceltype not implemented"); 915 913 return 0; … … 918 916 919 917 #pragma GCC diagnostic pop 920 -
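The file above reimplements the POSIX pthread surface on top of libcfa primitives (a simple_owner_lock stored inside pthread_mutex_t, a cfaPthread behind pthread_t, per-thread key tables behind pthread_key_t). As a sanity check of the interface this changeset touches, here is a small client program in plain C; it is illustrative only and not part of the changeset, uses only standard pthread calls, and should behave the same whether it links against glibc's pthreads or against these shims.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

static pthread_mutex_t lock;
static pthread_cond_t  done;
static pthread_key_t   key;
static int counter = 0;

static void cleanup( void * p ) { free( p ); }          // key destructor, run at thread exit

static void * worker( void * arg ) {
    pthread_setspecific( key, malloc( 16 ) );           // per-thread value, reclaimed by cleanup
    pthread_mutex_lock( &lock );                        // exercises mutex_check/get/lock above
    counter += 1;
    pthread_cond_signal( &done );
    pthread_mutex_unlock( &lock );
    return arg;
}

int main( void ) {
    pthread_t tid[4];
    void * ret;
    pthread_mutex_init( &lock, NULL );
    pthread_cond_init( &done, NULL );
    pthread_key_create( &key, cleanup );
    for ( int i = 0; i < 4; i += 1 ) pthread_create( &tid[i], NULL, worker, NULL );
    pthread_mutex_lock( &lock );
    while ( counter < 4 ) pthread_cond_wait( &done, &lock );   // predicate loop under the mutex, so no lost wakeups
    pthread_mutex_unlock( &lock );
    for ( int i = 0; i < 4; i += 1 ) pthread_join( tid[i], &ret );
    printf( "counter %d\n", counter );
    pthread_cond_destroy( &done );
    pthread_mutex_destroy( &lock );
    pthread_key_delete( &key );
    return 0;
}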
libcfa/src/concurrency/ready_queue.cfa
r34b4268 r24d6572 15 15 16 16 #define __cforall_thread__ 17 #define _GNU_SOURCE18 17 19 18 // #define __CFA_DEBUG_PRINT_READY_QUEUE__ -
libcfa/src/concurrency/thread.cfa
r34b4268 r24d6572 10 10 // Created On : Tue Jan 17 12:27:26 2017 11 11 // Last Modified By : Peter A. Buhr 12 // Last Modified On : Sun Dec 11 20:56:54 202213 // Update Count : 10 212 // Last Modified On : Mon Jan 9 08:42:33 2023 13 // Update Count : 103 14 14 // 15 15 16 16 #define __cforall_thread__ 17 #define _GNU_SOURCE18 17 19 18 #include "thread.hfa" … … 54 53 preferred = ready_queue_new_preferred(); 55 54 last_proc = 0p; 55 link_node = 0p; 56 56 PRNG_SET_SEED( random_state, __global_random_mask ? __global_random_prime : __global_random_prime ^ rdtscl() ); 57 57 #if defined( __CFA_WITH_VERIFY__ ) … … 60 60 #endif 61 61 62 clh_node = malloc( );63 *clh_node = false;64 65 62 doregister(curr_cluster, this); 66 63 monitors{ &self_mon_p, 1, (fptr_t)0 }; … … 71 68 canary = 0xDEADDEADDEADDEADp; 72 69 #endif 73 free(clh_node);74 70 unregister(curr_cluster, this); 75 71 ^self_cor{}; -
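For context on the constructor/destructor hunks above (link_node is now initialized in ?{}, the clh_node allocation and free are dropped), the code that drives them is the ordinary Cforall thread life cycle. A minimal sketch, illustrative only and with a hypothetical thread type name:

#include <thread.hfa>
#include <fstream.hfa>

thread Worker {};                      // user thread type; wraps a thread$ descriptor

void main( Worker & this ) {           // thread body, runs on the new thread's stack
    sout | "worker running";
}

int main() {
    Worker w;                          // ?{} above runs: seeds the PRNG, registers with the cluster, zeroes link_node
}                                      // leaving scope joins the thread; the ^?{} above then unregisters it from the cluster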
libcfa/src/concurrency/thread.hfa
r34b4268 r24d6572 10 10 // Created On : Tue Jan 17 12:27:26 2017 11 11 // Last Modified By : Peter A. Buhr 12 // Last Modified On : T ue Nov 22 22:18:34 202213 // Update Count : 3 512 // Last Modified On : Thu Feb 2 11:27:59 2023 13 // Update Count : 37 14 14 // 15 15 … … 27 27 //----------------------------------------------------------------------------- 28 28 // thread trait 29 trait is_thread(T &) { 29 forall( T & ) 30 trait is_thread { 30 31 void ^?{}(T& mutex this); 31 32 void main(T& this);
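The thread.hfa hunk above only respells the is_thread trait: the older inline form trait is_thread(T &) becomes a standalone forall( T & ) followed by the trait body, with the same member assertions. Client code that constrains on the trait keeps the usual assertion syntax; a short illustrative sketch with a hypothetical function name:

forall( T & | is_thread( T ) )
void poke( T & t );                    // accepts any type that satisfies the is_thread trait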