Changeset 8e4aa05 for libcfa/src/concurrency/io
- Timestamp: Mar 4, 2021, 7:40:25 PM
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: 77d601f
- Parents: 342af53 (diff), a5040fe (diff)
- Location: libcfa/src/concurrency/io
- Files: 3 edited

Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Use the (diff) links above to see all the changes relative to each parent.
Legend: unchanged lines are shown with a leading space, added lines with '+', removed lines with '-'.
libcfa/src/concurrency/io/call.cfa.in
--- libcfa/src/concurrency/io/call.cfa.in (r342af53)
+++ libcfa/src/concurrency/io/call.cfa.in (r8e4aa05)
@@ -54 +54 @@
 		| IOSQE_IO_DRAIN
 	#endif
+	#if defined(CFA_HAVE_IOSQE_IO_LINK)
+		| IOSQE_IO_LINK
+	#endif
+	#if defined(CFA_HAVE_IOSQE_IO_HARDLINK)
+		| IOSQE_IO_HARDLINK
+	#endif
 	#if defined(CFA_HAVE_IOSQE_ASYNC)
 		| IOSQE_ASYNC
 	#endif
-;
-
-static const __u32 LINK_FLAGS = 0
-	#if defined(CFA_HAVE_IOSQE_IO_LINK)
-		| IOSQE_IO_LINK
-	#endif
-	#if defined(CFA_HAVE_IOSQE_IO_HARDLINK)
-		| IOSQE_IO_HARDLINK
+	#if defined(CFA_HAVE_IOSQE_BUFFER_SELECTED)
+		| IOSQE_BUFFER_SELECTED
 	#endif
 ;
@@ -74 +74 @@
 ;
 
-extern [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data );
-extern void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1)));
-
-static inline io_context * __get_io_context( void ) {
-	cluster * cltr = active_cluster();
-
-	/* paranoid */ verifyf( cltr, "No active cluster for io operation\\n");
-	assertf( cltr->io.cnt > 0, "Cluster %p has no default io contexts and no context was specified\\n", cltr );
-
-	/* paranoid */ verifyf( cltr->io.ctxs, "default io contexts for cluster %p are missing\\n", cltr);
-	return &cltr->io.ctxs[ thread_rand() % cltr->io.cnt ];
-}
+extern struct $io_context * cfa_io_allocate(struct io_uring_sqe * out_sqes[], __u32 out_idxs[], __u32 want) __attribute__((nonnull (1,2)));
+extern void cfa_io_submit( struct $io_context * in_ctx, __u32 in_idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1,2)));
 #endif
@@ -98 +88 @@
 
 extern "C" {
-	#include <sys/types.h>
+	#include <asm/types.h>
 	#include <sys/socket.h>
 	#include <sys/syscall.h>
@@ -142 +132 @@
 	extern int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
 
-	extern ssize_t splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags);
+	extern ssize_t splice(int fd_in, __off64_t *off_in, int fd_out, __off64_t *off_out, size_t len, unsigned int flags);
 	extern ssize_t tee(int fd_in, int fd_out, size_t len, unsigned int flags);
 }
@@ -195 +185 @@
 	return ', '.join(args_a)
 
-AsyncTemplate = """inline void async_{name}(io_future_t & future, {params}, int submit_flags, io_cancellation * cancellation, io_context * context) {{
+AsyncTemplate = """inline void async_{name}(io_future_t & future, {params}, __u64 submit_flags) {{
 	#if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_{op})
 		ssize_t res = {name}({args});
@@ -205 +195 @@
 	}}
 	#else
-		// we don't support LINK yet
-		if( 0 != (submit_flags & LINK_FLAGS) ) {{
-			errno = ENOTSUP; return -1;
-		}}
-
-		if( !context ) {{
-			context = __get_io_context();
-		}}
-		if(cancellation) {{
-			cancellation->target = (__u64)(uintptr_t)&future;
-		}}
-
 		__u8 sflags = REGULAR_FLAGS & submit_flags;
-		struct __io_data & ring = *context->thrd.ring;
-
 		__u32 idx;
 		struct io_uring_sqe * sqe;
-		[(volatile struct io_uring_sqe *) sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
+		struct $io_context * ctx = cfa_io_allocate( &sqe, &idx, 1 );
 
 		sqe->opcode = IORING_OP_{op};
+		sqe->user_data = (__u64)(uintptr_t)&future;
 		sqe->flags = sflags;
 		sqe->ioprio = 0;
@@ -239 +216 @@
 
 		verify( sqe->user_data == (__u64)(uintptr_t)&future );
-		__submit( context, idx );
+		cfa_io_submit( ctx, &idx, 1, 0 != (submit_flags & CFA_IO_LAZY) );
 	#endif
 }}"""
 
-SyncTemplate = """{ret} cfa_{name}({params}, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {{
-	if( timeout >= 0 ) {{
-		errno = ENOTSUP;
-		return -1;
-	}}
+SyncTemplate = """{ret} cfa_{name}({params}, __u64 submit_flags) {{
 	io_future_t future;
 
-	async_{name}( future, {args}, submit_flags, cancellation, context );
+	async_{name}( future, {args}, submit_flags );
 
 	wait( future );
@@ -393 +366 @@
 	}),
 	# CFA_HAVE_IORING_OP_SPLICE
-	Call('SPLICE', 'ssize_t splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags)', {
+	Call('SPLICE', 'ssize_t splice(int fd_in, __off64_t *off_in, int fd_out, __off64_t *off_out, size_t len, unsigned int flags)', {
 		'splice_fd_in': 'fd_in',
 		'splice_off_in': 'off_in ? (__u64)*off_in : (__u64)-1',
@@ -415 +388 @@
 	if c.define:
 		print("""#if defined({define})
-{ret} cfa_{name}({params}, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
+{ret} cfa_{name}({params}, __u64 submit_flags);
 #endif""".format(define=c.define,ret=c.ret, name=c.name, params=c.params))
 	else:
-		print("{ret} cfa_{name}({params}, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);"
+		print("{ret} cfa_{name}({params}, __u64 submit_flags);"
			.format(ret=c.ret, name=c.name, params=c.params))
@@ -426 +399 @@
 	if c.define:
 		print("""#if defined({define})
-void async_{name}(io_future_t & future, {params}, int submit_flags, io_cancellation * cancellation, io_context * context);
+void async_{name}(io_future_t & future, {params}, __u64 submit_flags);
 #endif""".format(define=c.define,name=c.name, params=c.params))
 	else:
-		print("void async_{name}(io_future_t & future, {params}, int submit_flags, io_cancellation * cancellation, io_context * context);"
+		print("void async_{name}(io_future_t & future, {params}, __u64 submit_flags);"
			.format(name=c.name, params=c.params))
 print("\n")
@@ -474 +447 @@
 
 print("""
-//-----------------------------------------------------------------------------
-bool cancel(io_cancellation & this) {
-	#if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ASYNC_CANCEL)
-		return false;
-	#else
-		io_future_t future;
-
-		io_context * context = __get_io_context();
-
-		__u8 sflags = 0;
-		struct __io_data & ring = *context->thrd.ring;
-
-		__u32 idx;
-		volatile struct io_uring_sqe * sqe;
-		[sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
-
-		sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
-		sqe->opcode = IORING_OP_ASYNC_CANCEL;
-		sqe->flags = sflags;
-		sqe->addr = this.target;
-
-		verify( sqe->user_data == (__u64)(uintptr_t)&future );
-		__submit( context, idx );
-
-		wait(future);
-
-		if( future.result == 0 ) return true; // Entry found
-		if( future.result == -EALREADY) return true; // Entry found but in progress
-		if( future.result == -ENOENT ) return false; // Entry not found
-		return false;
-	#endif
-}
-
 //-----------------------------------------------------------------------------
 // Check if a function is has asynchronous
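The visible effect on the generated API: every `cfa_*`/`async_*` wrapper loses the `Duration timeout`, `io_cancellation`, and `io_context` parameters and keeps a single `__u64 submit_flags`, with completion delivered through an `io_future_t`. A caller-side sketch of the new shape, assuming a `read` wrapper is among the generated calls and that the `iofwd.hfa` header declares it (hypothetical user code, not part of this changeset):

	#include <iofwd.hfa>	// assumed: forward declarations of the generated cfa_*/async_* wrappers

	void example( int fd, char buf[], size_t len ) {
		// Synchronous form: allocates an sqe, submits it, parks on the future.
		ssize_t r = cfa_read( fd, buf, len, 0 );

		// Overlapped form: start the operation, do other work, then wait.
		io_future_t f;
		async_read( f, fd, buf, len, CFA_IO_LAZY );	// lazy submission lets the runtime batch the syscall
		// ... unrelated work runs while the kernel services the read ...
		wait( f );
		if( f.result < 0 ) {
			// io_uring reports errors as a negated errno in the completion
		}
	}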
libcfa/src/concurrency/io/setup.cfa
--- libcfa/src/concurrency/io/setup.cfa (r342af53)
+++ libcfa/src/concurrency/io/setup.cfa (r8e4aa05)
@@ -26 +26 @@
 
 #if !defined(CFA_HAVE_LINUX_IO_URING_H)
-	void __kernel_io_startup() {
-		// Nothing to do without io_uring
-	}
-
-	void __kernel_io_shutdown() {
-		// Nothing to do without io_uring
-	}
-
 	void ?{}(io_context_params & this) {}
 
-	void ?{}(io_context & this, struct cluster & cl) {}
-	void ?{}(io_context & this, struct cluster & cl, const io_context_params & params) {}
-
-	void ^?{}(io_context & this) {}
-	void ^?{}(io_context & this, bool cluster_context) {}
+	void ?{}($io_context & this, struct cluster & cl) {}
+	void ^?{}($io_context & this) {}
+
+	void __cfa_io_start( processor * proc ) {}
+	void __cfa_io_flush( processor * proc ) {}
+	void __cfa_io_stop ( processor * proc ) {}
+
+	$io_arbiter * create(void) { return 0p; }
+	void destroy($io_arbiter *) {}
 
 #else
@@ -65 +61 @@
 	void ?{}(io_context_params & this) {
 		this.num_entries = 256;
-		this.num_ready = 256;
-		this.submit_aff = -1;
-		this.eager_submits = false;
-		this.poller_submits = false;
-		this.poll_submit = false;
-		this.poll_complete = false;
 	}
 
@@ -103 +93 @@
 
 //=============================================================================================
-// I/O Startup / Shutdown logic + Master Poller
-//=============================================================================================
-
-// IO Master poller loop forward
-static void * iopoll_loop( __attribute__((unused)) void * args );
-
-static struct {
-	pthread_t thrd;    // pthread handle to io poller thread
-	void * stack;      // pthread stack for io poller thread
-	int epollfd;       // file descriptor to the epoll instance
-	volatile bool run; // Whether or not to continue
-} iopoll;
-
-void __kernel_io_startup(void) {
-	__cfadbg_print_safe(io_core, "Kernel : Creating EPOLL instance\n" );
-
-	iopoll.epollfd = epoll_create1(0);
-	if (iopoll.epollfd == -1) {
-		abort( "internal error, epoll_create1\n");
-	}
-
-	__cfadbg_print_safe(io_core, "Kernel : Starting io poller thread\n" );
-
-	iopoll.run = true;
-	iopoll.stack = __create_pthread( &iopoll.thrd, iopoll_loop, 0p );
-}
-
-void __kernel_io_shutdown(void) {
-	// Notify the io poller thread of the shutdown
-	iopoll.run = false;
-	sigval val = { 1 };
-	pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
-
-	// Wait for the io poller thread to finish
-
-	__destroy_pthread( iopoll.thrd, iopoll.stack, 0p );
-
-	int ret = close(iopoll.epollfd);
-	if (ret == -1) {
-		abort( "internal error, close epoll\n");
-	}
-
-	// Io polling is now fully stopped
-
-	__cfadbg_print_safe(io_core, "Kernel : IO poller stopped\n" );
-}
-
-static void * iopoll_loop( __attribute__((unused)) void * args ) {
-	__processor_id_t id;
-	id.full_proc = false;
-	id.id = doregister(&id);
-	__cfaabi_tls.this_proc_id = &id;
-	__cfadbg_print_safe(io_core, "Kernel : IO poller thread starting\n" );
-
-	// Block signals to control when they arrive
-	sigset_t mask;
-	sigfillset(&mask);
-	if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
-		abort( "internal error, pthread_sigmask" );
-	}
-
-	sigdelset( &mask, SIGUSR1 );
-
-	// Create sufficient events
-	struct epoll_event events[10];
-	// Main loop
-	while( iopoll.run ) {
-		__cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n");
-
-		// Wait for events
-		int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask );
-
-		__cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io contexts events, waking up\n", nfds);
-
-		// Check if an error occured
-		if (nfds == -1) {
-			if( errno == EINTR ) continue;
-			abort( "internal error, pthread_sigmask" );
-		}
-
-		for(i; nfds) {
-			$io_ctx_thread * io_ctx = ($io_ctx_thread *)(uintptr_t)events[i].data.u64;
-			/* paranoid */ verify( io_ctx );
-			__cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->ring->fd, io_ctx);
-			#if !defined( __CFA_NO_STATISTICS__ )
-				__cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats;
-			#endif
-
-			eventfd_t v;
-			eventfd_read(io_ctx->ring->efd, &v);
-
-			post( io_ctx->sem );
-		}
-	}
-
-	__cfadbg_print_safe(io_core, "Kernel : IO poller thread stopping\n" );
-	unregister(&id);
-	return 0p;
-}
-
-//=============================================================================================
 // I/O Context Constrution/Destruction
 //=============================================================================================
 
-void ?{}($io_ctx_thread & this, struct cluster & cl) { (this.self){ "IO Poller", cl }; }
-void main( $io_ctx_thread & this );
-static inline $thread * get_thread( $io_ctx_thread & this ) { return &this.self; }
-void ^?{}( $io_ctx_thread & mutex this ) {}
-
-static void __io_create ( __io_data & this, const io_context_params & params_in );
-static void __io_destroy( __io_data & this );
-
-void ?{}(io_context & this, struct cluster & cl, const io_context_params & params) {
-	(this.thrd){ cl };
-	this.thrd.ring = malloc();
-	__cfadbg_print_safe(io_core, "Kernel I/O : Creating ring for io_context %p\n", &this);
-	__io_create( *this.thrd.ring, params );
-
-	__cfadbg_print_safe(io_core, "Kernel I/O : Starting poller thread for io_context %p\n", &this);
-	this.thrd.done = false;
-	__thrd_start( this.thrd, main );
-
-	__cfadbg_print_safe(io_core, "Kernel I/O : io_context %p ready\n", &this);
-}
-
-void ?{}(io_context & this, struct cluster & cl) {
-	io_context_params params;
-	(this){ cl, params };
-}
-
-void ^?{}(io_context & this, bool cluster_context) {
-	__cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %p\n", &this);
-
-	// Notify the thread of the shutdown
-	__atomic_store_n(&this.thrd.done, true, __ATOMIC_SEQ_CST);
-
-	// If this is an io_context within a cluster, things get trickier
-	$thread & thrd = this.thrd.self;
-	if( cluster_context ) {
-		// We are about to do weird things with the threads
-		// we don't need interrupts to complicate everything
-		disable_interrupts();
-
-		// Get cluster info
-		cluster & cltr = *thrd.curr_cluster;
-		/* paranoid */ verify( cltr.idles.total == 0 || &cltr == mainCluster );
-		/* paranoid */ verify( !ready_mutate_islocked() );
-
-		// We need to adjust the clean-up based on where the thread is
-		if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
-			// This is the tricky case
-			// The thread was preempted or ready to run and now it is on the ready queue
-			// but the cluster is shutting down, so there aren't any processors to run the ready queue
-			// the solution is to steal the thread from the ready-queue and pretend it was blocked all along
-
-			ready_schedule_lock();
-			// The thread should on the list
-			/* paranoid */ verify( thrd.link.next != 0p );
-
-			// Remove the thread from the ready queue of this cluster
-			// The thread should be the last on the list
-			__attribute__((unused)) bool removed = remove_head( &cltr, &thrd );
-			/* paranoid */ verify( removed );
-			thrd.link.next = 0p;
-			thrd.link.prev = 0p;
-
-			// Fixup the thread state
-			thrd.state = Blocked;
-			thrd.ticket = TICKET_BLOCKED;
-			thrd.preempted = __NO_PREEMPTION;
-
-			ready_schedule_unlock();
-
-			// Pretend like the thread was blocked all along
-		}
-		// !!! This is not an else if !!!
-		// Ok, now the thread is blocked (whether we cheated to get here or not)
-		if( thrd.state == Blocked ) {
-			// This is the "easy case"
-			// The thread is parked and can easily be moved to active cluster
-			verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster );
-			thrd.curr_cluster = active_cluster();
-
-			// unpark the fast io_poller
-			unpark( &thrd );
-		}
-		else {
-			// The thread is in a weird state
-			// I don't know what to do here
-			abort("io_context poller thread is in unexpected state, cannot clean-up correctly\n");
-		}
-
-		// The weird thread kidnapping stuff is over, restore interrupts.
-		enable_interrupts( __cfaabi_dbg_ctx );
-	} else {
-		post( this.thrd.sem );
-	}
-
-	^(this.thrd){};
-	__cfadbg_print_safe(io_core, "Kernel I/O : Stopped poller thread for io_context %p\n", &this);
-
-	__io_destroy( *this.thrd.ring );
-	__cfadbg_print_safe(io_core, "Kernel I/O : Destroyed ring for io_context %p\n", &this);
-
-	free(this.thrd.ring);
-}
-
-void ^?{}(io_context & this) {
-	^(this){ false };
-}
-
-static void __io_create( __io_data & this, const io_context_params & params_in ) {
+
+
+static void __io_uring_setup ( $io_context & this, const io_context_params & params_in, int procfd );
+static void __io_uring_teardown( $io_context & this );
+static void __epoll_register($io_context & ctx);
+static void __epoll_unregister($io_context & ctx);
+void __ioarbiter_register( $io_arbiter & mutex, $io_context & ctx );
+void __ioarbiter_unregister( $io_arbiter & mutex, $io_context & ctx );
+
+void ?{}($io_context & this, processor * proc, struct cluster & cl) {
+	/* paranoid */ verify( cl.io.arbiter );
+	this.proc = proc;
+	this.arbiter = cl.io.arbiter;
+	this.ext_sq.empty = true;
+	(this.ext_sq.blocked){};
+	__io_uring_setup( this, cl.io.params, proc->idle );
+	__cfadbg_print_safe(io_core, "Kernel I/O : Created ring for io_context %u (%p)\n", this.fd, &this);
+}
+
+void ^?{}($io_context & this) {
+	__cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %u\n", this.fd);
+
+	__io_uring_teardown( this );
+	__cfadbg_print_safe(io_core, "Kernel I/O : Destroyed ring for io_context %u\n", this.fd);
+}
+
+extern void __disable_interrupts_hard();
+extern void __enable_interrupts_hard();
+
+static void __io_uring_setup( $io_context & this, const io_context_params & params_in, int procfd ) {
 	// Step 1 : call to setup
 	struct io_uring_params params;
 	memset(&params, 0, sizeof(params));
-	if( params_in.poll_submit   ) params.flags |= IORING_SETUP_SQPOLL;
-	if( params_in.poll_complete ) params.flags |= IORING_SETUP_IOPOLL;
+	// if( params_in.poll_submit   ) params.flags |= IORING_SETUP_SQPOLL;
+	// if( params_in.poll_complete ) params.flags |= IORING_SETUP_IOPOLL;
 
 	__u32 nentries = params_in.num_entries != 0 ? params_in.num_entries : 256;
@@ -325 +136 @@
 		abort("ERROR: I/O setup 'num_entries' must be a power of 2\n");
 	}
-	if( params_in.poller_submits && params_in.eager_submits ) {
-		abort("ERROR: I/O setup 'poller_submits' and 'eager_submits' cannot be used together\n");
-	}
 
 	int fd = syscall(__NR_io_uring_setup, nentries, &params );
@@ -335 +143 @@
 
 	// Step 2 : mmap result
-	memset( &this, 0, sizeof(struct __io_data) );
-	struct __submition_data  & sq = this.submit_q;
-	struct __completion_data & cq = this.completion_q;
+	struct __sub_ring_t & sq = this.sq;
+	struct __cmp_ring_t & cq = this.cq;
 
 	// calculate the right ring size
@@ -386 +193 @@
 	// Get the pointers from the kernel to fill the structure
 	// submit queue
-	sq.head    = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
-	sq.tail    = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
-	sq.mask    = (   const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
-	sq.num     = (   const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
-	sq.flags   = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
-	sq.dropped = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
-	sq.array   = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
-	sq.prev_head = *sq.head;
-
-	{
-		const __u32 num = *sq.num;
-		for( i; num ) {
-			__sqe_clean( &sq.sqes[i] );
-		}
-	}
-
-	(sq.submit_lock){};
-	(sq.release_lock){};
-
-	if( params_in.poller_submits || params_in.eager_submits ) {
-		/* paranoid */ verify( is_pow2( params_in.num_ready ) || (params_in.num_ready < 8) );
-		sq.ready_cnt = max( params_in.num_ready, 8 );
-		sq.ready = alloc( sq.ready_cnt, 64`align );
-		for(i; sq.ready_cnt) {
-			sq.ready[i] = -1ul32;
-		}
-		sq.prev_ready = 0;
-	}
-	else {
-		sq.ready_cnt = 0;
-		sq.ready = 0p;
-		sq.prev_ready = 0;
-	}
+	sq.kring.head  = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
+	sq.kring.tail  = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
+	sq.kring.array = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
+	sq.mask    = (   const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
+	sq.num     = (   const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
+	sq.flags   = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
+	sq.dropped = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
+
+	sq.kring.released = 0;
+
+	sq.free_ring.head = 0;
+	sq.free_ring.tail = *sq.num;
+	sq.free_ring.array = alloc( *sq.num, 128`align );
+	for(i; (__u32)*sq.num) {
+		sq.free_ring.array[i] = i;
+	}
+
+	sq.to_submit = 0;
 
 	// completion queue
@@ -429 +221 @@
 
 	// Step 4 : eventfd
-	int efd;
-	for() {
-		efd = eventfd(0, 0);
-		if (efd < 0) {
-			if (errno == EINTR) continue;
-			abort("KERNEL ERROR: IO_URING EVENTFD - %s\n", strerror(errno));
-		}
-		break;
-	}
-
-	int ret;
-	for() {
-		ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &efd, 1);
-		if (ret < 0) {
-			if (errno == EINTR) continue;
-			abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno));
-		}
-		break;
-	}
+	// io_uring_register is so f*cking slow on some machine that it
+	// will never succeed if preemption isn't hard blocked
+	__cfadbg_print_safe(io_core, "Kernel I/O : registering %d for completion with ring %d\n", procfd, fd);
+
+	__disable_interrupts_hard();
+
+	int ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &procfd, 1);
+	if (ret < 0) {
+		abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno));
+	}
+
+	__enable_interrupts_hard();
+
+	__cfadbg_print_safe(io_core, "Kernel I/O : registered %d for completion with ring %d\n", procfd, fd);
 
 	// some paranoid checks
@@ -457 +244 @@
 	/* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
 	/* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
-	/* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );
-	/* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );
+	/* paranoid */ verifyf( (*sq.kring.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.kring.head );
+	/* paranoid */ verifyf( (*sq.kring.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.kring.tail );
 
 	// Update the global ring info
-	this.ring_flags = params.flags;
+	this.ring_flags = 0;
 	this.fd         = fd;
-	this.efd        = efd;
-	this.eager_submits  = params_in.eager_submits;
-	this.poller_submits = params_in.poller_submits;
-}
-
-static void __io_destroy( __io_data & this ) {
+}
+
+static void __io_uring_teardown( $io_context & this ) {
 	// Shutdown the io rings
-	struct __submition_data  & sq = this.submit_q;
-	struct __completion_data & cq = this.completion_q;
+	struct __sub_ring_t & sq = this.sq;
+	struct __cmp_ring_t & cq = this.cq;
 
 	// unmap the submit queue entries
@@ -486 +270 @@
 	// close the file descriptor
 	close(this.fd);
-	close(this.efd);
-
-	free( this.submit_q.ready ); // Maybe null, doesn't matter
+
+	free( this.sq.free_ring.array ); // Maybe null, doesn't matter
+}
+
+void __cfa_io_start( processor * proc ) {
+	proc->io.ctx = alloc();
+	(*proc->io.ctx){proc, *proc->cltr};
+}
+void __cfa_io_stop ( processor * proc ) {
+	^(*proc->io.ctx){};
+	free(proc->io.ctx);
 }
 
@@ -494 +286 @@
 // I/O Context Sleep
 //=============================================================================================
-#define IOEVENTS EPOLLIN | EPOLLONESHOT
-
-static inline void __ioctx_epoll_ctl($io_ctx_thread & ctx, int op, const char * error) {
-	struct epoll_event ev;
-	ev.events = IOEVENTS;
-	ev.data.u64 = (__u64)&ctx;
-	int ret = epoll_ctl(iopoll.epollfd, op, ctx.ring->efd, &ev);
-	if (ret < 0) {
-		abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) );
-	}
-}
-
-void __ioctx_register($io_ctx_thread & ctx) {
-	__ioctx_epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");
-}
-
-void __ioctx_prepare_block($io_ctx_thread & ctx) {
-	__cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.ring->fd, &ctx);
-	__ioctx_epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");
-}
+// static inline void __epoll_ctl($io_context & ctx, int op, const char * error) {
+// 	struct epoll_event ev;
+// 	ev.events = EPOLLIN | EPOLLONESHOT;
+// 	ev.data.u64 = (__u64)&ctx;
+// 	int ret = epoll_ctl(iopoll.epollfd, op, ctx.efd, &ev);
+// 	if (ret < 0) {
+// 		abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) );
+// 	}
+// }
+
+// static void __epoll_register($io_context & ctx) {
+// 	__epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");
+// }
+
+// static void __epoll_unregister($io_context & ctx) {
+// 	// Read the current epoch so we know when to stop
+// 	size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST);
+
+// 	// Remove the fd from the iopoller
+// 	__epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE");
+
+// 	// Notify the io poller thread of the shutdown
+// 	iopoll.run = false;
+// 	sigval val = { 1 };
+// 	pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
+
+// 	// Make sure all this is done
+// 	__atomic_thread_fence(__ATOMIC_SEQ_CST);
+
+// 	// Wait for the next epoch
+// 	while(curr == iopoll.epoch && !iopoll.stopped) Pause();
+// }
+
+// void __ioctx_prepare_block($io_context & ctx) {
+// 	__cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.fd, &ctx);
+// 	__epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");
+// }
+
 
 //=============================================================================================
 // I/O Context Misc Setup
 //=============================================================================================
-void register_fixed_files( io_context & ctx, int * files, unsigned count ) {
-	int ret = syscall( __NR_io_uring_register, ctx.thrd.ring->fd, IORING_REGISTER_FILES, files, count );
-	if( ret < 0 ) {
-		abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
-	}
-
-	__cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret );
-}
-
-void register_fixed_files( cluster & cltr, int * files, unsigned count ) {
-	for(i; cltr.io.cnt) {
-		register_fixed_files( cltr.io.ctxs[i], files, count );
-	}
-}
+void ?{}( $io_arbiter & this ) {
+	this.pending.flag = false;
+}
+
+void ^?{}( $io_arbiter & mutex this ) {
+	// /* paranoid */ verify( empty(this.assigned) );
+	// /* paranoid */ verify( empty(this.available) );
+	/* paranoid */ verify( is_empty(this.pending.blocked) );
+}
+
+$io_arbiter * create(void) {
+	return new();
+}
+void destroy($io_arbiter * arbiter) {
+	delete(arbiter);
+}
+
+//=============================================================================================
+// I/O Context Misc Setup
+//=============================================================================================
+
 #endif
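The replacement for the old `ready` array is the `free_ring` seeded above: `__io_uring_setup` marks every index `0 .. *sq.num - 1` free, and `head`/`tail` are free-running counters masked into the array (`*sq.num` is a power of 2, so `*sq.mask` works for both rings). A minimal single-threaded sketch of the resulting allocate/recycle arithmetic; `pop_free`/`push_free` are illustrative names, not functions from this changeset, and the real allocation path must additionally handle contention and an exhausted ring:

	// Illustrative only: hand out one free sqe index, and recycle it later.
	static inline __u32 pop_free( struct __sub_ring_t & sq ) {
		/* paranoid */ verify( sq.free_ring.head != sq.free_ring.tail ); // not empty
		__u32 idx = sq.free_ring.array[ sq.free_ring.head & (*sq.mask) ];
		sq.free_ring.head++;	// consume one slot; the counters wrap naturally
		return idx;	// index into sq.sqes, valid for one operation
	}

	static inline void push_free( struct __sub_ring_t & sq, __u32 idx ) {
		sq.free_ring.array[ sq.free_ring.tail & (*sq.mask) ] = idx;
		sq.free_ring.tail++;	// slot becomes allocatable again
	}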
libcfa/src/concurrency/io/types.hfa
--- libcfa/src/concurrency/io/types.hfa (r342af53)
+++ libcfa/src/concurrency/io/types.hfa (r8e4aa05)
@@ -5 +5 @@
 // file "LICENCE" distributed with Cforall.
 //
-// io/types.hfa --
+// io/types.hfa -- PRIVATE
+// Types used by the I/O subsystem
 //
 // Author           : Thierry Delisle
@@ -21 +22 @@
 
 #include "bits/locks.hfa"
+#include "kernel/fwd.hfa"
 
 #if defined(CFA_HAVE_LINUX_IO_URING_H)
-	#define LEADER_LOCK
-	struct __leaderlock_t {
-		struct $thread * volatile value;	// ($thread) next_leader | (bool:1) is_locked
-	};
+	#include "bits/sequence.hfa"
+	#include "monitor.hfa"
 
-	static inline void ?{}( __leaderlock_t & this ) { this.value = 0p; }
+	struct processor;
+	monitor $io_arbiter;
 
 	//-----------------------------------------------------------------------
 	// Ring Data structure
-	struct __submition_data {
-		// Head and tail of the ring (associated with array)
-		volatile __u32 * head;
-		volatile __u32 * tail;
-		volatile __u32 prev_head;
+	struct __sub_ring_t {
+		struct {
+			// Head and tail of the ring (associated with array)
+			volatile __u32 * head;   // one passed last index consumed by the kernel
+			volatile __u32 * tail;   // one passed last index visible to the kernel
+			volatile __u32 released; // one passed last index released back to the free list
 
-		// The actual kernel ring which uses head/tail
-		// indexes into the sqes arrays
-		__u32 * array;
+			// The actual kernel ring which uses head/tail
+			// indexes into the sqes arrays
+			__u32 * array;
+		} kring;
+
+		struct {
+			volatile __u32 head;
+			volatile __u32 tail;
+			// The ring which contains free allocations
+			// indexes into the sqes arrays
+			__u32 * array;
+		} free_ring;
+
+		// number of sqes to submit on next system call.
+		__u32 to_submit;
 
 		// number of entries and mask to go with it
@@ -46 +60 @@
 		const __u32 * mask;
 
-		// Submission flags (Not sure what for)
+		// Submission flags, currently only IORING_SETUP_SQPOLL
 		__u32 * flags;
 
-		// number of sqes not submitted (whatever that means)
+		// number of sqes not submitted
+		// From documentation : [dropped] is incremented for each invalid submission queue entry encountered in the ring buffer.
 		__u32 * dropped;
 
-		// Like head/tail but not seen by the kernel
-		volatile __u32 * ready;
-		__u32 ready_cnt;
-		__u32 prev_ready;
-
-		#if defined(LEADER_LOCK)
-			__leaderlock_t submit_lock;
-		#else
-			__spinlock_t submit_lock;
-		#endif
-		__spinlock_t release_lock;
-
 		// A buffer of sqes (not the actual ring)
-		volatile struct io_uring_sqe * sqes;
+		struct io_uring_sqe * sqes;
 
 		// The location and size of the mmaped area
@@ -72 +75 @@
 	};
 
-	struct __completion_data {
+	struct __cmp_ring_t {
 		// Head and tail of the ring
 		volatile __u32 * head;
@@ -81 +84 @@
 		const __u32 * num;
 
-		// number of cqes not submitted (whatever that means)
+		// I don't know what this value is for
 		__u32 * overflow;
 
@@ -92 +95 @@
 	};
 
-	struct __io_data {
-		struct __submition_data submit_q;
-		struct __completion_data completion_q;
+	struct __attribute__((aligned(128))) $io_context {
+		$io_arbiter * arbiter;
+		processor * proc;
+
+		struct {
+			volatile bool empty;
+			condition blocked;
+		} ext_sq;
+
+		struct __sub_ring_t sq;
+		struct __cmp_ring_t cq;
 		__u32 ring_flags;
 		int fd;
-		int efd;
-		bool eager_submits:1;
-		bool poller_submits:1;
+	};
+
+	monitor __attribute__((aligned(128))) $io_arbiter {
+		struct {
+			condition blocked;
+			$io_context * ctx;
+			volatile bool flag;
+		} pending;
 	};
 
@@ -131 +147 @@
 	#endif
 
-	struct $io_ctx_thread;
-	void __ioctx_register($io_ctx_thread & ctx);
-	void __ioctx_prepare_block($io_ctx_thread & ctx);
-	void __sqe_clean( volatile struct io_uring_sqe * sqe );
+	// void __ioctx_prepare_block($io_context & ctx);
 #endif
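For orientation, the three `kring` counters are free-running unsigned values: the submitter advances `tail` to publish entries to the kernel, the kernel advances `head` as it consumes them, and `released` trails behind to record which slots have already been returned to `free_ring` (per the field comments above). A hypothetical helper, not part of the changeset, showing the resulting index math:

	// Entries published to the kernel but not yet consumed by it.
	static inline __u32 kring_in_flight( struct __sub_ring_t & sq ) {
		__u32 head = __atomic_load_n( sq.kring.head, __ATOMIC_ACQUIRE );	// written by the kernel
		__u32 tail = __atomic_load_n( sq.kring.tail, __ATOMIC_RELAXED );	// written by the submitter
		return tail - head;	// unsigned subtraction handles wrap-around
	}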