Ignore:
Timestamp:
Mar 2, 2021, 1:58:12 PM (5 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
2cd784a
Parents:
6047b00
Message:

Changed io to use ring per kernel threads.

Location:
libcfa/src/concurrency/io
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io/call.cfa.in

    r6047b00 rdddb3dd0  
    7575
    7676        extern struct $io_context * cfa_io_allocate(struct io_uring_sqe * out_sqes[], __u32 out_idxs[], __u32 want)  __attribute__((nonnull (1,2)));
    77         extern void cfa_io_submit( struct $io_context * in_ctx, __u32 in_idxs[], __u32 have ) __attribute__((nonnull (1,2)));
     77        extern void cfa_io_submit( struct $io_context * in_ctx, __u32 in_idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1,2)));
    7878#endif
    7979
     
    185185                return ', '.join(args_a)
    186186
    187 AsyncTemplate = """inline void async_{name}(io_future_t & future, {params}, int submit_flags) {{
     187AsyncTemplate = """inline void async_{name}(io_future_t & future, {params}, __u64 submit_flags) {{
    188188        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_{op})
    189189                ssize_t res = {name}({args});
     
    216216
    217217                verify( sqe->user_data == (__u64)(uintptr_t)&future );
    218                 cfa_io_submit( ctx, &idx, 1 );
     218                cfa_io_submit( ctx, &idx, 1, 0 != (submit_flags & CFA_IO_LAZY) );
    219219        #endif
    220220}}"""
    221221
    222 SyncTemplate = """{ret} cfa_{name}({params}, int submit_flags) {{
     222SyncTemplate = """{ret} cfa_{name}({params}, __u64 submit_flags) {{
    223223        io_future_t future;
    224224
     
    388388        if c.define:
    389389                print("""#if defined({define})
    390         {ret} cfa_{name}({params}, int submit_flags);
     390        {ret} cfa_{name}({params}, __u64 submit_flags);
    391391#endif""".format(define=c.define,ret=c.ret, name=c.name, params=c.params))
    392392        else:
    393                 print("{ret} cfa_{name}({params}, int submit_flags);"
     393                print("{ret} cfa_{name}({params}, __u64 submit_flags);"
    394394                .format(ret=c.ret, name=c.name, params=c.params))
    395395
     
    399399        if c.define:
    400400                print("""#if defined({define})
    401         void async_{name}(io_future_t & future, {params}, int submit_flags);
     401        void async_{name}(io_future_t & future, {params}, __u64 submit_flags);
    402402#endif""".format(define=c.define,name=c.name, params=c.params))
    403403        else:
    404                 print("void async_{name}(io_future_t & future, {params}, int submit_flags);"
     404                print("void async_{name}(io_future_t & future, {params}, __u64 submit_flags);"
    405405                .format(name=c.name, params=c.params))
    406406print("\n")
  • libcfa/src/concurrency/io/setup.cfa

    r6047b00 rdddb3dd0  
    2626
    2727#if !defined(CFA_HAVE_LINUX_IO_URING_H)
    28         void __kernel_io_startup() {
    29                 // Nothing to do without io_uring
    30         }
    31 
    32         void __kernel_io_shutdown() {
    33                 // Nothing to do without io_uring
    34         }
    35 
    3628        void ?{}(io_context_params & this) {}
    3729
     
    9789
    9890//=============================================================================================
    99 // I/O Startup / Shutdown logic + Master Poller
    100 //=============================================================================================
    101 
    102         // IO Master poller loop forward
    103         static void * iopoll_loop( __attribute__((unused)) void * args );
    104 
    105         static struct {
    106                       pthread_t  thrd;    // pthread handle to io poller thread
    107                       void *     stack;   // pthread stack for io poller thread
    108                       int        epollfd; // file descriptor to the epoll instance
    109                 volatile     bool run;     // Whether or not to continue
    110                 volatile     bool stopped; // Whether the poller has finished running
    111                 volatile uint64_t epoch;   // Epoch used for memory reclamation
    112         } iopoll;
    113 
    114         void __kernel_io_startup(void) {
    115                 __cfadbg_print_safe(io_core, "Kernel : Creating EPOLL instance\n" );
    116 
    117                 iopoll.epollfd = epoll_create1(0);
    118                 if (iopoll.epollfd == -1) {
    119                         abort( "internal error, epoll_create1\n");
    120                 }
    121 
    122                 __cfadbg_print_safe(io_core, "Kernel : Starting io poller thread\n" );
    123 
    124                 iopoll.stack   = __create_pthread( &iopoll.thrd, iopoll_loop, 0p );
    125                 iopoll.run     = true;
    126                 iopoll.stopped = false;
    127                 iopoll.epoch   = 0;
    128         }
    129 
    130         void __kernel_io_shutdown(void) {
    131                 // Notify the io poller thread of the shutdown
    132                 iopoll.run = false;
    133                 sigval val = { 1 };
    134                 pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
    135 
    136                 // Wait for the io poller thread to finish
    137 
    138                 __destroy_pthread( iopoll.thrd, iopoll.stack, 0p );
    139 
    140                 int ret = close(iopoll.epollfd);
    141                 if (ret == -1) {
    142                         abort( "internal error, close epoll\n");
    143                 }
    144 
    145                 // Io polling is now fully stopped
    146 
    147                 __cfadbg_print_safe(io_core, "Kernel : IO poller stopped\n" );
    148         }
    149 
    150         static void * iopoll_loop( __attribute__((unused)) void * args ) {
    151                 __processor_id_t id;
    152                 id.full_proc = false;
    153                 id.id = doregister(&id);
    154                 __cfaabi_tls.this_proc_id = &id;
    155                 __cfadbg_print_safe(io_core, "Kernel : IO poller thread starting\n" );
    156 
    157                 // Block signals to control when they arrive
    158                 sigset_t mask;
    159                 sigfillset(&mask);
    160                 if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
    161                 abort( "internal error, pthread_sigmask" );
    162                 }
    163 
    164                 sigdelset( &mask, SIGUSR1 );
    165 
    166                 // Create sufficient events
    167                 struct epoll_event events[10];
    168                 // Main loop
    169                 while( iopoll.run ) {
    170                         __cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n");
    171 
    172                         // increment the epoch to notify any deleters we are starting a new cycle
    173                         __atomic_fetch_add(&iopoll.epoch, 1, __ATOMIC_SEQ_CST);
    174 
    175                         // Wait for events
    176                         int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask );
    177 
    178                         __cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io contexts events, waking up\n", nfds);
    179 
    180                         // Check if an error occured
    181                         if (nfds == -1) {
    182                                 if( errno == EINTR ) continue;
    183                                 abort( "internal error, pthread_sigmask" );
    184                         }
    185 
    186                         for(i; nfds) {
    187                                 $io_context * io_ctx = ($io_context *)(uintptr_t)events[i].data.u64;
    188                                 /* paranoid */ verify( io_ctx );
    189                                 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->fd, io_ctx);
    190                                 #if !defined( __CFA_NO_STATISTICS__ )
    191                                         __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats;
    192                                 #endif
    193 
    194                                 eventfd_t v;
    195                                 eventfd_read(io_ctx->efd, &v);
    196 
    197                                 post( io_ctx->sem );
    198                         }
    199                 }
    200 
    201                 __atomic_store_n(&iopoll.stopped, true, __ATOMIC_SEQ_CST);
    202 
    203                 __cfadbg_print_safe(io_core, "Kernel : IO poller thread stopping\n" );
    204                 unregister(&id);
    205                 return 0p;
    206         }
    207 
    208 //=============================================================================================
    20991// I/O Context Constrution/Destruction
    21092//=============================================================================================
    21193
    212         static void __io_uring_setup ( $io_context & this, const io_context_params & params_in );
     94
     95
     96        static void __io_uring_setup ( $io_context & this, const io_context_params & params_in, int procfd );
    21397        static void __io_uring_teardown( $io_context & this );
    21498        static void __epoll_register($io_context & ctx);
     
    217101        void __ioarbiter_unregister( $io_arbiter & mutex, $io_context & ctx );
    218102
    219         void ?{}($io_context & this, struct cluster & cl) {
    220                 (this.self){ "IO Poller", cl };
     103        void ?{}($io_context & this, processor * proc, struct cluster & cl) {
     104                /* paranoid */ verify( cl.io.arbiter );
     105                this.proc = proc;
     106                this.arbiter = cl.io.arbiter;
    221107                this.ext_sq.empty = true;
    222                 this.revoked = true;
    223                 __io_uring_setup( this, cl.io.params );
     108                (this.ext_sq.blocked){};
     109                __io_uring_setup( this, cl.io.params, proc->idle );
    224110                __cfadbg_print_safe(io_core, "Kernel I/O : Created ring for io_context %u (%p)\n", this.fd, &this);
    225 
    226                 __epoll_register(this);
    227 
    228                 __ioarbiter_register(*cl.io.arbiter, this);
    229 
    230                 __thrd_start( this, main );
    231                 __cfadbg_print_safe(io_core, "Kernel I/O : Started poller thread for io_context %u\n", this.fd);
    232         }
    233 
    234         void ^?{}($io_context & mutex this) {
     111        }
     112
     113        void ^?{}($io_context & this) {
    235114                __cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %u\n", this.fd);
    236 
    237                 ^(this.self){};
    238                 __cfadbg_print_safe(io_core, "Kernel I/O : Stopped poller thread for io_context %u\n", this.fd);
    239 
    240                 __ioarbiter_unregister(*this.arbiter, this);
    241 
    242                 __epoll_unregister(this);
    243115
    244116                __io_uring_teardown( this );
     
    246118        }
    247119
    248         void ?{}(io_context & this, struct cluster & cl) {
    249                 // this.ctx = new(cl);
    250                 this.ctx = alloc();
    251                 (*this.ctx){ cl };
    252 
    253                 __cfadbg_print_safe(io_core, "Kernel I/O : io_context %u ready\n", this.ctx->fd);
    254         }
    255 
    256         void ^?{}(io_context & this) {
    257                 post( this.ctx->sem );
    258 
    259                 delete(this.ctx);
    260         }
    261 
    262120        extern void __disable_interrupts_hard();
    263121        extern void __enable_interrupts_hard();
    264122
    265         static void __io_uring_setup( $io_context & this, const io_context_params & params_in ) {
     123        static void __io_uring_setup( $io_context & this, const io_context_params & params_in, int procfd ) {
    266124                // Step 1 : call to setup
    267125                struct io_uring_params params;
     
    339197                sq.dropped     = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
    340198
    341                 sq.kring.ready = 0;
    342199                sq.kring.released = 0;
    343200
     
    362219                // io_uring_register is so f*cking slow on some machine that it
    363220                // will never succeed if preemption isn't hard blocked
     221                __cfadbg_print_safe(io_core, "Kernel I/O : registering %d for completion with ring %d\n", procfd, fd);
     222
    364223                __disable_interrupts_hard();
    365224
    366                 int efd = eventfd(0, 0);
    367                 if (efd < 0) {
    368                         abort("KERNEL ERROR: IO_URING EVENTFD - %s\n", strerror(errno));
    369                 }
    370 
    371                 int ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &efd, 1);
     225                int ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &procfd, 1);
    372226                if (ret < 0) {
    373227                        abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno));
     
    375229
    376230                __enable_interrupts_hard();
     231
     232                __cfadbg_print_safe(io_core, "Kernel I/O : registered %d for completion with ring %d\n", procfd, fd);
    377233
    378234                // some paranoid checks
     
    390246                this.ring_flags = 0;
    391247                this.fd         = fd;
    392                 this.efd        = efd;
    393248        }
    394249
     
    411266                // close the file descriptor
    412267                close(this.fd);
    413                 close(this.efd);
    414268
    415269                free( this.sq.free_ring.array ); // Maybe null, doesn't matter
    416270        }
    417271
     272        void __cfa_io_start( processor * proc ) {
     273                proc->io.ctx = alloc();
     274                (*proc->io.ctx){proc, *proc->cltr};
     275        }
     276        void __cfa_io_stop ( processor * proc ) {
     277                ^(*proc->io.ctx){};
     278                free(proc->io.ctx);
     279        }
     280
    418281//=============================================================================================
    419282// I/O Context Sleep
    420283//=============================================================================================
    421         static inline void __epoll_ctl($io_context & ctx, int op, const char * error) {
    422                 struct epoll_event ev;
    423                 ev.events = EPOLLIN | EPOLLONESHOT;
    424                 ev.data.u64 = (__u64)&ctx;
    425                 int ret = epoll_ctl(iopoll.epollfd, op, ctx.efd, &ev);
    426                 if (ret < 0) {
    427                         abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) );
    428                 }
    429         }
    430 
    431         static void __epoll_register($io_context & ctx) {
    432                 __epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");
    433         }
    434 
    435         static void __epoll_unregister($io_context & ctx) {
    436                 // Read the current epoch so we know when to stop
    437                 size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST);
    438 
    439                 // Remove the fd from the iopoller
    440                 __epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE");
    441 
    442                 // Notify the io poller thread of the shutdown
    443                 iopoll.run = false;
    444                 sigval val = { 1 };
    445                 pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
    446 
    447                 // Make sure all this is done
    448                 __atomic_thread_fence(__ATOMIC_SEQ_CST);
    449 
    450                 // Wait for the next epoch
    451                 while(curr == iopoll.epoch && !iopoll.stopped) Pause();
    452         }
    453 
    454         void __ioctx_prepare_block($io_context & ctx) {
    455                 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.fd, &ctx);
    456                 __epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");
    457         }
     284        // static inline void __epoll_ctl($io_context & ctx, int op, const char * error) {
     285        //      struct epoll_event ev;
     286        //      ev.events = EPOLLIN | EPOLLONESHOT;
     287        //      ev.data.u64 = (__u64)&ctx;
     288        //      int ret = epoll_ctl(iopoll.epollfd, op, ctx.efd, &ev);
     289        //      if (ret < 0) {
     290        //              abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) );
     291        //      }
     292        // }
     293
     294        // static void __epoll_register($io_context & ctx) {
     295        //      __epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");
     296        // }
     297
     298        // static void __epoll_unregister($io_context & ctx) {
     299        //      // Read the current epoch so we know when to stop
     300        //      size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST);
     301
     302        //      // Remove the fd from the iopoller
     303        //      __epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE");
     304
     305        //      // Notify the io poller thread of the shutdown
     306        //      iopoll.run = false;
     307        //      sigval val = { 1 };
     308        //      pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
     309
     310        //      // Make sure all this is done
     311        //      __atomic_thread_fence(__ATOMIC_SEQ_CST);
     312
     313        //      // Wait for the next epoch
     314        //      while(curr == iopoll.epoch && !iopoll.stopped) Pause();
     315        // }
     316
     317        // void __ioctx_prepare_block($io_context & ctx) {
     318        //      __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.fd, &ctx);
     319        //      __epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");
     320        // }
    458321
    459322
     
    466329
    467330        void ^?{}( $io_arbiter & mutex this ) {
    468                 /* paranoid */ verify( empty(this.assigned) );
    469                 /* paranoid */ verify( empty(this.available) );
     331                // /* paranoid */ verify( empty(this.assigned) );
     332                // /* paranoid */ verify( empty(this.available) );
    470333                /* paranoid */ verify( is_empty(this.pending.blocked) );
    471334        }
  • libcfa/src/concurrency/io/types.hfa

    r6047b00 rdddb3dd0  
    3838                        volatile __u32 * head;   // one passed last index consumed by the kernel
    3939                        volatile __u32 * tail;   // one passed last index visible to the kernel
    40                         volatile __u32 ready;    // one passed last index added to array ()
    4140                        volatile __u32 released; // one passed last index released back to the free list
    4241
     
    9796
    9897        struct __attribute__((aligned(128))) $io_context {
    99                 inline Seqable;
    100 
    101                 volatile bool revoked;
     98                $io_arbiter * arbiter;
    10299                processor * proc;
    103 
    104                 $io_arbiter * arbiter;
    105100
    106101                struct {
     
    113108                __u32 ring_flags;
    114109                int fd;
    115                 int efd;
    116 
    117                 single_sem sem;
    118                 $thread self;
    119110        };
    120 
    121         void main( $io_context & this );
    122         static inline $thread  * get_thread ( $io_context & this ) __attribute__((const)) { return &this.self; }
    123         static inline $monitor * get_monitor( $io_context & this ) __attribute__((const)) { return &this.self.self_mon; }
    124         static inline $io_context *& Back( $io_context * n ) { return ($io_context *)Back( (Seqable *)n ); }
    125         static inline $io_context *& Next( $io_context * n ) { return ($io_context *)Next( (Colable *)n ); }
    126         void ^?{}( $io_context & mutex this );
    127111
    128112        monitor __attribute__((aligned(128))) $io_arbiter {
     
    132116                        volatile bool flag;
    133117                } pending;
    134 
    135                 Sequence($io_context) assigned;
    136 
    137                 Sequence($io_context) available;
    138118        };
    139119
     
    167147        #endif
    168148
    169         void __ioctx_prepare_block($io_context & ctx);
     149        // void __ioctx_prepare_block($io_context & ctx);
    170150#endif
    171151
Note: See TracChangeset for help on using the changeset viewer.