- Timestamp: Mar 11, 2022, 1:56:07 PM (4 years ago)
- Branches: ADT, ast-experimental, enum, master, pthread-emulation, qualifiedEnum
- Children: 623d1c8
- Parents: eb3bc52 (diff), 630c4bb (diff)

Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Use the (diff) links above to see all the changes relative to each parent.

- Location: libcfa/src/concurrency
- Files: 9 edited
libcfa/src/concurrency/io.cfa
```diff
--- libcfa/src/concurrency/io.cfa (eb3bc52)
+++ libcfa/src/concurrency/io.cfa (510e6f9)
@@ -175,4 +175,4 @@
 	/* paranoid */ verify( ! __preemption_enabled() );
 
-	ctx.proc->io.pending = false;
+	__atomic_store_n(&ctx.proc->io.pending, false, __ATOMIC_RELAXED);
 }
@@ -287,5 +287,5 @@
 //=============================================================================================
 // submission
-static inline void __submit( struct $io_context * ctx, __u32 idxs[], __u32 have, bool lazy) {
+static inline void __submit_only( struct $io_context * ctx, __u32 idxs[], __u32 have) {
 	// We can proceed to the fast path
 	// Get the right objects
@@ -304,6 +304,12 @@
 	sq.to_submit += have;
 
-	ctx->proc->io.pending = true;
-	ctx->proc->io.dirty   = true;
+	__atomic_store_n(&ctx->proc->io.pending, true, __ATOMIC_RELAXED);
+	__atomic_store_n(&ctx->proc->io.dirty  , true, __ATOMIC_RELAXED);
+}
+
+static inline void __submit( struct $io_context * ctx, __u32 idxs[], __u32 have, bool lazy) {
+	__sub_ring_t & sq = ctx->sq;
+	__submit_only(ctx, idxs, have);
+
 	if(sq.to_submit > 30) {
 		__tls_stats()->io.flush.full++;
@@ -402,8 +408,12 @@
 // I/O Arbiter
 //=============================================================================================
-static inline void block(__outstanding_io_queue & queue, __outstanding_io & item) {
+static inline bool enqueue(__outstanding_io_queue & queue, __outstanding_io & item) {
+	bool was_empty;
+
 	// Lock the list, it's not thread safe
 	lock( queue.lock __cfaabi_dbg_ctx2 );
 	{
+		was_empty = empty(queue.queue);
+
 		// Add our request to the list
 		add( queue.queue, item );
@@ -414,4 +424,4 @@
 	unlock( queue.lock );
 
-	wait( item.sem );
+	return was_empty;
 }
@@ -432,5 +442,7 @@
 	pa.want = want;
 
-	block(this.pending, (__outstanding_io&)pa);
+	enqueue(this.pending, (__outstanding_io&)pa);
+
+	wait( pa.sem );
 
 	return pa.ctx;
@@ -485,5 +497,14 @@
 	ei.lazy = lazy;
 
-	block(ctx->ext_sq, (__outstanding_io&)ei);
+	bool we = enqueue(ctx->ext_sq, (__outstanding_io&)ei);
+
+	__atomic_store_n(&ctx->proc->io.pending, true, __ATOMIC_SEQ_CST);
+
+	if( we ) {
+		sigval_t value = { PREEMPT_IO };
+		pthread_sigqueue(ctx->proc->kernel_thread, SIGUSR1, value);
+	}
+
+	wait( ei.sem );
 
 	__cfadbg_print_safe(io, "Kernel I/O : %u submitted from arbiter\n", have);
@@ -501,5 +522,5 @@
 	__external_io & ei = (__external_io&)drop( ctx.ext_sq.queue );
 
-	__submit(&ctx, ei.idxs, ei.have, ei.lazy);
+	__submit_only(&ctx, ei.idxs, ei.have);
 
 	post( ei.sem );
```
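The arbiter-side change above is the interesting one: the old `block()` helper both enqueued the request and waited on its semaphore, so a submitter had no window in which to notify the owning processor. Splitting it into `enqueue()` (which reports whether the queue was empty) and a caller-side `wait()` lets exactly one submitter, the first, raise the `PREEMPT_IO` signal before blocking. A minimal sketch of the was-empty pattern in plain C with pthreads; all names here are hypothetical, not the CFA implementation:

```c
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

struct request { struct request * next; };

struct out_queue {
	pthread_mutex_t lock;           // the list itself is not thread safe
	struct request * head, ** tail; // intrusive singly-linked FIFO
};

static void out_queue_init(struct out_queue * q) {
	pthread_mutex_init(&q->lock, NULL);
	q->head = NULL;
	q->tail = &q->head;
}

// Append a request; returns true iff the queue was empty beforehand,
// in which case the caller is responsible for waking the consumer.
static bool enqueue(struct out_queue * q, struct request * r) {
	bool was_empty;
	pthread_mutex_lock(&q->lock);
	was_empty = (q->head == NULL);
	r->next = NULL;
	*q->tail = r;
	q->tail  = &r->next;
	pthread_mutex_unlock(&q->lock);
	return was_empty;
}
```

Only the empty-to-non-empty transition needs a wakeup; later enqueuers know the consumer has already been, or is about to be, signalled.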
libcfa/src/concurrency/io/setup.cfa
```diff
--- libcfa/src/concurrency/io/setup.cfa (eb3bc52)
+++ libcfa/src/concurrency/io/setup.cfa (510e6f9)
@@ -56,4 +56,5 @@
 
 #include "bitmanip.hfa"
+#include "fstream.hfa"
 #include "kernel_private.hfa"
 #include "thread.hfa"
@@ -258,4 +259,13 @@
 	struct __sub_ring_t & sq = this.sq;
 	struct __cmp_ring_t & cq = this.cq;
+	{
+		__u32 fhead = sq.free_ring.head;
+		__u32 ftail = sq.free_ring.tail;
+
+		__u32 total = *sq.num;
+		__u32 avail = ftail - fhead;
+
+		if(avail != total) abort | "Processor (" | (void*)this.proc | ") tearing down ring with" | (total - avail) | "entries allocated but not submitted, out of" | total;
+	}
 
 	// unmap the submit queue entries
```
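The new teardown check compares the free ring's occupancy against the ring's capacity and aborts if any submit-queue entries are still allocated. Because `head` and `tail` are free-running 32-bit counters, the unsigned subtraction `ftail - fhead` gives the correct distance even after the counters wrap. A standalone C sketch of the same arithmetic (hypothetical helper, not the CFA code):

```c
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

// Abort if any of the ring's `total` entries are allocated but unsubmitted.
// head/tail are free-running counters: tail - head is wrap-safe in uint32_t.
static void check_ring(uint32_t head, uint32_t tail, uint32_t total, const void * proc) {
	uint32_t avail = tail - head;
	if (avail != total) {
		fprintf(stderr, "Processor (%p) tearing down ring with %u entries "
		                "allocated but not submitted, out of %u\n",
		        proc, total - avail, total);
		abort();
	}
}

int main(void) {
	// Counters wrap past 2^32, yet avail still comes out as 256.
	check_ring(0xFFFFFF00u, 0xFFFFFF00u + 256u, 256u, NULL);
	return 0;
}
```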
libcfa/src/concurrency/io/types.hfa
```diff
--- libcfa/src/concurrency/io/types.hfa (eb3bc52)
+++ libcfa/src/concurrency/io/types.hfa (510e6f9)
@@ -23,4 +23,5 @@
 #include "bits/locks.hfa"
 #include "bits/queue.hfa"
+#include "iofwd.hfa"
 #include "kernel/fwd.hfa"
 
@@ -170,21 +171,2 @@
 // void __ioctx_prepare_block($io_context & ctx);
 #endif
-
-//-----------------------------------------------------------------------
-// IO user data
-struct io_future_t {
-	future_t self;
-	__s32 result;
-};
-
-static inline {
-	thread$ * fulfil( io_future_t & this, __s32 result, bool do_unpark = true ) {
-		this.result = result;
-		return fulfil(this.self, do_unpark);
-	}
-
-	// Wait for the future to be fulfilled
-	bool wait     ( io_future_t & this ) { return wait     (this.self); }
-	void reset    ( io_future_t & this ) { return reset    (this.self); }
-	bool available( io_future_t & this ) { return available(this.self); }
-}
```
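`io_future_t` and its helpers are not being deleted here; they move verbatim into iofwd.hfa (next file), so forward users no longer need the io_uring-specific types header. The pattern itself is a one-shot future bundled with the operation's `__s32` result, letting a completion store the return value and wake the waiter in one call. A self-contained C11/pthreads rendering of the idea (hypothetical names, not the CFA future):

```c
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>

struct io_future {
	pthread_mutex_t lock;
	pthread_cond_t  cond;
	bool            done;
	int32_t         result;  // mirrors io_future_t.result (__s32)
};

static void io_future_init(struct io_future * f) {
	pthread_mutex_init(&f->lock, NULL);
	pthread_cond_init (&f->cond, NULL);
	f->done = false;
}

// Completion side: store the I/O result and wake any waiter.
static void fulfil(struct io_future * f, int32_t result) {
	pthread_mutex_lock(&f->lock);
	f->result = result;
	f->done   = true;
	pthread_cond_broadcast(&f->cond);
	pthread_mutex_unlock(&f->lock);
}

// Submitter side: block until fulfilled, then return the result.
static int32_t wait_for(struct io_future * f) {
	pthread_mutex_lock(&f->lock);
	while (!f->done) pthread_cond_wait(&f->cond, &f->lock);
	int32_t r = f->result;
	pthread_mutex_unlock(&f->lock);
	return r;
}
```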
libcfa/src/concurrency/iofwd.hfa
```diff
--- libcfa/src/concurrency/iofwd.hfa (eb3bc52)
+++ libcfa/src/concurrency/iofwd.hfa (510e6f9)
@@ -19,4 +19,5 @@
 extern "C" {
 	#include <asm/types.h>
+	#include <sys/stat.h> // needed for mode_t
 	#if CFA_HAVE_LINUX_IO_URING_H
 		#include <linux/io_uring.h>
@@ -24,3 +25,4 @@
 }
 #include "bits/defs.hfa"
+#include "kernel/fwd.hfa"
 #include "time.hfa"
@@ -47,4 +49,3 @@
 
 struct cluster;
-struct io_future_t;
 struct $io_context;
@@ -57,4 +58,23 @@
 
 struct io_uring_sqe;
+
+//-----------------------------------------------------------------------
+// IO user data
+struct io_future_t {
+	future_t self;
+	__s32 result;
+};
+
+static inline {
+	thread$ * fulfil( io_future_t & this, __s32 result, bool do_unpark = true ) {
+		this.result = result;
+		return fulfil(this.self, do_unpark);
+	}
+
+	// Wait for the future to be fulfilled
+	bool wait     ( io_future_t & this ) { return wait     (this.self); }
+	void reset    ( io_future_t & this ) { return reset    (this.self); }
+	bool available( io_future_t & this ) { return available(this.self); }
+}
 
 //----------
@@ -133,2 +153,21 @@
 // Check if a function is blocks a only the user thread
 bool has_user_level_blocking( fptr_t func );
+
+#if CFA_HAVE_LINUX_IO_URING_H
+static inline void zero_sqe(struct io_uring_sqe * sqe) {
+	sqe->flags = 0;
+	sqe->ioprio = 0;
+	sqe->fd = 0;
+	sqe->off = 0;
+	sqe->addr = 0;
+	sqe->len = 0;
+	sqe->fsync_flags = 0;
+	sqe->__pad2[0] = 0;
+	sqe->__pad2[1] = 0;
+	sqe->__pad2[2] = 0;
+	sqe->fd = 0;
+	sqe->off = 0;
+	sqe->addr = 0;
+	sqe->len = 0;
+}
+#endif
```
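A small observation on the new `zero_sqe`: it zeroes `fd`, `off`, `addr`, and `len` twice (harmless, but redundant), and it deliberately leaves fields such as `opcode` and `user_data` untouched for the caller to fill. An alternative sketch, assuming the same `CFA_HAVE_LINUX_IO_URING_H` guard and a kernel that ships <linux/io_uring.h>, is a single `memset` run before the caller populates the entry; `zero_sqe_alt` is a hypothetical name:

```c
#include <linux/io_uring.h>
#include <string.h>

// Clears every field of the sqe, including the padding words; must run
// before the caller sets opcode, fd, addr, len, and user_data.
static inline void zero_sqe_alt(struct io_uring_sqe * sqe) {
	memset(sqe, 0, sizeof(struct io_uring_sqe));
}
```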
libcfa/src/concurrency/kernel.cfa
```diff
--- libcfa/src/concurrency/kernel.cfa (eb3bc52)
+++ libcfa/src/concurrency/kernel.cfa (510e6f9)
@@ -251,5 +251,5 @@
 	if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP;
 
-	if(this->io.pending && !this->io.dirty) {
+	if(__atomic_load_n(&this->io.pending, __ATOMIC_RELAXED) && !__atomic_load_n(&this->io.dirty, __ATOMIC_RELAXED)) {
 		__IO_STATS__(true, io.flush.dirty++; )
 		__cfa_io_flush( this, 0 );
```
libcfa/src/concurrency/kernel.hfa
```diff
--- libcfa/src/concurrency/kernel.hfa (eb3bc52)
+++ libcfa/src/concurrency/kernel.hfa (510e6f9)
@@ -92,6 +92,6 @@
 	struct {
 		$io_context * ctx;
-		bool pending;
-		bool dirty;
+		volatile bool pending;
+		volatile bool dirty;
 	} io;
 
```
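Taken together, the kernel.cfa and kernel.hfa hunks turn the cross-thread `pending`/`dirty` hints into `volatile` flags that are only ever accessed through `__atomic` builtins. Relaxed ordering suffices because the flags merely nudge the processor's main loop into flushing; the submission ring itself is published with its own synchronization, and the protocol for clearing `dirty` lives elsewhere in kernel.cfa. A minimal sketch of just the flag accesses, using the GCC/Clang builtins (hypothetical names):

```c
#include <stdbool.h>

struct io_state {
	volatile bool pending;  // work has been queued for this processor
	volatile bool dirty;    // work was queued very recently
};

// Submitter: mark work as queued and fresh (relaxed stores are hints only).
static void mark_submitted(struct io_state * io) {
	__atomic_store_n(&io->pending, true, __ATOMIC_RELAXED);
	__atomic_store_n(&io->dirty,   true, __ATOMIC_RELAXED);
}

// Main loop: flush once work is pending but no longer actively dirty.
static bool should_flush(struct io_state * io) {
	return  __atomic_load_n(&io->pending, __ATOMIC_RELAXED)
	    && !__atomic_load_n(&io->dirty,   __ATOMIC_RELAXED);
}
```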
libcfa/src/concurrency/kernel/fwd.hfa
```diff
--- libcfa/src/concurrency/kernel/fwd.hfa (eb3bc52)
+++ libcfa/src/concurrency/kernel/fwd.hfa (510e6f9)
@@ -347,5 +347,5 @@
 	struct oneshot * want = expected == 0p ? 1p : 2p;
 	if(__atomic_compare_exchange_n(&this.ptr, &expected, want, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
-		if( expected == 0p ) { /* paranoid */ verify( this.ptr == 1p ); return 0p; }
+		if( expected == 0p ) { return 0p; }
 		thread$ * ret = post( *expected, do_unpark );
 		__atomic_store_n( &this.ptr, 1p, __ATOMIC_SEQ_CST);
```
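The dropped `verify( this.ptr == 1p )` is a classic post-CAS assertion race: once the CAS makes the fulfilled sentinel visible, another thread may consume and reuse the oneshot before the assertion re-reads `this.ptr`, so the check can fire in perfectly correct executions. A tiny C11 sketch of the hazard (hypothetical, not the CFA oneshot itself):

```c
#include <stdatomic.h>

static _Atomic int state;  // 0 = empty, 1 = fulfilled

void publish(void) {
	int expected = 0;
	if (atomic_compare_exchange_strong(&state, &expected, 1)) {
		// RACY: a consumer may observe the 1, act on it, and reset
		// `state` to 0 between the CAS above and a re-read here, so
		// asserting atomic_load(&state) == 1 can fail spuriously.
	}
}
```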
libcfa/src/concurrency/kernel_private.hfa
```diff
--- libcfa/src/concurrency/kernel_private.hfa (eb3bc52)
+++ libcfa/src/concurrency/kernel_private.hfa (510e6f9)
@@ -60,4 +60,10 @@
 extern bool __preemption_enabled();
 
+enum {
+	PREEMPT_NORMAL    = 0,
+	PREEMPT_TERMINATE = 1,
+	PREEMPT_IO        = 2,
+};
+
 static inline void __disable_interrupts_checked() {
 	/* paranoid */ verify( __preemption_enabled() );
```
libcfa/src/concurrency/preemption.cfa
```diff
--- libcfa/src/concurrency/preemption.cfa (eb3bc52)
+++ libcfa/src/concurrency/preemption.cfa (510e6f9)
@@ -96,9 +96,4 @@
 		lock{};
 	}
-
-enum {
-	PREEMPT_NORMAL    = 0,
-	PREEMPT_TERMINATE = 1,
-};
 
 //=============================================================================================
@@ -664,4 +659,5 @@
 	choose(sfp->si_value.sival_int) {
 		case PREEMPT_NORMAL   : ; // Normal case, nothing to do here
+		case PREEMPT_IO       : ; // I/O asked to stop spinning, nothing to do here
 		case PREEMPT_TERMINATE: verify( __atomic_load_n( &__cfaabi_tls.this_processor->do_terminate, __ATOMIC_SEQ_CST ) );
 		default:
```
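Moving the PREEMPT constants into kernel_private.hfa (previous file) is what lets io.cfa raise the new wakeup: the arbiter queues SIGUSR1 at the processor's kernel thread with `PREEMPT_IO` as payload, and the signal handler dispatches on `si_value.sival_int`. A sketch of the mechanism in plain C, using the GNU extension `pthread_sigqueue`; the handler body and names are hypothetical:

```c
#define _GNU_SOURCE
#include <pthread.h>
#include <signal.h>

enum { PREEMPT_NORMAL = 0, PREEMPT_TERMINATE = 1, PREEMPT_IO = 2 };

// Sender: deliver SIGUSR1 to one specific thread, tagged with a payload.
static void wake_for_io(pthread_t kernel_thread) {
	sigval_t value = { .sival_int = PREEMPT_IO };
	pthread_sigqueue(kernel_thread, SIGUSR1, value);
}

// Receiver: installed with SA_SIGINFO so the payload is available.
static void on_sigusr1(int sig, siginfo_t * info, void * ctx) {
	(void)sig; (void)ctx;
	switch (info->si_value.sival_int) {
	case PREEMPT_NORMAL:    /* regular preemption tick              */ break;
	case PREEMPT_IO:        /* stop spinning, external work arrived */ break;
	case PREEMPT_TERMINATE: /* processor shutdown                   */ break;
	}
}
```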