Changeset f00b26d4


Ignore:
Timestamp:
Jul 30, 2020, 3:00:19 PM (4 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
920dca3
Parents:
e0f93e0
Message:

Re-worked IO to use epoll and support multiple io_contexts per cluster.
Also redid how cluster options are handled.
Changed how iofwd calls are passed to support future features and io_contexts rework.

Location:
libcfa/src/concurrency
Files:
6 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io.cfa

    re0f93e0 rf00b26d4  
    1616#if defined(__CFA_DEBUG__)
    1717        // #define __CFA_DEBUG_PRINT_IO__
    18         // #define __CFA_DEBUG_PRINT_IO_CORE__
     18        #define __CFA_DEBUG_PRINT_IO_CORE__
    1919#endif
    2020
    21 #include "kernel.hfa"
     21#include "kernel_private.hfa"
    2222#include "bitmanip.hfa"
    2323
    2424#if !defined(CFA_HAVE_LINUX_IO_URING_H)
    25         void __kernel_io_startup( cluster &, unsigned, bool ) {
     25        void __kernel_io_startup() {
    2626                // Nothing to do without io_uring
    2727        }
    2828
    29         void __kernel_io_finish_start( cluster & ) {
     29        void __kernel_io_shutdown() {
    3030                // Nothing to do without io_uring
    3131        }
    3232
    33         void __kernel_io_prepare_stop( cluster & ) {
    34                 // Nothing to do without io_uring
    35         }
    36 
    37         void __kernel_io_shutdown( cluster &, bool ) {
    38                 // Nothing to do without io_uring
    39         }
     33        void ?{}(io_context & this, struct cluster & cl) {}
     34        void ?{}(io_context & this, struct cluster & cl, const io_context_params & params) {}
     35
     36        void ^?{}(io_context & this) {}
     37        void ^?{}(io_context & this, bool cluster_context) {}
    4038
    4139#else
     
    4543        #include <string.h>
    4644        #include <unistd.h>
    47         #include <sys/mman.h>
    4845
    4946        extern "C" {
     47                #include <sys/epoll.h>
     48                #include <sys/mman.h>
    5049                #include <sys/syscall.h>
    5150
     
    5756        #include "thread.hfa"
    5857
    59         uint32_t entries_per_cluster() {
    60                 return 256;
     58        void ?{}(io_context_params & this) {
     59                this.num_entries = 256;
     60                this.num_ready = 256;
     61                this.submit_aff = -1;
     62                this.eager_submits = false;
     63                this.poller_submits = false;
     64                this.poll_submit = false;
     65                this.poll_complete = false;
    6166        }
    6267
     
    9095        #endif
    9196
    92         // Fast poller user-thread
    93         // Not using the "thread" keyword because we want to control
    94         // more carefully when to start/stop it
    95         struct __io_poller_fast {
    96                 struct __io_data * ring;
    97                 $thread thrd;
    98         };
    99 
    100         void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
    101                 this.ring = cltr.io;
    102                 (this.thrd){ "Fast I/O Poller", cltr };
    103         }
    104         void ^?{}( __io_poller_fast & mutex this );
    105         void main( __io_poller_fast & this );
    106         static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
    107         void ^?{}( __io_poller_fast & mutex this ) {}
    108 
    10997        struct __submition_data {
    11098                // Head and tail of the ring (associated with array)
     
    166154                struct __completion_data completion_q;
    167155                uint32_t ring_flags;
    168                 int cltr_flags;
    169156                int fd;
    170                 semaphore submit;
    171                 volatile bool done;
    172                 struct {
    173                         struct {
    174                                 __processor_id_t id;
    175                                 void * stack;
    176                                 pthread_t kthrd;
    177                                 volatile bool blocked;
    178                         } slow;
    179                         __io_poller_fast fast;
    180                         __bin_sem_t sem;
    181                 } poller;
     157                bool eager_submits:1;
     158                bool poller_submits:1;
    182159        };
    183160
    184161//=============================================================================================
    185 // I/O Startup / Shutdown logic
     162// I/O Startup / Shutdown logic + Master Poller
    186163//=============================================================================================
    187         void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) {
    188                 if( (io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS) && (io_flags & CFA_CLUSTER_IO_EAGER_SUBMITS) ) {
    189                         abort("CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS and CFA_CLUSTER_IO_EAGER_SUBMITS cannot be mixed\n");
    190                 }
    191 
    192                 this.io = malloc();
    193 
     164
     165// IO Master poller loop forward
     166static void * iopoll_loop( __attribute__((unused)) void * args );
     167
     168static struct {
     169        pthread_t     thrd;    // pthread handle to io poller thread
     170        void *        stack;   // pthread stack for io poller thread
     171        int           epollfd; // file descriptor to the epoll instance
     172        volatile bool run;     // Whether or not to continue
     173} iopoll;
     174
     175void __kernel_io_startup() {
     176        __cfaabi_dbg_print_safe( "Kernel : Creating EPOLL instance\n" );
     177
     178        iopoll.epollfd = epoll_create1(0);
     179      if (iopoll.epollfd == -1) {
     180            abort( "internal error, epoll_create1\n");
     181      }
     182
     183        __cfaabi_dbg_print_safe( "Kernel : Starting io poller thread\n" );
     184
     185        iopoll.run = true;
     186        iopoll.stack = __create_pthread( &iopoll.thrd, iopoll_loop, 0p );
     187}
     188
     189void __kernel_io_shutdown() {
     190        // Notify the io poller thread of the shutdown
     191        iopoll.run = false;
     192        sigval val = { 1 };
     193        pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
     194
     195        // Wait for the io poller thread to finish
     196
     197        pthread_join( iopoll.thrd, 0p );
     198        free( iopoll.stack );
     199
     200        int ret = close(iopoll.epollfd);
     201      if (ret == -1) {
     202            abort( "internal error, close epoll\n");
     203      }
     204
     205        // Io polling is now fully stopped
     206
     207        __cfaabi_dbg_print_safe( "Kernel : IO poller stopped\n" );
     208}
     209
     210static void * iopoll_loop( __attribute__((unused)) void * args ) {
     211        __processor_id_t id;
     212        id.id = doregister(&id);
     213        __cfaabi_dbg_print_safe( "Kernel : IO poller thread starting\n" );
     214
     215        // Block signals to control when they arrive
     216        sigset_t mask;
     217        sigfillset(&mask);
     218        if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
     219            abort( "internal error, pthread_sigmask" );
     220        }
     221
     222        sigdelset( &mask, SIGUSR1 );
     223
     224        // Create sufficient events
     225        struct epoll_event events[10];
     226        // Main loop
     227        while( iopoll.run ) {
     228                // Wait for events
     229                int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask );
     230
     231                // Check if an error occured
     232            if (nfds == -1) {
     233                        if( errno == EINTR ) continue;
     234                  abort( "internal error, pthread_sigmask" );
     235            }
     236
     237                for(i; nfds) {
     238                        $io_ctx_thread * io_ctx = ($io_ctx_thread *)(uintptr_t)events[i].data.u64;
     239                        /* paranoid */ verify( io_ctx );
     240                        __cfadbg_print_safe(io_core, "Kernel I/O : Unparking io poller %p\n", io_ctx);
     241                        #if !defined( __CFA_NO_STATISTICS__ )
     242                                kernelTLS.this_stats = io_ctx->self.curr_cluster->stats;
     243                        #endif
     244                        __post( io_ctx->sem, &id );
     245                }
     246        }
     247
     248        __cfaabi_dbg_print_safe( "Kernel : IO poller thread stopping\n" );
     249        unregister(&id);
     250        return 0p;
     251}
     252
     253//=============================================================================================
     254// I/O Context Constrution/Destruction
     255//=============================================================================================
     256
     257        void ?{}($io_ctx_thread & this, struct cluster & cl) { (this.self){ "IO Poller", cl }; }
     258        void main( $io_ctx_thread & this );
     259        static inline $thread * get_thread( $io_ctx_thread & this ) { return &this.self; }
     260        void ^?{}( $io_ctx_thread & mutex this ) {}
     261
     262        static void __io_create ( __io_data & this, const io_context_params & params_in );
     263        static void __io_destroy( __io_data & this );
     264
     265        void ?{}(io_context & this, struct cluster & cl, const io_context_params & params) {
     266                (this.thrd){ cl };
     267                this.thrd.ring = malloc();
     268                __cfadbg_print_safe(io_core, "Kernel I/O : Creating ring for io_context %p\n", &this);
     269                __io_create( *this.thrd.ring, params );
     270
     271                __cfadbg_print_safe(io_core, "Kernel I/O : Starting poller thread for io_context %p\n", &this);
     272                this.thrd.done = false;
     273                __thrd_start( this.thrd, main );
     274
     275                __cfadbg_print_safe(io_core, "Kernel I/O : io_context %p ready\n", &this);
     276        }
     277
     278        void ?{}(io_context & this, struct cluster & cl) {
     279                io_context_params params;
     280                (this){ cl, params };
     281        }
     282
     283        void ^?{}(io_context & this, bool cluster_context) {
     284                __cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %p\n", &this);
     285
     286                // Notify the thread of the shutdown
     287                __atomic_store_n(&this.thrd.done, true, __ATOMIC_SEQ_CST);
     288
     289                // If this is an io_context within a cluster, things get trickier
     290                $thread & thrd = this.thrd.self;
     291                if( cluster_context ) {
     292                        cluster & cltr = *thrd.curr_cluster;
     293                        /* paranoid */ verify( cltr.nprocessors == 0 || &cltr == mainCluster );
     294                        /* paranoid */ verify( !ready_mutate_islocked() );
     295
     296                        // We need to adjust the clean-up based on where the thread is
     297                        if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
     298
     299                                ready_schedule_lock( (struct __processor_id_t *)active_processor() );
     300
     301                                        // This is the tricky case
     302                                        // The thread was preempted and now it is on the ready queue
     303                                        // The thread should be the last on the list
     304                                        /* paranoid */ verify( thrd.link.next != 0p );
     305
     306                                        // Remove the thread from the ready queue of this cluster
     307                                        __attribute__((unused)) bool removed = remove_head( &cltr, &thrd );
     308                                        /* paranoid */ verify( removed );
     309                                        thrd.link.next = 0p;
     310                                        thrd.link.prev = 0p;
     311                                        __cfaabi_dbg_debug_do( thrd.unpark_stale = true );
     312
     313                                        // Fixup the thread state
     314                                        thrd.state = Blocked;
     315                                        thrd.ticket = 0;
     316                                        thrd.preempted = __NO_PREEMPTION;
     317
     318                                ready_schedule_unlock( (struct __processor_id_t *)active_processor() );
     319
     320                                // Pretend like the thread was blocked all along
     321                        }
     322                        // !!! This is not an else if !!!
     323                        if( thrd.state == Blocked ) {
     324
     325                                // This is the "easy case"
     326                                // The thread is parked and can easily be moved to active cluster
     327                                verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster );
     328                                thrd.curr_cluster = active_cluster();
     329
     330                                // unpark the fast io_poller
     331                                unpark( &thrd __cfaabi_dbg_ctx2 );
     332                        }
     333                        else {
     334
     335                                // The thread is in a weird state
     336                                // I don't know what to do here
     337                                abort("io_context poller thread is in unexpected state, cannot clean-up correctly\n");
     338                        }
     339                } else {
     340                        unpark( &thrd __cfaabi_dbg_ctx2 );
     341                }
     342
     343                ^(this.thrd){};
     344                __cfadbg_print_safe(io_core, "Kernel I/O : Stopped poller thread for io_context %p\n", &this);
     345
     346                __io_destroy( *this.thrd.ring );
     347                __cfadbg_print_safe(io_core, "Kernel I/O : Destroyed ring for io_context %p\n", &this);
     348
     349                free(this.thrd.ring);
     350        }
     351
     352        void ^?{}(io_context & this) {
     353                ^(this){ false };
     354        }
     355
     356        static void __io_create( __io_data & this, const io_context_params & params_in ) {
    194357                // Step 1 : call to setup
    195358                struct io_uring_params params;
    196359                memset(&params, 0, sizeof(params));
    197                 if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS   ) params.flags |= IORING_SETUP_SQPOLL;
    198                 if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES ) params.flags |= IORING_SETUP_IOPOLL;
    199 
    200                 uint32_t nentries = entries_per_cluster();
     360                if( params_in.poll_submit   ) params.flags |= IORING_SETUP_SQPOLL;
     361                if( params_in.poll_complete ) params.flags |= IORING_SETUP_IOPOLL;
     362
     363                uint32_t nentries = params_in.num_entries;
    201364
    202365                int fd = syscall(__NR_io_uring_setup, nentries, &params );
     
    206369
    207370                // Step 2 : mmap result
    208                 memset( this.io, 0, sizeof(struct __io_data) );
    209                 struct __submition_data  & sq = this.io->submit_q;
    210                 struct __completion_data & cq = this.io->completion_q;
     371                memset( &this, 0, sizeof(struct __io_data) );
     372                struct __submition_data  & sq = this.submit_q;
     373                struct __completion_data & cq = this.completion_q;
    211374
    212375                // calculate the right ring size
     
    275438                (sq.release_lock){};
    276439
    277                 if( io_flags & ( CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS | CFA_CLUSTER_IO_EAGER_SUBMITS ) ) {
    278                         /* paranoid */ verify( is_pow2( io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET ) || ((io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET) < 8) );
    279                         sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8);
     440                if( params_in.poller_submits || params_in.eager_submits ) {
     441                        /* paranoid */ verify( is_pow2( params_in.num_ready ) || (params_in.num_ready < 8) );
     442                        sq.ready_cnt = max( params_in.num_ready, 8 );
    280443                        sq.ready = alloc_align( 64, sq.ready_cnt );
    281444                        for(i; sq.ready_cnt) {
     
    308471
    309472                // Update the global ring info
    310                 this.io->ring_flags = params.flags;
    311                 this.io->cltr_flags = io_flags;
    312                 this.io->fd         = fd;
    313                 this.io->done       = false;
    314                 (this.io->submit){ min(*sq.num, *cq.num) };
    315 
    316                 if(!main_cluster) {
    317                         __kernel_io_finish_start( this );
    318                 }
    319         }
    320 
    321         void __kernel_io_finish_start( cluster & this ) {
    322                 if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
    323                         __cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluter %p\n", &this);
    324                         (this.io->poller.fast){ this };
    325                         __thrd_start( this.io->poller.fast, main );
    326                 }
    327 
    328                 // Create the poller thread
    329                 __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluster %p\n", &this);
    330                 this.io->poller.slow.blocked = false;
    331                 this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
    332         }
    333 
    334         void __kernel_io_prepare_stop( cluster & this ) {
    335                 __cfadbg_print_safe(io_core, "Kernel I/O : Stopping pollers for cluster\n", &this);
    336                 // Notify the poller thread of the shutdown
    337                 __atomic_store_n(&this.io->done, true, __ATOMIC_SEQ_CST);
    338 
    339                 // Stop the IO Poller
    340                 sigval val = { 1 };
    341                 pthread_sigqueue( this.io->poller.slow.kthrd, SIGUSR1, val );
    342                 post( this.io->poller.sem );
    343 
    344                 // Wait for the poller thread to finish
    345                 pthread_join( this.io->poller.slow.kthrd, 0p );
    346                 free( this.io->poller.slow.stack );
    347 
    348                 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster\n", &this);
    349 
    350                 if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
    351                         with( this.io->poller.fast ) {
    352                                 /* paranoid */ verify( this.nprocessors == 0 || &this == mainCluster );
    353                                 /* paranoid */ verify( !ready_mutate_islocked() );
    354 
    355                                 // We need to adjust the clean-up based on where the thread is
    356                                 if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
    357 
    358                                         ready_schedule_lock( (struct __processor_id_t *)active_processor() );
    359 
    360                                                 // This is the tricky case
    361                                                 // The thread was preempted and now it is on the ready queue
    362                                                 // The thread should be the last on the list
    363                                                 /* paranoid */ verify( thrd.link.next != 0p );
    364 
    365                                                 // Remove the thread from the ready queue of this cluster
    366                                                 __attribute__((unused)) bool removed = remove_head( &this, &thrd );
    367                                                 /* paranoid */ verify( removed );
    368                                                 thrd.link.next = 0p;
    369                                                 thrd.link.prev = 0p;
    370                                                 __cfaabi_dbg_debug_do( thrd.unpark_stale = true );
    371 
    372                                                 // Fixup the thread state
    373                                                 thrd.state = Blocked;
    374                                                 thrd.ticket = 0;
    375                                                 thrd.preempted = __NO_PREEMPTION;
    376 
    377                                         ready_schedule_unlock( (struct __processor_id_t *)active_processor() );
    378 
    379                                         // Pretend like the thread was blocked all along
    380                                 }
    381                                 // !!! This is not an else if !!!
    382                                 if( thrd.state == Blocked ) {
    383 
    384                                         // This is the "easy case"
    385                                         // The thread is parked and can easily be moved to active cluster
    386                                         verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster );
    387                                         thrd.curr_cluster = active_cluster();
    388 
    389                                         // unpark the fast io_poller
    390                                         unpark( &thrd __cfaabi_dbg_ctx2 );
    391                                 }
    392                                 else {
    393 
    394                                         // The thread is in a weird state
    395                                         // I don't know what to do here
    396                                         abort("Fast poller thread is in unexpected state, cannot clean-up correctly\n");
    397                                 }
    398 
    399                         }
    400 
    401                         ^(this.io->poller.fast){};
    402 
    403                         __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller stopped for cluster\n", &this);
    404                 }
    405         }
    406 
    407         void __kernel_io_shutdown( cluster & this, bool main_cluster ) {
    408                 if(!main_cluster) {
    409                         __kernel_io_prepare_stop( this );
    410                 }
    411 
     473                this.ring_flags = params.flags;
     474                this.fd         = fd;
     475                this.eager_submits  = params_in.eager_submits;
     476                this.poller_submits = params_in.poller_submits;
     477        }
     478
     479        void __io_destroy( __io_data & this ) {
    412480                // Shutdown the io rings
    413                 struct __submition_data  & sq = this.io->submit_q;
    414                 struct __completion_data & cq = this.io->completion_q;
     481                struct __submition_data  & sq = this.submit_q;
     482                struct __completion_data & cq = this.completion_q;
    415483
    416484                // unmap the submit queue entries
     
    426494
    427495                // close the file descriptor
    428                 close(this.io->fd);
    429 
    430                 free( this.io->submit_q.ready ); // Maybe null, doesn't matter
    431                 free( this.io );
    432         }
    433 
    434         int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get, sigset_t * mask ) {
     496                close(this.fd);
     497
     498                free( this.submit_q.ready ); // Maybe null, doesn't matter
     499        }
     500
     501        int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get ) {
    435502                bool need_sys_to_submit = false;
    436503                bool need_sys_to_complete = false;
    437                 unsigned min_complete = 0;
    438504                unsigned flags = 0;
    439 
    440505
    441506                TO_SUBMIT:
     
    451516                }
    452517
    453                 TO_COMPLETE:
    454518                if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
    455519                        flags |= IORING_ENTER_GETEVENTS;
    456                         if( mask ) {
    457                                 need_sys_to_complete = true;
    458                                 min_complete = 1;
    459                                 break TO_COMPLETE;
    460                         }
    461520                        if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {
    462521                                need_sys_to_complete = true;
     
    466525                int ret = 0;
    467526                if( need_sys_to_submit || need_sys_to_complete ) {
    468                         ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, min_complete, flags, mask, _NSIG / 8);
     527                        ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, 0p, _NSIG / 8);
    469528                        if( ret < 0 ) {
    470529                                switch((int)errno) {
     
    490549        static uint32_t __release_consumed_submission( struct __io_data & ring );
    491550
    492         static inline void process(struct io_uring_cqe & cqe, struct __processor_id_t * id ) {
     551        static inline void process(struct io_uring_cqe & cqe ) {
    493552                struct __io_user_data_t * data = (struct __io_user_data_t *)(uintptr_t)cqe.user_data;
    494553                __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", data, cqe.res, data->thrd );
    495554
    496555                data->result = cqe.res;
    497                 if(!id) { unpark(     data->thrd __cfaabi_dbg_ctx2 ); }
    498                 else  { __unpark( id, data->thrd __cfaabi_dbg_ctx2 ); }
     556                unpark( data->thrd __cfaabi_dbg_ctx2 );
    499557        }
    500558
    501559        // Process a single completion message from the io_uring
    502560        // This is NOT thread-safe
    503         static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask ) {
     561        static [int, bool] __drain_io( & struct __io_data ring ) {
    504562                /* paranoid */ verify( !kernelTLS.preemption_state.enabled );
    505563
    506564                unsigned to_submit = 0;
    507                 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
     565                if( ring.poller_submits ) {
    508566                        // If the poller thread also submits, then we need to aggregate the submissions which are ready
    509567                        to_submit = __collect_submitions( ring );
    510568                }
    511569
    512                 int ret = __io_uring_enter(ring, to_submit, true, mask);
     570                int ret = __io_uring_enter(ring, to_submit, true);
    513571                if( ret < 0 ) {
    514572                        return [0, true];
     
    547605                        /* paranoid */ verify(&cqe);
    548606
    549                         process( cqe, !mask ? (struct __processor_id_t *)0p : &ring.poller.slow.id );
    550                 }
    551 
    552                 // Allow new submissions to happen
    553                 // V(ring.submit, count);
     607                        process( cqe );
     608                }
    554609
    555610                // Mark to the kernel that the cqe has been seen
     
    561616        }
    562617
    563         static void * __io_poller_slow( void * arg ) {
    564                 #if !defined( __CFA_NO_STATISTICS__ )
    565                         __stats_t local_stats;
    566                         __init_stats( &local_stats );
    567                         kernelTLS.this_stats = &local_stats;
    568                 #endif
    569 
    570                 cluster * cltr = (cluster *)arg;
    571                 struct __io_data & ring = *cltr->io;
    572 
    573                 ring.poller.slow.id.id = doregister( &ring.poller.slow.id );
    574 
    575                 sigset_t mask;
    576                 sigfillset(&mask);
    577                 if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
    578                         abort( "KERNEL ERROR: IO_URING - pthread_sigmask" );
    579                 }
    580 
    581                 sigdelset( &mask, SIGUSR1 );
    582 
    583                 verify( (*ring.submit_q.head) == (*ring.submit_q.tail) );
    584                 verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );
    585 
    586                 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p ready\n", &ring);
    587 
    588                 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
    589                         while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
    590 
    591                                 __atomic_store_n( &ring.poller.slow.blocked, true, __ATOMIC_SEQ_CST );
    592 
    593                                 // In the user-thread approach drain and if anything was drained,
    594                                 // batton pass to the user-thread
    595                                 int count;
    596                                 bool again;
    597                                 [count, again] = __drain_io( ring, &mask );
    598 
    599                                 __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST );
    600 
    601                                 // Update statistics
    602                                 __STATS__( true,
    603                                         io.complete_q.completed_avg.val += count;
    604                                         io.complete_q.completed_avg.slow_cnt += 1;
    605                                 )
    606 
    607                                 if(again) {
    608                                         __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring);
    609                                         __unpark( &ring.poller.slow.id, &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
    610                                         wait( ring.poller.sem );
    611                                 }
    612                         }
    613                 }
    614                 else {
    615                         while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
    616                                 //In the naive approach, just poll the io completion queue directly
    617                                 int count;
    618                                 bool again;
    619                                 [count, again] = __drain_io( ring, &mask );
    620 
    621                                 // Update statistics
    622                                 __STATS__( true,
    623                                         io.complete_q.completed_avg.val += count;
    624                                         io.complete_q.completed_avg.slow_cnt += 1;
    625                                 )
    626                         }
    627                 }
    628 
    629                 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p stopping\n", &ring);
    630 
    631                 unregister( &ring.poller.slow.id );
    632 
    633                 #if !defined(__CFA_NO_STATISTICS__)
    634                         __tally_stats(cltr->stats, &local_stats);
    635                 #endif
    636 
    637                 return 0p;
    638         }
    639 
    640         void main( __io_poller_fast & this ) {
    641                 verify( this.ring->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD );
    642 
    643                 // Start parked
    644                 park( __cfaabi_dbg_ctx );
    645 
    646                 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", &this.ring);
     618        void main( $io_ctx_thread & this ) {
     619                epoll_event ev;
     620                ev.events = EPOLLIN | EPOLLONESHOT;
     621                ev.data.u64 = (uint64_t)&this;
     622                int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_ADD, this.ring->fd, &ev);
     623                if (ret < 0) {
     624                        abort( "KERNEL ERROR: EPOLL ADD - (%d) %s\n", (int)errno, strerror(errno) );
     625                }
     626
     627                __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %p for ring %p ready\n", &this, &this.ring);
    647628
    648629                int reset = 0;
    649 
    650630                // Then loop until we need to start
    651                 while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
    652 
     631                while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) {
    653632                        // Drain the io
    654633                        int count;
    655634                        bool again;
    656635                        disable_interrupts();
    657                                 [count, again] = __drain_io( *this.ring, 0p );
     636                                [count, again] = __drain_io( *this.ring );
    658637
    659638                                if(!again) reset++;
     
    672651                        // We didn't get anything baton pass to the slow poller
    673652                        else {
    674                                 __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
     653                                __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self);
    675654                                reset = 0;
    676655
    677656                                // wake up the slow poller
    678                                 post( this.ring->poller.sem );
     657                                ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_MOD, this.ring->fd, &ev);
     658                                if (ret < 0) {
     659                                        abort( "KERNEL ERROR: EPOLL REARM - (%d) %s\n", (int)errno, strerror(errno) );
     660                                }
    679661
    680662                                // park this thread
    681                                 park( __cfaabi_dbg_ctx );
     663                                wait( this.sem );
    682664                        }
    683665                }
    684666
    685667                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
    686         }
    687 
    688         static inline void __wake_poller( struct __io_data & ring ) __attribute__((artificial));
    689         static inline void __wake_poller( struct __io_data & ring ) {
    690                 if(!__atomic_load_n( &ring.poller.slow.blocked, __ATOMIC_SEQ_CST)) return;
    691 
    692                 sigval val = { 1 };
    693                 pthread_sigqueue( ring.poller.slow.kthrd, SIGUSR1, val );
    694668        }
    695669
     
    806780        }
    807781
    808         void __submit( struct __io_data & ring, uint32_t idx ) {
     782        void __submit( struct io_context * ctx, uint32_t idx ) __attribute__((nonnull (1))) {
     783                __io_data & ring = *ctx->thrd.ring;
    809784                // Get now the data we definetely need
    810785                uint32_t * const tail = ring.submit_q.tail;
    811                 const uint32_t mask = *ring.submit_q.mask;
     786                const uint32_t mask  = *ring.submit_q.mask;
    812787
    813788                // There are 2 submission schemes, check which one we are using
    814                 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
     789                if( ring.poller_submits ) {
    815790                        // If the poller thread submits, then we just need to add this to the ready array
    816791                        __submit_to_ready_array( ring, idx, mask );
    817792
    818                         __wake_poller( ring );
     793                        post( ctx->thrd.sem );
    819794
    820795                        __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
    821796                }
    822                 else if( ring.cltr_flags & CFA_CLUSTER_IO_EAGER_SUBMITS ) {
     797                else if( ring.eager_submits ) {
    823798                        uint32_t picked = __submit_to_ready_array( ring, idx, mask );
    824799
     
    849824                        // We got the lock
    850825                        unsigned to_submit = __collect_submitions( ring );
    851                         int ret = __io_uring_enter( ring, to_submit, false, 0p );
     826                        int ret = __io_uring_enter( ring, to_submit, false );
    852827                        if( ret < 0 ) {
    853828                                unlock(ring.submit_q.lock);
     
    892867
    893868                        // Submit however, many entries need to be submitted
    894                         int ret = __io_uring_enter( ring, 1, false, 0p );
     869                        int ret = __io_uring_enter( ring, 1, false );
    895870                        if( ret < 0 ) {
    896871                                switch((int)errno) {
     
    963938//=============================================================================================
    964939
    965         void register_fixed_files( cluster & cl, int * files, unsigned count ) {
    966                 int ret = syscall( __NR_io_uring_register, cl.io->fd, IORING_REGISTER_FILES, files, count );
     940        void register_fixed_files( io_context & ctx, int * files, unsigned count ) {
     941                int ret = syscall( __NR_io_uring_register, ctx.thrd.ring->fd, IORING_REGISTER_FILES, files, count );
    967942                if( ret < 0 ) {
    968943                        abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
     
    971946                __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret );
    972947        }
     948
     949        void register_fixed_files( cluster & cltr, int * files, unsigned count ) {
     950                for(i; cltr.io.cnt) {
     951                        register_fixed_files( cltr.io.ctxs[i], files, count );
     952                }
     953        }
    973954#endif
  • libcfa/src/concurrency/iocall.cfa

    re0f93e0 rf00b26d4  
    2222#if defined(CFA_HAVE_LINUX_IO_URING_H)
    2323        #include <stdint.h>
     24        #include <errno.h>
    2425        #include <linux/io_uring.h>
    2526
     
    2728
    2829        extern [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data );
    29         extern void __submit( struct __io_data & ring, uint32_t idx );
     30        extern void __submit( struct io_context * ctx, uint32_t idx ) __attribute__((nonnull (1)));
    3031
    3132        static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd) {
     
    5253        }
    5354
     55
     56
     57      #if defined(CFA_HAVE_IOSQE_FIXED_FILE) && defined(CFA_HAVE_IOSQE_IO_DRAIN) && defined(CFA_HAVE_IOSQE_ASYNC)
     58                #define REGULAR_FLAGS (IOSQE_FIXED_FILE | IOSQE_IO_DRAIN | IOSQE_ASYNC)
     59        #elif defined(CFA_HAVE_IOSQE_FIXED_FILE) && defined(CFA_HAVE_IOSQE_ASYNC)
     60                #define REGULAR_FLAGS (IOSQE_FIXED_FILE | IOSQE_ASYNC)
     61      #elif defined(CFA_HAVE_IOSQE_FIXED_FILE) && defined(CFA_HAVE_IOSQE_IO_DRAIN)
     62                #define REGULAR_FLAGS (IOSQE_FIXED_FILE | IOSQE_IO_DRAIN)
     63      #elif defined(CFA_HAVE_IOSQE_IO_DRAIN) && defined(CFA_HAVE_IOSQE_ASYNC)
     64                #define REGULAR_FLAGS (IOSQE_IO_DRAIN | IOSQE_ASYNC)
     65        #elif defined(CFA_HAVE_IOSQE_FIXED_FILE)
     66                #define REGULAR_FLAGS (IOSQE_FIXED_FILE)
     67      #elif defined(CFA_HAVE_IOSQE_IO_DRAIN)
     68                #define REGULAR_FLAGS (IOSQE_IO_DRAIN)
     69      #elif defined(CFA_HAVE_IOSQE_ASYNC)
     70                #define REGULAR_FLAGS (IOSQE_ASYNC)
     71        #else
     72                #define REGULAR_FLAGS (0)
     73        #endif
     74
     75        #if defined(CFA_HAVE_IOSQE_IO_LINK) && defined(CFA_HAVE_IOSQE_IO_HARDLINK)
     76                #define LINK_FLAGS (IOSQE_IO_LINK | IOSQE_IO_HARDLINK)
     77        #elif defined(CFA_HAVE_IOSQE_IO_LINK)
     78                #define LINK_FLAGS (IOSQE_IO_LINK)
     79        #elif defined(CFA_HAVE_IOSQE_IO_HARDLINK)
     80                #define LINK_FLAGS (IOSQE_IO_HARDLINK)
     81        #else
     82                #define LINK_FLAGS (0)
     83        #endif
     84
     85        #if defined(CFA_HAVE_SPLICE_F_FD_IN_FIXED)
     86                #define SPLICE_FLAGS (SPLICE_F_FD_IN_FIXED)
     87        #else
     88                #define SPLICE_FLAGS (0)
     89        #endif
     90
     91
    5492        #define __submit_prelude \
     93                if( 0 != (submit_flags & LINK_FLAGS) ) { errno = ENOTSUP; return -1; } \
     94                (void)timeout; (void)cancellation; \
     95                if( !context ) context = __get_io_context(); \
    5596                __io_user_data_t data = { 0, active_thread() }; \
    56                 struct __io_data & ring = *data.thrd->curr_cluster->io; \
     97                struct __io_data & ring = *context->thrd.ring; \
    5798                struct io_uring_sqe * sqe; \
    5899                uint32_t idx; \
    59                 [sqe, idx] = __submit_alloc( ring, (uint64_t)(uintptr_t)&data );
     100                [sqe, idx] = __submit_alloc( ring, (uint64_t)(uintptr_t)&data ); \
     101                sqe->flags = REGULAR_FLAGS & submit_flags;
    60102
    61103        #define __submit_wait \
    62104                /*__cfaabi_bits_print_safe( STDERR_FILENO, "Preparing user data %p for %p\n", &data, data.thrd );*/ \
    63105                verify( sqe->user_data == (uint64_t)(uintptr_t)&data ); \
    64                 __submit( ring, idx ); \
     106                __submit( context, idx ); \
    65107                park( __cfaabi_dbg_ctx ); \
     108                if( data.result < 0 ) { \
     109                        errno = -data.result; \
     110                        return -1; \
     111                } \
    66112                return data.result;
    67113#endif
     
    70116// I/O Forwards
    71117//=============================================================================================
     118#include <time.hfa>
    72119
    73120// Some forward declarations
     
    121168// Asynchronous operations
    122169#if defined(HAVE_PREADV2)
    123         ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
     170        ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    124171                #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_READV)
    125172                        return preadv2(fd, iov, iovcnt, offset, flags);
     
    132179                #endif
    133180        }
    134 
    135         ssize_t cfa_preadv2_fixed(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
    136                 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_READV)
    137                         return preadv2(fd, iov, iovcnt, offset, flags);
     181#endif
     182
     183#if defined(HAVE_PWRITEV2)
     184        ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
     185                #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_WRITEV)
     186                        return pwritev2(fd, iov, iovcnt, offset, flags);
    138187                #else
    139188                        __submit_prelude
    140189
    141                         (*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset };
    142                         sqe->flags |= IOSQE_FIXED_FILE;
     190                        (*sqe){ IORING_OP_WRITEV, fd, iov, iovcnt, offset };
    143191
    144192                        __submit_wait
     
    147195#endif
    148196
    149 #if defined(HAVE_PWRITEV2)
    150         ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
    151                 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_WRITEV)
    152                         return pwritev2(fd, iov, iovcnt, offset, flags);
    153                 #else
    154                         __submit_prelude
    155 
    156                         (*sqe){ IORING_OP_WRITEV, fd, iov, iovcnt, offset };
    157 
    158                         __submit_wait
    159                 #endif
    160         }
    161 #endif
    162 
    163 int cfa_fsync(int fd) {
     197int cfa_fsync(int fd, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    164198        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_FSYNC)
    165199                return fsync(fd);
     
    173207}
    174208
    175 int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags) {
     209int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    176210        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SYNC_FILE_RANGE)
    177211                return sync_file_range(fd, offset, nbytes, flags);
     
    189223
    190224
    191 ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags) {
     225ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    192226        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SENDMSG)
    193227                return sendmsg(sockfd, msg, flags);
     
    202236}
    203237
    204 ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags) {
     238ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    205239        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_RECVMSG)
    206240                return recvmsg(sockfd, msg, flags);
     
    215249}
    216250
    217 ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags) {
     251ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    218252        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SEND)
    219253                return send( sockfd, buf, len, flags );
     
    230264}
    231265
    232 ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags) {
     266ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    233267        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_RECV)
    234268                return recv( sockfd, buf, len, flags );
     
    245279}
    246280
    247 int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {
     281int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    248282        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ACCEPT)
    249283                return accept4( sockfd, addr, addrlen, flags );
     
    260294}
    261295
    262 int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) {
     296int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    263297        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_CONNECT)
    264298                return connect( sockfd, addr, addrlen );
     
    274308}
    275309
    276 int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len) {
     310int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    277311        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_FALLOCATE)
    278312                return fallocate( fd, mode, offset, len );
     
    289323}
    290324
    291 int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice) {
     325int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    292326        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_FADVISE)
    293327                return posix_fadvise( fd, offset, len, advice );
     
    304338}
    305339
    306 int cfa_madvise(void *addr, size_t length, int advice) {
     340int cfa_madvise(void *addr, size_t length, int advice, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    307341        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_MADVISE)
    308342                return madvise( addr, length, advice );
     
    319353}
    320354
    321 int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode) {
     355int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    322356        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_OPENAT)
    323357                return openat( dirfd, pathname, flags, mode );
     
    334368}
    335369
    336 int cfa_close(int fd) {
     370int cfa_close(int fd, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    337371        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_CLOSE)
    338372                return close( fd );
     
    348382// Forward declare in case it is not supported
    349383struct statx;
    350 int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf) {
     384int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    351385        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_STATX)
    352386                #if defined(__NR_statx)
     
    360394
    361395                (*sqe){ IORING_OP_STATX, dirfd, pathname, mask, (uint64_t)statxbuf };
    362                 sqe->flags = flags;
    363 
    364                 __submit_wait
    365         #endif
    366 }
    367 
    368 ssize_t cfa_read(int fd, void *buf, size_t count) {
     396                sqe->statx_flags = flags;
     397
     398                __submit_wait
     399        #endif
     400}
     401
     402ssize_t cfa_read(int fd, void *buf, size_t count, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    369403        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_READ)
    370404                return read( fd, buf, count );
     
    378412}
    379413
    380 ssize_t cfa_write(int fd, void *buf, size_t count) {
     414ssize_t cfa_write(int fd, void *buf, size_t count, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    381415        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_WRITE)
    382416                return read( fd, buf, count );
     
    390424}
    391425
    392 ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags) {
     426ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    393427        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SPLICE)
    394428                return splice( fd_in, off_in, fd_out, off_out, len, flags );
     
    399433                sqe->splice_fd_in  = fd_in;
    400434                sqe->splice_off_in = off_in;
    401                 sqe->splice_flags  = flags;
    402 
    403                 __submit_wait
    404         #endif
    405 }
    406 
    407 ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int in_flags, int out_flags) {
    408         #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SPLICE)
    409                 return splice( fd_in, off_in, fd_out, off_out, len, flags );
    410         #else
    411                 __submit_prelude
    412 
    413                 (*sqe){ IORING_OP_SPLICE, fd_out, 0p, len, off_out };
    414                 sqe->splice_fd_in  = fd_in;
    415                 sqe->splice_off_in = off_in;
    416                 sqe->splice_flags  = flags | out_flags;
    417                 sqe->flags = in_flags;
    418 
    419                 __submit_wait
    420         #endif
    421 }
    422 
    423 ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags) {
     435                sqe->splice_flags  = flags | (SPLICE_FLAGS & submit_flags);
     436
     437                __submit_wait
     438        #endif
     439}
     440
     441ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
    424442        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_TEE)
    425443                return tee( fd_in, fd_out, len, flags );
     
    429447                (*sqe){ IORING_OP_TEE, fd_out, 0p, len, 0 };
    430448                sqe->splice_fd_in = fd_in;
    431                 sqe->splice_flags = flags;
     449                sqe->splice_flags  = flags | (SPLICE_FLAGS & submit_flags);
    432450
    433451                __submit_wait
     
    536554
    537555                if( /*func == (fptr_t)splice || */
    538                         func == (fptr_t)(ssize_t (*)(int, loff_t *, int, loff_t *, size_t, unsigned int))cfa_splice,
    539                         func == (fptr_t)(ssize_t (*)(int, loff_t *, int, loff_t *, size_t, unsigned int, int, int))cfa_splice )
     556                        func == (fptr_t)cfa_splice )
    540557                        #define _CFA_IO_FEATURE_CFA_HAVE_IORING_OP_SPLICE ,
    541558                        return IS_DEFINED(CFA_HAVE_IORING_OP_SPLICE);
  • libcfa/src/concurrency/iofwd.hfa

    re0f93e0 rf00b26d4  
    1919extern "C" {
    2020        #include <sys/types.h>
     21        #if CFA_HAVE_LINUX_IO_URING_H
     22                #include <linux/io_uring.h>
     23        #endif
    2124}
    2225#include "bits/defs.hfa"
     26#include "time.hfa"
     27
     28#if defined(CFA_HAVE_IOSQE_FIXED_FILE)
     29        #define CFA_IO_FIXED_FD1 IOSQE_FIXED_FILE
     30#endif
     31#if defined(CFA_HAVE_SPLICE_F_FD_IN_FIXED)
     32        #define CFA_IO_FIXED_FD2 SPLICE_F_FD_IN_FIXED
     33#endif
     34#if defined(CFA_HAVE_IOSQE_IO_DRAIN)
     35        #define CFA_IO_DRAIN IOSQE_IO_DRAIN
     36#endif
     37#if defined(CFA_HAVE_IOSQE_ASYNC)
     38        #define CFA_IO_ASYNC IOSQE_ASYNC
     39#endif
     40
     41struct cluster;
     42struct io_context;
     43struct io_cancellation;
    2344
    2445struct iovec;
     
    2748struct statx;
    2849
    29 extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
    30 extern ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
    31 extern int cfa_fsync(int fd);
    32 extern int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags);
    33 extern ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags);
    34 extern ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags);
    35 extern ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags);
    36 extern ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags);
    37 extern int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
    38 extern int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
    39 extern int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len);
    40 extern int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice);
    41 extern int cfa_madvise(void *addr, size_t length, int advice);
    42 extern int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode);
    43 extern int cfa_close(int fd);
    44 extern int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf);
    45 extern ssize_t cfa_read(int fd, void *buf, size_t count);
    46 extern ssize_t cfa_write(int fd, void *buf, size_t count);
    47 extern ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags);
    48 extern ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags);
     50extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     51extern ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     52extern int cfa_fsync(int fd, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     53extern int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     54extern ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     55extern ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     56extern ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     57extern ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     58extern int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     59extern int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     60extern int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     61extern int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     62extern int cfa_madvise(void *addr, size_t length, int advice, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     63extern int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     64extern int cfa_close(int fd, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     65extern int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     66extern ssize_t cfa_read(int fd, void *buf, size_t count, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     67extern ssize_t cfa_write(int fd, void *buf, size_t count, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     68extern ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
     69extern ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags = 0, Duration timeout = -1`s, io_cancellation * cancellation = 0p, io_context * context = 0p);
    4970
    5071//-----------------------------------------------------------------------------
    5172// Check if a function is blocks a only the user thread
    5273bool has_user_level_blocking( fptr_t func );
     74
     75//-----------------------------------------------------------------------------
     76void register_fixed_files( io_context & ctx , int * files, unsigned count );
     77void register_fixed_files( cluster    & cltr, int * files, unsigned count );
  • libcfa/src/concurrency/kernel.cfa

    re0f93e0 rf00b26d4  
    130130KERNEL_STORAGE($thread,              mainThread);
    131131KERNEL_STORAGE(__stack_t,            mainThreadCtx);
     132KERNEL_STORAGE(io_context,           mainPollerThread);
    132133KERNEL_STORAGE(__scheduler_RWLock_t, __scheduler_lock);
    133134#if !defined(__CFA_NO_STATISTICS__)
     
    310311}
    311312
    312 void ?{}(cluster & this, const char name[], Duration preemption_rate, unsigned io_flags) with( this ) {
     313void ?{}(cluster & this, const char name[], Duration preemption_rate, unsigned num_io, const io_context_params & io_params) with( this ) {
    313314        this.name = name;
    314315        this.preemption_rate = preemption_rate;
     
    335336        ready_mutate_unlock( last_size );
    336337
    337 
    338         __kernel_io_startup( this, io_flags, &this == mainCluster );
     338        this.io.cnt  = num_io;
     339        this.io.ctxs = aalloc(num_io);
     340        for(i; this.io.cnt) {
     341                (this.io.ctxs[i]){ this, io_params };
     342        }
    339343}
    340344
    341345void ^?{}(cluster & this) {
    342         __kernel_io_shutdown( this, &this == mainCluster );
     346        for(i; this.io.cnt) {
     347                ^(this.io.ctxs[i]){ true };
     348        }
     349        free(this.io.ctxs);
    343350
    344351        // Lock the RWlock so no-one pushes/pops while we are changing the queue
     
    853860        // Initialize the main cluster
    854861        mainCluster = (cluster *)&storage_mainCluster;
    855         (*mainCluster){"Main Cluster"};
     862        (*mainCluster){"Main Cluster", 0};
    856863
    857864        __cfadbg_print_safe(runtime_core, "Kernel : Main cluster ready\n");
     
    901908        #endif
    902909
     910        // Start IO
     911        __kernel_io_startup();
     912
    903913        // Enable preemption
    904914        kernel_start_preemption();
     
    918928
    919929        // Now that the system is up, finish creating systems that need threading
    920         __kernel_io_finish_start( *mainCluster );
    921 
     930        mainCluster->io.ctxs = (io_context *)&storage_mainPollerThread;
     931        mainCluster->io.cnt  = 1;
     932        (*mainCluster->io.ctxs){ *mainCluster };
    922933
    923934        __cfadbg_print_safe(runtime_core, "Kernel : Started\n--------------------------------------------------\n\n");
     
    930941static void __kernel_shutdown(void) {
    931942        //Before we start shutting things down, wait for systems that need threading to shutdown
    932         __kernel_io_prepare_stop( *mainCluster );
     943        ^(*mainCluster->io.ctxs){};
     944        mainCluster->io.cnt  = 0;
     945        mainCluster->io.ctxs = 0p;
    933946
    934947        /* paranoid */ verify( TL_GET( preemption_state.enabled ) );
     
    949962        // Disable preemption
    950963        kernel_stop_preemption();
     964
     965        // Stop IO
     966        __kernel_io_shutdown();
    951967
    952968        // Destroy the main processor and its context in reverse order of construction
  • libcfa/src/concurrency/kernel.hfa

    re0f93e0 rf00b26d4  
    129129struct __io_data;
    130130
    131 #define CFA_CLUSTER_IO_POLLER_USER_THREAD    (1 << 0) // 0x01
    132 #define CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS (1 << 1) // 0x02
    133 #define CFA_CLUSTER_IO_EAGER_SUBMITS         (1 << 2) // 0x04
    134 #define CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS   (1 << 3) // 0x08
    135 #define CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES (1 << 4) // 0x10
    136 #define CFA_CLUSTER_IO_BUFFLEN_OFFSET        16
    137 
     131// IO poller user-thread
     132// Not using the "thread" keyword because we want to control
     133// more carefully when to start/stop it
     134struct $io_ctx_thread {
     135        struct __io_data * ring;
     136        single_sem sem;
     137        volatile bool done;
     138        $thread self;
     139};
     140
     141
     142struct io_context {
     143        $io_ctx_thread thrd;
     144};
     145
     146struct io_context_params {
     147        int num_entries;
     148        int num_ready;
     149        int submit_aff;
     150        bool eager_submits:1;
     151        bool poller_submits:1;
     152        bool poll_submit:1;
     153        bool poll_complete:1;
     154};
     155
     156void  ?{}(io_context_params & this);
     157
     158void  ?{}(io_context & this, struct cluster & cl);
     159void  ?{}(io_context & this, struct cluster & cl, const io_context_params & params);
     160void ^?{}(io_context & this);
     161
     162struct io_cancellation {
     163        uint32_t target;
     164};
     165
     166static inline void  ?{}(io_cancellation & this) { this.target = -1u; }
     167static inline void ^?{}(io_cancellation & this) {}
     168bool cancel(io_cancellation & this);
    138169
    139170//-----------------------------------------------------------------------------
     
    206237        } node;
    207238
    208         struct __io_data * io;
     239        struct {
     240                io_context * ctxs;
     241                unsigned cnt;
     242        } io;
    209243
    210244        #if !defined(__CFA_NO_STATISTICS__)
     
    215249extern Duration default_preemption();
    216250
    217 void ?{} (cluster & this, const char name[], Duration preemption_rate, unsigned flags);
     251void ?{} (cluster & this, const char name[], Duration preemption_rate, unsigned num_io, const io_context_params & io_params);
    218252void ^?{}(cluster & this);
    219253
    220 static inline void ?{} (cluster & this)                                           { this{"Anonymous Cluster", default_preemption(), 0}; }
    221 static inline void ?{} (cluster & this, Duration preemption_rate)                 { this{"Anonymous Cluster", preemption_rate, 0}; }
    222 static inline void ?{} (cluster & this, const char name[])                        { this{name, default_preemption(), 0}; }
    223 static inline void ?{} (cluster & this, unsigned flags)                           { this{"Anonymous Cluster", default_preemption(), flags}; }
    224 static inline void ?{} (cluster & this, Duration preemption_rate, unsigned flags) { this{"Anonymous Cluster", preemption_rate, flags}; }
    225 static inline void ?{} (cluster & this, const char name[], unsigned flags)        { this{name, default_preemption(), flags}; }
     254static inline void ?{} (cluster & this)                                            { io_context_params default_params;    this{"Anonymous Cluster", default_preemption(), 1, default_params}; }
     255static inline void ?{} (cluster & this, Duration preemption_rate)                  { io_context_params default_params;    this{"Anonymous Cluster", preemption_rate, 1, default_params}; }
     256static inline void ?{} (cluster & this, const char name[])                         { io_context_params default_params;    this{name, default_preemption(), 1, default_params}; }
     257static inline void ?{} (cluster & this, unsigned num_io)                           { io_context_params default_params;    this{"Anonymous Cluster", default_preemption(), num_io, default_params}; }
     258static inline void ?{} (cluster & this, Duration preemption_rate, unsigned num_io) { io_context_params default_params;    this{"Anonymous Cluster", preemption_rate, num_io, default_params}; }
     259static inline void ?{} (cluster & this, const char name[], unsigned num_io)        { io_context_params default_params;    this{name, default_preemption(), num_io, default_params}; }
     260static inline void ?{} (cluster & this, const io_context_params & io_params)                                            { this{"Anonymous Cluster", default_preemption(), 1, io_params}; }
     261static inline void ?{} (cluster & this, Duration preemption_rate, const io_context_params & io_params)                  { this{"Anonymous Cluster", preemption_rate, 1, io_params}; }
     262static inline void ?{} (cluster & this, const char name[], const io_context_params & io_params)                         { this{name, default_preemption(), 1, io_params}; }
     263static inline void ?{} (cluster & this, unsigned num_io, const io_context_params & io_params)                           { this{"Anonymous Cluster", default_preemption(), num_io, io_params}; }
     264static inline void ?{} (cluster & this, Duration preemption_rate, unsigned num_io, const io_context_params & io_params) { this{"Anonymous Cluster", preemption_rate, num_io, io_params}; }
     265static inline void ?{} (cluster & this, const char name[], unsigned num_io, const io_context_params & io_params)        { this{name, default_preemption(), num_io, io_params}; }
    226266
    227267static inline [cluster *&, cluster *& ] __get( cluster & this ) __attribute__((const)) { return this.node.[next, prev]; }
  • libcfa/src/concurrency/kernel_private.hfa

    re0f93e0 rf00b26d4  
    8484void __unpark( struct __processor_id_t *, $thread * thrd __cfaabi_dbg_ctx_param2 );
    8585
    86 //-----------------------------------------------------------------------------
    87 // I/O
    88 void __kernel_io_startup     ( cluster &, unsigned, bool );
    89 void __kernel_io_finish_start( cluster & );
    90 void __kernel_io_prepare_stop( cluster & );
    91 void __kernel_io_shutdown    ( cluster &, bool );
     86static inline bool __post(single_sem & this, struct __processor_id_t * id) {
     87        for() {
     88                struct $thread * expected = this.ptr;
     89                if(expected == 1p) return false;
     90                if(expected == 0p) {
     91                        if(__atomic_compare_exchange_n(&this.ptr, &expected, 1p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     92                                return false;
     93                        }
     94                }
     95                else {
     96                        if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     97                                __unpark( id, expected __cfaabi_dbg_ctx2 );
     98                                return true;
     99                        }
     100                }
     101        }
     102}
    92103
    93104//-----------------------------------------------------------------------------
     
    109120void doregister( struct cluster * cltr, struct $thread & thrd );
    110121void unregister( struct cluster * cltr, struct $thread & thrd );
     122
     123//-----------------------------------------------------------------------------
     124// I/O
     125void __kernel_io_startup     ();
     126void __kernel_io_shutdown    ();
     127
     128static inline io_context * __get_io_context( void ) {
     129        cluster * cltr = active_cluster();
     130        /* paranoid */ verifyf( cltr, "No active cluster for io operation\n");
     131        assertf( cltr->io.cnt > 0, "Cluster %p has no default io contexts and no context was specified\n", cltr );
     132        /* paranoid */ verifyf( cltr->io.ctxs, "default io contexts for cluster %p are missing\n", cltr);
     133        return &cltr->io.ctxs[ __tls_rand() % cltr->io.cnt ];
     134}
     135
     136void ^?{}(io_context & this, bool );
    111137
    112138//=======================================================================
Note: See TracChangeset for help on using the changeset viewer.