Changeset 78da4ab for libcfa/src/concurrency/io
- Timestamp: Feb 19, 2021, 1:47:09 PM
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: 4f762d3
- Parents: b44959f
- Location: libcfa/src/concurrency/io
- Files: 3 edited
libcfa/src/concurrency/io/call.cfa.in
rb44959f r78da4ab 54 54 | IOSQE_IO_DRAIN 55 55 #endif 56 #if defined(CFA_HAVE_IOSQE_IO_LINK) 57 | IOSQE_IO_LINK 58 #endif 59 #if defined(CFA_HAVE_IOSQE_IO_HARDLINK) 60 | IOSQE_IO_HARDLINK 61 #endif 56 62 #if defined(CFA_HAVE_IOSQE_ASYNC) 57 63 | IOSQE_ASYNC 58 64 #endif 59 ; 60 61 static const __u32 LINK_FLAGS = 0 62 #if defined(CFA_HAVE_IOSQE_IO_LINK) 63 | IOSQE_IO_LINK 64 #endif 65 #if defined(CFA_HAVE_IOSQE_IO_HARDLINK) 66 | IOSQE_IO_HARDLINK 65 #if defined(CFA_HAVE_IOSQE_BUFFER_SELECTED) 66 | IOSQE_BUFFER_SELECTED 67 67 #endif 68 68 ; … … 74 74 ; 75 75 76 extern [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ); 77 extern void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))); 78 79 static inline io_context * __get_io_context( void ) { 80 cluster * cltr = active_cluster(); 81 82 /* paranoid */ verifyf( cltr, "No active cluster for io operation\\n"); 83 assertf( cltr->io.cnt > 0, "Cluster %p has no default io contexts and no context was specified\\n", cltr ); 84 85 /* paranoid */ verifyf( cltr->io.ctxs, "default io contexts for cluster %p are missing\\n", cltr); 86 return &cltr->io.ctxs[ thread_rand() % cltr->io.cnt ]; 87 } 76 extern struct $io_context * cfa_io_allocate(struct io_uring_sqe * out_sqes[], __u32 out_idxs[], __u32 want) __attribute__((nonnull (1,2))); 77 extern void cfa_io_submit( struct $io_context * in_ctx, __u32 in_idxs[], __u32 have ) __attribute__((nonnull (1,2))); 88 78 #endif 89 79 … … 195 185 return ', '.join(args_a) 196 186 197 AsyncTemplate = """inline void async_{name}(io_future_t & future, {params}, int submit_flags , io_cancellation * cancellation, io_context * context) {{187 AsyncTemplate = """inline void async_{name}(io_future_t & future, {params}, int submit_flags) {{ 198 188 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_{op}) 199 189 ssize_t res = {name}({args}); … … 205 195 }} 206 196 #else 207 // we don't support LINK yet208 if( 0 != (submit_flags & LINK_FLAGS) ) {{209 errno = ENOTSUP; return -1;210 }}211 212 if( !context ) {{213 context = __get_io_context();214 }}215 if(cancellation) {{216 cancellation->target = (__u64)(uintptr_t)&future;217 }}218 219 197 __u8 sflags = REGULAR_FLAGS & submit_flags; 220 struct __io_data & ring = *context->thrd.ring;221 222 198 __u32 idx; 223 199 struct io_uring_sqe * sqe; 224 [(volatile struct io_uring_sqe *) sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future);200 struct $io_context * ctx = cfa_io_allocate( &sqe, &idx, 1 ); 225 201 226 202 sqe->opcode = IORING_OP_{op}; 203 sqe->user_data = (__u64)(uintptr_t)&future; 227 204 sqe->flags = sflags; 228 205 sqe->ioprio = 0; … … 239 216 240 217 verify( sqe->user_data == (__u64)(uintptr_t)&future ); 241 __submit( context, idx);218 cfa_io_submit( ctx, &idx, 1 ); 242 219 #endif 243 220 }}""" 244 221 245 SyncTemplate = """{ret} cfa_{name}({params}, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {{ 246 if( timeout >= 0 ) {{ 247 errno = ENOTSUP; 248 return -1; 249 }} 222 SyncTemplate = """{ret} cfa_{name}({params}, int submit_flags) {{ 250 223 io_future_t future; 251 224 252 async_{name}( future, {args}, submit_flags , cancellation, context);225 async_{name}( future, {args}, submit_flags ); 253 226 254 227 wait( future ); … … 415 388 if c.define: 416 389 print("""#if defined({define}) 417 {ret} cfa_{name}({params}, int submit_flags , Duration timeout, io_cancellation * cancellation, io_context * context);390 {ret} cfa_{name}({params}, int 
submit_flags); 418 391 #endif""".format(define=c.define,ret=c.ret, name=c.name, params=c.params)) 419 392 else: 420 print("{ret} cfa_{name}({params}, int submit_flags , Duration timeout, io_cancellation * cancellation, io_context * context);"393 print("{ret} cfa_{name}({params}, int submit_flags);" 421 394 .format(ret=c.ret, name=c.name, params=c.params)) 422 395 … … 426 399 if c.define: 427 400 print("""#if defined({define}) 428 void async_{name}(io_future_t & future, {params}, int submit_flags , io_cancellation * cancellation, io_context * context);401 void async_{name}(io_future_t & future, {params}, int submit_flags); 429 402 #endif""".format(define=c.define,name=c.name, params=c.params)) 430 403 else: 431 print("void async_{name}(io_future_t & future, {params}, int submit_flags , io_cancellation * cancellation, io_context * context);"404 print("void async_{name}(io_future_t & future, {params}, int submit_flags);" 432 405 .format(name=c.name, params=c.params)) 433 406 print("\n") … … 474 447 475 448 print(""" 476 //-----------------------------------------------------------------------------477 bool cancel(io_cancellation & this) {478 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ASYNC_CANCEL)479 return false;480 #else481 io_future_t future;482 483 io_context * context = __get_io_context();484 485 __u8 sflags = 0;486 struct __io_data & ring = *context->thrd.ring;487 488 __u32 idx;489 volatile struct io_uring_sqe * sqe;490 [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );491 492 sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;493 sqe->opcode = IORING_OP_ASYNC_CANCEL;494 sqe->flags = sflags;495 sqe->addr = this.target;496 497 verify( sqe->user_data == (__u64)(uintptr_t)&future );498 __submit( context, idx );499 500 wait(future);501 502 if( future.result == 0 ) return true; // Entry found503 if( future.result == -EALREADY) return true; // Entry found but in progress504 if( future.result == -ENOENT ) return false; // Entry not found505 return false;506 #endif507 }508 509 449 //----------------------------------------------------------------------------- 510 450 // Check if a function is has asynchronous -
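The regenerated async/sync wrappers above no longer thread an explicit io_context, io_cancellation, or timeout through every call: the generated wrapper now obtains an SQE and its index from cfa_io_allocate, stamps sqe->user_data with the address of the caller's io_future_t, and hands the index back through cfa_io_submit, so the completion is matched to the future purely through user_data. The same pattern can be reproduced outside the CFA runtime with plain liburing; the sketch below is only an illustration of that user_data-as-completion-key idea, and the request struct, file name, and buffer size are invented for the example.

// Standalone liburing sketch of the pattern used by the generated async_*()
// wrappers: tag the SQE with a per-request object via user_data, submit,
// then recover the object from the CQE. Not CFA runtime code.
#include <liburing.h>
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

struct request { int done; int result; char buf[512]; };    // stand-in for io_future_t

int main(void) {
    struct io_uring ring;
    if (io_uring_queue_init(8, &ring, 0) < 0) return 1;

    int fd = open("/etc/hostname", O_RDONLY);                // hypothetical input file
    if (fd < 0) return 1;
    struct request req = { 0 };

    struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);      // "cfa_io_allocate" analogue
    io_uring_prep_read(sqe, fd, req.buf, sizeof(req.buf), 0);
    io_uring_sqe_set_data(sqe, &req);                        // user_data = &future

    io_uring_submit(&ring);                                  // "cfa_io_submit" analogue

    struct io_uring_cqe *cqe;
    io_uring_wait_cqe(&ring, &cqe);                          // "wait(future)" analogue
    struct request *r = io_uring_cqe_get_data(cqe);          // recover the request
    r->result = cqe->res;
    r->done   = 1;
    io_uring_cqe_seen(&ring, cqe);

    printf("read %d bytes\n", req.result);
    close(fd);
    io_uring_queue_exit(&ring);
    return 0;
}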
libcfa/src/concurrency/io/setup.cfa
rb44959f r78da4ab 36 36 void ?{}(io_context_params & this) {} 37 37 38 void ?{}(io_context & this, struct cluster & cl) {} 39 void ?{}(io_context & this, struct cluster & cl, const io_context_params & params) {} 40 41 void ^?{}(io_context & this) {} 42 void ^?{}(io_context & this, bool cluster_context) {} 43 44 void register_fixed_files( io_context &, int *, unsigned ) {} 45 void register_fixed_files( cluster &, int *, unsigned ) {} 38 void ?{}($io_context & this, struct cluster & cl) {} 39 void ^?{}($io_context & this) {} 40 41 $io_arbiter * create(void) { return 0p; } 42 void destroy($io_arbiter *) {} 46 43 47 44 #else … … 68 65 void ?{}(io_context_params & this) { 69 66 this.num_entries = 256; 70 this.num_ready = 256;71 this.submit_aff = -1;72 this.eager_submits = false;73 this.poller_submits = false;74 this.poll_submit = false;75 this.poll_complete = false;76 67 } 77 68 … … 194 185 195 186 for(i; nfds) { 196 $io_c tx_thread * io_ctx = ($io_ctx_thread*)(uintptr_t)events[i].data.u64;187 $io_context * io_ctx = ($io_context *)(uintptr_t)events[i].data.u64; 197 188 /* paranoid */ verify( io_ctx ); 198 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx-> ring->fd, io_ctx);189 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->fd, io_ctx); 199 190 #if !defined( __CFA_NO_STATISTICS__ ) 200 191 __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats; … … 202 193 203 194 eventfd_t v; 204 eventfd_read(io_ctx-> ring->efd, &v);195 eventfd_read(io_ctx->efd, &v); 205 196 206 197 post( io_ctx->sem ); … … 219 210 //============================================================================================= 220 211 221 void ?{}($io_ctx_thread & this, struct cluster & cl) { (this.self){ "IO Poller", cl }; } 222 void main( $io_ctx_thread & this ); 223 static inline $thread * get_thread( $io_ctx_thread & this ) { return &this.self; } 224 void ^?{}( $io_ctx_thread & mutex this ) {} 225 226 static void __io_create ( __io_data & this, const io_context_params & params_in ); 227 static void __io_destroy( __io_data & this ); 228 229 void ?{}(io_context & this, struct cluster & cl, const io_context_params & params) { 230 (this.thrd){ cl }; 231 this.thrd.ring = malloc(); 232 __cfadbg_print_safe(io_core, "Kernel I/O : Creating ring for io_context %p\n", &this); 233 __io_create( *this.thrd.ring, params ); 234 235 __cfadbg_print_safe(io_core, "Kernel I/O : Starting poller thread for io_context %p\n", &this); 236 this.thrd.done = false; 237 __thrd_start( this.thrd, main ); 238 239 __cfadbg_print_safe(io_core, "Kernel I/O : io_context %p ready\n", &this); 212 static void __io_uring_setup ( $io_context & this, const io_context_params & params_in ); 213 static void __io_uring_teardown( $io_context & this ); 214 static void __epoll_register($io_context & ctx); 215 static void __epoll_unregister($io_context & ctx); 216 void __ioarbiter_register( $io_arbiter & mutex, $io_context & ctx ); 217 void __ioarbiter_unregister( $io_arbiter & mutex, $io_context & ctx ); 218 219 void ?{}($io_context & this, struct cluster & cl) { 220 (this.self){ "IO Poller", cl }; 221 this.ext_sq.empty = true; 222 __io_uring_setup( this, cl.io.params ); 223 __cfadbg_print_safe(io_core, "Kernel I/O : Created ring for io_context %u (%p)\n", this.fd, &this); 224 225 __epoll_register(this); 226 227 __ioarbiter_register(*cl.io.arbiter, this); 228 229 __thrd_start( this, main ); 230 __cfadbg_print_safe(io_core, "Kernel I/O : Started poller thread for io_context %u\n", 
this.fd); 231 } 232 233 void ^?{}($io_context & mutex this) { 234 __cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %u\n", this.fd); 235 236 ^(this.self){}; 237 __cfadbg_print_safe(io_core, "Kernel I/O : Stopped poller thread for io_context %u\n", this.fd); 238 239 __ioarbiter_unregister(*this.arbiter, this); 240 241 __epoll_unregister(this); 242 243 __io_uring_teardown( this ); 244 __cfadbg_print_safe(io_core, "Kernel I/O : Destroyed ring for io_context %u\n", this.fd); 240 245 } 241 246 242 247 void ?{}(io_context & this, struct cluster & cl) { 243 io_context_params params; 244 (this){ cl, params }; 245 } 246 247 void ^?{}(io_context & this, bool cluster_context) { 248 __cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %p\n", &this); 249 250 // Notify the thread of the shutdown 251 __atomic_store_n(&this.thrd.done, true, __ATOMIC_SEQ_CST); 252 253 // If this is an io_context within a cluster, things get trickier 254 $thread & thrd = this.thrd.self; 255 if( cluster_context ) { 256 // We are about to do weird things with the threads 257 // we don't need interrupts to complicate everything 258 disable_interrupts(); 259 260 // Get cluster info 261 cluster & cltr = *thrd.curr_cluster; 262 /* paranoid */ verify( cltr.idles.total == 0 || &cltr == mainCluster ); 263 /* paranoid */ verify( !ready_mutate_islocked() ); 264 265 // We need to adjust the clean-up based on where the thread is 266 if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) { 267 // This is the tricky case 268 // The thread was preempted or ready to run and now it is on the ready queue 269 // but the cluster is shutting down, so there aren't any processors to run the ready queue 270 // the solution is to steal the thread from the ready-queue and pretend it was blocked all along 271 272 ready_schedule_lock(); 273 // The thread should on the list 274 /* paranoid */ verify( thrd.link.next != 0p ); 275 276 // Remove the thread from the ready queue of this cluster 277 // The thread should be the last on the list 278 __attribute__((unused)) bool removed = remove_head( &cltr, &thrd ); 279 /* paranoid */ verify( removed ); 280 thrd.link.next = 0p; 281 thrd.link.prev = 0p; 282 283 // Fixup the thread state 284 thrd.state = Blocked; 285 thrd.ticket = TICKET_BLOCKED; 286 thrd.preempted = __NO_PREEMPTION; 287 288 ready_schedule_unlock(); 289 290 // Pretend like the thread was blocked all along 291 } 292 // !!! This is not an else if !!! 293 // Ok, now the thread is blocked (whether we cheated to get here or not) 294 if( thrd.state == Blocked ) { 295 // This is the "easy case" 296 // The thread is parked and can easily be moved to active cluster 297 verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster ); 298 thrd.curr_cluster = active_cluster(); 299 300 // unpark the fast io_poller 301 unpark( &thrd ); 302 } 303 else { 304 // The thread is in a weird state 305 // I don't know what to do here 306 abort("io_context poller thread is in unexpected state, cannot clean-up correctly\n"); 307 } 308 309 // The weird thread kidnapping stuff is over, restore interrupts. 
310 enable_interrupts( __cfaabi_dbg_ctx ); 311 } else { 312 post( this.thrd.sem ); 313 } 314 315 ^(this.thrd){}; 316 __cfadbg_print_safe(io_core, "Kernel I/O : Stopped poller thread for io_context %p\n", &this); 317 318 __io_destroy( *this.thrd.ring ); 319 __cfadbg_print_safe(io_core, "Kernel I/O : Destroyed ring for io_context %p\n", &this); 320 321 free(this.thrd.ring); 248 // this.ctx = new(cl); 249 this.ctx = alloc(); 250 (*this.ctx){ cl }; 251 252 __cfadbg_print_safe(io_core, "Kernel I/O : io_context %u ready\n", this.ctx->fd); 322 253 } 323 254 324 255 void ^?{}(io_context & this) { 325 ^(this){ false }; 256 post( this.ctx->sem ); 257 258 delete(this.ctx); 326 259 } 327 260 … … 329 262 extern void __enable_interrupts_hard(); 330 263 331 static void __io_ create( __io_data& this, const io_context_params & params_in ) {264 static void __io_uring_setup( $io_context & this, const io_context_params & params_in ) { 332 265 // Step 1 : call to setup 333 266 struct io_uring_params params; 334 267 memset(¶ms, 0, sizeof(params)); 335 if( params_in.poll_submit ) params.flags |= IORING_SETUP_SQPOLL;336 if( params_in.poll_complete ) params.flags |= IORING_SETUP_IOPOLL;268 // if( params_in.poll_submit ) params.flags |= IORING_SETUP_SQPOLL; 269 // if( params_in.poll_complete ) params.flags |= IORING_SETUP_IOPOLL; 337 270 338 271 __u32 nentries = params_in.num_entries != 0 ? params_in.num_entries : 256; … … 340 273 abort("ERROR: I/O setup 'num_entries' must be a power of 2\n"); 341 274 } 342 if( params_in.poller_submits && params_in.eager_submits ) {343 abort("ERROR: I/O setup 'poller_submits' and 'eager_submits' cannot be used together\n");344 }345 275 346 276 int fd = syscall(__NR_io_uring_setup, nentries, ¶ms ); … … 350 280 351 281 // Step 2 : mmap result 352 memset( &this, 0, sizeof(struct __io_data) ); 353 struct __submition_data & sq = this.submit_q; 354 struct __completion_data & cq = this.completion_q; 282 struct __sub_ring_t & sq = this.sq; 283 struct __cmp_ring_t & cq = this.cq; 355 284 356 285 // calculate the right ring size … … 401 330 // Get the pointers from the kernel to fill the structure 402 331 // submit queue 403 sq.head = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.head); 404 sq.tail = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail); 405 sq.mask = ( const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask); 406 sq.num = ( const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries); 407 sq.flags = ( __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags); 408 sq.dropped = ( __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped); 409 sq.array = ( __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.array); 410 sq.prev_head = *sq.head; 411 412 { 413 const __u32 num = *sq.num; 414 for( i; num ) { 415 __sqe_clean( &sq.sqes[i] ); 416 } 417 } 418 419 (sq.submit_lock){}; 420 (sq.release_lock){}; 421 422 if( params_in.poller_submits || params_in.eager_submits ) { 423 /* paranoid */ verify( is_pow2( params_in.num_ready ) || (params_in.num_ready < 8) ); 424 sq.ready_cnt = max( params_in.num_ready, 8 ); 425 sq.ready = alloc( sq.ready_cnt, 64`align ); 426 for(i; sq.ready_cnt) { 427 sq.ready[i] = -1ul32; 428 } 429 sq.prev_ready = 0; 430 } 431 else { 432 sq.ready_cnt = 0; 433 sq.ready = 0p; 434 sq.prev_ready = 0; 435 } 332 sq.kring.head = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.head); 333 sq.kring.tail = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail); 334 sq.kring.array = ( __u32 *)(((intptr_t)sq.ring_ptr) + 
params.sq_off.array); 335 sq.mask = ( const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask); 336 sq.num = ( const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries); 337 sq.flags = ( __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags); 338 sq.dropped = ( __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped); 339 340 sq.kring.ready = 0; 341 sq.kring.released = 0; 342 343 sq.free_ring.head = 0; 344 sq.free_ring.tail = *sq.num; 345 sq.free_ring.array = alloc( *sq.num, 128`align ); 346 for(i; (__u32)*sq.num) { 347 sq.free_ring.array[i] = i; 348 } 349 350 sq.to_submit = 0; 436 351 437 352 // completion queue … … 468 383 /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask ); 469 384 /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num ); 470 /* paranoid */ verifyf( (*sq. head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );471 /* paranoid */ verifyf( (*sq. tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );385 /* paranoid */ verifyf( (*sq.kring.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.kring.head ); 386 /* paranoid */ verifyf( (*sq.kring.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.kring.tail ); 472 387 473 388 // Update the global ring info 474 this.ring_flags = params.flags;389 this.ring_flags = 0; 475 390 this.fd = fd; 476 391 this.efd = efd; 477 this.eager_submits = params_in.eager_submits; 478 this.poller_submits = params_in.poller_submits; 479 } 480 481 static void __io_destroy( __io_data & this ) { 392 } 393 394 static void __io_uring_teardown( $io_context & this ) { 482 395 // Shutdown the io rings 483 struct __sub mition_data & sq = this.submit_q;484 struct __c ompletion_data & cq = this.completion_q;396 struct __sub_ring_t & sq = this.sq; 397 struct __cmp_ring_t & cq = this.cq; 485 398 486 399 // unmap the submit queue entries … … 499 412 close(this.efd); 500 413 501 free( this.s ubmit_q.ready ); // Maybe null, doesn't matter414 free( this.sq.free_ring.array ); // Maybe null, doesn't matter 502 415 } 503 416 … … 505 418 // I/O Context Sleep 506 419 //============================================================================================= 507 static inline void __ ioctx_epoll_ctl($io_ctx_thread& ctx, int op, const char * error) {420 static inline void __epoll_ctl($io_context & ctx, int op, const char * error) { 508 421 struct epoll_event ev; 509 422 ev.events = EPOLLIN | EPOLLONESHOT; 510 423 ev.data.u64 = (__u64)&ctx; 511 int ret = epoll_ctl(iopoll.epollfd, op, ctx. 
ring->efd, &ev);424 int ret = epoll_ctl(iopoll.epollfd, op, ctx.efd, &ev); 512 425 if (ret < 0) { 513 426 abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) ); … … 515 428 } 516 429 517 void __ioctx_register($io_ctx_thread & ctx) { 518 __ioctx_epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD"); 519 } 520 521 void __ioctx_prepare_block($io_ctx_thread & ctx) { 522 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.ring->fd, &ctx); 523 __ioctx_epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM"); 524 } 525 526 void __ioctx_unregister($io_ctx_thread & ctx) { 430 static void __epoll_register($io_context & ctx) { 431 __epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD"); 432 } 433 434 static void __epoll_unregister($io_context & ctx) { 527 435 // Read the current epoch so we know when to stop 528 436 size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST); 529 437 530 438 // Remove the fd from the iopoller 531 __ ioctx_epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE");439 __epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE"); 532 440 533 441 // Notify the io poller thread of the shutdown … … 543 451 } 544 452 453 void __ioctx_prepare_block($io_context & ctx) { 454 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.fd, &ctx); 455 __epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM"); 456 } 457 458 545 459 //============================================================================================= 546 460 // I/O Context Misc Setup 547 461 //============================================================================================= 548 void register_fixed_files( io_context & ctx, int * files, unsigned count ) { 549 int ret = syscall( __NR_io_uring_register, ctx.thrd.ring->fd, IORING_REGISTER_FILES, files, count ); 550 if( ret < 0 ) { 551 abort( "KERNEL ERROR: IO_URING REGISTER - (%d) %s\n", (int)errno, strerror(errno) ); 552 } 553 554 __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret ); 555 } 556 557 void register_fixed_files( cluster & cltr, int * files, unsigned count ) { 558 for(i; cltr.io.cnt) { 559 register_fixed_files( cltr.io.ctxs[i], files, count ); 560 } 561 } 462 void ?{}( $io_arbiter & this ) { 463 this.pending.flag = false; 464 } 465 466 void ^?{}( $io_arbiter & mutex this ) { 467 /* paranoid */ verify( empty(this.assigned) ); 468 /* paranoid */ verify( empty(this.available) ); 469 /* paranoid */ verify( is_empty(this.pending.blocked) ); 470 } 471 472 $io_arbiter * create(void) { 473 return new(); 474 } 475 void destroy($io_arbiter * arbiter) { 476 delete(arbiter); 477 } 478 479 //============================================================================================= 480 // I/O Context Misc Setup 481 //============================================================================================= 482 562 483 #endif -
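The new __io_uring_setup above follows the standard raw io_uring bring-up: one io_uring_setup(2) system call, then mmaps of the submission ring, completion ring, and SQE array, and pointer arithmetic over the sq_off/cq_off offsets the kernel returns. Below is a minimal standalone C sketch of that same sequence; the raw_ring struct and raw_ring_setup name are hypothetical, error handling is abbreviated, and the single-mmap feature (IORING_FEAT_SINGLE_MMAP) and the eventfd registration done by the changeset are left out.

// Minimal sketch of the setup sequence: io_uring_setup(2), mmap the SQ ring,
// CQ ring and SQE array, then derive ring pointers from the returned offsets.
#include <linux/io_uring.h>
#include <sys/syscall.h>
#include <sys/mman.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>

struct raw_ring {
    int fd;
    unsigned *sq_head, *sq_tail, *sq_mask, *sq_array;
    struct io_uring_sqe *sqes;
    unsigned *cq_head, *cq_tail, *cq_mask;
    struct io_uring_cqe *cqes;
};

static int raw_ring_setup(struct raw_ring *r, unsigned entries) {
    struct io_uring_params p;
    memset(&p, 0, sizeof(p));

    r->fd = (int)syscall(__NR_io_uring_setup, entries, &p);   // step 1: create the ring
    if (r->fd < 0) { perror("io_uring_setup"); return -1; }

    size_t sq_sz = p.sq_off.array + p.sq_entries * sizeof(unsigned);
    size_t cq_sz = p.cq_off.cqes  + p.cq_entries * sizeof(struct io_uring_cqe);

    // step 2: map the submission-ring metadata
    void *sq_ptr = mmap(0, sq_sz, PROT_READ | PROT_WRITE,
                        MAP_SHARED | MAP_POPULATE, r->fd, IORING_OFF_SQ_RING);
    // step 3: map the completion ring
    void *cq_ptr = mmap(0, cq_sz, PROT_READ | PROT_WRITE,
                        MAP_SHARED | MAP_POPULATE, r->fd, IORING_OFF_CQ_RING);
    // step 4: map the SQE array itself
    r->sqes = mmap(0, p.sq_entries * sizeof(struct io_uring_sqe),
                   PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
                   r->fd, IORING_OFF_SQES);
    if (sq_ptr == MAP_FAILED || cq_ptr == MAP_FAILED || r->sqes == MAP_FAILED) return -1;

    // step 5: turn the kernel-supplied offsets into pointers, as __io_uring_setup does
    r->sq_head  = (unsigned *)((char *)sq_ptr + p.sq_off.head);
    r->sq_tail  = (unsigned *)((char *)sq_ptr + p.sq_off.tail);
    r->sq_mask  = (unsigned *)((char *)sq_ptr + p.sq_off.ring_mask);
    r->sq_array = (unsigned *)((char *)sq_ptr + p.sq_off.array);
    r->cq_head  = (unsigned *)((char *)cq_ptr + p.cq_off.head);
    r->cq_tail  = (unsigned *)((char *)cq_ptr + p.cq_off.tail);
    r->cq_mask  = (unsigned *)((char *)cq_ptr + p.cq_off.ring_mask);
    r->cqes     = (struct io_uring_cqe *)((char *)cq_ptr + p.cq_off.cqes);
    return 0;
}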
libcfa/src/concurrency/io/types.hfa
rb44959f r78da4ab 25 25 26 26 #if defined(CFA_HAVE_LINUX_IO_URING_H) 27 #define LEADER_LOCK 28 struct __leaderlock_t { 29 struct $thread * volatile value; // ($thread) next_leader | (bool:1) is_locked 30 }; 27 #include "bits/sequence.hfa" 28 #include "monitor.hfa" 31 29 32 static inline void ?{}( __leaderlock_t & this ) { this.value = 0p; } 30 struct processor; 31 monitor $io_arbiter; 33 32 34 33 //----------------------------------------------------------------------- 35 34 // Ring Data structure 36 struct __submition_data { 37 // Head and tail of the ring (associated with array) 38 volatile __u32 * head; 39 volatile __u32 * tail; 40 volatile __u32 prev_head; 35 struct __sub_ring_t { 36 struct { 37 // Head and tail of the ring (associated with array) 38 volatile __u32 * head; // one passed last index consumed by the kernel 39 volatile __u32 * tail; // one passed last index visible to the kernel 40 volatile __u32 ready; // one passed last index added to array () 41 volatile __u32 released; // one passed last index released back to the free list 41 42 42 // The actual kernel ring which uses head/tail 43 // indexes into the sqes arrays 44 __u32 * array; 43 // The actual kernel ring which uses head/tail 44 // indexes into the sqes arrays 45 __u32 * array; 46 } kring; 47 48 struct { 49 volatile __u32 head; 50 volatile __u32 tail; 51 // The ring which contains free allocations 52 // indexes into the sqes arrays 53 __u32 * array; 54 } free_ring; 55 56 // number of sqes to submit on next system call. 57 __u32 to_submit; 45 58 46 59 // number of entries and mask to go with it … … 48 61 const __u32 * mask; 49 62 50 // Submission flags (Not sure what for)63 // Submission flags, currently only IORING_SETUP_SQPOLL 51 64 __u32 * flags; 52 65 53 // number of sqes not submitted (whatever that means) 66 // number of sqes not submitted 67 // From documentation : [dropped] is incremented for each invalid submission queue entry encountered in the ring buffer. 
54 68 __u32 * dropped; 55 69 56 // Like head/tail but not seen by the kernel57 volatile __u32 * ready;58 __u32 ready_cnt;59 __u32 prev_ready;60 61 #if defined(LEADER_LOCK)62 __leaderlock_t submit_lock;63 #else64 __spinlock_t submit_lock;65 #endif66 __spinlock_t release_lock;67 68 70 // A buffer of sqes (not the actual ring) 69 volatilestruct io_uring_sqe * sqes;71 struct io_uring_sqe * sqes; 70 72 71 73 // The location and size of the mmaped area … … 74 76 }; 75 77 76 struct __c ompletion_data{78 struct __cmp_ring_t { 77 79 // Head and tail of the ring 78 80 volatile __u32 * head; … … 83 85 const __u32 * num; 84 86 85 // number of cqes not submitted (whatever that means)87 // I don't know what this value is for 86 88 __u32 * overflow; 87 89 … … 94 96 }; 95 97 96 struct __io_data { 97 struct __submition_data submit_q; 98 struct __completion_data completion_q; 98 struct __attribute__((aligned(128))) $io_context { 99 inline Seqable; 100 101 volatile bool revoked; 102 processor * proc; 103 104 $io_arbiter * arbiter; 105 106 struct { 107 volatile bool empty; 108 condition blocked; 109 } ext_sq; 110 111 struct __sub_ring_t sq; 112 struct __cmp_ring_t cq; 99 113 __u32 ring_flags; 100 114 int fd; 101 115 int efd; 102 bool eager_submits:1; 103 bool poller_submits:1; 116 117 single_sem sem; 118 $thread self; 119 }; 120 121 void main( $io_context & this ); 122 static inline $thread * get_thread ( $io_context & this ) __attribute__((const)) { return &this.self; } 123 static inline $monitor * get_monitor( $io_context & this ) __attribute__((const)) { return &this.self.self_mon; } 124 static inline $io_context *& Back( $io_context * n ) { return ($io_context *)Back( (Seqable *)n ); } 125 static inline $io_context *& Next( $io_context * n ) { return ($io_context *)Next( (Colable *)n ); } 126 void ^?{}( $io_context & mutex this ); 127 128 monitor __attribute__((aligned(128))) $io_arbiter { 129 struct { 130 condition blocked; 131 $io_context * ctx; 132 volatile bool flag; 133 } pending; 134 135 Sequence($io_context) assigned; 136 137 Sequence($io_context) available; 104 138 }; 105 139 … … 133 167 #endif 134 168 135 struct $io_ctx_thread; 136 void __ioctx_register($io_ctx_thread & ctx); 137 void __ioctx_unregister($io_ctx_thread & ctx); 138 void __ioctx_prepare_block($io_ctx_thread & ctx); 139 void __sqe_clean( volatile struct io_uring_sqe * sqe ); 169 void __ioctx_prepare_block($io_context & ctx); 140 170 #endif 141 171
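The most visible change in types.hfa is that __sub_ring_t drops the old ready/prev_ready bookkeeping and the submit/release locks in favour of a free_ring: a ring of free SQE indices, initialised to 0 .. *num - 1, from which allocation pops at head and release pushes back at tail (the ring therefore starts full, which is why the setup code above sets free_ring.tail to *sq.num). The sketch below illustrates that index ring in isolation; names are hypothetical and the real runtime would use atomic operations, since allocation and release can happen on different processors.

// Illustrative free-index ring: hands out SQE slots and takes them back.
#include <stdint.h>
#include <stdlib.h>

struct free_ring {
    uint32_t head;        // next free index to hand out
    uint32_t tail;        // one past the last index returned to the ring
    uint32_t mask;        // num - 1, with num a power of two
    uint32_t *array;      // the free indices themselves
};

static void free_ring_init(struct free_ring *fr, uint32_t num) {
    fr->head  = 0;
    fr->tail  = num;                        // starts full: every SQE index is free
    fr->mask  = num - 1;
    fr->array = malloc(num * sizeof(uint32_t));
    for (uint32_t i = 0; i < num; i++) fr->array[i] = i;
}

// pop a free SQE index, or UINT32_MAX if all entries are currently in flight
static uint32_t free_ring_alloc(struct free_ring *fr) {
    if (fr->head == fr->tail) return UINT32_MAX;
    return fr->array[(fr->head++) & fr->mask];
}

// push an SQE index back once its completion has been consumed
static void free_ring_release(struct free_ring *fr, uint32_t idx) {
    fr->array[(fr->tail++) & fr->mask] = idx;
}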