Changeset 61dd73d for libcfa


Ignore:
Timestamp:
May 5, 2020, 10:45:18 AM (4 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
f90d10f
Parents:
3c039b0
Message:

Moved io_uring data to io.cfa and create it using dynamic allocation.

Location:
libcfa
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • libcfa/prelude/defines.hfa.in

    r3c039b0 r61dd73d  
    1616#undef HAVE_LINUX_IO_URING_H
    1717
    18 // #define __CFA_IO_POLLING_USER__
    19 // #define __CFA_IO_POLLING_KERNEL__
     18#undef __CFA_NO_STATISTICS__
  • libcfa/src/concurrency/io.cfa

    r3c039b0 r61dd73d  
    8686        #endif
    8787
    88         #if defined(__CFA_IO_POLLING_USER__)
    89                 void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
    90                         this.ring = &cltr.io;
    91                         this.waiting = true;
    92                         (this.thrd){ "Fast I/O Poller", cltr };
    93                 }
    94                 void ^?{}( __io_poller_fast & mutex this );
    95         void main( __io_poller_fast & this );
    96         static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
    97                 void ^?{}( __io_poller_fast & mutex this ) {}
    98         #endif
     88        // Fast poller user-thread
     89        // Not using the "thread" keyword because we want to control
     90        // more carefully when to start/stop it
     91        struct __io_poller_fast {
     92                struct __io_data * ring;
     93                bool waiting;
     94                $thread thrd;
     95        };
     96
     97        void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
     98                this.ring = cltr.io;
     99                this.waiting = true;
     100                (this.thrd){ "Fast I/O Poller", cltr };
     101        }
     102        void ^?{}( __io_poller_fast & mutex this );
     103        void main( __io_poller_fast & this );
     104        static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
     105        void ^?{}( __io_poller_fast & mutex this ) {}
     106
     107        struct __submition_data {
     108                // Head and tail of the ring (associated with array)
     109                volatile uint32_t * head;
     110                volatile uint32_t * tail;
     111
     112                // The actual kernel ring which uses head/tail
     113                // indexes into the sqes arrays
     114                uint32_t * array;
     115
     116                // number of entries and mask to go with it
     117                const uint32_t * num;
     118                const uint32_t * mask;
     119
     120                // Submission flags (Not sure what for)
     121                uint32_t * flags;
     122
     123                // number of sqes not submitted (whatever that means)
     124                uint32_t * dropped;
     125
     126                // Like head/tail but not seen by the kernel
     127                volatile uint32_t alloc;
     128                volatile uint32_t ready;
     129
     130                __spinlock_t lock;
     131
     132                // A buffer of sqes (not the actual ring)
     133                struct io_uring_sqe * sqes;
     134
     135                // The location and size of the mmaped area
     136                void * ring_ptr;
     137                size_t ring_sz;
     138
     139                // Statistics
     140                #if !defined(__CFA_NO_STATISTICS__)
     141                        struct {
     142                                struct {
     143                                        unsigned long long int val;
     144                                        unsigned long long int cnt;
     145                                } submit_avg;
     146                        } stats;
     147                #endif
     148        };
     149
     150        struct __completion_data {
     151                // Head and tail of the ring
     152                volatile uint32_t * head;
     153                volatile uint32_t * tail;
     154
     155                // number of entries and mask to go with it
     156                const uint32_t * mask;
     157                const uint32_t * num;
     158
     159                // number of cqes not submitted (whatever that means)
     160                uint32_t * overflow;
     161
     162                // the kernel ring
     163                struct io_uring_cqe * cqes;
     164
     165                // The location and size of the mmaped area
     166                void * ring_ptr;
     167                size_t ring_sz;
     168
     169                // Statistics
     170                #if !defined(__CFA_NO_STATISTICS__)
     171                        struct {
     172                                struct {
     173                                        unsigned long long int val;
     174                                        unsigned long long int slow_cnt;
     175                                        unsigned long long int fast_cnt;
     176                                } completed_avg;
     177                        } stats;
     178                #endif
     179        };
     180
     181        struct __io_data {
     182                struct __submition_data submit_q;
     183                struct __completion_data completion_q;
     184                uint32_t flags;
     185                int fd;
     186                semaphore submit;
     187                volatile bool done;
     188                struct {
     189                        struct {
     190                                void * stack;
     191                                pthread_t kthrd;
     192                        } slow;
     193                        __io_poller_fast fast;
     194                        __bin_sem_t sem;
     195                } poller;
     196        };
    99197
    100198//=============================================================================================
     
    102200//=============================================================================================
    103201        void __kernel_io_startup( cluster & this, bool main_cluster ) {
     202                this.io = malloc();
     203
    104204                // Step 1 : call to setup
    105205                struct io_uring_params params;
     
    114214
    115215                // Step 2 : mmap result
    116                 memset(&this.io, 0, sizeof(struct io_ring));
    117                 struct io_uring_sq & sq = this.io.submit_q;
    118                 struct io_uring_cq & cq = this.io.completion_q;
     216                memset( this.io, 0, sizeof(struct __io_data) );
     217                struct __submition_data  & sq = this.io->submit_q;
     218                struct __completion_data & cq = this.io->completion_q;
    119219
    120220                // calculate the right ring size
     
    194294
    195295                // Update the global ring info
    196                 this.io.flags = params.flags;
    197                 this.io.fd    = fd;
    198                 this.io.done  = false;
    199                 (this.io.submit){ min(*sq.num, *cq.num) };
     296                this.io->flags = params.flags;
     297                this.io->fd    = fd;
     298                this.io->done  = false;
     299                (this.io->submit){ min(*sq.num, *cq.num) };
    200300
    201301                // Initialize statistics
    202302                #if !defined(__CFA_NO_STATISTICS__)
    203                         this.io.submit_q.stats.submit_avg.val = 0;
    204                         this.io.submit_q.stats.submit_avg.cnt = 0;
    205                         this.io.completion_q.stats.completed_avg.val = 0;
    206                         this.io.completion_q.stats.completed_avg.slow_cnt = 0;
    207                         this.io.completion_q.stats.completed_avg.fast_cnt = 0;
     303                        this.io->submit_q.stats.submit_avg.val = 0;
     304                        this.io->submit_q.stats.submit_avg.cnt = 0;
     305                        this.io->completion_q.stats.completed_avg.val = 0;
     306                        this.io->completion_q.stats.completed_avg.slow_cnt = 0;
     307                        this.io->completion_q.stats.completed_avg.fast_cnt = 0;
    208308                #endif
    209309
     
    214314
    215315        void __kernel_io_finish_start( cluster & this ) {
    216                 #if defined(__CFA_IO_POLLING_USER__)
    217                         __cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluter %p\n", &this);
    218                         (this.io.poller.fast){ this };
    219                         __thrd_start( this.io.poller.fast, main );
    220                 #endif
     316                __cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluter %p\n", &this);
     317                (this.io->poller.fast){ this };
     318                __thrd_start( this.io->poller.fast, main );
    221319
    222320                // Create the poller thread
    223321                __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluter %p\n", &this);
    224                 this.io.poller.slow.stack = __create_pthread( &this.io.poller.slow.kthrd, __io_poller_slow, &this );
     322                this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
    225323        }
    226324
     
    228326                __cfadbg_print_safe(io_core, "Kernel I/O : Stopping pollers for cluster\n", &this);
    229327                // Notify the poller thread of the shutdown
    230                 __atomic_store_n(&this.io.done, true, __ATOMIC_SEQ_CST);
     328                __atomic_store_n(&this.io->done, true, __ATOMIC_SEQ_CST);
    231329
    232330                // Stop the IO Poller
    233331                sigval val = { 1 };
    234                 pthread_sigqueue( this.io.poller.slow.kthrd, SIGUSR1, val );
     332                pthread_sigqueue( this.io->poller.slow.kthrd, SIGUSR1, val );
     333                post( this.io->poller.sem );
     334
     335                // Wait for the poller thread to finish
     336                pthread_join( this.io->poller.slow.kthrd, 0p );
     337                free( this.io->poller.slow.stack );
     338
     339                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster\n", &this);
     340
    235341                #if defined(__CFA_IO_POLLING_USER__)
    236                         post( this.io.poller.sem );
    237                 #endif
    238 
    239                 // Wait for the poller thread to finish
    240                 pthread_join( this.io.poller.slow.kthrd, 0p );
    241                 free( this.io.poller.slow.stack );
    242 
    243                 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster\n", &this);
    244 
    245                 #if defined(__CFA_IO_POLLING_USER__)
    246                         verify( this.io.poller.fast.waiting );
    247                         verify( this.io.poller.fast.thrd.state == Blocked );
    248 
    249                         this.io.poller.fast.thrd.curr_cluster = mainCluster;
     342                        verify( this.io->poller.fast.waiting );
     343                        verify( this.io->poller.fast.thrd.state == Blocked );
     344
     345                        this.io->poller.fast.thrd.curr_cluster = mainCluster;
    250346
    251347                        // unpark the fast io_poller
    252                         unpark( &this.io.poller.fast.thrd __cfaabi_dbg_ctx2 );
    253 
    254                         ^(this.io.poller.fast){};
     348                        unpark( &this.io->poller.fast.thrd __cfaabi_dbg_ctx2 );
     349
     350                        ^(this.io->poller.fast){};
    255351
    256352                        __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller stopped for cluster\n", &this);
     
    266362                #if !defined(__CFA_NO_STATISTICS__)
    267363                        if(this.print_stats) {
    268                                 __cfaabi_bits_print_safe( STDERR_FILENO,
    269                                         "----- I/O uRing Stats -----\n"
    270                                         "- total submit calls  : %llu\n"
    271                                         "- avg submit          : %lf\n"
    272                                         "- total wait calls    : %llu (%llu slow, %llu fast)\n"
    273                                         "- avg completion/wait : %lf\n",
    274                                         this.io.submit_q.stats.submit_avg.cnt,
    275                                         ((double)this.io.submit_q.stats.submit_avg.val) / this.io.submit_q.stats.submit_avg.cnt,
    276                                         this.io.completion_q.stats.completed_avg.slow_cnt + this.io.completion_q.stats.completed_avg.fast_cnt,
    277                                         this.io.completion_q.stats.completed_avg.slow_cnt, this.io.completion_q.stats.completed_avg.fast_cnt,
    278                                         ((double)this.io.completion_q.stats.completed_avg.val) / (this.io.completion_q.stats.completed_avg.slow_cnt + this.io.completion_q.stats.completed_avg.fast_cnt)
    279                                 );
     364                                with(this.io->submit_q.stats, this.io->completion_q.stats) {
     365                                        __cfaabi_bits_print_safe( STDERR_FILENO,
     366                                                "----- I/O uRing Stats -----\n"
     367                                                "- total submit calls  : %llu\n"
     368                                                "- avg submit          : %lf\n"
     369                                                "- total wait calls    : %llu (%llu slow, %llu fast)\n"
     370                                                "- avg completion/wait : %lf\n",
     371                                                submit_avg.cnt,
     372                                                ((double)submit_avg.val) / submit_avg.cnt,
     373                                                completed_avg.slow_cnt + completed_avg.fast_cnt,
     374                                                completed_avg.slow_cnt,  completed_avg.fast_cnt,
     375                                                ((double)completed_avg.val) / (completed_avg.slow_cnt + completed_avg.fast_cnt)
     376                                        );
     377                                }
    280378                        }
    281379                #endif
    282380
    283381                // Shutdown the io rings
    284                 struct io_uring_sq & sq = this.io.submit_q;
    285                 struct io_uring_cq & cq = this.io.completion_q;
     382                struct __submition_data  & sq = this.io->submit_q;
     383                struct __completion_data & cq = this.io->completion_q;
    286384
    287385                // unmap the submit queue entries
     
    297395
    298396                // close the file descriptor
    299                 close(this.io.fd);
     397                close(this.io->fd);
     398
     399                free( this.io );
    300400        }
    301401
     
    310410        // Process a single completion message from the io_uring
    311411        // This is NOT thread-safe
    312         static int __drain_io( struct io_ring & ring, sigset_t * mask, int waitcnt, bool in_kernel ) {
     412        static int __drain_io( struct __io_data & ring, sigset_t * mask, int waitcnt, bool in_kernel ) {
    313413                int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
    314414                if( ret < 0 ) {
     
    358458        static void * __io_poller_slow( void * arg ) {
    359459                cluster * cltr = (cluster *)arg;
    360                 struct io_ring & ring = cltr->io;
     460                struct __io_data & ring = *cltr->io;
    361461
    362462                sigset_t mask;
     
    411511        }
    412512
    413         #if defined(__CFA_IO_POLLING_USER__)
    414                 void main( __io_poller_fast & this ) {
    415                         // Start parked
    416                         park( __cfaabi_dbg_ctx );
    417 
    418                         __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", &this.ring);
    419 
    420                         // Then loop until we need to start
    421                         while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
    422                                 // Drain the io
    423                                 this.waiting = false;
    424                                 int count = __drain_io( *this.ring, 0p, 0, false );
    425 
    426                                 // Update statistics
    427                                 #if !defined(__CFA_NO_STATISTICS__)
    428                                         this.ring->completion_q.stats.completed_avg.val += count;
    429                                         this.ring->completion_q.stats.completed_avg.fast_cnt += 1;
    430                                 #endif
    431 
    432                                 this.waiting = true;
    433                                 if(0 > count) {
    434                                         // If we got something, just yield and check again
    435                                         yield();
    436                                 }
    437                                 else {
    438                                         // We didn't get anything baton pass to the slow poller
    439                                         __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
    440                                         post( this.ring->poller.sem );
    441                                         park( __cfaabi_dbg_ctx );
    442                                 }
     513        void main( __io_poller_fast & this ) {
     514                // Start parked
     515                park( __cfaabi_dbg_ctx );
     516
     517                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", &this.ring);
     518
     519                // Then loop until we need to start
     520                while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
     521                        // Drain the io
     522                        this.waiting = false;
     523                        int count = __drain_io( *this.ring, 0p, 0, false );
     524
     525                        // Update statistics
     526                        #if !defined(__CFA_NO_STATISTICS__)
     527                                this.ring->completion_q.stats.completed_avg.val += count;
     528                                this.ring->completion_q.stats.completed_avg.fast_cnt += 1;
     529                        #endif
     530
     531                        this.waiting = true;
     532                        if(0 > count) {
     533                                // If we got something, just yield and check again
     534                                yield();
    443535                        }
    444 
    445                         __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
    446                 }
    447         #endif
     536                        else {
     537                                // We didn't get anything baton pass to the slow poller
     538                                __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
     539                                post( this.ring->poller.sem );
     540                                park( __cfaabi_dbg_ctx );
     541                        }
     542                }
     543
     544                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
     545        }
    448546
    449547//=============================================================================================
     
    475573//
    476574
    477         static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct io_ring & ring ) {
     575        static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring ) {
    478576                // Wait for a spot to be available
    479577                P(ring.submit);
     
    493591        }
    494592
    495         static inline void __submit( struct io_ring & ring, uint32_t idx ) {
     593        static inline void __submit( struct __io_data & ring, uint32_t idx ) {
    496594                // get mutual exclusion
    497595                lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
     
    554652
    555653        #define __submit_prelude \
    556                 struct io_ring & ring = active_cluster()->io; \
     654                struct __io_data & ring = *active_cluster()->io; \
    557655                struct io_uring_sqe * sqe; \
    558656                uint32_t idx; \
  • libcfa/src/concurrency/kernel.hfa

    r3c039b0 r61dd73d  
    114114//-----------------------------------------------------------------------------
    115115// I/O
    116 #if defined(HAVE_LINUX_IO_URING_H)
    117 struct io_uring_sq {
    118         // Head and tail of the ring (associated with array)
    119         volatile uint32_t * head;
    120         volatile uint32_t * tail;
    121 
    122         // The actual kernel ring which uses head/tail
    123         // indexes into the sqes arrays
    124         uint32_t * array;
    125 
    126         // number of entries and mask to go with it
    127         const uint32_t * num;
    128         const uint32_t * mask;
    129 
    130         // Submission flags (Not sure what for)
    131         uint32_t * flags;
    132 
    133         // number of sqes not submitted (whatever that means)
    134         uint32_t * dropped;
    135 
    136         // Like head/tail but not seen by the kernel
    137         volatile uint32_t alloc;
    138         volatile uint32_t ready;
    139 
    140         __spinlock_t lock;
    141 
    142         // A buffer of sqes (not the actual ring)
    143         struct io_uring_sqe * sqes;
    144 
    145         // The location and size of the mmaped area
    146         void * ring_ptr;
    147         size_t ring_sz;
    148 
    149         // Statistics
    150         #if !defined(__CFA_NO_STATISTICS__)
    151                 struct {
    152                         struct {
    153                                 unsigned long long int val;
    154                                 unsigned long long int cnt;
    155                         } submit_avg;
    156                 } stats;
    157         #endif
    158 };
    159 
    160 struct io_uring_cq {
    161         // Head and tail of the ring
    162         volatile uint32_t * head;
    163         volatile uint32_t * tail;
    164 
    165         // number of entries and mask to go with it
    166         const uint32_t * mask;
    167         const uint32_t * num;
    168 
    169         // number of cqes not submitted (whatever that means)
    170         uint32_t * overflow;
    171 
    172         // the kernel ring
    173         struct io_uring_cqe * cqes;
    174 
    175         // The location and size of the mmaped area
    176         void * ring_ptr;
    177         size_t ring_sz;
    178 
    179         // Statistics
    180         #if !defined(__CFA_NO_STATISTICS__)
    181                 struct {
    182                         struct {
    183                                 unsigned long long int val;
    184                                 unsigned long long int slow_cnt;
    185                                 unsigned long long int fast_cnt;
    186                         } completed_avg;
    187                 } stats;
    188         #endif
    189 };
    190 
    191 #if defined(__CFA_IO_POLLING_USER__)
    192         struct __io_poller_fast {
    193                 struct io_ring * ring;
    194                 bool waiting;
    195                 $thread thrd;
    196         };
    197 #endif
    198 
    199 struct io_ring {
    200         struct io_uring_sq submit_q;
    201         struct io_uring_cq completion_q;
    202         uint32_t flags;
    203         int fd;
    204         semaphore submit;
    205         volatile bool done;
    206         struct {
    207                 struct {
    208                         void * stack;
    209                         pthread_t kthrd;
    210                 } slow;
    211                 #if defined(__CFA_IO_POLLING_USER__)
    212                         __io_poller_fast fast;
    213                         __bin_sem_t sem;
    214                 #endif
    215         } poller;
    216 };
    217 #endif
     116struct __io_data;
    218117
    219118//-----------------------------------------------------------------------------
     
    249148        } node;
    250149
    251         #if defined(HAVE_LINUX_IO_URING_H)
    252                 struct io_ring io;
    253         #endif
     150        struct __io_data * io;
    254151
    255152        #if !defined(__CFA_NO_STATISTICS__)
Note: See TracChangeset for help on using the changeset viewer.