Changeset 8e4aa05 for libcfa/src/concurrency
- Timestamp:
- Mar 4, 2021, 7:40:25 PM (5 years ago)
- Branches:
- ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children:
- 77d601f
- Parents:
- 342af53 (diff), a5040fe (diff)
Note: this is a merge changeset; the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
- Location:
- libcfa/src/concurrency
- Files:
- 25 edited
libcfa/src/concurrency/coroutine.cfa
r342af53 r8e4aa05 46 46 47 47 //----------------------------------------------------------------------------- 48 FORALL_DATA_INSTANCE(CoroutineCancelled, ( dtype coroutine_t), (coroutine_t))49 50 forall( dtype T)48 FORALL_DATA_INSTANCE(CoroutineCancelled, (coroutine_t &), (coroutine_t)) 49 50 forall(T &) 51 51 void mark_exception(CoroutineCancelled(T) *) {} 52 52 53 forall( dtype T)53 forall(T &) 54 54 void copy(CoroutineCancelled(T) * dst, CoroutineCancelled(T) * src) { 55 55 dst->virtual_table = src->virtual_table; … … 58 58 } 59 59 60 forall( dtype T)60 forall(T &) 61 61 const char * msg(CoroutineCancelled(T) *) { 62 62 return "CoroutineCancelled(...)"; … … 64 64 65 65 // This code should not be inlined. It is the error path on resume. 66 forall( dtype T| is_coroutine(T))66 forall(T & | is_coroutine(T)) 67 67 void __cfaehm_cancelled_coroutine( T & cor, $coroutine * desc ) { 68 68 verify( desc->cancellation ); … … 148 148 // Part of the Public API 149 149 // Not inline since only ever called once per coroutine 150 forall( dtype T| is_coroutine(T))150 forall(T & | is_coroutine(T)) 151 151 void prime(T& cor) { 152 152 $coroutine* this = get_coroutine(cor); … … 196 196 197 197 void __stack_clean ( __stack_info_t * this ) { 198 size_t size = ((intptr_t)this->storage->base) - ((intptr_t)this->storage->limit) + sizeof(__stack_t);199 198 void * storage = this->storage->limit; 200 199 201 200 #if CFA_COROUTINE_USE_MMAP 201 size_t size = ((intptr_t)this->storage->base) - ((intptr_t)this->storage->limit) + sizeof(__stack_t); 202 202 storage = (void *)(((intptr_t)storage) - __page_size); 203 203 if(munmap(storage, size + __page_size) == -1) { -
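The recurring edit in this file (and in coroutine.hfa below) is purely syntactic: CFA type-parameter kinds are now spelled on the parameter itself instead of with the dtype keyword. A minimal before/after sketch using the prime declaration that appears in the diff; the commented line is the old spelling:

// forall( dtype T | is_coroutine(T) ) void prime( T & cor );   // old: kind keyword on the parameter
forall( T & | is_coroutine(T) ) void prime( T & cor );           // new: '&' marks an opaque (dtype) parameter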
libcfa/src/concurrency/coroutine.hfa
r342af53 r8e4aa05 22 22 //----------------------------------------------------------------------------- 23 23 // Exception thrown from resume when a coroutine stack is cancelled. 24 FORALL_DATA_EXCEPTION(CoroutineCancelled, ( dtype coroutine_t), (coroutine_t)) (24 FORALL_DATA_EXCEPTION(CoroutineCancelled, (coroutine_t &), (coroutine_t)) ( 25 25 coroutine_t * the_coroutine; 26 26 exception_t * the_exception; 27 27 ); 28 28 29 forall( dtype T)29 forall(T &) 30 30 void copy(CoroutineCancelled(T) * dst, CoroutineCancelled(T) * src); 31 31 32 forall( dtype T)32 forall(T &) 33 33 const char * msg(CoroutineCancelled(T) *); 34 34 … … 37 37 // Anything that implements this trait can be resumed. 38 38 // Anything that is resumed is a coroutine. 39 trait is_coroutine( dtype T| IS_RESUMPTION_EXCEPTION(CoroutineCancelled, (T))) {39 trait is_coroutine(T & | IS_RESUMPTION_EXCEPTION(CoroutineCancelled, (T))) { 40 40 void main(T & this); 41 41 $coroutine * get_coroutine(T & this); … … 60 60 //----------------------------------------------------------------------------- 61 61 // Public coroutine API 62 forall( dtype T| is_coroutine(T))62 forall(T & | is_coroutine(T)) 63 63 void prime(T & cor); 64 64 … … 72 72 void __cfactx_invoke_coroutine(void (*main)(void *), void * this); 73 73 74 forall( dtype T)74 forall(T &) 75 75 void __cfactx_start(void (*main)(T &), struct $coroutine * cor, T & this, void (*invoke)(void (*main)(void *), void *)); 76 76 … … 129 129 } 130 130 131 forall( dtype T| is_coroutine(T))131 forall(T & | is_coroutine(T)) 132 132 void __cfaehm_cancelled_coroutine( T & cor, $coroutine * desc ); 133 133 134 134 // Resume implementation inlined for performance 135 forall( dtype T| is_coroutine(T))135 forall(T & | is_coroutine(T)) 136 136 static inline T & resume(T & cor) { 137 137 // optimization : read TLS once and reuse it -
libcfa/src/concurrency/future.hfa
r342af53 r8e4aa05 19 19 #include "monitor.hfa" 20 20 21 forall( otypeT ) {21 forall( T ) { 22 22 struct future { 23 23 inline future_t; … … 58 58 } 59 59 60 forall( otypeT ) {60 forall( T ) { 61 61 monitor multi_future { 62 62 inline future_t; -
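future.hfa receives the otype counterpart of the same cleanup: a bare type parameter now carries the old otype meaning, so the keyword disappears. A small illustration (box is a hypothetical type used only for this sketch, not part of the file):

// forall( otype T ) struct box { T value; };   // old: complete-object kind named explicitly
forall( T ) struct box { T value; };            // new: a bare parameter defaults to the otype meaning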
libcfa/src/concurrency/io.cfa
r342af53 r8e4aa05 32 32 extern "C" { 33 33 #include <sys/syscall.h> 34 #include <sys/eventfd.h> 34 35 35 36 #include <linux/io_uring.h> … … 41 42 #include "io/types.hfa" 42 43 43 static const char * opcodes[] = {44 __attribute__((unused)) static const char * opcodes[] = { 44 45 "OP_NOP", 45 46 "OP_READV", … … 79 80 }; 80 81 81 // returns true of acquired as leader or second leader 82 static inline bool try_lock( __leaderlock_t & this ) { 83 const uintptr_t thrd = 1z | (uintptr_t)active_thread(); 84 bool block; 85 disable_interrupts(); 86 for() { 87 struct $thread * expected = this.value; 88 if( 1p != expected && 0p != expected ) { 89 /* paranoid */ verify( thrd != (uintptr_t)expected ); // We better not already be the next leader 90 enable_interrupts( __cfaabi_dbg_ctx ); 91 return false; 92 } 93 struct $thread * desired; 94 if( 0p == expected ) { 95 // If the lock isn't locked acquire it, no need to block 96 desired = 1p; 97 block = false; 98 } 99 else { 100 // If the lock is already locked try becomming the next leader 101 desired = (struct $thread *)thrd; 102 block = true; 103 } 104 if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break; 105 } 106 if( block ) { 107 enable_interrupts( __cfaabi_dbg_ctx ); 108 park(); 109 disable_interrupts(); 110 } 111 return true; 112 } 113 114 static inline bool next( __leaderlock_t & this ) { 82 static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor *, __u32 idxs[], __u32 want ); 83 static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * , __u32 idxs[], __u32 have, bool lazy ); 84 static void __ioarbiter_flush ( $io_arbiter & mutex this, $io_context * ); 85 static inline void __ioarbiter_notify( $io_context & ctx ); 86 //============================================================================================= 87 // I/O Polling 88 //============================================================================================= 89 static inline unsigned __flush( struct $io_context & ); 90 static inline __u32 __release_sqes( struct $io_context & ); 91 92 void __cfa_io_drain( processor * proc ) { 115 93 /* paranoid */ verify( ! 
__preemption_enabled() ); 116 struct $thread * nextt; 117 for() { 118 struct $thread * expected = this.value; 119 /* paranoid */ verify( (1 & (uintptr_t)expected) == 1 ); // The lock better be locked 120 121 struct $thread * desired; 122 if( 1p == expected ) { 123 // No next leader, just unlock 124 desired = 0p; 125 nextt = 0p; 126 } 127 else { 128 // There is a next leader, remove but keep locked 129 desired = 1p; 130 nextt = (struct $thread *)(~1z & (uintptr_t)expected); 131 } 132 if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break; 133 } 134 135 if(nextt) { 136 unpark( nextt ); 137 enable_interrupts( __cfaabi_dbg_ctx ); 138 return true; 139 } 140 enable_interrupts( __cfaabi_dbg_ctx ); 141 return false; 142 } 143 144 //============================================================================================= 145 // I/O Syscall 146 //============================================================================================= 147 static int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get ) { 148 bool need_sys_to_submit = false; 149 bool need_sys_to_complete = false; 150 unsigned flags = 0; 151 152 TO_SUBMIT: 153 if( to_submit > 0 ) { 154 if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) { 155 need_sys_to_submit = true; 156 break TO_SUBMIT; 157 } 158 if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) { 159 need_sys_to_submit = true; 160 flags |= IORING_ENTER_SQ_WAKEUP; 161 } 162 } 163 164 if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) { 165 flags |= IORING_ENTER_GETEVENTS; 166 if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) { 167 need_sys_to_complete = true; 168 } 169 } 170 171 int ret = 0; 172 if( need_sys_to_submit || need_sys_to_complete ) { 173 __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ring.fd, to_submit, flags); 174 ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8); 175 if( ret < 0 ) { 176 switch((int)errno) { 177 case EAGAIN: 178 case EINTR: 179 ret = -1; 180 break; 181 default: 182 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) ); 183 } 184 } 185 } 186 187 // Memory barrier 188 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 189 return ret; 190 } 191 192 //============================================================================================= 193 // I/O Polling 194 //============================================================================================= 195 static unsigned __collect_submitions( struct __io_data & ring ); 196 static __u32 __release_consumed_submission( struct __io_data & ring ); 197 static inline void __clean( volatile struct io_uring_sqe * sqe ); 198 199 // Process a single completion message from the io_uring 200 // This is NOT thread-safe 201 static inline void process( volatile struct io_uring_cqe & cqe ) { 202 struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data; 203 __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future ); 204 205 fulfil( *future, cqe.res ); 206 } 207 208 static [int, bool] __drain_io( & struct __io_data ring ) { 209 /* paranoid */ verify( ! 
__preemption_enabled() ); 210 211 unsigned to_submit = 0; 212 if( ring.poller_submits ) { 213 // If the poller thread also submits, then we need to aggregate the submissions which are ready 214 to_submit = __collect_submitions( ring ); 215 } 216 217 int ret = __io_uring_enter(ring, to_submit, true); 218 if( ret < 0 ) { 219 return [0, true]; 220 } 221 222 // update statistics 223 if (to_submit > 0) { 224 __STATS__( true, 225 if( to_submit > 0 ) { 226 io.submit_q.submit_avg.rdy += to_submit; 227 io.submit_q.submit_avg.csm += ret; 228 io.submit_q.submit_avg.cnt += 1; 229 } 230 ) 231 } 232 233 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 234 235 // Release the consumed SQEs 236 __release_consumed_submission( ring ); 94 /* paranoid */ verify( proc ); 95 /* paranoid */ verify( proc->io.ctx ); 237 96 238 97 // Drain the queue 239 unsigned head = *ring.completion_q.head; 240 unsigned tail = *ring.completion_q.tail; 241 const __u32 mask = *ring.completion_q.mask; 242 243 // Nothing was new return 0 244 if (head == tail) { 245 return [0, to_submit > 0]; 246 } 98 $io_context * ctx = proc->io.ctx; 99 unsigned head = *ctx->cq.head; 100 unsigned tail = *ctx->cq.tail; 101 const __u32 mask = *ctx->cq.mask; 247 102 248 103 __u32 count = tail - head; 249 /* paranoid */ verify( count != 0 ); 104 __STATS__( false, io.calls.drain++; io.calls.completed += count; ) 105 250 106 for(i; count) { 251 107 unsigned idx = (head + i) & mask; 252 volatile struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];108 volatile struct io_uring_cqe & cqe = ctx->cq.cqes[idx]; 253 109 254 110 /* paranoid */ verify(&cqe); 255 111 256 process( cqe ); 257 } 112 struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data; 113 __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future ); 114 115 fulfil( *future, cqe.res ); 116 } 117 118 __cfadbg_print_safe(io, "Kernel I/O : %u completed\n", count); 258 119 259 120 // Mark to the kernel that the cqe has been seen 260 121 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read. 261 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_SEQ_CST ); 262 263 return [count, count > 0 || to_submit > 0]; 264 } 265 266 void main( $io_ctx_thread & this ) { 267 __ioctx_register( this ); 268 269 __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %d (%p) ready\n", this.ring->fd, &this); 270 271 const int reset_cnt = 5; 272 int reset = reset_cnt; 273 // Then loop until we need to start 274 LOOP: 275 while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) { 276 // Drain the io 277 int count; 278 bool again; 279 disable_interrupts(); 280 [count, again] = __drain_io( *this.ring ); 281 282 if(!again) reset--; 283 122 __atomic_store_n( ctx->cq.head, head + count, __ATOMIC_SEQ_CST ); 123 124 /* paranoid */ verify( ! __preemption_enabled() ); 125 126 return; 127 } 128 129 void __cfa_io_flush( processor * proc ) { 130 /* paranoid */ verify( ! 
__preemption_enabled() ); 131 /* paranoid */ verify( proc ); 132 /* paranoid */ verify( proc->io.ctx ); 133 134 $io_context & ctx = *proc->io.ctx; 135 136 if(!ctx.ext_sq.empty) { 137 __ioarbiter_flush( *ctx.arbiter, &ctx ); 138 } 139 140 __STATS__( true, io.calls.flush++; ) 141 int ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, 0, 0, (sigset_t *)0p, _NSIG / 8); 142 if( ret < 0 ) { 143 switch((int)errno) { 144 case EAGAIN: 145 case EINTR: 146 case EBUSY: 284 147 // Update statistics 285 __STATS__( true, 286 io.complete_q.completed_avg.val += count; 287 io.complete_q.completed_avg.cnt += 1; 288 ) 289 enable_interrupts( __cfaabi_dbg_ctx ); 290 291 // If we got something, just yield and check again 292 if(reset > 1) { 293 yield(); 294 continue LOOP; 148 __STATS__( false, io.calls.errors.busy ++; ) 149 return; 150 default: 151 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) ); 295 152 } 296 297 // We alread failed to find completed entries a few time. 298 if(reset == 1) { 299 // Rearm the context so it can block 300 // but don't block right away 301 // we need to retry one last time in case 302 // something completed *just now* 303 __ioctx_prepare_block( this ); 304 continue LOOP; 305 } 306 307 __STATS__( false, 308 io.complete_q.blocks += 1; 309 ) 310 __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %d (%p)\n", this.ring->fd, &this); 311 312 // block this thread 313 wait( this.sem ); 314 315 // restore counter 316 reset = reset_cnt; 317 } 318 319 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.ring->fd, &this); 153 } 154 155 __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring %d\n", ret, ctx.fd); 156 __STATS__( true, io.calls.submitted += ret; ) 157 /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num ); 158 /* paranoid */ verify( ctx.sq.to_submit >= ret ); 159 160 ctx.sq.to_submit -= ret; 161 162 /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num ); 163 164 // Release the consumed SQEs 165 __release_sqes( ctx ); 166 167 /* paranoid */ verify( ! __preemption_enabled() ); 168 169 ctx.proc->io.pending = false; 320 170 } 321 171 … … 339 189 // head and tail must be fully filled and shouldn't ever be touched again. 
340 190 // 191 //============================================================================================= 192 // Allocation 193 // for user's convenience fill the sqes from the indexes 194 static inline void __fill(struct io_uring_sqe * out_sqes[], __u32 want, __u32 idxs[], struct $io_context * ctx) { 195 struct io_uring_sqe * sqes = ctx->sq.sqes; 196 for(i; want) { 197 __cfadbg_print_safe(io, "Kernel I/O : filling loop\n"); 198 out_sqes[i] = &sqes[idxs[i]]; 199 } 200 } 201 202 // Try to directly allocate from the a given context 203 // Not thread-safe 204 static inline bool __alloc(struct $io_context * ctx, __u32 idxs[], __u32 want) { 205 __sub_ring_t & sq = ctx->sq; 206 const __u32 mask = *sq.mask; 207 __u32 fhead = sq.free_ring.head; // get the current head of the queue 208 __u32 ftail = sq.free_ring.tail; // get the current tail of the queue 209 210 // If we don't have enough sqes, fail 211 if((ftail - fhead) < want) { return false; } 212 213 // copy all the indexes we want from the available list 214 for(i; want) { 215 __cfadbg_print_safe(io, "Kernel I/O : allocating loop\n"); 216 idxs[i] = sq.free_ring.array[(fhead + i) & mask]; 217 } 218 219 // Advance the head to mark the indexes as consumed 220 __atomic_store_n(&sq.free_ring.head, fhead + want, __ATOMIC_RELEASE); 221 222 // return success 223 return true; 224 } 341 225 342 226 // Allocate an submit queue entry. … … 345 229 // for convenience, return both the index and the pointer to the sqe 346 230 // sqe == &sqes[idx] 347 [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) { 348 /* paranoid */ verify( data != 0 ); 349 350 // Prepare the data we need 351 __attribute((unused)) int len = 0; 352 __attribute((unused)) int block = 0; 353 __u32 cnt = *ring.submit_q.num; 354 __u32 mask = *ring.submit_q.mask; 355 356 __u32 off = thread_rand(); 357 358 // Loop around looking for an available spot 359 for() { 360 // Look through the list starting at some offset 361 for(i; cnt) { 362 __u64 expected = 3; 363 __u32 idx = (i + off) & mask; // Get an index from a random 364 volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx]; 365 volatile __u64 * udata = &sqe->user_data; 366 367 // Allocate the entry by CASing the user_data field from 0 to the future address 368 if( *udata == expected && 369 __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) 370 { 371 // update statistics 372 __STATS__( false, 373 io.submit_q.alloc_avg.val += len; 374 io.submit_q.alloc_avg.block += block; 375 io.submit_q.alloc_avg.cnt += 1; 376 ) 377 378 // debug log 379 __cfadbg_print_safe( io, "Kernel I/O : allocated [%p, %u] for %p (%p)\n", sqe, idx, active_thread(), (void*)data ); 380 381 // Success return the data 382 return [sqe, idx]; 383 } 384 verify(expected != data); 385 386 // This one was used 387 len ++; 388 } 389 390 block++; 391 392 abort( "Kernel I/O : all submit queue entries used, yielding\n" ); 393 394 yield(); 395 } 396 } 397 398 static inline __u32 __submit_to_ready_array( struct __io_data & ring, __u32 idx, const __u32 mask ) { 399 /* paranoid */ verify( idx <= mask ); 400 /* paranoid */ verify( idx != -1ul32 ); 401 402 // We need to find a spot in the ready array 403 __attribute((unused)) int len = 0; 404 __attribute((unused)) int block = 0; 405 __u32 ready_mask = ring.submit_q.ready_cnt - 1; 406 407 __u32 off = thread_rand(); 408 409 __u32 picked; 410 LOOKING: for() { 411 for(i; ring.submit_q.ready_cnt) { 412 picked = (i + off) & ready_mask; 413 __u32 
expected = -1ul32; 414 if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) { 415 break LOOKING; 416 } 417 verify(expected != idx); 418 419 len ++; 420 } 421 422 block++; 423 424 __u32 released = __release_consumed_submission( ring ); 425 if( released == 0 ) { 426 yield(); 427 } 428 } 429 430 // update statistics 431 __STATS__( false, 432 io.submit_q.look_avg.val += len; 433 io.submit_q.look_avg.block += block; 434 io.submit_q.look_avg.cnt += 1; 435 ) 436 437 return picked; 438 } 439 440 void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) { 441 __io_data & ring = *ctx->thrd.ring; 442 231 struct $io_context * cfa_io_allocate(struct io_uring_sqe * sqes[], __u32 idxs[], __u32 want) { 232 __cfadbg_print_safe(io, "Kernel I/O : attempting to allocate %u\n", want); 233 234 disable_interrupts(); 235 processor * proc = __cfaabi_tls.this_processor; 236 $io_context * ctx = proc->io.ctx; 237 /* paranoid */ verify( __cfaabi_tls.this_processor ); 238 /* paranoid */ verify( ctx ); 239 240 __cfadbg_print_safe(io, "Kernel I/O : attempting to fast allocation\n"); 241 242 // We can proceed to the fast path 243 if( __alloc(ctx, idxs, want) ) { 244 // Allocation was successful 245 __STATS__( true, io.alloc.fast += 1; ) 246 enable_interrupts( __cfaabi_dbg_ctx ); 247 248 __cfadbg_print_safe(io, "Kernel I/O : fast allocation successful from ring %d\n", ctx->fd); 249 250 __fill( sqes, want, idxs, ctx ); 251 return ctx; 252 } 253 // The fast path failed, fallback 254 __STATS__( true, io.alloc.fail += 1; ) 255 256 // Fast path failed, fallback on arbitration 257 __STATS__( true, io.alloc.slow += 1; ) 258 enable_interrupts( __cfaabi_dbg_ctx ); 259 260 $io_arbiter * ioarb = proc->cltr->io.arbiter; 261 /* paranoid */ verify( ioarb ); 262 263 __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for allocation\n"); 264 265 struct $io_context * ret = __ioarbiter_allocate(*ioarb, proc, idxs, want); 266 267 __cfadbg_print_safe(io, "Kernel I/O : slow allocation completed from ring %d\n", ret->fd); 268 269 __fill( sqes, want, idxs,ret ); 270 return ret; 271 } 272 273 274 //============================================================================================= 275 // submission 276 static inline void __submit( struct $io_context * ctx, __u32 idxs[], __u32 have, bool lazy) { 277 // We can proceed to the fast path 278 // Get the right objects 279 __sub_ring_t & sq = ctx->sq; 280 const __u32 mask = *sq.mask; 281 __u32 tail = *sq.kring.tail; 282 283 // Add the sqes to the array 284 for( i; have ) { 285 __cfadbg_print_safe(io, "Kernel I/O : __submit loop\n"); 286 sq.kring.array[ (tail + i) & mask ] = idxs[i]; 287 } 288 289 // Make the sqes visible to the submitter 290 __atomic_store_n(sq.kring.tail, tail + have, __ATOMIC_RELEASE); 291 sq.to_submit++; 292 293 ctx->proc->io.pending = true; 294 ctx->proc->io.dirty = true; 295 if(sq.to_submit > 30 || !lazy) { 296 __cfa_io_flush( ctx->proc ); 297 } 298 } 299 300 void cfa_io_submit( struct $io_context * inctx, __u32 idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1))) { 301 __cfadbg_print_safe(io, "Kernel I/O : attempting to submit %u (%s)\n", have, lazy ? 
"lazy" : "eager"); 302 303 disable_interrupts(); 304 processor * proc = __cfaabi_tls.this_processor; 305 $io_context * ctx = proc->io.ctx; 306 /* paranoid */ verify( __cfaabi_tls.this_processor ); 307 /* paranoid */ verify( ctx ); 308 309 // Can we proceed to the fast path 310 if( ctx == inctx ) // We have the right instance? 443 311 { 444 __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx]; 445 __cfadbg_print_safe( io, 446 "Kernel I/O : submitting %u (%p) for %p\n" 447 " data: %p\n" 448 " opcode: %s\n" 449 " fd: %d\n" 450 " flags: %d\n" 451 " prio: %d\n" 452 " off: %p\n" 453 " addr: %p\n" 454 " len: %d\n" 455 " other flags: %d\n" 456 " splice fd: %d\n" 457 " pad[0]: %llu\n" 458 " pad[1]: %llu\n" 459 " pad[2]: %llu\n", 460 idx, sqe, 461 active_thread(), 462 (void*)sqe->user_data, 463 opcodes[sqe->opcode], 464 sqe->fd, 465 sqe->flags, 466 sqe->ioprio, 467 sqe->off, 468 sqe->addr, 469 sqe->len, 470 sqe->accept_flags, 471 sqe->splice_fd_in, 472 sqe->__pad2[0], 473 sqe->__pad2[1], 474 sqe->__pad2[2] 475 ); 476 } 477 478 479 // Get now the data we definetely need 480 volatile __u32 * const tail = ring.submit_q.tail; 481 const __u32 mask = *ring.submit_q.mask; 482 483 // There are 2 submission schemes, check which one we are using 484 if( ring.poller_submits ) { 485 // If the poller thread submits, then we just need to add this to the ready array 486 __submit_to_ready_array( ring, idx, mask ); 487 488 post( ctx->thrd.sem ); 489 490 __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() ); 491 } 492 else if( ring.eager_submits ) { 493 __u32 picked = __submit_to_ready_array( ring, idx, mask ); 494 495 #if defined(LEADER_LOCK) 496 if( !try_lock(ring.submit_q.submit_lock) ) { 497 __STATS__( false, 498 io.submit_q.helped += 1; 499 ) 500 return; 501 } 502 /* paranoid */ verify( ! __preemption_enabled() ); 503 __STATS__( true, 504 io.submit_q.leader += 1; 505 ) 506 #else 507 for() { 508 yield(); 509 510 if( try_lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2) ) { 511 __STATS__( false, 512 io.submit_q.leader += 1; 513 ) 514 break; 515 } 516 517 // If some one else collected our index, we are done 518 #warning ABA problem 519 if( ring.submit_q.ready[picked] != idx ) { 520 __STATS__( false, 521 io.submit_q.helped += 1; 522 ) 523 return; 524 } 525 526 __STATS__( false, 527 io.submit_q.busy += 1; 528 ) 529 } 530 #endif 531 532 // We got the lock 533 // Collect the submissions 534 unsigned to_submit = __collect_submitions( ring ); 535 536 // Actually submit 537 int ret = __io_uring_enter( ring, to_submit, false ); 538 539 #if defined(LEADER_LOCK) 540 /* paranoid */ verify( ! 
__preemption_enabled() ); 541 next(ring.submit_q.submit_lock); 542 #else 543 unlock(ring.submit_q.submit_lock); 544 #endif 545 if( ret < 0 ) { 546 return; 547 } 548 549 // Release the consumed SQEs 550 __release_consumed_submission( ring ); 551 552 // update statistics 553 __STATS__( false, 554 io.submit_q.submit_avg.rdy += to_submit; 555 io.submit_q.submit_avg.csm += ret; 556 io.submit_q.submit_avg.cnt += 1; 557 ) 558 559 __cfadbg_print_safe( io, "Kernel I/O : submitted %u (among %u) for %p\n", idx, ret, active_thread() ); 560 } 561 else 562 { 563 // get mutual exclusion 564 #if defined(LEADER_LOCK) 565 while(!try_lock(ring.submit_q.submit_lock)); 566 #else 567 lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2); 568 #endif 569 570 /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 3ul64, 571 /* paranoid */ "index %u already reclaimed\n" 572 /* paranoid */ "head %u, prev %u, tail %u\n" 573 /* paranoid */ "[-0: %u,-1: %u,-2: %u,-3: %u]\n", 574 /* paranoid */ idx, 575 /* paranoid */ *ring.submit_q.head, ring.submit_q.prev_head, *tail 576 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ] 577 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ] 578 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ] 579 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ] 580 /* paranoid */ ); 581 582 // Append to the list of ready entries 583 584 /* paranoid */ verify( idx <= mask ); 585 ring.submit_q.array[ (*tail) & mask ] = idx; 586 __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST); 587 588 // Submit however, many entries need to be submitted 589 int ret = __io_uring_enter( ring, 1, false ); 590 if( ret < 0 ) { 591 switch((int)errno) { 592 default: 593 abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) ); 594 } 595 } 596 597 /* paranoid */ verify(ret == 1); 598 599 // update statistics 600 __STATS__( false, 601 io.submit_q.submit_avg.csm += 1; 602 io.submit_q.submit_avg.cnt += 1; 603 ) 604 605 { 606 __attribute__((unused)) volatile __u32 * const head = ring.submit_q.head; 607 __attribute__((unused)) __u32 last_idx = ring.submit_q.array[ ((*head) - 1) & mask ]; 608 __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[last_idx]; 609 610 __cfadbg_print_safe( io, 611 "Kernel I/O : last submitted is %u (%p)\n" 612 " data: %p\n" 613 " opcode: %s\n" 614 " fd: %d\n" 615 " flags: %d\n" 616 " prio: %d\n" 617 " off: %p\n" 618 " addr: %p\n" 619 " len: %d\n" 620 " other flags: %d\n" 621 " splice fd: %d\n" 622 " pad[0]: %llu\n" 623 " pad[1]: %llu\n" 624 " pad[2]: %llu\n", 625 last_idx, sqe, 626 (void*)sqe->user_data, 627 opcodes[sqe->opcode], 628 sqe->fd, 629 sqe->flags, 630 sqe->ioprio, 631 sqe->off, 632 sqe->addr, 633 sqe->len, 634 sqe->accept_flags, 635 sqe->splice_fd_in, 636 sqe->__pad2[0], 637 sqe->__pad2[1], 638 sqe->__pad2[2] 639 ); 640 } 641 642 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 643 // Release the consumed SQEs 644 __release_consumed_submission( ring ); 645 // ring.submit_q.sqes[idx].user_data = 3ul64; 646 647 #if defined(LEADER_LOCK) 648 next(ring.submit_q.submit_lock); 649 #else 650 unlock(ring.submit_q.submit_lock); 651 #endif 652 653 __cfadbg_print_safe( io, "Kernel I/O : submitted %u for %p\n", idx, active_thread() ); 654 } 655 } 656 657 // #define PARTIAL_SUBMIT 32 658 659 // go through the list of submissions in the ready array and moved them into 660 // the ring's submit queue 661 static 
unsigned __collect_submitions( struct __io_data & ring ) { 662 /* paranoid */ verify( ring.submit_q.ready != 0p ); 663 /* paranoid */ verify( ring.submit_q.ready_cnt > 0 ); 664 665 unsigned to_submit = 0; 666 __u32 tail = *ring.submit_q.tail; 667 const __u32 mask = *ring.submit_q.mask; 668 #if defined(PARTIAL_SUBMIT) 669 #if defined(LEADER_LOCK) 670 #error PARTIAL_SUBMIT and LEADER_LOCK cannot co-exist 671 #endif 672 const __u32 cnt = ring.submit_q.ready_cnt > PARTIAL_SUBMIT ? PARTIAL_SUBMIT : ring.submit_q.ready_cnt; 673 const __u32 offset = ring.submit_q.prev_ready; 674 ring.submit_q.prev_ready += cnt; 675 #else 676 const __u32 cnt = ring.submit_q.ready_cnt; 677 const __u32 offset = 0; 678 #endif 679 680 // Go through the list of ready submissions 681 for( c; cnt ) { 682 __u32 i = (offset + c) % ring.submit_q.ready_cnt; 683 684 // replace any submission with the sentinel, to consume it. 685 __u32 idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED); 686 687 // If it was already the sentinel, then we are done 688 if( idx == -1ul32 ) continue; 689 690 // If we got a real submission, append it to the list 691 ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask; 692 to_submit++; 693 } 694 695 // Increment the tail based on how many we are ready to submit 696 __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST); 697 698 return to_submit; 699 } 700 312 __submit(ctx, idxs, have, lazy); 313 314 // Mark the instance as no longer in-use, re-enable interrupts and return 315 __STATS__( true, io.submit.fast += 1; ) 316 enable_interrupts( __cfaabi_dbg_ctx ); 317 318 __cfadbg_print_safe(io, "Kernel I/O : submitted on fast path\n"); 319 return; 320 } 321 322 // Fast path failed, fallback on arbitration 323 __STATS__( true, io.submit.slow += 1; ) 324 enable_interrupts( __cfaabi_dbg_ctx ); 325 326 __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for submission\n"); 327 328 __ioarbiter_submit(*inctx->arbiter, inctx, idxs, have, lazy); 329 } 330 331 //============================================================================================= 332 // Flushing 701 333 // Go through the ring's submit queue and release everything that has already been consumed 702 334 // by io_uring 703 static __u32 __release_consumed_submission( struct __io_data & ring ) { 704 const __u32 smask = *ring.submit_q.mask; 705 706 // We need to get the lock to copy the old head and new head 707 if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0; 335 // This cannot be done by multiple threads 336 static __u32 __release_sqes( struct $io_context & ctx ) { 337 const __u32 mask = *ctx.sq.mask; 338 708 339 __attribute__((unused)) 709 __u32 ctail = * ring.submit_q.tail;// get the current tail of the queue710 __u32 chead = * ring.submit_q.head;// get the current head of the queue711 __u32 phead = ring.submit_q.prev_head;// get the head the last time we were here712 ring.submit_q.prev_head = chead; // note up to were we processed 713 unlock(ring.submit_q.release_lock);340 __u32 ctail = *ctx.sq.kring.tail; // get the current tail of the queue 341 __u32 chead = *ctx.sq.kring.head; // get the current head of the queue 342 __u32 phead = ctx.sq.kring.released; // get the head the last time we were here 343 344 __u32 ftail = ctx.sq.free_ring.tail; // get the current tail of the queue 714 345 715 346 // the 3 fields are organized like this diagram … … 730 361 __u32 count = chead - phead; 731 362 363 if(count == 0) { 364 return 0; 365 } 366 732 367 // We acquired an 
previous-head/current-head range 733 368 // go through the range and release the sqes 734 369 for( i; count ) { 735 __u32 idx = ring.submit_q.array[ (phead + i) & smask ]; 736 737 /* paranoid */ verify( 0 != ring.submit_q.sqes[ idx ].user_data ); 738 __clean( &ring.submit_q.sqes[ idx ] ); 739 } 370 __cfadbg_print_safe(io, "Kernel I/O : release loop\n"); 371 __u32 idx = ctx.sq.kring.array[ (phead + i) & mask ]; 372 ctx.sq.free_ring.array[ (ftail + i) & mask ] = idx; 373 } 374 375 ctx.sq.kring.released = chead; // note up to were we processed 376 __atomic_store_n(&ctx.sq.free_ring.tail, ftail + count, __ATOMIC_SEQ_CST); 377 378 __ioarbiter_notify(ctx); 379 740 380 return count; 741 381 } 742 382 743 void __sqe_clean( volatile struct io_uring_sqe * sqe ) { 744 __clean( sqe ); 745 } 746 747 static inline void __clean( volatile struct io_uring_sqe * sqe ) { 748 // If we are in debug mode, thrash the fields to make sure we catch reclamation errors 749 __cfaabi_dbg_debug_do( 750 memset(sqe, 0xde, sizeof(*sqe)); 751 sqe->opcode = (sizeof(opcodes) / sizeof(const char *)) - 1; 752 ); 753 754 // Mark the entry as unused 755 __atomic_store_n(&sqe->user_data, 3ul64, __ATOMIC_SEQ_CST); 383 //============================================================================================= 384 // I/O Arbiter 385 //============================================================================================= 386 static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor * proc, __u32 idxs[], __u32 want ) { 387 __cfadbg_print_safe(io, "Kernel I/O : arbiter allocating\n"); 388 389 __STATS__( false, io.alloc.block += 1; ) 390 391 // No one has any resources left, wait for something to finish 392 // Mark as pending 393 __atomic_store_n( &this.pending.flag, true, __ATOMIC_SEQ_CST ); 394 395 // Wait for our turn to submit 396 wait( this.pending.blocked, want ); 397 398 __attribute((unused)) bool ret = 399 __alloc( this.pending.ctx, idxs, want); 400 /* paranoid */ verify( ret ); 401 402 return this.pending.ctx; 403 404 } 405 406 static void __ioarbiter_notify( $io_arbiter & mutex this, $io_context * ctx ) { 407 /* paranoid */ verify( !is_empty(this.pending.blocked) ); 408 this.pending.ctx = ctx; 409 410 while( !is_empty(this.pending.blocked) ) { 411 __cfadbg_print_safe(io, "Kernel I/O : notifying\n"); 412 __u32 have = ctx->sq.free_ring.tail - ctx->sq.free_ring.head; 413 __u32 want = front( this.pending.blocked ); 414 415 if( have > want ) return; 416 417 signal_block( this.pending.blocked ); 418 } 419 420 this.pending.flag = false; 421 } 422 423 static void __ioarbiter_notify( $io_context & ctx ) { 424 if(__atomic_load_n( &ctx.arbiter->pending.flag, __ATOMIC_SEQ_CST)) { 425 __ioarbiter_notify( *ctx.arbiter, &ctx ); 426 } 427 } 428 429 // Simply append to the pending 430 static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * ctx, __u32 idxs[], __u32 have, bool lazy ) { 431 __cfadbg_print_safe(io, "Kernel I/O : submitting %u from the arbiter to context %u\n", have, ctx->fd); 432 433 /* paranoid */ verify( &this == ctx->arbiter ); 434 435 // Mark as pending 436 __atomic_store_n( &ctx->ext_sq.empty, false, __ATOMIC_SEQ_CST ); 437 438 __cfadbg_print_safe(io, "Kernel I/O : waiting to submit %u\n", have); 439 440 // Wait for our turn to submit 441 wait( ctx->ext_sq.blocked ); 442 443 // Submit our indexes 444 __submit(ctx, idxs, have, lazy); 445 446 __cfadbg_print_safe(io, "Kernel I/O : %u submitted from arbiter\n", have); 447 } 448 449 static void __ioarbiter_flush( $io_arbiter & mutex 
this, $io_context * ctx ) { 450 /* paranoid */ verify( &this == ctx->arbiter ); 451 452 __STATS__( false, io.flush.external += 1; ) 453 454 __cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n"); 455 456 condition & blcked = ctx->ext_sq.blocked; 457 /* paranoid */ verify( ctx->ext_sq.empty == is_empty( blcked ) ); 458 while(!is_empty( blcked )) { 459 signal_block( blcked ); 460 } 461 462 ctx->ext_sq.empty = true; 756 463 } 757 464 #endif -
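The bulk of this file replaces the poller-thread submission machinery with a per-processor, two-call API: cfa_io_allocate reserves SQE indexes from the local ring's free ring, and cfa_io_submit publishes them, flushing only when the batch grows past roughly 30 pending entries or the caller declines the lazy path. A hedged usage sketch, mirroring the generated template in io/call.cfa.in below; the NOP opcode and the local variable names are illustrative only:

io_future_t future;
__u32 idx;
struct io_uring_sqe * sqe;
struct $io_context * ctx = cfa_io_allocate( &sqe, &idx, 1 );   // reserve one SQE on this processor's ring

sqe->opcode    = IORING_OP_NOP;                                // fill in the request
sqe->user_data = (__u64)(uintptr_t)&future;                    // completion is routed back through the future

cfa_io_submit( ctx, &idx, 1, true );                           // lazy submit: batched until a flush
wait( future );                                                // __cfa_io_drain() fulfils the future from the CQ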
libcfa/src/concurrency/io/call.cfa.in
r342af53 r8e4aa05 54 54 | IOSQE_IO_DRAIN 55 55 #endif 56 #if defined(CFA_HAVE_IOSQE_IO_LINK) 57 | IOSQE_IO_LINK 58 #endif 59 #if defined(CFA_HAVE_IOSQE_IO_HARDLINK) 60 | IOSQE_IO_HARDLINK 61 #endif 56 62 #if defined(CFA_HAVE_IOSQE_ASYNC) 57 63 | IOSQE_ASYNC 58 64 #endif 59 ; 60 61 static const __u32 LINK_FLAGS = 0 62 #if defined(CFA_HAVE_IOSQE_IO_LINK) 63 | IOSQE_IO_LINK 64 #endif 65 #if defined(CFA_HAVE_IOSQE_IO_HARDLINK) 66 | IOSQE_IO_HARDLINK 65 #if defined(CFA_HAVE_IOSQE_BUFFER_SELECTED) 66 | IOSQE_BUFFER_SELECTED 67 67 #endif 68 68 ; … … 74 74 ; 75 75 76 extern [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ); 77 extern void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))); 78 79 static inline io_context * __get_io_context( void ) { 80 cluster * cltr = active_cluster(); 81 82 /* paranoid */ verifyf( cltr, "No active cluster for io operation\\n"); 83 assertf( cltr->io.cnt > 0, "Cluster %p has no default io contexts and no context was specified\\n", cltr ); 84 85 /* paranoid */ verifyf( cltr->io.ctxs, "default io contexts for cluster %p are missing\\n", cltr); 86 return &cltr->io.ctxs[ thread_rand() % cltr->io.cnt ]; 87 } 76 extern struct $io_context * cfa_io_allocate(struct io_uring_sqe * out_sqes[], __u32 out_idxs[], __u32 want) __attribute__((nonnull (1,2))); 77 extern void cfa_io_submit( struct $io_context * in_ctx, __u32 in_idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1,2))); 88 78 #endif 89 79 … … 98 88 99 89 extern "C" { 100 #include < sys/types.h>90 #include <asm/types.h> 101 91 #include <sys/socket.h> 102 92 #include <sys/syscall.h> … … 142 132 extern int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); 143 133 144 extern ssize_t splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags);134 extern ssize_t splice(int fd_in, __off64_t *off_in, int fd_out, __off64_t *off_out, size_t len, unsigned int flags); 145 135 extern ssize_t tee(int fd_in, int fd_out, size_t len, unsigned int flags); 146 136 } … … 195 185 return ', '.join(args_a) 196 186 197 AsyncTemplate = """inline void async_{name}(io_future_t & future, {params}, int submit_flags, io_cancellation * cancellation, io_context * context) {{187 AsyncTemplate = """inline void async_{name}(io_future_t & future, {params}, __u64 submit_flags) {{ 198 188 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_{op}) 199 189 ssize_t res = {name}({args}); … … 205 195 }} 206 196 #else 207 // we don't support LINK yet208 if( 0 != (submit_flags & LINK_FLAGS) ) {{209 errno = ENOTSUP; return -1;210 }}211 212 if( !context ) {{213 context = __get_io_context();214 }}215 if(cancellation) {{216 cancellation->target = (__u64)(uintptr_t)&future;217 }}218 219 197 __u8 sflags = REGULAR_FLAGS & submit_flags; 220 struct __io_data & ring = *context->thrd.ring;221 222 198 __u32 idx; 223 199 struct io_uring_sqe * sqe; 224 [(volatile struct io_uring_sqe *) sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future);200 struct $io_context * ctx = cfa_io_allocate( &sqe, &idx, 1 ); 225 201 226 202 sqe->opcode = IORING_OP_{op}; 203 sqe->user_data = (__u64)(uintptr_t)&future; 227 204 sqe->flags = sflags; 228 205 sqe->ioprio = 0; … … 239 216 240 217 verify( sqe->user_data == (__u64)(uintptr_t)&future ); 241 __submit( context, idx);218 cfa_io_submit( ctx, &idx, 1, 0 != (submit_flags & CFA_IO_LAZY) ); 242 219 #endif 243 220 }}""" 244 221 245 SyncTemplate = """{ret} cfa_{name}({params}, int submit_flags, Duration 
timeout, io_cancellation * cancellation, io_context * context) {{ 246 if( timeout >= 0 ) {{ 247 errno = ENOTSUP; 248 return -1; 249 }} 222 SyncTemplate = """{ret} cfa_{name}({params}, __u64 submit_flags) {{ 250 223 io_future_t future; 251 224 252 async_{name}( future, {args}, submit_flags , cancellation, context);225 async_{name}( future, {args}, submit_flags ); 253 226 254 227 wait( future ); … … 393 366 }), 394 367 # CFA_HAVE_IORING_OP_SPLICE 395 Call('SPLICE', 'ssize_t splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags)', {368 Call('SPLICE', 'ssize_t splice(int fd_in, __off64_t *off_in, int fd_out, __off64_t *off_out, size_t len, unsigned int flags)', { 396 369 'splice_fd_in': 'fd_in', 397 370 'splice_off_in': 'off_in ? (__u64)*off_in : (__u64)-1', … … 415 388 if c.define: 416 389 print("""#if defined({define}) 417 {ret} cfa_{name}({params}, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);390 {ret} cfa_{name}({params}, __u64 submit_flags); 418 391 #endif""".format(define=c.define,ret=c.ret, name=c.name, params=c.params)) 419 392 else: 420 print("{ret} cfa_{name}({params}, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);"393 print("{ret} cfa_{name}({params}, __u64 submit_flags);" 421 394 .format(ret=c.ret, name=c.name, params=c.params)) 422 395 … … 426 399 if c.define: 427 400 print("""#if defined({define}) 428 void async_{name}(io_future_t & future, {params}, int submit_flags, io_cancellation * cancellation, io_context * context);401 void async_{name}(io_future_t & future, {params}, __u64 submit_flags); 429 402 #endif""".format(define=c.define,name=c.name, params=c.params)) 430 403 else: 431 print("void async_{name}(io_future_t & future, {params}, int submit_flags, io_cancellation * cancellation, io_context * context);"404 print("void async_{name}(io_future_t & future, {params}, __u64 submit_flags);" 432 405 .format(name=c.name, params=c.params)) 433 406 print("\n") … … 474 447 475 448 print(""" 476 //-----------------------------------------------------------------------------477 bool cancel(io_cancellation & this) {478 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ASYNC_CANCEL)479 return false;480 #else481 io_future_t future;482 483 io_context * context = __get_io_context();484 485 __u8 sflags = 0;486 struct __io_data & ring = *context->thrd.ring;487 488 __u32 idx;489 volatile struct io_uring_sqe * sqe;490 [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );491 492 sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;493 sqe->opcode = IORING_OP_ASYNC_CANCEL;494 sqe->flags = sflags;495 sqe->addr = this.target;496 497 verify( sqe->user_data == (__u64)(uintptr_t)&future );498 __submit( context, idx );499 500 wait(future);501 502 if( future.result == 0 ) return true; // Entry found503 if( future.result == -EALREADY) return true; // Entry found but in progress504 if( future.result == -ENOENT ) return false; // Entry not found505 return false;506 #endif507 }508 509 449 //----------------------------------------------------------------------------- 510 450 // Check if a function is has asynchronous -
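The generator change above also simplifies every public wrapper: the timeout, io_cancellation and io_context parameters are gone, leaving a single __u64 flag word, with CFA_IO_LAZY selecting the batched submission path. A hedged example of what a generated call now looks like; cfa_read and its exact parameter list are assumed for illustration, not taken from this diff:

ssize_t cfa_read( int fd, void * buf, size_t count, __u64 submit_flags );

char buf[256];
ssize_t r = cfa_read( fd, buf, sizeof(buf), CFA_IO_LAZY );     // let the ring batch this submission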
libcfa/src/concurrency/io/setup.cfa
r342af53 r8e4aa05 26 26 27 27 #if !defined(CFA_HAVE_LINUX_IO_URING_H) 28 void __kernel_io_startup() {29 // Nothing to do without io_uring30 }31 32 void __kernel_io_shutdown() {33 // Nothing to do without io_uring34 }35 36 28 void ?{}(io_context_params & this) {} 37 29 38 void ?{}(io_context & this, struct cluster & cl) {} 39 void ?{}(io_context & this, struct cluster & cl, const io_context_params & params) {} 40 41 void ^?{}(io_context & this) {} 42 void ^?{}(io_context & this, bool cluster_context) {} 30 void ?{}($io_context & this, struct cluster & cl) {} 31 void ^?{}($io_context & this) {} 32 33 void __cfa_io_start( processor * proc ) {} 34 void __cfa_io_flush( processor * proc ) {} 35 void __cfa_io_stop ( processor * proc ) {} 36 37 $io_arbiter * create(void) { return 0p; } 38 void destroy($io_arbiter *) {} 43 39 44 40 #else … … 65 61 void ?{}(io_context_params & this) { 66 62 this.num_entries = 256; 67 this.num_ready = 256;68 this.submit_aff = -1;69 this.eager_submits = false;70 this.poller_submits = false;71 this.poll_submit = false;72 this.poll_complete = false;73 63 } 74 64 … … 103 93 104 94 //============================================================================================= 105 // I/O Startup / Shutdown logic + Master Poller106 //=============================================================================================107 108 // IO Master poller loop forward109 static void * iopoll_loop( __attribute__((unused)) void * args );110 111 static struct {112 pthread_t thrd; // pthread handle to io poller thread113 void * stack; // pthread stack for io poller thread114 int epollfd; // file descriptor to the epoll instance115 volatile bool run; // Whether or not to continue116 } iopoll;117 118 void __kernel_io_startup(void) {119 __cfadbg_print_safe(io_core, "Kernel : Creating EPOLL instance\n" );120 121 iopoll.epollfd = epoll_create1(0);122 if (iopoll.epollfd == -1) {123 abort( "internal error, epoll_create1\n");124 }125 126 __cfadbg_print_safe(io_core, "Kernel : Starting io poller thread\n" );127 128 iopoll.run = true;129 iopoll.stack = __create_pthread( &iopoll.thrd, iopoll_loop, 0p );130 }131 132 void __kernel_io_shutdown(void) {133 // Notify the io poller thread of the shutdown134 iopoll.run = false;135 sigval val = { 1 };136 pthread_sigqueue( iopoll.thrd, SIGUSR1, val );137 138 // Wait for the io poller thread to finish139 140 __destroy_pthread( iopoll.thrd, iopoll.stack, 0p );141 142 int ret = close(iopoll.epollfd);143 if (ret == -1) {144 abort( "internal error, close epoll\n");145 }146 147 // Io polling is now fully stopped148 149 __cfadbg_print_safe(io_core, "Kernel : IO poller stopped\n" );150 }151 152 static void * iopoll_loop( __attribute__((unused)) void * args ) {153 __processor_id_t id;154 id.full_proc = false;155 id.id = doregister(&id);156 __cfaabi_tls.this_proc_id = &id;157 __cfadbg_print_safe(io_core, "Kernel : IO poller thread starting\n" );158 159 // Block signals to control when they arrive160 sigset_t mask;161 sigfillset(&mask);162 if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {163 abort( "internal error, pthread_sigmask" );164 }165 166 sigdelset( &mask, SIGUSR1 );167 168 // Create sufficient events169 struct epoll_event events[10];170 // Main loop171 while( iopoll.run ) {172 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n");173 174 // Wait for events175 int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask );176 177 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io contexts events, waking up\n", 
nfds);178 179 // Check if an error occured180 if (nfds == -1) {181 if( errno == EINTR ) continue;182 abort( "internal error, pthread_sigmask" );183 }184 185 for(i; nfds) {186 $io_ctx_thread * io_ctx = ($io_ctx_thread *)(uintptr_t)events[i].data.u64;187 /* paranoid */ verify( io_ctx );188 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->ring->fd, io_ctx);189 #if !defined( __CFA_NO_STATISTICS__ )190 __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats;191 #endif192 193 eventfd_t v;194 eventfd_read(io_ctx->ring->efd, &v);195 196 post( io_ctx->sem );197 }198 }199 200 __cfadbg_print_safe(io_core, "Kernel : IO poller thread stopping\n" );201 unregister(&id);202 return 0p;203 }204 205 //=============================================================================================206 95 // I/O Context Constrution/Destruction 207 96 //============================================================================================= 208 97 209 void ?{}($io_ctx_thread & this, struct cluster & cl) { (this.self){ "IO Poller", cl }; } 210 void main( $io_ctx_thread & this ); 211 static inline $thread * get_thread( $io_ctx_thread & this ) { return &this.self; } 212 void ^?{}( $io_ctx_thread & mutex this ) {} 213 214 static void __io_create ( __io_data & this, const io_context_params & params_in ); 215 static void __io_destroy( __io_data & this ); 216 217 void ?{}(io_context & this, struct cluster & cl, const io_context_params & params) { 218 (this.thrd){ cl }; 219 this.thrd.ring = malloc(); 220 __cfadbg_print_safe(io_core, "Kernel I/O : Creating ring for io_context %p\n", &this); 221 __io_create( *this.thrd.ring, params ); 222 223 __cfadbg_print_safe(io_core, "Kernel I/O : Starting poller thread for io_context %p\n", &this); 224 this.thrd.done = false; 225 __thrd_start( this.thrd, main ); 226 227 __cfadbg_print_safe(io_core, "Kernel I/O : io_context %p ready\n", &this); 228 } 229 230 void ?{}(io_context & this, struct cluster & cl) { 231 io_context_params params; 232 (this){ cl, params }; 233 } 234 235 void ^?{}(io_context & this, bool cluster_context) { 236 __cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %p\n", &this); 237 238 // Notify the thread of the shutdown 239 __atomic_store_n(&this.thrd.done, true, __ATOMIC_SEQ_CST); 240 241 // If this is an io_context within a cluster, things get trickier 242 $thread & thrd = this.thrd.self; 243 if( cluster_context ) { 244 // We are about to do weird things with the threads 245 // we don't need interrupts to complicate everything 246 disable_interrupts(); 247 248 // Get cluster info 249 cluster & cltr = *thrd.curr_cluster; 250 /* paranoid */ verify( cltr.idles.total == 0 || &cltr == mainCluster ); 251 /* paranoid */ verify( !ready_mutate_islocked() ); 252 253 // We need to adjust the clean-up based on where the thread is 254 if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) { 255 // This is the tricky case 256 // The thread was preempted or ready to run and now it is on the ready queue 257 // but the cluster is shutting down, so there aren't any processors to run the ready queue 258 // the solution is to steal the thread from the ready-queue and pretend it was blocked all along 259 260 ready_schedule_lock(); 261 // The thread should on the list 262 /* paranoid */ verify( thrd.link.next != 0p ); 263 264 // Remove the thread from the ready queue of this cluster 265 // The thread should be the last on the list 266 __attribute__((unused)) bool removed = remove_head( &cltr, &thrd ); 267 /* 
paranoid */ verify( removed ); 268 thrd.link.next = 0p; 269 thrd.link.prev = 0p; 270 271 // Fixup the thread state 272 thrd.state = Blocked; 273 thrd.ticket = TICKET_BLOCKED; 274 thrd.preempted = __NO_PREEMPTION; 275 276 ready_schedule_unlock(); 277 278 // Pretend like the thread was blocked all along 279 } 280 // !!! This is not an else if !!! 281 // Ok, now the thread is blocked (whether we cheated to get here or not) 282 if( thrd.state == Blocked ) { 283 // This is the "easy case" 284 // The thread is parked and can easily be moved to active cluster 285 verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster ); 286 thrd.curr_cluster = active_cluster(); 287 288 // unpark the fast io_poller 289 unpark( &thrd ); 290 } 291 else { 292 // The thread is in a weird state 293 // I don't know what to do here 294 abort("io_context poller thread is in unexpected state, cannot clean-up correctly\n"); 295 } 296 297 // The weird thread kidnapping stuff is over, restore interrupts. 298 enable_interrupts( __cfaabi_dbg_ctx ); 299 } else { 300 post( this.thrd.sem ); 301 } 302 303 ^(this.thrd){}; 304 __cfadbg_print_safe(io_core, "Kernel I/O : Stopped poller thread for io_context %p\n", &this); 305 306 __io_destroy( *this.thrd.ring ); 307 __cfadbg_print_safe(io_core, "Kernel I/O : Destroyed ring for io_context %p\n", &this); 308 309 free(this.thrd.ring); 310 } 311 312 void ^?{}(io_context & this) { 313 ^(this){ false }; 314 } 315 316 static void __io_create( __io_data & this, const io_context_params & params_in ) { 98 99 100 static void __io_uring_setup ( $io_context & this, const io_context_params & params_in, int procfd ); 101 static void __io_uring_teardown( $io_context & this ); 102 static void __epoll_register($io_context & ctx); 103 static void __epoll_unregister($io_context & ctx); 104 void __ioarbiter_register( $io_arbiter & mutex, $io_context & ctx ); 105 void __ioarbiter_unregister( $io_arbiter & mutex, $io_context & ctx ); 106 107 void ?{}($io_context & this, processor * proc, struct cluster & cl) { 108 /* paranoid */ verify( cl.io.arbiter ); 109 this.proc = proc; 110 this.arbiter = cl.io.arbiter; 111 this.ext_sq.empty = true; 112 (this.ext_sq.blocked){}; 113 __io_uring_setup( this, cl.io.params, proc->idle ); 114 __cfadbg_print_safe(io_core, "Kernel I/O : Created ring for io_context %u (%p)\n", this.fd, &this); 115 } 116 117 void ^?{}($io_context & this) { 118 __cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %u\n", this.fd); 119 120 __io_uring_teardown( this ); 121 __cfadbg_print_safe(io_core, "Kernel I/O : Destroyed ring for io_context %u\n", this.fd); 122 } 123 124 extern void __disable_interrupts_hard(); 125 extern void __enable_interrupts_hard(); 126 127 static void __io_uring_setup( $io_context & this, const io_context_params & params_in, int procfd ) { 317 128 // Step 1 : call to setup 318 129 struct io_uring_params params; 319 130 memset(¶ms, 0, sizeof(params)); 320 if( params_in.poll_submit ) params.flags |= IORING_SETUP_SQPOLL;321 if( params_in.poll_complete ) params.flags |= IORING_SETUP_IOPOLL;131 // if( params_in.poll_submit ) params.flags |= IORING_SETUP_SQPOLL; 132 // if( params_in.poll_complete ) params.flags |= IORING_SETUP_IOPOLL; 322 133 323 134 __u32 nentries = params_in.num_entries != 0 ? 
params_in.num_entries : 256; … … 325 136 abort("ERROR: I/O setup 'num_entries' must be a power of 2\n"); 326 137 } 327 if( params_in.poller_submits && params_in.eager_submits ) {328 abort("ERROR: I/O setup 'poller_submits' and 'eager_submits' cannot be used together\n");329 }330 138 331 139 int fd = syscall(__NR_io_uring_setup, nentries, ¶ms ); … … 335 143 336 144 // Step 2 : mmap result 337 memset( &this, 0, sizeof(struct __io_data) ); 338 struct __submition_data & sq = this.submit_q; 339 struct __completion_data & cq = this.completion_q; 145 struct __sub_ring_t & sq = this.sq; 146 struct __cmp_ring_t & cq = this.cq; 340 147 341 148 // calculate the right ring size … … 386 193 // Get the pointers from the kernel to fill the structure 387 194 // submit queue 388 sq.head = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.head); 389 sq.tail = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail); 390 sq.mask = ( const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask); 391 sq.num = ( const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries); 392 sq.flags = ( __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags); 393 sq.dropped = ( __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped); 394 sq.array = ( __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.array); 395 sq.prev_head = *sq.head; 396 397 { 398 const __u32 num = *sq.num; 399 for( i; num ) { 400 __sqe_clean( &sq.sqes[i] ); 401 } 402 } 403 404 (sq.submit_lock){}; 405 (sq.release_lock){}; 406 407 if( params_in.poller_submits || params_in.eager_submits ) { 408 /* paranoid */ verify( is_pow2( params_in.num_ready ) || (params_in.num_ready < 8) ); 409 sq.ready_cnt = max( params_in.num_ready, 8 ); 410 sq.ready = alloc( sq.ready_cnt, 64`align ); 411 for(i; sq.ready_cnt) { 412 sq.ready[i] = -1ul32; 413 } 414 sq.prev_ready = 0; 415 } 416 else { 417 sq.ready_cnt = 0; 418 sq.ready = 0p; 419 sq.prev_ready = 0; 420 } 195 sq.kring.head = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.head); 196 sq.kring.tail = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail); 197 sq.kring.array = ( __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.array); 198 sq.mask = ( const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask); 199 sq.num = ( const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries); 200 sq.flags = ( __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags); 201 sq.dropped = ( __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped); 202 203 sq.kring.released = 0; 204 205 sq.free_ring.head = 0; 206 sq.free_ring.tail = *sq.num; 207 sq.free_ring.array = alloc( *sq.num, 128`align ); 208 for(i; (__u32)*sq.num) { 209 sq.free_ring.array[i] = i; 210 } 211 212 sq.to_submit = 0; 421 213 422 214 // completion queue … … 429 221 430 222 // Step 4 : eventfd 431 int efd; 432 for() { 433 efd = eventfd(0, 0); 434 if (efd < 0) { 435 if (errno == EINTR) continue; 436 abort("KERNEL ERROR: IO_URING EVENTFD - %s\n", strerror(errno)); 437 } 438 break; 439 } 440 441 int ret; 442 for() { 443 ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &efd, 1); 444 if (ret < 0) { 445 if (errno == EINTR) continue; 446 abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno)); 447 } 448 break; 449 } 223 // io_uring_register is so f*cking slow on some machine that it 224 // will never succeed if preemption isn't hard blocked 225 __cfadbg_print_safe(io_core, "Kernel I/O : registering %d for completion with ring %d\n", procfd, fd); 226 227 __disable_interrupts_hard(); 
228 229 int ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &procfd, 1); 230 if (ret < 0) { 231 abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno)); 232 } 233 234 __enable_interrupts_hard(); 235 236 __cfadbg_print_safe(io_core, "Kernel I/O : registered %d for completion with ring %d\n", procfd, fd); 450 237 451 238 // some paranoid checks … … 457 244 /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask ); 458 245 /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num ); 459 /* paranoid */ verifyf( (*sq. head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );460 /* paranoid */ verifyf( (*sq. tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );246 /* paranoid */ verifyf( (*sq.kring.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.kring.head ); 247 /* paranoid */ verifyf( (*sq.kring.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.kring.tail ); 461 248 462 249 // Update the global ring info 463 this.ring_flags = params.flags;250 this.ring_flags = 0; 464 251 this.fd = fd; 465 this.efd = efd; 466 this.eager_submits = params_in.eager_submits; 467 this.poller_submits = params_in.poller_submits; 468 } 469 470 static void __io_destroy( __io_data & this ) { 252 } 253 254 static void __io_uring_teardown( $io_context & this ) { 471 255 // Shutdown the io rings 472 struct __sub mition_data & sq = this.submit_q;473 struct __c ompletion_data & cq = this.completion_q;256 struct __sub_ring_t & sq = this.sq; 257 struct __cmp_ring_t & cq = this.cq; 474 258 475 259 // unmap the submit queue entries … … 486 270 // close the file descriptor 487 271 close(this.fd); 488 close(this.efd); 489 490 free( this.submit_q.ready ); // Maybe null, doesn't matter 272 273 free( this.sq.free_ring.array ); // Maybe null, doesn't matter 274 } 275 276 void __cfa_io_start( processor * proc ) { 277 proc->io.ctx = alloc(); 278 (*proc->io.ctx){proc, *proc->cltr}; 279 } 280 void __cfa_io_stop ( processor * proc ) { 281 ^(*proc->io.ctx){}; 282 free(proc->io.ctx); 491 283 } 492 284 … … 494 286 // I/O Context Sleep 495 287 //============================================================================================= 496 #define IOEVENTS EPOLLIN | EPOLLONESHOT 497 498 static inline void __ioctx_epoll_ctl($io_ctx_thread & ctx, int op, const char * error) { 499 struct epoll_event ev; 500 ev.events = IOEVENTS; 501 ev.data.u64 = (__u64)&ctx; 502 int ret = epoll_ctl(iopoll.epollfd, op, ctx.ring->efd, &ev); 503 if (ret < 0) { 504 abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) ); 505 } 506 } 507 508 void __ioctx_register($io_ctx_thread & ctx) { 509 __ioctx_epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD"); 510 } 511 512 void __ioctx_prepare_block($io_ctx_thread & ctx) { 513 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.ring->fd, &ctx); 514 __ioctx_epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM"); 515 } 288 // static inline void __epoll_ctl($io_context & ctx, int op, const char * error) { 289 // struct epoll_event ev; 290 // ev.events = EPOLLIN | EPOLLONESHOT; 291 // ev.data.u64 = (__u64)&ctx; 292 // int ret = epoll_ctl(iopoll.epollfd, op, ctx.efd, &ev); 293 // if (ret < 0) { 294 // abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) ); 295 // } 296 // } 297 298 // static void __epoll_register($io_context & ctx) { 299 // __epoll_ctl(ctx, 
EPOLL_CTL_ADD, "ADD"); 300 // } 301 302 // static void __epoll_unregister($io_context & ctx) { 303 // // Read the current epoch so we know when to stop 304 // size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST); 305 306 // // Remove the fd from the iopoller 307 // __epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE"); 308 309 // // Notify the io poller thread of the shutdown 310 // iopoll.run = false; 311 // sigval val = { 1 }; 312 // pthread_sigqueue( iopoll.thrd, SIGUSR1, val ); 313 314 // // Make sure all this is done 315 // __atomic_thread_fence(__ATOMIC_SEQ_CST); 316 317 // // Wait for the next epoch 318 // while(curr == iopoll.epoch && !iopoll.stopped) Pause(); 319 // } 320 321 // void __ioctx_prepare_block($io_context & ctx) { 322 // __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.fd, &ctx); 323 // __epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM"); 324 // } 325 516 326 517 327 //============================================================================================= 518 328 // I/O Context Misc Setup 519 329 //============================================================================================= 520 void register_fixed_files( io_context & ctx, int * files, unsigned count ) { 521 int ret = syscall( __NR_io_uring_register, ctx.thrd.ring->fd, IORING_REGISTER_FILES, files, count ); 522 if( ret < 0 ) { 523 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) ); 524 } 525 526 __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret ); 527 } 528 529 void register_fixed_files( cluster & cltr, int * files, unsigned count ) { 530 for(i; cltr.io.cnt) { 531 register_fixed_files( cltr.io.ctxs[i], files, count ); 532 } 533 } 330 void ?{}( $io_arbiter & this ) { 331 this.pending.flag = false; 332 } 333 334 void ^?{}( $io_arbiter & mutex this ) { 335 // /* paranoid */ verify( empty(this.assigned) ); 336 // /* paranoid */ verify( empty(this.available) ); 337 /* paranoid */ verify( is_empty(this.pending.blocked) ); 338 } 339 340 $io_arbiter * create(void) { 341 return new(); 342 } 343 void destroy($io_arbiter * arbiter) { 344 delete(arbiter); 345 } 346 347 //============================================================================================= 348 // I/O Context Misc Setup 349 //============================================================================================= 350 534 351 #endif -
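Note on the setup path above: it boils down to three system calls — io_uring_setup to create the ring, mmap to map the submission/completion queues, and io_uring_register with IORING_REGISTER_EVENTFD so the kernel can notify the owning processor of completions through an eventfd. The following is a minimal plain-C sketch of just the ring creation and eventfd registration, outside the CFA runtime; it assumes a kernel with io_uring support, and the names ring_fd and efd are illustrative rather than taken from the changeset.

// Minimal sketch: create an io_uring instance and register an eventfd for
// completion notification, mirroring the setup path shown in the diff above.
#include <linux/io_uring.h>
#include <sys/eventfd.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>

int main(void) {
	struct io_uring_params params;
	memset(&params, 0, sizeof(params));

	// Step 1 : create the ring (256 entries, like the default num_entries)
	int ring_fd = syscall(__NR_io_uring_setup, 256, &params);
	if (ring_fd < 0) { perror("io_uring_setup"); exit(1); }

	// Step 4 : create an eventfd and register it with the ring; the kernel
	// will signal it whenever completions are posted.
	int efd = eventfd(0, 0);
	if (efd < 0) { perror("eventfd"); exit(1); }

	if (syscall(__NR_io_uring_register, ring_fd, IORING_REGISTER_EVENTFD, &efd, 1) < 0) {
		perror("io_uring_register(EVENTFD)"); exit(1);
	}

	printf("registered eventfd %d with ring %d\n", efd, ring_fd);
	close(efd);
	close(ring_fd);
	return 0;
}

In the changeset proper, the registration is additionally bracketed by __disable_interrupts_hard() / __enable_interrupts_hard(), because, as the inline comment notes, io_uring_register can be slow enough on some machines that it never completes if preemption keeps interrupting it.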
libcfa/src/concurrency/io/types.hfa
r342af53 r8e4aa05 5 5 // file "LICENCE" distributed with Cforall. 6 6 // 7 // io/types.hfa -- 7 // io/types.hfa -- PRIVATE 8 // Types used by the I/O subsystem 8 9 // 9 10 // Author : Thierry Delisle … … 21 22 22 23 #include "bits/locks.hfa" 24 #include "kernel/fwd.hfa" 23 25 24 26 #if defined(CFA_HAVE_LINUX_IO_URING_H) 25 #define LEADER_LOCK 26 struct __leaderlock_t { 27 struct $thread * volatile value; // ($thread) next_leader | (bool:1) is_locked 28 }; 27 #include "bits/sequence.hfa" 28 #include "monitor.hfa" 29 29 30 static inline void ?{}( __leaderlock_t & this ) { this.value = 0p; } 30 struct processor; 31 monitor $io_arbiter; 31 32 32 33 //----------------------------------------------------------------------- 33 34 // Ring Data structure 34 struct __submition_data { 35 // Head and tail of the ring (associated with array) 36 volatile __u32 * head; 37 volatile __u32 * tail; 38 volatile __u32 prev_head; 35 struct __sub_ring_t { 36 struct { 37 // Head and tail of the ring (associated with array) 38 volatile __u32 * head; // one passed last index consumed by the kernel 39 volatile __u32 * tail; // one passed last index visible to the kernel 40 volatile __u32 released; // one passed last index released back to the free list 39 41 40 // The actual kernel ring which uses head/tail 41 // indexes into the sqes arrays 42 __u32 * array; 42 // The actual kernel ring which uses head/tail 43 // indexes into the sqes arrays 44 __u32 * array; 45 } kring; 46 47 struct { 48 volatile __u32 head; 49 volatile __u32 tail; 50 // The ring which contains free allocations 51 // indexes into the sqes arrays 52 __u32 * array; 53 } free_ring; 54 55 // number of sqes to submit on next system call. 56 __u32 to_submit; 43 57 44 58 // number of entries and mask to go with it … … 46 60 const __u32 * mask; 47 61 48 // Submission flags (Not sure what for)62 // Submission flags, currently only IORING_SETUP_SQPOLL 49 63 __u32 * flags; 50 64 51 // number of sqes not submitted (whatever that means) 65 // number of sqes not submitted 66 // From documentation : [dropped] is incremented for each invalid submission queue entry encountered in the ring buffer. 
52 67 __u32 * dropped; 53 68 54 // Like head/tail but not seen by the kernel55 volatile __u32 * ready;56 __u32 ready_cnt;57 __u32 prev_ready;58 59 #if defined(LEADER_LOCK)60 __leaderlock_t submit_lock;61 #else62 __spinlock_t submit_lock;63 #endif64 __spinlock_t release_lock;65 66 69 // A buffer of sqes (not the actual ring) 67 volatilestruct io_uring_sqe * sqes;70 struct io_uring_sqe * sqes; 68 71 69 72 // The location and size of the mmaped area … … 72 75 }; 73 76 74 struct __c ompletion_data{77 struct __cmp_ring_t { 75 78 // Head and tail of the ring 76 79 volatile __u32 * head; … … 81 84 const __u32 * num; 82 85 83 // number of cqes not submitted (whatever that means)86 // I don't know what this value is for 84 87 __u32 * overflow; 85 88 … … 92 95 }; 93 96 94 struct __io_data { 95 struct __submition_data submit_q; 96 struct __completion_data completion_q; 97 struct __attribute__((aligned(128))) $io_context { 98 $io_arbiter * arbiter; 99 processor * proc; 100 101 struct { 102 volatile bool empty; 103 condition blocked; 104 } ext_sq; 105 106 struct __sub_ring_t sq; 107 struct __cmp_ring_t cq; 97 108 __u32 ring_flags; 98 109 int fd; 99 int efd; 100 bool eager_submits:1; 101 bool poller_submits:1; 110 }; 111 112 monitor __attribute__((aligned(128))) $io_arbiter { 113 struct { 114 condition blocked; 115 $io_context * ctx; 116 volatile bool flag; 117 } pending; 102 118 }; 103 119 … … 131 147 #endif 132 148 133 struct $io_ctx_thread; 134 void __ioctx_register($io_ctx_thread & ctx); 135 void __ioctx_prepare_block($io_ctx_thread & ctx); 136 void __sqe_clean( volatile struct io_uring_sqe * sqe ); 149 // void __ioctx_prepare_block($io_context & ctx); 137 150 #endif 138 151 -
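The new __sub_ring_t splits the submission side in two: sq.kring mirrors the kernel-visible ring (head, tail, array), while sq.free_ring is a purely user-space ring of free sqe indexes, initialised at setup time with head = 0, tail = *num and array[i] = i. The sketch below shows the basic alloc/release discipline of such an index ring in plain C; it is single-threaded for brevity (the runtime's version must use atomics), and all names here are illustrative.

// Sketch of a free-list ring of sqe indexes, as in __sub_ring_t.free_ring.
// head/tail only ever increase; masking with (num - 1) selects the slot,
// so num must be a power of two.
#include <stdint.h>
#include <stdlib.h>
#include <assert.h>

struct free_ring {
	uint32_t head;     // next index to allocate from
	uint32_t tail;     // one past the last index released
	uint32_t num;      // capacity, a power of two
	uint32_t *array;   // the free sqe indexes themselves
};

static void free_ring_init(struct free_ring *r, uint32_t num) {
	r->head = 0;
	r->tail = num;                       // initially every sqe index is free
	r->num  = num;
	r->array = malloc(num * sizeof(uint32_t));
	for (uint32_t i = 0; i < num; i++) r->array[i] = i;
}

// Take one free sqe index; returns 0 if none are available.
static int free_ring_alloc(struct free_ring *r, uint32_t *idx) {
	if (r->head == r->tail) return 0;
	*idx = r->array[r->head & (r->num - 1)];
	r->head++;
	return 1;
}

// Return an sqe index once the kernel has consumed the corresponding entry.
static void free_ring_release(struct free_ring *r, uint32_t idx) {
	assert(r->tail - r->head < r->num);
	r->array[r->tail & (r->num - 1)] = idx;
	r->tail++;
}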
libcfa/src/concurrency/iofwd.hfa
r342af53 r8e4aa05 18 18 #include <unistd.h> 19 19 extern "C" { 20 #include < sys/types.h>20 #include <asm/types.h> 21 21 #if CFA_HAVE_LINUX_IO_URING_H 22 22 #include <linux/io_uring.h> … … 48 48 struct cluster; 49 49 struct io_future_t; 50 struct io_context; 51 struct io_cancellation; 50 struct $io_context; 52 51 53 52 struct iovec; … … 55 54 struct sockaddr; 56 55 struct statx; 56 struct epoll_event; 57 58 //---------- 59 // underlying calls 60 extern struct $io_context * cfa_io_allocate(struct io_uring_sqe * out_sqes[], __u32 out_idxs[], __u32 want) __attribute__((nonnull (1,2))); 61 extern void cfa_io_submit( struct $io_context * in_ctx, __u32 in_idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1,2))); 57 62 58 63 //---------- 59 64 // synchronous calls 60 65 #if defined(CFA_HAVE_PREADV2) 61 extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);66 extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, __u64 submit_flags); 62 67 #endif 63 68 #if defined(CFA_HAVE_PWRITEV2) 64 extern ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);69 extern ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, __u64 submit_flags); 65 70 #endif 66 extern int cfa_fsync(int fd, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);67 extern int cfa_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);68 extern int cfa_sync_file_range(int fd, off64_t offset, off64_t nbytes, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);69 extern ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);70 extern ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);71 extern ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);72 extern ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);73 extern int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);74 extern int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);75 extern int cfa_fallocate(int fd, int mode, off_t offset, off_t len, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);76 extern int cfa_posix_fadvise(int fd, off_t offset, off_t len, int advice, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);77 extern int cfa_madvise(void *addr, size_t length, int advice, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);78 extern int cfa_openat(int dirfd, const char *pathname, int flags, mode_t 
mode, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);71 extern int cfa_fsync(int fd, __u64 submit_flags); 72 extern int cfa_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event, __u64 submit_flags); 73 extern int cfa_sync_file_range(int fd, off64_t offset, off64_t nbytes, unsigned int flags, __u64 submit_flags); 74 extern ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags, __u64 submit_flags); 75 extern ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags, __u64 submit_flags); 76 extern ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags, __u64 submit_flags); 77 extern ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags, __u64 submit_flags); 78 extern int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, __u64 submit_flags); 79 extern int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen, __u64 submit_flags); 80 extern int cfa_fallocate(int fd, int mode, off_t offset, off_t len, __u64 submit_flags); 81 extern int cfa_posix_fadvise(int fd, off_t offset, off_t len, int advice, __u64 submit_flags); 82 extern int cfa_madvise(void *addr, size_t length, int advice, __u64 submit_flags); 83 extern int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode, __u64 submit_flags); 79 84 #if defined(CFA_HAVE_OPENAT2) 80 extern int cfa_openat2(int dirfd, const char *pathname, struct open_how * how, size_t size, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);85 extern int cfa_openat2(int dirfd, const char *pathname, struct open_how * how, size_t size, __u64 submit_flags); 81 86 #endif 82 extern int cfa_close(int fd, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);87 extern int cfa_close(int fd, __u64 submit_flags); 83 88 #if defined(CFA_HAVE_STATX) 84 extern int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);89 extern int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, __u64 submit_flags); 85 90 #endif 86 extern ssize_t cfa_read(int fd, void * buf, size_t count, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);87 extern ssize_t cfa_write(int fd, void * buf, size_t count, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);88 extern ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);89 extern ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);91 extern ssize_t cfa_read(int fd, void * buf, size_t count, __u64 submit_flags); 92 extern ssize_t cfa_write(int fd, void * buf, size_t count, __u64 submit_flags); 93 extern ssize_t cfa_splice(int fd_in, __off64_t *off_in, int fd_out, __off64_t *off_out, size_t len, unsigned int flags, __u64 submit_flags); 94 extern ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, __u64 submit_flags); 90 95 91 96 //---------- 92 97 // asynchronous calls 93 98 #if defined(CFA_HAVE_PREADV2) 94 extern void async_preadv2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int 
flags, int submit_flags, io_cancellation * cancellation, io_context * context);99 extern void async_preadv2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, __u64 submit_flags); 95 100 #endif 96 101 #if defined(CFA_HAVE_PWRITEV2) 97 extern void async_pwritev2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);102 extern void async_pwritev2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, __u64 submit_flags); 98 103 #endif 99 extern void async_fsync(io_future_t & future, int fd, int submit_flags, io_cancellation * cancellation, io_context * context);100 extern void async_epoll_ctl(io_future_t & future, int epfd, int op, int fd, struct epoll_event *event, int submit_flags, io_cancellation * cancellation, io_context * context);101 extern void async_sync_file_range(io_future_t & future, int fd, off64_t offset, off64_t nbytes, unsigned int flags, int submit_flags, io_cancellation * cancellation, io_context * context);102 extern void async_sendmsg(io_future_t & future, int sockfd, const struct msghdr *msg, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);103 extern void async_recvmsg(io_future_t & future, int sockfd, struct msghdr *msg, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);104 extern void async_send(io_future_t & future, int sockfd, const void *buf, size_t len, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);105 extern void async_recv(io_future_t & future, int sockfd, void *buf, size_t len, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);106 extern void async_accept4(io_future_t & future, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);107 extern void async_connect(io_future_t & future, int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags, io_cancellation * cancellation, io_context * context);108 extern void async_fallocate(io_future_t & future, int fd, int mode, off_t offset, off_t len, int submit_flags, io_cancellation * cancellation, io_context * context);109 extern void async_posix_fadvise(io_future_t & future, int fd, off_t offset, off_t len, int advice, int submit_flags, io_cancellation * cancellation, io_context * context);110 extern void async_madvise(io_future_t & future, void *addr, size_t length, int advice, int submit_flags, io_cancellation * cancellation, io_context * context);111 extern void async_openat(io_future_t & future, int dirfd, const char *pathname, int flags, mode_t mode, int submit_flags, io_cancellation * cancellation, io_context * context);104 extern void async_fsync(io_future_t & future, int fd, __u64 submit_flags); 105 extern void async_epoll_ctl(io_future_t & future, int epfd, int op, int fd, struct epoll_event *event, __u64 submit_flags); 106 extern void async_sync_file_range(io_future_t & future, int fd, off64_t offset, off64_t nbytes, unsigned int flags, __u64 submit_flags); 107 extern void async_sendmsg(io_future_t & future, int sockfd, const struct msghdr *msg, int flags, __u64 submit_flags); 108 extern void async_recvmsg(io_future_t & future, int sockfd, struct msghdr *msg, int flags, __u64 submit_flags); 109 extern void async_send(io_future_t & future, int sockfd, const void *buf, size_t len, int flags, 
__u64 submit_flags); 110 extern void async_recv(io_future_t & future, int sockfd, void *buf, size_t len, int flags, __u64 submit_flags); 111 extern void async_accept4(io_future_t & future, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, __u64 submit_flags); 112 extern void async_connect(io_future_t & future, int sockfd, const struct sockaddr *addr, socklen_t addrlen, __u64 submit_flags); 113 extern void async_fallocate(io_future_t & future, int fd, int mode, off_t offset, off_t len, __u64 submit_flags); 114 extern void async_posix_fadvise(io_future_t & future, int fd, off_t offset, off_t len, int advice, __u64 submit_flags); 115 extern void async_madvise(io_future_t & future, void *addr, size_t length, int advice, __u64 submit_flags); 116 extern void async_openat(io_future_t & future, int dirfd, const char *pathname, int flags, mode_t mode, __u64 submit_flags); 112 117 #if defined(CFA_HAVE_OPENAT2) 113 extern void async_openat2(io_future_t & future, int dirfd, const char *pathname, struct open_how * how, size_t size, int submit_flags, io_cancellation * cancellation, io_context * context);118 extern void async_openat2(io_future_t & future, int dirfd, const char *pathname, struct open_how * how, size_t size, __u64 submit_flags); 114 119 #endif 115 extern void async_close(io_future_t & future, int fd, int submit_flags, io_cancellation * cancellation, io_context * context);120 extern void async_close(io_future_t & future, int fd, __u64 submit_flags); 116 121 #if defined(CFA_HAVE_STATX) 117 extern void async_statx(io_future_t & future, int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags, io_cancellation * cancellation, io_context * context);122 extern void async_statx(io_future_t & future, int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, __u64 submit_flags); 118 123 #endif 119 void async_read(io_future_t & future, int fd, void * buf, size_t count, int submit_flags, io_cancellation * cancellation, io_context * context);120 extern void async_write(io_future_t & future, int fd, void * buf, size_t count, int submit_flags, io_cancellation * cancellation, io_context * context);121 extern void async_splice(io_future_t & future, int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags, io_cancellation * cancellation, io_context * context);122 extern void async_tee(io_future_t & future, int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags, io_cancellation * cancellation, io_context * context);124 void async_read(io_future_t & future, int fd, void * buf, size_t count, __u64 submit_flags); 125 extern void async_write(io_future_t & future, int fd, void * buf, size_t count, __u64 submit_flags); 126 extern void async_splice(io_future_t & future, int fd_in, __off64_t *off_in, int fd_out, __off64_t *off_out, size_t len, unsigned int flags, __u64 submit_flags); 127 extern void async_tee(io_future_t & future, int fd_in, int fd_out, size_t len, unsigned int flags, __u64 submit_flags); 123 128 124 129 … … 126 131 // Check if a function is blocks a only the user thread 127 132 bool has_user_level_blocking( fptr_t func ); 128 129 //-----------------------------------------------------------------------------130 void register_fixed_files( io_context & ctx , int * files, unsigned count );131 void register_fixed_files( cluster & cltr, int * files, unsigned count ); -
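iofwd.hfa collapses the previous (submit_flags, timeout, cancellation, context) parameter tail on every wrapper into a single __u64 submit_flags, with cfa_io_allocate/cfa_io_submit as the new underlying entry points. A hypothetical caller of the simplified synchronous wrappers might look like the sketch below; the two declarations are copied from the header above, but the helper name copy_once and the zero flag value are illustrative, and the code is only meaningful when linked against the CFA threading runtime.

// Hypothetical use of the simplified synchronous wrappers: each call now takes
// only the usual POSIX arguments plus a single submit_flags word.
#include <asm/types.h>    // __u64, matching the header's new include
#include <sys/types.h>    // ssize_t, size_t

extern ssize_t cfa_read (int fd, void * buf, size_t count, __u64 submit_flags);
extern ssize_t cfa_write(int fd, void * buf, size_t count, __u64 submit_flags);

// Copy one buffer's worth of data from in_fd to out_fd through the CFA I/O path.
ssize_t copy_once(int in_fd, int out_fd) {
	char buf[4096];
	ssize_t r = cfa_read(in_fd, buf, sizeof(buf), 0);   // 0 : no special submit flags
	if (r <= 0) return r;
	return cfa_write(out_fd, buf, (size_t)r, 0);
}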
libcfa/src/concurrency/kernel.cfa
r342af53 r8e4aa05 22 22 #include <signal.h> 23 23 #include <unistd.h> 24 extern "C" { 25 #include <sys/eventfd.h> 26 } 24 27 25 28 //CFA Includes … … 114 117 static [unsigned idle, unsigned total, * processor] query( & __cluster_idles idles ); 115 118 119 extern void __cfa_io_start( processor * ); 120 extern void __cfa_io_drain( processor * ); 121 extern void __cfa_io_flush( processor * ); 122 extern void __cfa_io_stop ( processor * ); 123 static inline void __maybe_io_drain( processor * ); 124 125 extern void __disable_interrupts_hard(); 126 extern void __enable_interrupts_hard(); 116 127 117 128 //============================================================================================= … … 129 140 verify(this); 130 141 142 __cfa_io_start( this ); 143 131 144 __cfadbg_print_safe(runtime_core, "Kernel : core %p starting\n", this); 132 145 #if !defined(__CFA_NO_STATISTICS__) … … 140 153 preemption_scope scope = { this }; 141 154 155 #if !defined(__CFA_NO_STATISTICS__) 156 unsigned long long last_tally = rdtscl(); 157 #endif 158 159 142 160 __cfadbg_print_safe(runtime_core, "Kernel : core %p started\n", this); 143 161 … … 145 163 MAIN_LOOP: 146 164 for() { 165 // Check if there is pending io 166 __maybe_io_drain( this ); 167 147 168 // Try to get the next thread 148 169 readyThread = __next_thread( this->cltr ); 149 170 150 171 if( !readyThread ) { 172 __cfa_io_flush( this ); 151 173 readyThread = __next_thread_slow( this->cltr ); 152 174 } … … 184 206 #endif 185 207 186 wait( this->idle ); 208 __cfadbg_print_safe(runtime_core, "Kernel : core %p waiting on eventfd %d\n", this, this->idle); 209 210 __disable_interrupts_hard(); 211 eventfd_t val; 212 eventfd_read( this->idle, &val ); 213 __enable_interrupts_hard(); 187 214 188 215 #if !defined(__CFA_NO_STATISTICS__) … … 201 228 /* paranoid */ verify( readyThread ); 202 229 230 // Reset io dirty bit 231 this->io.dirty = false; 232 203 233 // We found a thread run it 204 234 __run_thread(this, readyThread); … … 206 236 // Are we done? 207 237 if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP; 238 239 #if !defined(__CFA_NO_STATISTICS__) 240 unsigned long long curr = rdtscl(); 241 if(curr > (last_tally + 500000000)) { 242 __tally_stats(this->cltr->stats, __cfaabi_tls.this_stats); 243 last_tally = curr; 244 } 245 #endif 246 247 if(this->io.pending && !this->io.dirty) { 248 __cfa_io_flush( this ); 249 } 208 250 } 209 251 … … 211 253 } 212 254 213 V( this->terminated ); 255 __cfa_io_stop( this ); 256 257 post( this->terminated ); 258 214 259 215 260 if(this == mainProcessor) { … … 234 279 /* paranoid */ verifyf( thrd_dst->link.next == 0p, "Expected null got %p", thrd_dst->link.next ); 235 280 __builtin_prefetch( thrd_dst->context.SP ); 281 282 __cfadbg_print_safe(runtime_core, "Kernel : core %p running thread %p (%s)\n", this, thrd_dst, thrd_dst->self_cor.name); 236 283 237 284 $coroutine * proc_cor = get_coroutine(this->runner); … … 316 363 // Just before returning to the processor, set the processor coroutine to active 317 364 proc_cor->state = Active; 365 366 __cfadbg_print_safe(runtime_core, "Kernel : core %p finished running thread %p\n", this, thrd_dst); 318 367 319 368 /* paranoid */ verify( ! __preemption_enabled() ); … … 550 599 551 600 // We found a processor, wake it up 552 post( p->idle ); 601 eventfd_t val; 602 val = 1; 603 eventfd_write( p->idle, val ); 553 604 554 605 #if !defined(__CFA_NO_STATISTICS__) … … 568 619 disable_interrupts(); 569 620 /* paranoid */ verify( ! 
__preemption_enabled() ); 570 post( this->idle ); 621 eventfd_t val; 622 val = 1; 623 eventfd_write( this->idle, val ); 571 624 enable_interrupts( __cfaabi_dbg_ctx ); 572 625 } … … 611 664 // Unexpected Terminating logic 612 665 //============================================================================================= 613 static __spinlock_t kernel_abort_lock; 614 static bool kernel_abort_called = false; 615 616 void * kernel_abort(void) __attribute__ ((__nothrow__)) { 617 // abort cannot be recursively entered by the same or different processors because all signal handlers return when 618 // the globalAbort flag is true. 619 lock( kernel_abort_lock __cfaabi_dbg_ctx2 ); 620 621 // disable interrupts, it no longer makes sense to try to interrupt this processor 622 disable_interrupts(); 623 624 // first task to abort ? 625 if ( kernel_abort_called ) { // not first task to abort ? 626 unlock( kernel_abort_lock ); 627 628 sigset_t mask; 629 sigemptyset( &mask ); 630 sigaddset( &mask, SIGALRM ); // block SIGALRM signals 631 sigaddset( &mask, SIGUSR1 ); // block SIGALRM signals 632 sigsuspend( &mask ); // block the processor to prevent further damage during abort 633 _exit( EXIT_FAILURE ); // if processor unblocks before it is killed, terminate it 634 } 635 else { 636 kernel_abort_called = true; 637 unlock( kernel_abort_lock ); 638 } 639 640 return __cfaabi_tls.this_thread; 641 } 642 643 void kernel_abort_msg( void * kernel_data, char * abort_text, int abort_text_size ) { 644 $thread * thrd = ( $thread * ) kernel_data; 666 void __kernel_abort_msg( char * abort_text, int abort_text_size ) { 667 $thread * thrd = __cfaabi_tls.this_thread; 645 668 646 669 if(thrd) { … … 662 685 } 663 686 664 int kernel_abort_lastframe( void ) __attribute__ ((__nothrow__)) {665 return get_coroutine( kernelTLS().this_thread) == get_coroutine(mainThread) ? 4 : 2;687 int __kernel_abort_lastframe( void ) __attribute__ ((__nothrow__)) { 688 return get_coroutine(__cfaabi_tls.this_thread) == get_coroutine(mainThread) ? 
4 : 2; 666 689 } 667 690 … … 681 704 // Kernel Utilities 682 705 //============================================================================================= 683 //----------------------------------------------------------------------------- 684 // Locks 685 void ?{}( semaphore & this, int count = 1 ) { 686 (this.lock){}; 687 this.count = count; 688 (this.waiting){}; 689 } 690 void ^?{}(semaphore & this) {} 691 692 bool P(semaphore & this) with( this ){ 693 lock( lock __cfaabi_dbg_ctx2 ); 694 count -= 1; 695 if ( count < 0 ) { 696 // queue current task 697 append( waiting, active_thread() ); 698 699 // atomically release spin lock and block 700 unlock( lock ); 701 park(); 702 return true; 703 } 704 else { 705 unlock( lock ); 706 return false; 707 } 708 } 709 710 bool V(semaphore & this) with( this ) { 711 $thread * thrd = 0p; 712 lock( lock __cfaabi_dbg_ctx2 ); 713 count += 1; 714 if ( count <= 0 ) { 715 // remove task at head of waiting list 716 thrd = pop_head( waiting ); 717 } 718 719 unlock( lock ); 720 721 // make new owner 722 unpark( thrd ); 723 724 return thrd != 0p; 725 } 726 727 bool V(semaphore & this, unsigned diff) with( this ) { 728 $thread * thrd = 0p; 729 lock( lock __cfaabi_dbg_ctx2 ); 730 int release = max(-count, (int)diff); 731 count += diff; 732 for(release) { 733 unpark( pop_head( waiting ) ); 734 } 735 736 unlock( lock ); 737 738 return thrd != 0p; 706 #if defined(CFA_HAVE_LINUX_IO_URING_H) 707 #include "io/types.hfa" 708 #endif 709 710 static inline void __maybe_io_drain( processor * proc ) { 711 #if defined(CFA_HAVE_LINUX_IO_URING_H) 712 __cfadbg_print_safe(runtime_core, "Kernel : core %p checking io for ring %d\n", proc, proc->io.ctx->fd); 713 714 // Check if we should drain the queue 715 $io_context * ctx = proc->io.ctx; 716 unsigned head = *ctx->cq.head; 717 unsigned tail = *ctx->cq.tail; 718 if(head != tail) __cfa_io_drain( proc ); 719 #endif 739 720 } 740 721 -
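The scheduler change above replaces the processor's idle semaphore with an eventfd: an idle processor blocks in eventfd_read (with interrupts hard-disabled around the call) and __wake_proc wakes it with eventfd_write. The same park/wake handshake, reduced to two pthreads in plain C, looks like this; it is illustrative only, since the real runtime parks a whole processor rather than a pthread created for the example.

// Sketch of eventfd-based park/unpark, as used for processor idle sleep.
#include <sys/eventfd.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static int idle_fd;

static void * idle_processor(void * arg) {
	(void)arg;
	eventfd_t val;
	printf("processor: parking on eventfd %d\n", idle_fd);
	eventfd_read(idle_fd, &val);          // blocks until someone writes
	printf("processor: woken with value %llu\n", (unsigned long long)val);
	return NULL;
}

int main(void) {
	idle_fd = eventfd(0, 0);
	pthread_t t;
	pthread_create(&t, NULL, idle_processor, NULL);
	sleep(1);                             // let the "processor" park
	eventfd_write(idle_fd, 1);            // the __wake_proc equivalent
	pthread_join(t, NULL);
	close(idle_fd);
	return 0;
}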
libcfa/src/concurrency/kernel.hfa
r342af53 r8e4aa05 5 5 // file "LICENCE" distributed with Cforall. 6 6 // 7 // kernel -- 7 // kernel -- Header containing the core of the kernel API 8 8 // 9 9 // Author : Thierry Delisle … … 24 24 extern "C" { 25 25 #include <bits/pthreadtypes.h> 26 #include <pthread.h> 26 27 #include <linux/types.h> 27 28 } 28 29 29 //----------------------------------------------------------------------------- 30 // Locks 31 struct semaphore { 32 __spinlock_t lock; 33 int count; 34 __queue_t($thread) waiting; 35 }; 36 37 void ?{}(semaphore & this, int count = 1); 38 void ^?{}(semaphore & this); 39 bool P (semaphore & this); 40 bool V (semaphore & this); 41 bool V (semaphore & this, unsigned count); 42 30 #ifdef __CFA_WITH_VERIFY__ 31 extern bool __cfaabi_dbg_in_kernel(); 32 #endif 33 34 //----------------------------------------------------------------------------- 35 // I/O 36 struct cluster; 37 struct $io_context; 38 struct $io_arbiter; 39 40 struct io_context_params { 41 int num_entries; 42 }; 43 44 void ?{}(io_context_params & this); 43 45 44 46 //----------------------------------------------------------------------------- … … 80 82 pthread_t kernel_thread; 81 83 84 struct { 85 $io_context * ctx; 86 bool pending; 87 bool dirty; 88 } io; 89 82 90 // Preemption data 83 91 // Node which is added in the discrete event simulaiton … … 88 96 89 97 // Idle lock (kernel semaphore) 90 __bin_sem_t idle;98 int idle; 91 99 92 100 // Termination synchronisation (user semaphore) 93 semaphoreterminated;101 oneshot terminated; 94 102 95 103 // pthread Stack … … 118 126 119 127 DLISTED_MGD_IMPL_OUT(processor) 120 121 //-----------------------------------------------------------------------------122 // I/O123 struct __io_data;124 125 // IO poller user-thread126 // Not using the "thread" keyword because we want to control127 // more carefully when to start/stop it128 struct $io_ctx_thread {129 struct __io_data * ring;130 single_sem sem;131 volatile bool done;132 $thread self;133 };134 135 136 struct io_context {137 $io_ctx_thread thrd;138 };139 140 struct io_context_params {141 int num_entries;142 int num_ready;143 int submit_aff;144 bool eager_submits:1;145 bool poller_submits:1;146 bool poll_submit:1;147 bool poll_complete:1;148 };149 150 void ?{}(io_context_params & this);151 152 void ?{}(io_context & this, struct cluster & cl);153 void ?{}(io_context & this, struct cluster & cl, const io_context_params & params);154 void ^?{}(io_context & this);155 156 struct io_cancellation {157 __u64 target;158 };159 160 static inline void ?{}(io_cancellation & this) { this.target = -1u; }161 static inline void ^?{}(io_cancellation &) {}162 bool cancel(io_cancellation & this);163 128 164 129 //----------------------------------------------------------------------------- … … 246 211 247 212 struct { 248 io_context * ctxs;249 unsigned cnt;213 $io_arbiter * arbiter; 214 io_context_params params; 250 215 } io; 251 216 -
libcfa/src/concurrency/kernel/fwd.hfa
r342af53 r8e4aa05 5 5 // file "LICENCE" distributed with Cforall. 6 6 // 7 // kernel/fwd.hfa -- 7 // kernel/fwd.hfa -- PUBLIC 8 // Fundamental code needed to implement threading M.E.S. algorithms. 8 9 // 9 10 // Author : Thierry Delisle … … 134 135 extern uint64_t thread_rand(); 135 136 137 // Semaphore which only supports a single thread 138 struct single_sem { 139 struct $thread * volatile ptr; 140 }; 141 142 static inline { 143 void ?{}(single_sem & this) { 144 this.ptr = 0p; 145 } 146 147 void ^?{}(single_sem &) {} 148 149 bool wait(single_sem & this) { 150 for() { 151 struct $thread * expected = this.ptr; 152 if(expected == 1p) { 153 if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 154 return false; 155 } 156 } 157 else { 158 /* paranoid */ verify( expected == 0p ); 159 if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 160 park(); 161 return true; 162 } 163 } 164 165 } 166 } 167 168 bool post(single_sem & this) { 169 for() { 170 struct $thread * expected = this.ptr; 171 if(expected == 1p) return false; 172 if(expected == 0p) { 173 if(__atomic_compare_exchange_n(&this.ptr, &expected, 1p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 174 return false; 175 } 176 } 177 else { 178 if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 179 unpark( expected ); 180 return true; 181 } 182 } 183 } 184 } 185 } 186 187 // Synchronozation primitive which only supports a single thread and one post 188 // Similar to a binary semaphore with a 'one shot' semantic 189 // is expected to be discarded after each party call their side 190 struct oneshot { 191 // Internal state : 192 // 0p : is initial state (wait will block) 193 // 1p : fulfilled (wait won't block) 194 // any thread : a thread is currently waiting 195 struct $thread * volatile ptr; 196 }; 197 198 static inline { 199 void ?{}(oneshot & this) { 200 this.ptr = 0p; 201 } 202 203 void ^?{}(oneshot &) {} 204 205 // Wait for the post, return immidiately if it already happened. 206 // return true if the thread was parked 207 bool wait(oneshot & this) { 208 for() { 209 struct $thread * expected = this.ptr; 210 if(expected == 1p) return false; 211 /* paranoid */ verify( expected == 0p ); 212 if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 213 park(); 214 /* paranoid */ verify( this.ptr == 1p ); 215 return true; 216 } 217 } 218 } 219 220 // Mark as fulfilled, wake thread if needed 221 // return true if a thread was unparked 222 bool post(oneshot & this) { 223 struct $thread * got = __atomic_exchange_n( &this.ptr, 1p, __ATOMIC_SEQ_CST); 224 if( got == 0p ) return false; 225 unpark( got ); 226 return true; 227 } 228 } 229 230 // base types for future to build upon 231 // It is based on the 'oneshot' type to allow multiple futures 232 // to block on the same instance, permitting users to block a single 233 // thread on "any of" [a given set of] futures. 
234 // does not support multiple threads waiting on the same future 235 struct future_t { 236 // Internal state : 237 // 0p : is initial state (wait will block) 238 // 1p : fulfilled (wait won't block) 239 // 2p : in progress () 240 // 3p : abandoned, server should delete 241 // any oneshot : a context has been setup to wait, a thread could wait on it 242 struct oneshot * volatile ptr; 243 }; 244 245 static inline { 246 void ?{}(future_t & this) { 247 this.ptr = 0p; 248 } 249 250 void ^?{}(future_t &) {} 251 252 void reset(future_t & this) { 253 // needs to be in 0p or 1p 254 __atomic_exchange_n( &this.ptr, 0p, __ATOMIC_SEQ_CST); 255 } 256 257 // check if the future is available 258 bool available( future_t & this ) { 259 return this.ptr == 1p; 260 } 261 262 // Prepare the future to be waited on 263 // intented to be use by wait, wait_any, waitfor, etc. rather than used directly 264 bool setup( future_t & this, oneshot & wait_ctx ) { 265 /* paranoid */ verify( wait_ctx.ptr == 0p ); 266 // The future needs to set the wait context 267 for() { 268 struct oneshot * expected = this.ptr; 269 // Is the future already fulfilled? 270 if(expected == 1p) return false; // Yes, just return false (didn't block) 271 272 // The future is not fulfilled, try to setup the wait context 273 /* paranoid */ verify( expected == 0p ); 274 if(__atomic_compare_exchange_n(&this.ptr, &expected, &wait_ctx, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 275 return true; 276 } 277 } 278 } 279 280 // Stop waiting on a future 281 // When multiple futures are waited for together in "any of" pattern 282 // futures that weren't fulfilled before the thread woke up 283 // should retract the wait ctx 284 // intented to be use by wait, wait_any, waitfor, etc. rather than used directly 285 void retract( future_t & this, oneshot & wait_ctx ) { 286 // Remove the wait context 287 struct oneshot * got = __atomic_exchange_n( &this.ptr, 0p, __ATOMIC_SEQ_CST); 288 289 // got == 0p: future was never actually setup, just return 290 if( got == 0p ) return; 291 292 // got == wait_ctx: since fulfil does an atomic_swap, 293 // if we got back the original then no one else saw context 294 // It is safe to delete (which could happen after the return) 295 if( got == &wait_ctx ) return; 296 297 // got == 1p: the future is ready and the context was fully consumed 298 // the server won't use the pointer again 299 // It is safe to delete (which could happen after the return) 300 if( got == 1p ) return; 301 302 // got == 2p: the future is ready but the context hasn't fully been consumed 303 // spin until it is safe to move on 304 if( got == 2p ) { 305 while( this.ptr != 1p ) Pause(); 306 return; 307 } 308 309 // got == any thing else, something wen't wrong here, abort 310 abort("Future in unexpected state"); 311 } 312 313 // Mark the future as abandoned, meaning it will be deleted by the server 314 bool abandon( future_t & this ) { 315 /* paranoid */ verify( this.ptr != 3p ); 316 317 // Mark the future as abandonned 318 struct oneshot * got = __atomic_exchange_n( &this.ptr, 3p, __ATOMIC_SEQ_CST); 319 320 // If the future isn't already fulfilled, let the server delete it 321 if( got == 0p ) return false; 322 323 // got == 2p: the future is ready but the context hasn't fully been consumed 324 // spin until it is safe to move on 325 if( got == 2p ) { 326 while( this.ptr != 1p ) Pause(); 327 got = 1p; 328 } 329 330 // The future is completed delete it now 331 /* paranoid */ verify( this.ptr != 1p ); 332 free( &this ); 333 return true; 334 } 335 336 // from 
the server side, mark the future as fulfilled 337 // delete it if needed 338 bool fulfil( future_t & this ) { 339 for() { 340 struct oneshot * expected = this.ptr; 341 // was this abandoned? 342 #if defined(__GNUC__) && __GNUC__ >= 7 343 #pragma GCC diagnostic push 344 #pragma GCC diagnostic ignored "-Wfree-nonheap-object" 345 #endif 346 if( expected == 3p ) { free( &this ); return false; } 347 #if defined(__GNUC__) && __GNUC__ >= 7 348 #pragma GCC diagnostic pop 349 #endif 350 351 /* paranoid */ verify( expected != 1p ); // Future is already fulfilled, should not happen 352 /* paranoid */ verify( expected != 2p ); // Future is bein fulfilled by someone else, this is even less supported then the previous case. 353 354 // If there is a wait context, we need to consume it and mark it as consumed after 355 // If there is no context then we can skip the in progress phase 356 struct oneshot * want = expected == 0p ? 1p : 2p; 357 if(__atomic_compare_exchange_n(&this.ptr, &expected, want, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 358 if( expected == 0p ) { /* paranoid */ verify( this.ptr == 1p); return false; } 359 bool ret = post( *expected ); 360 __atomic_store_n( &this.ptr, 1p, __ATOMIC_SEQ_CST); 361 return ret; 362 } 363 } 364 365 } 366 367 // Wait for the future to be fulfilled 368 bool wait( future_t & this ) { 369 oneshot temp; 370 if( !setup(this, temp) ) return false; 371 372 // Wait context is setup, just wait on it 373 bool ret = wait( temp ); 374 375 // Wait for the future to tru 376 while( this.ptr == 2p ) Pause(); 377 // Make sure the state makes sense 378 // Should be fulfilled, could be in progress but it's out of date if so 379 // since if that is the case, the oneshot was fulfilled (unparking this thread) 380 // and the oneshot should not be needed any more 381 __attribute__((unused)) struct oneshot * was = this.ptr; 382 /* paranoid */ verifyf( was == 1p, "Expected this.ptr to be 1p, was %p\n", was ); 383 384 // Mark the future as fulfilled, to be consistent 385 // with potential calls to avail 386 // this.ptr = 1p; 387 return ret; 388 } 389 } 390 136 391 //----------------------------------------------------------------------- 137 392 // Statics call at the end of each thread to register statistics -
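fwd.hfa introduces three small word-sized synchronisation primitives: single_sem, oneshot (state 0p = empty, 1p = fulfilled, any other value = the parked waiter) and future_t (which adds 2p for in-progress and 3p for abandoned). The C11 sketch below captures just the oneshot handshake; to stay self-contained it replaces park()/unpark() with a per-waiter spin flag, and it assumes a single waiter and a single post, exactly as the comments above require.

// Simplified oneshot: one waiter, one post, built on an atomic word.
// 0 = initial, 1 = fulfilled, anything else = pointer to the registered waiter.
#include <stdatomic.h>
#include <pthread.h>
#include <stdio.h>
#include <stdint.h>

struct waiter { atomic_int woken; };                 // stand-in for a parked thread
static atomic_uintptr_t oneshot_state;               // the oneshot's single word

static void oneshot_wait(struct waiter * self) {
	uintptr_t expected = 0;
	// Try to install ourselves as the waiter; if the CAS fails the post already
	// happened (state is 1) and we return without blocking.
	if (atomic_compare_exchange_strong(&oneshot_state, &expected, (uintptr_t)self)) {
		while (!atomic_load(&self->woken)) { }       // "park" until posted
	}
}

static void oneshot_post(void) {
	uintptr_t got = atomic_exchange(&oneshot_state, (uintptr_t)1);  // mark fulfilled
	if (got != 0 && got != 1)                        // a waiter was registered
		atomic_store(&((struct waiter *)got)->woken, 1);            // "unpark" it
}

static void * waiter_main(void * arg) {
	oneshot_wait(arg);
	printf("waiter: released\n");
	return NULL;
}

int main(void) {
	struct waiter w;
	atomic_init(&w.woken, 0);
	atomic_init(&oneshot_state, 0);
	pthread_t t;
	pthread_create(&t, NULL, waiter_main, &w);
	oneshot_post();
	pthread_join(t, NULL);
	return 0;
}

future_t follows the same pattern, with the extra 2p state covering the window where the server has seen the waiter but has not finished waking it, and 3p letting an abandoned future be freed by whichever side touches it last.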
libcfa/src/concurrency/kernel/startup.cfa
r342af53 r8e4aa05 22 22 extern "C" { 23 23 #include <limits.h> // PTHREAD_STACK_MIN 24 #include <sys/eventfd.h> // eventfd 24 25 #include <sys/mman.h> // mprotect 25 26 #include <sys/resource.h> // getrlimit … … 89 90 extern void __kernel_alarm_startup(void); 90 91 extern void __kernel_alarm_shutdown(void); 91 extern void __kernel_io_startup (void);92 extern void __kernel_io_shutdown(void);93 92 94 93 //----------------------------------------------------------------------------- … … 102 101 KERNEL_STORAGE($thread, mainThread); 103 102 KERNEL_STORAGE(__stack_t, mainThreadCtx); 104 KERNEL_STORAGE(io_context, mainPollerThread);105 103 KERNEL_STORAGE(__scheduler_RWLock_t, __scheduler_lock); 106 104 #if !defined(__CFA_NO_STATISTICS__) … … 198 196 199 197 void ?{}(processor & this) with( this ) { 200 ( this.idle ){}; 201 ( this.terminated ){ 0 }; 198 ( this.terminated ){}; 202 199 ( this.runner ){}; 203 200 init( this, "Main Processor", *mainCluster ); … … 226 223 __kernel_alarm_startup(); 227 224 228 // Start IO229 __kernel_io_startup();230 231 225 // Add the main thread to the ready queue 232 226 // once resume is called on mainProcessor->runner the mainThread needs to be scheduled like any normal thread … … 241 235 // THE SYSTEM IS NOW COMPLETELY RUNNING 242 236 243 244 // SKULLDUGGERY: The constructor for the mainCluster will call alloc with a dimension of 0245 // malloc *can* return a non-null value, we should free it if that is the case246 free( mainCluster->io.ctxs );247 248 // Now that the system is up, finish creating systems that need threading249 mainCluster->io.ctxs = (io_context *)&storage_mainPollerThread;250 mainCluster->io.cnt = 1;251 (*mainCluster->io.ctxs){ *mainCluster };252 253 237 __cfadbg_print_safe(runtime_core, "Kernel : Started\n--------------------------------------------------\n\n"); 254 238 … … 260 244 261 245 static void __kernel_shutdown(void) { 262 //Before we start shutting things down, wait for systems that need threading to shutdown263 ^(*mainCluster->io.ctxs){};264 mainCluster->io.cnt = 0;265 mainCluster->io.ctxs = 0p;266 267 246 /* paranoid */ verify( __preemption_enabled() ); 268 247 disable_interrupts(); … … 282 261 // Disable preemption 283 262 __kernel_alarm_shutdown(); 284 285 // Stop IO286 __kernel_io_shutdown();287 263 288 264 // Destroy the main processor and its context in reverse order of construction … … 484 460 pending_preemption = false; 485 461 462 this.io.ctx = 0p; 463 this.io.pending = false; 464 this.io.dirty = false; 465 466 this.idle = eventfd(0, 0); 467 if (idle < 0) { 468 abort("KERNEL ERROR: PROCESSOR EVENTFD - %s\n", strerror(errno)); 469 } 470 486 471 #if !defined(__CFA_NO_STATISTICS__) 487 472 print_stats = 0; … … 524 509 // Finally we don't need the read_lock any more 525 510 unregister((__processor_id_t*)&this); 511 512 close(this.idle); 526 513 } 527 514 528 515 void ?{}(processor & this, const char name[], cluster & _cltr) { 529 ( this.idle ){}; 530 ( this.terminated ){ 0 }; 516 ( this.terminated ){}; 531 517 ( this.runner ){}; 532 518 … … 549 535 __wake_proc( &this ); 550 536 551 P( terminated );537 wait( terminated ); 552 538 /* paranoid */ verify( active_processor() != &this); 553 539 } … … 582 568 threads{ __get }; 583 569 570 io.arbiter = create(); 571 io.params = io_params; 572 584 573 doregister(this); 585 574 … … 594 583 ready_mutate_unlock( last_size ); 595 584 enable_interrupts_noPoll(); // Don't poll, could be in main cluster 596 597 598 this.io.cnt = num_io;599 this.io.ctxs = aalloc(num_io);600 for(i; this.io.cnt) {601 
(this.io.ctxs[i]){ this, io_params };602 }603 585 } 604 586 605 587 void ^?{}(cluster & this) { 606 for(i; this.io.cnt) { 607 ^(this.io.ctxs[i]){ true }; 608 } 609 free(this.io.ctxs); 588 destroy(this.io.arbiter); 610 589 611 590 // Lock the RWlock so no-one pushes/pops while we are changing the queue … … 736 715 } 737 716 738 739 717 #if defined(__CFA_WITH_VERIFY__) 740 718 static bool verify_fwd_bck_rng(void) { -
libcfa/src/concurrency/kernel_private.hfa
r342af53 r8e4aa05 77 77 //----------------------------------------------------------------------------- 78 78 // I/O 79 void ^?{}(io_context & this, bool ); 79 $io_arbiter * create(void); 80 void destroy($io_arbiter *); 80 81 81 82 //======================================================================= -
libcfa/src/concurrency/locks.cfa
r342af53 r8e4aa05 1 // 2 // Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo 3 // 4 // The contents of this file are covered under the licence agreement in the 5 // file "LICENCE" distributed with Cforall. 6 // 7 // locks.hfa -- LIBCFATHREAD 8 // Runtime locks that used with the runtime thread system. 9 // 10 // Author : Colby Alexander Parsons 11 // Created On : Thu Jan 21 19:46:50 2021 12 // Last Modified By : 13 // Last Modified On : 14 // Update Count : 15 // 16 17 #define __cforall_thread__ 18 1 19 #include "locks.hfa" 2 20 #include "kernel_private.hfa" … … 7 25 //----------------------------------------------------------------------------- 8 26 // info_thread 9 forall( dtype L| is_blocking_lock(L)) {27 forall(L & | is_blocking_lock(L)) { 10 28 struct info_thread { 11 29 // used to put info_thread on a dl queue (aka sequence) … … 56 74 57 75 void ^?{}( blocking_lock & this ) {} 58 void ?{}( single_acquisition_lock & this ) {((blocking_lock &)this){ false, false };} 59 void ^?{}( single_acquisition_lock & this ) {} 60 void ?{}( owner_lock & this ) {((blocking_lock &)this){ true, true };} 61 void ^?{}( owner_lock & this ) {} 62 void ?{}( multiple_acquisition_lock & this ) {((blocking_lock &)this){ true, false };} 63 void ^?{}( multiple_acquisition_lock & this ) {} 76 64 77 65 78 void lock( blocking_lock & this ) with( this ) { … … 170 183 171 184 //----------------------------------------------------------------------------- 172 // Overloaded routines for traits173 // These routines are temporary until an inheritance bug is fixed174 void lock ( single_acquisition_lock & this ) { lock ( (blocking_lock &)this ); }175 void unlock ( single_acquisition_lock & this ) { unlock ( (blocking_lock &)this ); }176 void on_wait ( single_acquisition_lock & this ) { on_wait( (blocking_lock &)this ); }177 void on_notify ( single_acquisition_lock & this, struct $thread * t ) { on_notify( (blocking_lock &)this, t ); }178 void set_recursion_count( single_acquisition_lock & this, size_t recursion ) { set_recursion_count( (blocking_lock &)this, recursion ); }179 size_t get_recursion_count( single_acquisition_lock & this ) { return get_recursion_count( (blocking_lock &)this ); }180 181 void lock ( owner_lock & this ) { lock ( (blocking_lock &)this ); }182 void unlock ( owner_lock & this ) { unlock ( (blocking_lock &)this ); }183 void on_wait ( owner_lock & this ) { on_wait( (blocking_lock &)this ); }184 void on_notify( owner_lock & this, struct $thread * t ) { on_notify( (blocking_lock &)this, t ); }185 void set_recursion_count( owner_lock & this, size_t recursion ) { set_recursion_count( (blocking_lock &)this, recursion ); }186 size_t get_recursion_count( owner_lock & this ) { return get_recursion_count( (blocking_lock &)this ); }187 188 void lock ( multiple_acquisition_lock & this ) { lock ( (blocking_lock &)this ); }189 void unlock ( multiple_acquisition_lock & this ) { unlock ( (blocking_lock &)this ); }190 void on_wait ( multiple_acquisition_lock & this ) { on_wait( (blocking_lock &)this ); }191 void on_notify( multiple_acquisition_lock & this, struct $thread * t ){ on_notify( (blocking_lock &)this, t ); }192 void set_recursion_count( multiple_acquisition_lock & this, size_t recursion ){ set_recursion_count( (blocking_lock &)this, recursion ); }193 size_t get_recursion_count( multiple_acquisition_lock & this ){ return get_recursion_count( (blocking_lock &)this ); }194 195 //-----------------------------------------------------------------------------196 185 // alarm node wrapper 197 
forall( dtype L| is_blocking_lock(L)) {186 forall(L & | is_blocking_lock(L)) { 198 187 struct alarm_node_wrap { 199 188 alarm_node_t alarm_node; … … 239 228 //----------------------------------------------------------------------------- 240 229 // condition variable 241 forall( dtype L| is_blocking_lock(L)) {230 forall(L & | is_blocking_lock(L)) { 242 231 243 232 void ?{}( condition_variable(L) & this ){ … … 356 345 bool wait( condition_variable(L) & this, L & l, uintptr_t info, Time time ) with(this) { WAIT_TIME( info, &l , time ) } 357 346 } 347 348 //----------------------------------------------------------------------------- 349 // Semaphore 350 void ?{}( semaphore & this, int count = 1 ) { 351 (this.lock){}; 352 this.count = count; 353 (this.waiting){}; 354 } 355 void ^?{}(semaphore & this) {} 356 357 bool P(semaphore & this) with( this ){ 358 lock( lock __cfaabi_dbg_ctx2 ); 359 count -= 1; 360 if ( count < 0 ) { 361 // queue current task 362 append( waiting, active_thread() ); 363 364 // atomically release spin lock and block 365 unlock( lock ); 366 park(); 367 return true; 368 } 369 else { 370 unlock( lock ); 371 return false; 372 } 373 } 374 375 bool V(semaphore & this) with( this ) { 376 $thread * thrd = 0p; 377 lock( lock __cfaabi_dbg_ctx2 ); 378 count += 1; 379 if ( count <= 0 ) { 380 // remove task at head of waiting list 381 thrd = pop_head( waiting ); 382 } 383 384 unlock( lock ); 385 386 // make new owner 387 unpark( thrd ); 388 389 return thrd != 0p; 390 } 391 392 bool V(semaphore & this, unsigned diff) with( this ) { 393 $thread * thrd = 0p; 394 lock( lock __cfaabi_dbg_ctx2 ); 395 int release = max(-count, (int)diff); 396 count += diff; 397 for(release) { 398 unpark( pop_head( waiting ) ); 399 } 400 401 unlock( lock ); 402 403 return thrd != 0p; 404 } -
libcfa/src/concurrency/locks.hfa
r342af53 r8e4aa05 1 // 2 // Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo 3 // 4 // The contents of this file are covered under the licence agreement in the 5 // file "LICENCE" distributed with Cforall. 6 // 7 // locks.hfa -- PUBLIC 8 // Runtime locks that used with the runtime thread system. 9 // 10 // Author : Colby Alexander Parsons 11 // Created On : Thu Jan 21 19:46:50 2021 12 // Last Modified By : 13 // Last Modified On : 14 // Update Count : 15 // 16 1 17 #pragma once 2 18 3 19 #include <stdbool.h> 4 20 5 #include "bits/locks.hfa" 6 #include "bits/sequence.hfa" 7 8 #include "invoke.h" 21 #include "bits/weakso_locks.hfa" 9 22 10 23 #include "time_t.hfa" 11 24 #include "time.hfa" 12 25 26 //---------- 27 struct single_acquisition_lock { 28 inline blocking_lock; 29 }; 30 31 static inline void ?{}( single_acquisition_lock & this ) {((blocking_lock &)this){ false, false };} 32 static inline void ^?{}( single_acquisition_lock & this ) {} 33 static inline void lock ( single_acquisition_lock & this ) { lock ( (blocking_lock &)this ); } 34 static inline void unlock ( single_acquisition_lock & this ) { unlock ( (blocking_lock &)this ); } 35 static inline void on_wait ( single_acquisition_lock & this ) { on_wait( (blocking_lock &)this ); } 36 static inline void on_notify ( single_acquisition_lock & this, struct $thread * t ) { on_notify( (blocking_lock &)this, t ); } 37 static inline void set_recursion_count( single_acquisition_lock & this, size_t recursion ) { set_recursion_count( (blocking_lock &)this, recursion ); } 38 static inline size_t get_recursion_count( single_acquisition_lock & this ) { return get_recursion_count( (blocking_lock &)this ); } 39 40 //---------- 41 struct owner_lock { 42 inline blocking_lock; 43 }; 44 45 static inline void ?{}( owner_lock & this ) {((blocking_lock &)this){ true, true };} 46 static inline void ^?{}( owner_lock & this ) {} 47 static inline void lock ( owner_lock & this ) { lock ( (blocking_lock &)this ); } 48 static inline void unlock ( owner_lock & this ) { unlock ( (blocking_lock &)this ); } 49 static inline void on_wait ( owner_lock & this ) { on_wait( (blocking_lock &)this ); } 50 static inline void on_notify( owner_lock & this, struct $thread * t ) { on_notify( (blocking_lock &)this, t ); } 51 static inline void set_recursion_count( owner_lock & this, size_t recursion ) { set_recursion_count( (blocking_lock &)this, recursion ); } 52 static inline size_t get_recursion_count( owner_lock & this ) { return get_recursion_count( (blocking_lock &)this ); } 53 13 54 //----------------------------------------------------------------------------- 14 55 // is_blocking_lock 15 trait is_blocking_lock( dtype L| sized(L)) {56 trait is_blocking_lock(L & | sized(L)) { 16 57 // For synchronization locks to use when acquiring 17 58 void on_notify( L &, struct $thread * ); … … 31 72 // the info thread is a wrapper around a thread used 32 73 // to store extra data for use in the condition variable 33 forall( dtype L| is_blocking_lock(L)) {74 forall(L & | is_blocking_lock(L)) { 34 75 struct info_thread; 35 76 … … 40 81 41 82 //----------------------------------------------------------------------------- 42 // Blocking Locks43 struct blocking_lock {44 // Spin lock used for mutual exclusion45 __spinlock_t lock;46 47 // List of blocked threads48 Sequence( $thread ) blocked_threads;49 50 // Count of current blocked threads51 size_t wait_count;52 53 // Flag if the lock allows multiple acquisition54 bool multi_acquisition;55 56 // Flag if lock can be released 
by non owner57 bool strict_owner;58 59 // Current thread owning the lock60 struct $thread * owner;61 62 // Number of recursion level63 size_t recursion_count;64 };65 66 struct single_acquisition_lock {67 inline blocking_lock;68 };69 70 struct owner_lock {71 inline blocking_lock;72 };73 74 struct multiple_acquisition_lock {75 inline blocking_lock;76 };77 78 void ?{}( blocking_lock & this, bool multi_acquisition, bool strict_owner );79 void ^?{}( blocking_lock & this );80 81 void ?{}( single_acquisition_lock & this );82 void ^?{}( single_acquisition_lock & this );83 84 void ?{}( owner_lock & this );85 void ^?{}( owner_lock & this );86 87 void ?{}( multiple_acquisition_lock & this );88 void ^?{}( multiple_acquisition_lock & this );89 90 void lock( blocking_lock & this );91 bool try_lock( blocking_lock & this );92 void unlock( blocking_lock & this );93 void on_notify( blocking_lock & this, struct $thread * t );94 void on_wait( blocking_lock & this );95 size_t wait_count( blocking_lock & this );96 void set_recursion_count( blocking_lock & this, size_t recursion );97 size_t get_recursion_count( blocking_lock & this );98 99 void lock( single_acquisition_lock & this );100 void unlock( single_acquisition_lock & this );101 void on_notify( single_acquisition_lock & this, struct $thread * t );102 void on_wait( single_acquisition_lock & this );103 void set_recursion_count( single_acquisition_lock & this, size_t recursion );104 size_t get_recursion_count( single_acquisition_lock & this );105 106 void lock( owner_lock & this );107 void unlock( owner_lock & this );108 void on_notify( owner_lock & this, struct $thread * t );109 void on_wait( owner_lock & this );110 void set_recursion_count( owner_lock & this, size_t recursion );111 size_t get_recursion_count( owner_lock & this );112 113 void lock( multiple_acquisition_lock & this );114 void unlock( multiple_acquisition_lock & this );115 void on_notify( multiple_acquisition_lock & this, struct $thread * t );116 void on_wait( multiple_acquisition_lock & this );117 void set_recursion_count( multiple_acquisition_lock & this, size_t recursion );118 size_t get_recursion_count( multiple_acquisition_lock & this );119 120 //-----------------------------------------------------------------------------121 83 // Synchronization Locks 122 forall( dtype L| is_blocking_lock(L)) {84 forall(L & | is_blocking_lock(L)) { 123 85 struct condition_variable { 124 86 // Spin lock used for mutual exclusion … … 157 119 bool wait( condition_variable(L) & this, L & l, uintptr_t info, Time time ); 158 120 } 121 122 //----------------------------------------------------------------------------- 123 // Semaphore 124 struct semaphore { 125 __spinlock_t lock; 126 int count; 127 __queue_t($thread) waiting; 128 }; 129 130 void ?{}(semaphore & this, int count = 1); 131 void ^?{}(semaphore & this); 132 bool P (semaphore & this); 133 bool V (semaphore & this); 134 bool V (semaphore & this, unsigned count); -
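locks.hfa now builds single_acquisition_lock and owner_lock as header-only shims: each embeds blocking_lock through an `inline` member and forwards lock/unlock/on_wait/on_notify through static inline wrappers, replacing the per-type out-of-line forwarders that used to live in locks.cfa. A rough C analogy of the same idiom is sketched below; pthread_mutex_t stands in for blocking_lock, and none of these names come from the changeset.

// C analogy for the header-only lock shims: embed the base lock as the first
// member and forward through static inline wrappers, instead of declaring
// separate out-of-line forwarding functions for every lock type.
#include <pthread.h>
#include <stdio.h>

struct blocking_lock_c { pthread_mutex_t m; };   // stand-in for blocking_lock

static inline void base_lock  (struct blocking_lock_c * l) { pthread_mutex_lock(&l->m); }
static inline void base_unlock(struct blocking_lock_c * l) { pthread_mutex_unlock(&l->m); }

// single_acquisition_lock analogue: nothing but the embedded base plus forwarders.
struct sal { struct blocking_lock_c base; };
static inline void sal_lock  (struct sal * l) { base_lock  (&l->base); }
static inline void sal_unlock(struct sal * l) { base_unlock(&l->base); }

int main(void) {
	struct sal l = { { PTHREAD_MUTEX_INITIALIZER } };
	sal_lock(&l);
	printf("acquired through the forwarding wrapper\n");
	sal_unlock(&l);
	return 0;
}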
libcfa/src/concurrency/monitor.cfa
r342af53 r8e4aa05 50 50 static inline [$thread *, int] search_entry_queue( const __waitfor_mask_t &, $monitor * monitors [], __lock_size_t count ); 51 51 52 forall( dtype T| sized( T ))52 forall(T & | sized( T )) 53 53 static inline __lock_size_t insert_unique( T * array [], __lock_size_t & size, T * val ); 54 54 static inline __lock_size_t count_max ( const __waitfor_mask_t & mask ); … … 949 949 } 950 950 951 forall( dtype T| sized( T ))951 forall(T & | sized( T )) 952 952 static inline __lock_size_t insert_unique( T * array [], __lock_size_t & size, T * val ) { 953 953 if( !val ) return size; -
libcfa/src/concurrency/monitor.hfa
r342af53 r8e4aa05 22 22 #include "stdlib.hfa" 23 23 24 trait is_monitor( dtype T) {24 trait is_monitor(T &) { 25 25 $monitor * get_monitor( T & ); 26 26 void ^?{}( T & mutex ); … … 59 59 void ^?{}( monitor_dtor_guard_t & this ); 60 60 61 static inline forall( dtype T| sized(T) | { void ^?{}( T & mutex ); } )61 static inline forall( T & | sized(T) | { void ^?{}( T & mutex ); } ) 62 62 void delete( T * th ) { 63 ^(*th){};63 if(th) ^(*th){}; 64 64 free( th ); 65 65 } -
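The one behavioural change in monitor.hfa is that the generic delete now guards the destructor call with `if(th)`, so deleting a null pointer becomes a no-op, matching the fact that free(NULL) is already harmless. The same pattern in plain C, with an invented widget type purely for illustration:

// Null-guarded delete: run the "destructor" only when the pointer is non-null;
// free(NULL) is already a no-op.
#include <stdio.h>
#include <stdlib.h>

struct widget { int id; };

static void widget_dtor(struct widget * w) { printf("destroying widget %d\n", w->id); }

static void widget_delete(struct widget * w) {
	if (w) widget_dtor(w);   // mirrors: if(th) ^(*th){};
	free(w);                 // safe even when w == NULL
}

int main(void) {
	struct widget * w = malloc(sizeof(*w));
	w->id = 42;
	widget_delete(w);
	widget_delete(NULL);     // no destructor call, no crash
	return 0;
}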
libcfa/src/concurrency/mutex.cfa
r342af53 r8e4aa05 164 164 } 165 165 166 forall( dtype L| is_lock(L))166 forall(L & | is_lock(L)) 167 167 void wait(condition_variable & this, L & l) { 168 168 lock( this.lock __cfaabi_dbg_ctx2 ); … … 176 176 //----------------------------------------------------------------------------- 177 177 // Scopes 178 forall( dtype L| is_lock(L))178 forall(L & | is_lock(L)) 179 179 void lock_all ( L * locks[], size_t count) { 180 180 // Sort locks based on addresses … … 188 188 } 189 189 190 forall( dtype L| is_lock(L))190 forall(L & | is_lock(L)) 191 191 void unlock_all( L * locks[], size_t count) { 192 192 // Lock all -
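lock_all() sorts the lock array by address before acquiring, the usual total-ordering trick for avoiding deadlock when different threads take the same set of locks in different orders. A sketch using the mutex_lock from this (now deprecated) pair of files; the calls still compile but trigger the new deprecation warnings, and the include path is assumed.

    #include <mutex.hfa>                     // assumed include path

    mutex_lock a, b;

    void worker() {
        mutex_lock * group[] = { &b, &a };   // order in the array is irrelevant
        lock_all  ( group, 2 );              // acquired in address order on every thread
        // ... critical section over both locks ...
        unlock_all( group, 2 );
    }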
libcfa/src/concurrency/mutex.hfa
r342af53 r8e4aa05 42 42 }; 43 43 44 void ?{}(mutex_lock & this) ;45 void ^?{}(mutex_lock & this) ;46 void lock(mutex_lock & this) ;47 bool try_lock(mutex_lock & this) ;48 void unlock(mutex_lock & this) ;44 void ?{}(mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead"))); 45 void ^?{}(mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead"))); 46 void lock(mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead"))); 47 bool try_lock(mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead"))); 48 void unlock(mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead"))); 49 49 50 50 // Exclusive lock - recursive … … 64 64 }; 65 65 66 void ?{}(recursive_mutex_lock & this) ;67 void ^?{}(recursive_mutex_lock & this) ;68 void lock(recursive_mutex_lock & this) ;69 bool try_lock(recursive_mutex_lock & this) ;70 void unlock(recursive_mutex_lock & this) ;66 void ?{}(recursive_mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead"))); 67 void ^?{}(recursive_mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead"))); 68 void lock(recursive_mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead"))); 69 bool try_lock(recursive_mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead"))); 70 void unlock(recursive_mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead"))); 71 71 72 trait is_lock( dtype L| sized(L)) {72 trait is_lock(L & | sized(L)) { 73 73 void lock (L &); 74 74 void unlock(L &); … … 86 86 }; 87 87 88 void ?{}(condition_variable & this) ;89 void ^?{}(condition_variable & this) ;88 void ?{}(condition_variable & this) __attribute__((deprecated("use concurrency/locks.hfa instead"))); 89 void ^?{}(condition_variable & this) __attribute__((deprecated("use concurrency/locks.hfa instead"))); 90 90 91 void notify_one(condition_variable & this) ;92 void notify_all(condition_variable & this) ;91 void notify_one(condition_variable & this) __attribute__((deprecated("use concurrency/locks.hfa instead"))); 92 void notify_all(condition_variable & this) __attribute__((deprecated("use concurrency/locks.hfa instead"))); 93 93 94 void wait(condition_variable & this) ;94 void wait(condition_variable & this) __attribute__((deprecated("use concurrency/locks.hfa instead"))); 95 95 96 forall( dtype L| is_lock(L))97 void wait(condition_variable & this, L & l) ;96 forall(L & | is_lock(L)) 97 void wait(condition_variable & this, L & l) __attribute__((deprecated("use concurrency/locks.hfa instead"))); 98 98 99 99 //----------------------------------------------------------------------------- 100 100 // Scopes 101 forall( dtype L| is_lock(L)) {101 forall(L & | is_lock(L)) { 102 102 #if !defined( __TUPLE_ARRAYS_EXIST__ ) 103 103 void lock ( L * locks [], size_t count); -
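Every entry point in mutex.hfa now carries `__attribute__((deprecated("use concurrency/locks.hfa instead")))`, so existing code keeps building but each call is flagged at compile time. A sketch of the suggested migration, assuming single_acquisition_lock from locks.hfa is the intended drop-in for a plain mutex_lock and that the include path is as shown.

    // Before (still compiles, now warns "use concurrency/locks.hfa instead"):
    //     mutex_lock m;
    //     lock( m ); ... unlock( m );

    #include <concurrency/locks.hfa>       // assumed include path

    single_acquisition_lock m;

    void critical() {
        lock( m );
        // ... shared state ...
        unlock( m );
    }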
libcfa/src/concurrency/preemption.cfa
r342af53 r8e4aa05 424 424 static void timeout( $thread * this ) { 425 425 unpark( this ); 426 } 427 428 void __disable_interrupts_hard() { 429 sigset_t oldset; 430 int ret; 431 ret = pthread_sigmask(0, ( const sigset_t * ) 0p, &oldset); // workaround trac#208: cast should be unnecessary 432 if(ret != 0) { abort("ERROR sigprocmask returned %d", ret); } 433 434 ret = sigismember(&oldset, SIGUSR1); 435 if(ret < 0) { abort("ERROR sigismember returned %d", ret); } 436 if(ret == 1) { abort("ERROR SIGUSR1 is disabled"); } 437 438 ret = sigismember(&oldset, SIGALRM); 439 if(ret < 0) { abort("ERROR sigismember returned %d", ret); } 440 if(ret == 0) { abort("ERROR SIGALRM is enabled"); } 441 442 signal_block( SIGUSR1 ); 443 } 444 445 void __enable_interrupts_hard() { 446 signal_unblock( SIGUSR1 ); 447 448 sigset_t oldset; 449 int ret; 450 ret = pthread_sigmask(0, ( const sigset_t * ) 0p, &oldset); // workaround trac#208: cast should be unnecessary 451 if(ret != 0) { abort("ERROR sigprocmask returned %d", ret); } 452 453 ret = sigismember(&oldset, SIGUSR1); 454 if(ret < 0) { abort("ERROR sigismember returned %d", ret); } 455 if(ret == 1) { abort("ERROR SIGUSR1 is disabled"); } 456 457 ret = sigismember(&oldset, SIGALRM); 458 if(ret < 0) { abort("ERROR sigismember returned %d", ret); } 459 if(ret == 0) { abort("ERROR SIGALRM is enabled"); } 426 460 } 427 461 … … 551 585 552 586 // Setup proper signal handlers 553 __cfaabi_sigaction( SIGUSR1, sigHandler_ctxSwitch, SA_SIGINFO | SA_RESTART); // __cfactx_switch handler554 __cfaabi_sigaction( SIGALRM, sigHandler_alarm , SA_SIGINFO | SA_RESTART); // debug handler587 __cfaabi_sigaction( SIGUSR1, sigHandler_ctxSwitch, SA_SIGINFO ); // __cfactx_switch handler 588 __cfaabi_sigaction( SIGALRM, sigHandler_alarm , SA_SIGINFO ); // debug handler 555 589 556 590 signal_block( SIGALRM ); … … 580 614 581 615 __cfaabi_dbg_print_safe( "Kernel : Preemption stopped\n" ); 616 } 617 618 // Prevent preemption since we are about to start terminating things 619 void __kernel_abort_lock(void) { 620 signal_block( SIGUSR1 ); 582 621 } 583 622 -
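__disable_interrupts_hard() and __enable_interrupts_hard() validate the thread's signal mask with pthread_sigmask()/sigismember() before blocking or unblocking SIGUSR1, and the handlers are now registered without SA_RESTART, so system calls interrupted by these signals return EINTR rather than silently restarting. A stand-alone sketch of the mask-inspection pattern, independent of the runtime's signal_block helper.

    #include <signal.h>
    #include <pthread.h>

    // Query whether a signal is currently blocked in the calling thread,
    // the same check __disable_interrupts_hard() performs on SIGUSR1/SIGALRM.
    static int is_blocked( int signum ) {
        sigset_t current;
        if ( pthread_sigmask( SIG_BLOCK, (const sigset_t *)0p, &current ) != 0 ) return -1;  // null set => query only
        return sigismember( &current, signum );        // 1 = blocked, 0 = not blocked, -1 = error
    }

    static void block_signal( int signum ) {
        sigset_t mask;
        sigemptyset( &mask );
        sigaddset( &mask, signum );
        pthread_sigmask( SIG_BLOCK, &mask, (sigset_t *)0p );
    }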
libcfa/src/concurrency/ready_queue.cfa
r342af53 r8e4aa05 330 330 #if defined(BIAS) 331 331 // Don't bother trying locally too much 332 int local_tries = 8;333 332 preferred = kernelTLS().this_processor->id * 4; 334 333 #endif -
libcfa/src/concurrency/stats.cfa
r342af53 r8e4aa05 25 25 26 26 #if defined(CFA_HAVE_LINUX_IO_URING_H) 27 stats->io.submit_q.submit_avg.rdy = 0; 28 stats->io.submit_q.submit_avg.csm = 0; 29 stats->io.submit_q.submit_avg.cnt = 0; 30 stats->io.submit_q.look_avg.val = 0; 31 stats->io.submit_q.look_avg.cnt = 0; 32 stats->io.submit_q.look_avg.block = 0; 33 stats->io.submit_q.alloc_avg.val = 0; 34 stats->io.submit_q.alloc_avg.cnt = 0; 35 stats->io.submit_q.alloc_avg.block = 0; 36 stats->io.submit_q.helped = 0; 37 stats->io.submit_q.leader = 0; 38 stats->io.submit_q.busy = 0; 39 stats->io.complete_q.completed_avg.val = 0; 40 stats->io.complete_q.completed_avg.cnt = 0; 41 stats->io.complete_q.blocks = 0; 27 stats->io.alloc.fast = 0; 28 stats->io.alloc.slow = 0; 29 stats->io.alloc.fail = 0; 30 stats->io.alloc.revoke = 0; 31 stats->io.alloc.block = 0; 32 stats->io.submit.fast = 0; 33 stats->io.submit.slow = 0; 34 stats->io.flush.external = 0; 35 stats->io.calls.flush = 0; 36 stats->io.calls.submitted = 0; 37 stats->io.calls.drain = 0; 38 stats->io.calls.completed = 0; 39 stats->io.calls.errors.busy = 0; 40 stats->io.poller.sleeps = 0; 42 41 #endif 43 42 } … … 60 59 61 60 #if defined(CFA_HAVE_LINUX_IO_URING_H) 62 __atomic_fetch_add( &cltr->io.submit_q.submit_avg.rdy , proc->io.submit_q.submit_avg.rdy , __ATOMIC_SEQ_CST ); proc->io.submit_q.submit_avg.rdy = 0; 63 __atomic_fetch_add( &cltr->io.submit_q.submit_avg.csm , proc->io.submit_q.submit_avg.csm , __ATOMIC_SEQ_CST ); proc->io.submit_q.submit_avg.csm = 0; 64 __atomic_fetch_add( &cltr->io.submit_q.submit_avg.avl , proc->io.submit_q.submit_avg.avl , __ATOMIC_SEQ_CST ); proc->io.submit_q.submit_avg.avl = 0; 65 __atomic_fetch_add( &cltr->io.submit_q.submit_avg.cnt , proc->io.submit_q.submit_avg.cnt , __ATOMIC_SEQ_CST ); proc->io.submit_q.submit_avg.cnt = 0; 66 __atomic_fetch_add( &cltr->io.submit_q.look_avg.val , proc->io.submit_q.look_avg.val , __ATOMIC_SEQ_CST ); proc->io.submit_q.look_avg.val = 0; 67 __atomic_fetch_add( &cltr->io.submit_q.look_avg.cnt , proc->io.submit_q.look_avg.cnt , __ATOMIC_SEQ_CST ); proc->io.submit_q.look_avg.cnt = 0; 68 __atomic_fetch_add( &cltr->io.submit_q.look_avg.block , proc->io.submit_q.look_avg.block , __ATOMIC_SEQ_CST ); proc->io.submit_q.look_avg.block = 0; 69 __atomic_fetch_add( &cltr->io.submit_q.alloc_avg.val , proc->io.submit_q.alloc_avg.val , __ATOMIC_SEQ_CST ); proc->io.submit_q.alloc_avg.val = 0; 70 __atomic_fetch_add( &cltr->io.submit_q.alloc_avg.cnt , proc->io.submit_q.alloc_avg.cnt , __ATOMIC_SEQ_CST ); proc->io.submit_q.alloc_avg.cnt = 0; 71 __atomic_fetch_add( &cltr->io.submit_q.alloc_avg.block , proc->io.submit_q.alloc_avg.block , __ATOMIC_SEQ_CST ); proc->io.submit_q.alloc_avg.block = 0; 72 __atomic_fetch_add( &cltr->io.submit_q.helped , proc->io.submit_q.helped , __ATOMIC_SEQ_CST ); proc->io.submit_q.helped = 0; 73 __atomic_fetch_add( &cltr->io.submit_q.leader , proc->io.submit_q.leader , __ATOMIC_SEQ_CST ); proc->io.submit_q.leader = 0; 74 __atomic_fetch_add( &cltr->io.submit_q.busy , proc->io.submit_q.busy , __ATOMIC_SEQ_CST ); proc->io.submit_q.busy = 0; 75 __atomic_fetch_add( &cltr->io.complete_q.completed_avg.val, proc->io.complete_q.completed_avg.val, __ATOMIC_SEQ_CST ); proc->io.complete_q.completed_avg.val = 0; 76 __atomic_fetch_add( &cltr->io.complete_q.completed_avg.cnt, proc->io.complete_q.completed_avg.cnt, __ATOMIC_SEQ_CST ); proc->io.complete_q.completed_avg.cnt = 0; 77 __atomic_fetch_add( &cltr->io.complete_q.blocks , proc->io.complete_q.blocks , __ATOMIC_SEQ_CST ); proc->io.complete_q.blocks = 0; 61 __atomic_fetch_add( 
&cltr->io.alloc.fast , proc->io.alloc.fast , __ATOMIC_SEQ_CST ); proc->io.alloc.fast = 0; 62 __atomic_fetch_add( &cltr->io.alloc.slow , proc->io.alloc.slow , __ATOMIC_SEQ_CST ); proc->io.alloc.slow = 0; 63 __atomic_fetch_add( &cltr->io.alloc.fail , proc->io.alloc.fail , __ATOMIC_SEQ_CST ); proc->io.alloc.fail = 0; 64 __atomic_fetch_add( &cltr->io.alloc.revoke , proc->io.alloc.revoke , __ATOMIC_SEQ_CST ); proc->io.alloc.revoke = 0; 65 __atomic_fetch_add( &cltr->io.alloc.block , proc->io.alloc.block , __ATOMIC_SEQ_CST ); proc->io.alloc.block = 0; 66 __atomic_fetch_add( &cltr->io.submit.fast , proc->io.submit.fast , __ATOMIC_SEQ_CST ); proc->io.submit.fast = 0; 67 __atomic_fetch_add( &cltr->io.submit.slow , proc->io.submit.slow , __ATOMIC_SEQ_CST ); proc->io.submit.slow = 0; 68 __atomic_fetch_add( &cltr->io.flush.external , proc->io.flush.external , __ATOMIC_SEQ_CST ); proc->io.flush.external = 0; 69 __atomic_fetch_add( &cltr->io.calls.flush , proc->io.calls.flush , __ATOMIC_SEQ_CST ); proc->io.calls.flush = 0; 70 __atomic_fetch_add( &cltr->io.calls.submitted , proc->io.calls.submitted , __ATOMIC_SEQ_CST ); proc->io.calls.submitted = 0; 71 __atomic_fetch_add( &cltr->io.calls.drain , proc->io.calls.drain , __ATOMIC_SEQ_CST ); proc->io.calls.drain = 0; 72 __atomic_fetch_add( &cltr->io.calls.completed , proc->io.calls.completed , __ATOMIC_SEQ_CST ); proc->io.calls.completed = 0; 73 __atomic_fetch_add( &cltr->io.calls.errors.busy, proc->io.calls.errors.busy, __ATOMIC_SEQ_CST ); proc->io.calls.errors.busy = 0; 74 __atomic_fetch_add( &cltr->io.poller.sleeps , proc->io.poller.sleeps , __ATOMIC_SEQ_CST ); proc->io.poller.sleeps = 0; 78 75 #endif 79 76 } … … 82 79 83 80 if( flags & CFA_STATS_READY_Q ) { 84 double push_sur = (100.0 * ((double)ready.pick.push.success) / ready.pick.push.attempt);85 double pop_sur = (100.0 * ((double)ready.pick.pop .success) / ready.pick.pop .attempt);86 87 81 double push_len = ((double)ready.pick.push.attempt) / ready.pick.push.success; 88 82 double pop_len = ((double)ready.pick.pop .attempt) / ready.pick.pop .success; 89 90 double lpush_sur = (100.0 * ((double)ready.pick.push.lsuccess) / ready.pick.push.local);91 double lpop_sur = (100.0 * ((double)ready.pick.pop .lsuccess) / ready.pick.pop .local);92 83 93 84 double lpush_len = ((double)ready.pick.push.local) / ready.pick.push.lsuccess; … … 96 87 __cfaabi_bits_print_safe( STDOUT_FILENO, 97 88 "----- %s \"%s\" (%p) - Ready Q Stats -----\n" 98 "- total threads run : %'15" PRIu64 "\n" 99 "- total threads scheduled: %'15" PRIu64 "\n" 100 "- push average probe len : %'18.2lf, %'18.2lf%% (%'15" PRIu64 " attempts)\n" 101 "- pop average probe len : %'18.2lf, %'18.2lf%% (%'15" PRIu64 " attempts)\n" 102 "- local push avg prb len : %'18.2lf, %'18.2lf%% (%'15" PRIu64 " attempts)\n" 103 "- local pop avg prb len : %'18.2lf, %'18.2lf%% (%'15" PRIu64 " attempts)\n" 104 "- thread migrations : %'15" PRIu64 "\n" 105 "- Idle Sleep -\n" 106 "-- halts : %'15" PRIu64 "\n" 107 "-- cancelled halts : %'15" PRIu64 "\n" 108 "-- schedule wake : %'15" PRIu64 "\n" 109 "-- wake on exit : %'15" PRIu64 "\n" 89 "- total threads : %'15" PRIu64 "run, %'15" PRIu64 "schd (%'" PRIu64 "mig )\n" 90 "- push avg probe : %'3.2lf, %'3.2lfl (%'15" PRIu64 " attempts, %'15" PRIu64 " locals)\n" 91 "- pop avg probe : %'3.2lf, %'3.2lfl (%'15" PRIu64 " attempts, %'15" PRIu64 " locals)\n" 92 "- Idle Sleep : %'15" PRIu64 "h, %'15" PRIu64 "c, %'15" PRIu64 "w, %'15" PRIu64 "e\n" 110 93 "\n" 111 94 , type, name, id 112 95 , ready.pick.pop.success 113 96 , 
ready.pick.push.success 114 , push_len, push_sur, ready.pick.push.attempt115 , pop_len , pop_sur , ready.pick.pop .attempt116 , lpush_len, lpush_sur, ready.pick.push.local117 , lpop_len , lpop_sur , ready.pick.pop .local118 97 , ready.threads.migration 98 , push_len, lpush_len, ready.pick.push.attempt, ready.pick.push.local 99 , pop_len , lpop_len , ready.pick.pop .attempt, ready.pick.pop .local 119 100 , ready.sleep.halts, ready.sleep.cancels, ready.sleep.wakes, ready.sleep.exits 120 101 ); … … 123 104 #if defined(CFA_HAVE_LINUX_IO_URING_H) 124 105 if( flags & CFA_STATS_IO ) { 125 double avgrdy = ((double)io.submit_q.submit_avg.rdy) / io.submit_q.submit_avg.cnt;126 double avg csm = ((double)io.submit_q.submit_avg.csm) / io.submit_q.submit_avg.cnt;106 uint64_t total_allocs = io.alloc.fast + io.alloc.slow; 107 double avgfasta = ((double)io.alloc.fast) / total_allocs; 127 108 128 double lavgv = 0; 129 double lavgb = 0; 130 if(io.submit_q.look_avg.cnt != 0) { 131 lavgv = ((double)io.submit_q.look_avg.val ) / io.submit_q.look_avg.cnt; 132 lavgb = ((double)io.submit_q.look_avg.block) / io.submit_q.look_avg.cnt; 133 } 109 uint64_t total_submits = io.submit.fast + io.submit.slow; 110 double avgfasts = ((double)io.submit.fast) / total_submits; 134 111 135 double aavgv = 0; 136 double aavgb = 0; 137 if(io.submit_q.alloc_avg.cnt != 0) { 138 aavgv = ((double)io.submit_q.alloc_avg.val ) / io.submit_q.alloc_avg.cnt; 139 aavgb = ((double)io.submit_q.alloc_avg.block) / io.submit_q.alloc_avg.cnt; 140 } 112 double avgsubs = ((double)io.calls.submitted) / io.calls.flush; 113 double avgcomp = ((double)io.calls.completed) / io.calls.drain; 141 114 142 115 __cfaabi_bits_print_safe( STDOUT_FILENO, 143 116 "----- %s \"%s\" (%p) - I/O Stats -----\n" 144 "- total submit calls : %'15" PRIu64 "\n" 145 "- avg ready entries : %'18.2lf\n" 146 "- avg submitted entries : %'18.2lf\n" 147 "- total helped entries : %'15" PRIu64 "\n" 148 "- total leader entries : %'15" PRIu64 "\n" 149 "- total busy submit : %'15" PRIu64 "\n" 150 "- total ready search : %'15" PRIu64 "\n" 151 "- avg ready search len : %'18.2lf\n" 152 "- avg ready search block : %'18.2lf\n" 153 "- total alloc search : %'15" PRIu64 "\n" 154 "- avg alloc search len : %'18.2lf\n" 155 "- avg alloc search block : %'18.2lf\n" 156 "- total wait calls : %'15" PRIu64 "\n" 157 "- avg completion/wait : %'18.2lf\n" 158 "- total completion blocks: %'15" PRIu64 "\n" 117 "- total allocations : %'" PRIu64 "f, %'" PRIu64 "s (%'2.2lff) \n" 118 "- failures : %'" PRIu64 "oom, %'" PRIu64 "rvk, %'" PRIu64 "blk\n" 119 "- total submits : %'" PRIu64 "f, %'" PRIu64 "s (%'2.2lf) \n" 120 "- flush external : %'" PRIu64 "\n" 121 "- io_uring_enter : %'" PRIu64 " (%'" PRIu64 ", %'" PRIu64 " EBUSY)\n" 122 "- submits : %'" PRIu64 " (%'.2lf) \n" 123 "- completes : %'" PRIu64 " (%'.2lf) \n" 124 "- poller sleeping : %'" PRIu64 "\n" 159 125 "\n" 160 126 , type, name, id 161 , io.submit_q.submit_avg.cnt 162 , avgrdy, avgcsm 163 , io.submit_q.helped, io.submit_q.leader, io.submit_q.busy 164 , io.submit_q.look_avg.cnt 165 , lavgv, lavgb 166 , io.submit_q.alloc_avg.cnt 167 , aavgv, aavgb 168 , io.complete_q.completed_avg.cnt 169 , ((double)io.complete_q.completed_avg.val) / io.complete_q.completed_avg.cnt 170 , io.complete_q.blocks 127 , io.alloc.fast, io.alloc.slow, avgfasta 128 , io.alloc.fail, io.alloc.revoke, io.alloc.block 129 , io.submit.fast, io.submit.slow, avgfasts 130 , io.flush.external 131 , io.calls.flush, io.calls.drain, io.calls.errors.busy 132 , io.calls.submitted, avgsubs 133 , 
io.calls.completed, avgcomp 134 , io.poller.sleeps 171 135 ); 172 136 } -
libcfa/src/concurrency/stats.hfa
r342af53 r8e4aa05 2 2 3 3 #include <stdint.h> 4 5 enum { 6 CFA_STATS_READY_Q = 0x01, 7 CFA_STATS_IO = 0x02, 8 }; 4 9 5 10 #if defined(__CFA_NO_STATISTICS__) … … 9 14 static inline void __print_stats( struct __stats_t *, int, const char *, const char *, void * ) {} 10 15 #else 11 enum {12 CFA_STATS_READY_Q = 0x01,13 #if defined(CFA_HAVE_LINUX_IO_URING_H)14 CFA_STATS_IO = 0x02,15 #endif16 };17 16 18 17 struct __attribute__((aligned(64))) __stats_readQ_t { … … 67 66 struct __attribute__((aligned(64))) __stats_io_t{ 68 67 struct { 68 volatile uint64_t fast; 69 volatile uint64_t slow; 70 volatile uint64_t fail; 71 volatile uint64_t revoke; 72 volatile uint64_t block; 73 } alloc; 74 struct { 75 volatile uint64_t fast; 76 volatile uint64_t slow; 77 } submit; 78 struct { 79 volatile uint64_t external; 80 } flush; 81 struct { 82 volatile uint64_t drain; 83 volatile uint64_t completed; 84 volatile uint64_t flush; 85 volatile uint64_t submitted; 69 86 struct { 70 volatile uint64_t rdy; 71 volatile uint64_t csm; 72 volatile uint64_t avl; 73 volatile uint64_t cnt; 74 } submit_avg; 75 struct { 76 volatile uint64_t val; 77 volatile uint64_t cnt; 78 volatile uint64_t block; 79 } look_avg; 80 struct { 81 volatile uint64_t val; 82 volatile uint64_t cnt; 83 volatile uint64_t block; 84 } alloc_avg; 85 volatile uint64_t helped; 86 volatile uint64_t leader; 87 volatile uint64_t busy; 88 } submit_q; 87 volatile uint64_t busy; 88 } errors; 89 } calls; 89 90 struct { 90 struct { 91 volatile uint64_t val; 92 volatile uint64_t cnt; 93 } completed_avg; 94 volatile uint64_t blocks; 95 } complete_q; 91 volatile uint64_t sleeps; 92 } poller; 96 93 }; 97 94 #endif -
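CFA_STATS_READY_Q and CFA_STATS_IO are now defined unconditionally, outside the __CFA_NO_STATISTICS__ and io_uring guards, so user code can always name them. A sketch of enabling both, assuming the cluster API exposes a print_stats_at_exit hook taking this flag word; that entry point is not part of this changeset, so treat the call and the include path as illustrative.

    #include <kernel.hfa>                  // assumed home of the cluster / stats API

    extern struct cluster * mainCluster;   // provided by the runtime (see thread.hfa below)

    int main() {
        print_stats_at_exit( *mainCluster, CFA_STATS_READY_Q | CFA_STATS_IO );  // assumed signature
        // ... run threads; the tables formatted in stats.cfa are printed at teardown ...
    }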
libcfa/src/concurrency/thread.cfa
r342af53 r8e4aa05 62 62 } 63 63 64 FORALL_DATA_INSTANCE(ThreadCancelled, ( dtype thread_t), (thread_t))64 FORALL_DATA_INSTANCE(ThreadCancelled, (thread_t &), (thread_t)) 65 65 66 forall( dtype T)66 forall(T &) 67 67 void copy(ThreadCancelled(T) * dst, ThreadCancelled(T) * src) { 68 68 dst->virtual_table = src->virtual_table; … … 71 71 } 72 72 73 forall( dtype T)73 forall(T &) 74 74 const char * msg(ThreadCancelled(T) *) { 75 75 return "ThreadCancelled"; 76 76 } 77 77 78 forall( dtype T)78 forall(T &) 79 79 static void default_thread_cancel_handler(ThreadCancelled(T) & ) { 80 80 abort( "Unhandled thread cancellation.\n" ); 81 81 } 82 82 83 forall( dtype T| is_thread(T) | IS_EXCEPTION(ThreadCancelled, (T)))83 forall(T & | is_thread(T) | IS_EXCEPTION(ThreadCancelled, (T))) 84 84 void ?{}( thread_dtor_guard_t & this, 85 T & thrd, void(* defaultResumptionHandler)(ThreadCancelled(T) &)) {86 $monitor * m = get_monitor(thrd);85 T & thrd, void(*cancelHandler)(ThreadCancelled(T) &)) { 86 $monitor * m = get_monitor(thrd); 87 87 $thread * desc = get_thread(thrd); 88 88 89 89 // Setup the monitor guard 90 90 void (*dtor)(T& mutex this) = ^?{}; 91 bool join = defaultResumptionHandler != (void(*)(ThreadCancelled(T)&))0;91 bool join = cancelHandler != (void(*)(ThreadCancelled(T)&))0; 92 92 (this.mg){&m, (void(*)())dtor, join}; 93 93 … … 103 103 } 104 104 desc->state = Cancelled; 105 if (!join) { 106 defaultResumptionHandler = default_thread_cancel_handler; 107 } 105 void(*defaultResumptionHandler)(ThreadCancelled(T) &) = 106 join ? cancelHandler : default_thread_cancel_handler; 108 107 109 108 ThreadCancelled(T) except; … … 125 124 //----------------------------------------------------------------------------- 126 125 // Starting and stopping threads 127 forall( dtype T| is_thread(T) )126 forall( T & | is_thread(T) ) 128 127 void __thrd_start( T & this, void (*main_p)(T &) ) { 129 128 $thread * this_thrd = get_thread(this); … … 141 140 //----------------------------------------------------------------------------- 142 141 // Support for threads that don't ues the thread keyword 143 forall( dtype T| sized(T) | is_thread(T) | { void ?{}(T&); } )142 forall( T & | sized(T) | is_thread(T) | { void ?{}(T&); } ) 144 143 void ?{}( scoped(T)& this ) with( this ) { 145 144 handle{}; … … 147 146 } 148 147 149 forall( dtype T, ttype P| sized(T) | is_thread(T) | { void ?{}(T&, P); } )148 forall( T &, P... | sized(T) | is_thread(T) | { void ?{}(T&, P); } ) 150 149 void ?{}( scoped(T)& this, P params ) with( this ) { 151 150 handle{ params }; … … 153 152 } 154 153 155 forall( dtype T| sized(T) | is_thread(T) )154 forall( T & | sized(T) | is_thread(T) ) 156 155 void ^?{}( scoped(T)& this ) with( this ) { 157 156 ^handle{}; … … 159 158 160 159 //----------------------------------------------------------------------------- 161 forall( dtype T| is_thread(T) | IS_RESUMPTION_EXCEPTION(ThreadCancelled, (T)))160 forall(T & | is_thread(T) | IS_RESUMPTION_EXCEPTION(ThreadCancelled, (T))) 162 161 T & join( T & this ) { 163 162 thread_dtor_guard_t guard = { this, defaultResumptionHandler }; -
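join() now routes its resumption handler through thread_dtor_guard_t, and a cancelled thread surfaces as a ThreadCancelled(T) resumption at the join site. A sketch of the non-cancelled path; Worker is an illustrative thread type, not part of the library.

    #include <thread.hfa>

    thread Worker {
        int result;
    };

    void main( Worker & this ) {
        this.result = 42;                  // a cancellation of this stack would instead be reported
                                           // as a ThreadCancelled(Worker) resumption at the join() below
    }

    int use() {
        Worker w;                          // starts running when constructed
        Worker & done = join( w );         // blocks until main() returns
        return done.result;
    }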
libcfa/src/concurrency/thread.hfa
r342af53 r8e4aa05 26 26 //----------------------------------------------------------------------------- 27 27 // thread trait 28 trait is_thread( dtype T) {28 trait is_thread(T &) { 29 29 void ^?{}(T& mutex this); 30 30 void main(T& this); … … 32 32 }; 33 33 34 FORALL_DATA_EXCEPTION(ThreadCancelled, ( dtype thread_t), (thread_t)) (34 FORALL_DATA_EXCEPTION(ThreadCancelled, (thread_t &), (thread_t)) ( 35 35 thread_t * the_thread; 36 36 exception_t * the_exception; 37 37 ); 38 38 39 forall( dtype T)39 forall(T &) 40 40 void copy(ThreadCancelled(T) * dst, ThreadCancelled(T) * src); 41 41 42 forall( dtype T)42 forall(T &) 43 43 const char * msg(ThreadCancelled(T) *); 44 44 … … 47 47 48 48 // Inline getters for threads/coroutines/monitors 49 forall( dtype T| is_thread(T) )49 forall( T & | is_thread(T) ) 50 50 static inline $coroutine* get_coroutine(T & this) __attribute__((const)) { return &get_thread(this)->self_cor; } 51 51 52 forall( dtype T| is_thread(T) )52 forall( T & | is_thread(T) ) 53 53 static inline $monitor * get_monitor (T & this) __attribute__((const)) { return &get_thread(this)->self_mon; } 54 54 … … 60 60 extern struct cluster * mainCluster; 61 61 62 forall( dtype T| is_thread(T) )62 forall( T & | is_thread(T) ) 63 63 void __thrd_start( T & this, void (*)(T &) ); 64 64 … … 82 82 }; 83 83 84 forall( dtype T| is_thread(T) | IS_EXCEPTION(ThreadCancelled, (T)) )84 forall( T & | is_thread(T) | IS_EXCEPTION(ThreadCancelled, (T)) ) 85 85 void ?{}( thread_dtor_guard_t & this, T & thrd, void(*)(ThreadCancelled(T) &) ); 86 86 void ^?{}( thread_dtor_guard_t & this ); … … 89 89 // thread runner 90 90 // Structure that actually start and stop threads 91 forall( dtype T| sized(T) | is_thread(T) )91 forall( T & | sized(T) | is_thread(T) ) 92 92 struct scoped { 93 93 T handle; 94 94 }; 95 95 96 forall( dtype T| sized(T) | is_thread(T) | { void ?{}(T&); } )96 forall( T & | sized(T) | is_thread(T) | { void ?{}(T&); } ) 97 97 void ?{}( scoped(T)& this ); 98 98 99 forall( dtype T, ttype P| sized(T) | is_thread(T) | { void ?{}(T&, P); } )99 forall( T &, P... | sized(T) | is_thread(T) | { void ?{}(T&, P); } ) 100 100 void ?{}( scoped(T)& this, P params ); 101 101 102 forall( dtype T| sized(T) | is_thread(T) )102 forall( T & | sized(T) | is_thread(T) ) 103 103 void ^?{}( scoped(T)& this ); 104 104 … … 115 115 void unpark( $thread * this ); 116 116 117 forall( dtype T| is_thread(T) )117 forall( T & | is_thread(T) ) 118 118 static inline void unpark( T & this ) { if(!&this) return; unpark( get_thread( this ) );} 119 119 … … 128 128 //---------- 129 129 // join 130 forall( dtype T| is_thread(T) | IS_RESUMPTION_EXCEPTION(ThreadCancelled, (T)) )130 forall( T & | is_thread(T) | IS_RESUMPTION_EXCEPTION(ThreadCancelled, (T)) ) 131 131 T & join( T & this ); 132 132
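The thread-level unpark() overload is now null-tolerant (`if(!&this) return;`). A sketch of the basic handshake, assuming a park() routine elsewhere in thread.hfa blocks the calling thread until a matching unpark arrives; the pairing shown is illustrative.

    #include <thread.hfa>

    thread Waiter {};

    void main( Waiter & this ) {
        park();                            // block until some other thread unparks us
        // ... continue once woken ...
    }

    void wake( Waiter & w ) {
        unpark( w );                       // is_thread(T) overload; a null reference is ignored
    }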