Changeset 3f95dab


Ignore:
Timestamp:
Jul 28, 2022, 12:04:19 PM (2 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, ast-experimental, master, pthread-emulation
Children:
c4c8571
Parents:
7ce8873
Message:

Committing hopefully last version of the webserver

Location:
benchmark/io/http
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/http/main.cfa

    r7ce8873 r3f95dab  
    302302                                        sout | "done";
    303303
     304                                        //===================
     305                                        // Close Files
     306                                        if( options.file_cache.path ) {
     307                                                sout | "Closing open files..." | nonl; flush( sout );
     308                                                close_cache();
     309                                                sout | "done";
     310                                        }
     311
    304312                                        sout | "Stopping accept threads..." | nonl; flush( sout );
    305313                                        for(i; nacceptors) {
     
    346354                                        }
    347355                                        sout | "done";
     356
     357                                        //===================
     358                                        // Close Files
     359                                        if( options.file_cache.path ) {
     360                                                sout | "Closing open files..." | nonl; flush( sout );
     361                                                close_cache();
     362                                                sout | "done";
     363                                        }
    348364
    349365                                        sout | "Stopping connection threads..." | nonl; flush( sout );
     
    386402        }
    387403        sout | "done";
    388 
    389         //===================
    390         // Close Files
    391         if( options.file_cache.path ) {
    392                 sout | "Closing open files..." | nonl; flush( sout );
    393                 close_cache();
    394                 sout | "done";
    395         }
    396404}
    397405
  • benchmark/io/http/protocol.cfa

    r7ce8873 r3f95dab  
    2929#define PLAINTEXT_MEMCPY
    3030#define PLAINTEXT_NOCOPY
    31 #define LINKED_IO
     31// #define LINKED_IO
    3232
    3333static inline __s32 wait_res( io_future_t & this ) {
     
    7272                if( ret < 0 ) {
    7373                        if( errno == ECONNRESET || errno == EPIPE ) { close(fd); return -ECONNRESET; }
    74                         if( errno == EAGAIN || errno == EWOULDBLOCK) return -EAGAIN;
    7574
    7675                        abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) );
     
    152151}
    153152
    154 static int sendfile( int pipe[2], int fd, int ans_fd, size_t count ) {
     153static int sendfile( int pipe[2], int fd, int ans_fd, size_t count, sendfile_stats_t & stats ) {
     154        int zipf_idx = -1;
     155        STATS: for(i; zipf_cnts) {
     156                if(count <= zipf_sizes[i]) {
     157                        zipf_idx = i;
     158                        break STATS;
     159                }
     160        }
     161        if(zipf_idx < 0) mutex(serr) serr | "SENDFILE" | count | " greated than biggest zipf file";
     162
     163
    155164        unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE;
    156165        off_t offset = 0;
    157166        ssize_t ret;
    158167        SPLICE1: while(count > 0) {
    159                 ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY);
    160                 if( ret < 0 ) {
    161                         if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE1;
     168                stats.tries++;
     169                // ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY);
     170                ret = splice(ans_fd, &offset, pipe[1], 0p, count, sflags);
     171                if( ret <= 0 ) {
    162172                        if( errno == ECONNRESET ) return -ECONNRESET;
    163173                        if( errno == EPIPE ) return -EPIPE;
    164                         abort( "splice [0] error: (%d) %s\n", (int)errno, strerror(errno) );
    165                 }
    166 
     174                        abort( "splice [0] error: %d (%d) %s\n", ret, (int)errno, strerror(errno) );
     175                }
    167176                count -= ret;
     177                stats.splcin++;
     178                if(count > 0) stats.avgrd[zipf_idx].calls++;
     179                stats.avgrd[zipf_idx].bytes += ret;
     180
    168181                size_t in_pipe = ret;
    169182                SPLICE2: while(in_pipe > 0) {
    170                         ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY);
    171                         if( ret < 0 ) {
    172                                 if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE2;
     183                        // ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY);
     184                        ret = splice(pipe[0], 0p, fd, 0p, in_pipe, sflags);
     185                        if( ret <= 0 ) {
    173186                                if( errno == ECONNRESET ) return -ECONNRESET;
    174187                                if( errno == EPIPE ) return -EPIPE;
    175                                 abort( "splice [1] error: (%d) %s\n", (int)errno, strerror(errno) );
     188                                abort( "splice [1] error: %d (%d) %s\n", ret, (int)errno, strerror(errno) );
    176189                        }
     190                        stats.splcot++;
    177191                        in_pipe -= ret;
    178192                }
     
    506520                return len + fsize;
    507521        #else
    508                 stats.tries++;
    509522                int ret = answer_header(fd, fsize);
    510523                if( ret < 0 ) { close(fd); return ret; }
    511                 return sendfile(pipe, fd, ans_fd, fsize);
     524                return sendfile(pipe, fd, ans_fd, fsize, stats);
    512525        #endif
    513526}
     
    528541                }
    529542                // int ret = read(fd, (void*)it, count);
    530                 if(ret == 0 ) return [OK200, true, 0, 0];
     543                if(ret == 0 ) { close(fd); return [OK200, true, 0, 0]; }
    531544                if(ret < 0 ) {
    532                         if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ;
    533545                        if( errno == ECONNRESET ) { close(fd); return [E408, true, 0, 0]; }
    534546                        if( errno == EPIPE ) { close(fd); return [E408, true, 0, 0]; }
  • benchmark/io/http/worker.cfa

    r7ce8873 r3f95dab  
    1313#include "protocol.hfa"
    1414#include "filecache.hfa"
     15
     16static const unsigned long long period = 50_000_000;
    1517
    1618//=============================================================================================
     
    9597                // Send the desired file
    9698                int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile );
    97                 if( ret == -ECONNRESET ) break REQUEST;
     99                if(ret < 0) {
     100                        if( ret == -ECONNABORTED ) break REQUEST;
     101                        if( ret == -ECONNRESET ) break REQUEST;
     102                        if( ret == -EPIPE ) break REQUEST;
     103                        abort( "sendfile error: %d (%d) %s\n", ret, (int)errno, strerror(errno) );
     104                }
    98105
    99106                if( options.log ) mutex(sout) sout | "=== Answer sent ===";
     
    102109        if (stats_thrd) {
    103110                unsigned long long next = rdtscl();
    104                 if(next > (last + 500000000)) {
     111                if(next > (last + period)) {
    105112                        if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
    106113                                push(this.stats.sendfile, stats_thrd->stats.send);
     
    141148                char buffer[len];
    142149                handle_connection( this.conn, fd, buffer, len, 0p, last );
     150                this.conn.stats.sendfile.maxfd = max(this.conn.stats.sendfile.maxfd, fd);
     151                this.conn.stats.sendfile.close++;
    143152
    144153                if( options.log ) mutex(sout) sout | "=== Connection closed ===";
     
    162171        /* paranoid */ assert( this.conn.pipe[0] != -1 );
    163172        /* paranoid */ assert( this.conn.pipe[1] != -1 );
     173        this.conn.stats.sendfile.maxfd = max(this.conn.pipe[0], this.conn.pipe[1]);
    164174        for() {
    165175                size_t len = options.socket.buflen;
     
    173183                if( options.log ) mutex(sout) sout | "=== Waiting new connection ===";
    174184                handle_connection( this.conn, p.out.fd, buffer, len, &p.f, last );
     185                if(this.done) break;
     186                this.conn.stats.sendfile.maxfd = max(this.conn.stats.sendfile.maxfd, p.out.fd);
     187                this.conn.stats.sendfile.close++;
    175188
    176189                if( options.log ) mutex(sout) sout | "=== Connection closed ===";
    177                 if(this.done) break;
    178         }
     190        }
     191
     192        lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2);
     193        push(this.conn.stats.sendfile, stats_thrd->stats.send);
     194        unlock(stats_thrd->stats.lock);
    179195}
    180196
     
    198214
    199215static inline void push_connection( Acceptor & this, int fd ) {
     216        this.stats.accepts++;
    200217        PendingRead * p = 0p;
    201218        for() {
     
    212229
    213230// #define ACCEPT_SPIN
    214 #define ACCEPT_MANY
     231#define ACCEPT_ONE
     232// #define ACCEPT_MANY
    215233
    216234void main( Acceptor & this ) {
     
    232250                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
    233251                }
    234                 this.stats.accepts++;
    235252
    236253                if(this.done) return;
     
    242259                if (stats_thrd) {
    243260                        unsigned long long next = rdtscl();
    244                         if(next > (last + 500000000)) {
     261                        if(next > (last + period)) {
     262                                if(try_lock(stats_thrd->stats.lock)) {
     263                                        push(this.stats, stats_thrd->stats.accpt);
     264                                        unlock(stats_thrd->stats.lock);
     265                                        last = next;
     266                                }
     267                        }
     268                }
     269
     270                if( options.log ) sout | "=== Accepting connection ===";
     271        }
     272
     273#elif defined(ACCEPT_ONE)
     274        if( options.log ) sout | "=== Accepting connection ===";
     275        for() {
     276                int fd = cfa_accept4(this.sockfd, this.[addr, addrlen, flags], 0);
     277                if(fd < 0) {
     278                        if( errno == ECONNABORTED ) break;
     279                        if( this.done && (errno == EINVAL || errno == EBADF) ) break;
     280                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
     281                }
     282
     283                if(this.done) return;
     284
     285                if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
     286
     287                if(fd) push_connection(this, fd);
     288
     289                if (stats_thrd) {
     290                        unsigned long long next = rdtscl();
     291                        if(next > (last + period)) {
    245292                                if(try_lock(stats_thrd->stats.lock)) {
    246293                                        push(this.stats, stats_thrd->stats.accpt);
     
    269316                if (stats_thrd) {
    270317                        unsigned long long next = rdtscl();
    271                         if(next > (last + 500000000)) {
     318                        if(next > (last + period)) {
    272319                                if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
    273320                                        push(this.stats, stats_thrd->stats.accpt);
     
    284331                                int fd = get_res(res);
    285332                                reset(res);
    286                                 this.stats.accepts++;
    287333                                if(fd < 0) {
    288334                                        if( errno == ECONNABORTED ) continue;
     
    319365#error no accept algorithm specified
    320366#endif
    321 }
     367        lock(stats_thrd->stats.lock);
     368        push(this.stats, stats_thrd->stats.accpt);
     369        unlock(stats_thrd->stats.lock);
     370}
Note: See TracChangeset for help on using the changeset viewer.