Ignore:
Timestamp:
Mar 4, 2021, 7:40:25 PM (5 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
77d601f
Parents:
342af53 (diff), a5040fe (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

File:
1 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/http/main.cfa

    r342af53 r8e4aa05  
    66#include <unistd.h>
    77extern "C" {
     8        #include <signal.h>
    89        #include <sys/socket.h>
    910        #include <netinet/in.h>
    1011}
    1112
     13#include <fstream.hfa>
    1214#include <kernel.hfa>
     15#include <iofwd.hfa>
    1316#include <stats.hfa>
    1417#include <time.hfa>
     
    2629
    2730//=============================================================================================
     31// Stats Printer
     32//============================================================================================='
     33
     34thread StatsPrinter {};
     35
     36void ?{}( StatsPrinter & this, cluster & cl ) {
     37        ((thread&)this){ "Stats Printer Thread", cl };
     38}
     39
     40void ^?{}( StatsPrinter & mutex this ) {}
     41
     42void main(StatsPrinter & this) {
     43        LOOP: for() {
     44                waitfor( ^?{} : this) {
     45                        break LOOP;
     46                }
     47                or else {}
     48
     49                sleep(10`s);
     50
     51                print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );
     52        }
     53}
     54
     55//=============================================================================================
    2856// Globals
    2957//=============================================================================================
    30 struct ServerProc {
    31         processor self;
     58struct ServerCluster {
     59        cluster self;
     60        processor    * procs;
     61        // io_context   * ctxs;
     62        StatsPrinter * prnt;
     63
    3264};
    3365
    34 void ?{}( ServerProc & this ) {
    35         /* paranoid */ assert( options.clopts.instance != 0p );
    36         (this.self){ "Benchmark Processor", *options.clopts.instance };
     66void ?{}( ServerCluster & this ) {
     67        (this.self){ "Server Cluster", options.clopts.params };
     68
     69        this.procs = alloc(options.clopts.nprocs);
     70        for(i; options.clopts.nprocs) {
     71                (this.procs[i]){ "Benchmark Processor", this.self };
     72
     73                #if !defined(__CFA_NO_STATISTICS__)
     74                        if( options.clopts.procstats ) {
     75                                print_stats_at_exit( *this.procs, this.self.print_stats );
     76                        }
     77                        if( options.clopts.viewhalts ) {
     78                                print_halts( *this.procs );
     79                        }
     80                #endif
     81        }
     82
     83        if(options.stats) {
     84                this.prnt = alloc();
     85                (*this.prnt){ this.self };
     86        } else {
     87                this.prnt = 0p;
     88        }
    3789
    3890        #if !defined(__CFA_NO_STATISTICS__)
    39                 if( options.clopts.procstats ) {
    40                         print_stats_at_exit( this.self, options.clopts.instance->print_stats );
    41                 }
    42                 if( options.clopts.viewhalts ) {
    43                         print_halts( this.self );
    44                 }
     91                print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO );
    4592        #endif
     93
     94        options.clopts.instance[options.clopts.cltr_cnt] = &this.self;
     95        options.clopts.cltr_cnt++;
     96}
     97
     98void ^?{}( ServerCluster & this ) {
     99        delete(this.prnt);
     100
     101        for(i; options.clopts.nprocs) {
     102                ^(this.procs[i]){};
     103        }
     104        free(this.procs);
     105
     106        ^(this.self){};
    46107}
    47108
     
    53114//============================================================================================='
    54115int main( int argc, char * argv[] ) {
     116        __sighandler_t s = 1p;
     117        signal(SIGPIPE, s);
     118
    55119        //===================
    56120        // Parse args
    57         const char * path = parse_options(argc, argv);
     121        parse_options(argc, argv);
    58122
    59123        //===================
    60124        // Open Files
    61         printf("Filling cache from %s\n", path);
    62         fill_cache( path );
     125        if( options.file_cache.path ) {
     126                sout | "Filling cache from" | options.file_cache.path;
     127                fill_cache( options.file_cache.path );
     128        }
    63129
    64130        //===================
    65131        // Open Socket
    66         printf("%ld : Listening on port %d\n", getpid(), options.socket.port);
     132        sout | getpid() | ": Listening on port" | options.socket.port;
    67133        int server_fd = socket(AF_INET, SOCK_STREAM, 0);
    68134        if(server_fd < 0) {
     
    84150                        if(errno == EADDRINUSE) {
    85151                                if(waited == 0) {
    86                                         printf("Waiting for port\n");
     152                                        sout | "Waiting for port";
    87153                                } else {
    88                                         printf("\r%d", waited);
    89                                         fflush(stdout);
     154                                        sout | "\r" | waited | nonl;
     155                                        flush( sout );
    90156                                }
    91157                                waited ++;
     
    106172        // Run Server Cluster
    107173        {
    108                 cluster cl = { "Server Cluster", options.clopts.params };
    109                 #if !defined(__CFA_NO_STATISTICS__)
    110                         print_stats_at_exit( cl, CFA_STATS_READY_Q | CFA_STATS_IO );
    111                 #endif
    112                 options.clopts.instance = &cl;
    113 
    114 
    115174                int pipe_cnt = options.clopts.nworkers * 2;
    116175                int pipe_off;
     
    122181                }
    123182
    124                 if(options.file_cache.fixed_fds) {
    125                         register_fixed_files(cl, fds, pipe_off);
    126                 }
     183                // if(options.file_cache.path && options.file_cache.fixed_fds) {
     184                //      register_fixed_files(cl, fds, pipe_off);
     185                // }
    127186
    128187                {
    129                         ServerProc procs[options.clopts.nprocs];
     188                        ServerCluster cl[options.clopts.nclusters];
    130189
    131190                        init_protocol();
     
    148207                                        unpark( workers[i] );
    149208                                }
    150                                 printf("%d workers started on %d processors\n", options.clopts.nworkers, options.clopts.nprocs);
     209                                sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters";
     210                                for(i; options.clopts.nclusters) {
     211                                        sout | options.clopts.thrd_cnt[i] | nonl;
     212                                }
     213                                sout | nl;
    151214                                {
    152215                                        char buffer[128];
    153                                         while(!feof(stdin)) {
    154                                                 fgets(buffer, 128, stdin);
     216                                        for() {
     217                                                int ret = cfa_read(0, buffer, 128, 0);
     218                                                if(ret == 0) break;
     219                                                if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
     220                                                sout | "User wrote '" | "" | nonl;
     221                                                write(sout, buffer, ret - 1);
     222                                                sout | "'";
    155223                                        }
    156224
    157                                         printf("Shutting Down\n");
    158                                 }
    159 
     225                                        sout | "Shutdown received";
     226                                }
     227
     228                                sout | "Notifying connections..." | nonl; flush( sout );
    160229                                for(i; options.clopts.nworkers) {
    161                                         printf("Cancelling %p\n", (void*)workers[i].cancel.target);
    162230                                        workers[i].done = true;
    163                                         cancel(workers[i].cancel);
    164                                 }
    165 
    166                                 printf("Shutting down socket\n");
     231                                }
     232                                sout | "done";
     233
     234                                sout | "Shutting down socket..." | nonl; flush( sout );
    167235                                int ret = shutdown( server_fd, SHUT_RD );
    168                                 if( ret < 0 ) { abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) ); }
     236                                if( ret < 0 ) {
     237                                        abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) );
     238                                }
     239                                sout | "done";
    169240
    170241                                //===================
    171242                                // Close Socket
    172                                 printf("Closing Socket\n");
     243                                sout | "Closing Socket..." | nonl; flush( sout );
    173244                                ret = close( server_fd );
    174245                                if(ret < 0) {
    175246                                        abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
    176247                                }
    177                         }
    178                         printf("Workers Closed\n");
    179 
     248                                sout | "done";
     249
     250                                sout | "Stopping connection threads..." | nonl; flush( sout );
     251                        }
     252                        sout | "done";
     253
     254                        sout | "Stopping protocol threads..." | nonl; flush( sout );
    180255                        deinit_protocol();
    181                 }
    182 
     256                        sout | "done";
     257
     258                        sout | "Stopping processors/clusters..." | nonl; flush( sout );
     259                }
     260                sout | "done";
     261
     262                sout | "Closing splice fds..." | nonl; flush( sout );
    183263                for(i; pipe_cnt) {
    184264                        ret = close( fds[pipe_off + i] );
     
    188268                }
    189269                free(fds);
    190 
    191         }
     270                sout | "done";
     271
     272                sout | "Stopping processors..." | nonl; flush( sout );
     273        }
     274        sout | "done";
    192275
    193276        //===================
    194277        // Close Files
    195         printf("Closing Files\n");
    196         close_cache();
    197 }
     278        if( options.file_cache.path ) {
     279                sout | "Closing open files..." | nonl; flush( sout );
     280                close_cache();
     281                sout | "done";
     282        }
     283}
Note: See TracChangeset for help on using the changeset viewer.