Changeset dddb3dd0 for libcfa/src/concurrency/io
- Timestamp: Mar 2, 2021, 1:58:12 PM
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: 2cd784a
- Parents: 6047b00
- Location: libcfa/src/concurrency/io
- Files: 3 edited
Legend:
- Unmodified: context lines, prefixed with a space
- Added: prefixed with +
- Removed: prefixed with -
libcfa/src/concurrency/io/call.cfa.in
--- r6047b00
+++ rdddb3dd0

 extern struct $io_context * cfa_io_allocate(struct io_uring_sqe * out_sqes[], __u32 out_idxs[], __u32 want) __attribute__((nonnull (1,2)));
-extern void cfa_io_submit( struct $io_context * in_ctx, __u32 in_idxs[], __u32 have ) __attribute__((nonnull (1,2)));
+extern void cfa_io_submit( struct $io_context * in_ctx, __u32 in_idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1,2)));
 #endif
…
     return ', '.join(args_a)

-AsyncTemplate = """inline void async_{name}(io_future_t & future, {params}, int submit_flags) {{
+AsyncTemplate = """inline void async_{name}(io_future_t & future, {params}, __u64 submit_flags) {{
 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_{op})
     ssize_t res = {name}({args});
…
     verify( sqe->user_data == (__u64)(uintptr_t)&future );
-    cfa_io_submit( ctx, &idx, 1 );
+    cfa_io_submit( ctx, &idx, 1, 0 != (submit_flags & CFA_IO_LAZY) );
 #endif
 }}"""

-SyncTemplate = """{ret} cfa_{name}({params}, int submit_flags) {{
+SyncTemplate = """{ret} cfa_{name}({params}, __u64 submit_flags) {{
     io_future_t future;
…
     if c.define:
         print("""#if defined({define})
-{ret} cfa_{name}({params}, int submit_flags);
+{ret} cfa_{name}({params}, __u64 submit_flags);
 #endif""".format(define=c.define,ret=c.ret, name=c.name, params=c.params))
     else:
-        print("{ret} cfa_{name}({params}, int submit_flags);"
+        print("{ret} cfa_{name}({params}, __u64 submit_flags);"
             .format(ret=c.ret, name=c.name, params=c.params))
…
     if c.define:
         print("""#if defined({define})
-void async_{name}(io_future_t & future, {params}, int submit_flags);
+void async_{name}(io_future_t & future, {params}, __u64 submit_flags);
 #endif""".format(define=c.define,name=c.name, params=c.params))
     else:
-        print("void async_{name}(io_future_t & future, {params}, int submit_flags);"
+        print("void async_{name}(io_future_t & future, {params}, __u64 submit_flags);"
             .format(name=c.name, params=c.params))
 print("\n")
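In effect, every generated wrapper, synchronous and asynchronous, now takes a __u64 submit_flags instead of an int, and cfa_io_submit gains a lazy flag derived from submit_flags & CFA_IO_LAZY. A minimal usage sketch; cfa_write here is illustrative (any syscall wrapped by the templates above gets the same shape), while CFA_IO_LAZY and the trailing flags argument are taken from this diff:

    // Hedged sketch: cfa_write stands in for any generated cfa_{name} wrapper;
    // only the submit_flags parameter and CFA_IO_LAZY are confirmed by the diff.
    void write_twice( int fd, const char buf[], size_t len ) {
        cfa_write( fd, buf, len, 0 );           // eager: ring the kernel immediately
        cfa_write( fd, buf, len, CFA_IO_LAZY ); // lazy: allow the runtime to batch the submit
    }

Passing 0 preserves the old eager behaviour; the lazy path presumably marks the entry submitted and leaves the actual kernel notification to a later, batched flush.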
libcfa/src/concurrency/io/setup.cfa
--- r6047b00
+++ rdddb3dd0

 #if !defined(CFA_HAVE_LINUX_IO_URING_H)
-    void __kernel_io_startup() {
-        // Nothing to do without io_uring
-    }
-
-    void __kernel_io_shutdown() {
-        // Nothing to do without io_uring
-    }
-
     void ?{}(io_context_params & this) {}
…
 //=============================================================================================
-// I/O Startup / Shutdown logic + Master Poller
-//=============================================================================================
-
-    // IO Master poller loop forward
-    static void * iopoll_loop( __attribute__((unused)) void * args );
-
-    static struct {
-        pthread_t thrd;          // pthread handle to io poller thread
-        void * stack;            // pthread stack for io poller thread
-        int epollfd;             // file descriptor to the epoll instance
-        volatile bool run;       // Whether or not to continue
-        volatile bool stopped;   // Whether the poller has finished running
-        volatile uint64_t epoch; // Epoch used for memory reclamation
-    } iopoll;
-
-    void __kernel_io_startup(void) {
-        __cfadbg_print_safe(io_core, "Kernel : Creating EPOLL instance\n" );
-
-        iopoll.epollfd = epoll_create1(0);
-        if (iopoll.epollfd == -1) {
-            abort( "internal error, epoll_create1\n");
-        }
-
-        __cfadbg_print_safe(io_core, "Kernel : Starting io poller thread\n" );
-
-        iopoll.stack   = __create_pthread( &iopoll.thrd, iopoll_loop, 0p );
-        iopoll.run     = true;
-        iopoll.stopped = false;
-        iopoll.epoch   = 0;
-    }
-
-    void __kernel_io_shutdown(void) {
-        // Notify the io poller thread of the shutdown
-        iopoll.run = false;
-        sigval val = { 1 };
-        pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
-
-        // Wait for the io poller thread to finish
-
-        __destroy_pthread( iopoll.thrd, iopoll.stack, 0p );
-
-        int ret = close(iopoll.epollfd);
-        if (ret == -1) {
-            abort( "internal error, close epoll\n");
-        }
-
-        // Io polling is now fully stopped
-
-        __cfadbg_print_safe(io_core, "Kernel : IO poller stopped\n" );
-    }
-
-    static void * iopoll_loop( __attribute__((unused)) void * args ) {
-        __processor_id_t id;
-        id.full_proc = false;
-        id.id = doregister(&id);
-        __cfaabi_tls.this_proc_id = &id;
-        __cfadbg_print_safe(io_core, "Kernel : IO poller thread starting\n" );
-
-        // Block signals to control when they arrive
-        sigset_t mask;
-        sigfillset(&mask);
-        if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
-            abort( "internal error, pthread_sigmask" );
-        }
-
-        sigdelset( &mask, SIGUSR1 );
-
-        // Create sufficient events
-        struct epoll_event events[10];
-        // Main loop
-        while( iopoll.run ) {
-            __cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n");
-
-            // increment the epoch to notify any deleters we are starting a new cycle
-            __atomic_fetch_add(&iopoll.epoch, 1, __ATOMIC_SEQ_CST);
-
-            // Wait for events
-            int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask );
-
-            __cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io contexts events, waking up\n", nfds);
-
-            // Check if an error occured
-            if (nfds == -1) {
-                if( errno == EINTR ) continue;
-                abort( "internal error, pthread_sigmask" );
-            }
-
-            for(i; nfds) {
-                $io_context * io_ctx = ($io_context *)(uintptr_t)events[i].data.u64;
-                /* paranoid */ verify( io_ctx );
-                __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->fd, io_ctx);
-                #if !defined( __CFA_NO_STATISTICS__ )
-                    __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats;
-                #endif
-
-                eventfd_t v;
-                eventfd_read(io_ctx->efd, &v);
-
-                post( io_ctx->sem );
-            }
-        }
-
-        __atomic_store_n(&iopoll.stopped, true, __ATOMIC_SEQ_CST);
-
-        __cfadbg_print_safe(io_core, "Kernel : IO poller thread stopping\n" );
-        unregister(&id);
-        return 0p;
-    }
-
-//=============================================================================================
 // I/O Context Constrution/Destruction
 //=============================================================================================

-static void __io_uring_setup ( $io_context & this, const io_context_params & params_in );
+
+
+static void __io_uring_setup ( $io_context & this, const io_context_params & params_in, int procfd );
 static void __io_uring_teardown( $io_context & this );
 static void __epoll_register($io_context & ctx);
…
 void __ioarbiter_unregister( $io_arbiter & mutex, $io_context & ctx );

-void ?{}($io_context & this, struct cluster & cl) {
-    (this.self){ "IO Poller", cl };
+void ?{}($io_context & this, processor * proc, struct cluster & cl) {
+    /* paranoid */ verify( cl.io.arbiter );
+    this.proc = proc;
+    this.arbiter = cl.io.arbiter;
     this.ext_sq.empty = true;
-    this.revoked = true;
-    __io_uring_setup( this, cl.io.params );
+    (this.ext_sq.blocked){};
+    __io_uring_setup( this, cl.io.params, proc->idle );
     __cfadbg_print_safe(io_core, "Kernel I/O : Created ring for io_context %u (%p)\n", this.fd, &this);
-
-    __epoll_register(this);
-
-    __ioarbiter_register(*cl.io.arbiter, this);
-
-    __thrd_start( this, main );
-    __cfadbg_print_safe(io_core, "Kernel I/O : Started poller thread for io_context %u\n", this.fd);
-}
-
-void ^?{}($io_context & mutex this) {
+}
+
+void ^?{}($io_context & this) {
     __cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %u\n", this.fd);
-
-    ^(this.self){};
-    __cfadbg_print_safe(io_core, "Kernel I/O : Stopped poller thread for io_context %u\n", this.fd);
-
-    __ioarbiter_unregister(*this.arbiter, this);
-
-    __epoll_unregister(this);

     __io_uring_teardown( this );
…
 }

-void ?{}(io_context & this, struct cluster & cl) {
-    // this.ctx = new(cl);
-    this.ctx = alloc();
-    (*this.ctx){ cl };
-
-    __cfadbg_print_safe(io_core, "Kernel I/O : io_context %u ready\n", this.ctx->fd);
-}
-
-void ^?{}(io_context & this) {
-    post( this.ctx->sem );
-
-    delete(this.ctx);
-}
-
 extern void __disable_interrupts_hard();
 extern void __enable_interrupts_hard();

-static void __io_uring_setup( $io_context & this, const io_context_params & params_in ) {
+static void __io_uring_setup( $io_context & this, const io_context_params & params_in, int procfd ) {
     // Step 1 : call to setup
     struct io_uring_params params;
…
     sq.dropped = ( __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);

-    sq.kring.ready    = 0;
     sq.kring.released = 0;
…
     // io_uring_register is so f*cking slow on some machine that it
     // will never succeed if preemption isn't hard blocked
+    __cfadbg_print_safe(io_core, "Kernel I/O : registering %d for completion with ring %d\n", procfd, fd);
+
     __disable_interrupts_hard();

-    int efd = eventfd(0, 0);
-    if (efd < 0) {
-        abort("KERNEL ERROR: IO_URING EVENTFD - %s\n", strerror(errno));
-    }
-
-    int ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &efd, 1);
+    int ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &procfd, 1);
     if (ret < 0) {
         abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno));
…

     __enable_interrupts_hard();
+
+    __cfadbg_print_safe(io_core, "Kernel I/O : registered %d for completion with ring %d\n", procfd, fd);

     // some paranoid checks
…
     this.ring_flags = 0;
     this.fd         = fd;
-    this.efd        = efd;
 }
…
     // close the file descriptor
     close(this.fd);
-    close(this.efd);

     free( this.sq.free_ring.array ); // Maybe null, doesn't matter
 }

+void __cfa_io_start( processor * proc ) {
+    proc->io.ctx = alloc();
+    (*proc->io.ctx){proc, *proc->cltr};
+}
+void __cfa_io_stop ( processor * proc ) {
+    ^(*proc->io.ctx){};
+    free(proc->io.ctx);
+}
+
 //=============================================================================================
 // I/O Context Sleep
 //=============================================================================================
-static inline void __epoll_ctl($io_context & ctx, int op, const char * error) {
-    struct epoll_event ev;
-    ev.events = EPOLLIN | EPOLLONESHOT;
-    ev.data.u64 = (__u64)&ctx;
-    int ret = epoll_ctl(iopoll.epollfd, op, ctx.efd, &ev);
-    if (ret < 0) {
-        abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) );
-    }
-}
-
-static void __epoll_register($io_context & ctx) {
-    __epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");
-}
-
-static void __epoll_unregister($io_context & ctx) {
-    // Read the current epoch so we know when to stop
-    size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST);
-
-    // Remove the fd from the iopoller
-    __epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE");
-
-    // Notify the io poller thread of the shutdown
-    iopoll.run = false;
-    sigval val = { 1 };
-    pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
-
-    // Make sure all this is done
-    __atomic_thread_fence(__ATOMIC_SEQ_CST);
-
-    // Wait for the next epoch
-    while(curr == iopoll.epoch && !iopoll.stopped) Pause();
-}
-
-void __ioctx_prepare_block($io_context & ctx) {
-    __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.fd, &ctx);
-    __epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");
-}
+// static inline void __epoll_ctl($io_context & ctx, int op, const char * error) {
+//     struct epoll_event ev;
+//     ev.events = EPOLLIN | EPOLLONESHOT;
+//     ev.data.u64 = (__u64)&ctx;
+//     int ret = epoll_ctl(iopoll.epollfd, op, ctx.efd, &ev);
+//     if (ret < 0) {
+//         abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) );
+//     }
+// }
+
+// static void __epoll_register($io_context & ctx) {
+//     __epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");
+// }
+
+// static void __epoll_unregister($io_context & ctx) {
+//     // Read the current epoch so we know when to stop
+//     size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST);
+
+//     // Remove the fd from the iopoller
+//     __epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE");
+
+//     // Notify the io poller thread of the shutdown
+//     iopoll.run = false;
+//     sigval val = { 1 };
+//     pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
+
+//     // Make sure all this is done
+//     __atomic_thread_fence(__ATOMIC_SEQ_CST);
+
+//     // Wait for the next epoch
+//     while(curr == iopoll.epoch && !iopoll.stopped) Pause();
+// }
+
+// void __ioctx_prepare_block($io_context & ctx) {
+//     __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.fd, &ctx);
+//     __epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");
+// }


…

 void ^?{}( $io_arbiter & mutex this ) {
-    /* paranoid */ verify( empty(this.assigned) );
-    /* paranoid */ verify( empty(this.available) );
+    // /* paranoid */ verify( empty(this.assigned) );
+    // /* paranoid */ verify( empty(this.available) );
     /* paranoid */ verify( is_empty(this.pending.blocked) );
 }
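The thrust of this file's change: the global epoll instance, its dedicated poller pthread, and the per-context eventfd are all gone. Instead, __io_uring_setup now receives a procfd (the owning processor's proc->idle descriptor at the call site) and registers it directly with the ring via IORING_REGISTER_EVENTFD. A standalone C sketch of that mechanism, with illustrative names; only the syscall(__NR_io_uring_register, ...) call mirrors the diff:

    #include <unistd.h>
    #include <sys/eventfd.h>
    #include <sys/syscall.h>
    #include <linux/io_uring.h>

    // After this registration the kernel signals efd every time a completion
    // entry is posted to ring_fd's completion queue.
    static long register_completion_eventfd( int ring_fd, int efd ) {
        return syscall( __NR_io_uring_register, ring_fd, IORING_REGISTER_EVENTFD, &efd, 1 );
    }

    // A waiter can then block on the eventfd instead of polling the ring.
    static void wait_for_completion( int efd ) {
        eventfd_t count;
        eventfd_read( efd, &count ); // blocks until at least one CQE has been posted
    }

Because the descriptor registered here is the one the processor already sleeps on when idle, an I/O completion and a newly ready thread wake the processor through the same path, which is what lets the new __cfa_io_start/__cfa_io_stop pair replace the master poller thread.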
libcfa/src/concurrency/io/types.hfa
--- r6047b00
+++ rdddb3dd0

     volatile __u32 * head;   // one passed last index consumed by the kernel
     volatile __u32 * tail;   // one passed last index visible to the kernel
-    volatile __u32 ready;    // one passed last index added to array ()
     volatile __u32 released; // one passed last index released back to the free list
…

 struct __attribute__((aligned(128))) $io_context {
-    inline Seqable;
-
-    volatile bool revoked;
+    $io_arbiter * arbiter;
     processor * proc;
-
-    $io_arbiter * arbiter;

     struct {
…
     __u32 ring_flags;
     int fd;
-    int efd;
-
-    single_sem sem;
-    $thread self;
 };
-
-void main( $io_context & this );
-static inline $thread  * get_thread ( $io_context & this ) __attribute__((const)) { return &this.self; }
-static inline $monitor * get_monitor( $io_context & this ) __attribute__((const)) { return &this.self.self_mon; }
-static inline $io_context *& Back( $io_context * n ) { return ($io_context *)Back( (Seqable *)n ); }
-static inline $io_context *& Next( $io_context * n ) { return ($io_context *)Next( (Colable *)n ); }
-void ^?{}( $io_context & mutex this );

 monitor __attribute__((aligned(128))) $io_arbiter {
…
         volatile bool flag;
     } pending;
-
-    Sequence($io_context) assigned;
-
-    Sequence($io_context) available;
 };
…
 #endif

-void __ioctx_prepare_block($io_context & ctx);
+// void __ioctx_prepare_block($io_context & ctx);
 #endif
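For orientation, the trimmed $io_context now reads roughly as below. This is a condensed sketch assembled from the hunks above; members between the shown hunks are elided and the exact nesting is abbreviated, so it is not the verbatim header:

    struct __attribute__((aligned(128))) $io_context {
        $io_arbiter * arbiter;  // cluster-wide arbiter, cached at construction
        processor * proc;       // owning processor, whose idle eventfd the ring signals

        /* ... submission/completion queue state, ring_flags, fd ... */
        // removed: inline Seqable; volatile bool revoked;
        // removed: int efd; single_sem sem; $thread self;
    };

With sem and self gone, the context is no longer a schedulable entity kept in the arbiter's Sequence lists; it is plain data owned by one processor, matching the __cfa_io_start/__cfa_io_stop lifecycle added in setup.cfa.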