Changeset eb5962a for benchmark


Ignore:
Timestamp:
Jun 21, 2022, 1:39:24 PM (4 years ago)
Author:
JiadaL <j82liang@…>
Branches:
ADT, ast-experimental, master, pthread-emulation, qualifiedEnum, stuck-waitfor-destruct
Children:
b62d1d6
Parents:
1df492a (diff), 1dbbef6 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

Location:
benchmark/io
Files:
5 added
8 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/http/Makefile.am

    r1df492a reb5962a  
    2121include $(top_srcdir)/tools/build/cfa.make
    2222
    23 AM_CFLAGS = -O3 -Wall -Wextra -I$(srcdir) -lrt -pthread # -Werror
     23AM_CFLAGS = -O3 -Wall -Wextra -I$(srcdir) -lrt -pthread -g # -Werror
    2424AM_CFAFLAGS = -quiet -nodebug
    2525AM_LDFLAGS = -quiet -nodebug
     
    3737        options.cfa \
    3838        options.hfa \
     39        printer.cfa \
     40        printer.hfa \
    3941        protocol.cfa \
    4042        protocol.hfa \
     43        socket.cfa \
     44        socket.hfa \
    4145        worker.cfa \
    4246        worker.hfa
  • benchmark/io/http/main.cfa

    r1df492a reb5962a  
    22
    33#include <errno.h>
     4#include <signal.h>
    45#include <stdio.h>
    56#include <string.h>
     
    89        #include <sched.h>
    910        #include <signal.h>
     11        #include <sys/eventfd.h>
    1012        #include <sys/socket.h>
    1113        #include <netinet/in.h>
     
    1416#include <fstream.hfa>
    1517#include <kernel.hfa>
     18#include <locks.hfa>
    1619#include <iofwd.hfa>
    1720#include <stats.hfa>
     
    2124#include "filecache.hfa"
    2225#include "options.hfa"
     26#include "socket.hfa"
     27#include "printer.hfa"
    2328#include "worker.hfa"
    2429
     
    3035
    3136//=============================================================================================
    32 // Stats Printer
    33 //============================================================================================='
    34 
    35 thread StatsPrinter {
    36         Worker * workers;
    37         int worker_cnt;
    38 };
    39 
    40 void ?{}( StatsPrinter & this, cluster & cl ) {
    41         ((thread&)this){ "Stats Printer Thread", cl };
    42         this.worker_cnt = 0;
    43 }
    44 
    45 void ^?{}( StatsPrinter & mutex this ) {}
    46 
    47 #define eng3(X) (ws(3, 3, unit(eng( X ))))
    48 
    49 void main(StatsPrinter & this) {
    50         LOOP: for() {
    51                 waitfor( ^?{} : this) {
    52                         break LOOP;
    53                 }
    54                 or else {}
    55 
    56                 sleep(10`s);
    57 
    58                 print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );
    59                 if(this.worker_cnt != 0) {
    60                         uint64_t tries = 0;
    61                         uint64_t calls = 0;
    62                         uint64_t header = 0;
    63                         uint64_t splcin = 0;
    64                         uint64_t splcot = 0;
    65                         struct {
    66                                 volatile uint64_t calls;
    67                                 volatile uint64_t bytes;
    68                         } avgrd[zipf_cnts];
    69                         memset(avgrd, 0, sizeof(avgrd));
    70 
    71                         for(i; this.worker_cnt) {
    72                                 tries += this.workers[i].stats.sendfile.tries;
    73                                 calls += this.workers[i].stats.sendfile.calls;
    74                                 header += this.workers[i].stats.sendfile.header;
    75                                 splcin += this.workers[i].stats.sendfile.splcin;
    76                                 splcot += this.workers[i].stats.sendfile.splcot;
    77                                 for(j; zipf_cnts) {
    78                                         avgrd[j].calls += this.workers[i].stats.sendfile.avgrd[j].calls;
    79                                         avgrd[j].bytes += this.workers[i].stats.sendfile.avgrd[j].bytes;
    80                                 }
    81                         }
    82 
    83                         double ratio = ((double)tries) / calls;
    84 
    85                         sout | "----- Worker Stats -----";
    86                         sout | "sendfile  : " | calls | "calls," | tries | "tries (" | ratio | " try/call)";
    87                         sout | "            " | header | "header," | splcin | "splice in," | splcot | "splice out";
    88                         sout | " - zipf sizes:";
    89                         for(i; zipf_cnts) {
    90                                 double written = avgrd[i].calls > 0 ? ((double)avgrd[i].bytes) / avgrd[i].calls : 0;
    91                                 sout | "        " | zipf_sizes[i] | "bytes," | avgrd[i].calls | "shorts," | written | "written";
    92                         }
    93                 }
    94                 else {
    95                         sout | "No Workers!";
    96                 }
    97         }
    98 }
    99 
    100 //=============================================================================================
    10137// Globals
    10238//=============================================================================================
    103 struct ServerCluster {
    104         cluster self;
    105         processor    * procs;
    106         // io_context   * ctxs;
    107         StatsPrinter * prnt;
    108 
    109 };
    110 
    11139void ?{}( ServerCluster & this ) {
    11240        (this.self){ "Server Cluster", options.clopts.params };
     
    12250                (this.procs[i]){ "Benchmark Processor", this.self };
    12351
    124                 int c = 0;
    125                 int n = 1 + (i % cnt);
    126                 for(int j = 0; j < CPU_SETSIZE; j++) {
    127                         if(CPU_ISSET(j, &fullset)) n--;
    128                         if(n == 0) {
    129                                 c = j;
    130                                 break;
    131                         }
    132                 }
    133                 cpu_set_t localset;
    134                 CPU_ZERO(&localset);
    135                 CPU_SET(c, &localset);
    136                 ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset);
    137                 if( ret != 0 ) abort | "sched_getaffinity failed with" | ret | strerror( ret );
     52                // int c = 0;
     53                // int n = 1 + (i % cnt);
     54                // for(int j = 0; j < CPU_SETSIZE; j++) {
     55                //      if(CPU_ISSET(j, &fullset)) n--;
     56                //      if(n == 0) {
     57                //              c = j;
     58                //              break;
     59                //      }
     60                // }
     61                // cpu_set_t localset;
     62                // CPU_ZERO(&localset);
     63                // CPU_SET(c, &localset);
     64                // ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset);
     65                // if( ret != 0 ) abort | "sched_getaffinity failed with" | ret | strerror( ret );
    13866
    13967                #if !defined(__CFA_NO_STATISTICS__)
     
    14775        }
    14876
    149         if(options.stats) {
    150                 this.prnt = alloc();
    151                 (*this.prnt){ this.self };
    152         } else {
    153                 this.prnt = 0p;
    154         }
    155 
    15677        #if !defined(__CFA_NO_STATISTICS__)
    15778                print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO );
     
    16384
    16485void ^?{}( ServerCluster & this ) {
    165         delete(this.prnt);
    166 
    16786        for(i; options.clopts.nprocs) {
    16887                ^(this.procs[i]){};
     
    17594extern void init_protocol(void);
    17695extern void deinit_protocol(void);
     96
     97//=============================================================================================
     98// REUSEPORT
     99//=============================================================================================
     100
     101size_t sockarr_size;
     102struct __attribute__((aligned(128))) Q {
     103        mpsc_queue(PendingRead) q;
     104};
     105
     106//=============================================================================================
     107// Termination
     108//=============================================================================================
     109
     110int closefd;
     111void cleanstop(int) {
     112        eventfd_t buffer = 1;
     113        char * buffer_s = (char*)&buffer;
     114        int ret = write(closefd, buffer_s, sizeof(buffer));
     115        if(ret < 0) abort( "eventfd write error: (%d) %s\n", (int)errno, strerror(errno) );
     116        return;
     117}
    177118
    178119//=============================================================================================
     
    180121//============================================================================================='
    181122int main( int argc, char * argv[] ) {
     123        int ret;
    182124        __sighandler_t s = 1p;
    183125        signal(SIGPIPE, s);
     
    186128        // Parse args
    187129        parse_options(argc, argv);
     130
     131        //===================
     132        // Setup non-interactive termination
     133        if(!options.interactive) {
     134                closefd = eventfd(0, 0);
     135                if(closefd < 0) abort( "eventfd error: (%d) %s\n", (int)errno, strerror(errno) );
     136
     137                sighandler_t prev = signal(SIGTERM, cleanstop);
     138                intptr_t prev_workaround = (intptr_t) prev;
     139                // can't use SIG_ERR it crashes the compiler
     140                if(prev_workaround == -1) abort( "signal setup error: (%d) %s\n", (int)errno, strerror(errno) );
     141
     142                sout | "Signal termination ready";
     143        }
    188144
    189145        //===================
     
    197153        // Open Socket
    198154        sout | getpid() | ": Listening on port" | options.socket.port;
    199         int server_fd = socket(AF_INET, SOCK_STREAM, 0);
    200         if(server_fd < 0) {
    201                 abort( "socket error: (%d) %s\n", (int)errno, strerror(errno) );
    202         }
    203 
    204         int ret = 0;
     155
    205156        struct sockaddr_in address;
    206         int addrlen = sizeof(address);
    207         memset( (char *)&address, '\0' );
    208         address.sin_family = AF_INET;
    209         address.sin_addr.s_addr = htonl(INADDR_ANY);
    210         address.sin_port = htons( options.socket.port );
    211 
    212         int waited = 0;
    213         for() {
    214                 int sockfd = server_fd;
    215                 __CONST_SOCKADDR_ARG addr;
    216                 addr.__sockaddr__ = (struct sockaddr *)&address;
    217                 socklen_t addrlen = sizeof(address);
    218                 ret = bind( sockfd, addr, addrlen );
    219                 if(ret < 0) {
    220                         if(errno == EADDRINUSE) {
    221                                 if(waited == 0) {
    222                                         if(!options.interactive) abort | "Port already in use in non-interactive mode. Aborting";
    223                                         sout | "Waiting for port";
    224                                 } else {
    225                                         sout | "\r" | waited | nonl;
    226                                         flush( sout );
    227                                 }
    228                                 waited ++;
    229                                 sleep( 1`s );
    230                                 continue;
    231                         }
    232                         abort( "bind error: (%d) %s\n", (int)errno, strerror(errno) );
    233                 }
    234                 break;
    235         }
    236 
    237         ret = listen( server_fd, options.socket.backlog );
    238         if(ret < 0) {
    239                 abort( "listen error: (%d) %s\n", (int)errno, strerror(errno) );
    240         }
     157        int addrlen = prepaddr(address);
     158
     159        int server_fd;
    241160
    242161        //===================
     
    257176
    258177                {
     178                        // Stats printer makes a copy so this needs to persist longer than normal
     179                        connection ** conns;
     180                        AcceptWorker  * aworkers = 0p;
     181                        ChannelWorker * cworkers = 0p;
     182                        Acceptor * acceptors = 0p;
     183                        Q * queues = 0p;
    259184                        ServerCluster cl[options.clopts.nclusters];
     185
     186                        if(options.stats) {
     187                                stats_thrd = alloc();
     188                                (*stats_thrd){ cl };
     189                        } else {
     190                                stats_thrd = 0p;
     191                        }
    260192
    261193                        init_protocol();
    262194                        {
    263                                 Worker * workers = anew(options.clopts.nworkers);
    264                                 cl[0].prnt->workers = workers;
    265                                 cl[0].prnt->worker_cnt = options.clopts.nworkers;
    266                                 for(i; options.clopts.nworkers) {
    267                                         // if( options.file_cache.fixed_fds ) {
    268                                         //      workers[i].pipe[0] = pipe_off + (i * 2) + 0;
    269                                         //      workers[i].pipe[1] = pipe_off + (i * 2) + 1;
    270                                         // }
    271                                         // else
    272                                         {
    273                                                 workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0];
    274                                                 workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1];
    275                                                 workers[i].sockfd  = server_fd;
    276                                                 workers[i].addr    = (struct sockaddr *)&address;
    277                                                 workers[i].addrlen = (socklen_t*)&addrlen;
    278                                                 workers[i].flags   = 0;
    279                                         }
    280                                         unpark( workers[i] );
     195                                int nacceptors = options.clopts.nprocs * options.clopts.nclusters;
     196                                conns = alloc(options.clopts.nworkers);
     197                                if(options.socket.reuseport) {
     198                                        queues = alloc(nacceptors);
     199                                        acceptors = alloc(nacceptors);
     200                                        sout | "Creating" | nacceptors | "Acceptors";
     201                                        for(i; nacceptors) {
     202                                                (acceptors[i]){ i % options.clopts.nclusters };
     203                                        }
     204                                        for(i; nacceptors) {
     205                                                (queues[i]){};
     206                                                {
     207                                                        acceptors[i].sockfd  = listener(address, addrlen);
     208                                                        acceptors[i].addr    = (struct sockaddr *)&address;
     209                                                        acceptors[i].addrlen = (socklen_t*)&addrlen;
     210                                                        acceptors[i].flags   = 0;
     211                                                        acceptors[i].queue   = &queues[i].q;
     212                                                }
     213                                                unpark( acceptors[i] );
     214                                        }
     215
     216                                        cworkers = anew(options.clopts.nworkers);
     217                                        for(i; options.clopts.nworkers) {
     218                                                {
     219                                                        cworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0];
     220                                                        cworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1];
     221                                                        cworkers[i].queue = &queues[i % nacceptors].q;
     222                                                        conns[i] = &cworkers[i].conn;
     223                                                }
     224                                                unpark( cworkers[i] );
     225                                        }
     226                                }
     227                                else {
     228                                        server_fd = listener(address, addrlen);
     229                                        aworkers = anew(options.clopts.nworkers);
     230                                        for(i; options.clopts.nworkers) {
     231                                                // if( options.file_cache.fixed_fds ) {
     232                                                //      workers[i].pipe[0] = pipe_off + (i * 2) + 0;
     233                                                //      workers[i].pipe[1] = pipe_off + (i * 2) + 1;
     234                                                // }
     235                                                // else
     236                                                {
     237                                                        aworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0];
     238                                                        aworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1];
     239                                                        aworkers[i].sockfd = server_fd;
     240                                                        aworkers[i].addr    = (struct sockaddr *)&address;
     241                                                        aworkers[i].addrlen = (socklen_t*)&addrlen;
     242                                                        aworkers[i].flags   = 0;
     243                                                        conns[i] = &aworkers[i].conn;
     244                                                }
     245                                                unpark( aworkers[i] );
     246                                        }
    281247                                }
    282248                                sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters";
     
    285251                                }
    286252                                sout | nl;
    287                                 if(!options.interactive) park();
    288253                                {
    289                                         char buffer[128];
    290                                         for() {
    291                                                 int ret = cfa_read(0, buffer, 128, 0);
    292                                                 if(ret == 0) break;
     254                                        if(options.interactive) {
     255                                                char buffer[128];
     256                                                for() {
     257                                                        int ret = cfa_read(0, buffer, 128, 0);
     258                                                        if(ret == 0) break;
     259                                                        if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
     260                                                        sout | "User wrote '" | "" | nonl;
     261                                                        write(sout, buffer, ret - 1);
     262                                                        sout | "'";
     263                                                }
     264                                        }
     265                                        else {
     266                                                char buffer[sizeof(eventfd_t)];
     267                                                int ret = cfa_read(closefd, buffer, sizeof(eventfd_t), 0);
    293268                                                if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
    294                                                 sout | "User wrote '" | "" | nonl;
    295                                                 write(sout, buffer, ret - 1);
    296                                                 sout | "'";
    297269                                        }
    298270
     
    300272                                }
    301273
    302                                 sout | "Notifying connections..." | nonl; flush( sout );
    303                                 for(i; options.clopts.nworkers) {
    304                                         workers[i].done = true;
    305                                 }
    306                                 sout | "done";
    307 
    308                                 sout | "Shutting down socket..." | nonl; flush( sout );
    309                                 int ret = shutdown( server_fd, SHUT_RD );
    310                                 if( ret < 0 ) {
    311                                         abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) );
    312                                 }
    313                                 sout | "done";
    314 
    315274                                //===================
    316                                 // Close Socket
    317                                 sout | "Closing Socket..." | nonl; flush( sout );
    318                                 ret = close( server_fd );
    319                                 if(ret < 0) {
    320                                         abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
    321                                 }
    322                                 sout | "done";
    323 
    324                                 sout | "Stopping connection threads..." | nonl; flush( sout );
    325                                 adelete(workers);
     275                                // Close Socket and join
     276                                if(options.socket.reuseport) {
     277                                        sout | "Notifying connections..." | nonl; flush( sout );
     278                                        for(i; nacceptors) {
     279                                                acceptors[i].done = true;
     280                                        }
     281                                        for(i; options.clopts.nworkers) {
     282                                                cworkers[i].done = true;
     283                                        }
     284                                        sout | "done";
     285
     286                                        sout | "Shutting down Socket..." | nonl; flush( sout );
     287                                        for(i; nacceptors) {
     288                                                ret = shutdown( acceptors[i].sockfd, SHUT_RD );
     289                                                if( ret < 0 ) {
     290                                                        abort( "shutdown1 error: (%d) %s\n", (int)errno, strerror(errno) );
     291                                                }
     292                                        }
     293                                        sout | "done";
     294
     295                                        sout | "Closing Socket..." | nonl; flush( sout );
     296                                        for(i; nacceptors) {
     297                                                ret = close( acceptors[i].sockfd );
     298                                                if( ret < 0) {
     299                                                        abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
     300                                                }
     301                                        }
     302                                        sout | "done";
     303
     304                                        sout | "Stopping accept threads..." | nonl; flush( sout );
     305                                        for(i; nacceptors) {
     306                                                join(acceptors[i]);
     307                                        }
     308                                        sout | "done";
     309
     310                                        sout | "Draining worker queues..." | nonl; flush( sout );
     311                                        for(i; nacceptors) {
     312                                                PendingRead * p = 0p;
     313                                                while(p = pop(queues[i].q)) {
     314                                                        fulfil(p->f, -ECONNRESET);
     315                                                }
     316                                        }
     317                                        sout | "done";
     318
     319                                        sout | "Stopping worker threads..." | nonl; flush( sout );
     320                                        for(i; options.clopts.nworkers) {
     321                                                for(j; 2) {
     322                                                        ret = close(cworkers[i].conn.pipe[j]);
     323                                                        if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
     324                                                }
     325                                                join(cworkers[i]);
     326                                        }
     327                                }
     328                                else {
     329                                        sout | "Notifying connections..." | nonl; flush( sout );
     330                                        for(i; options.clopts.nworkers) {
     331                                                aworkers[i].done = true;
     332                                        }
     333                                        sout | "done";
     334
     335                                        sout | "Shutting down Socket..." | nonl; flush( sout );
     336                                        ret = shutdown( server_fd, SHUT_RD );
     337                                        if( ret < 0 ) {
     338                                                abort( "shutdown2 error: (%d) %s\n", (int)errno, strerror(errno) );
     339                                        }
     340                                        sout | "done";
     341
     342                                        sout | "Closing Socket..." | nonl; flush( sout );
     343                                        ret = close( server_fd );
     344                                        if(ret < 0) {
     345                                                abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
     346                                        }
     347                                        sout | "done";
     348
     349                                        sout | "Stopping connection threads..." | nonl; flush( sout );
     350                                        for(i; options.clopts.nworkers) {
     351                                                for(j; 2) {
     352                                                        ret = close(aworkers[i].conn.pipe[j]);
     353                                                        if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
     354                                                }
     355                                                join(aworkers[i]);
     356                                        }
     357                                }
    326358                        }
    327359                        sout | "done";
     
    331363                        sout | "done";
    332364
     365                        sout | "Stopping printer threads..." | nonl; flush( sout );
     366                        if(stats_thrd) {
     367                                notify_one(stats_thrd->var);
     368                        }
     369                        delete(stats_thrd);
     370                        sout | "done";
     371
     372                        // Now that the stats printer is stopped, we can reclaim this
     373                        adelete(aworkers);
     374                        adelete(cworkers);
     375                        adelete(acceptors);
     376                        adelete(queues);
     377                        free(conns);
     378
    333379                        sout | "Stopping processors/clusters..." | nonl; flush( sout );
    334380                }
    335381                sout | "done";
    336382
    337                 sout | "Closing splice fds..." | nonl; flush( sout );
    338                 for(i; pipe_cnt) {
    339                         ret = close( fds[pipe_off + i] );
    340                         if(ret < 0) {
    341                                 abort( "close pipe error: (%d) %s\n", (int)errno, strerror(errno) );
    342                         }
    343                 }
    344383                free(fds);
    345                 sout | "done";
    346384
    347385                sout | "Stopping processors..." | nonl; flush( sout );
  • benchmark/io/http/options.cfa

    r1df492a reb5962a  
    3535
    3636        { // socket
    37                 8080, // port
    38                 10,   // backlog
    39                 1024  // buflen
     37                8080,  // port
     38                10,    // backlog
     39                1024,  // buflen
     40                false  // reuseport
    4041        },
    4142
     
    5253
    5354void parse_options( int argc, char * argv[] ) {
    54         // bool fixedfd = false;
    55         // bool sqkpoll = false;
    56         // bool iokpoll = false;
    5755        unsigned nentries = 0;
    5856        bool isolate = false;
     
    7068                {'\0', "shell",          "Disable interactive mode", options.interactive, parse_setfalse},
    7169                {'\0', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog},
     70                {'\0', "reuseport",      "Use acceptor threads with reuse port SO_REUSEPORT", options.socket.reuseport, parse_settrue},
    7271                {'\0', "request_len",    "Maximum number of bytes in the http request, requests with more data will be answered with Http Code 414", options.socket.buflen},
    7372                {'\0', "seed",           "seed to use for hashing", options.file_cache.hash_seed },
  • benchmark/io/http/options.hfa

    r1df492a reb5962a  
    2727                int backlog;
    2828                int buflen;
     29                bool reuseport;
    2930        } socket;
    3031
  • benchmark/io/http/protocol.cfa

    r1df492a reb5962a  
    3030#define PLAINTEXT_NOCOPY
    3131#define LINKED_IO
     32
     33static inline __s32 wait_res( io_future_t & this ) {
     34        wait( this );
     35        if( this.result < 0 ) {{
     36                errno = -this.result;
     37                return -1;
     38        }}
     39        return this.result;
     40}
    3241
    3342struct https_msg_str {
     
    470479
    471480                        if(is_error(splice_in.res)) {
     481                                if(splice_in.res.error == -EPIPE) return -ECONNRESET;
    472482                                mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error;
    473483                                close(fd);
     
    503513}
    504514
    505 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) {
     515[HttpCode code, bool closed, * const char file, size_t len] http_read(volatile int & fd, []char buffer, size_t len, io_future_t * f) {
    506516        char * it = buffer;
    507517        size_t count = len - 1;
     
    509519        READ:
    510520        for() {
    511                 int ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY);
     521                int ret;
     522                if( f ) {
     523                        ret = wait_res(*f);
     524                        reset(*f);
     525                        f = 0p;
     526                } else {
     527                        ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY);
     528                }
    512529                // int ret = read(fd, (void*)it, count);
    513530                if(ret == 0 ) return [OK200, true, 0, 0];
     
    570587
    571588void ?{}( DateFormater & this ) {
    572         ((thread&)this){ "Server Date Thread", *options.clopts.instance[0] };
     589        ((thread&)this){ "Server Date Thread" };
    573590        this.idx = 0;
    574591        memset( &this.buffers[0], 0, sizeof(this.buffers[0]) );
  • benchmark/io/http/protocol.hfa

    r1df492a reb5962a  
    11#pragma once
    22
     3struct io_future_t;
    34struct sendfile_stats_t;
    45
     
    2223int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t count, struct sendfile_stats_t & );
    2324
    24 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len);
     25[HttpCode code, bool closed, * const char file, size_t len] http_read(volatile int & fd, []char buffer, size_t len, io_future_t * f);
  • benchmark/io/http/worker.cfa

    r1df492a reb5962a  
    88#include <fstream.hfa>
    99#include <iofwd.hfa>
     10#include <mutex_stmt.hfa>
    1011
    1112#include "options.hfa"
     
    1415
    1516//=============================================================================================
    16 // Worker Thread
    17 //=============================================================================================
    18 void ?{}( Worker & this ) {
     17// Generic connection handling
     18//=============================================================================================
     19static void handle_connection( connection & this, volatile int & fd, char * buffer, size_t len, io_future_t * f, unsigned long long & last ) {
     20        REQUEST:
     21        for() {
     22                bool closed;
     23                HttpCode code;
     24                const char * file;
     25                size_t name_size;
     26
     27                // Read the http request
     28                if( options.log ) mutex(sout) sout | "=== Reading request ===";
     29                [code, closed, file, name_size] = http_read(fd, buffer, len, f);
     30                f = 0p;
     31
     32                // if we are done, break out of the loop
     33                if( closed ) break REQUEST;
     34
     35                // If this wasn't a request retrun 400
     36                if( code != OK200 ) {
     37                        sout | "=== Invalid Request :" | code_val(code) | "===";
     38                        answer_error(fd, code);
     39                        continue REQUEST;
     40                }
     41
     42                if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
     43                        if( options.log ) mutex(sout) sout | "=== Request for /plaintext ===";
     44
     45                        int ret = answer_plaintext(fd);
     46                        if( ret == -ECONNRESET ) break REQUEST;
     47
     48                        if( options.log ) mutex(sout) sout | "=== Answer sent ===";
     49                        continue REQUEST;
     50                }
     51
     52                if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
     53                        if( options.log ) mutex(sout) sout | "=== Request for /ping ===";
     54
     55                        // Send the header
     56                        int ret = answer_empty(fd);
     57                        if( ret == -ECONNRESET ) break REQUEST;
     58
     59                        if( options.log ) mutex(sout) sout | "=== Answer sent ===";
     60                        continue REQUEST;
     61                }
     62
     63                if( options.log ) {
     64                        sout | "=== Request for file " | nonl;
     65                        write(sout, file, name_size);
     66                        sout | " ===";
     67                }
     68
     69                if( !options.file_cache.path ) {
     70                        if( options.log ) {
     71                                sout | "=== File Not Found (" | nonl;
     72                                write(sout, file, name_size);
     73                                sout | ") ===";
     74                        }
     75                        answer_error(fd, E405);
     76                        continue REQUEST;
     77                }
     78
     79                // Get the fd from the file cache
     80                int ans_fd;
     81                size_t count;
     82                [ans_fd, count] = get_file( file, name_size );
     83
     84                // If we can't find the file, return 404
     85                if( ans_fd < 0 ) {
     86                        if( options.log ) {
     87                                sout | "=== File Not Found (" | nonl;
     88                                write(sout, file, name_size);
     89                                sout | ") ===";
     90                        }
     91                        answer_error(fd, E404);
     92                        continue REQUEST;
     93                }
     94
     95                // Send the desired file
     96                int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile );
     97                if( ret == -ECONNRESET ) break REQUEST;
     98
     99                if( options.log ) mutex(sout) sout | "=== Answer sent ===";
     100        }
     101
     102        if (stats_thrd) {
     103                unsigned long long next = rdtscl();
     104                if(next > (last + 500000000)) {
     105                        if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
     106                                push(this.stats.sendfile, stats_thrd->stats.send);
     107                                unlock(stats_thrd->stats.lock);
     108                                last = next;
     109                        }
     110                }
     111        }
     112}
     113
     114//=============================================================================================
     115// Self Accepting Worker Thread
     116//=============================================================================================
     117void ?{}( AcceptWorker & this ) {
    19118        size_t cli = rand() % options.clopts.cltr_cnt;
    20119        ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
    21120        options.clopts.thrd_cnt[cli]++;
    22         this.pipe[0] = -1;
    23         this.pipe[1] = -1;
    24121        this.done = false;
    25 
    26         this.stats.sendfile.calls = 0;
    27         this.stats.sendfile.tries = 0;
    28         this.stats.sendfile.header = 0;
    29         this.stats.sendfile.splcin = 0;
    30         this.stats.sendfile.splcot = 0;
    31         for(i; zipf_cnts) {
    32                 this.stats.sendfile.avgrd[i].calls = 0;
    33                 this.stats.sendfile.avgrd[i].bytes = 0;
    34         }
    35 }
    36 
    37 extern "C" {
    38 extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
    39 }
    40 
    41 void main( Worker & this ) {
     122}
     123
     124void main( AcceptWorker & this ) {
    42125        park();
    43         /* paranoid */ assert( this.pipe[0] != -1 );
    44         /* paranoid */ assert( this.pipe[1] != -1 );
    45 
    46         CONNECTION:
    47         for() {
    48                 if( options.log ) sout | "=== Accepting connection ===";
    49                 int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], CFA_IO_LAZY );
     126        unsigned long long last = rdtscl();
     127        /* paranoid */ assert( this.conn.pipe[0] != -1 );
     128        /* paranoid */ assert( this.conn.pipe[1] != -1 );
     129        for() {
     130                if( options.log ) mutex(sout) sout | "=== Accepting connection ===";
     131                int fd = cfa_accept4( this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY );
    50132                if(fd < 0) {
    51133                        if( errno == ECONNABORTED ) break;
     
    55137                if(this.done) break;
    56138
     139                if( options.log ) mutex(sout) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
     140                size_t len = options.socket.buflen;
     141                char buffer[len];
     142                handle_connection( this.conn, fd, buffer, len, 0p, last );
     143
     144                if( options.log ) mutex(sout) sout | "=== Connection closed ===";
     145        }
     146}
     147
     148
     149//=============================================================================================
     150// Channel Worker Thread
     151//=============================================================================================
     152void ?{}( ChannelWorker & this ) {
     153        size_t cli = rand() % options.clopts.cltr_cnt;
     154        ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
     155        options.clopts.thrd_cnt[cli]++;
     156        this.done = false;
     157}
     158
     159void main( ChannelWorker & this ) {
     160        park();
     161        unsigned long long last = rdtscl();
     162        /* paranoid */ assert( this.conn.pipe[0] != -1 );
     163        /* paranoid */ assert( this.conn.pipe[1] != -1 );
     164        for() {
     165                size_t len = options.socket.buflen;
     166                char buffer[len];
     167                PendingRead p;
     168                p.next = 0p;
     169                p.in.buf = (void*)buffer;
     170                p.in.len = len;
     171                push(*this.queue, &p);
     172
     173                if( options.log ) mutex(sout) sout | "=== Waiting new connection ===";
     174                handle_connection( this.conn, p.out.fd, buffer, len, &p.f, last );
     175
     176                if( options.log ) mutex(sout) sout | "=== Connection closed ===";
     177                if(this.done) break;
     178        }
     179}
     180
     181extern "C" {
     182extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
     183}
     184
     185void ?{}( Acceptor & this, int cli ) {
     186        ((thread&)this){ "Server Acceptor Thread", *options.clopts.instance[cli], 64000 };
     187        options.clopts.thrd_cnt[cli]++;
     188        this.done = false;
     189}
     190
     191static inline __s32 get_res( io_future_t & this ) {
     192        if( this.result < 0 ) {{
     193                errno = -this.result;
     194                return -1;
     195        }}
     196        return this.result;
     197}
     198
     199static inline void push_connection( Acceptor & this, int fd ) {
     200        PendingRead * p = 0p;
     201        for() {
     202                if(this.done) return;
     203                p = pop(*this.queue);
     204                if(p) break;
     205                yield();
     206                this.stats.creates++;
     207        };
     208
     209        p->out.fd = fd;
     210        async_recv(p->f, p->out.fd, p->in.buf, p->in.len, 0, CFA_IO_LAZY);
     211}
     212
     213// #define ACCEPT_SPIN
     214#define ACCEPT_MANY
     215
     216void main( Acceptor & this ) {
     217        park();
     218        unsigned long long last = rdtscl();
     219
     220#if defined(ACCEPT_SPIN)
     221        if( options.log ) sout | "=== Accepting connection ===";
     222        for() {
     223                int fd = accept4(this.sockfd, this.[addr, addrlen, flags]);
     224                if(fd < 0) {
     225                        if( errno == EWOULDBLOCK) {
     226                                this.stats.eagains++;
     227                                yield();
     228                                continue;
     229                        }
     230                        if( errno == ECONNABORTED ) break;
     231                        if( this.done && (errno == EINVAL || errno == EBADF) ) break;
     232                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
     233                }
     234                this.stats.accepts++;
     235
     236                if(this.done) return;
     237
    57238                if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
    58                 REQUEST:
    59                 for() {
    60                         bool closed;
    61                         HttpCode code;
    62                         const char * file;
    63                         size_t name_size;
    64 
    65                         // Read the http request
    66                         size_t len = options.socket.buflen;
    67                         char buffer[len];
    68                         if( options.log ) sout | "=== Reading request ===";
    69                         [code, closed, file, name_size] = http_read(fd, buffer, len);
    70 
    71                         // if we are done, break out of the loop
    72                         if( closed ) break REQUEST;
    73 
    74                         // If this wasn't a request retrun 400
    75                         if( code != OK200 ) {
    76                                 sout | "=== Invalid Request :" | code_val(code) | "===";
    77                                 answer_error(fd, code);
    78                                 continue REQUEST;
    79                         }
    80 
    81                         if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
    82                                 if( options.log ) sout | "=== Request for /plaintext ===";
    83 
    84                                 int ret = answer_plaintext(fd);
    85                                 if( ret == -ECONNRESET ) break REQUEST;
    86 
    87                                 if( options.log ) sout | "=== Answer sent ===";
    88                                 continue REQUEST;
    89                         }
    90 
    91                         if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
    92                                 if( options.log ) sout | "=== Request for /ping ===";
    93 
    94                                 // Send the header
    95                                 int ret = answer_empty(fd);
    96                                 if( ret == -ECONNRESET ) break REQUEST;
    97 
    98                                 if( options.log ) sout | "=== Answer sent ===";
    99                                 continue REQUEST;
    100                         }
    101 
    102                         if( options.log ) {
    103                                 sout | "=== Request for file " | nonl;
    104                                 write(sout, file, name_size);
    105                                 sout | " ===";
    106                         }
    107 
    108                         if( !options.file_cache.path ) {
    109                                 if( options.log ) {
    110                                         sout | "=== File Not Found (" | nonl;
    111                                         write(sout, file, name_size);
    112                                         sout | ") ===";
     239
     240                if(fd) push_connection(this, fd);
     241
     242                if (stats_thrd) {
     243                        unsigned long long next = rdtscl();
     244                        if(next > (last + 500000000)) {
     245                                if(try_lock(stats_thrd->stats.lock)) {
     246                                        push(this.stats, stats_thrd->stats.accpt);
     247                                        unlock(stats_thrd->stats.lock);
     248                                        last = next;
    113249                                }
    114                                 answer_error(fd, E405);
    115                                 continue REQUEST;
    116                         }
    117 
    118                         // Get the fd from the file cache
    119                         int ans_fd;
    120                         size_t count;
    121                         [ans_fd, count] = get_file( file, name_size );
    122 
    123                         // If we can't find the file, return 404
    124                         if( ans_fd < 0 ) {
    125                                 if( options.log ) {
    126                                         sout | "=== File Not Found (" | nonl;
    127                                         write(sout, file, name_size);
    128                                         sout | ") ===";
     250                        }
     251                }
     252
     253                if( options.log ) sout | "=== Accepting connection ===";
     254        }
     255
     256#elif defined(ACCEPT_MANY)
     257        const int nacc = 10;
     258        io_future_t results[nacc];
     259
     260        for(i; nacc) {
     261                io_future_t & res = results[i];
     262                reset(res);
     263                /* paranoid */ assert(!available(res));
     264                if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ===";
     265                async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY);
     266        }
     267
     268        for() {
     269                if (stats_thrd) {
     270                        unsigned long long next = rdtscl();
     271                        if(next > (last + 500000000)) {
     272                                if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
     273                                        push(this.stats, stats_thrd->stats.accpt);
     274                                        unlock(stats_thrd->stats.lock);
     275                                        last = next;
    129276                                }
    130                                 answer_error(fd, E404);
    131                                 continue REQUEST;
    132                         }
    133 
    134                         // Send the desired file
    135                         int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile );
    136                         if( ret == -ECONNRESET ) break REQUEST;
    137 
    138                         if( options.log ) sout | "=== Answer sent ===";
    139                 }
    140 
    141                 if( options.log ) sout | "=== Connection closed ===";
    142                 continue CONNECTION;
    143         }
    144 }
     277                        }
     278                }
     279
     280                for(i; nacc) {
     281                        io_future_t & res = results[i];
     282                        if(available(res)) {
     283                                if( options.log ) mutex(sout) sout | "=== Accept no " | i | "completed with result" | res.result | "===";
     284                                int fd = get_res(res);
     285                                reset(res);
     286                                this.stats.accepts++;
     287                                if(fd < 0) {
     288                                        if( errno == ECONNABORTED ) continue;
     289                                        if( this.done && (errno == EINVAL || errno == EBADF) ) continue;
     290                                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
     291                                }
     292                                push_connection( this, fd );
     293
     294                                /* paranoid */ assert(!available(res));
     295                                if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ===";
     296                                async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY);
     297                        }
     298                }
     299                if(this.done) return;
     300
     301                if( options.log ) mutex(sout) sout | "=== Waiting for any accept ===";
     302                this.stats.eagains++;
     303                wait_any(results, nacc);
     304
     305                if( options.log ) mutex(sout) {
     306                        sout | "=== Acceptor wake-up ===";
     307                        for(i; nacc) {
     308                                io_future_t & res = results[i];
     309                                sout | i | "available:" | available(res);
     310                        }
     311                }
     312
     313        }
     314
     315        for(i; nacc) {
     316                wait(results[i]);
     317        }
     318#else
     319#error no accept algorithm specified
     320#endif
     321}
  • benchmark/io/http/worker.hfa

    r1df492a reb5962a  
    11#pragma once
    22
     3#include <iofwd.hfa>
     4#include <queueLockFree.hfa>
    35#include <thread.hfa>
    46
     
    79}
    810
     11#include "printer.hfa"
     12
    913//=============================================================================================
    1014// Worker Thread
    1115//=============================================================================================
    1216
    13 extern const size_t zipf_sizes[];
    14 enum { zipf_cnts = 36, };
    15 
    16 struct sendfile_stats_t {
    17         volatile uint64_t calls;
    18         volatile uint64_t tries;
    19         volatile uint64_t header;
    20         volatile uint64_t splcin;
    21         volatile uint64_t splcot;
     17struct connection {
     18        int pipe[2];
    2219        struct {
    23                 volatile uint64_t calls;
    24                 volatile uint64_t bytes;
    25         } avgrd[zipf_cnts];
     20                sendfile_stats_t sendfile;
     21        } stats;
    2622};
    2723
    28 thread Worker {
    29         int pipe[2];
     24static inline void ?{}( connection & this ) {
     25        this.pipe[0] = -1;
     26        this.pipe[1] = -1;
     27}
     28
     29thread AcceptWorker {
     30        connection conn;
    3031        int sockfd;
    3132        struct sockaddr * addr;
     
    3334        int flags;
    3435        volatile bool done;
     36};
     37void ?{}( AcceptWorker & this);
     38void main( AcceptWorker & );
     39
     40
     41struct PendingRead {
     42        PendingRead * volatile next;
     43        io_future_t f;
    3544        struct {
    36                 sendfile_stats_t sendfile;
    37         } stats;
     45                void * buf;
     46                size_t len;
     47        } in;
     48        struct {
     49                volatile int fd;
     50        } out;
    3851};
    39 void ?{}( Worker & this);
    40 void main( Worker & );
     52
     53static inline PendingRead * volatile & ?`next ( PendingRead * node ) {
     54        return node->next;
     55}
     56
     57thread ChannelWorker {
     58        connection conn;
     59        volatile bool done;
     60        mpsc_queue(PendingRead) * queue;
     61};
     62void ?{}( ChannelWorker & );
     63void main( ChannelWorker & );
     64
     65thread Acceptor {
     66        mpsc_queue(PendingRead) * queue;
     67        int sockfd;
     68        struct sockaddr * addr;
     69        socklen_t * addrlen;
     70        int flags;
     71        volatile bool done;
     72        acceptor_stats_t stats;
     73};
     74void ?{}( Acceptor &, int cli );
     75void main( Acceptor & );
Note: See TracChangeset for help on using the changeset viewer.