Changeset 17c6edeb for libcfa/src/concurrency
- Timestamp: Aug 16, 2022, 2:52:24 PM (2 years ago)
- Branches: ADT, ast-experimental, master, pthread-emulation
- Children: 71cf630
- Parents: 32d1383 (diff), e116db3 (diff)

Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Use the (diff) links above to see all the changes relative to each parent.

- Location: libcfa/src/concurrency
- Files: 9 edited
libcfa/src/concurrency/io.cfa
The types $io_context and $io_arbiter are renamed to io_context$ and io_arbiter$ throughout the file. The affected declarations and definitions now read:

    static io_context$ * __ioarbiter_allocate( io_arbiter$ & this, __u32 idxs[], __u32 want );
    static void __ioarbiter_submit( io_context$ *, __u32 idxs[], __u32 have, bool lazy );
    static void __ioarbiter_flush ( io_context$ & );
    static inline void __ioarbiter_notify( io_context$ & ctx );
    static inline unsigned __flush( struct io_context$ & );
    static inline __u32 __release_sqes( struct io_context$ & );
    static void ioring_syscsll( struct io_context$ & ctx, unsigned int min_comp, unsigned int flags );
    static bool try_acquire( io_context$ * ctx ) __attribute__((nonnull(1)));
    static bool __cfa_do_drain( io_context$ * ctx, cluster * cltr ) __attribute__((nonnull(1, 2)));
    static inline void __fill( struct io_uring_sqe * out_sqes[], __u32 want, __u32 idxs[], struct io_context$ * ctx );
    static inline bool __alloc( struct io_context$ * ctx, __u32 idxs[], __u32 want );
    struct io_context$ * cfa_io_allocate( struct io_uring_sqe * sqes[], __u32 idxs[], __u32 want ) libcfa_public;
    static inline void __submit_only( struct io_context$ * ctx, __u32 idxs[], __u32 have );
    static inline void __submit( struct io_context$ * ctx, __u32 idxs[], __u32 have, bool lazy );
    void cfa_io_submit( struct io_context$ * inctx, __u32 idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1))) libcfa_public;
    static __u32 __release_sqes( struct io_context$ & ctx );
    static io_context$ * __ioarbiter_allocate( io_arbiter$ & this, __u32 idxs[], __u32 want );
    static void __ioarbiter_notify( io_arbiter$ & this, io_context$ * ctx );
    static void __ioarbiter_notify( io_context$ & ctx );
    static void __ioarbiter_submit( io_context$ * ctx, __u32 idxs[], __u32 have, bool lazy );
    static void __ioarbiter_flush( io_context$ & ctx );

Local declarations in the drain, flush, allocation, and submission paths (including __kernel_read under CFA_WITH_IO_URING_IDLE) are updated the same way, for example:

    io_context$ * const ctx = proc->io.ctx;
    io_context$ & ctx = *proc->io.ctx;
    io_arbiter$ * ioarb = proc->cltr->io.arbiter;
    struct io_context$ * ret = __ioarbiter_allocate( *ioarb, idxs, want );
libcfa/src/concurrency/io/call.cfa.in
The same rename is applied to the extern prototypes and to the generated call sites:

    extern struct io_context$ * cfa_io_allocate( struct io_uring_sqe * out_sqes[], __u32 out_idxs[], __u32 want ) __attribute__((nonnull (1,2)));
    extern void cfa_io_submit( struct io_context$ * in_ctx, __u32 in_idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1,2)));

and, inside each generated wrapper:

    __u32 idx;
    struct io_uring_sqe * sqe;
    struct io_context$ * ctx = cfa_io_allocate( &sqe, &idx, 1 );

    sqe->opcode = IORING_OP_{op};
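For context, every wrapper generated from this template follows the same allocate, fill, submit pattern against the two entry points above. A minimal sketch of that pattern, assuming a plain read request with fd, buf, and count already in scope (the real wrappers also set up an io_future_t, fill further sqe fields, and translate errors, all omitted here):

    __u32 idx;
    struct io_uring_sqe * sqe;
    struct io_context$ * ctx = cfa_io_allocate( &sqe, &idx, 1 );   // reserve one sqe and get its ring index

    sqe->opcode = IORING_OP_READ;                  // fill in the request
    sqe->fd     = fd;                              // file descriptor to read from
    sqe->addr   = (__u64)(uintptr_t)buf;           // destination buffer
    sqe->len    = count;                           // number of bytes to read

    cfa_io_submit( ctx, &idx, 1, true );           // hand the index back; true requests lazy submission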
libcfa/src/concurrency/io/setup.cfa
Again only the rename. In the fallback stubs used when io_uring is unavailable:

    void ?{}(io_context$ & this, struct cluster & cl) {}
    void ^?{}(io_context$ & this) {}
    io_arbiter$ * create(void) { return 0p; }
    void destroy(io_arbiter$ *) {}

and in the real implementation, the forward declarations, constructors/destructors, and setup/teardown routines:

    static void __io_uring_setup ( io_context$ & this, const io_context_params & params_in, int procfd );
    static void __io_uring_teardown( io_context$ & this );
    static void __epoll_register(io_context$ & ctx);
    static void __epoll_unregister(io_context$ & ctx);
    void __ioarbiter_register( io_arbiter$ & mutex, io_context$ & ctx );
    void __ioarbiter_unregister( io_arbiter$ & mutex, io_context$ & ctx );
    void ?{}(io_context$ & this, processor * proc, struct cluster & cl);
    void ^?{}(io_context$ & this);
    void ?{}( io_arbiter$ & this );
    void ^?{}( io_arbiter$ & mutex this ) {}
    io_arbiter$ * create(void) { return new(); }
    void destroy(io_arbiter$ * arbiter) { delete(arbiter); }

The commented-out epoll helpers (__epoll_ctl, __epoll_register, __epoll_unregister, __ioctx_prepare_block) are updated to the new type name as well.
libcfa/src/concurrency/io/types.hfa
The forward declaration, the structures themselves, and their helpers are renamed:

    monitor io_arbiter$;

    struct __attribute__((aligned(64))) io_context$ {
        io_arbiter$ * arbiter;
        processor * proc;
        ...
    };

    static inline unsigned long long ts(io_context$ *& this) {
        const __u32 head = *this->cq.head;
        const __u32 tail = *this->cq.tail;
        ...
    }

    io_context$ * ctx;        // field of the queued-allocation request, alongside __u32 * idxs and __u32 want

    monitor __attribute__((aligned(64))) io_arbiter$ {
        __outstanding_io_queue pending;
    };

    // void __ioctx_prepare_block(io_context$ & ctx);
libcfa/src/concurrency/iofwd.hfa
The forward declaration and the two underlying-call prototypes are renamed:

    struct io_context$;

    extern struct io_context$ * cfa_io_allocate( struct io_uring_sqe * out_sqes[], __u32 out_idxs[], __u32 want ) __attribute__((nonnull (1,2)));
    extern void cfa_io_submit( struct io_context$ * in_ctx, __u32 in_idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1,2)));
libcfa/src/concurrency/kernel.cfa
Unlike the other files in this merge, this diff reorders code rather than renaming types: kernelTLS().this_thread is now set before the thread's state becomes Active, and cleared only after the state has left Active, so that a debugger scanning TLS storage can find every active thread.

At the top of the scheduling loop the state update moves below the this_thread update:

    RUNNING: while(true) {
        thrd_dst->preempted = __NO_PREEMPTION;

        // Update global state
        kernelTLS().this_thread = thrd_dst;

        // Update the state after setting this_thread
        // so that the debugger can find all active threads
        // in tls storage
        thrd_dst->state = Active;

After the thread returns, the old unconditional reset

        // Reset global state
        kernelTLS().this_thread = 0p;

is deleted; instead each exit path resets the pointer once the thread is known not to be Active anymore: in the preemption branch just before schedule_thread$( thrd_dst, UNPARK_LOCAL ), in the Halting branch before the thread is finished, and in the regular blocking path right after thrd_dst->state = Blocked, each time with the comment:

        // Reset the this_thread now that we know
        // the state isn't active anymore
        kernelTLS().this_thread = 0p;
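A hedged illustration of the invariant this ordering establishes: any thread observed in the Active state is reachable through the this_thread pointer of the processor running it. The processor enumeration and field path in the sketch below are assumptions invented for illustration, not the kernel's actual debugger interface:

    // Sketch only: procs, nprocs, and the this_thread field path are hypothetical.
    for ( unsigned i = 0; i < nprocs; i += 1 ) {
        thread$ * t = procs[i]->this_thread;          // hypothetical view of that processor's TLS slot
        if ( t != 0p && t->state == Active ) {
            // The new ordering guarantees this scan cannot miss an Active thread:
            // this_thread is published before the state becomes Active and is
            // cleared only after the state has left Active.
            report_active( procs[i], t );             // hypothetical reporting hook
        }
    }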
libcfa/src/concurrency/kernel.hfa
The forward declarations and the fields that use them are renamed:

    struct io_context$;
    struct io_arbiter$;

    io_context$ * ctx;          // in the per-processor io block, next to "unsigned target" and "volatile bool pending"
    io_context$ ** data;        // the member under the "// Array of $io_" comment, next to the time since subqueues were processed
    io_arbiter$ * arbiter;      // in the cluster-level io block, next to io_context_params params
libcfa/src/concurrency/kernel/cluster.cfa
Inside the #if defined(CFA_HAVE_LINUX_IO_URING_H) block, a single signature is updated:

    static void assign_io( io_context$ ** data, size_t count, dlist(processor) & list )
libcfa/src/concurrency/kernel/private.hfa
The I/O arbiter lifetime hooks are renamed:

    io_arbiter$ * create(void);
    void destroy(io_arbiter$ *);
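These hooks pair with the cluster-level io.arbiter field shown in the kernel.hfa diff above. A minimal sketch of how a cluster's arbiter could be wired up and torn down through them; the helper names setup_cluster_io and teardown_cluster_io are hypothetical, and only cl.io.arbiter, create(), and destroy() come from this changeset:

    // Hypothetical helpers; the real call sites in cluster construction/destruction are not part of this diff.
    void setup_cluster_io( cluster & cl ) {
        cl.io.arbiter = create();            // io_arbiter$ * create(void)
    }

    void teardown_cluster_io( cluster & cl ) {
        destroy( cl.io.arbiter );            // void destroy(io_arbiter$ *)
        cl.io.arbiter = 0p;
    }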