Changeset 7f0ac12


Ignore:
Timestamp:
Jun 8, 2022, 7:07:51 PM (6 months ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
master, pthread-emulation, qualifiedEnum
Children:
bbf61838
Parents:
6e2b04e
Message:

First draft at acceptor thread webserver

Location:
benchmark/io/http
Files:
8 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/http/main.cfa

    r6e2b04e r7f0ac12  
    3838
    3939thread StatsPrinter {
    40         Worker * workers;
    41         int worker_cnt;
     40        connection ** conns;
     41        volatile int conn_cnt;
    4242        condition_variable(fast_block_lock) var;
    4343};
     
    4545void ?{}( StatsPrinter & this, cluster & cl ) {
    4646        ((thread&)this){ "Stats Printer Thread", cl };
    47         this.worker_cnt = 0;
     47        this.conn_cnt = 0;
    4848}
    4949
     
    6262
    6363                print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );
    64                 if(this.worker_cnt != 0) {
     64                if(this.conn_cnt != 0) {
    6565                        uint64_t tries = 0;
    6666                        uint64_t calls = 0;
     
    7474                        memset(avgrd, 0, sizeof(avgrd));
    7575
    76                         for(i; this.worker_cnt) {
    77                                 tries += this.workers[i].stats.sendfile.tries;
    78                                 calls += this.workers[i].stats.sendfile.calls;
    79                                 header += this.workers[i].stats.sendfile.header;
    80                                 splcin += this.workers[i].stats.sendfile.splcin;
    81                                 splcot += this.workers[i].stats.sendfile.splcot;
     76                        for(i; this.conn_cnt) {
     77                                tries += this.conns[i]->stats.sendfile.tries;
     78                                calls += this.conns[i]->stats.sendfile.calls;
     79                                header += this.conns[i]->stats.sendfile.header;
     80                                splcin += this.conns[i]->stats.sendfile.splcin;
     81                                splcot += this.conns[i]->stats.sendfile.splcot;
    8282                                for(j; zipf_cnts) {
    83                                         avgrd[j].calls += this.workers[i].stats.sendfile.avgrd[j].calls;
    84                                         avgrd[j].bytes += this.workers[i].stats.sendfile.avgrd[j].bytes;
     83                                        avgrd[j].calls += this.conns[i]->stats.sendfile.avgrd[j].calls;
     84                                        avgrd[j].bytes += this.conns[i]->stats.sendfile.avgrd[j].bytes;
    8585                                }
    8686                        }
     
    8888                        double ratio = ((double)tries) / calls;
    8989
    90                         sout | "----- Worker Stats -----";
     90                        sout | "----- Connection Stats -----";
    9191                        sout | "sendfile  : " | calls | "calls," | tries | "tries (" | ratio | " try/call)";
    9292                        sout | "            " | header | "header," | splcin | "splice in," | splcot | "splice out";
     
    9898                }
    9999                else {
    100                         sout | "No Workers!";
     100                        sout | "No Connections!";
    101101                }
    102102        }
     
    182182
    183183//=============================================================================================
     184// REUSEPORT
     185//=============================================================================================
     186
     187size_t sockarr_size;
     188struct __attribute__((aligned(128))) Q {
     189        mpsc_queue(PendingRead) q;
     190};
     191
     192//=============================================================================================
    184193// Termination
    185194//=============================================================================================
     
    235244
    236245        int server_fd;
    237         if(!options.socket.manyreuse) {
    238                 server_fd = listener(address, addrlen);
    239         }
    240246
    241247        //===================
     
    257263                {
    258264                        // Stats printer makes a copy so this needs to persist longer than normal
    259                         Worker * workers;
     265                        connection ** conns;
     266                        AcceptWorker  * aworkers = 0p;
     267                        ChannelWorker * cworkers = 0p;
     268                        Acceptor * acceptors = 0p;
     269                        Q * queues = 0p;
    260270                        ServerCluster cl[options.clopts.nclusters];
    261271
    262272                        init_protocol();
    263273                        {
    264                                 workers = anew(options.clopts.nworkers);
    265                                 cl[0].prnt->workers = workers;
    266                                 cl[0].prnt->worker_cnt = options.clopts.nworkers;
    267                                 for(i; options.clopts.nworkers) {
    268                                         // if( options.file_cache.fixed_fds ) {
    269                                         //      workers[i].pipe[0] = pipe_off + (i * 2) + 0;
    270                                         //      workers[i].pipe[1] = pipe_off + (i * 2) + 1;
    271                                         // }
    272                                         // else
    273                                         {
    274                                                 workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0];
    275                                                 workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1];
    276                                                 workers[i].sockfd  = options.socket.manyreuse ?  listener(address, addrlen) : server_fd;
    277                                                 workers[i].addr    = (struct sockaddr *)&address;
    278                                                 workers[i].addrlen = (socklen_t*)&addrlen;
    279                                                 workers[i].flags   = 0;
    280                                         }
    281                                         unpark( workers[i] );
    282                                 }
     274                                conns = alloc(options.clopts.nworkers);
     275                                if(options.socket.reuseport) {
     276                                        queues = alloc(options.clopts.nprocs);
     277                                        acceptors = anew(options.clopts.nprocs);
     278                                        for(i; options.clopts.nprocs) {
     279                                                (queues[i]){};
     280                                                {
     281                                                        acceptors[i].sockfd  = listener(address, addrlen);
     282                                                        acceptors[i].addr    = (struct sockaddr *)&address;
     283                                                        acceptors[i].addrlen = (socklen_t*)&addrlen;
     284                                                        acceptors[i].flags   = 0;
     285                                                        acceptors[i].queue   = &queues[i].q;
     286                                                }
     287                                                unpark( acceptors[i] );
     288                                        }
     289
     290                                        cworkers = anew(options.clopts.nworkers);
     291                                        for(i; options.clopts.nworkers) {
     292                                                {
     293                                                        cworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0];
     294                                                        cworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1];
     295                                                        cworkers[i].queue = &queues[i % options.clopts.nprocs].q;
     296                                                        conns[i] = &cworkers[i].conn;
     297                                                }
     298                                                unpark( cworkers[i] );
     299                                        }
     300                                }
     301                                else {
     302                                        server_fd = listener(address, addrlen);
     303                                        aworkers = anew(options.clopts.nworkers);
     304                                        for(i; options.clopts.nworkers) {
     305                                                // if( options.file_cache.fixed_fds ) {
     306                                                //      workers[i].pipe[0] = pipe_off + (i * 2) + 0;
     307                                                //      workers[i].pipe[1] = pipe_off + (i * 2) + 1;
     308                                                // }
     309                                                // else
     310                                                {
     311                                                        aworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0];
     312                                                        aworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1];
     313                                                        aworkers[i].sockfd = server_fd;
     314                                                        aworkers[i].addr    = (struct sockaddr *)&address;
     315                                                        aworkers[i].addrlen = (socklen_t*)&addrlen;
     316                                                        aworkers[i].flags   = 0;
     317                                                        conns[i] = &aworkers[i].conn;
     318                                                }
     319                                                unpark( aworkers[i] );
     320                                        }
     321                                }
     322                                cl[0].prnt->conns = conns;
     323                                cl[0].prnt->conn_cnt = options.clopts.nworkers;
    283324                                sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters";
    284325                                for(i; options.clopts.nclusters) {
     
    307348                                }
    308349
    309                                 sout | "Notifying connections..." | nonl; flush( sout );
    310                                 for(i; options.clopts.nworkers) {
    311                                         workers[i].done = true;
    312                                 }
    313                                 sout | "done";
    314 
    315                                 sout | "Shutting down socket..." | nonl; flush( sout );
    316                                 if(options.socket.manyreuse) {
    317                                         for(i; options.clopts.nworkers) {
    318                                                 ret = shutdown( workers[i].sockfd, SHUT_RD );
    319                                                 if(ret < 0) abort( "close socket %d error: (%d) %s\n", i, (int)errno, strerror(errno) );
     350                                //===================
     351                                // Close Socket and join
     352                                if(options.socket.reuseport) {
     353                                        sout | "Notifying connections..." | nonl; flush( sout );
     354                                        for(i; options.clopts.nprocs) {
     355                                                acceptors[i].done = true;
     356                                        }
     357                                        for(i; options.clopts.nworkers) {
     358                                                cworkers[i].done = true;
     359                                        }
     360                                        sout | "done";
     361
     362                                        sout | "Shutting down Socket..." | nonl; flush( sout );
     363                                        for(i; options.clopts.nprocs) {
     364                                                ret = shutdown( acceptors[i].sockfd, SHUT_RD );
     365                                                if( ret < 0 ) {
     366                                                        abort( "shutdown1 error: (%d) %s\n", (int)errno, strerror(errno) );
     367                                                }
     368                                        }
     369                                        sout | "done";
     370
     371                                        sout | "Closing Socket..." | nonl; flush( sout );
     372                                        for(i; options.clopts.nprocs) {
     373                                                ret = close( acceptors[i].sockfd );
     374                                                if( ret < 0) {
     375                                                        abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
     376                                                }
     377                                        }
     378                                        sout | "done";
     379
     380                                        sout | "Stopping accept threads..." | nonl; flush( sout );
     381                                        for(i; options.clopts.nprocs) {
     382                                                join(acceptors[i]);
     383                                        }
     384                                        sout | "done";
     385
     386                                        sout | "Draining worker queues..." | nonl; flush( sout );
     387                                        for(i; options.clopts.nprocs) {
     388                                                PendingRead * p = 0p;
     389                                                while(p = pop(queues[i].q)) {
     390                                                        fulfil(p->f, -ECONNRESET);
     391                                                }
     392                                        }
     393                                        sout | "done";
     394
     395                                        sout | "Stopping worker threads..." | nonl; flush( sout );
     396                                        for(i; options.clopts.nworkers) {
     397                                                for(j; 2) {
     398                                                        ret = close(cworkers[i].conn.pipe[j]);
     399                                                        if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
     400                                                }
     401                                                join(cworkers[i]);
    320402                                        }
    321403                                }
    322404                                else {
     405                                        sout | "Notifying connections..." | nonl; flush( sout );
     406                                        for(i; options.clopts.nworkers) {
     407                                                aworkers[i].done = true;
     408                                        }
     409                                        sout | "done";
     410
     411                                        sout | "Shutting down Socket..." | nonl; flush( sout );
    323412                                        ret = shutdown( server_fd, SHUT_RD );
    324413                                        if( ret < 0 ) {
    325                                                 abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) );
    326                                         }
    327                                 }
    328                                 sout | "done";
    329 
    330                                 //===================
    331                                 // Close Socket
    332                                 sout | "Closing Socket..." | nonl; flush( sout );
    333                                 if(options.socket.manyreuse) {
    334                                         for(i; options.clopts.nworkers) {
    335                                                 ret = close(workers[i].sockfd);
    336                                                 if(ret < 0) abort( "close socket %d error: (%d) %s\n", i, (int)errno, strerror(errno) );
    337                                         }
    338                                 }
    339                                 else {
     414                                                abort( "shutdown2 error: (%d) %s\n", (int)errno, strerror(errno) );
     415                                        }
     416                                        sout | "done";
     417
     418                                        sout | "Closing Socket..." | nonl; flush( sout );
    340419                                        ret = close( server_fd );
    341420                                        if(ret < 0) {
    342421                                                abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
    343422                                        }
    344                                 }
    345                                 sout | "done";
    346 
    347                                 sout | "Stopping connection threads..." | nonl; flush( sout );
    348                                 for(i; options.clopts.nworkers) {
    349                                         for(j; 2) {
    350                                                 ret = close(workers[i].pipe[j]);
    351                                                 if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
    352                                         }
    353                                         join(workers[i]);
     423                                        sout | "done";
     424
     425                                        sout | "Stopping connection threads..." | nonl; flush( sout );
     426                                        for(i; options.clopts.nworkers) {
     427                                                for(j; 2) {
     428                                                        ret = close(aworkers[i].conn.pipe[j]);
     429                                                        if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
     430                                                }
     431                                                join(aworkers[i]);
     432                                        }
    354433                                }
    355434                        }
     
    371450
    372451                        // Now that the stats printer is stopped, we can reclaim this
    373                         adelete(workers);
     452                        adelete(aworkers);
     453                        adelete(cworkers);
     454                        adelete(acceptors);
     455                        adelete(queues);
     456                        free(conns);
    374457
    375458                        sout | "Stopping processors/clusters..." | nonl; flush( sout );
     
    377460                sout | "done";
    378461
    379                 // sout | "Closing splice fds..." | nonl; flush( sout );
    380                 // for(i; pipe_cnt) {
    381                 //      ret = close( fds[pipe_off + i] );
    382                 //      if(ret < 0) {
    383                 //              abort( "close pipe error: (%d) %s\n", (int)errno, strerror(errno) );
    384                 //      }
    385                 // }
    386462                free(fds);
    387                 sout | "done";
    388463
    389464                sout | "Stopping processors..." | nonl; flush( sout );
  • benchmark/io/http/options.cfa

    r6e2b04e r7f0ac12  
    3838                10,    // backlog
    3939                1024,  // buflen
    40                 false, // onereuse
    41                 false  // manyreuse
     40                false  // reuseport
    4241        },
    4342
     
    7271                {'\0', "shell",          "Disable interactive mode", options.interactive, parse_setfalse},
    7372                {'\0', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog},
    74                 {'\0', "reuseport-one",  "Create a single listen socket with SO_REUSEPORT", options.socket.onereuse, parse_settrue},
    75                 {'\0', "reuseport",      "Use many listen sockets with SO_REUSEPORT", options.socket.manyreuse, parse_settrue},
     73                {'\0', "reuseport",      "Use acceptor threads with reuse port SO_REUSEPORT", options.socket.reuseport, parse_settrue},
    7674                {'\0', "request_len",    "Maximum number of bytes in the http request, requests with more data will be answered with Http Code 414", options.socket.buflen},
    7775                {'\0', "seed",           "seed to use for hashing", options.file_cache.hash_seed },
  • benchmark/io/http/options.hfa

    r6e2b04e r7f0ac12  
    2727                int backlog;
    2828                int buflen;
    29                 bool onereuse;
    30                 bool manyreuse;
     29                bool reuseport;
    3130        } socket;
    3231
  • benchmark/io/http/protocol.cfa

    r6e2b04e r7f0ac12  
    3030#define PLAINTEXT_NOCOPY
    3131#define LINKED_IO
     32
     33static inline __s32 wait_res( io_future_t & this ) {
     34        wait( this );
     35        if( this.result < 0 ) {{
     36                errno = -this.result;
     37                return -1;
     38        }}
     39        return this.result;
     40}
    3241
    3342struct https_msg_str {
     
    470479
    471480                        if(is_error(splice_in.res)) {
     481                                if(splice_in.res.error == -EPIPE) return -ECONNRESET;
    472482                                mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error;
    473483                                close(fd);
     
    503513}
    504514
    505 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) {
     515[HttpCode code, bool closed, * const char file, size_t len] http_read(volatile int & fd, []char buffer, size_t len, io_future_t * f) {
    506516        char * it = buffer;
    507517        size_t count = len - 1;
     
    509519        READ:
    510520        for() {
    511                 int ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY);
     521                int ret;
     522                if( f ) {
     523                        ret = wait_res(*f);
     524                        reset(*f);
     525                        f = 0p;
     526                } else {
     527                        ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY);
     528                }
    512529                // int ret = read(fd, (void*)it, count);
    513530                if(ret == 0 ) return [OK200, true, 0, 0];
  • benchmark/io/http/protocol.hfa

    r6e2b04e r7f0ac12  
    11#pragma once
    22
     3struct io_future_t;
    34struct sendfile_stats_t;
    45
     
    2223int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t count, struct sendfile_stats_t & );
    2324
    24 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len);
     25[HttpCode code, bool closed, * const char file, size_t len] http_read(volatile int & fd, []char buffer, size_t len, io_future_t * f);
  • benchmark/io/http/socket.cfa

    r6e2b04e r7f0ac12  
    2626
    2727int listener(struct sockaddr_in & address, int addrlen) {
    28         int sockfd = socket(AF_INET, SOCK_STREAM, 0);
     28        int type = SOCK_STREAM;
     29        if(options.socket.reuseport) type |= SOCK_NONBLOCK;
     30        int sockfd = socket(AF_INET, type, 0);
    2931        if(sockfd < 0) {
    3032                abort( "socket error: (%d) %s\n", (int)errno, strerror(errno) );
    3133        }
    3234
    33         if(options.socket.onereuse || options.socket.manyreuse) {
     35        if(options.socket.reuseport) {
    3436                int value = 1;
    3537                // if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const void*)&on, sizeof(on)))
  • benchmark/io/http/worker.cfa

    r6e2b04e r7f0ac12  
    88#include <fstream.hfa>
    99#include <iofwd.hfa>
     10#include <mutex_stmt.hfa>
    1011
    1112#include "options.hfa"
     
    1314#include "filecache.hfa"
    1415
    15 //=============================================================================================
    16 // Worker Thread
    17 //=============================================================================================
    18 void ?{}( Worker & this ) {
     16void ?{}( sendfile_stats_t & this ) {
     17        this.calls = 0;
     18        this.tries = 0;
     19        this.header = 0;
     20        this.splcin = 0;
     21        this.splcot = 0;
     22        for(i; zipf_cnts) {
     23                this.avgrd[i].calls = 0;
     24                this.avgrd[i].bytes = 0;
     25        }
     26}
     27
     28//=============================================================================================
     29// Generic connection handling
     30//=============================================================================================
     31static void handle_connection( connection & this, volatile int & fd, char * buffer, size_t len, io_future_t * f ) {
     32        REQUEST:
     33        for() {
     34                bool closed;
     35                HttpCode code;
     36                const char * file;
     37                size_t name_size;
     38
     39                // Read the http request
     40                if( options.log ) sout | "=== Reading request ===";
     41                [code, closed, file, name_size] = http_read(fd, buffer, len, f);
     42                f = 0p;
     43
     44                // if we are done, break out of the loop
     45                if( closed ) break REQUEST;
     46
     47                // If this wasn't a request retrun 400
     48                if( code != OK200 ) {
     49                        sout | "=== Invalid Request :" | code_val(code) | "===";
     50                        answer_error(fd, code);
     51                        continue REQUEST;
     52                }
     53
     54                if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
     55                        if( options.log ) sout | "=== Request for /plaintext ===";
     56
     57                        int ret = answer_plaintext(fd);
     58                        if( ret == -ECONNRESET ) break REQUEST;
     59
     60                        if( options.log ) sout | "=== Answer sent ===";
     61                        continue REQUEST;
     62                }
     63
     64                if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
     65                        if( options.log ) sout | "=== Request for /ping ===";
     66
     67                        // Send the header
     68                        int ret = answer_empty(fd);
     69                        if( ret == -ECONNRESET ) break REQUEST;
     70
     71                        if( options.log ) sout | "=== Answer sent ===";
     72                        continue REQUEST;
     73                }
     74
     75                if( options.log ) {
     76                        sout | "=== Request for file " | nonl;
     77                        write(sout, file, name_size);
     78                        sout | " ===";
     79                }
     80
     81                if( !options.file_cache.path ) {
     82                        if( options.log ) {
     83                                sout | "=== File Not Found (" | nonl;
     84                                write(sout, file, name_size);
     85                                sout | ") ===";
     86                        }
     87                        answer_error(fd, E405);
     88                        continue REQUEST;
     89                }
     90
     91                // Get the fd from the file cache
     92                int ans_fd;
     93                size_t count;
     94                [ans_fd, count] = get_file( file, name_size );
     95
     96                // If we can't find the file, return 404
     97                if( ans_fd < 0 ) {
     98                        if( options.log ) {
     99                                sout | "=== File Not Found (" | nonl;
     100                                write(sout, file, name_size);
     101                                sout | ") ===";
     102                        }
     103                        answer_error(fd, E404);
     104                        continue REQUEST;
     105                }
     106
     107                // Send the desired file
     108                int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile );
     109                if( ret == -ECONNRESET ) break REQUEST;
     110
     111                if( options.log ) sout | "=== Answer sent ===";
     112        }
     113}
     114
     115//=============================================================================================
     116// Self Accepting Worker Thread
     117//=============================================================================================
     118void ?{}( AcceptWorker & this ) {
    19119        size_t cli = rand() % options.clopts.cltr_cnt;
    20120        ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
    21121        options.clopts.thrd_cnt[cli]++;
    22         this.pipe[0] = -1;
    23         this.pipe[1] = -1;
    24122        this.done = false;
    25 
    26         this.stats.sendfile.calls = 0;
    27         this.stats.sendfile.tries = 0;
    28         this.stats.sendfile.header = 0;
    29         this.stats.sendfile.splcin = 0;
    30         this.stats.sendfile.splcot = 0;
    31         for(i; zipf_cnts) {
    32                 this.stats.sendfile.avgrd[i].calls = 0;
    33                 this.stats.sendfile.avgrd[i].bytes = 0;
    34         }
    35 }
    36 
    37 extern "C" {
    38 extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
    39 }
    40 
    41 void main( Worker & this ) {
     123}
     124
     125void main( AcceptWorker & this ) {
    42126        park();
    43         /* paranoid */ assert( this.pipe[0] != -1 );
    44         /* paranoid */ assert( this.pipe[1] != -1 );
    45 
    46         const bool reuse = options.socket.manyreuse;
    47 
    48         CONNECTION:
     127        /* paranoid */ assert( this.conn.pipe[0] != -1 );
     128        /* paranoid */ assert( this.conn.pipe[1] != -1 );
    49129        for() {
    50130                if( options.log ) sout | "=== Accepting connection ===";
    51                 int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], CFA_IO_LAZY );
     131                int fd = cfa_accept4( this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY );
    52132                if(fd < 0) {
    53133                        if( errno == ECONNABORTED ) break;
     
    58138
    59139                if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
    60                 REQUEST:
    61                 for() {
    62                         bool closed;
    63                         HttpCode code;
    64                         const char * file;
    65                         size_t name_size;
    66 
    67                         // Read the http request
    68                         size_t len = options.socket.buflen;
    69                         char buffer[len];
    70                         if( options.log ) sout | "=== Reading request ===";
    71                         [code, closed, file, name_size] = http_read(fd, buffer, len);
    72 
    73                         // if we are done, break out of the loop
    74                         if( closed ) break REQUEST;
    75 
    76                         // If this wasn't a request retrun 400
    77                         if( code != OK200 ) {
    78                                 sout | "=== Invalid Request :" | code_val(code) | "===";
    79                                 answer_error(fd, code);
    80                                 continue REQUEST;
     140                size_t len = options.socket.buflen;
     141                char buffer[len];
     142                handle_connection( this.conn, fd, buffer, len, 0p );
     143
     144                if( options.log ) sout | "=== Connection closed ===";
     145        }
     146}
     147
     148
     149//=============================================================================================
     150// Channel Worker Thread
     151//=============================================================================================
     152void ?{}( ChannelWorker & this ) {
     153        size_t cli = rand() % options.clopts.cltr_cnt;
     154        ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
     155        options.clopts.thrd_cnt[cli]++;
     156        this.done = false;
     157}
     158
     159void main( ChannelWorker & this ) {
     160        park();
     161        /* paranoid */ assert( this.conn.pipe[0] != -1 );
     162        /* paranoid */ assert( this.conn.pipe[1] != -1 );
     163        for() {
     164                size_t len = options.socket.buflen;
     165                char buffer[len];
     166                PendingRead p;
     167                p.in.buf = (void*)buffer;
     168                p.in.len = len;
     169                push(*this.queue, &p);
     170
     171                if( options.log ) sout | "=== Waiting new connection ===";
     172                handle_connection( this.conn, p.out.fd, buffer, len, &p.f );
     173
     174                if( options.log ) sout | "=== Connection closed ===";
     175                if(this.done) break;
     176        }
     177}
     178
     179extern "C" {
     180extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
     181}
     182
     183void ?{}( Acceptor & this ) {
     184        size_t cli = rand() % options.clopts.cltr_cnt;
     185        ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
     186        options.clopts.thrd_cnt[cli]++;
     187        this.done = false;
     188}
     189
     190void main( Acceptor & this ) {
     191        park();
     192        if( options.log ) sout | "=== Accepting connection ===";
     193        for() {
     194                int fd = accept4(this.sockfd, this.[addr, addrlen, flags]);
     195                if(fd < 0) {
     196                        if( errno == EWOULDBLOCK) {
     197                                yield();
     198                                continue;
    81199                        }
    82 
    83                         if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
    84                                 if( options.log ) sout | "=== Request for /plaintext ===";
    85 
    86                                 int ret = answer_plaintext(fd);
    87                                 if( ret == -ECONNRESET ) break REQUEST;
    88 
    89                                 if( options.log ) sout | "=== Answer sent ===";
    90                                 continue REQUEST;
    91                         }
    92 
    93                         if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
    94                                 if( options.log ) sout | "=== Request for /ping ===";
    95 
    96                                 // Send the header
    97                                 int ret = answer_empty(fd);
    98                                 if( ret == -ECONNRESET ) break REQUEST;
    99 
    100                                 if( options.log ) sout | "=== Answer sent ===";
    101                                 continue REQUEST;
    102                         }
    103 
    104                         if( options.log ) {
    105                                 sout | "=== Request for file " | nonl;
    106                                 write(sout, file, name_size);
    107                                 sout | " ===";
    108                         }
    109 
    110                         if( !options.file_cache.path ) {
    111                                 if( options.log ) {
    112                                         sout | "=== File Not Found (" | nonl;
    113                                         write(sout, file, name_size);
    114                                         sout | ") ===";
    115                                 }
    116                                 answer_error(fd, E405);
    117                                 continue REQUEST;
    118                         }
    119 
    120                         // Get the fd from the file cache
    121                         int ans_fd;
    122                         size_t count;
    123                         [ans_fd, count] = get_file( file, name_size );
    124 
    125                         // If we can't find the file, return 404
    126                         if( ans_fd < 0 ) {
    127                                 if( options.log ) {
    128                                         sout | "=== File Not Found (" | nonl;
    129                                         write(sout, file, name_size);
    130                                         sout | ") ===";
    131                                 }
    132                                 answer_error(fd, E404);
    133                                 continue REQUEST;
    134                         }
    135 
    136                         // Send the desired file
    137                         int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile );
    138                         if( ret == -ECONNRESET ) break REQUEST;
    139 
    140                         if( options.log ) sout | "=== Answer sent ===";
    141                 }
    142 
    143                 if( options.log ) sout | "=== Connection closed ===";
    144                 continue CONNECTION;
    145         }
    146 }
     200                        if( errno == ECONNABORTED ) break;
     201                        if( this.done && (errno == EINVAL || errno == EBADF) ) break;
     202                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
     203                }
     204                if(this.done) return;
     205
     206                if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
     207
     208                if(fd) {
     209                        PendingRead * p = 0p;
     210                        for() {
     211                                if(this.done) return;
     212                                p = pop(*this.queue);
     213                                if(p) break;
     214                                yield();
     215                        };
     216
     217                        p->out.fd = fd;
     218                        async_recv(p->f, p->out.fd, p->in.buf, p->in.len, 0, CFA_IO_LAZY);
     219                }
     220
     221                if( options.log ) sout | "=== Accepting connection ===";
     222        }
     223}
  • benchmark/io/http/worker.hfa

    r6e2b04e r7f0ac12  
    11#pragma once
    22
     3#include <iofwd.hfa>
     4#include <queueLockFree.hfa>
    35#include <thread.hfa>
    46
     
    2628};
    2729
    28 thread Worker {
     30void ?{}( sendfile_stats_t & this );
     31
     32struct connection {
    2933        int pipe[2];
     34        struct {
     35                sendfile_stats_t sendfile;
     36        } stats;
     37};
     38
     39static inline void ?{}( connection & this ) {
     40        this.pipe[0] = -1;
     41        this.pipe[1] = -1;
     42}
     43
     44thread AcceptWorker {
     45        connection conn;
    3046        int sockfd;
    3147        struct sockaddr * addr;
     
    3349        int flags;
    3450        volatile bool done;
     51};
     52void ?{}( AcceptWorker & this);
     53void main( AcceptWorker & );
     54
     55
     56struct PendingRead {
     57        PendingRead * volatile next;
     58        io_future_t f;
    3559        struct {
    36                 sendfile_stats_t sendfile;
    37         } stats;
     60                void * buf;
     61                size_t len;
     62        } in;
     63        struct {
     64                volatile int fd;
     65        } out;
    3866};
    39 void ?{}( Worker & this);
    40 void main( Worker & );
     67
     68static inline PendingRead * volatile & ?`next ( PendingRead * node ) {
     69        return node->next;
     70}
     71
     72thread ChannelWorker {
     73        connection conn;
     74        volatile bool done;
     75        mpsc_queue(PendingRead) * queue;
     76};
     77void ?{}( ChannelWorker & );
     78void main( ChannelWorker & );
     79
     80thread Acceptor {
     81        mpsc_queue(PendingRead) * queue;
     82        int sockfd;
     83        struct sockaddr * addr;
     84        socklen_t * addrlen;
     85        int flags;
     86        volatile bool done;
     87};
     88void ?{}( Acceptor & );
     89void main( Acceptor & );
Note: See TracChangeset for help on using the changeset viewer.