Changeset 77ff383 for benchmark/io


Ignore:
Timestamp:
Jan 15, 2021, 3:22:50 PM (5 years ago)
Author:
m3zulfiq <m3zulfiq@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
02b73ea
Parents:
03ecdcf (diff), d46bdac (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

Location:
benchmark/io/http
Files:
6 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/http/filecache.cfa

    r03ecdcf r77ff383  
    44#include <string.h>
    55
     6#include <fstream.hfa>
    67#include <stdlib.hfa>
    78
     
    182183                conflicts += put_file( raw[i], fd );
    183184        }
    184         printf("Filled cache from path \"%s\" with %zu files\n", path, fcount);
     185        sout | "Filled cache from path \"" | path | "\" with" | fcount | "files";
    185186        if( conflicts > 0 ) {
    186                 printf("Found %d conflicts (seed: %u)\n", conflicts, options.file_cache.hash_seed);
     187                sout | "Found" | conflicts | "conflicts (seed: " | options.file_cache.hash_seed | ")";
    187188                #if defined(REJECT_CONFLICTS)
    188189                        abort("Conflicts found in the cache");
     
    191192
    192193        if(options.file_cache.list) {
    193                 printf("Listing files and exiting\n");
     194                sout | "Listing files and exiting";
    194195                for(i; fcount) {
    195196                        int s; char u;
    196197                        [s, u] = human_size(raw[i].size);
    197                         printf("%4d%c - %s\n", s, u, raw[i].file);
     198                        sout | s | u | "-" | raw[i].file;
    198199                        free(raw[i].file);
    199200                }
  • benchmark/io/http/main.cfa

    r03ecdcf r77ff383  
    1010}
    1111
     12#include <fstream.hfa>
    1213#include <kernel.hfa>
     14#include <iofwd.hfa>
    1315#include <stats.hfa>
    1416#include <time.hfa>
     
    5052
    5153//=============================================================================================
     54// Stats Printer
     55//============================================================================================='
     56
     57thread StatsPrinter {};
     58
     59void ?{}( StatsPrinter & this ) {
     60        ((thread&)this){ "Stats Printer Thread" };
     61}
     62
     63void main(StatsPrinter & this) {
     64        LOOP: for() {
     65                waitfor( ^?{} : this) {
     66                        break LOOP;
     67                }
     68                or else {}
     69
     70                sleep(10`s);
     71
     72                print_stats_now( *options.clopts.instance, CFA_STATS_READY_Q | CFA_STATS_IO );
     73        }
     74}
     75
     76//=============================================================================================
    5277// Main
    5378//============================================================================================='
     
    5984        //===================
    6085        // Open Files
    61         printf("Filling cache from %s\n", path);
     86        sout | "Filling cache from" | path;
    6287        fill_cache( path );
    6388
    6489        //===================
    6590        // Open Socket
    66         printf("%ld : Listening on port %d\n", getpid(), options.socket.port);
     91        sout | getpid() | ": Listening on port" | options.socket.port;
    6792        int server_fd = socket(AF_INET, SOCK_STREAM, 0);
    6893        if(server_fd < 0) {
     
    84109                        if(errno == EADDRINUSE) {
    85110                                if(waited == 0) {
    86                                         printf("Waiting for port\n");
     111                                        sout | "Waiting for port";
    87112                                } else {
    88                                         printf("\r%d", waited);
    89                                         fflush(stdout);
     113                                        sout | "\r" | waited | nonl;
     114                                        flush( sout );
    90115                                }
    91116                                waited ++;
     
    128153                {
    129154                        ServerProc procs[options.clopts.nprocs];
     155                        StatsPrinter printer;
    130156
    131157                        init_protocol();
     
    148174                                        unpark( workers[i] );
    149175                                }
    150                                 printf("%d workers started on %d processors\n", options.clopts.nworkers, options.clopts.nprocs);
     176                                sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors";
    151177                                {
    152178                                        char buffer[128];
    153                                         while(!feof(stdin)) {
    154                                                 fgets(buffer, 128, stdin);
     179                                        while(int ret = cfa_read(0, buffer, 128, 0, -1`s, 0p, 0p); ret != 0) {
     180                                                if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
    155181                                        }
    156182
    157                                         printf("Shutting Down\n");
    158                                 }
    159 
     183                                        sout | "Shutdown received";
     184                                }
     185
     186                                sout | "Notifying connections";
    160187                                for(i; options.clopts.nworkers) {
    161                                         printf("Cancelling %p\n", (void*)workers[i].cancel.target);
    162188                                        workers[i].done = true;
    163189                                        cancel(workers[i].cancel);
    164190                                }
    165191
    166                                 printf("Shutting down socket\n");
     192                                sout | "Shutting down socket";
    167193                                int ret = shutdown( server_fd, SHUT_RD );
    168194                                if( ret < 0 ) { abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) ); }
     
    170196                                //===================
    171197                                // Close Socket
    172                                 printf("Closing Socket\n");
     198                                sout | "Closing Socket";
    173199                                ret = close( server_fd );
    174200                                if(ret < 0) {
    175201                                        abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
    176202                                }
     203                                sout | "Stopping connection threads..." | nonl;
    177204                        }
    178                         printf("Workers Closed\n");
    179 
     205                        sout | "done";
     206
     207                        sout | "Stopping protocol threads..." | nonl;
    180208                        deinit_protocol();
    181                 }
    182 
     209                        sout | "done";
     210
     211                        sout | "Stopping processors..." | nonl;
     212                }
     213                sout | "done";
     214
     215                sout | "Closing splice fds..." | nonl;
    183216                for(i; pipe_cnt) {
    184217                        ret = close( fds[pipe_off + i] );
     
    188221                }
    189222                free(fds);
    190 
    191         }
     223                sout | "done";
     224
     225                sout | "Stopping processors..." | nonl;
     226        }
     227        sout | "done";
    192228
    193229        //===================
    194230        // Close Files
    195         printf("Closing Files\n");
     231        sout | "Closing open files..." | nonl;
    196232        close_cache();
    197 }
     233        sout | "done";
     234}
  • benchmark/io/http/options.cfa

    r03ecdcf r77ff383  
    5050
    5151        static cfa_option opt[] = {
    52                 {'p', "port",           "Port the server will listen on", options.socket.port},
    53                 {'c', "cpus",           "Number of processors to use", options.clopts.nprocs},
    54                 {'L', "log",            "Enable logs", options.log, parse_settrue},
    55                 {'t', "threads",        "Number of worker threads to use", options.clopts.nworkers},
    56                 {'b', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog},
    57                 {'r', "request_len",    "Maximum number of bytes in the http request, requests with more data will be answered with Http Code 414", options.socket.buflen},
    58                 {'S', "seed",           "seed to use for hashing", options.file_cache.hash_seed },
    59                 {'C', "cache-size",     "Size of the cache to use, if set to small, will uses closes power of 2", options.file_cache.size },
    60                 {'l', "list-files",     "List the files in the specified path and exit", options.file_cache.list, parse_settrue },
    61                 {'s', "submitthread",   "If set, cluster uses polling thread to submit I/O", subthrd, parse_settrue },
    62                 {'e', "eagersubmit",    "If set, cluster submits I/O eagerly but still aggregates submits", eagrsub, parse_settrue},
    63                 {'f', "fixed-fds",      "If set, files are open eagerly and pre-registered with the cluster", fixedfd, parse_settrue},
    64                 {'k', "kpollsubmit",    "If set, cluster uses IORING_SETUP_SQPOLL, implies -f", sqkpoll, parse_settrue },
    65                 {'i', "kpollcomplete",  "If set, cluster uses IORING_SETUP_IOPOLL", iokpoll, parse_settrue },
    66                 {'L', "submitlength",   "Max number of submitions that can be submitted together", sublen },
     52                { 'p', "port",           "Port the server will listen on", options.socket.port},
     53                { 'c', "cpus",           "Number of processors to use", options.clopts.nprocs},
     54                { 't', "threads",        "Number of worker threads to use", options.clopts.nworkers},
     55                {'\0', "log",            "Enable logs", options.log, parse_settrue},
     56                {'\0', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog},
     57                {'\0', "request_len",    "Maximum number of bytes in the http request, requests with more data will be answered with Http Code 414", options.socket.buflen},
     58                {'\0', "seed",           "seed to use for hashing", options.file_cache.hash_seed },
     59                {'\0', "cache-size",     "Size of the cache to use, if set to small, will uses closes power of 2", options.file_cache.size },
     60                {'\0', "list-files",     "List the files in the specified path and exit", options.file_cache.list, parse_settrue },
     61                { 's', "submitthread",   "If set, cluster uses polling thread to submit I/O", subthrd, parse_settrue },
     62                { 'e', "eagersubmit",    "If set, cluster submits I/O eagerly but still aggregates submits", eagrsub, parse_settrue},
     63                { 'f', "fixed-fds",      "If set, files are open eagerly and pre-registered with the cluster", fixedfd, parse_settrue},
     64                { 'k', "kpollsubmit",    "If set, cluster uses IORING_SETUP_SQPOLL, implies -f", sqkpoll, parse_settrue },
     65                { 'i', "kpollcomplete",  "If set, cluster uses IORING_SETUP_IOPOLL", iokpoll, parse_settrue },
     66                {'\0', "submitlength",   "Max number of submitions that can be submitted together", sublen },
    6767
    6868        };
  • benchmark/io/http/protocol.cfa

    r03ecdcf r77ff383  
    55        #include <fcntl.h>
    66}
     7
     8#include <fstream.hfa>
    79#include <iofwd.hfa>
    810
     
    1113extern "C" {
    1214      int snprintf ( char * s, size_t n, const char * format, ... );
    13         #include <linux/io_uring.h>
     15        // #include <linux/io_uring.h>
    1416}
    1517#include <string.h>
     
    2426        "HTTP/1.1 400 Bad Request\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
    2527        "HTTP/1.1 404 Not Found\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
     28        "HTTP/1.1 408 Request Timeout\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
    2629        "HTTP/1.1 413 Payload Too Large\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
    2730        "HTTP/1.1 414 URI Too Long\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
     
    3437        400,
    3538        404,
     39        408,
    3640        413,
    3741        414,
     
    4953                int ret = cfa_write(fd, it, len, 0, -1`s, 0p, 0p);
    5054                // int ret = write(fd, it, len);
    51                 if( ret < 0 ) { if( errno != EAGAIN && errno != EWOULDBLOCK) abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) ); }
     55                if( ret < 0 ) {
     56                        if( errno == ECONNRESET || errno == EPIPE ) return -ECONNRESET;
     57                        if( errno == EAGAIN || errno == EWOULDBLOCK) return -EAGAIN;
     58
     59                        abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) );
     60                }
    5261
    5362                // update it/len
     
    94103                if(ret < 0 ) {
    95104                        if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ;
    96                         // if( errno == EINVAL ) return [E400, true, 0, 0];
     105                        if( errno == ECONNRESET ) return [E408, true, 0, 0];
    97106                        abort( "read error: (%d) %s\n", (int)errno, strerror(errno) );
    98107                }
     
    108117        }
    109118
    110         if( options.log ) printf("%.*s\n", rlen, buffer);
     119        if( options.log ) {
     120                write(sout, buffer, rlen);
     121                sout | nl;
     122        }
    111123
    112124        it = buffer;
     
    119131}
    120132
    121 void sendfile( int pipe[2], int fd, int ans_fd, size_t count ) {
     133int sendfile( int pipe[2], int fd, int ans_fd, size_t count ) {
    122134        unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE;
    123135        off_t offset = 0;
     
    128140                if( ret < 0 ) {
    129141                        if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE1;
     142                        if( errno == ECONNRESET ) return -ECONNRESET;
     143                        if( errno == EPIPE ) return -EPIPE;
    130144                        abort( "splice [0] error: (%d) %s\n", (int)errno, strerror(errno) );
    131145                }
     
    139153                        if( ret < 0 ) {
    140154                                if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE2;
     155                                if( errno == ECONNRESET ) return -ECONNRESET;
     156                                if( errno == EPIPE ) return -EPIPE;
    141157                                abort( "splice [1] error: (%d) %s\n", (int)errno, strerror(errno) );
    142158                        }
     
    145161
    146162        }
     163        return count;
    147164}
    148165
  • benchmark/io/http/protocol.hfa

    r03ecdcf r77ff383  
    77        E400,
    88        E404,
     9        E408,
    910        E413,
    1011        E414,
     
    2122[HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len, io_cancellation *);
    2223
    23 void sendfile( int pipe[2], int fd, int ans_fd, size_t count );
     24int sendfile( int pipe[2], int fd, int ans_fd, size_t count );
  • benchmark/io/http/worker.cfa

    r03ecdcf r77ff383  
    66#include <unistd.h>
    77
     8#include <fstream.hfa>
    89#include <iofwd.hfa>
    910
     
    3334        CONNECTION:
    3435        for() {
    35                 if( options.log ) printf("=== Accepting connection ===\n");
     36                if( options.log ) sout | "=== Accepting connection ===";
    3637                int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], 0, -1`s, &this.cancel, 0p );
    3738                // int fd = accept4( this.[sockfd, addr, addrlen, flags] );
    3839                if(fd < 0) {
    3940                        if( errno == ECONNABORTED ) break;
    40                         if( errno == EINVAL && this.done ) break;
     41                        if( this.done && (errno == EINVAL || errno == EBADF) ) break;
    4142                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
    4243                }
    4344
    44                 if( options.log ) printf("=== New connection %d, waiting for requests ===\n", fd);
     45                if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
    4546                REQUEST:
    4647                for() {
     
    5354                        size_t len = options.socket.buflen;
    5455                        char buffer[len];
    55                         if( options.log ) printf("=== Reading request ===\n");
     56                        if( options.log ) sout | "=== Reading request ===";
    5657                        [code, closed, file, name_size] = http_read(fd, buffer, len, &this.cancel);
    5758
    5859                        // if we are done, break out of the loop
    59                         if( closed ) {
    60                                 if( options.log ) printf("=== Connection closed ===\n");
    61                                 close(fd);
    62                                 continue CONNECTION;
    63                         }
     60                        if( closed ) break REQUEST;
    6461
    6562                        // If this wasn't a request retrun 400
    6663                        if( code != OK200 ) {
    67                                 printf("=== Invalid Request : %d ===\n", code_val(code));
     64                                sout | "=== Invalid Request :" | code_val(code) | "===";
    6865                                answer_error(fd, code);
    6966                                continue REQUEST;
     
    7168
    7269                        if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
    73                                 if( options.log ) printf("=== Request for /plaintext ===\n");
     70                                if( options.log ) sout | "=== Request for /plaintext ===";
    7471
    7572                                char text[] = "Hello, World!\n";
    7673
    7774                                // Send the header
    78                                 answer_plain(fd, text, sizeof(text));
     75                                int ret = answer_plain(fd, text, sizeof(text));
     76                                if( ret == -ECONNRESET ) break REQUEST;
    7977
    80                                 if( options.log ) printf("=== Answer sent ===\n");
     78                                if( options.log ) sout | "=== Answer sent ===";
    8179                                continue REQUEST;
    8280                        }
    8381
    8482                        if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
    85                                 if( options.log ) printf("=== Request for /ping ===\n");
     83                                if( options.log ) sout | "=== Request for /ping ===";
    8684
    8785                                // Send the header
    88                                 answer_empty(fd);
     86                                int ret = answer_empty(fd);
     87                                if( ret == -ECONNRESET ) break REQUEST;
    8988
    90                                 if( options.log ) printf("=== Answer sent ===\n");
     89                                if( options.log ) sout | "=== Answer sent ===";
    9190                                continue REQUEST;
    9291                        }
    9392
    94                         if( options.log ) printf("=== Request for file %.*s ===\n", (int)name_size, file);
     93                        if( options.log ) {
     94                                sout | "=== Request for file " | nonl;
     95                                write(sout, file, name_size);
     96                                sout | " ===";
     97                        }
    9598
    9699                        // Get the fd from the file cache
     
    101104                        // If we can't find the file, return 404
    102105                        if( ans_fd < 0 ) {
    103                                 printf("=== File Not Found ===\n");
     106                                sout | "=== File Not Found (" | nonl;
     107                                write(sout, file, name_size);
     108                                sout | ") ===";
    104109                                answer_error(fd, E404);
    105110                                continue REQUEST;
     
    107112
    108113                        // Send the header
    109                         answer_header(fd, count);
     114                        int ret = answer_header(fd, count);
     115                        if( ret == -ECONNRESET ) break REQUEST;
    110116
    111117                        // Send the desired file
    112                         sendfile( this.pipe, fd, ans_fd, count);
     118                        ret = sendfile( this.pipe, fd, ans_fd, count);
     119                        if( ret == -ECONNRESET ) break REQUEST;
    113120
    114                         if( options.log ) printf("=== Answer sent ===\n");
     121                        if( options.log ) sout | "=== Answer sent ===";
    115122                }
     123
     124                if( options.log ) sout | "=== Connection closed ===";
     125                close(fd);
     126                continue CONNECTION;
    116127        }
    117128}
Note: See TracChangeset for help on using the changeset viewer.