Ignore:
Timestamp:
Mar 2, 2021, 1:58:12 PM (8 months ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
arm-eh, jacob/cs343-translation, master, new-ast-unique-expr
Children:
2cd784a
Parents:
6047b00
Message:

Changed io to use ring per kernel threads.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io/setup.cfa

    r6047b00 rdddb3dd0  
    2626
    2727#if !defined(CFA_HAVE_LINUX_IO_URING_H)
    28         void __kernel_io_startup() {
    29                 // Nothing to do without io_uring
    30         }
    31 
    32         void __kernel_io_shutdown() {
    33                 // Nothing to do without io_uring
    34         }
    35 
    3628        void ?{}(io_context_params & this) {}
    3729
     
    9789
    9890//=============================================================================================
    99 // I/O Startup / Shutdown logic + Master Poller
    100 //=============================================================================================
    101 
    102         // IO Master poller loop forward
    103         static void * iopoll_loop( __attribute__((unused)) void * args );
    104 
    105         static struct {
    106                       pthread_t  thrd;    // pthread handle to io poller thread
    107                       void *     stack;   // pthread stack for io poller thread
    108                       int        epollfd; // file descriptor to the epoll instance
    109                 volatile     bool run;     // Whether or not to continue
    110                 volatile     bool stopped; // Whether the poller has finished running
    111                 volatile uint64_t epoch;   // Epoch used for memory reclamation
    112         } iopoll;
    113 
    114         void __kernel_io_startup(void) {
    115                 __cfadbg_print_safe(io_core, "Kernel : Creating EPOLL instance\n" );
    116 
    117                 iopoll.epollfd = epoll_create1(0);
    118                 if (iopoll.epollfd == -1) {
    119                         abort( "internal error, epoll_create1\n");
    120                 }
    121 
    122                 __cfadbg_print_safe(io_core, "Kernel : Starting io poller thread\n" );
    123 
    124                 iopoll.stack   = __create_pthread( &iopoll.thrd, iopoll_loop, 0p );
    125                 iopoll.run     = true;
    126                 iopoll.stopped = false;
    127                 iopoll.epoch   = 0;
    128         }
    129 
    130         void __kernel_io_shutdown(void) {
    131                 // Notify the io poller thread of the shutdown
    132                 iopoll.run = false;
    133                 sigval val = { 1 };
    134                 pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
    135 
    136                 // Wait for the io poller thread to finish
    137 
    138                 __destroy_pthread( iopoll.thrd, iopoll.stack, 0p );
    139 
    140                 int ret = close(iopoll.epollfd);
    141                 if (ret == -1) {
    142                         abort( "internal error, close epoll\n");
    143                 }
    144 
    145                 // Io polling is now fully stopped
    146 
    147                 __cfadbg_print_safe(io_core, "Kernel : IO poller stopped\n" );
    148         }
    149 
    150         static void * iopoll_loop( __attribute__((unused)) void * args ) {
    151                 __processor_id_t id;
    152                 id.full_proc = false;
    153                 id.id = doregister(&id);
    154                 __cfaabi_tls.this_proc_id = &id;
    155                 __cfadbg_print_safe(io_core, "Kernel : IO poller thread starting\n" );
    156 
    157                 // Block signals to control when they arrive
    158                 sigset_t mask;
    159                 sigfillset(&mask);
    160                 if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
    161                 abort( "internal error, pthread_sigmask" );
    162                 }
    163 
    164                 sigdelset( &mask, SIGUSR1 );
    165 
    166                 // Create sufficient events
    167                 struct epoll_event events[10];
    168                 // Main loop
    169                 while( iopoll.run ) {
    170                         __cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n");
    171 
    172                         // increment the epoch to notify any deleters we are starting a new cycle
    173                         __atomic_fetch_add(&iopoll.epoch, 1, __ATOMIC_SEQ_CST);
    174 
    175                         // Wait for events
    176                         int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask );
    177 
    178                         __cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io contexts events, waking up\n", nfds);
    179 
    180                         // Check if an error occured
    181                         if (nfds == -1) {
    182                                 if( errno == EINTR ) continue;
    183                                 abort( "internal error, pthread_sigmask" );
    184                         }
    185 
    186                         for(i; nfds) {
    187                                 $io_context * io_ctx = ($io_context *)(uintptr_t)events[i].data.u64;
    188                                 /* paranoid */ verify( io_ctx );
    189                                 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->fd, io_ctx);
    190                                 #if !defined( __CFA_NO_STATISTICS__ )
    191                                         __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats;
    192                                 #endif
    193 
    194                                 eventfd_t v;
    195                                 eventfd_read(io_ctx->efd, &v);
    196 
    197                                 post( io_ctx->sem );
    198                         }
    199                 }
    200 
    201                 __atomic_store_n(&iopoll.stopped, true, __ATOMIC_SEQ_CST);
    202 
    203                 __cfadbg_print_safe(io_core, "Kernel : IO poller thread stopping\n" );
    204                 unregister(&id);
    205                 return 0p;
    206         }
    207 
    208 //=============================================================================================
    20991// I/O Context Constrution/Destruction
    21092//=============================================================================================
    21193
    212         static void __io_uring_setup ( $io_context & this, const io_context_params & params_in );
     94
     95
     96        static void __io_uring_setup ( $io_context & this, const io_context_params & params_in, int procfd );
    21397        static void __io_uring_teardown( $io_context & this );
    21498        static void __epoll_register($io_context & ctx);
     
    217101        void __ioarbiter_unregister( $io_arbiter & mutex, $io_context & ctx );
    218102
    219         void ?{}($io_context & this, struct cluster & cl) {
    220                 (this.self){ "IO Poller", cl };
     103        void ?{}($io_context & this, processor * proc, struct cluster & cl) {
     104                /* paranoid */ verify( cl.io.arbiter );
     105                this.proc = proc;
     106                this.arbiter = cl.io.arbiter;
    221107                this.ext_sq.empty = true;
    222                 this.revoked = true;
    223                 __io_uring_setup( this, cl.io.params );
     108                (this.ext_sq.blocked){};
     109                __io_uring_setup( this, cl.io.params, proc->idle );
    224110                __cfadbg_print_safe(io_core, "Kernel I/O : Created ring for io_context %u (%p)\n", this.fd, &this);
    225 
    226                 __epoll_register(this);
    227 
    228                 __ioarbiter_register(*cl.io.arbiter, this);
    229 
    230                 __thrd_start( this, main );
    231                 __cfadbg_print_safe(io_core, "Kernel I/O : Started poller thread for io_context %u\n", this.fd);
    232         }
    233 
    234         void ^?{}($io_context & mutex this) {
     111        }
     112
     113        void ^?{}($io_context & this) {
    235114                __cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %u\n", this.fd);
    236 
    237                 ^(this.self){};
    238                 __cfadbg_print_safe(io_core, "Kernel I/O : Stopped poller thread for io_context %u\n", this.fd);
    239 
    240                 __ioarbiter_unregister(*this.arbiter, this);
    241 
    242                 __epoll_unregister(this);
    243115
    244116                __io_uring_teardown( this );
     
    246118        }
    247119
    248         void ?{}(io_context & this, struct cluster & cl) {
    249                 // this.ctx = new(cl);
    250                 this.ctx = alloc();
    251                 (*this.ctx){ cl };
    252 
    253                 __cfadbg_print_safe(io_core, "Kernel I/O : io_context %u ready\n", this.ctx->fd);
    254         }
    255 
    256         void ^?{}(io_context & this) {
    257                 post( this.ctx->sem );
    258 
    259                 delete(this.ctx);
    260         }
    261 
    262120        extern void __disable_interrupts_hard();
    263121        extern void __enable_interrupts_hard();
    264122
    265         static void __io_uring_setup( $io_context & this, const io_context_params & params_in ) {
     123        static void __io_uring_setup( $io_context & this, const io_context_params & params_in, int procfd ) {
    266124                // Step 1 : call to setup
    267125                struct io_uring_params params;
     
    339197                sq.dropped     = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
    340198
    341                 sq.kring.ready = 0;
    342199                sq.kring.released = 0;
    343200
     
    362219                // io_uring_register is so f*cking slow on some machine that it
    363220                // will never succeed if preemption isn't hard blocked
     221                __cfadbg_print_safe(io_core, "Kernel I/O : registering %d for completion with ring %d\n", procfd, fd);
     222
    364223                __disable_interrupts_hard();
    365224
    366                 int efd = eventfd(0, 0);
    367                 if (efd < 0) {
    368                         abort("KERNEL ERROR: IO_URING EVENTFD - %s\n", strerror(errno));
    369                 }
    370 
    371                 int ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &efd, 1);
     225                int ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &procfd, 1);
    372226                if (ret < 0) {
    373227                        abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno));
     
    375229
    376230                __enable_interrupts_hard();
     231
     232                __cfadbg_print_safe(io_core, "Kernel I/O : registered %d for completion with ring %d\n", procfd, fd);
    377233
    378234                // some paranoid checks
     
    390246                this.ring_flags = 0;
    391247                this.fd         = fd;
    392                 this.efd        = efd;
    393248        }
    394249
     
    411266                // close the file descriptor
    412267                close(this.fd);
    413                 close(this.efd);
    414268
    415269                free( this.sq.free_ring.array ); // Maybe null, doesn't matter
    416270        }
    417271
     272        void __cfa_io_start( processor * proc ) {
     273                proc->io.ctx = alloc();
     274                (*proc->io.ctx){proc, *proc->cltr};
     275        }
     276        void __cfa_io_stop ( processor * proc ) {
     277                ^(*proc->io.ctx){};
     278                free(proc->io.ctx);
     279        }
     280
    418281//=============================================================================================
    419282// I/O Context Sleep
    420283//=============================================================================================
    421         static inline void __epoll_ctl($io_context & ctx, int op, const char * error) {
    422                 struct epoll_event ev;
    423                 ev.events = EPOLLIN | EPOLLONESHOT;
    424                 ev.data.u64 = (__u64)&ctx;
    425                 int ret = epoll_ctl(iopoll.epollfd, op, ctx.efd, &ev);
    426                 if (ret < 0) {
    427                         abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) );
    428                 }
    429         }
    430 
    431         static void __epoll_register($io_context & ctx) {
    432                 __epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");
    433         }
    434 
    435         static void __epoll_unregister($io_context & ctx) {
    436                 // Read the current epoch so we know when to stop
    437                 size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST);
    438 
    439                 // Remove the fd from the iopoller
    440                 __epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE");
    441 
    442                 // Notify the io poller thread of the shutdown
    443                 iopoll.run = false;
    444                 sigval val = { 1 };
    445                 pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
    446 
    447                 // Make sure all this is done
    448                 __atomic_thread_fence(__ATOMIC_SEQ_CST);
    449 
    450                 // Wait for the next epoch
    451                 while(curr == iopoll.epoch && !iopoll.stopped) Pause();
    452         }
    453 
    454         void __ioctx_prepare_block($io_context & ctx) {
    455                 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.fd, &ctx);
    456                 __epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");
    457         }
     284        // static inline void __epoll_ctl($io_context & ctx, int op, const char * error) {
     285        //      struct epoll_event ev;
     286        //      ev.events = EPOLLIN | EPOLLONESHOT;
     287        //      ev.data.u64 = (__u64)&ctx;
     288        //      int ret = epoll_ctl(iopoll.epollfd, op, ctx.efd, &ev);
     289        //      if (ret < 0) {
     290        //              abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) );
     291        //      }
     292        // }
     293
     294        // static void __epoll_register($io_context & ctx) {
     295        //      __epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");
     296        // }
     297
     298        // static void __epoll_unregister($io_context & ctx) {
     299        //      // Read the current epoch so we know when to stop
     300        //      size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST);
     301
     302        //      // Remove the fd from the iopoller
     303        //      __epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE");
     304
     305        //      // Notify the io poller thread of the shutdown
     306        //      iopoll.run = false;
     307        //      sigval val = { 1 };
     308        //      pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
     309
     310        //      // Make sure all this is done
     311        //      __atomic_thread_fence(__ATOMIC_SEQ_CST);
     312
     313        //      // Wait for the next epoch
     314        //      while(curr == iopoll.epoch && !iopoll.stopped) Pause();
     315        // }
     316
     317        // void __ioctx_prepare_block($io_context & ctx) {
     318        //      __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.fd, &ctx);
     319        //      __epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");
     320        // }
    458321
    459322
     
    466329
    467330        void ^?{}( $io_arbiter & mutex this ) {
    468                 /* paranoid */ verify( empty(this.assigned) );
    469                 /* paranoid */ verify( empty(this.available) );
     331                // /* paranoid */ verify( empty(this.assigned) );
     332                // /* paranoid */ verify( empty(this.available) );
    470333                /* paranoid */ verify( is_empty(this.pending.blocked) );
    471334        }
Note: See TracChangeset for help on using the changeset viewer.