Changes in / [c4c8571:2af1943]


Ignore:
Files:
8 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/http/main.cfa

    rc4c8571 r2af1943  
    302302                                        sout | "done";
    303303
    304                                         //===================
    305                                         // Close Files
    306                                         if( options.file_cache.path ) {
    307                                                 sout | "Closing open files..." | nonl; flush( sout );
    308                                                 close_cache();
    309                                                 sout | "done";
    310                                         }
    311 
    312304                                        sout | "Stopping accept threads..." | nonl; flush( sout );
    313305                                        for(i; nacceptors) {
     
    354346                                        }
    355347                                        sout | "done";
    356 
    357                                         //===================
    358                                         // Close Files
    359                                         if( options.file_cache.path ) {
    360                                                 sout | "Closing open files..." | nonl; flush( sout );
    361                                                 close_cache();
    362                                                 sout | "done";
    363                                         }
    364348
    365349                                        sout | "Stopping connection threads..." | nonl; flush( sout );
     
    402386        }
    403387        sout | "done";
     388
     389        //===================
     390        // Close Files
     391        if( options.file_cache.path ) {
     392                sout | "Closing open files..." | nonl; flush( sout );
     393                close_cache();
     394                sout | "done";
     395        }
    404396}
    405397
  • benchmark/io/http/protocol.cfa

    rc4c8571 r2af1943  
    2929#define PLAINTEXT_MEMCPY
    3030#define PLAINTEXT_NOCOPY
    31 // #define LINKED_IO
     31#define LINKED_IO
    3232
    3333static inline __s32 wait_res( io_future_t & this ) {
     
    7272                if( ret < 0 ) {
    7373                        if( errno == ECONNRESET || errno == EPIPE ) { close(fd); return -ECONNRESET; }
     74                        if( errno == EAGAIN || errno == EWOULDBLOCK) return -EAGAIN;
    7475
    7576                        abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) );
     
    151152}
    152153
    153 static int sendfile( int pipe[2], int fd, int ans_fd, size_t count, sendfile_stats_t & stats ) {
    154         int zipf_idx = -1;
    155         STATS: for(i; zipf_cnts) {
    156                 if(count <= zipf_sizes[i]) {
    157                         zipf_idx = i;
    158                         break STATS;
    159                 }
    160         }
    161         if(zipf_idx < 0) mutex(serr) serr | "SENDFILE" | count | " greated than biggest zipf file";
    162 
    163 
     154static int sendfile( int pipe[2], int fd, int ans_fd, size_t count ) {
    164155        unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE;
    165156        off_t offset = 0;
    166157        ssize_t ret;
    167158        SPLICE1: while(count > 0) {
    168                 stats.tries++;
    169                 // ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY);
    170                 ret = splice(ans_fd, &offset, pipe[1], 0p, count, sflags);
    171                 if( ret <= 0 ) {
     159                ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY);
     160                if( ret < 0 ) {
     161                        if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE1;
    172162                        if( errno == ECONNRESET ) return -ECONNRESET;
    173163                        if( errno == EPIPE ) return -EPIPE;
    174                         abort( "splice [0] error: %d (%d) %s\n", ret, (int)errno, strerror(errno) );
    175                 }
     164                        abort( "splice [0] error: (%d) %s\n", (int)errno, strerror(errno) );
     165                }
     166
    176167                count -= ret;
    177                 stats.splcin++;
    178                 if(count > 0) stats.avgrd[zipf_idx].calls++;
    179                 stats.avgrd[zipf_idx].bytes += ret;
    180 
    181168                size_t in_pipe = ret;
    182169                SPLICE2: while(in_pipe > 0) {
    183                         // ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY);
    184                         ret = splice(pipe[0], 0p, fd, 0p, in_pipe, sflags);
    185                         if( ret <= 0 ) {
     170                        ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY);
     171                        if( ret < 0 ) {
     172                                if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE2;
    186173                                if( errno == ECONNRESET ) return -ECONNRESET;
    187174                                if( errno == EPIPE ) return -EPIPE;
    188                                 abort( "splice [1] error: %d (%d) %s\n", ret, (int)errno, strerror(errno) );
     175                                abort( "splice [1] error: (%d) %s\n", (int)errno, strerror(errno) );
    189176                        }
    190                         stats.splcot++;
    191177                        in_pipe -= ret;
    192178                }
     
    520506                return len + fsize;
    521507        #else
     508                stats.tries++;
    522509                int ret = answer_header(fd, fsize);
    523510                if( ret < 0 ) { close(fd); return ret; }
    524                 return sendfile(pipe, fd, ans_fd, fsize, stats);
     511                return sendfile(pipe, fd, ans_fd, fsize);
    525512        #endif
    526513}
     
    541528                }
    542529                // int ret = read(fd, (void*)it, count);
    543                 if(ret == 0 ) { close(fd); return [OK200, true, 0, 0]; }
     530                if(ret == 0 ) return [OK200, true, 0, 0];
    544531                if(ret < 0 ) {
     532                        if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ;
    545533                        if( errno == ECONNRESET ) { close(fd); return [E408, true, 0, 0]; }
    546534                        if( errno == EPIPE ) { close(fd); return [E408, true, 0, 0]; }
  • benchmark/io/http/worker.cfa

    rc4c8571 r2af1943  
    1313#include "protocol.hfa"
    1414#include "filecache.hfa"
    15 
    16 static const unsigned long long period = 50_000_000;
    1715
    1816//=============================================================================================
     
    9795                // Send the desired file
    9896                int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile );
    99                 if(ret < 0) {
    100                         if( ret == -ECONNABORTED ) break REQUEST;
    101                         if( ret == -ECONNRESET ) break REQUEST;
    102                         if( ret == -EPIPE ) break REQUEST;
    103                         abort( "sendfile error: %d (%d) %s\n", ret, (int)errno, strerror(errno) );
    104                 }
     97                if( ret == -ECONNRESET ) break REQUEST;
    10598
    10699                if( options.log ) mutex(sout) sout | "=== Answer sent ===";
     
    109102        if (stats_thrd) {
    110103                unsigned long long next = rdtscl();
    111                 if(next > (last + period)) {
     104                if(next > (last + 500000000)) {
    112105                        if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
    113106                                push(this.stats.sendfile, stats_thrd->stats.send);
     
    148141                char buffer[len];
    149142                handle_connection( this.conn, fd, buffer, len, 0p, last );
    150                 this.conn.stats.sendfile.maxfd = max(this.conn.stats.sendfile.maxfd, fd);
    151                 this.conn.stats.sendfile.close++;
    152143
    153144                if( options.log ) mutex(sout) sout | "=== Connection closed ===";
     
    171162        /* paranoid */ assert( this.conn.pipe[0] != -1 );
    172163        /* paranoid */ assert( this.conn.pipe[1] != -1 );
    173         this.conn.stats.sendfile.maxfd = max(this.conn.pipe[0], this.conn.pipe[1]);
    174164        for() {
    175165                size_t len = options.socket.buflen;
     
    183173                if( options.log ) mutex(sout) sout | "=== Waiting new connection ===";
    184174                handle_connection( this.conn, p.out.fd, buffer, len, &p.f, last );
     175
     176                if( options.log ) mutex(sout) sout | "=== Connection closed ===";
    185177                if(this.done) break;
    186                 this.conn.stats.sendfile.maxfd = max(this.conn.stats.sendfile.maxfd, p.out.fd);
    187                 this.conn.stats.sendfile.close++;
    188 
    189                 if( options.log ) mutex(sout) sout | "=== Connection closed ===";
    190         }
    191 
    192         lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2);
    193         push(this.conn.stats.sendfile, stats_thrd->stats.send);
    194         unlock(stats_thrd->stats.lock);
     178        }
    195179}
    196180
     
    214198
    215199static inline void push_connection( Acceptor & this, int fd ) {
    216         this.stats.accepts++;
    217200        PendingRead * p = 0p;
    218201        for() {
     
    229212
    230213// #define ACCEPT_SPIN
    231 #define ACCEPT_ONE
    232 // #define ACCEPT_MANY
     214#define ACCEPT_MANY
    233215
    234216void main( Acceptor & this ) {
     
    250232                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
    251233                }
     234                this.stats.accepts++;
    252235
    253236                if(this.done) return;
     
    259242                if (stats_thrd) {
    260243                        unsigned long long next = rdtscl();
    261                         if(next > (last + period)) {
    262                                 if(try_lock(stats_thrd->stats.lock)) {
    263                                         push(this.stats, stats_thrd->stats.accpt);
    264                                         unlock(stats_thrd->stats.lock);
    265                                         last = next;
    266                                 }
    267                         }
    268                 }
    269 
    270                 if( options.log ) sout | "=== Accepting connection ===";
    271         }
    272 
    273 #elif defined(ACCEPT_ONE)
    274         if( options.log ) sout | "=== Accepting connection ===";
    275         for() {
    276                 int fd = cfa_accept4(this.sockfd, this.[addr, addrlen, flags], 0);
    277                 if(fd < 0) {
    278                         if( errno == ECONNABORTED ) break;
    279                         if( this.done && (errno == EINVAL || errno == EBADF) ) break;
    280                         abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
    281                 }
    282 
    283                 if(this.done) return;
    284 
    285                 if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
    286 
    287                 if(fd) push_connection(this, fd);
    288 
    289                 if (stats_thrd) {
    290                         unsigned long long next = rdtscl();
    291                         if(next > (last + period)) {
     244                        if(next > (last + 500000000)) {
    292245                                if(try_lock(stats_thrd->stats.lock)) {
    293246                                        push(this.stats, stats_thrd->stats.accpt);
     
    316269                if (stats_thrd) {
    317270                        unsigned long long next = rdtscl();
    318                         if(next > (last + period)) {
     271                        if(next > (last + 500000000)) {
    319272                                if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
    320273                                        push(this.stats, stats_thrd->stats.accpt);
     
    331284                                int fd = get_res(res);
    332285                                reset(res);
     286                                this.stats.accepts++;
    333287                                if(fd < 0) {
    334288                                        if( errno == ECONNABORTED ) continue;
     
    365319#error no accept algorithm specified
    366320#endif
    367         lock(stats_thrd->stats.lock);
    368         push(this.stats, stats_thrd->stats.accpt);
    369         unlock(stats_thrd->stats.lock);
    370 }
     321}
  • libcfa/src/concurrency/io.cfa

    rc4c8571 r2af1943  
    432432
    433433                disable_interrupts();
    434                 __STATS__( true, if(!lazy) io.submit.eagr += 1; )
    435434                processor * proc = __cfaabi_tls.this_processor;
    436435                $io_context * ctx = proc->io.ctx;
  • libcfa/src/concurrency/io/call.cfa.in

    rc4c8571 r2af1943  
    3434#include "kernel.hfa"
    3535#include "io/types.hfa"
    36 #include "stats.hfa"
    3736
    3837//=============================================================================================
     
    227226        async_{name}( future, {args}, submit_flags );
    228227
    229         __attribute__((unused)) bool parked;
    230         parked = wait( future );
    231         __STATS__(false, if(!parked) io.submit.nblk += 1; )
     228        wait( future );
    232229        if( future.result < 0 ) {{
    233230                errno = -future.result;
  • libcfa/src/concurrency/io/setup.cfa

    rc4c8571 r2af1943  
    229229                #if !defined(CFA_WITH_IO_URING_IDLE)
    230230                        // Step 4 : eventfd
     231                        // io_uring_register is so f*cking slow on some machine that it
     232                        // will never succeed if preemption isn't hard blocked
    231233                        __cfadbg_print_safe(io_core, "Kernel I/O : registering %d for completion with ring %d\n", procfd, fd);
    232234
     
    238240                        __cfadbg_print_safe(io_core, "Kernel I/O : registered %d for completion with ring %d\n", procfd, fd);
    239241                #endif
    240 
    241                 // #if defined(CFA_HAVE_IORING_REGISTER_IOWQ_MAX_WORKERS)
    242                 //      // Step 5 : max worker count
    243                 //      __cfadbg_print_safe(io_core, "Kernel I/O : lmiting max workers for ring %d\n", fd);
    244 
    245                 //      unsigned int maxes[2];
    246                 //      maxes[0] = 64; // max number of bounded workers (Regular files / block)
    247                 //      maxes[1] = 64;  // max number of unbounded workers (IOSQE_ASYNC)
    248                 //      int ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_IOWQ_MAX_WORKERS, maxes, 2);
    249                 //      if (ret < 0) {
    250                 //              abort("KERNEL ERROR: IO_URING MAX WORKER REGISTER - %s\n", strerror(errno));
    251                 //      }
    252 
    253                 //      __cfadbg_print_safe(io_core, "Kernel I/O : lmited max workers for ring %d\n", fd);
    254                 // #endif
    255242
    256243                // some paranoid checks
  • libcfa/src/concurrency/stats.cfa

    rc4c8571 r2af1943  
    4646                        stats->io.submit.fast       = 0;
    4747                        stats->io.submit.slow       = 0;
    48                         stats->io.submit.eagr       = 0;
    49                         stats->io.submit.nblk       = 0;
    5048                        stats->io.flush.external    = 0;
    5149                        stats->io.flush.dirty       = 0;
     
    118116                        tally_one( &cltr->io.submit.fast      , &proc->io.submit.fast       );
    119117                        tally_one( &cltr->io.submit.slow      , &proc->io.submit.slow       );
    120                         tally_one( &cltr->io.submit.eagr      , &proc->io.submit.eagr       );
    121                         tally_one( &cltr->io.submit.nblk      , &proc->io.submit.nblk       );
    122118                        tally_one( &cltr->io.flush.external   , &proc->io.flush.external    );
    123119                        tally_one( &cltr->io.flush.dirty      , &proc->io.flush.dirty       );
     
    201197                                        sstr | "fast," | eng3(io.submit.slow) | "slow (" | ws(3, 3, avgfasts) | "%)" | nonl;
    202198                                }
    203                                 sstr | " - eager" | eng3(io.submit.eagr) | nonl;
    204                                 sstr | " - no-wait" | eng3(io.submit.nblk) | nonl;
    205199                                sstr | nl;
    206200
  • libcfa/src/concurrency/stats.hfa

    rc4c8571 r2af1943  
    9292                                volatile uint64_t fast;
    9393                                volatile uint64_t slow;
    94                                 volatile uint64_t eagr;
    95                                 volatile uint64_t nblk;
    9694                        } submit;
    9795                        struct {
Note: See TracChangeset for help on using the changeset viewer.