Ignore:
Timestamp:
Jun 8, 2022, 7:07:51 PM (2 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, ast-experimental, master, pthread-emulation, qualifiedEnum
Children:
bbf61838
Parents:
6e2b04e
Message:

First draft at acceptor thread webserver

File:
1 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/http/worker.cfa

    r6e2b04e r7f0ac12  
    88#include <fstream.hfa>
    99#include <iofwd.hfa>
     10#include <mutex_stmt.hfa>
    1011
    1112#include "options.hfa"
     
    1314#include "filecache.hfa"
    1415
    15 //=============================================================================================
    16 // Worker Thread
    17 //=============================================================================================
    18 void ?{}( Worker & this ) {
     16void ?{}( sendfile_stats_t & this ) {
     17        this.calls = 0;
     18        this.tries = 0;
     19        this.header = 0;
     20        this.splcin = 0;
     21        this.splcot = 0;
     22        for(i; zipf_cnts) {
     23                this.avgrd[i].calls = 0;
     24                this.avgrd[i].bytes = 0;
     25        }
     26}
     27
     28//=============================================================================================
     29// Generic connection handling
     30//=============================================================================================
     31static void handle_connection( connection & this, volatile int & fd, char * buffer, size_t len, io_future_t * f ) {
     32        REQUEST:
     33        for() {
     34                bool closed;
     35                HttpCode code;
     36                const char * file;
     37                size_t name_size;
     38
     39                // Read the http request
     40                if( options.log ) sout | "=== Reading request ===";
     41                [code, closed, file, name_size] = http_read(fd, buffer, len, f);
     42                f = 0p;
     43
     44                // if we are done, break out of the loop
     45                if( closed ) break REQUEST;
     46
     47                // If this wasn't a request retrun 400
     48                if( code != OK200 ) {
     49                        sout | "=== Invalid Request :" | code_val(code) | "===";
     50                        answer_error(fd, code);
     51                        continue REQUEST;
     52                }
     53
     54                if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
     55                        if( options.log ) sout | "=== Request for /plaintext ===";
     56
     57                        int ret = answer_plaintext(fd);
     58                        if( ret == -ECONNRESET ) break REQUEST;
     59
     60                        if( options.log ) sout | "=== Answer sent ===";
     61                        continue REQUEST;
     62                }
     63
     64                if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
     65                        if( options.log ) sout | "=== Request for /ping ===";
     66
     67                        // Send the header
     68                        int ret = answer_empty(fd);
     69                        if( ret == -ECONNRESET ) break REQUEST;
     70
     71                        if( options.log ) sout | "=== Answer sent ===";
     72                        continue REQUEST;
     73                }
     74
     75                if( options.log ) {
     76                        sout | "=== Request for file " | nonl;
     77                        write(sout, file, name_size);
     78                        sout | " ===";
     79                }
     80
     81                if( !options.file_cache.path ) {
     82                        if( options.log ) {
     83                                sout | "=== File Not Found (" | nonl;
     84                                write(sout, file, name_size);
     85                                sout | ") ===";
     86                        }
     87                        answer_error(fd, E405);
     88                        continue REQUEST;
     89                }
     90
     91                // Get the fd from the file cache
     92                int ans_fd;
     93                size_t count;
     94                [ans_fd, count] = get_file( file, name_size );
     95
     96                // If we can't find the file, return 404
     97                if( ans_fd < 0 ) {
     98                        if( options.log ) {
     99                                sout | "=== File Not Found (" | nonl;
     100                                write(sout, file, name_size);
     101                                sout | ") ===";
     102                        }
     103                        answer_error(fd, E404);
     104                        continue REQUEST;
     105                }
     106
     107                // Send the desired file
     108                int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile );
     109                if( ret == -ECONNRESET ) break REQUEST;
     110
     111                if( options.log ) sout | "=== Answer sent ===";
     112        }
     113}
     114
     115//=============================================================================================
     116// Self Accepting Worker Thread
     117//=============================================================================================
     118void ?{}( AcceptWorker & this ) {
    19119        size_t cli = rand() % options.clopts.cltr_cnt;
    20120        ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
    21121        options.clopts.thrd_cnt[cli]++;
    22         this.pipe[0] = -1;
    23         this.pipe[1] = -1;
    24122        this.done = false;
    25 
    26         this.stats.sendfile.calls = 0;
    27         this.stats.sendfile.tries = 0;
    28         this.stats.sendfile.header = 0;
    29         this.stats.sendfile.splcin = 0;
    30         this.stats.sendfile.splcot = 0;
    31         for(i; zipf_cnts) {
    32                 this.stats.sendfile.avgrd[i].calls = 0;
    33                 this.stats.sendfile.avgrd[i].bytes = 0;
    34         }
    35 }
    36 
    37 extern "C" {
    38 extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
    39 }
    40 
    41 void main( Worker & this ) {
     123}
     124
     125void main( AcceptWorker & this ) {
    42126        park();
    43         /* paranoid */ assert( this.pipe[0] != -1 );
    44         /* paranoid */ assert( this.pipe[1] != -1 );
    45 
    46         const bool reuse = options.socket.manyreuse;
    47 
    48         CONNECTION:
     127        /* paranoid */ assert( this.conn.pipe[0] != -1 );
     128        /* paranoid */ assert( this.conn.pipe[1] != -1 );
    49129        for() {
    50130                if( options.log ) sout | "=== Accepting connection ===";
    51                 int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], CFA_IO_LAZY );
     131                int fd = cfa_accept4( this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY );
    52132                if(fd < 0) {
    53133                        if( errno == ECONNABORTED ) break;
     
    58138
    59139                if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
    60                 REQUEST:
    61                 for() {
    62                         bool closed;
    63                         HttpCode code;
    64                         const char * file;
    65                         size_t name_size;
    66 
    67                         // Read the http request
    68                         size_t len = options.socket.buflen;
    69                         char buffer[len];
    70                         if( options.log ) sout | "=== Reading request ===";
    71                         [code, closed, file, name_size] = http_read(fd, buffer, len);
    72 
    73                         // if we are done, break out of the loop
    74                         if( closed ) break REQUEST;
    75 
    76                         // If this wasn't a request retrun 400
    77                         if( code != OK200 ) {
    78                                 sout | "=== Invalid Request :" | code_val(code) | "===";
    79                                 answer_error(fd, code);
    80                                 continue REQUEST;
     140                size_t len = options.socket.buflen;
     141                char buffer[len];
     142                handle_connection( this.conn, fd, buffer, len, 0p );
     143
     144                if( options.log ) sout | "=== Connection closed ===";
     145        }
     146}
     147
     148
     149//=============================================================================================
     150// Channel Worker Thread
     151//=============================================================================================
     152void ?{}( ChannelWorker & this ) {
     153        size_t cli = rand() % options.clopts.cltr_cnt;
     154        ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
     155        options.clopts.thrd_cnt[cli]++;
     156        this.done = false;
     157}
     158
     159void main( ChannelWorker & this ) {
     160        park();
     161        /* paranoid */ assert( this.conn.pipe[0] != -1 );
     162        /* paranoid */ assert( this.conn.pipe[1] != -1 );
     163        for() {
     164                size_t len = options.socket.buflen;
     165                char buffer[len];
     166                PendingRead p;
     167                p.in.buf = (void*)buffer;
     168                p.in.len = len;
     169                push(*this.queue, &p);
     170
     171                if( options.log ) sout | "=== Waiting new connection ===";
     172                handle_connection( this.conn, p.out.fd, buffer, len, &p.f );
     173
     174                if( options.log ) sout | "=== Connection closed ===";
     175                if(this.done) break;
     176        }
     177}
     178
     179extern "C" {
     180extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
     181}
     182
     183void ?{}( Acceptor & this ) {
     184        size_t cli = rand() % options.clopts.cltr_cnt;
     185        ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
     186        options.clopts.thrd_cnt[cli]++;
     187        this.done = false;
     188}
     189
     190void main( Acceptor & this ) {
     191        park();
     192        if( options.log ) sout | "=== Accepting connection ===";
     193        for() {
     194                int fd = accept4(this.sockfd, this.[addr, addrlen, flags]);
     195                if(fd < 0) {
     196                        if( errno == EWOULDBLOCK) {
     197                                yield();
     198                                continue;
    81199                        }
    82 
    83                         if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
    84                                 if( options.log ) sout | "=== Request for /plaintext ===";
    85 
    86                                 int ret = answer_plaintext(fd);
    87                                 if( ret == -ECONNRESET ) break REQUEST;
    88 
    89                                 if( options.log ) sout | "=== Answer sent ===";
    90                                 continue REQUEST;
    91                         }
    92 
    93                         if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
    94                                 if( options.log ) sout | "=== Request for /ping ===";
    95 
    96                                 // Send the header
    97                                 int ret = answer_empty(fd);
    98                                 if( ret == -ECONNRESET ) break REQUEST;
    99 
    100                                 if( options.log ) sout | "=== Answer sent ===";
    101                                 continue REQUEST;
    102                         }
    103 
    104                         if( options.log ) {
    105                                 sout | "=== Request for file " | nonl;
    106                                 write(sout, file, name_size);
    107                                 sout | " ===";
    108                         }
    109 
    110                         if( !options.file_cache.path ) {
    111                                 if( options.log ) {
    112                                         sout | "=== File Not Found (" | nonl;
    113                                         write(sout, file, name_size);
    114                                         sout | ") ===";
    115                                 }
    116                                 answer_error(fd, E405);
    117                                 continue REQUEST;
    118                         }
    119 
    120                         // Get the fd from the file cache
    121                         int ans_fd;
    122                         size_t count;
    123                         [ans_fd, count] = get_file( file, name_size );
    124 
    125                         // If we can't find the file, return 404
    126                         if( ans_fd < 0 ) {
    127                                 if( options.log ) {
    128                                         sout | "=== File Not Found (" | nonl;
    129                                         write(sout, file, name_size);
    130                                         sout | ") ===";
    131                                 }
    132                                 answer_error(fd, E404);
    133                                 continue REQUEST;
    134                         }
    135 
    136                         // Send the desired file
    137                         int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile );
    138                         if( ret == -ECONNRESET ) break REQUEST;
    139 
    140                         if( options.log ) sout | "=== Answer sent ===";
    141                 }
    142 
    143                 if( options.log ) sout | "=== Connection closed ===";
    144                 continue CONNECTION;
    145         }
    146 }
     200                        if( errno == ECONNABORTED ) break;
     201                        if( this.done && (errno == EINVAL || errno == EBADF) ) break;
     202                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
     203                }
     204                if(this.done) return;
     205
     206                if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
     207
     208                if(fd) {
     209                        PendingRead * p = 0p;
     210                        for() {
     211                                if(this.done) return;
     212                                p = pop(*this.queue);
     213                                if(p) break;
     214                                yield();
     215                        };
     216
     217                        p->out.fd = fd;
     218                        async_recv(p->f, p->out.fd, p->in.buf, p->in.len, 0, CFA_IO_LAZY);
     219                }
     220
     221                if( options.log ) sout | "=== Accepting connection ===";
     222        }
     223}
Note: See TracChangeset for help on using the changeset viewer.