Changeset 6e2b04e for benchmark


Ignore:
Timestamp:
Jun 8, 2022, 4:23:41 PM (4 years ago)
Author:
caparsons <caparson@…>
Branches:
ADT, ast-experimental, master, pthread-emulation, qualifiedEnum
Children:
7f0ac12, db7a3ad
Parents:
55422cf (diff), 720f2fe2 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

Location:
benchmark/io/http
Files:
2 added
5 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/http/Makefile.am

    r55422cf r6e2b04e  
    2121include $(top_srcdir)/tools/build/cfa.make
    2222
    23 AM_CFLAGS = -O3 -Wall -Wextra -I$(srcdir) -lrt -pthread # -Werror
     23AM_CFLAGS = -O3 -Wall -Wextra -I$(srcdir) -lrt -pthread -g # -Werror
    2424AM_CFAFLAGS = -quiet -nodebug
    2525AM_LDFLAGS = -quiet -nodebug
     
    3939        protocol.cfa \
    4040        protocol.hfa \
     41        socket.cfa \
     42        socket.hfa \
    4143        worker.cfa \
    4244        worker.hfa
  • benchmark/io/http/main.cfa

    r55422cf r6e2b04e  
    22
    33#include <errno.h>
     4#include <signal.h>
    45#include <stdio.h>
    56#include <string.h>
     
    89        #include <sched.h>
    910        #include <signal.h>
     11        #include <sys/eventfd.h>
    1012        #include <sys/socket.h>
    1113        #include <netinet/in.h>
     
    1416#include <fstream.hfa>
    1517#include <kernel.hfa>
     18#include <locks.hfa>
    1619#include <iofwd.hfa>
    1720#include <stats.hfa>
     
    2124#include "filecache.hfa"
    2225#include "options.hfa"
     26#include "socket.hfa"
    2327#include "worker.hfa"
    2428
     
    3640        Worker * workers;
    3741        int worker_cnt;
     42        condition_variable(fast_block_lock) var;
    3843};
    3944
     
    5459                or else {}
    5560
    56                 sleep(10`s);
     61                wait(this.var, 10`s);
    5762
    5863                print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );
     
    177182
    178183//=============================================================================================
     184// Termination
     185//=============================================================================================
     186
     187int closefd;
     188void cleanstop(int) {
     189        eventfd_t buffer = 1;
     190        char * buffer_s = (char*)&buffer;
     191        int ret = write(closefd, buffer_s, sizeof(buffer));
     192        if(ret < 0) abort( "eventfd write error: (%d) %s\n", (int)errno, strerror(errno) );
     193        return;
     194}
     195
     196//=============================================================================================
    179197// Main
    180198//============================================================================================='
    181199int main( int argc, char * argv[] ) {
     200        int ret;
    182201        __sighandler_t s = 1p;
    183202        signal(SIGPIPE, s);
     
    186205        // Parse args
    187206        parse_options(argc, argv);
     207
     208        //===================
     209        // Setup non-interactive termination
     210        if(!options.interactive) {
     211                closefd = eventfd(0, 0);
     212                if(closefd < 0) abort( "eventfd error: (%d) %s\n", (int)errno, strerror(errno) );
     213
     214                sighandler_t prev = signal(SIGTERM, cleanstop);
     215                intptr_t prev_workaround = (intptr_t) prev;
     216                // can't use SIG_ERR it crashes the compiler
     217                if(prev_workaround == -1) abort( "signal setup error: (%d) %s\n", (int)errno, strerror(errno) );
     218
     219                sout | "Signal termination ready";
     220        }
    188221
    189222        //===================
     
    197230        // Open Socket
    198231        sout | getpid() | ": Listening on port" | options.socket.port;
    199         int server_fd = socket(AF_INET, SOCK_STREAM, 0);
    200         if(server_fd < 0) {
    201                 abort( "socket error: (%d) %s\n", (int)errno, strerror(errno) );
    202         }
    203 
    204         int ret = 0;
     232
    205233        struct sockaddr_in address;
    206         int addrlen = sizeof(address);
    207         memset( (char *)&address, '\0' );
    208         address.sin_family = AF_INET;
    209         address.sin_addr.s_addr = htonl(INADDR_ANY);
    210         address.sin_port = htons( options.socket.port );
    211 
    212         int waited = 0;
    213         for() {
    214                 int sockfd = server_fd;
    215                 __CONST_SOCKADDR_ARG addr;
    216                 addr.__sockaddr__ = (struct sockaddr *)&address;
    217                 socklen_t addrlen = sizeof(address);
    218                 ret = bind( sockfd, addr, addrlen );
    219                 if(ret < 0) {
    220                         if(errno == EADDRINUSE) {
    221                                 if(waited == 0) {
    222                                         if(!options.interactive) abort | "Port already in use in non-interactive mode. Aborting";
    223                                         sout | "Waiting for port";
    224                                 } else {
    225                                         sout | "\r" | waited | nonl;
    226                                         flush( sout );
    227                                 }
    228                                 waited ++;
    229                                 sleep( 1`s );
    230                                 continue;
    231                         }
    232                         abort( "bind error: (%d) %s\n", (int)errno, strerror(errno) );
    233                 }
    234                 break;
    235         }
    236 
    237         ret = listen( server_fd, options.socket.backlog );
    238         if(ret < 0) {
    239                 abort( "listen error: (%d) %s\n", (int)errno, strerror(errno) );
     234        int addrlen = prepaddr(address);
     235
     236        int server_fd;
     237        if(!options.socket.manyreuse) {
     238                server_fd = listener(address, addrlen);
    240239        }
    241240
     
    257256
    258257                {
     258                        // Stats printer makes a copy so this needs to persist longer than normal
     259                        Worker * workers;
    259260                        ServerCluster cl[options.clopts.nclusters];
    260261
    261262                        init_protocol();
    262263                        {
    263                                 Worker * workers = anew(options.clopts.nworkers);
     264                                workers = anew(options.clopts.nworkers);
    264265                                cl[0].prnt->workers = workers;
    265266                                cl[0].prnt->worker_cnt = options.clopts.nworkers;
     
    273274                                                workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0];
    274275                                                workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1];
    275                                                 workers[i].sockfd  = server_fd;
     276                                                workers[i].sockfd  = options.socket.manyreuse ?  listener(address, addrlen) : server_fd;
    276277                                                workers[i].addr    = (struct sockaddr *)&address;
    277278                                                workers[i].addrlen = (socklen_t*)&addrlen;
     
    285286                                }
    286287                                sout | nl;
    287                                 if(!options.interactive) park();
    288288                                {
    289                                         char buffer[128];
    290                                         for() {
    291                                                 int ret = cfa_read(0, buffer, 128, 0);
    292                                                 if(ret == 0) break;
     289                                        if(options.interactive) {
     290                                                char buffer[128];
     291                                                for() {
     292                                                        int ret = cfa_read(0, buffer, 128, 0);
     293                                                        if(ret == 0) break;
     294                                                        if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
     295                                                        sout | "User wrote '" | "" | nonl;
     296                                                        write(sout, buffer, ret - 1);
     297                                                        sout | "'";
     298                                                }
     299                                        }
     300                                        else {
     301                                                char buffer[sizeof(eventfd_t)];
     302                                                int ret = cfa_read(closefd, buffer, sizeof(eventfd_t), 0);
    293303                                                if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
    294                                                 sout | "User wrote '" | "" | nonl;
    295                                                 write(sout, buffer, ret - 1);
    296                                                 sout | "'";
    297304                                        }
    298305
     
    307314
    308315                                sout | "Shutting down socket..." | nonl; flush( sout );
    309                                 int ret = shutdown( server_fd, SHUT_RD );
    310                                 if( ret < 0 ) {
    311                                         abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) );
     316                                if(options.socket.manyreuse) {
     317                                        for(i; options.clopts.nworkers) {
     318                                                ret = shutdown( workers[i].sockfd, SHUT_RD );
     319                                                if(ret < 0) abort( "close socket %d error: (%d) %s\n", i, (int)errno, strerror(errno) );
     320                                        }
     321                                }
     322                                else {
     323                                        ret = shutdown( server_fd, SHUT_RD );
     324                                        if( ret < 0 ) {
     325                                                abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) );
     326                                        }
    312327                                }
    313328                                sout | "done";
     
    316331                                // Close Socket
    317332                                sout | "Closing Socket..." | nonl; flush( sout );
    318                                 ret = close( server_fd );
    319                                 if(ret < 0) {
    320                                         abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
     333                                if(options.socket.manyreuse) {
     334                                        for(i; options.clopts.nworkers) {
     335                                                ret = close(workers[i].sockfd);
     336                                                if(ret < 0) abort( "close socket %d error: (%d) %s\n", i, (int)errno, strerror(errno) );
     337                                        }
     338                                }
     339                                else {
     340                                        ret = close( server_fd );
     341                                        if(ret < 0) {
     342                                                abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
     343                                        }
    321344                                }
    322345                                sout | "done";
    323346
    324347                                sout | "Stopping connection threads..." | nonl; flush( sout );
    325                                 adelete(workers);
     348                                for(i; options.clopts.nworkers) {
     349                                        for(j; 2) {
     350                                                ret = close(workers[i].pipe[j]);
     351                                                if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
     352                                        }
     353                                        join(workers[i]);
     354                                }
    326355                        }
    327356                        sout | "done";
     
    331360                        sout | "done";
    332361
     362                        sout | "Stopping printer threads..." | nonl; flush( sout );
     363                        for(i; options.clopts.nclusters) {
     364                                StatsPrinter * p = cl[i].prnt;
     365                                if(p) {
     366                                        notify_one(p->var);
     367                                        join(*p);
     368                                }
     369                        }
     370                        sout | "done";
     371
     372                        // Now that the stats printer is stopped, we can reclaim this
     373                        adelete(workers);
     374
    333375                        sout | "Stopping processors/clusters..." | nonl; flush( sout );
    334376                }
    335377                sout | "done";
    336378
    337                 sout | "Closing splice fds..." | nonl; flush( sout );
    338                 for(i; pipe_cnt) {
    339                         ret = close( fds[pipe_off + i] );
    340                         if(ret < 0) {
    341                                 abort( "close pipe error: (%d) %s\n", (int)errno, strerror(errno) );
    342                         }
    343                 }
     379                // sout | "Closing splice fds..." | nonl; flush( sout );
     380                // for(i; pipe_cnt) {
     381                //      ret = close( fds[pipe_off + i] );
     382                //      if(ret < 0) {
     383                //              abort( "close pipe error: (%d) %s\n", (int)errno, strerror(errno) );
     384                //      }
     385                // }
    344386                free(fds);
    345387                sout | "done";
  • benchmark/io/http/options.cfa

    r55422cf r6e2b04e  
    3535
    3636        { // socket
    37                 8080, // port
    38                 10,   // backlog
    39                 1024  // buflen
     37                8080,  // port
     38                10,    // backlog
     39                1024,  // buflen
     40                false, // onereuse
     41                false  // manyreuse
    4042        },
    4143
     
    7072                {'\0', "shell",          "Disable interactive mode", options.interactive, parse_setfalse},
    7173                {'\0', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog},
     74                {'\0', "reuseport-one",  "Create a single listen socket with SO_REUSEPORT", options.socket.onereuse, parse_settrue},
     75                {'\0', "reuseport",      "Use many listen sockets with SO_REUSEPORT", options.socket.manyreuse, parse_settrue},
    7276                {'\0', "request_len",    "Maximum number of bytes in the http request, requests with more data will be answered with Http Code 414", options.socket.buflen},
    7377                {'\0', "seed",           "seed to use for hashing", options.file_cache.hash_seed },
  • benchmark/io/http/options.hfa

    r55422cf r6e2b04e  
    2727                int backlog;
    2828                int buflen;
     29                bool onereuse;
     30                bool manyreuse;
    2931        } socket;
    3032
  • benchmark/io/http/worker.cfa

    r55422cf r6e2b04e  
    4343        /* paranoid */ assert( this.pipe[0] != -1 );
    4444        /* paranoid */ assert( this.pipe[1] != -1 );
     45
     46        const bool reuse = options.socket.manyreuse;
    4547
    4648        CONNECTION:
Note: See TracChangeset for help on using the changeset viewer.