Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io/setup.cfa

    rec19b21 r80444bb  
    5252                #include <pthread.h>
    5353                #include <sys/epoll.h>
    54                 #include <sys/eventfd.h>
    5554                #include <sys/mman.h>
    5655                #include <sys/syscall.h>
     
    170169                // Main loop
    171170                while( iopoll.run ) {
    172                         __cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n");
    173 
    174171                        // Wait for events
    175172                        int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask );
    176 
    177                         __cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io contexts events, waking up\n", nfds);
    178173
    179174                        // Check if an error occured
     
    186181                                $io_ctx_thread * io_ctx = ($io_ctx_thread *)(uintptr_t)events[i].data.u64;
    187182                                /* paranoid */ verify( io_ctx );
    188                                 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->ring->fd, io_ctx);
     183                                __cfadbg_print_safe(io_core, "Kernel I/O : Unparking io poller %p\n", io_ctx);
    189184                                #if !defined( __CFA_NO_STATISTICS__ )
    190185                                        __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats;
    191186                                #endif
    192 
    193                                 eventfd_t v;
    194                                 eventfd_read(io_ctx->ring->efd, &v);
    195 
    196187                                post( io_ctx->sem );
    197188                        }
     
    242233                $thread & thrd = this.thrd.self;
    243234                if( cluster_context ) {
    244                         // We are about to do weird things with the threads
    245                         // we don't need interrupts to complicate everything
    246                         disable_interrupts();
    247 
    248                         // Get cluster info
    249235                        cluster & cltr = *thrd.curr_cluster;
    250236                        /* paranoid */ verify( cltr.idles.total == 0 || &cltr == mainCluster );
     
    253239                        // We need to adjust the clean-up based on where the thread is
    254240                        if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
    255                                 // This is the tricky case
    256                                 // The thread was preempted or ready to run and now it is on the ready queue
    257                                 // but the cluster is shutting down, so there aren't any processors to run the ready queue
    258                                 // the solution is to steal the thread from the ready-queue and pretend it was blocked all along
    259241
    260242                                ready_schedule_lock();
    261                                         // The thread should on the list
     243
     244                                        // This is the tricky case
     245                                        // The thread was preempted and now it is on the ready queue
     246                                        // The thread should be the last on the list
    262247                                        /* paranoid */ verify( thrd.link.next != 0p );
    263248
    264249                                        // Remove the thread from the ready queue of this cluster
    265                                         // The thread should be the last on the list
    266250                                        __attribute__((unused)) bool removed = remove_head( &cltr, &thrd );
    267251                                        /* paranoid */ verify( removed );
     
    279263                        }
    280264                        // !!! This is not an else if !!!
    281                         // Ok, now the thread is blocked (whether we cheated to get here or not)
    282265                        if( thrd.state == Blocked ) {
     266
    283267                                // This is the "easy case"
    284268                                // The thread is parked and can easily be moved to active cluster
     
    290274                        }
    291275                        else {
     276
    292277                                // The thread is in a weird state
    293278                                // I don't know what to do here
    294279                                abort("io_context poller thread is in unexpected state, cannot clean-up correctly\n");
    295280                        }
    296 
    297                         // The weird thread kidnapping stuff is over, restore interrupts.
    298                         enable_interrupts( __cfaabi_dbg_ctx );
    299281                } else {
    300282                        post( this.thrd.sem );
     
    383365                }
    384366
    385                 // Step 3 : Initialize the data structure
    386367                // Get the pointers from the kernel to fill the structure
    387368                // submit queue
     
    398379                        const __u32 num = *sq.num;
    399380                        for( i; num ) {
    400                                 __sqe_clean( &sq.sqes[i] );
     381                                sq.sqes[i].user_data = 0ul64;
    401382                        }
    402383                }
     
    428409                cq.cqes = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
    429410
    430                 // Step 4 : eventfd
    431                 int efd;
    432                 for() {
    433                         efd = eventfd(0, 0);
    434                         if (efd < 0) {
    435                                 if (errno == EINTR) continue;
    436                                 abort("KERNEL ERROR: IO_URING EVENTFD - %s\n", strerror(errno));
    437                         }
    438                         break;
    439                 }
    440 
    441                 int ret;
    442                 for() {
    443                         ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &efd, 1);
    444                         if (ret < 0) {
    445                                 if (errno == EINTR) continue;
    446                                 abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno));
    447                         }
    448                         break;
    449                 }
    450 
    451411                // some paranoid checks
    452412                /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask  );
     
    463423                this.ring_flags = params.flags;
    464424                this.fd         = fd;
    465                 this.efd        = efd;
    466425                this.eager_submits  = params_in.eager_submits;
    467426                this.poller_submits = params_in.poller_submits;
     
    486445                // close the file descriptor
    487446                close(this.fd);
    488                 close(this.efd);
    489447
    490448                free( this.submit_q.ready ); // Maybe null, doesn't matter
     
    494452// I/O Context Sleep
    495453//=============================================================================================
    496         #define IOEVENTS EPOLLIN | EPOLLONESHOT
    497 
    498         static inline void __ioctx_epoll_ctl($io_ctx_thread & ctx, int op, const char * error) {
    499                 struct epoll_event ev;
    500                 ev.events = IOEVENTS;
     454
     455        void __ioctx_register($io_ctx_thread & ctx, struct epoll_event & ev) {
     456                ev.events = EPOLLIN | EPOLLONESHOT;
    501457                ev.data.u64 = (__u64)&ctx;
    502                 int ret = epoll_ctl(iopoll.epollfd, op, ctx.ring->efd, &ev);
     458                int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_ADD, ctx.ring->fd, &ev);
    503459                if (ret < 0) {
    504                         abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) );
    505                 }
    506         }
    507 
    508         void __ioctx_register($io_ctx_thread & ctx) {
    509                 __ioctx_epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");
    510         }
    511 
    512         void __ioctx_prepare_block($io_ctx_thread & ctx) {
    513                 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.ring->fd, &ctx);
    514                 __ioctx_epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");
     460                        abort( "KERNEL ERROR: EPOLL ADD - (%d) %s\n", (int)errno, strerror(errno) );
     461                }
     462        }
     463
     464        void __ioctx_prepare_block($io_ctx_thread & ctx, struct epoll_event & ev) {
     465                int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_MOD, ctx.ring->fd, &ev);
     466                if (ret < 0) {
     467                        abort( "KERNEL ERROR: EPOLL REARM - (%d) %s\n", (int)errno, strerror(errno) );
     468                }
    515469        }
    516470
Note: See TracChangeset for help on using the changeset viewer.