Changeset 266ecf1 for benchmark


Ignore:
Timestamp:
Mar 2, 2021, 5:28:32 PM (5 years ago)
Author:
Peter A. Buhr <pabuhr@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum, stuck-waitfor-destruct
Children:
6083392
Parents:
182256b (diff), 9eb7a532 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

Location:
benchmark/io/http
Files:
8 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/http/http_ring.cpp

    r182256b r266ecf1  
    2020                socklen_t *addrlen;
    2121                int flags;
     22                unsigned cnt;
    2223        } acpt;
    2324
     
    6768thread_local stats_block_t stats;
    6869stats_block_t global_stats;
     70
     71thread_local struct __attribute__((aligned(128))) {
     72        size_t to_submit = 0;
     73} local;
    6974
    7075// Get an array of current connections
     
    192197        static void submit(struct io_uring * ring, struct io_uring_sqe * sqe, connection * conn) {
    193198                (void)ring;
     199                local.to_submit++;
    194200                #ifdef USE_ASYNC
    195201                        io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);
     
    406412                switch(state) {
    407413                case ACCEPTING:
    408                         connection::accept(ring, opt);
     414                        // connection::accept(ring, opt);
    409415                        newconn(ring, res);
    410416                        break;
     
    420426
    421427//=========================================================
     428extern "C" {
     429        #include <sys/eventfd.h>  // use for termination
     430}
     431
    422432// Main loop of the WebServer
    423433// Effectively uses one thread_local copy of everything per kernel thread
     
    427437        struct io_uring * ring = opt.ring;
    428438
     439        int blockfd = eventfd(0, 0);
     440        if (blockfd < 0) {
     441                fprintf( stderr, "eventfd create error: (%d) %s\n", (int)errno, strerror(errno) );
     442                exit(EXIT_FAILURE);
     443        }
     444
     445        int ret = io_uring_register_eventfd(ring, blockfd);
     446        if (ret < 0) {
     447                fprintf( stderr, "io_uring S&W error: (%d) %s\n", (int)-ret, strerror(-ret) );
     448                exit(EXIT_FAILURE);
     449        }
     450
    429451        // Track the shutdown using a event_fd
    430452        char endfd_buf[8];
     
    433455        // Accept our first connection
    434456        // May not take effect until io_uring_submit_and_wait
    435         connection::accept(ring, opt);
     457        for(unsigned i = 0; i < opt.acpt.cnt; i++) {
     458                connection::accept(ring, opt);
     459        }
    436460
    437461        int reset = 1;       // Counter to print stats once in a while
     
    441465        while(!done) {
    442466                // Submit all the answers we have and wait for responses
    443                 int ret = io_uring_submit_and_wait(ring, 1);
     467                int ret = io_uring_submit(ring);
     468                local.to_submit = 0;
    444469
    445470                // check errors
     
    452477                sqes += ret;
    453478                call++;
     479
     480
     481                eventfd_t val;
     482                ret = eventfd_read(blockfd, &val);
     483
     484                // check errors
     485                if (ret < 0) {
     486                        fprintf( stderr, "eventfd read error: (%d) %s\n", (int)errno, strerror(errno) );
     487                        exit(EXIT_FAILURE);
     488                }
    454489
    455490                struct io_uring_cqe *cqe;
     
    463498                                break;
    464499                        }
     500
     501                        if(local.to_submit > 30) break;
    465502
    466503                        auto req = (class connection *)cqe->user_data;
     
    509546        #include <pthread.h>      // for pthreads
    510547        #include <signal.h>       // for signal(SIGPIPE, SIG_IGN);
    511         #include <sys/eventfd.h>  // use for termination
    512548        #include <sys/socket.h>   // for sockets in general
    513549        #include <netinet/in.h>   // for sockaddr_in, AF_INET
     
    528564        unsigned entries = 256;     // number of entries per ring/kernel thread
    529565        unsigned backlog = 262144;  // backlog argument to listen
     566        unsigned preaccept = 1;     // start by accepting X per threads
    530567        bool attach = false;        // Whether or not to attach all the rings
    531568        bool sqpoll = false;        // Whether or not to use SQ Polling
     
    534571        // Arguments Parsing
    535572        int c;
    536         while ((c = getopt (argc, argv, "t:p:e:b:aS")) != -1) {
     573        while ((c = getopt (argc, argv, "t:p:e:b:c:aS")) != -1) {
    537574                switch (c)
    538575                {
     
    548585                case 'b':
    549586                        backlog = atoi(optarg);
     587                        break;
     588                case 'c':
     589                        preaccept = atoi(optarg);
    550590                        break;
    551591                case 'a':
     
    681721                thrd_opts[i].acpt.addrlen = (socklen_t*)&addrlen;
    682722                thrd_opts[i].acpt.flags   = 0;
     723                thrd_opts[i].acpt.cnt     = preaccept;
    683724                thrd_opts[i].endfd        = efd;
    684725                thrd_opts[i].ring         = &thrd_rings[i].storage;
  • benchmark/io/http/main.cfa

    r182256b r266ecf1  
    2929
    3030//=============================================================================================
    31 // Globals
    32 //=============================================================================================
    33 struct ServerProc {
    34         processor self;
    35 };
    36 
    37 void ?{}( ServerProc & this ) {
    38         /* paranoid */ assert( options.clopts.instance != 0p );
    39         (this.self){ "Benchmark Processor", *options.clopts.instance };
    40 
    41         #if !defined(__CFA_NO_STATISTICS__)
    42                 if( options.clopts.procstats ) {
    43                         print_stats_at_exit( this.self, options.clopts.instance->print_stats );
    44                 }
    45                 if( options.clopts.viewhalts ) {
    46                         print_halts( this.self );
    47                 }
    48         #endif
    49 }
    50 
    51 extern void init_protocol(void);
    52 extern void deinit_protocol(void);
    53 
    54 //=============================================================================================
    5531// Stats Printer
    5632//============================================================================================='
     
    5834thread StatsPrinter {};
    5935
    60 void ?{}( StatsPrinter & this ) {
    61         ((thread&)this){ "Stats Printer Thread" };
    62 }
     36void ?{}( StatsPrinter & this, cluster & cl ) {
     37        ((thread&)this){ "Stats Printer Thread", cl };
     38}
     39
     40void ^?{}( StatsPrinter & mutex this ) {}
    6341
    6442void main(StatsPrinter & this) {
     
    7149                sleep(10`s);
    7250
    73                 print_stats_now( *options.clopts.instance, CFA_STATS_READY_Q | CFA_STATS_IO );
    74         }
    75 }
     51                print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );
     52        }
     53}
     54
     55//=============================================================================================
     56// Globals
     57//=============================================================================================
     58struct ServerCluster {
     59        cluster self;
     60        processor    * procs;
     61        // io_context   * ctxs;
     62        StatsPrinter * prnt;
     63
     64};
     65
     66void ?{}( ServerCluster & this ) {
     67        (this.self){ "Server Cluster", options.clopts.params };
     68
     69        this.procs = alloc(options.clopts.nprocs);
     70        for(i; options.clopts.nprocs) {
     71                (this.procs[i]){ "Benchmark Processor", this.self };
     72
     73                #if !defined(__CFA_NO_STATISTICS__)
     74                        if( options.clopts.procstats ) {
     75                                print_stats_at_exit( *this.procs, this.self.print_stats );
     76                        }
     77                        if( options.clopts.viewhalts ) {
     78                                print_halts( *this.procs );
     79                        }
     80                #endif
     81        }
     82
     83        if(options.stats) {
     84                this.prnt = alloc();
     85                (*this.prnt){ this.self };
     86        } else {
     87                this.prnt = 0p;
     88        }
     89
     90        #if !defined(__CFA_NO_STATISTICS__)
     91                print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO );
     92        #endif
     93
     94        options.clopts.instance[options.clopts.cltr_cnt] = &this.self;
     95        options.clopts.cltr_cnt++;
     96}
     97
     98void ^?{}( ServerCluster & this ) {
     99        delete(this.prnt);
     100
     101        for(i; options.clopts.nprocs) {
     102                ^(this.procs[i]){};
     103        }
     104        free(this.procs);
     105
     106        ^(this.self){};
     107}
     108
     109extern void init_protocol(void);
     110extern void deinit_protocol(void);
    76111
    77112//=============================================================================================
     
    137172        // Run Server Cluster
    138173        {
    139                 cluster cl = { "Server Cluster", options.clopts.params };
    140                 #if !defined(__CFA_NO_STATISTICS__)
    141                         print_stats_at_exit( cl, CFA_STATS_READY_Q | CFA_STATS_IO );
    142                 #endif
    143                 options.clopts.instance = &cl;
    144 
    145 
    146174                int pipe_cnt = options.clopts.nworkers * 2;
    147175                int pipe_off;
     
    153181                }
    154182
    155                 if(options.file_cache.path && options.file_cache.fixed_fds) {
    156                         register_fixed_files(cl, fds, pipe_off);
    157                 }
     183                // if(options.file_cache.path && options.file_cache.fixed_fds) {
     184                //      register_fixed_files(cl, fds, pipe_off);
     185                // }
    158186
    159187                {
    160                         ServerProc procs[options.clopts.nprocs];
    161                         StatsPrinter printer;
     188                        ServerCluster cl[options.clopts.nclusters];
    162189
    163190                        init_protocol();
     
    180207                                        unpark( workers[i] );
    181208                                }
    182                                 sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors";
     209                                sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters";
     210                                for(i; options.clopts.nclusters) {
     211                                        sout | options.clopts.thrd_cnt[i] | nonl;
     212                                }
     213                                sout | nl;
    183214                                {
    184215                                        char buffer[128];
    185                                         while(int ret = cfa_read(0, buffer, 128, 0, -1`s, 0p, 0p); ret != 0) {
     216                                        for() {
     217                                                int ret = cfa_read(0, buffer, 128, 0);
     218                                                if(ret == 0) break;
    186219                                                if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
     220                                                sout | "User wrote '" | "" | nonl;
     221                                                write(sout, buffer, ret - 1);
     222                                                sout | "'";
    187223                                        }
    188224
     
    193229                                for(i; options.clopts.nworkers) {
    194230                                        workers[i].done = true;
    195                                         cancel(workers[i].cancel);
    196231                                }
    197232                                sout | "done";
     
    221256                        sout | "done";
    222257
    223                         sout | "Stopping processors..." | nonl; flush( sout );
     258                        sout | "Stopping processors/clusters..." | nonl; flush( sout );
    224259                }
    225260                sout | "done";
  • benchmark/io/http/options.cfa

    r182256b r266ecf1  
    1313#include <kernel.hfa>
    1414#include <parseargs.hfa>
     15#include <stdlib.hfa>
    1516
    1617#include <stdlib.h>
     
    1920Options options @= {
    2021        false, // log
     22        false, // stats
    2123
    2224        { // file_cache
     
    3638
    3739        { // cluster
     40                1,     // nclusters;
    3841                1,     // nprocs;
    3942                1,     // nworkers;
     
    4649
    4750void parse_options( int argc, char * argv[] ) {
    48         bool subthrd = false;
    49         bool eagrsub = false;
    50         bool fixedfd = false;
    51         bool sqkpoll = false;
    52         bool iokpoll = false;
    53         unsigned sublen = 16;
     51        // bool fixedfd = false;
     52        // bool sqkpoll = false;
     53        // bool iokpoll = false;
    5454        unsigned nentries = 16;
     55        bool isolate = false;
    5556
    5657
     
    5960                { 'c', "cpus",           "Number of processors to use", options.clopts.nprocs},
    6061                { 't', "threads",        "Number of worker threads to use", options.clopts.nworkers},
     62                {'\0', "isolate",        "Create one cluster per processor", isolate, parse_settrue},
    6163                {'\0', "log",            "Enable logs", options.log, parse_settrue},
     64                {'\0', "stats",          "Enable statistics", options.stats, parse_settrue},
    6265                {'\0', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog},
    6366                {'\0', "request_len",    "Maximum number of bytes in the http request, requests with more data will be answered with Http Code 414", options.socket.buflen},
     
    6568                {'\0', "cache-size",     "Size of the cache to use, if set to small, will uses closes power of 2", options.file_cache.size },
    6669                {'\0', "list-files",     "List the files in the specified path and exit", options.file_cache.list, parse_settrue },
    67                 { 's', "submitthread",   "If set, cluster uses polling thread to submit I/O", subthrd, parse_settrue },
    68                 { 'e', "eagersubmit",    "If set, cluster submits I/O eagerly but still aggregates submits", eagrsub, parse_settrue},
    69                 { 'f', "fixed-fds",      "If set, files are open eagerly and pre-registered with the cluster", fixedfd, parse_settrue},
    70                 { 'k', "kpollsubmit",    "If set, cluster uses IORING_SETUP_SQPOLL, implies -f", sqkpoll, parse_settrue },
    71                 { 'i', "kpollcomplete",  "If set, cluster uses IORING_SETUP_IOPOLL", iokpoll, parse_settrue },
    72                 {'\0', "submitlength",   "Max number of submitions that can be submitted together", sublen },
    73                 {'\0', "numentries",     "Number of I/O entries", nentries },
     70                // { 'f', "fixed-fds",      "If set, files are open eagerly and pre-registered with the cluster", fixedfd, parse_settrue},
     71                // { 'k', "kpollsubmit",    "If set, cluster uses IORING_SETUP_SQPOLL, implies -f", sqkpoll, parse_settrue },
     72                // { 'i', "kpollcomplete",  "If set, cluster uses IORING_SETUP_IOPOLL", iokpoll, parse_settrue },
     73                {'e', "numentries",     "Number of I/O entries", nentries },
    7474
    7575        };
     
    9191                nentries = v;
    9292        }
     93        if(isolate) {
     94                options.clopts.nclusters = options.clopts.nprocs;
     95                options.clopts.nprocs = 1;
     96        }
    9397        options.clopts.params.num_entries = nentries;
    94 
    95         options.clopts.params.poller_submits = subthrd;
    96         options.clopts.params.eager_submits  = eagrsub;
    97 
    98         if( fixedfd ) {
    99                 options.file_cache.fixed_fds = true;
     98        options.clopts.instance = alloc(options.clopts.nclusters);
     99        options.clopts.thrd_cnt = alloc(options.clopts.nclusters);
     100        options.clopts.cltr_cnt = 0;
     101        for(i; options.clopts.nclusters) {
     102                options.clopts.thrd_cnt[i] = 0;
    100103        }
    101104
    102         if( sqkpoll ) {
    103                 options.clopts.params.poll_submit = true;
    104                 options.file_cache.fixed_fds = true;
    105         }
    106105
    107         if( iokpoll ) {
    108                 options.clopts.params.poll_complete = true;
    109                 options.file_cache.open_flags |= O_DIRECT;
    110         }
     106        // if( fixedfd ) {
     107        //      options.file_cache.fixed_fds = true;
     108        // }
    111109
    112         options.clopts.params.num_ready = sublen;
     110        // if( sqkpoll ) {
     111        //      options.file_cache.fixed_fds = true;
     112        // }
     113
     114        // if( iokpoll ) {
     115        //      options.file_cache.open_flags |= O_DIRECT;
     116        // }
    113117
    114118        if( left[0] == 0p ) { return; }
  • benchmark/io/http/options.hfa

    r182256b r266ecf1  
    99struct Options {
    1010        bool log;
     11        bool stats;
    1112
    1213        struct {
     
    2627
    2728        struct {
     29                int nclusters;
    2830                int nprocs;
    2931                int nworkers;
     
    3133                bool procstats;
    3234                bool viewhalts;
    33                 cluster * instance;
     35                cluster ** instance;
     36                size_t   * thrd_cnt;
     37                size_t     cltr_cnt;
    3438        } clopts;
    3539};
  • benchmark/io/http/protocol.cfa

    r182256b r266ecf1  
    2020#include "options.hfa"
    2121
    22 const char * volatile date = 0p;
    23 
    24 const char * http_msgs[] = {
    25         "HTTP/1.1 200 OK\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: %zu \n\n",
    26         "HTTP/1.1 400 Bad Request\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
    27         "HTTP/1.1 404 Not Found\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
    28         "HTTP/1.1 405 Method Not Allowed\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
    29         "HTTP/1.1 408 Request Timeout\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
    30         "HTTP/1.1 413 Payload Too Large\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
    31         "HTTP/1.1 414 URI Too Long\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
    32 };
     22#define PLAINTEXT_1WRITE
     23#define PLAINTEXT_NOCOPY
     24
     25struct https_msg_str {
     26        char msg[512];
     27        size_t len;
     28};
     29
     30const https_msg_str * volatile http_msgs[KNOWN_CODES] = { 0 };
    3331
    3432_Static_assert( KNOWN_CODES == (sizeof(http_msgs ) / sizeof(http_msgs [0])));
    3533
    36 const int http_codes[] = {
     34const int http_codes[KNOWN_CODES] = {
     35        200,
    3736        200,
    3837        400,
     
    5352        while(len > 0) {
    5453                // Call write
    55                 int ret = cfa_write(fd, it, len, 0, -1`s, 0p, 0p);
    56                 // int ret = write(fd, it, len);
     54                int ret = cfa_send(fd, it, len, 0, CFA_IO_LAZY);
    5755                if( ret < 0 ) {
    5856                        if( errno == ECONNRESET || errno == EPIPE ) return -ECONNRESET;
     
    7270        /* paranoid */ assert( code < KNOWN_CODES && code != OK200 );
    7371        int idx = (int)code;
    74         return answer( fd, http_msgs[idx], strlen( http_msgs[idx] ) );
     72        return answer( fd, http_msgs[idx]->msg, http_msgs[idx]->len );
    7573}
    7674
    7775int answer_header( int fd, size_t size ) {
    78         const char * fmt = http_msgs[OK200];
    79         int len = 200;
    80         char buffer[len];
    81         len = snprintf(buffer, len, fmt, date, size);
     76        char buffer[512];
     77        char * it = buffer;
     78        memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
     79        it += http_msgs[OK200]->len;
     80        int len = http_msgs[OK200]->len;
     81        len += snprintf(it, 512 - len, "%d \n\n", size);
    8282        return answer( fd, buffer, len );
    8383}
    8484
    85 int answer_plain( int fd, char buffer[], size_t size ) {
    86         int ret = answer_header(fd, size);
     85#if defined(PLAINTEXT_NOCOPY)
     86int answer_plaintext( int fd ) {
     87        return answer(fd, http_msgs[OK200_PlainText]->msg, http_msgs[OK200_PlainText]->len + 1); // +1 cause snprintf doesn't count nullterminator
     88}
     89#elif defined(PLAINTEXT_1WRITE)
     90int answer_plaintext( int fd ) {
     91        char text[] = "Hello, World!\n";
     92        char buffer[512 + sizeof(text)];
     93        char * it = buffer;
     94        memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
     95        it += http_msgs[OK200]->len;
     96        int len = http_msgs[OK200]->len;
     97        int r = snprintf(it, 512 - len, "%d \n\n", sizeof(text));
     98        it += r;
     99        len += r;
     100        memcpy(it, text, sizeof(text));
     101        return answer(fd, buffer, len + sizeof(text));
     102}
     103#else
     104int answer_plaintext( int fd ) {
     105        char text[] = "Hello, World!\n";
     106        int ret = answer_header(fd, sizeof(text));
    87107        if( ret < 0 ) return ret;
    88         return answer(fd, buffer, size);
    89 }
     108        return answer(fd, text, sizeof(text));
     109}
     110#endif
    90111
    91112int answer_empty( int fd ) {
     
    94115
    95116
    96 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len, io_cancellation * cancel) {
     117[HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) {
    97118        char * it = buffer;
    98119        size_t count = len - 1;
     
    100121        READ:
    101122        for() {
    102                 int ret = cfa_read(fd, (void*)it, count, 0, -1`s, cancel, 0p);
     123                int ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY);
    103124                // int ret = read(fd, (void*)it, count);
    104125                if(ret == 0 ) return [OK200, true, 0, 0];
     
    139160        ssize_t ret;
    140161        SPLICE1: while(count > 0) {
    141                 ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, 0, -1`s, 0p, 0p);
    142                 // ret = splice(ans_fd, &offset, pipe[1], 0p, count, sflags);
     162                ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY);
    143163                if( ret < 0 ) {
    144164                        if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE1;
     
    152172                size_t in_pipe = ret;
    153173                SPLICE2: while(in_pipe > 0) {
    154                         ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, 0, -1`s, 0p, 0p);
    155                         // ret = splice(pipe[0], 0p, fd, 0p, in_pipe, sflags);
     174                        ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY);
    156175                        if( ret < 0 ) {
    157176                                if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE2;
     
    173192#include <thread.hfa>
    174193
     194const char * original_http_msgs[] = {
     195        "HTTP/1.1 200 OK\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: ",
     196        "HTTP/1.1 200 OK\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 15\n\nHello, World!\n",
     197        "HTTP/1.1 400 Bad Request\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
     198        "HTTP/1.1 404 Not Found\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
     199        "HTTP/1.1 405 Method Not Allowed\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
     200        "HTTP/1.1 408 Request Timeout\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
     201        "HTTP/1.1 413 Payload Too Large\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
     202        "HTTP/1.1 414 URI Too Long\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
     203};
     204
    175205struct date_buffer {
    176         char buff[100];
     206        https_msg_str strs[KNOWN_CODES];
    177207};
    178208
     
    183213
    184214void ?{}( DateFormater & this ) {
    185         ((thread&)this){ "Server Date Thread", *options.clopts.instance };
     215        ((thread&)this){ "Server Date Thread", *options.clopts.instance[0] };
    186216        this.idx = 0;
    187         memset( this.buffers[0].buff, 0, sizeof(this.buffers[0]) );
    188         memset( this.buffers[1].buff, 0, sizeof(this.buffers[1]) );
     217        memset( &this.buffers[0], 0, sizeof(this.buffers[0]) );
     218        memset( &this.buffers[1], 0, sizeof(this.buffers[1]) );
    189219}
    190220
     
    196226                or else {}
    197227
     228
     229                char buff[100];
    198230                Time now = getTimeNsec();
    199 
    200                 strftime( this.buffers[this.idx].buff, 100, "%a, %d %b %Y %H:%M:%S %Z", now );
    201 
    202                 char * next = this.buffers[this.idx].buff;
    203                 __atomic_exchange_n((char * volatile *)&date, next, __ATOMIC_SEQ_CST);
     231                strftime( buff, 100, "%a, %d %b %Y %H:%M:%S %Z", now );
     232                sout | "Updated date to '" | buff | "'";
     233
     234                for(i; KNOWN_CODES) {
     235                        size_t len = snprintf( this.buffers[this.idx].strs[i].msg, 512, original_http_msgs[i], buff );
     236                        this.buffers[this.idx].strs[i].len = len;
     237                }
     238
     239                for(i; KNOWN_CODES) {
     240                        https_msg_str * next = &this.buffers[this.idx].strs[i];
     241                        __atomic_exchange_n((https_msg_str * volatile *)&http_msgs[i], next, __ATOMIC_SEQ_CST);
     242                }
    204243                this.idx = (this.idx + 1) % 2;
     244
     245                sout | "Date thread sleeping";
    205246
    206247                sleep(1`s);
  • benchmark/io/http/protocol.hfa

    r182256b r266ecf1  
    11#pragma once
    2 
    3 struct io_cancellation;
    42
    53enum HttpCode {
    64        OK200 = 0,
     5        OK200_PlainText,
    76        E400,
    87        E404,
     
    1817int answer_error( int fd, HttpCode code );
    1918int answer_header( int fd, size_t size );
    20 int answer_plain( int fd, char buffer [], size_t size );
     19int answer_plaintext( int fd );
    2120int answer_empty( int fd );
    2221
    23 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len, io_cancellation *);
     22[HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len);
    2423
    2524int sendfile( int pipe[2], int fd, int ans_fd, size_t count );
  • benchmark/io/http/worker.cfa

    r182256b r266ecf1  
    1717//=============================================================================================
    1818void ?{}( Worker & this ) {
    19         ((thread&)this){ "Server Worker Thread", *options.clopts.instance };
     19        size_t cli = rand() % options.clopts.cltr_cnt;
     20        ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli] };
     21        options.clopts.thrd_cnt[cli]++;
    2022        this.pipe[0] = -1;
    2123        this.pipe[1] = -1;
     
    3537        for() {
    3638                if( options.log ) sout | "=== Accepting connection ===";
    37                 int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], 0, -1`s, &this.cancel, 0p );
    38                 // int fd = accept4( this.[sockfd, addr, addrlen, flags] );
     39                int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], CFA_IO_LAZY );
    3940                if(fd < 0) {
    4041                        if( errno == ECONNABORTED ) break;
     
    4243                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
    4344                }
     45                if(this.done) break;
    4446
    4547                if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
     
    5557                        char buffer[len];
    5658                        if( options.log ) sout | "=== Reading request ===";
    57                         [code, closed, file, name_size] = http_read(fd, buffer, len, &this.cancel);
     59                        [code, closed, file, name_size] = http_read(fd, buffer, len);
    5860
    5961                        // if we are done, break out of the loop
     
    7072                                if( options.log ) sout | "=== Request for /plaintext ===";
    7173
    72                                 char text[] = "Hello, World!\n";
    73 
    74                                 // Send the header
    75                                 int ret = answer_plain(fd, text, sizeof(text));
     74                                int ret = answer_plaintext(fd);
    7675                                if( ret == -ECONNRESET ) break REQUEST;
    7776
  • benchmark/io/http/worker.hfa

    r182256b r266ecf1  
    1717        socklen_t * addrlen;
    1818        int flags;
    19         io_cancellation cancel;
    2019        volatile bool done;
    2120};
Note: See TracChangeset for help on using the changeset viewer.