Changeset aeb20a4

Jun 9, 2022, 2:26:43 PM (2 years ago)
caparsons <caparson@…>
ADT, ast-experimental, master, pthread-emulation, qualifiedEnum
db7a3ad (diff), 430ce61 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.

Merge branch 'master' of

3 added
10 edited


  • benchmark/io/http/

    rdb7a3ad raeb20a4  
    3737 \
    3838        options.hfa \
     39 \
     40        printer.hfa \
    3941 \
    4042        protocol.hfa \
  • benchmark/io/http/

    rdb7a3ad raeb20a4  
    2525#include "options.hfa"
    2626#include "socket.hfa"
     27#include "printer.hfa"
    2728#include "worker.hfa"
    3132Duration default_preemption() {
    3233        return 0;
    33 }
    35 //=============================================================================================
    36 // Stats Printer
    37 //============================================================================================='
    39 thread StatsPrinter {
    40         Worker * workers;
    41         int worker_cnt;
    42         condition_variable(fast_block_lock) var;
    43 };
    45 void ?{}( StatsPrinter & this, cluster & cl ) {
    46         ((thread&)this){ "Stats Printer Thread", cl };
    47         this.worker_cnt = 0;
    48 }
    50 void ^?{}( StatsPrinter & mutex this ) {}
    52 #define eng3(X) (ws(3, 3, unit(eng( X ))))
    54 void main(StatsPrinter & this) {
    55         LOOP: for() {
    56                 waitfor( ^?{} : this) {
    57                         break LOOP;
    58                 }
    59                 or else {}
    61                 wait(this.var, 10`s);
    63                 print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );
    64                 if(this.worker_cnt != 0) {
    65                         uint64_t tries = 0;
    66                         uint64_t calls = 0;
    67                         uint64_t header = 0;
    68                         uint64_t splcin = 0;
    69                         uint64_t splcot = 0;
    70                         struct {
    71                                 volatile uint64_t calls;
    72                                 volatile uint64_t bytes;
    73                         } avgrd[zipf_cnts];
    74                         memset(avgrd, 0, sizeof(avgrd));
    76                         for(i; this.worker_cnt) {
    77                                 tries += this.workers[i].stats.sendfile.tries;
    78                                 calls += this.workers[i].stats.sendfile.calls;
    79                                 header += this.workers[i].stats.sendfile.header;
    80                                 splcin += this.workers[i].stats.sendfile.splcin;
    81                                 splcot += this.workers[i].stats.sendfile.splcot;
    82                                 for(j; zipf_cnts) {
    83                                         avgrd[j].calls += this.workers[i].stats.sendfile.avgrd[j].calls;
    84                                         avgrd[j].bytes += this.workers[i].stats.sendfile.avgrd[j].bytes;
    85                                 }
    86                         }
    88                         double ratio = ((double)tries) / calls;
    90                         sout | "----- Worker Stats -----";
    91                         sout | "sendfile  : " | calls | "calls," | tries | "tries (" | ratio | " try/call)";
    92                         sout | "            " | header | "header," | splcin | "splice in," | splcot | "splice out";
    93                         sout | " - zipf sizes:";
    94                         for(i; zipf_cnts) {
    95                                 double written = avgrd[i].calls > 0 ? ((double)avgrd[i].bytes) / avgrd[i].calls : 0;
    96                                 sout | "        " | zipf_sizes[i] | "bytes," | avgrd[i].calls | "shorts," | written | "written";
    97                         }
    98                 }
    99                 else {
    100                         sout | "No Workers!";
    101                 }
    102         }
    10940        cluster self;
    11041        processor    * procs;
    111         // io_context   * ctxs;
    112         StatsPrinter * prnt;
    15281        }
    154         if(options.stats) {
    155                 this.prnt = alloc();
    156                 (*this.prnt){ this.self };
    157         } else {
    158                 this.prnt = 0p;
    159         }
    16183        #if !defined(__CFA_NO_STATISTICS__)
    16284                print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO );
    16385        #endif
    165         options.clopts.instance[options.clopts.cltr_cnt] = &this.self;
    166         options.clopts.cltr_cnt++;
     87        options.clopts.instance = &this.self;
    16990void ^?{}( ServerCluster & this ) {
    170         delete(this.prnt);
    17291        for(i; options.clopts.nprocs) {
    17392                ^(this.procs[i]){};
    18099extern void init_protocol(void);
    181100extern void deinit_protocol(void);
     103// REUSEPORT
     106size_t sockarr_size;
     107struct __attribute__((aligned(128))) Q {
     108        mpsc_queue(PendingRead) q;
    236164        int server_fd;
    237         if(!options.socket.manyreuse) {
    238                 server_fd = listener(address, addrlen);
    239         }
    241166        //===================
    257182                {
    258183                        // Stats printer makes a copy so this needs to persist longer than normal
    259                         Worker * workers;
    260                         ServerCluster cl[options.clopts.nclusters];
     184                        connection ** conns;
     185                        AcceptWorker  * aworkers = 0p;
     186                        ChannelWorker * cworkers = 0p;
     187                        Acceptor * acceptors = 0p;
     188                        Q * queues = 0p;
     189                        ServerCluster cl;
     191                        if(options.stats) {
     192                                stats_thrd = alloc();
     193                                (*stats_thrd){ cl.self };
     194                        } else {
     195                                stats_thrd = 0p;
     196                        }
    262198                        init_protocol();
    263199                        {
    264                                 workers = anew(options.clopts.nworkers);
    265                                 cl[0].prnt->workers = workers;
    266                                 cl[0].prnt->worker_cnt = options.clopts.nworkers;
    267                                 for(i; options.clopts.nworkers) {
    268                                         // if( options.file_cache.fixed_fds ) {
    269                                         //      workers[i].pipe[0] = pipe_off + (i * 2) + 0;
    270                                         //      workers[i].pipe[1] = pipe_off + (i * 2) + 1;
    271                                         // }
    272                                         // else
    273                                         {
    274                                                 workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0];
    275                                                 workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1];
    276                                                 workers[i].sockfd  = options.socket.manyreuse ?  listener(address, addrlen) : server_fd;
    277                                                 workers[i].addr    = (struct sockaddr *)&address;
    278                                                 workers[i].addrlen = (socklen_t*)&addrlen;
    279                                                 workers[i].flags   = 0;
    280                                         }
    281                                         unpark( workers[i] );
    282                                 }
    283                                 sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters";
    284                                 for(i; options.clopts.nclusters) {
    285                                         sout | options.clopts.thrd_cnt[i] | nonl;
    286                                 }
     200                                conns = alloc(options.clopts.nworkers);
     201                                if(options.socket.reuseport) {
     202                                        queues = alloc(options.clopts.nprocs);
     203                                        acceptors = anew(options.clopts.nprocs);
     204                                        for(i; options.clopts.nprocs) {
     205                                                (queues[i]){};
     206                                                {
     207                                                        acceptors[i].sockfd  = listener(address, addrlen);
     208                                                        acceptors[i].addr    = (struct sockaddr *)&address;
     209                                                        acceptors[i].addrlen = (socklen_t*)&addrlen;
     210                                                        acceptors[i].flags   = 0;
     211                                                        acceptors[i].queue   = &queues[i].q;
     212                                                }
     213                                                unpark( acceptors[i] );
     214                                        }
     216                                        cworkers = anew(options.clopts.nworkers);
     217                                        for(i; options.clopts.nworkers) {
     218                                                {
     219                                                        cworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0];
     220                                                        cworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1];
     221                                                        cworkers[i].queue = &queues[i % options.clopts.nprocs].q;
     222                                                        conns[i] = &cworkers[i].conn;
     223                                                }
     224                                                unpark( cworkers[i] );
     225                                        }
     226                                }
     227                                else {
     228                                        server_fd = listener(address, addrlen);
     229                                        aworkers = anew(options.clopts.nworkers);
     230                                        for(i; options.clopts.nworkers) {
     231                                                // if( options.file_cache.fixed_fds ) {
     232                                                //      workers[i].pipe[0] = pipe_off + (i * 2) + 0;
     233                                                //      workers[i].pipe[1] = pipe_off + (i * 2) + 1;
     234                                                // }
     235                                                // else
     236                                                {
     237                                                        aworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0];
     238                                                        aworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1];
     239                                                        aworkers[i].sockfd = server_fd;
     240                                                        aworkers[i].addr    = (struct sockaddr *)&address;
     241                                                        aworkers[i].addrlen = (socklen_t*)&addrlen;
     242                                                        aworkers[i].flags   = 0;
     243                                                        conns[i] = &aworkers[i].conn;
     244                                                }
     245                                                unpark( aworkers[i] );
     246                                        }
     247                                }
     249                                sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors";
    287250                                sout | nl;
    288251                                {
    307270                                }
    309                                 sout | "Notifying connections..." | nonl; flush( sout );
    310                                 for(i; options.clopts.nworkers) {
    311                                         workers[i].done = true;
    312                                 }
    313                                 sout | "done";
    315                                 sout | "Shutting down socket..." | nonl; flush( sout );
    316                                 if(options.socket.manyreuse) {
    317                                         for(i; options.clopts.nworkers) {
    318                                                 ret = shutdown( workers[i].sockfd, SHUT_RD );
    319                                                 if(ret < 0) abort( "close socket %d error: (%d) %s\n", i, (int)errno, strerror(errno) );
     272                                //===================
     273                                // Close Socket and join
     274                                if(options.socket.reuseport) {
     275                                        sout | "Notifying connections..." | nonl; flush( sout );
     276                                        for(i; options.clopts.nprocs) {
     277                                                acceptors[i].done = true;
     278                                        }
     279                                        for(i; options.clopts.nworkers) {
     280                                                cworkers[i].done = true;
     281                                        }
     282                                        sout | "done";
     284                                        sout | "Shutting down Socket..." | nonl; flush( sout );
     285                                        for(i; options.clopts.nprocs) {
     286                                                ret = shutdown( acceptors[i].sockfd, SHUT_RD );
     287                                                if( ret < 0 ) {
     288                                                        abort( "shutdown1 error: (%d) %s\n", (int)errno, strerror(errno) );
     289                                                }
     290                                        }
     291                                        sout | "done";
     293                                        sout | "Closing Socket..." | nonl; flush( sout );
     294                                        for(i; options.clopts.nprocs) {
     295                                                ret = close( acceptors[i].sockfd );
     296                                                if( ret < 0) {
     297                                                        abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
     298                                                }
     299                                        }
     300                                        sout | "done";
     302                                        sout | "Stopping accept threads..." | nonl; flush( sout );
     303                                        for(i; options.clopts.nprocs) {
     304                                                join(acceptors[i]);
     305                                        }
     306                                        sout | "done";
     308                                        sout | "Draining worker queues..." | nonl; flush( sout );
     309                                        for(i; options.clopts.nprocs) {
     310                                                PendingRead * p = 0p;
     311                                                while(p = pop(queues[i].q)) {
     312                                                        fulfil(p->f, -ECONNRESET);
     313                                                }
     314                                        }
     315                                        sout | "done";
     317                                        sout | "Stopping worker threads..." | nonl; flush( sout );
     318                                        for(i; options.clopts.nworkers) {
     319                                                for(j; 2) {
     320                                                        ret = close(cworkers[i].conn.pipe[j]);
     321                                                        if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
     322                                                }
     323                                                join(cworkers[i]);
    320324                                        }
    321325                                }
    322326                                else {
     327                                        sout | "Notifying connections..." | nonl; flush( sout );
     328                                        for(i; options.clopts.nworkers) {
     329                                                aworkers[i].done = true;
     330                                        }
     331                                        sout | "done";
     333                                        sout | "Shutting down Socket..." | nonl; flush( sout );
    323334                                        ret = shutdown( server_fd, SHUT_RD );
    324335                                        if( ret < 0 ) {
    325                                                 abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) );
    326                                         }
    327                                 }
    328                                 sout | "done";
    330                                 //===================
    331                                 // Close Socket
    332                                 sout | "Closing Socket..." | nonl; flush( sout );
    333                                 if(options.socket.manyreuse) {
    334                                         for(i; options.clopts.nworkers) {
    335                                                 ret = close(workers[i].sockfd);
    336                                                 if(ret < 0) abort( "close socket %d error: (%d) %s\n", i, (int)errno, strerror(errno) );
    337                                         }
    338                                 }
    339                                 else {
     336                                                abort( "shutdown2 error: (%d) %s\n", (int)errno, strerror(errno) );
     337                                        }
     338                                        sout | "done";
     340                                        sout | "Closing Socket..." | nonl; flush( sout );
    340341                                        ret = close( server_fd );
    341342                                        if(ret < 0) {
    342343                                                abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
    343344                                        }
    344                                 }
    345                                 sout | "done";
    347                                 sout | "Stopping connection threads..." | nonl; flush( sout );
    348                                 for(i; options.clopts.nworkers) {
    349                                         for(j; 2) {
    350                                                 ret = close(workers[i].pipe[j]);
    351                                                 if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
    352                                         }
    353                                         join(workers[i]);
     345                                        sout | "done";
     347                                        sout | "Stopping connection threads..." | nonl; flush( sout );
     348                                        for(i; options.clopts.nworkers) {
     349                                                for(j; 2) {
     350                                                        ret = close(aworkers[i].conn.pipe[j]);
     351                                                        if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
     352                                                }
     353                                                join(aworkers[i]);
     354                                        }
    354355                                }
    355356                        }
    362363                        sout | "Stopping printer threads..." | nonl; flush( sout );
    363                         for(i; options.clopts.nclusters) {
    364                                 StatsPrinter * p = cl[i].prnt;
    365                                 if(p) {
    366                                         notify_one(p->var);
    367                                         join(*p);
    368                                 }
    369                         }
     364                        if(stats_thrd) {
     365                                notify_one(stats_thrd->var);
     366                        }
     367                        delete(stats_thrd);
    370368                        sout | "done";
    372370                        // Now that the stats printer is stopped, we can reclaim this
    373                         adelete(workers);
     371                        adelete(aworkers);
     372                        adelete(cworkers);
     373                        adelete(acceptors);
     374                        adelete(queues);
     375                        free(conns);
    375377                        sout | "Stopping processors/clusters..." | nonl; flush( sout );
    377379                sout | "done";
    379                 // sout | "Closing splice fds..." | nonl; flush( sout );
    380                 // for(i; pipe_cnt) {
    381                 //      ret = close( fds[pipe_off + i] );
    382                 //      if(ret < 0) {
    383                 //              abort( "close pipe error: (%d) %s\n", (int)errno, strerror(errno) );
    384                 //      }
    385                 // }
    386381                free(fds);
    387                 sout | "done";
    389383                sout | "Stopping processors..." | nonl; flush( sout );
  • benchmark/io/http/

    rdb7a3ad raeb20a4  
    3838                10,    // backlog
    3939                1024,  // buflen
    40                 false, // onereuse
    41                 false  // manyreuse
     40                false  // reuseport
    4241        },
    4443        { // cluster
    45                 1,     // nclusters;
    4644                1,     // nprocs;
    4745                1,     // nworkers;
    5553void parse_options( int argc, char * argv[] ) {
    56         // bool fixedfd = false;
    57         // bool sqkpoll = false;
    58         // bool iokpoll = false;
    5954        unsigned nentries = 0;
    60         bool isolate = false;
    6355        static cfa_option opt[] = {
    6456                { 'p', "port",           "Port the server will listen on", options.socket.port},
    6557                { 'c', "cpus",           "Number of processors to use", options.clopts.nprocs},
    6658                { 't', "threads",        "Number of worker threads to use", options.clopts.nworkers},
    67                 {'\0', "isolate",        "Create one cluster per processor", isolate, parse_settrue},
    6859                {'\0', "log",            "Enable logs", options.log, parse_settrue},
    6960                {'\0', "sout",           "Redirect standard out to file", options.reopen_stdout},
    7263                {'\0', "shell",          "Disable interactive mode", options.interactive, parse_setfalse},
    7364                {'\0', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog},
    74                 {'\0', "reuseport-one",  "Create a single listen socket with SO_REUSEPORT", options.socket.onereuse, parse_settrue},
    75                 {'\0', "reuseport",      "Use many listen sockets with SO_REUSEPORT", options.socket.manyreuse, parse_settrue},
     65                {'\0', "reuseport",      "Use acceptor threads with reuse port SO_REUSEPORT", options.socket.reuseport, parse_settrue},
    7666                {'\0', "request_len",    "Maximum number of bytes in the http request, requests with more data will be answered with Http Code 414", options.socket.buflen},
    7767                {'\0', "seed",           "seed to use for hashing", options.file_cache.hash_seed },
    10191                nentries = v;
    10292        }
    103         if(isolate) {
    104                 options.clopts.nclusters = options.clopts.nprocs;
    105                 options.clopts.nprocs = 1;
    106         }
    10793        options.clopts.params.num_entries = nentries;
    108         options.clopts.instance = alloc(options.clopts.nclusters);
    109         options.clopts.thrd_cnt = alloc(options.clopts.nclusters);
    110         options.clopts.cltr_cnt = 0;
    111         for(i; options.clopts.nclusters) {
    112                 options.clopts.thrd_cnt[i] = 0;
    113         }
     94        options.clopts.instance = 0p;
     95        options.clopts.thrd_cnt = 0;
  • benchmark/io/http/options.hfa

    rdb7a3ad raeb20a4  
    2727                int backlog;
    2828                int buflen;
    29                 bool onereuse;
    30                 bool manyreuse;
     29                bool reuseport;
    3130        } socket;
    3332        struct {
    34                 int nclusters;
    3533                int nprocs;
    3634                int nworkers;
    3836                bool procstats;
    3937                bool viewhalts;
    40                 cluster ** instance;
    41                 size_t   * thrd_cnt;
    42                 size_t     cltr_cnt;
     38                cluster * instance;
     39                size_t    thrd_cnt;
    4340        } clopts;
  • benchmark/io/http/

    rdb7a3ad raeb20a4  
    3030#define PLAINTEXT_NOCOPY
    3131#define LINKED_IO
     33static inline __s32 wait_res( io_future_t & this ) {
     34        wait( this );
     35        if( this.result < 0 ) {{
     36                errno = -this.result;
     37                return -1;
     38        }}
     39        return this.result;
    3342struct https_msg_str {
    471480                        if(is_error(splice_in.res)) {
     481                                if(splice_in.res.error == -EPIPE) return -ECONNRESET;
    472482                                mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error;
    473483                                close(fd);
    505 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) {
     515[HttpCode code, bool closed, * const char file, size_t len] http_read(volatile int & fd, []char buffer, size_t len, io_future_t * f) {
    506516        char * it = buffer;
    507517        size_t count = len - 1;
    509519        READ:
    510520        for() {
    511                 int ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY);
     521                int ret;
     522                if( f ) {
     523                        ret = wait_res(*f);
     524                        reset(*f);
     525                        f = 0p;
     526                } else {
     527                        ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY);
     528                }
    512529                // int ret = read(fd, (void*)it, count);
    513530                if(ret == 0 ) return [OK200, true, 0, 0];
    571588void ?{}( DateFormater & this ) {
    572         ((thread&)this){ "Server Date Thread", *options.clopts.instance[0] };
     589        ((thread&)this){ "Server Date Thread", *options.clopts.instance };
    573590        this.idx = 0;
    574591        memset( &this.buffers[0], 0, sizeof(this.buffers[0]) );
  • benchmark/io/http/protocol.hfa

    rdb7a3ad raeb20a4  
    11#pragma once
     3struct io_future_t;
    34struct sendfile_stats_t;
    2223int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t count, struct sendfile_stats_t & );
    24 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len);
     25[HttpCode code, bool closed, * const char file, size_t len] http_read(volatile int & fd, []char buffer, size_t len, io_future_t * f);
  • benchmark/io/http/

    rdb7a3ad raeb20a4  
    2727int listener(struct sockaddr_in & address, int addrlen) {
    28         int sockfd = socket(AF_INET, SOCK_STREAM, 0);
     28        int type = SOCK_STREAM;
     29        if(options.socket.reuseport) type |= SOCK_NONBLOCK;
     30        int sockfd = socket(AF_INET, type, 0);
    2931        if(sockfd < 0) {
    3032                abort( "socket error: (%d) %s\n", (int)errno, strerror(errno) );
    3133        }
    33         if(options.socket.onereuse || options.socket.manyreuse) {
     35        if(options.socket.reuseport) {
    3436                int value = 1;
    3537                // if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const void*)&on, sizeof(on)))
  • benchmark/io/http/

    rdb7a3ad raeb20a4  
    88#include <fstream.hfa>
    99#include <iofwd.hfa>
     10#include <mutex_stmt.hfa>
    1112#include "options.hfa"
    16 // Worker Thread
    17 //=============================================================================================
    18 void ?{}( Worker & this ) {
    19         size_t cli = rand() % options.clopts.cltr_cnt;
    20         ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
    21         options.clopts.thrd_cnt[cli]++;
    22         this.pipe[0] = -1;
    23         this.pipe[1] = -1;
     17// Generic connection handling
     19static void handle_connection( connection & this, volatile int & fd, char * buffer, size_t len, io_future_t * f, unsigned long long & last ) {
     20        REQUEST:
     21        for() {
     22                bool closed;
     23                HttpCode code;
     24                const char * file;
     25                size_t name_size;
     27                // Read the http request
     28                if( options.log ) sout | "=== Reading request ===";
     29                [code, closed, file, name_size] = http_read(fd, buffer, len, f);
     30                f = 0p;
     32                // if we are done, break out of the loop
     33                if( closed ) break REQUEST;
     35                // If this wasn't a request retrun 400
     36                if( code != OK200 ) {
     37                        sout | "=== Invalid Request :" | code_val(code) | "===";
     38                        answer_error(fd, code);
     39                        continue REQUEST;
     40                }
     42                if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
     43                        if( options.log ) sout | "=== Request for /plaintext ===";
     45                        int ret = answer_plaintext(fd);
     46                        if( ret == -ECONNRESET ) break REQUEST;
     48                        if( options.log ) sout | "=== Answer sent ===";
     49                        continue REQUEST;
     50                }
     52                if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
     53                        if( options.log ) sout | "=== Request for /ping ===";
     55                        // Send the header
     56                        int ret = answer_empty(fd);
     57                        if( ret == -ECONNRESET ) break REQUEST;
     59                        if( options.log ) sout | "=== Answer sent ===";
     60                        continue REQUEST;
     61                }
     63                if( options.log ) {
     64                        sout | "=== Request for file " | nonl;
     65                        write(sout, file, name_size);
     66                        sout | " ===";
     67                }
     69                if( !options.file_cache.path ) {
     70                        if( options.log ) {
     71                                sout | "=== File Not Found (" | nonl;
     72                                write(sout, file, name_size);
     73                                sout | ") ===";
     74                        }
     75                        answer_error(fd, E405);
     76                        continue REQUEST;
     77                }
     79                // Get the fd from the file cache
     80                int ans_fd;
     81                size_t count;
     82                [ans_fd, count] = get_file( file, name_size );
     84                // If we can't find the file, return 404
     85                if( ans_fd < 0 ) {
     86                        if( options.log ) {
     87                                sout | "=== File Not Found (" | nonl;
     88                                write(sout, file, name_size);
     89                                sout | ") ===";
     90                        }
     91                        answer_error(fd, E404);
     92                        continue REQUEST;
     93                }
     95                // Send the desired file
     96                int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile );
     97                if( ret == -ECONNRESET ) break REQUEST;
     99                if( options.log ) sout | "=== Answer sent ===";
     100        }
     102        if (stats_thrd) {
     103                unsigned long long next = rdtscl();
     104                if(next > (last + 500000000)) {
     105                        if(try_lock(stats_thrd->stats.lock)) {
     106                                push(this.stats.sendfile, stats_thrd->stats.send);
     107                                unlock(stats_thrd->stats.lock);
     108                                last = next;
     109                        }
     110                }
     111        }
     115// Self Accepting Worker Thread
     117void ?{}( AcceptWorker & this ) {
     118        ((thread&)this){ "Server Worker Thread", *options.clopts.instance, 64000 };
     119        options.clopts.thrd_cnt++;
    24120        this.done = false;
    26         this.stats.sendfile.calls = 0;
    27         this.stats.sendfile.tries = 0;
    28         this.stats.sendfile.header = 0;
    29         this.stats.sendfile.splcin = 0;
    30         this.stats.sendfile.splcot = 0;
    31         for(i; zipf_cnts) {
    32                 this.stats.sendfile.avgrd[i].calls = 0;
    33                 this.stats.sendfile.avgrd[i].bytes = 0;
    34         }
    35 }
    37 extern "C" {
    38 extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
    39 }
    41 void main( Worker & this ) {
     123void main( AcceptWorker & this ) {
    42124        park();
    43         /* paranoid */ assert( this.pipe[0] != -1 );
    44         /* paranoid */ assert( this.pipe[1] != -1 );
    46         const bool reuse = options.socket.manyreuse;
    48         CONNECTION:
     125        unsigned long long last = rdtscl();
     126        /* paranoid */ assert( this.conn.pipe[0] != -1 );
     127        /* paranoid */ assert( this.conn.pipe[1] != -1 );
    49128        for() {
    50129                if( options.log ) sout | "=== Accepting connection ===";
    51                 int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], CFA_IO_LAZY );
     130                int fd = cfa_accept4( this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY );
    52131                if(fd < 0) {
    53132                        if( errno == ECONNABORTED ) break;
    59138                if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
    60                 REQUEST:
    61                 for() {
    62                         bool closed;
    63                         HttpCode code;
    64                         const char * file;
    65                         size_t name_size;
    67                         // Read the http request
    68                         size_t len = options.socket.buflen;
    69                         char buffer[len];
    70                         if( options.log ) sout | "=== Reading request ===";
    71                         [code, closed, file, name_size] = http_read(fd, buffer, len);
    73                         // if we are done, break out of the loop
    74                         if( closed ) break REQUEST;
    76                         // If this wasn't a request retrun 400
    77                         if( code != OK200 ) {
    78                                 sout | "=== Invalid Request :" | code_val(code) | "===";
    79                                 answer_error(fd, code);
    80                                 continue REQUEST;
    81                         }
    83                         if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
    84                                 if( options.log ) sout | "=== Request for /plaintext ===";
    86                                 int ret = answer_plaintext(fd);
    87                                 if( ret == -ECONNRESET ) break REQUEST;
    89                                 if( options.log ) sout | "=== Answer sent ===";
    90                                 continue REQUEST;
    91                         }
    93                         if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
    94                                 if( options.log ) sout | "=== Request for /ping ===";
    96                                 // Send the header
    97                                 int ret = answer_empty(fd);
    98                                 if( ret == -ECONNRESET ) break REQUEST;
    100                                 if( options.log ) sout | "=== Answer sent ===";
    101                                 continue REQUEST;
    102                         }
    104                         if( options.log ) {
    105                                 sout | "=== Request for file " | nonl;
    106                                 write(sout, file, name_size);
    107                                 sout | " ===";
    108                         }
    110                         if( !options.file_cache.path ) {
    111                                 if( options.log ) {
    112                                         sout | "=== File Not Found (" | nonl;
    113                                         write(sout, file, name_size);
    114                                         sout | ") ===";
     139                size_t len = options.socket.buflen;
     140                char buffer[len];
     141                handle_connection( this.conn, fd, buffer, len, 0p, last );
     143                if( options.log ) sout | "=== Connection closed ===";
     144        }
     149// Channel Worker Thread
     151void ?{}( ChannelWorker & this ) {
     152        ((thread&)this){ "Server Worker Thread", *options.clopts.instance, 64000 };
     153        options.clopts.thrd_cnt++;
     154        this.done = false;
     157void main( ChannelWorker & this ) {
     158        park();
     159        unsigned long long last = rdtscl();
     160        /* paranoid */ assert( this.conn.pipe[0] != -1 );
     161        /* paranoid */ assert( this.conn.pipe[1] != -1 );
     162        for() {
     163                size_t len = options.socket.buflen;
     164                char buffer[len];
     165                PendingRead p;
     166       = (void*)buffer;
     167       = len;
     168                push(*this.queue, &p);
     170                if( options.log ) sout | "=== Waiting new connection ===";
     171                handle_connection( this.conn, p.out.fd, buffer, len, &p.f, last );
     173                if( options.log ) sout | "=== Connection closed ===";
     174                if(this.done) break;
     175        }
     178extern "C" {
     179extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
     182void ?{}( Acceptor & this ) {
     183        ((thread&)this){ "Server Worker Thread", *options.clopts.instance, 64000 };
     184        options.clopts.thrd_cnt++;
     185        this.done = false;
     188void main( Acceptor & this ) {
     189        park();
     190        unsigned long long last = rdtscl();
     191        if( options.log ) sout | "=== Accepting connection ===";
     192        for() {
     193                int fd = accept4(this.sockfd, this.[addr, addrlen, flags]);
     194                if(fd < 0) {
     195                        if( errno == EWOULDBLOCK) {
     196                                this.stats.eagains++;
     197                                yield();
     198                                continue;
     199                        }
     200                        if( errno == ECONNABORTED ) break;
     201                        if( this.done && (errno == EINVAL || errno == EBADF) ) break;
     202                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
     203                }
     204                this.stats.accepts++;
     206                if(this.done) return;
     208                if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
     210                if(fd) {
     211                        PendingRead * p = 0p;
     212                        for() {
     213                                if(this.done) return;
     214                                p = pop(*this.queue);
     215                                if(p) break;
     216                                yield();
     217                                this.stats.creates++;
     218                        };
     220                        p->out.fd = fd;
     221                        async_recv(p->f, p->out.fd, p->in.buf, p->in.len, 0, CFA_IO_LAZY);
     222                }
     224                if (stats_thrd) {
     225                        unsigned long long next = rdtscl();
     226                        if(next > (last + 500000000)) {
     227                                if(try_lock(stats_thrd->stats.lock)) {
     228                                        push(this.stats, stats_thrd->stats.accpt);
     229                                        unlock(stats_thrd->stats.lock);
     230                                        last = next;
    115231                                }
    116                                 answer_error(fd, E405);
    117                                 continue REQUEST;
    118                         }
    120                         // Get the fd from the file cache
    121                         int ans_fd;
    122                         size_t count;
    123                         [ans_fd, count] = get_file( file, name_size );
    125                         // If we can't find the file, return 404
    126                         if( ans_fd < 0 ) {
    127                                 if( options.log ) {
    128                                         sout | "=== File Not Found (" | nonl;
    129                                         write(sout, file, name_size);
    130                                         sout | ") ===";
    131                                 }
    132                                 answer_error(fd, E404);
    133                                 continue REQUEST;
    134                         }
    136                         // Send the desired file
    137                         int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile );
    138                         if( ret == -ECONNRESET ) break REQUEST;
    140                         if( options.log ) sout | "=== Answer sent ===";
    141                 }
    143                 if( options.log ) sout | "=== Connection closed ===";
    144                 continue CONNECTION;
    145         }
    146 }
     232                        }
     233                }
     235                if( options.log ) sout | "=== Accepting connection ===";
     236        }
  • benchmark/io/http/worker.hfa

    rdb7a3ad raeb20a4  
    11#pragma once
     3#include <iofwd.hfa>
     4#include <queueLockFree.hfa>
    35#include <thread.hfa>
     11#include "printer.hfa"
    1014// Worker Thread
    13 extern const size_t zipf_sizes[];
    14 enum { zipf_cnts = 36, };
    16 struct sendfile_stats_t {
    17         volatile uint64_t calls;
    18         volatile uint64_t tries;
    19         volatile uint64_t header;
    20         volatile uint64_t splcin;
    21         volatile uint64_t splcot;
     17struct connection {
     18        int pipe[2];
    2219        struct {
    23                 volatile uint64_t calls;
    24                 volatile uint64_t bytes;
    25         } avgrd[zipf_cnts];
     20                sendfile_stats_t sendfile;
     21        } stats;
    28 thread Worker {
    29         int pipe[2];
     24static inline void ?{}( connection & this ) {
     25        this.pipe[0] = -1;
     26        this.pipe[1] = -1;
     29thread AcceptWorker {
     30        connection conn;
    3031        int sockfd;
    3132        struct sockaddr * addr;
    3334        int flags;
    3435        volatile bool done;
     37void ?{}( AcceptWorker & this);
     38void main( AcceptWorker & );
     41struct PendingRead {
     42        PendingRead * volatile next;
     43        io_future_t f;
    3544        struct {
    36                 sendfile_stats_t sendfile;
    37         } stats;
     45                void * buf;
     46                size_t len;
     47        } in;
     48        struct {
     49                volatile int fd;
     50        } out;
    39 void ?{}( Worker & this);
    40 void main( Worker & );
     53static inline PendingRead * volatile & ?`next ( PendingRead * node ) {
     54        return node->next;
     57thread ChannelWorker {
     58        connection conn;
     59        volatile bool done;
     60        mpsc_queue(PendingRead) * queue;
     62void ?{}( ChannelWorker & );
     63void main( ChannelWorker & );
     65thread Acceptor {
     66        mpsc_queue(PendingRead) * queue;
     67        int sockfd;
     68        struct sockaddr * addr;
     69        socklen_t * addrlen;
     70        int flags;
     71        volatile bool done;
     72        acceptor_stats_t stats;
     74void ?{}( Acceptor & );
     75void main( Acceptor & );
  • libcfa/src/containers/queueLockFree.hfa

    rdb7a3ad raeb20a4  
    33#include <assert.h>
     5#include <bits/defs.hfa>
    57forall( T &) {
Note: See TracChangeset for help on using the changeset viewer.