Changes in / [08a994e:4385e8b]


Ignore:
Files:
7 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/readv.cfa

    r08a994e r4385e8b  
    1616#include <thread.hfa>
    1717#include <time.hfa>
    18 
    19 #if !defined(HAVE_LINUX_IO_URING_H)
    20 #warning no io uring
    21 #endif
    2218
    2319extern bool traceHeapOn();
     
    5349        while(__atomic_load_n(&run, __ATOMIC_RELAXED)) {
    5450                int r = cfa_preadv2(fd, &iov, 1, 0, 0);
    55                 if(r < 0) abort(strerror(-r));
     51                if(r < 0) abort("%s\n", strerror(-r));
    5652
    5753                __atomic_fetch_add( &count, 1, __ATOMIC_SEQ_CST );
     
    6359        unsigned long int nthreads = 2;
    6460        unsigned long int nprocs   = 1;
    65 
    66         printf("Setting local\n");
    67         setlocale(LC_NUMERIC, "");
     61        int flags = 0;
    6862
    6963        arg_loop:
    7064        for(;;) {
    7165                static struct option options[] = {
    72                         {"duration",  required_argument, 0, 'd'},
    73                         {"nthreads",  required_argument, 0, 't'},
    74                         {"nprocs",    required_argument, 0, 'p'},
    75                         {"bufsize",   required_argument, 0, 'b'},
     66                        {"duration",   required_argument, 0, 'd'},
     67                        {"nthreads",   required_argument, 0, 't'},
     68                        {"nprocs",     required_argument, 0, 'p'},
     69                        {"bufsize",    required_argument, 0, 'b'},
     70                        {"userthread", no_argument      , 0, 'u'},
    7671                        {0, 0, 0, 0}
    7772                };
    7873
    7974                int idx = 0;
    80                 int opt = getopt_long(argc, argv, "d:t:p:b:", options, &idx);
     75                int opt = getopt_long(argc, argv, "d:t:p:b:u", options, &idx);
    8176
    8277                const char * arg = optarg ? optarg : "";
     
    115110                                }
    116111                                break;
     112                        case 'u':
     113                                flags |= CFA_CLUSTER_IO_POLLER_USER_THREAD;
     114                                break;
    117115                        // Other cases
    118116                        default: /* ? */
     
    135133        }
    136134
    137         printf("Running %lu threads over %lu processors for %lf seconds\n", nthreads, nprocs, duration);
     135        printf("Running %lu threads, reading %lu bytes each, over %lu processors for %lf seconds\n", buflen, nthreads, nprocs, duration);
    138136
    139137        {
    140138                Time start, end;
    141                 cluster cl = { "IO Cluster" };
     139                cluster cl = { "IO Cluster", flags };
    142140                the_cluster = &cl;
    143141                #if !defined(__CFA_NO_STATISTICS__)
     
    161159                        }
    162160                }
    163                 printf("Took %ld ms\n", (end - start)`ms);
     161                printf("Took %'ld ms\n", (end - start)`ms);
    164162                printf("Total reads:      %'zu\n", count);
    165                 printf("Reads per second: %'lf\n", ((double)count) / (end - start)`s);
     163                printf("Reads per second: %'.2lf\n", ((double)count) / (end - start)`s);
     164                printf("Total read size:  %'zu\n", buflen * count);
     165                printf("Bytes per second: %'.2lf\n", ((double)count * buflen) / (end - start)`s);
    166166        }
    167167
  • libcfa/prelude/defines.hfa.in

    r08a994e r4385e8b  
    1919#undef HAVE_PWRITEV2
    2020
    21 // #define __CFA_IO_POLLING_USER__
    22 // #define __CFA_IO_POLLING_KERNEL__
     21#undef __CFA_NO_STATISTICS__
  • libcfa/src/concurrency/io.cfa

    r08a994e r4385e8b  
    2020
    2121#if !defined(HAVE_LINUX_IO_URING_H)
    22         void __kernel_io_startup( cluster &, bool ) {
     22        void __kernel_io_startup( cluster &, int, bool ) {
    2323                // Nothing to do without io_uring
    2424        }
     
    8686        #endif
    8787
    88         #if defined(__CFA_IO_POLLING_USER__)
    89                 void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
    90                         this.ring = &cltr.io;
    91                         (this.thrd){ "Fast I/O Poller", cltr };
    92                 }
    93                 void ^?{}( __io_poller_fast & mutex this );
    94         void main( __io_poller_fast & this );
    95         static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
    96                 void ^?{}( __io_poller_fast & mutex this ) {}
    97         #endif
     88        // Fast poller user-thread
     89        // Not using the "thread" keyword because we want to control
     90        // more carefully when to start/stop it
     91        struct __io_poller_fast {
     92                struct __io_data * ring;
     93                bool waiting;
     94                $thread thrd;
     95        };
     96
     97        void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
     98                this.ring = cltr.io;
     99                this.waiting = true;
     100                (this.thrd){ "Fast I/O Poller", cltr };
     101        }
     102        void ^?{}( __io_poller_fast & mutex this );
     103        void main( __io_poller_fast & this );
     104        static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
     105        void ^?{}( __io_poller_fast & mutex this ) {}
     106
     107        struct __submition_data {
     108                // Head and tail of the ring (associated with array)
     109                volatile uint32_t * head;
     110                volatile uint32_t * tail;
     111
     112                // The actual kernel ring which uses head/tail
     113                // indexes into the sqes arrays
     114                uint32_t * array;
     115
     116                // number of entries and mask to go with it
     117                const uint32_t * num;
     118                const uint32_t * mask;
     119
     120                // Submission flags (Not sure what for)
     121                uint32_t * flags;
     122
     123                // number of sqes not submitted (whatever that means)
     124                uint32_t * dropped;
     125
     126                // Like head/tail but not seen by the kernel
     127                volatile uint32_t alloc;
     128                volatile uint32_t ready;
     129
     130                __spinlock_t lock;
     131
     132                // A buffer of sqes (not the actual ring)
     133                struct io_uring_sqe * sqes;
     134
     135                // The location and size of the mmaped area
     136                void * ring_ptr;
     137                size_t ring_sz;
     138
     139                // Statistics
     140                #if !defined(__CFA_NO_STATISTICS__)
     141                        struct {
     142                                struct {
     143                                        volatile unsigned long long int val;
     144                                        volatile unsigned long long int cnt;
     145                                        volatile unsigned long long int block;
     146                                } submit_avg;
     147                        } stats;
     148                #endif
     149        };
     150
     151        struct __completion_data {
     152                // Head and tail of the ring
     153                volatile uint32_t * head;
     154                volatile uint32_t * tail;
     155
     156                // number of entries and mask to go with it
     157                const uint32_t * mask;
     158                const uint32_t * num;
     159
     160                // number of cqes not submitted (whatever that means)
     161                uint32_t * overflow;
     162
     163                // the kernel ring
     164                struct io_uring_cqe * cqes;
     165
     166                // The location and size of the mmaped area
     167                void * ring_ptr;
     168                size_t ring_sz;
     169
     170                // Statistics
     171                #if !defined(__CFA_NO_STATISTICS__)
     172                        struct {
     173                                struct {
     174                                        unsigned long long int val;
     175                                        unsigned long long int slow_cnt;
     176                                        unsigned long long int fast_cnt;
     177                                } completed_avg;
     178                        } stats;
     179                #endif
     180        };
     181
     182        struct __io_data {
     183                struct __submition_data submit_q;
     184                struct __completion_data completion_q;
     185                uint32_t ring_flags;
     186                int cltr_flags;
     187                int fd;
     188                semaphore submit;
     189                volatile bool done;
     190                struct {
     191                        struct {
     192                                void * stack;
     193                                pthread_t kthrd;
     194                        } slow;
     195                        __io_poller_fast fast;
     196                        __bin_sem_t sem;
     197                } poller;
     198        };
    98199
    99200//=============================================================================================
    100201// I/O Startup / Shutdown logic
    101202//=============================================================================================
    102         void __kernel_io_startup( cluster & this, bool main_cluster ) {
     203        void __kernel_io_startup( cluster & this, int io_flags, bool main_cluster ) {
     204                this.io = malloc();
     205
    103206                // Step 1 : call to setup
    104207                struct io_uring_params params;
     
    113216
    114217                // Step 2 : mmap result
    115                 memset(&this.io, 0, sizeof(struct io_ring));
    116                 struct io_uring_sq & sq = this.io.submit_q;
    117                 struct io_uring_cq & cq = this.io.completion_q;
     218                memset( this.io, 0, sizeof(struct __io_data) );
     219                struct __submition_data  & sq = this.io->submit_q;
     220                struct __completion_data & cq = this.io->completion_q;
    118221
    119222                // calculate the right ring size
     
    193296
    194297                // Update the global ring info
    195                 this.io.flags = params.flags;
    196                 this.io.fd    = fd;
    197                 this.io.done  = false;
    198                 (this.io.submit){ min(*sq.num, *cq.num) };
     298                this.io->ring_flags = params.flags;
     299                this.io->cltr_flags = io_flags;
     300                this.io->fd         = fd;
     301                this.io->done       = false;
     302                (this.io->submit){ min(*sq.num, *cq.num) };
    199303
    200304                // Initialize statistics
    201305                #if !defined(__CFA_NO_STATISTICS__)
    202                         this.io.submit_q.stats.submit_avg.val = 0;
    203                         this.io.submit_q.stats.submit_avg.cnt = 0;
    204                         this.io.completion_q.stats.completed_avg.val = 0;
    205                         this.io.completion_q.stats.completed_avg.cnt = 0;
     306                        this.io->submit_q.stats.submit_avg.val   = 0;
     307                        this.io->submit_q.stats.submit_avg.cnt   = 0;
     308                        this.io->submit_q.stats.submit_avg.block = 0;
     309                        this.io->completion_q.stats.completed_avg.val = 0;
     310                        this.io->completion_q.stats.completed_avg.slow_cnt = 0;
     311                        this.io->completion_q.stats.completed_avg.fast_cnt = 0;
    206312                #endif
    207313
     
    212318
    213319        void __kernel_io_finish_start( cluster & this ) {
    214                 #if defined(__CFA_IO_POLLING_USER__)
     320                if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
    215321                        __cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluter %p\n", &this);
    216                         (this.io.poller.fast){ this };
    217                         __thrd_start( this.io.poller.fast, main );
    218                 #endif
     322                        (this.io->poller.fast){ this };
     323                        __thrd_start( this.io->poller.fast, main );
     324                }
    219325
    220326                // Create the poller thread
    221327                __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluter %p\n", &this);
    222                 this.io.poller.slow.stack = __create_pthread( &this.io.poller.slow.kthrd, __io_poller_slow, &this );
     328                this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
    223329        }
    224330
     
    226332                __cfadbg_print_safe(io_core, "Kernel I/O : Stopping pollers for cluster\n", &this);
    227333                // Notify the poller thread of the shutdown
    228                 __atomic_store_n(&this.io.done, true, __ATOMIC_SEQ_CST);
     334                __atomic_store_n(&this.io->done, true, __ATOMIC_SEQ_CST);
    229335
    230336                // Stop the IO Poller
    231337                sigval val = { 1 };
    232                 pthread_sigqueue( this.io.poller.slow.kthrd, SIGUSR1, val );
    233                 #if defined(__CFA_IO_POLLING_USER__)
    234                         post( this.io.poller.sem );
    235                 #endif
     338                pthread_sigqueue( this.io->poller.slow.kthrd, SIGUSR1, val );
     339                post( this.io->poller.sem );
    236340
    237341                // Wait for the poller thread to finish
    238                 pthread_join( this.io.poller.slow.kthrd, 0p );
    239                 free( this.io.poller.slow.stack );
     342                pthread_join( this.io->poller.slow.kthrd, 0p );
     343                free( this.io->poller.slow.stack );
    240344
    241345                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster\n", &this);
    242346
    243                 #if defined(__CFA_IO_POLLING_USER__)
     347                if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
     348                        with( this.io->poller.fast ) {
     349                                /* paranoid */ verify( waiting ); // The thread shouldn't be in a system call
     350                                /* paranoid */ verify( this.procs.head == 0p || &this == mainCluster );
     351                                /* paranoid */ verify( this.idles.head == 0p || &this == mainCluster );
     352
     353                                // We need to adjust the clean-up based on where the thread is
     354                                if( thrd.preempted != __NO_PREEMPTION ) {
     355
     356                                        // This is the tricky case
     357                                        // The thread was preempted and now it is on the ready queue
     358                                        /* paranoid */ verify( thrd.state == Active );           // The thread better be in this state
     359                                        /* paranoid */ verify( thrd.next == 1p );                // The thread should be the last on the list
     360                                        /* paranoid */ verify( this.ready_queue.head == &thrd ); // The thread should be the only thing on the list
     361
     362                                        // Remove the thread from the ready queue of this cluster
     363                                        this.ready_queue.head = 1p;
     364                                        thrd.next = 0p;
     365
     366                                        // Fixup the thread state
     367                                        thrd.state = Blocked;
     368                                        thrd.preempted = __NO_PREEMPTION;
     369
     370                                        // Pretend like the thread was blocked all along
     371                                }
     372                                // !!! This is not an else if !!!
     373                                if( thrd.state == Blocked ) {
     374
     375                                        // This is the "easy case"
     376                                        // The thread is parked and can easily be moved to active cluster
     377                                        verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster );
     378                                        thrd.curr_cluster = active_cluster();
     379
    244380                        // unpark the fast io_poller
    245                         unpark( &this.io.poller.fast.thrd __cfaabi_dbg_ctx2 );
    246 
    247                         ^(this.io.poller.fast){};
     381                                        unpark( &thrd __cfaabi_dbg_ctx2 );
     382                                }
     383                                else {
     384
     385                                        // The thread is in a weird state
     386                                        // I don't know what to do here
     387                                        abort("Fast poller thread is in unexpected state, cannot clean-up correctly\n");
     388                                }
     389
     390                        }
     391
     392                        ^(this.io->poller.fast){};
    248393
    249394                        __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller stopped for cluster\n", &this);
    250                 #endif
     395                }
    251396        }
    252397
     
    259404                #if !defined(__CFA_NO_STATISTICS__)
    260405                        if(this.print_stats) {
    261                                 __cfaabi_bits_print_safe( STDERR_FILENO,
    262                                         "----- I/O uRing Stats -----\n"
    263                                         "- total submit calls  : %llu\n"
    264                                         "- avg submit          : %lf\n"
    265                                         "- total wait calls    : %llu\n"
    266                                         "- avg completion/wait : %lf\n",
    267                                         this.io.submit_q.stats.submit_avg.cnt,
    268                                         ((double)this.io.submit_q.stats.submit_avg.val) / this.io.submit_q.stats.submit_avg.cnt,
    269                                         this.io.completion_q.stats.completed_avg.cnt,
    270                                         ((double)this.io.completion_q.stats.completed_avg.val) / this.io.completion_q.stats.completed_avg.cnt
    271                                 );
     406                                with(this.io->submit_q.stats, this.io->completion_q.stats) {
     407                                        __cfaabi_bits_print_safe( STDERR_FILENO,
     408                                                "----- I/O uRing Stats -----\n"
     409                                                "- total submit calls  : %'llu\n"
     410                                                "- avg submit          : %'.2lf\n"
     411                                                "- pre-submit block %%  : %'.2lf\n"
     412                                                "- total wait calls    : %'llu (%'llu slow, %'llu fast)\n"
     413                                                "- avg completion/wait : %'.2lf\n",
     414                                                submit_avg.cnt,
     415                                                ((double)submit_avg.val) / submit_avg.cnt,
     416                                                (100.0 * submit_avg.block) / submit_avg.cnt,
     417                                                completed_avg.slow_cnt + completed_avg.fast_cnt,
     418                                                completed_avg.slow_cnt,  completed_avg.fast_cnt,
     419                                                ((double)completed_avg.val) / (completed_avg.slow_cnt + completed_avg.fast_cnt)
     420                                        );
     421                                }
    272422                        }
    273423                #endif
    274424
    275425                // Shutdown the io rings
    276                 struct io_uring_sq & sq = this.io.submit_q;
    277                 struct io_uring_cq & cq = this.io.completion_q;
     426                struct __submition_data  & sq = this.io->submit_q;
     427                struct __completion_data & cq = this.io->completion_q;
    278428
    279429                // unmap the submit queue entries
     
    289439
    290440                // close the file descriptor
    291                 close(this.io.fd);
     441                close(this.io->fd);
     442
     443                free( this.io );
    292444        }
    293445
     
    302454        // Process a single completion message from the io_uring
    303455        // This is NOT thread-safe
    304         static int __drain_io( struct io_ring & ring, sigset_t * mask, int waitcnt, bool in_kernel ) {
     456        static int __drain_io( struct __io_data & ring, sigset_t * mask, int waitcnt, bool in_kernel ) {
    305457                int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
    306458                if( ret < 0 ) {
     
    320472                // Nothing was new return 0
    321473                if (head == tail) {
    322                         #if !defined(__CFA_NO_STATISTICS__)
    323                                 ring.completion_q.stats.completed_avg.cnt += 1;
    324                         #endif
    325474                        return 0;
    326475                }
     
    348497                __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
    349498
    350                 // Update statistics
    351                 #if !defined(__CFA_NO_STATISTICS__)
    352                         ring.completion_q.stats.completed_avg.val += count;
    353                         ring.completion_q.stats.completed_avg.cnt += 1;
    354                 #endif
    355 
    356499                return count;
    357500        }
     
    359502        static void * __io_poller_slow( void * arg ) {
    360503                cluster * cltr = (cluster *)arg;
    361                 struct io_ring & ring = cltr->io;
     504                struct __io_data & ring = *cltr->io;
    362505
    363506                sigset_t mask;
     
    372515                verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );
    373516
    374                 while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
    375                         #if defined(__CFA_IO_POLLING_USER__)
    376 
     517                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p ready\n", &ring);
     518
     519                if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
     520                        while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
    377521                                // In the user-thread approach drain and if anything was drained,
    378522                                // batton pass to the user-thread
    379523                                int count = __drain_io( ring, &mask, 1, true );
     524
     525                                // Update statistics
     526                                #if !defined(__CFA_NO_STATISTICS__)
     527                                        ring.completion_q.stats.completed_avg.val += count;
     528                                        ring.completion_q.stats.completed_avg.slow_cnt += 1;
     529                                #endif
     530
    380531                                if(count > 0) {
    381532                                        __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring);
     
    383534                                        wait( ring.poller.sem );
    384535                                }
    385 
    386                         #else
    387 
     536                        }
     537                }
     538                else {
     539                        while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
    388540                                //In the naive approach, just poll the io completion queue directly
    389                                 __drain_io( ring, &mask, 1, true );
    390 
     541                                int count = __drain_io( ring, &mask, 1, true );
     542
     543                                // Update statistics
     544                                #if !defined(__CFA_NO_STATISTICS__)
     545                                        ring.completion_q.stats.completed_avg.val += count;
     546                                        ring.completion_q.stats.completed_avg.slow_cnt += 1;
     547                                #endif
     548                        }
     549                }
     550
     551                __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p stopping\n", &ring);
     552
     553                return 0p;
     554        }
     555
     556        void main( __io_poller_fast & this ) {
     557                verify( this.ring->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD );
     558
     559                // Start parked
     560                park( __cfaabi_dbg_ctx );
     561
     562                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", &this.ring);
     563
     564                int reset = 0;
     565
     566                // Then loop until we need to start
     567                while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
     568                        // Drain the io
     569                        this.waiting = false;
     570                        int count = __drain_io( *this.ring, 0p, 0, false );
     571                        reset += count > 0 ? 1 : 0;
     572
     573                        // Update statistics
     574                        #if !defined(__CFA_NO_STATISTICS__)
     575                                this.ring->completion_q.stats.completed_avg.val += count;
     576                                this.ring->completion_q.stats.completed_avg.fast_cnt += 1;
    391577                        #endif
    392                 }
    393 
    394                 return 0p;
    395         }
    396 
    397         #if defined(__CFA_IO_POLLING_USER__)
    398                 void main( __io_poller_fast & this ) {
    399                         // Start parked
    400                         park( __cfaabi_dbg_ctx );
    401 
    402                         // Then loop until we need to start
    403                         while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
    404                                 // Drain the io
    405                                 if(0 > __drain_io( *this.ring, 0p, 0, false )) {
    406                                         // If we got something, just yield and check again
    407                                         yield();
    408                                 }
    409                                 else {
    410                                         // We didn't get anything baton pass to the slow poller
    411                                         __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
    412                                         post( this.ring->poller.sem );
    413                                         park( __cfaabi_dbg_ctx );
    414                                 }
     578
     579                        this.waiting = true;
     580                        if(reset < 5) {
     581                                // If we got something, just yield and check again
     582                                yield();
    415583                        }
    416                 }
    417         #endif
     584                        else {
     585                                // We didn't get anything baton pass to the slow poller
     586                                __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
     587                                post( this.ring->poller.sem );
     588                                park( __cfaabi_dbg_ctx );
     589                                reset = 0;
     590                        }
     591                }
     592
     593                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
     594        }
    418595
    419596//=============================================================================================
     
    445622//
    446623
    447         static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct io_ring & ring ) {
     624        static inline [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring ) {
    448625                // Wait for a spot to be available
    449                 P(ring.submit);
     626                __attribute__((unused)) bool blocked = P(ring.submit);
     627                #if !defined(__CFA_NO_STATISTICS__)
     628                        __atomic_fetch_add( &ring.submit_q.stats.submit_avg.block, blocked ? 1ul64 : 0ul64, __ATOMIC_RELAXED );
     629                #endif
    450630
    451631                // Allocate the sqe
     
    463643        }
    464644
    465         static inline void __submit( struct io_ring & ring, uint32_t idx ) {
     645        static inline void __submit( struct __io_data & ring, uint32_t idx ) {
    466646                // get mutual exclusion
    467647                lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
     
    524704
    525705        #define __submit_prelude \
    526                 struct io_ring & ring = active_cluster()->io; \
     706                struct __io_data & ring = *active_cluster()->io; \
    527707                struct io_uring_sqe * sqe; \
    528708                uint32_t idx; \
  • libcfa/src/concurrency/kernel.cfa

    r08a994e r4385e8b  
    254254}
    255255
    256 void ?{}(cluster & this, const char name[], Duration preemption_rate) with( this ) {
     256void ?{}(cluster & this, const char name[], Duration preemption_rate, int io_flags) with( this ) {
    257257        this.name = name;
    258258        this.preemption_rate = preemption_rate;
     
    268268        threads{ __get };
    269269
    270         __kernel_io_startup( this, &this == mainCluster );
     270        __kernel_io_startup( this, io_flags, &this == mainCluster );
    271271
    272272        doregister(this);
     
    987987void ^?{}(semaphore & this) {}
    988988
    989 void P(semaphore & this) with( this ){
     989bool P(semaphore & this) with( this ){
    990990        lock( lock __cfaabi_dbg_ctx2 );
    991991        count -= 1;
     
    997997                unlock( lock );
    998998                park( __cfaabi_dbg_ctx );
     999                return true;
    9991000        }
    10001001        else {
    10011002            unlock( lock );
     1003            return false;
    10021004        }
    10031005}
  • libcfa/src/concurrency/kernel.hfa

    r08a994e r4385e8b  
    3838void  ?{}(semaphore & this, int count = 1);
    3939void ^?{}(semaphore & this);
    40 void   P (semaphore & this);
     40bool   P (semaphore & this);
    4141bool   V (semaphore & this);
    4242bool   V (semaphore & this, unsigned count);
     
    114114//-----------------------------------------------------------------------------
    115115// I/O
    116 #if defined(HAVE_LINUX_IO_URING_H)
    117 struct io_uring_sq {
    118         // Head and tail of the ring (associated with array)
    119         volatile uint32_t * head;
    120         volatile uint32_t * tail;
     116struct __io_data;
    121117
    122         // The actual kernel ring which uses head/tail
    123         // indexes into the sqes arrays
    124         uint32_t * array;
    125 
    126         // number of entries and mask to go with it
    127         const uint32_t * num;
    128         const uint32_t * mask;
    129 
    130         // Submission flags (Not sure what for)
    131         uint32_t * flags;
    132 
    133         // number of sqes not submitted (whatever that means)
    134         uint32_t * dropped;
    135 
    136         // Like head/tail but not seen by the kernel
    137         volatile uint32_t alloc;
    138         volatile uint32_t ready;
    139 
    140         __spinlock_t lock;
    141 
    142         // A buffer of sqes (not the actual ring)
    143         struct io_uring_sqe * sqes;
    144 
    145         // The location and size of the mmaped area
    146         void * ring_ptr;
    147         size_t ring_sz;
    148 
    149         // Statistics
    150         #if !defined(__CFA_NO_STATISTICS__)
    151                 struct {
    152                         struct {
    153                                 unsigned long long int val;
    154                                 unsigned long long int cnt;
    155                         } submit_avg;
    156                 } stats;
    157         #endif
    158 };
    159 
    160 struct io_uring_cq {
    161         // Head and tail of the ring
    162         volatile uint32_t * head;
    163         volatile uint32_t * tail;
    164 
    165         // number of entries and mask to go with it
    166         const uint32_t * mask;
    167         const uint32_t * num;
    168 
    169         // number of cqes not submitted (whatever that means)
    170         uint32_t * overflow;
    171 
    172         // the kernel ring
    173         struct io_uring_cqe * cqes;
    174 
    175         // The location and size of the mmaped area
    176         void * ring_ptr;
    177         size_t ring_sz;
    178 
    179         // Statistics
    180         #if !defined(__CFA_NO_STATISTICS__)
    181                 struct {
    182                         struct {
    183                                 unsigned long long int val;
    184                                 unsigned long long int cnt;
    185                         } completed_avg;
    186                 } stats;
    187         #endif
    188 };
    189 
    190 #if defined(__CFA_IO_POLLING_USER__)
    191         struct __io_poller_fast {
    192                 struct io_ring * ring;
    193                 $thread thrd;
    194         };
    195 #endif
    196 
    197 struct io_ring {
    198         struct io_uring_sq submit_q;
    199         struct io_uring_cq completion_q;
    200         uint32_t flags;
    201         int fd;
    202         semaphore submit;
    203         volatile bool done;
    204         struct {
    205                 struct {
    206                         void * stack;
    207                         pthread_t kthrd;
    208                 } slow;
    209                 #if defined(__CFA_IO_POLLING_USER__)
    210                         __io_poller_fast fast;
    211                         __bin_sem_t sem;
    212                 #endif
    213         } poller;
    214 };
    215 #endif
     118#define CFA_CLUSTER_IO_POLLER_USER_THREAD 1 << 0
     119// #define CFA_CLUSTER_IO_POLLER_KERNEL_SIDE 1 << 1
    216120
    217121//-----------------------------------------------------------------------------
     
    247151        } node;
    248152
    249         #if defined(HAVE_LINUX_IO_URING_H)
    250                 struct io_ring io;
    251         #endif
     153        struct __io_data * io;
    252154
    253155        #if !defined(__CFA_NO_STATISTICS__)
     
    257159extern Duration default_preemption();
    258160
    259 void ?{} (cluster & this, const char name[], Duration preemption_rate);
     161void ?{} (cluster & this, const char name[], Duration preemption_rate, int flags);
    260162void ^?{}(cluster & this);
    261163
    262 static inline void ?{} (cluster & this)                           { this{"Anonymous Cluster", default_preemption()}; }
    263 static inline void ?{} (cluster & this, Duration preemption_rate) { this{"Anonymous Cluster", preemption_rate}; }
    264 static inline void ?{} (cluster & this, const char name[])        { this{name, default_preemption()}; }
     164static inline void ?{} (cluster & this)                                      { this{"Anonymous Cluster", default_preemption(), 0}; }
     165static inline void ?{} (cluster & this, Duration preemption_rate)            { this{"Anonymous Cluster", preemption_rate, 0}; }
     166static inline void ?{} (cluster & this, const char name[])                   { this{name, default_preemption(), 0}; }
     167static inline void ?{} (cluster & this, int flags)                           { this{"Anonymous Cluster", default_preemption(), flags}; }
     168static inline void ?{} (cluster & this, Duration preemption_rate, int flags) { this{"Anonymous Cluster", preemption_rate, flags}; }
     169static inline void ?{} (cluster & this, const char name[], int flags)        { this{name, default_preemption(), flags}; }
    265170
    266171static inline [cluster *&, cluster *& ] __get( cluster & this ) __attribute__((const)) { return this.node.[next, prev]; }
  • libcfa/src/concurrency/kernel_private.hfa

    r08a994e r4385e8b  
    5959extern volatile thread_local __cfa_kernel_preemption_state_t preemption_state __attribute__ ((tls_model ( "initial-exec" )));
    6060
     61extern cluster * mainCluster;
     62
    6163//-----------------------------------------------------------------------------
    6264// Threads
     
    7577//-----------------------------------------------------------------------------
    7678// I/O
    77 void __kernel_io_startup     ( cluster &, bool );
     79void __kernel_io_startup     ( cluster &, int, bool );
    7880void __kernel_io_finish_start( cluster & );
    7981void __kernel_io_prepare_stop( cluster & );
  • libcfa/src/startup.cfa

    r08a994e r4385e8b  
    1414//
    1515
    16 #include <time.h>                                                                               // tzset
     16#include <time.h>                // tzset
     17#include <locale.h>        // setlocale
    1718#include "startup.hfa"
    1819
     
    2122    void __cfaabi_appready_startup( void ) {
    2223                tzset();                                                                                // initialize time global variables
     24                setlocale(LC_NUMERIC, "");
    2325                #ifdef __CFA_DEBUG__
    2426                extern void heapAppStart();
Note: See TracChangeset for help on using the changeset viewer.