Changes in / [d3ab183:f90d10f]


Ignore:
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/readv.cfa

    rd3ab183 rf90d10f  
    1616#include <thread.hfa>
    1717#include <time.hfa>
    18 
    19 #if !defined(HAVE_LINUX_IO_URING_H)
    20 #warning no io uring
    21 #endif
    2218
    2319extern bool traceHeapOn();
  • libcfa/prelude/defines.hfa.in

    rd3ab183 rf90d10f  
    1616#undef HAVE_LINUX_IO_URING_H
    1717
    18 // #define __CFA_IO_POLLING_USER__
    19 // #define __CFA_IO_POLLING_KERNEL__
     18#undef __CFA_NO_STATISTICS__
  • libcfa/src/concurrency/io.cfa

    rd3ab183 rf90d10f  
    8686        #endif
    8787
    88         #if defined(__CFA_IO_POLLING_USER__)
    89                 void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
    90                         this.ring = &cltr.io;
    91                         (this.thrd){ "Fast I/O Poller", cltr };
    92                 }
    93                 void ^?{}( __io_poller_fast & mutex this );
    94         void main( __io_poller_fast & this );
    95         static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
    96                 void ^?{}( __io_poller_fast & mutex this ) {}
    97         #endif
     88        // Fast poller user-thread
     89        // Not using the "thread" keyword because we want to control
     90        // more carefully when to start/stop it
     91        struct __io_poller_fast {
     92                struct __io_data * ring;
     93                bool waiting;
     94                $thread thrd;
     95        };
     96
     97        void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
     98                this.ring = cltr.io;
     99                this.waiting = true;
     100                (this.thrd){ "Fast I/O Poller", cltr };
     101        }
     102        void ^?{}( __io_poller_fast & mutex this );
     103        void main( __io_poller_fast & this );
     104        static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
     105        void ^?{}( __io_poller_fast & mutex this ) {}
     106
     107        struct __submition_data {
     108                // Head and tail of the ring (associated with array)
     109                volatile uint32_t * head;
     110                volatile uint32_t * tail;
     111
     112                // The actual kernel ring which uses head/tail
     113                // indexes into the sqes arrays
     114                uint32_t * array;
     115
     116                // number of entries and mask to go with it
     117                const uint32_t * num;
     118                const uint32_t * mask;
     119
     120                // Submission flags (Not sure what for)
     121                uint32_t * flags;
     122
     123                // number of sqes not submitted (whatever that means)
     124                uint32_t * dropped;
     125
     126                // Like head/tail but not seen by the kernel
     127                volatile uint32_t alloc;
     128                volatile uint32_t ready;
     129
     130                __spinlock_t lock;
     131
     132                // A buffer of sqes (not the actual ring)
     133                struct io_uring_sqe * sqes;
     134
     135                // The location and size of the mmaped area
     136                void * ring_ptr;
     137                size_t ring_sz;
     138
     139                // Statistics
     140                #if !defined(__CFA_NO_STATISTICS__)
     141                        struct {
     142                                struct {
     143                                        unsigned long long int val;
     144                                        unsigned long long int cnt;
     145                                } submit_avg;
     146                        } stats;
     147                #endif
     148        };
     149
     150        struct __completion_data {
     151                // Head and tail of the ring
     152                volatile uint32_t * head;
     153                volatile uint32_t * tail;
     154
     155                // number of entries and mask to go with it
     156                const uint32_t * mask;
     157                const uint32_t * num;
     158
     159                // number of cqes not submitted (whatever that means)
     160                uint32_t * overflow;
     161
     162                // the kernel ring
     163                struct io_uring_cqe * cqes;
     164
     165                // The location and size of the mmaped area
     166                void * ring_ptr;
     167                size_t ring_sz;
     168
     169                // Statistics
     170                #if !defined(__CFA_NO_STATISTICS__)
     171                        struct {
     172                                struct {
     173                                        unsigned long long int val;
     174                                        unsigned long long int slow_cnt;
     175                                        unsigned long long int fast_cnt;
     176                                } completed_avg;
     177                        } stats;
     178                #endif
     179        };
     180
     181        struct __io_data {
     182                struct __submition_data submit_q;
     183                struct __completion_data completion_q;
     184                uint32_t flags;
     185                int fd;
     186                semaphore submit;
     187                volatile bool done;
     188                struct {
     189                        struct {
     190                                void * stack;
     191                                pthread_t kthrd;
     192                        } slow;
     193                        __io_poller_fast fast;
     194                        __bin_sem_t sem;
     195                } poller;
     196        };
    98197
    99198//=============================================================================================
     
    101200//=============================================================================================
    102201        void __kernel_io_startup( cluster & this, bool main_cluster ) {
     202                this.io = malloc();
     203
    103204                // Step 1 : call to setup
    104205                struct io_uring_params params;
     
    113214
    114215                // Step 2 : mmap result
    115                 memset(&this.io, 0, sizeof(struct io_ring));
    116                 struct io_uring_sq & sq = this.io.submit_q;
    117                 struct io_uring_cq & cq = this.io.completion_q;
     216                memset( this.io, 0, sizeof(struct __io_data) );
     217                struct __submition_data  & sq = this.io->submit_q;
     218                struct __completion_data & cq = this.io->completion_q;
    118219
    119220                // calculate the right ring size
     
    193294
    194295                // Update the global ring info
    195                 this.io.flags = params.flags;
    196                 this.io.fd    = fd;
    197                 this.io.done  = false;
    198                 (this.io.submit){ min(*sq.num, *cq.num) };
     296                this.io->flags = params.flags;
     297                this.io->fd    = fd;
     298                this.io->done  = false;
     299                (this.io->submit){ min(*sq.num, *cq.num) };
    199300
    200301                // Initialize statistics
    201302                #if !defined(__CFA_NO_STATISTICS__)
    202                         this.io.submit_q.stats.submit_avg.val = 0;
    203                         this.io.submit_q.stats.submit_avg.cnt = 0;
    204                         this.io.completion_q.stats.completed_avg.val = 0;
    205                         this.io.completion_q.stats.completed_avg.cnt = 0;
     303                        this.io->submit_q.stats.submit_avg.val = 0;
     304                        this.io->submit_q.stats.submit_avg.cnt = 0;
     305                        this.io->completion_q.stats.completed_avg.val = 0;
     306                        this.io->completion_q.stats.completed_avg.slow_cnt = 0;
     307                        this.io->completion_q.stats.completed_avg.fast_cnt = 0;
    206308                #endif
    207309
     
    212314
    213315        void __kernel_io_finish_start( cluster & this ) {
    214                 #if defined(__CFA_IO_POLLING_USER__)
    215                         __cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluter %p\n", &this);
    216                         (this.io.poller.fast){ this };
    217                         __thrd_start( this.io.poller.fast, main );
    218                 #endif
     316                __cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluter %p\n", &this);
     317                (this.io->poller.fast){ this };
     318                __thrd_start( this.io->poller.fast, main );
    219319
    220320                // Create the poller thread
    221321                __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluter %p\n", &this);
    222                 this.io.poller.slow.stack = __create_pthread( &this.io.poller.slow.kthrd, __io_poller_slow, &this );
     322                this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
    223323        }
    224324
     
    226326                __cfadbg_print_safe(io_core, "Kernel I/O : Stopping pollers for cluster\n", &this);
    227327                // Notify the poller thread of the shutdown
    228                 __atomic_store_n(&this.io.done, true, __ATOMIC_SEQ_CST);
     328                __atomic_store_n(&this.io->done, true, __ATOMIC_SEQ_CST);
    229329
    230330                // Stop the IO Poller
    231331                sigval val = { 1 };
    232                 pthread_sigqueue( this.io.poller.slow.kthrd, SIGUSR1, val );
     332                pthread_sigqueue( this.io->poller.slow.kthrd, SIGUSR1, val );
     333                post( this.io->poller.sem );
     334
     335                // Wait for the poller thread to finish
     336                pthread_join( this.io->poller.slow.kthrd, 0p );
     337                free( this.io->poller.slow.stack );
     338
     339                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster\n", &this);
     340
    233341                #if defined(__CFA_IO_POLLING_USER__)
    234                         post( this.io.poller.sem );
    235                 #endif
    236 
    237                 // Wait for the poller thread to finish
    238                 pthread_join( this.io.poller.slow.kthrd, 0p );
    239                 free( this.io.poller.slow.stack );
    240 
    241                 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster\n", &this);
    242 
    243                 #if defined(__CFA_IO_POLLING_USER__)
     342                        verify( this.io->poller.fast.waiting );
     343                        verify( this.io->poller.fast.thrd.state == Blocked );
     344
     345                        this.io->poller.fast.thrd.curr_cluster = mainCluster;
     346
    244347                        // unpark the fast io_poller
    245                         unpark( &this.io.poller.fast.thrd __cfaabi_dbg_ctx2 );
    246 
    247                         ^(this.io.poller.fast){};
     348                        unpark( &this.io->poller.fast.thrd __cfaabi_dbg_ctx2 );
     349
     350                        ^(this.io->poller.fast){};
    248351
    249352                        __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller stopped for cluster\n", &this);
     
    259362                #if !defined(__CFA_NO_STATISTICS__)
    260363                        if(this.print_stats) {
    261                                 __cfaabi_bits_print_safe( STDERR_FILENO,
    262                                         "----- I/O uRing Stats -----\n"
    263                                         "- total submit calls  : %llu\n"
    264                                         "- avg submit          : %lf\n"
    265                                         "- total wait calls    : %llu\n"
    266                                         "- avg completion/wait : %lf\n",
    267                                         this.io.submit_q.stats.submit_avg.cnt,
    268                                         ((double)this.io.submit_q.stats.submit_avg.val) / this.io.submit_q.stats.submit_avg.cnt,
    269                                         this.io.completion_q.stats.completed_avg.cnt,
    270                                         ((double)this.io.completion_q.stats.completed_avg.val) / this.io.completion_q.stats.completed_avg.cnt
    271                                 );
     364                                with(this.io->submit_q.stats, this.io->completion_q.stats) {
     365                                        __cfaabi_bits_print_safe( STDERR_FILENO,
     366                                                "----- I/O uRing Stats -----\n"
     367                                                "- total submit calls  : %llu\n"
     368                                                "- avg submit          : %lf\n"
     369                                                "- total wait calls    : %llu (%llu slow, %llu fast)\n"
     370                                                "- avg completion/wait : %lf\n",
     371                                                submit_avg.cnt,
     372                                                ((double)submit_avg.val) / submit_avg.cnt,
     373                                                completed_avg.slow_cnt + completed_avg.fast_cnt,
     374                                                completed_avg.slow_cnt,  completed_avg.fast_cnt,
     375                                                ((double)completed_avg.val) / (completed_avg.slow_cnt + completed_avg.fast_cnt)
     376                                        );
     377                                }
    272378                        }
    273379                #endif
    274380
    275381                // Shutdown the io rings
    276                 struct io_uring_sq & sq = this.io.submit_q;
    277                 struct io_uring_cq & cq = this.io.completion_q;
     382                struct __submition_data  & sq = this.io->submit_q;
     383                struct __completion_data & cq = this.io->completion_q;
    278384
    279385                // unmap the submit queue entries
     
    289395
    290396                // close the file descriptor
    291                 close(this.io.fd);
     397                close(this.io->fd);
     398
     399                free( this.io );
    292400        }
    293401
     
    302410        // Process a single completion message from the io_uring
    303411        // This is NOT thread-safe
    304         static int __drain_io( struct io_ring & ring, sigset_t * mask, int waitcnt, bool in_kernel ) {
     412        static int __drain_io( struct __io_data & ring, sigset_t * mask, int waitcnt, bool in_kernel ) {
    305413                int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
    306414                if( ret < 0 ) {
     
    320428                // Nothing was new return 0
    321429                if (head == tail) {
    322                         #if !defined(__CFA_NO_STATISTICS__)
    323                                 ring.completion_q.stats.completed_avg.cnt += 1;
    324                         #endif
    325430                        return 0;
    326431                }
     
    348453                __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
    349454
    350                 // Update statistics
    351                 #if !defined(__CFA_NO_STATISTICS__)
    352                         ring.completion_q.stats.completed_avg.val += count;
    353                         ring.completion_q.stats.completed_avg.cnt += 1;
    354                 #endif
    355 
    356455                return count;
    357456        }
     
    359458        static void * __io_poller_slow( void * arg ) {
    360459                cluster * cltr = (cluster *)arg;
    361                 struct io_ring & ring = cltr->io;
     460                struct __io_data & ring = *cltr->io;
    362461
    363462                sigset_t mask;
     
    372471                verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );
    373472
     473                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p ready\n", &ring);
     474
    374475                while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
    375476                        #if defined(__CFA_IO_POLLING_USER__)
     
    378479                                // batton pass to the user-thread
    379480                                int count = __drain_io( ring, &mask, 1, true );
     481
     482                                // Update statistics
     483                                #if !defined(__CFA_NO_STATISTICS__)
     484                                        ring.completion_q.stats.completed_avg.val += count;
     485                                        ring.completion_q.stats.completed_avg.slow_cnt += 1;
     486                                #endif
     487
    380488                                if(count > 0) {
    381489                                        __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring);
     
    387495
    388496                                //In the naive approach, just poll the io completion queue directly
    389                                 __drain_io( ring, &mask, 1, true );
     497                                int count = __drain_io( ring, &mask, 1, true );
     498
     499                                // Update statistics
     500                                #if !defined(__CFA_NO_STATISTICS__)
     501                                        ring.completion_q.stats.completed_avg.val += count;
     502                                        ring.completion_q.stats.completed_avg.slow_cnt += 1;
     503                                #endif
    390504
    391505                        #endif
    392506                }
    393507
     508                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p stopping\n", &ring);
     509
    394510                return 0p;
    395511        }
    396512
    397         #if defined(__CFA_IO_POLLING_USER__)
    398                 void main( __io_poller_fast & this ) {
    399                         // Start parked
    400                         park( __cfaabi_dbg_ctx );
    401 
    402                         // Then loop until we need to start
    403                         while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
    404                                 // Drain the io
    405                                 if(0 > __drain_io( *this.ring, 0p, 0, false )) {
    406                                         // If we got something, just yield and check again
    407                                         yield();
    408                                 }
    409                                 else {
    410                                         // We didn't get anything baton pass to the slow poller
    411                                         __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
    412                                         post( this.ring->poller.sem );
    413                                         park( __cfaabi_dbg_ctx );
    414                                 }
     513        void main( __io_poller_fast & this ) {
     514                // Start parked
     515                park( __cfaabi_dbg_ctx );
     516
     517                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", &this.ring);
     518
     519                // Then loop until we need to start
     520                while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
     521                        // Drain the io
     522                        this.waiting = false;
     523                        int count = __drain_io( *this.ring, 0p, 0, false );
     524
     525                        // Update statistics
     526                        #if !defined(__CFA_NO_STATISTICS__)
     527                                this.ring->completion_q.stats.completed_avg.val += count;
     528                                this.ring->completion_q.stats.completed_avg.fast_cnt += 1;
     529                        #endif
     530
     531                        this.waiting = true;
     532                        if(0 > count) {
     533                                // If we got something, just yield and check again
     534                                yield();
    415535                        }
    416                 }
    417         #endif
     536                        else {
     537                                // We didn't get anything baton pass to the slow poller
     538                                __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
     539                                post( this.ring->poller.sem );
     540                                park( __cfaabi_dbg_ctx );
     541                        }
     542                }
     543
     544                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
     545        }
    418546
    419547//=============================================================================================
     
    445573//
    446574
    447         static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct io_ring & ring ) {
     575        static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring ) {
    448576                // Wait for a spot to be available
    449577                P(ring.submit);
     
    463591        }
    464592
    465         static inline void __submit( struct io_ring & ring, uint32_t idx ) {
     593        static inline void __submit( struct __io_data & ring, uint32_t idx ) {
    466594                // get mutual exclusion
    467595                lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
     
    524652
    525653        #define __submit_prelude \
    526                 struct io_ring & ring = active_cluster()->io; \
     654                struct __io_data & ring = *active_cluster()->io; \
    527655                struct io_uring_sqe * sqe; \
    528656                uint32_t idx; \
  • libcfa/src/concurrency/kernel.hfa

    rd3ab183 rf90d10f  
    114114//-----------------------------------------------------------------------------
    115115// I/O
    116 #if defined(HAVE_LINUX_IO_URING_H)
    117 struct io_uring_sq {
    118         // Head and tail of the ring (associated with array)
    119         volatile uint32_t * head;
    120         volatile uint32_t * tail;
    121 
    122         // The actual kernel ring which uses head/tail
    123         // indexes into the sqes arrays
    124         uint32_t * array;
    125 
    126         // number of entries and mask to go with it
    127         const uint32_t * num;
    128         const uint32_t * mask;
    129 
    130         // Submission flags (Not sure what for)
    131         uint32_t * flags;
    132 
    133         // number of sqes not submitted (whatever that means)
    134         uint32_t * dropped;
    135 
    136         // Like head/tail but not seen by the kernel
    137         volatile uint32_t alloc;
    138         volatile uint32_t ready;
    139 
    140         __spinlock_t lock;
    141 
    142         // A buffer of sqes (not the actual ring)
    143         struct io_uring_sqe * sqes;
    144 
    145         // The location and size of the mmaped area
    146         void * ring_ptr;
    147         size_t ring_sz;
    148 
    149         // Statistics
    150         #if !defined(__CFA_NO_STATISTICS__)
    151                 struct {
    152                         struct {
    153                                 unsigned long long int val;
    154                                 unsigned long long int cnt;
    155                         } submit_avg;
    156                 } stats;
    157         #endif
    158 };
    159 
    160 struct io_uring_cq {
    161         // Head and tail of the ring
    162         volatile uint32_t * head;
    163         volatile uint32_t * tail;
    164 
    165         // number of entries and mask to go with it
    166         const uint32_t * mask;
    167         const uint32_t * num;
    168 
    169         // number of cqes not submitted (whatever that means)
    170         uint32_t * overflow;
    171 
    172         // the kernel ring
    173         struct io_uring_cqe * cqes;
    174 
    175         // The location and size of the mmaped area
    176         void * ring_ptr;
    177         size_t ring_sz;
    178 
    179         // Statistics
    180         #if !defined(__CFA_NO_STATISTICS__)
    181                 struct {
    182                         struct {
    183                                 unsigned long long int val;
    184                                 unsigned long long int cnt;
    185                         } completed_avg;
    186                 } stats;
    187         #endif
    188 };
    189 
    190 #if defined(__CFA_IO_POLLING_USER__)
    191         struct __io_poller_fast {
    192                 struct io_ring * ring;
    193                 $thread thrd;
    194         };
    195 #endif
    196 
    197 struct io_ring {
    198         struct io_uring_sq submit_q;
    199         struct io_uring_cq completion_q;
    200         uint32_t flags;
    201         int fd;
    202         semaphore submit;
    203         volatile bool done;
    204         struct {
    205                 struct {
    206                         void * stack;
    207                         pthread_t kthrd;
    208                 } slow;
    209                 #if defined(__CFA_IO_POLLING_USER__)
    210                         __io_poller_fast fast;
    211                         __bin_sem_t sem;
    212                 #endif
    213         } poller;
    214 };
    215 #endif
     116struct __io_data;
    216117
    217118//-----------------------------------------------------------------------------
     
    247148        } node;
    248149
    249         #if defined(HAVE_LINUX_IO_URING_H)
    250                 struct io_ring io;
    251         #endif
     150        struct __io_data * io;
    252151
    253152        #if !defined(__CFA_NO_STATISTICS__)
  • libcfa/src/concurrency/kernel_private.hfa

    rd3ab183 rf90d10f  
    5959extern volatile thread_local __cfa_kernel_preemption_state_t preemption_state __attribute__ ((tls_model ( "initial-exec" )));
    6060
     61extern cluster * mainCluster;
     62
    6163//-----------------------------------------------------------------------------
    6264// Threads
Note: See TracChangeset for help on using the changeset viewer.