Changeset dddb3dd0 for libcfa/src/concurrency/io/setup.cfa
- Timestamp:
- Mar 2, 2021, 1:58:12 PM (3 years ago)
- Branches:
- ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children:
- 2cd784a
- Parents:
- 6047b00
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/io/setup.cfa
r6047b00 rdddb3dd0 26 26 27 27 #if !defined(CFA_HAVE_LINUX_IO_URING_H) 28 void __kernel_io_startup() {29 // Nothing to do without io_uring30 }31 32 void __kernel_io_shutdown() {33 // Nothing to do without io_uring34 }35 36 28 void ?{}(io_context_params & this) {} 37 29 … … 97 89 98 90 //============================================================================================= 99 // I/O Startup / Shutdown logic + Master Poller100 //=============================================================================================101 102 // IO Master poller loop forward103 static void * iopoll_loop( __attribute__((unused)) void * args );104 105 static struct {106 pthread_t thrd; // pthread handle to io poller thread107 void * stack; // pthread stack for io poller thread108 int epollfd; // file descriptor to the epoll instance109 volatile bool run; // Whether or not to continue110 volatile bool stopped; // Whether the poller has finished running111 volatile uint64_t epoch; // Epoch used for memory reclamation112 } iopoll;113 114 void __kernel_io_startup(void) {115 __cfadbg_print_safe(io_core, "Kernel : Creating EPOLL instance\n" );116 117 iopoll.epollfd = epoll_create1(0);118 if (iopoll.epollfd == -1) {119 abort( "internal error, epoll_create1\n");120 }121 122 __cfadbg_print_safe(io_core, "Kernel : Starting io poller thread\n" );123 124 iopoll.stack = __create_pthread( &iopoll.thrd, iopoll_loop, 0p );125 iopoll.run = true;126 iopoll.stopped = false;127 iopoll.epoch = 0;128 }129 130 void __kernel_io_shutdown(void) {131 // Notify the io poller thread of the shutdown132 iopoll.run = false;133 sigval val = { 1 };134 pthread_sigqueue( iopoll.thrd, SIGUSR1, val );135 136 // Wait for the io poller thread to finish137 138 __destroy_pthread( iopoll.thrd, iopoll.stack, 0p );139 140 int ret = close(iopoll.epollfd);141 if (ret == -1) {142 abort( "internal error, close epoll\n");143 }144 145 // Io polling is now fully stopped146 147 __cfadbg_print_safe(io_core, "Kernel : IO poller stopped\n" );148 }149 150 static void * iopoll_loop( __attribute__((unused)) void * args ) {151 __processor_id_t id;152 id.full_proc = false;153 id.id = doregister(&id);154 __cfaabi_tls.this_proc_id = &id;155 __cfadbg_print_safe(io_core, "Kernel : IO poller thread starting\n" );156 157 // Block signals to control when they arrive158 sigset_t mask;159 sigfillset(&mask);160 if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {161 abort( "internal error, pthread_sigmask" );162 }163 164 sigdelset( &mask, SIGUSR1 );165 166 // Create sufficient events167 struct epoll_event events[10];168 // Main loop169 while( iopoll.run ) {170 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n");171 172 // increment the epoch to notify any deleters we are starting a new cycle173 __atomic_fetch_add(&iopoll.epoch, 1, __ATOMIC_SEQ_CST);174 175 // Wait for events176 int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask );177 178 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io contexts events, waking up\n", nfds);179 180 // Check if an error occured181 if (nfds == -1) {182 if( errno == EINTR ) continue;183 abort( "internal error, pthread_sigmask" );184 }185 186 for(i; nfds) {187 $io_context * io_ctx = ($io_context *)(uintptr_t)events[i].data.u64;188 /* paranoid */ verify( io_ctx );189 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->fd, io_ctx);190 #if !defined( __CFA_NO_STATISTICS__ )191 __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats;192 #endif193 194 eventfd_t v;195 eventfd_read(io_ctx->efd, &v);196 197 post( io_ctx->sem );198 }199 }200 201 __atomic_store_n(&iopoll.stopped, true, __ATOMIC_SEQ_CST);202 203 __cfadbg_print_safe(io_core, "Kernel : IO poller thread stopping\n" );204 unregister(&id);205 return 0p;206 }207 208 //=============================================================================================209 91 // I/O Context Constrution/Destruction 210 92 //============================================================================================= 211 93 212 static void __io_uring_setup ( $io_context & this, const io_context_params & params_in ); 94 95 96 static void __io_uring_setup ( $io_context & this, const io_context_params & params_in, int procfd ); 213 97 static void __io_uring_teardown( $io_context & this ); 214 98 static void __epoll_register($io_context & ctx); … … 217 101 void __ioarbiter_unregister( $io_arbiter & mutex, $io_context & ctx ); 218 102 219 void ?{}($io_context & this, struct cluster & cl) { 220 (this.self){ "IO Poller", cl }; 103 void ?{}($io_context & this, processor * proc, struct cluster & cl) { 104 /* paranoid */ verify( cl.io.arbiter ); 105 this.proc = proc; 106 this.arbiter = cl.io.arbiter; 221 107 this.ext_sq.empty = true; 222 this.revoked = true;223 __io_uring_setup( this, cl.io.params );108 (this.ext_sq.blocked){}; 109 __io_uring_setup( this, cl.io.params, proc->idle ); 224 110 __cfadbg_print_safe(io_core, "Kernel I/O : Created ring for io_context %u (%p)\n", this.fd, &this); 225 226 __epoll_register(this); 227 228 __ioarbiter_register(*cl.io.arbiter, this); 229 230 __thrd_start( this, main ); 231 __cfadbg_print_safe(io_core, "Kernel I/O : Started poller thread for io_context %u\n", this.fd); 232 } 233 234 void ^?{}($io_context & mutex this) { 111 } 112 113 void ^?{}($io_context & this) { 235 114 __cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %u\n", this.fd); 236 237 ^(this.self){};238 __cfadbg_print_safe(io_core, "Kernel I/O : Stopped poller thread for io_context %u\n", this.fd);239 240 __ioarbiter_unregister(*this.arbiter, this);241 242 __epoll_unregister(this);243 115 244 116 __io_uring_teardown( this ); … … 246 118 } 247 119 248 void ?{}(io_context & this, struct cluster & cl) {249 // this.ctx = new(cl);250 this.ctx = alloc();251 (*this.ctx){ cl };252 253 __cfadbg_print_safe(io_core, "Kernel I/O : io_context %u ready\n", this.ctx->fd);254 }255 256 void ^?{}(io_context & this) {257 post( this.ctx->sem );258 259 delete(this.ctx);260 }261 262 120 extern void __disable_interrupts_hard(); 263 121 extern void __enable_interrupts_hard(); 264 122 265 static void __io_uring_setup( $io_context & this, const io_context_params & params_in ) {123 static void __io_uring_setup( $io_context & this, const io_context_params & params_in, int procfd ) { 266 124 // Step 1 : call to setup 267 125 struct io_uring_params params; … … 339 197 sq.dropped = ( __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped); 340 198 341 sq.kring.ready = 0;342 199 sq.kring.released = 0; 343 200 … … 362 219 // io_uring_register is so f*cking slow on some machine that it 363 220 // will never succeed if preemption isn't hard blocked 221 __cfadbg_print_safe(io_core, "Kernel I/O : registering %d for completion with ring %d\n", procfd, fd); 222 364 223 __disable_interrupts_hard(); 365 224 366 int efd = eventfd(0, 0); 367 if (efd < 0) { 368 abort("KERNEL ERROR: IO_URING EVENTFD - %s\n", strerror(errno)); 369 } 370 371 int ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &efd, 1); 225 int ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &procfd, 1); 372 226 if (ret < 0) { 373 227 abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno)); … … 375 229 376 230 __enable_interrupts_hard(); 231 232 __cfadbg_print_safe(io_core, "Kernel I/O : registered %d for completion with ring %d\n", procfd, fd); 377 233 378 234 // some paranoid checks … … 390 246 this.ring_flags = 0; 391 247 this.fd = fd; 392 this.efd = efd;393 248 } 394 249 … … 411 266 // close the file descriptor 412 267 close(this.fd); 413 close(this.efd);414 268 415 269 free( this.sq.free_ring.array ); // Maybe null, doesn't matter 416 270 } 417 271 272 void __cfa_io_start( processor * proc ) { 273 proc->io.ctx = alloc(); 274 (*proc->io.ctx){proc, *proc->cltr}; 275 } 276 void __cfa_io_stop ( processor * proc ) { 277 ^(*proc->io.ctx){}; 278 free(proc->io.ctx); 279 } 280 418 281 //============================================================================================= 419 282 // I/O Context Sleep 420 283 //============================================================================================= 421 static inline void __epoll_ctl($io_context & ctx, int op, const char * error) {422 struct epoll_event ev;423 ev.events = EPOLLIN | EPOLLONESHOT;424 ev.data.u64 = (__u64)&ctx;425 int ret = epoll_ctl(iopoll.epollfd, op, ctx.efd, &ev);426 if (ret < 0) {427 abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) );428 }429 }430 431 static void __epoll_register($io_context & ctx) {432 __epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");433 }434 435 static void __epoll_unregister($io_context & ctx) {436 // Read the current epoch so we know when to stop437 size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST);438 439 // Remove the fd from the iopoller440 __epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE");441 442 // Notify the io poller thread of the shutdown443 iopoll.run = false;444 sigval val = { 1 };445 pthread_sigqueue( iopoll.thrd, SIGUSR1, val );446 447 // Make sure all this is done448 __atomic_thread_fence(__ATOMIC_SEQ_CST);449 450 // Wait for the next epoch451 while(curr == iopoll.epoch && !iopoll.stopped) Pause();452 }453 454 void __ioctx_prepare_block($io_context & ctx) {455 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.fd, &ctx);456 __epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");457 }284 // static inline void __epoll_ctl($io_context & ctx, int op, const char * error) { 285 // struct epoll_event ev; 286 // ev.events = EPOLLIN | EPOLLONESHOT; 287 // ev.data.u64 = (__u64)&ctx; 288 // int ret = epoll_ctl(iopoll.epollfd, op, ctx.efd, &ev); 289 // if (ret < 0) { 290 // abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) ); 291 // } 292 // } 293 294 // static void __epoll_register($io_context & ctx) { 295 // __epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD"); 296 // } 297 298 // static void __epoll_unregister($io_context & ctx) { 299 // // Read the current epoch so we know when to stop 300 // size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST); 301 302 // // Remove the fd from the iopoller 303 // __epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE"); 304 305 // // Notify the io poller thread of the shutdown 306 // iopoll.run = false; 307 // sigval val = { 1 }; 308 // pthread_sigqueue( iopoll.thrd, SIGUSR1, val ); 309 310 // // Make sure all this is done 311 // __atomic_thread_fence(__ATOMIC_SEQ_CST); 312 313 // // Wait for the next epoch 314 // while(curr == iopoll.epoch && !iopoll.stopped) Pause(); 315 // } 316 317 // void __ioctx_prepare_block($io_context & ctx) { 318 // __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.fd, &ctx); 319 // __epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM"); 320 // } 458 321 459 322 … … 466 329 467 330 void ^?{}( $io_arbiter & mutex this ) { 468 / * paranoid */ verify( empty(this.assigned) );469 / * paranoid */ verify( empty(this.available) );331 // /* paranoid */ verify( empty(this.assigned) ); 332 // /* paranoid */ verify( empty(this.available) ); 470 333 /* paranoid */ verify( is_empty(this.pending.blocked) ); 471 334 }
Note: See TracChangeset
for help on using the changeset viewer.