Changeset 8e4aa05 for benchmark


Ignore:
Timestamp:
Mar 4, 2021, 7:40:25 PM (5 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
77d601f
Parents:
342af53 (diff), a5040fe (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

Location:
benchmark
Files:
4 added
9 edited

Legend:

Unmodified
Added
Removed
  • benchmark/Makefile.am

    r342af53 r8e4aa05  
    502502
    503503compile-io$(EXEEXT):
    504         $(CFACOMPILE) -DNO_COMPILED_PRAGMA -fsyntax-only -w $(testdir)/io1.cfa
     504        $(CFACOMPILE) -DNO_COMPILED_PRAGMA -fsyntax-only -w $(testdir)/io/io.cfa
    505505
    506506compile-monitor$(EXEEXT):
  • benchmark/io/http/filecache.cfa

    r342af53 r8e4aa05  
    44#include <string.h>
    55
     6#include <fstream.hfa>
    67#include <stdlib.hfa>
    78
     
    182183                conflicts += put_file( raw[i], fd );
    183184        }
    184         printf("Filled cache from path \"%s\" with %zu files\n", path, fcount);
     185        sout | "Filled cache from path \"" | path | "\" with" | fcount | "files";
    185186        if( conflicts > 0 ) {
    186                 printf("Found %d conflicts (seed: %u)\n", conflicts, options.file_cache.hash_seed);
     187                sout | "Found" | conflicts | "conflicts (seed: " | options.file_cache.hash_seed | ")";
    187188                #if defined(REJECT_CONFLICTS)
    188189                        abort("Conflicts found in the cache");
     
    191192
    192193        if(options.file_cache.list) {
    193                 printf("Listing files and exiting\n");
     194                sout | "Listing files and exiting";
    194195                for(i; fcount) {
    195196                        int s; char u;
    196197                        [s, u] = human_size(raw[i].size);
    197                         printf("%4d%c - %s\n", s, u, raw[i].file);
     198                        sout | s | u | "-" | raw[i].file;
    198199                        free(raw[i].file);
    199200                }
     
    208209
    209210[int *, int] filefds(int extra) {
     211        if(!options.file_cache.path) {
     212                int * data = alloc(extra);
     213                return [data, 0];
     214        }
     215
    210216        if(!file_cache.entries) {
    211217                abort("File cache not filled!\n");
  • benchmark/io/http/main.cfa

    r342af53 r8e4aa05  
    66#include <unistd.h>
    77extern "C" {
     8        #include <signal.h>
    89        #include <sys/socket.h>
    910        #include <netinet/in.h>
    1011}
    1112
     13#include <fstream.hfa>
    1214#include <kernel.hfa>
     15#include <iofwd.hfa>
    1316#include <stats.hfa>
    1417#include <time.hfa>
     
    2629
    2730//=============================================================================================
     31// Stats Printer
     32//============================================================================================='
     33
     34thread StatsPrinter {};
     35
     36void ?{}( StatsPrinter & this, cluster & cl ) {
     37        ((thread&)this){ "Stats Printer Thread", cl };
     38}
     39
     40void ^?{}( StatsPrinter & mutex this ) {}
     41
     42void main(StatsPrinter & this) {
     43        LOOP: for() {
     44                waitfor( ^?{} : this) {
     45                        break LOOP;
     46                }
     47                or else {}
     48
     49                sleep(10`s);
     50
     51                print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );
     52        }
     53}
     54
     55//=============================================================================================
    2856// Globals
    2957//=============================================================================================
    30 struct ServerProc {
    31         processor self;
     58struct ServerCluster {
     59        cluster self;
     60        processor    * procs;
     61        // io_context   * ctxs;
     62        StatsPrinter * prnt;
     63
    3264};
    3365
    34 void ?{}( ServerProc & this ) {
    35         /* paranoid */ assert( options.clopts.instance != 0p );
    36         (this.self){ "Benchmark Processor", *options.clopts.instance };
     66void ?{}( ServerCluster & this ) {
     67        (this.self){ "Server Cluster", options.clopts.params };
     68
     69        this.procs = alloc(options.clopts.nprocs);
     70        for(i; options.clopts.nprocs) {
     71                (this.procs[i]){ "Benchmark Processor", this.self };
     72
     73                #if !defined(__CFA_NO_STATISTICS__)
     74                        if( options.clopts.procstats ) {
     75                                print_stats_at_exit( *this.procs, this.self.print_stats );
     76                        }
     77                        if( options.clopts.viewhalts ) {
     78                                print_halts( *this.procs );
     79                        }
     80                #endif
     81        }
     82
     83        if(options.stats) {
     84                this.prnt = alloc();
     85                (*this.prnt){ this.self };
     86        } else {
     87                this.prnt = 0p;
     88        }
    3789
    3890        #if !defined(__CFA_NO_STATISTICS__)
    39                 if( options.clopts.procstats ) {
    40                         print_stats_at_exit( this.self, options.clopts.instance->print_stats );
    41                 }
    42                 if( options.clopts.viewhalts ) {
    43                         print_halts( this.self );
    44                 }
     91                print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO );
    4592        #endif
     93
     94        options.clopts.instance[options.clopts.cltr_cnt] = &this.self;
     95        options.clopts.cltr_cnt++;
     96}
     97
     98void ^?{}( ServerCluster & this ) {
     99        delete(this.prnt);
     100
     101        for(i; options.clopts.nprocs) {
     102                ^(this.procs[i]){};
     103        }
     104        free(this.procs);
     105
     106        ^(this.self){};
    46107}
    47108
     
    53114//============================================================================================='
    54115int main( int argc, char * argv[] ) {
     116        __sighandler_t s = 1p;
     117        signal(SIGPIPE, s);
     118
    55119        //===================
    56120        // Parse args
    57         const char * path = parse_options(argc, argv);
     121        parse_options(argc, argv);
    58122
    59123        //===================
    60124        // Open Files
    61         printf("Filling cache from %s\n", path);
    62         fill_cache( path );
     125        if( options.file_cache.path ) {
     126                sout | "Filling cache from" | options.file_cache.path;
     127                fill_cache( options.file_cache.path );
     128        }
    63129
    64130        //===================
    65131        // Open Socket
    66         printf("%ld : Listening on port %d\n", getpid(), options.socket.port);
     132        sout | getpid() | ": Listening on port" | options.socket.port;
    67133        int server_fd = socket(AF_INET, SOCK_STREAM, 0);
    68134        if(server_fd < 0) {
     
    84150                        if(errno == EADDRINUSE) {
    85151                                if(waited == 0) {
    86                                         printf("Waiting for port\n");
     152                                        sout | "Waiting for port";
    87153                                } else {
    88                                         printf("\r%d", waited);
    89                                         fflush(stdout);
     154                                        sout | "\r" | waited | nonl;
     155                                        flush( sout );
    90156                                }
    91157                                waited ++;
     
    106172        // Run Server Cluster
    107173        {
    108                 cluster cl = { "Server Cluster", options.clopts.params };
    109                 #if !defined(__CFA_NO_STATISTICS__)
    110                         print_stats_at_exit( cl, CFA_STATS_READY_Q | CFA_STATS_IO );
    111                 #endif
    112                 options.clopts.instance = &cl;
    113 
    114 
    115174                int pipe_cnt = options.clopts.nworkers * 2;
    116175                int pipe_off;
     
    122181                }
    123182
    124                 if(options.file_cache.fixed_fds) {
    125                         register_fixed_files(cl, fds, pipe_off);
    126                 }
     183                // if(options.file_cache.path && options.file_cache.fixed_fds) {
     184                //      register_fixed_files(cl, fds, pipe_off);
     185                // }
    127186
    128187                {
    129                         ServerProc procs[options.clopts.nprocs];
     188                        ServerCluster cl[options.clopts.nclusters];
    130189
    131190                        init_protocol();
     
    148207                                        unpark( workers[i] );
    149208                                }
    150                                 printf("%d workers started on %d processors\n", options.clopts.nworkers, options.clopts.nprocs);
     209                                sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters";
     210                                for(i; options.clopts.nclusters) {
     211                                        sout | options.clopts.thrd_cnt[i] | nonl;
     212                                }
     213                                sout | nl;
    151214                                {
    152215                                        char buffer[128];
    153                                         while(!feof(stdin)) {
    154                                                 fgets(buffer, 128, stdin);
     216                                        for() {
     217                                                int ret = cfa_read(0, buffer, 128, 0);
     218                                                if(ret == 0) break;
     219                                                if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
     220                                                sout | "User wrote '" | "" | nonl;
     221                                                write(sout, buffer, ret - 1);
     222                                                sout | "'";
    155223                                        }
    156224
    157                                         printf("Shutting Down\n");
    158                                 }
    159 
     225                                        sout | "Shutdown received";
     226                                }
     227
     228                                sout | "Notifying connections..." | nonl; flush( sout );
    160229                                for(i; options.clopts.nworkers) {
    161                                         printf("Cancelling %p\n", (void*)workers[i].cancel.target);
    162230                                        workers[i].done = true;
    163                                         cancel(workers[i].cancel);
    164                                 }
    165 
    166                                 printf("Shutting down socket\n");
     231                                }
     232                                sout | "done";
     233
     234                                sout | "Shutting down socket..." | nonl; flush( sout );
    167235                                int ret = shutdown( server_fd, SHUT_RD );
    168                                 if( ret < 0 ) { abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) ); }
     236                                if( ret < 0 ) {
     237                                        abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) );
     238                                }
     239                                sout | "done";
    169240
    170241                                //===================
    171242                                // Close Socket
    172                                 printf("Closing Socket\n");
     243                                sout | "Closing Socket..." | nonl; flush( sout );
    173244                                ret = close( server_fd );
    174245                                if(ret < 0) {
    175246                                        abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
    176247                                }
    177                         }
    178                         printf("Workers Closed\n");
    179 
     248                                sout | "done";
     249
     250                                sout | "Stopping connection threads..." | nonl; flush( sout );
     251                        }
     252                        sout | "done";
     253
     254                        sout | "Stopping protocol threads..." | nonl; flush( sout );
    180255                        deinit_protocol();
    181                 }
    182 
     256                        sout | "done";
     257
     258                        sout | "Stopping processors/clusters..." | nonl; flush( sout );
     259                }
     260                sout | "done";
     261
     262                sout | "Closing splice fds..." | nonl; flush( sout );
    183263                for(i; pipe_cnt) {
    184264                        ret = close( fds[pipe_off + i] );
     
    188268                }
    189269                free(fds);
    190 
    191         }
     270                sout | "done";
     271
     272                sout | "Stopping processors..." | nonl; flush( sout );
     273        }
     274        sout | "done";
    192275
    193276        //===================
    194277        // Close Files
    195         printf("Closing Files\n");
    196         close_cache();
    197 }
     278        if( options.file_cache.path ) {
     279                sout | "Closing open files..." | nonl; flush( sout );
     280                close_cache();
     281                sout | "done";
     282        }
     283}
  • benchmark/io/http/options.cfa

    r342af53 r8e4aa05  
    99}
    1010
     11#include <bitmanip.hfa>
     12#include <fstream.hfa>
    1113#include <kernel.hfa>
    1214#include <parseargs.hfa>
     15#include <stdlib.hfa>
    1316
     17#include <stdlib.h>
    1418#include <string.h>
    1519
    1620Options options @= {
    1721        false, // log
     22        false, // stats
    1823
    1924        { // file_cache
     25                0,     // path
    2026                0,     // open_flags;
    2127                42u,   // hash_seed;
     
    3238
    3339        { // cluster
     40                1,     // nclusters;
    3441                1,     // nprocs;
    3542                1,     // nworkers;
    36                 0,     // flags;
     43                {},     // params;
    3744                false, // procstats
    3845                false, // viewhalts
     
    4148};
    4249
    43 const char * parse_options( int argc, char * argv[] ) {
    44         bool subthrd = false;
    45         bool eagrsub = false;
    46         bool fixedfd = false;
    47         bool sqkpoll = false;
    48         bool iokpoll = false;
    49         unsigned sublen = 16;
     50void parse_options( int argc, char * argv[] ) {
     51        // bool fixedfd = false;
     52        // bool sqkpoll = false;
     53        // bool iokpoll = false;
     54        unsigned nentries = 16;
     55        bool isolate = false;
     56
    5057
    5158        static cfa_option opt[] = {
    52                 {'p', "port",           "Port the server will listen on", options.socket.port},
    53                 {'c', "cpus",           "Number of processors to use", options.clopts.nprocs},
    54                 {'L', "log",            "Enable logs", options.log, parse_settrue},
    55                 {'t', "threads",        "Number of worker threads to use", options.clopts.nworkers},
    56                 {'b', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog},
    57                 {'r', "request_len",    "Maximum number of bytes in the http request, requests with more data will be answered with Http Code 414", options.socket.buflen},
    58                 {'S', "seed",           "seed to use for hashing", options.file_cache.hash_seed },
    59                 {'C', "cache-size",     "Size of the cache to use, if set to small, will uses closes power of 2", options.file_cache.size },
    60                 {'l', "list-files",     "List the files in the specified path and exit", options.file_cache.list, parse_settrue },
    61                 {'s', "submitthread",   "If set, cluster uses polling thread to submit I/O", subthrd, parse_settrue },
    62                 {'e', "eagersubmit",    "If set, cluster submits I/O eagerly but still aggregates submits", eagrsub, parse_settrue},
    63                 {'f', "fixed-fds",      "If set, files are open eagerly and pre-registered with the cluster", fixedfd, parse_settrue},
    64                 {'k', "kpollsubmit",    "If set, cluster uses IORING_SETUP_SQPOLL, implies -f", sqkpoll, parse_settrue },
    65                 {'i', "kpollcomplete",  "If set, cluster uses IORING_SETUP_IOPOLL", iokpoll, parse_settrue },
    66                 {'L', "submitlength",   "Max number of submitions that can be submitted together", sublen },
     59                { 'p', "port",           "Port the server will listen on", options.socket.port},
     60                { 'c', "cpus",           "Number of processors to use", options.clopts.nprocs},
     61                { 't', "threads",        "Number of worker threads to use", options.clopts.nworkers},
     62                {'\0', "isolate",        "Create one cluster per processor", isolate, parse_settrue},
     63                {'\0', "log",            "Enable logs", options.log, parse_settrue},
     64                {'\0', "stats",          "Enable statistics", options.stats, parse_settrue},
     65                {'\0', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog},
     66                {'\0', "request_len",    "Maximum number of bytes in the http request, requests with more data will be answered with Http Code 414", options.socket.buflen},
     67                {'\0', "seed",           "seed to use for hashing", options.file_cache.hash_seed },
     68                {'\0', "cache-size",     "Size of the cache to use, if set to small, will uses closes power of 2", options.file_cache.size },
     69                {'\0', "list-files",     "List the files in the specified path and exit", options.file_cache.list, parse_settrue },
     70                // { 'f', "fixed-fds",      "If set, files are open eagerly and pre-registered with the cluster", fixedfd, parse_settrue},
     71                // { 'k', "kpollsubmit",    "If set, cluster uses IORING_SETUP_SQPOLL, implies -f", sqkpoll, parse_settrue },
     72                // { 'i', "kpollcomplete",  "If set, cluster uses IORING_SETUP_IOPOLL", iokpoll, parse_settrue },
     73                {'e', "numentries",     "Number of I/O entries", nentries },
    6774
    6875        };
     
    7279        parse_args( argc, argv, opt, opt_cnt, "[OPTIONS]... [PATH]\ncforall http server", left );
    7380
    74         options.clopts.params.poller_submits = subthrd;
    75         options.clopts.params.eager_submits  = eagrsub;
    76 
    77         if( fixedfd ) {
    78                 options.file_cache.fixed_fds = true;
     81        if( !is_pow2(nentries) ) {
     82                unsigned v = nentries;
     83                v--;
     84                v |= v >> 1;
     85                v |= v >> 2;
     86                v |= v >> 4;
     87                v |= v >> 8;
     88                v |= v >> 16;
     89                v++;
     90                serr | "Warning: num_entries not a power of 2" | '(' | nentries | ')' | "raising to " | v;
     91                nentries = v;
     92        }
     93        if(isolate) {
     94                options.clopts.nclusters = options.clopts.nprocs;
     95                options.clopts.nprocs = 1;
     96        }
     97        options.clopts.params.num_entries = nentries;
     98        options.clopts.instance = alloc(options.clopts.nclusters);
     99        options.clopts.thrd_cnt = alloc(options.clopts.nclusters);
     100        options.clopts.cltr_cnt = 0;
     101        for(i; options.clopts.nclusters) {
     102                options.clopts.thrd_cnt[i] = 0;
    79103        }
    80104
    81         if( sqkpoll ) {
    82                 options.clopts.params.poll_submit = true;
    83                 options.file_cache.fixed_fds = true;
    84         }
    85105
    86         if( iokpoll ) {
    87                 options.clopts.params.poll_complete = true;
    88                 options.file_cache.open_flags |= O_DIRECT;
    89         }
     106        // if( fixedfd ) {
     107        //      options.file_cache.fixed_fds = true;
     108        // }
    90109
    91         options.clopts.params.num_ready = sublen;
     110        // if( sqkpoll ) {
     111        //      options.file_cache.fixed_fds = true;
     112        // }
    92113
    93         if( left[0] == 0p ) { return "."; }
     114        // if( iokpoll ) {
     115        //      options.file_cache.open_flags |= O_DIRECT;
     116        // }
     117
     118        if( left[0] == 0p ) { return; }
    94119
    95120        const char * path = left[0];
     
    97122
    98123        if( left[0] != 0p ) {
    99                 abort("Too many trailing arguments!\n");
     124                serr | "Too many trailing arguments!" | '\'' | path | '\'';
     125                while(left[0] != 0p) {
     126                        serr | " - " | left[0];
     127                        left++;
     128                }
     129                exit(EXIT_FAILURE);
    100130        }
    101131
    102         return path;
     132        options.file_cache.path = path;
    103133}
  • benchmark/io/http/options.hfa

    r342af53 r8e4aa05  
    99struct Options {
    1010        bool log;
     11        bool stats;
    1112
    1213        struct {
     14                const char * path;
    1315                int open_flags;
    1416                uint32_t hash_seed;
     
    2527
    2628        struct {
     29                int nclusters;
    2730                int nprocs;
    2831                int nworkers;
     
    3033                bool procstats;
    3134                bool viewhalts;
    32                 cluster * instance;
     35                cluster ** instance;
     36                size_t   * thrd_cnt;
     37                size_t     cltr_cnt;
    3338        } clopts;
    3439};
     
    3641extern Options options;
    3742
    38 const char * parse_options( int argc, char * argv[] );
     43void parse_options( int argc, char * argv[] );
  • benchmark/io/http/protocol.cfa

    r342af53 r8e4aa05  
    55        #include <fcntl.h>
    66}
     7
     8#include <fstream.hfa>
    79#include <iofwd.hfa>
    810
     
    1113extern "C" {
    1214      int snprintf ( char * s, size_t n, const char * format, ... );
    13         #include <linux/io_uring.h>
     15        // #include <linux/io_uring.h>
    1416}
    1517#include <string.h>
     
    1820#include "options.hfa"
    1921
    20 const char * volatile date = 0p;
    21 
    22 const char * http_msgs[] = {
    23         "HTTP/1.1 200 OK\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: %zu \n\n",
    24         "HTTP/1.1 400 Bad Request\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
    25         "HTTP/1.1 404 Not Found\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
    26         "HTTP/1.1 413 Payload Too Large\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
    27         "HTTP/1.1 414 URI Too Long\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
    28 };
     22#define PLAINTEXT_1WRITE
     23#define PLAINTEXT_NOCOPY
     24
     25struct https_msg_str {
     26        char msg[512];
     27        size_t len;
     28};
     29
     30const https_msg_str * volatile http_msgs[KNOWN_CODES] = { 0 };
    2931
    3032_Static_assert( KNOWN_CODES == (sizeof(http_msgs ) / sizeof(http_msgs [0])));
    3133
    32 const int http_codes[] = {
     34const int http_codes[KNOWN_CODES] = {
     35        200,
    3336        200,
    3437        400,
    3538        404,
     39        405,
     40        408,
    3641        413,
    3742        414,
     
    4752        while(len > 0) {
    4853                // Call write
    49                 int ret = cfa_write(fd, it, len, 0, -1`s, 0p, 0p);
    50                 // int ret = write(fd, it, len);
    51                 if( ret < 0 ) { if( errno != EAGAIN && errno != EWOULDBLOCK) abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) ); }
     54                int ret = cfa_send(fd, it, len, 0, CFA_IO_LAZY);
     55                if( ret < 0 ) {
     56                        if( errno == ECONNRESET || errno == EPIPE ) return -ECONNRESET;
     57                        if( errno == EAGAIN || errno == EWOULDBLOCK) return -EAGAIN;
     58
     59                        abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) );
     60                }
    5261
    5362                // update it/len
     
    6170        /* paranoid */ assert( code < KNOWN_CODES && code != OK200 );
    6271        int idx = (int)code;
    63         return answer( fd, http_msgs[idx], strlen( http_msgs[idx] ) );
     72        return answer( fd, http_msgs[idx]->msg, http_msgs[idx]->len );
    6473}
    6574
    6675int answer_header( int fd, size_t size ) {
    67         const char * fmt = http_msgs[OK200];
    68         int len = 200;
    69         char buffer[len];
    70         len = snprintf(buffer, len, fmt, date, size);
     76        char buffer[512];
     77        char * it = buffer;
     78        memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
     79        it += http_msgs[OK200]->len;
     80        int len = http_msgs[OK200]->len;
     81        len += snprintf(it, 512 - len, "%d \n\n", size);
    7182        return answer( fd, buffer, len );
    7283}
    7384
    74 int answer_plain( int fd, char buffer[], size_t size ) {
    75         int ret = answer_header(fd, size);
     85#if defined(PLAINTEXT_NOCOPY)
     86int answer_plaintext( int fd ) {
     87        return answer(fd, http_msgs[OK200_PlainText]->msg, http_msgs[OK200_PlainText]->len + 1); // +1 cause snprintf doesn't count nullterminator
     88}
     89#elif defined(PLAINTEXT_1WRITE)
     90int answer_plaintext( int fd ) {
     91        char text[] = "Hello, World!\n";
     92        char buffer[512 + sizeof(text)];
     93        char * it = buffer;
     94        memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
     95        it += http_msgs[OK200]->len;
     96        int len = http_msgs[OK200]->len;
     97        int r = snprintf(it, 512 - len, "%d \n\n", sizeof(text));
     98        it += r;
     99        len += r;
     100        memcpy(it, text, sizeof(text));
     101        return answer(fd, buffer, len + sizeof(text));
     102}
     103#else
     104int answer_plaintext( int fd ) {
     105        char text[] = "Hello, World!\n";
     106        int ret = answer_header(fd, sizeof(text));
    76107        if( ret < 0 ) return ret;
    77         return answer(fd, buffer, size);
    78 }
     108        return answer(fd, text, sizeof(text));
     109}
     110#endif
    79111
    80112int answer_empty( int fd ) {
     
    83115
    84116
    85 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len, io_cancellation * cancel) {
     117[HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) {
    86118        char * it = buffer;
    87119        size_t count = len - 1;
     
    89121        READ:
    90122        for() {
    91                 int ret = cfa_read(fd, (void*)it, count, 0, -1`s, cancel, 0p);
     123                int ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY);
    92124                // int ret = read(fd, (void*)it, count);
    93125                if(ret == 0 ) return [OK200, true, 0, 0];
    94126                if(ret < 0 ) {
    95127                        if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ;
    96                         // if( errno == EINVAL ) return [E400, true, 0, 0];
     128                        if( errno == ECONNRESET ) return [E408, true, 0, 0];
     129                        if( errno == EPIPE ) return [E408, true, 0, 0];
    97130                        abort( "read error: (%d) %s\n", (int)errno, strerror(errno) );
    98131                }
     
    108141        }
    109142
    110         if( options.log ) printf("%.*s\n", rlen, buffer);
     143        if( options.log ) {
     144                write(sout, buffer, rlen);
     145                sout | nl;
     146        }
    111147
    112148        it = buffer;
     
    119155}
    120156
    121 void sendfile( int pipe[2], int fd, int ans_fd, size_t count ) {
     157int sendfile( int pipe[2], int fd, int ans_fd, size_t count ) {
    122158        unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE;
    123159        off_t offset = 0;
    124160        ssize_t ret;
    125161        SPLICE1: while(count > 0) {
    126                 ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, 0, -1`s, 0p, 0p);
    127                 // ret = splice(ans_fd, &offset, pipe[1], 0p, count, sflags);
     162                ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY);
    128163                if( ret < 0 ) {
    129164                        if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE1;
     165                        if( errno == ECONNRESET ) return -ECONNRESET;
     166                        if( errno == EPIPE ) return -EPIPE;
    130167                        abort( "splice [0] error: (%d) %s\n", (int)errno, strerror(errno) );
    131168                }
     
    135172                size_t in_pipe = ret;
    136173                SPLICE2: while(in_pipe > 0) {
    137                         ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, 0, -1`s, 0p, 0p);
    138                         // ret = splice(pipe[0], 0p, fd, 0p, in_pipe, sflags);
     174                        ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY);
    139175                        if( ret < 0 ) {
    140176                                if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE2;
     177                                if( errno == ECONNRESET ) return -ECONNRESET;
     178                                if( errno == EPIPE ) return -EPIPE;
    141179                                abort( "splice [1] error: (%d) %s\n", (int)errno, strerror(errno) );
    142180                        }
     
    145183
    146184        }
     185        return count;
    147186}
    148187
     
    153192#include <thread.hfa>
    154193
     194const char * original_http_msgs[] = {
     195        "HTTP/1.1 200 OK\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: ",
     196        "HTTP/1.1 200 OK\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 15\n\nHello, World!\n",
     197        "HTTP/1.1 400 Bad Request\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
     198        "HTTP/1.1 404 Not Found\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
     199        "HTTP/1.1 405 Method Not Allowed\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
     200        "HTTP/1.1 408 Request Timeout\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
     201        "HTTP/1.1 413 Payload Too Large\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
     202        "HTTP/1.1 414 URI Too Long\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
     203};
     204
    155205struct date_buffer {
    156         char buff[100];
     206        https_msg_str strs[KNOWN_CODES];
    157207};
    158208
     
    163213
    164214void ?{}( DateFormater & this ) {
    165         ((thread&)this){ "Server Date Thread", *options.clopts.instance };
     215        ((thread&)this){ "Server Date Thread", *options.clopts.instance[0] };
    166216        this.idx = 0;
    167         memset( this.buffers[0].buff, 0, sizeof(this.buffers[0]) );
    168         memset( this.buffers[1].buff, 0, sizeof(this.buffers[1]) );
     217        memset( &this.buffers[0], 0, sizeof(this.buffers[0]) );
     218        memset( &this.buffers[1], 0, sizeof(this.buffers[1]) );
    169219}
    170220
     
    176226                or else {}
    177227
     228
     229                char buff[100];
    178230                Time now = getTimeNsec();
    179 
    180                 strftime( this.buffers[this.idx].buff, 100, "%a, %d %b %Y %H:%M:%S %Z", now );
    181 
    182                 char * next = this.buffers[this.idx].buff;
    183                 __atomic_exchange_n((char * volatile *)&date, next, __ATOMIC_SEQ_CST);
     231                strftime( buff, 100, "%a, %d %b %Y %H:%M:%S %Z", now );
     232                sout | "Updated date to '" | buff | "'";
     233
     234                for(i; KNOWN_CODES) {
     235                        size_t len = snprintf( this.buffers[this.idx].strs[i].msg, 512, original_http_msgs[i], buff );
     236                        this.buffers[this.idx].strs[i].len = len;
     237                }
     238
     239                for(i; KNOWN_CODES) {
     240                        https_msg_str * next = &this.buffers[this.idx].strs[i];
     241                        __atomic_exchange_n((https_msg_str * volatile *)&http_msgs[i], next, __ATOMIC_SEQ_CST);
     242                }
    184243                this.idx = (this.idx + 1) % 2;
     244
     245                sout | "Date thread sleeping";
    185246
    186247                sleep(1`s);
  • benchmark/io/http/protocol.hfa

    r342af53 r8e4aa05  
    11#pragma once
    2 
    3 struct io_cancellation;
    42
    53enum HttpCode {
    64        OK200 = 0,
     5        OK200_PlainText,
    76        E400,
    87        E404,
     8        E405,
     9        E408,
    910        E413,
    1011        E414,
     
    1617int answer_error( int fd, HttpCode code );
    1718int answer_header( int fd, size_t size );
    18 int answer_plain( int fd, char buffer [], size_t size );
     19int answer_plaintext( int fd );
    1920int answer_empty( int fd );
    2021
    21 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len, io_cancellation *);
     22[HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len);
    2223
    23 void sendfile( int pipe[2], int fd, int ans_fd, size_t count );
     24int sendfile( int pipe[2], int fd, int ans_fd, size_t count );
  • benchmark/io/http/worker.cfa

    r342af53 r8e4aa05  
    66#include <unistd.h>
    77
     8#include <fstream.hfa>
    89#include <iofwd.hfa>
    910
     
    1617//=============================================================================================
    1718void ?{}( Worker & this ) {
    18         ((thread&)this){ "Server Worker Thread", *options.clopts.instance };
     19        size_t cli = rand() % options.clopts.cltr_cnt;
     20        ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli] };
     21        options.clopts.thrd_cnt[cli]++;
    1922        this.pipe[0] = -1;
    2023        this.pipe[1] = -1;
     
    3336        CONNECTION:
    3437        for() {
    35                 if( options.log ) printf("=== Accepting connection ===\n");
    36                 int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], 0, -1`s, &this.cancel, 0p );
    37                 // int fd = accept4( this.[sockfd, addr, addrlen, flags] );
     38                if( options.log ) sout | "=== Accepting connection ===";
     39                int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], CFA_IO_LAZY );
    3840                if(fd < 0) {
    3941                        if( errno == ECONNABORTED ) break;
    40                         if( errno == EINVAL && this.done ) break;
     42                        if( this.done && (errno == EINVAL || errno == EBADF) ) break;
    4143                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
    4244                }
     45                if(this.done) break;
    4346
    44                 if( options.log ) printf("=== New connection %d, waiting for requests ===\n", fd);
     47                if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
    4548                REQUEST:
    4649                for() {
     
    5356                        size_t len = options.socket.buflen;
    5457                        char buffer[len];
    55                         if( options.log ) printf("=== Reading request ===\n");
    56                         [code, closed, file, name_size] = http_read(fd, buffer, len, &this.cancel);
     58                        if( options.log ) sout | "=== Reading request ===";
     59                        [code, closed, file, name_size] = http_read(fd, buffer, len);
    5760
    5861                        // if we are done, break out of the loop
    59                         if( closed ) {
    60                                 if( options.log ) printf("=== Connection closed ===\n");
    61                                 close(fd);
    62                                 continue CONNECTION;
    63                         }
     62                        if( closed ) break REQUEST;
    6463
    6564                        // If this wasn't a request retrun 400
    6665                        if( code != OK200 ) {
    67                                 printf("=== Invalid Request : %d ===\n", code_val(code));
     66                                sout | "=== Invalid Request :" | code_val(code) | "===";
    6867                                answer_error(fd, code);
    6968                                continue REQUEST;
     
    7170
    7271                        if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
    73                                 if( options.log ) printf("=== Request for /plaintext ===\n");
     72                                if( options.log ) sout | "=== Request for /plaintext ===";
    7473
    75                                 char text[] = "Hello, World!\n";
     74                                int ret = answer_plaintext(fd);
     75                                if( ret == -ECONNRESET ) break REQUEST;
    7676
    77                                 // Send the header
    78                                 answer_plain(fd, text, sizeof(text));
    79 
    80                                 if( options.log ) printf("=== Answer sent ===\n");
     77                                if( options.log ) sout | "=== Answer sent ===";
    8178                                continue REQUEST;
    8279                        }
    8380
    8481                        if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
    85                                 if( options.log ) printf("=== Request for /ping ===\n");
     82                                if( options.log ) sout | "=== Request for /ping ===";
    8683
    8784                                // Send the header
    88                                 answer_empty(fd);
     85                                int ret = answer_empty(fd);
     86                                if( ret == -ECONNRESET ) break REQUEST;
    8987
    90                                 if( options.log ) printf("=== Answer sent ===\n");
     88                                if( options.log ) sout | "=== Answer sent ===";
    9189                                continue REQUEST;
    9290                        }
    9391
    94                         if( options.log ) printf("=== Request for file %.*s ===\n", (int)name_size, file);
     92                        if( options.log ) {
     93                                sout | "=== Request for file " | nonl;
     94                                write(sout, file, name_size);
     95                                sout | " ===";
     96                        }
     97
     98                        if( !options.file_cache.path ) {
     99                                if( options.log ) {
     100                                        sout | "=== File Not Found (" | nonl;
     101                                        write(sout, file, name_size);
     102                                        sout | ") ===";
     103                                }
     104                                answer_error(fd, E405);
     105                                continue REQUEST;
     106                        }
    95107
    96108                        // Get the fd from the file cache
     
    101113                        // If we can't find the file, return 404
    102114                        if( ans_fd < 0 ) {
    103                                 printf("=== File Not Found ===\n");
     115                                if( options.log ) {
     116                                        sout | "=== File Not Found (" | nonl;
     117                                        write(sout, file, name_size);
     118                                        sout | ") ===";
     119                                }
    104120                                answer_error(fd, E404);
    105121                                continue REQUEST;
     
    107123
    108124                        // Send the header
    109                         answer_header(fd, count);
     125                        int ret = answer_header(fd, count);
     126                        if( ret == -ECONNRESET ) break REQUEST;
    110127
    111128                        // Send the desired file
    112                         sendfile( this.pipe, fd, ans_fd, count);
     129                        ret = sendfile( this.pipe, fd, ans_fd, count);
     130                        if( ret == -ECONNRESET ) break REQUEST;
    113131
    114                         if( options.log ) printf("=== Answer sent ===\n");
     132                        if( options.log ) sout | "=== Answer sent ===";
    115133                }
     134
     135                if( options.log ) sout | "=== Connection closed ===";
     136                close(fd);
     137                continue CONNECTION;
    116138        }
    117139}
  • benchmark/io/http/worker.hfa

    r342af53 r8e4aa05  
    1717        socklen_t * addrlen;
    1818        int flags;
    19         io_cancellation cancel;
    2019        volatile bool done;
    2120};
Note: See TracChangeset for help on using the changeset viewer.