Changeset 137974a


Ignore:
Timestamp:
Jun 9, 2022, 10:56:34 AM (4 months ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
master, pthread-emulation, qualifiedEnum
Children:
430ce61
Parents:
8c58e73
Message:

Moved stats printer to it's own file and now using push-stats rather than pull

Location:
benchmark/io/http
Files:
2 added
5 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/http/Makefile.am

    r8c58e73 r137974a  
    3737        options.cfa \
    3838        options.hfa \
     39        printer.cfa \
     40        printer.hfa \
    3941        protocol.cfa \
    4042        protocol.hfa \
  • benchmark/io/http/main.cfa

    r8c58e73 r137974a  
    2525#include "options.hfa"
    2626#include "socket.hfa"
     27#include "printer.hfa"
    2728#include "worker.hfa"
    2829
     
    3132Duration default_preemption() {
    3233        return 0;
    33 }
    34 
    35 //=============================================================================================
    36 // Stats Printer
    37 //============================================================================================='
    38 
    39 thread StatsPrinter {
    40         connection ** conns;
    41         volatile int conn_cnt;
    42         condition_variable(fast_block_lock) var;
    43 };
    44 
    45 void ?{}( StatsPrinter & this, cluster & cl ) {
    46         ((thread&)this){ "Stats Printer Thread", cl };
    47         this.conn_cnt = 0;
    48 }
    49 
    50 void ^?{}( StatsPrinter & mutex this ) {}
    51 
    52 #define eng3(X) (ws(3, 3, unit(eng( X ))))
    53 
    54 void main(StatsPrinter & this) {
    55         LOOP: for() {
    56                 waitfor( ^?{} : this) {
    57                         break LOOP;
    58                 }
    59                 or else {}
    60 
    61                 wait(this.var, 10`s);
    62 
    63                 print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );
    64                 if(this.conn_cnt != 0) {
    65                         uint64_t tries = 0;
    66                         uint64_t calls = 0;
    67                         uint64_t header = 0;
    68                         uint64_t splcin = 0;
    69                         uint64_t splcot = 0;
    70                         struct {
    71                                 volatile uint64_t calls;
    72                                 volatile uint64_t bytes;
    73                         } avgrd[zipf_cnts];
    74                         memset(avgrd, 0, sizeof(avgrd));
    75 
    76                         for(i; this.conn_cnt) {
    77                                 tries += this.conns[i]->stats.sendfile.tries;
    78                                 calls += this.conns[i]->stats.sendfile.calls;
    79                                 header += this.conns[i]->stats.sendfile.header;
    80                                 splcin += this.conns[i]->stats.sendfile.splcin;
    81                                 splcot += this.conns[i]->stats.sendfile.splcot;
    82                                 for(j; zipf_cnts) {
    83                                         avgrd[j].calls += this.conns[i]->stats.sendfile.avgrd[j].calls;
    84                                         avgrd[j].bytes += this.conns[i]->stats.sendfile.avgrd[j].bytes;
    85                                 }
    86                         }
    87 
    88                         double ratio = ((double)tries) / calls;
    89 
    90                         sout | "----- Connection Stats -----";
    91                         sout | "sendfile  : " | calls | "calls," | tries | "tries (" | ratio | " try/call)";
    92                         sout | "            " | header | "header," | splcin | "splice in," | splcot | "splice out";
    93                         sout | " - zipf sizes:";
    94                         for(i; zipf_cnts) {
    95                                 double written = avgrd[i].calls > 0 ? ((double)avgrd[i].bytes) / avgrd[i].calls : 0;
    96                                 sout | "        " | zipf_sizes[i] | "bytes," | avgrd[i].calls | "shorts," | written | "written";
    97                         }
    98                 }
    99                 else {
    100                         sout | "No Connections!";
    101                 }
    102         }
    10334}
    10435
     
    10940        cluster self;
    11041        processor    * procs;
    111         // io_context   * ctxs;
    112         StatsPrinter * prnt;
    11342
    11443};
     
    15281        }
    15382
    154         if(options.stats) {
    155                 this.prnt = alloc();
    156                 (*this.prnt){ this.self };
    157         } else {
    158                 this.prnt = 0p;
    159         }
    160 
    16183        #if !defined(__CFA_NO_STATISTICS__)
    16284                print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO );
     
    16789
    16890void ^?{}( ServerCluster & this ) {
    169         delete(this.prnt);
    170 
    17191        for(i; options.clopts.nprocs) {
    17292                ^(this.procs[i]){};
     
    268188                        Q * queues = 0p;
    269189                        ServerCluster cl;
     190
     191                        if(options.stats) {
     192                                stats_thrd = alloc();
     193                                (*stats_thrd){ cl.self };
     194                        } else {
     195                                stats_thrd = 0p;
     196                        }
    270197
    271198                        init_protocol();
     
    319246                                        }
    320247                                }
    321                                 cl.prnt->conns = conns;
    322                                 cl.prnt->conn_cnt = options.clopts.nworkers;
     248
    323249                                sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors";
    324250                                sout | nl;
     
    436362
    437363                        sout | "Stopping printer threads..." | nonl; flush( sout );
    438                         StatsPrinter * p = cl.prnt;
    439                         if(p) {
    440                                 notify_one(p->var);
    441                                 join(*p);
    442                         }
     364                        if(stats_thrd) {
     365                                notify_one(stats_thrd->var);
     366                        }
     367                        delete(stats_thrd);
    443368                        sout | "done";
    444369
  • benchmark/io/http/protocol.cfa

    r8c58e73 r137974a  
    587587
    588588void ?{}( DateFormater & this ) {
    589         ((thread&)this){ "Server Date Thread", *options.clopts.instance[0] };
     589        ((thread&)this){ "Server Date Thread", *options.clopts.instance };
    590590        this.idx = 0;
    591591        memset( &this.buffers[0], 0, sizeof(this.buffers[0]) );
  • benchmark/io/http/worker.cfa

    r8c58e73 r137974a  
    1414#include "filecache.hfa"
    1515
    16 void ?{}( sendfile_stats_t & this ) {
    17         this.calls = 0;
    18         this.tries = 0;
    19         this.header = 0;
    20         this.splcin = 0;
    21         this.splcot = 0;
    22         for(i; zipf_cnts) {
    23                 this.avgrd[i].calls = 0;
    24                 this.avgrd[i].bytes = 0;
    25         }
    26 }
    27 
    2816//=============================================================================================
    2917// Generic connection handling
    3018//=============================================================================================
    31 static void handle_connection( connection & this, volatile int & fd, char * buffer, size_t len, io_future_t * f ) {
     19static void handle_connection( connection & this, volatile int & fd, char * buffer, size_t len, io_future_t * f, unsigned long long & last ) {
    3220        REQUEST:
    3321        for() {
     
    11199                if( options.log ) sout | "=== Answer sent ===";
    112100        }
     101
     102        if (stats_thrd) {
     103                unsigned long long next = rdtscl();
     104                if(next > (last + 500000000)) {
     105                        if(try_lock(stats_thrd->stats.lock)) {
     106                                push(this.stats.sendfile, stats_thrd->stats.send);
     107                                unlock(stats_thrd->stats.lock);
     108                                last = next;
     109                        }
     110                }
     111        }
    113112}
    114113
     
    124123void main( AcceptWorker & this ) {
    125124        park();
     125        unsigned long long last = rdtscl();
    126126        /* paranoid */ assert( this.conn.pipe[0] != -1 );
    127127        /* paranoid */ assert( this.conn.pipe[1] != -1 );
     
    139139                size_t len = options.socket.buflen;
    140140                char buffer[len];
    141                 handle_connection( this.conn, fd, buffer, len, 0p );
     141                handle_connection( this.conn, fd, buffer, len, 0p, last );
    142142
    143143                if( options.log ) sout | "=== Connection closed ===";
     
    157157void main( ChannelWorker & this ) {
    158158        park();
     159        unsigned long long last = rdtscl();
    159160        /* paranoid */ assert( this.conn.pipe[0] != -1 );
    160161        /* paranoid */ assert( this.conn.pipe[1] != -1 );
     
    168169
    169170                if( options.log ) sout | "=== Waiting new connection ===";
    170                 handle_connection( this.conn, p.out.fd, buffer, len, &p.f );
     171                handle_connection( this.conn, p.out.fd, buffer, len, &p.f, last );
    171172
    172173                if( options.log ) sout | "=== Connection closed ===";
     
    187188void main( Acceptor & this ) {
    188189        park();
     190        unsigned long long last = rdtscl();
    189191        if( options.log ) sout | "=== Accepting connection ===";
    190192        for() {
     
    192194                if(fd < 0) {
    193195                        if( errno == EWOULDBLOCK) {
     196                                this.stats.eagains++;
    194197                                yield();
    195198                                continue;
     
    199202                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
    200203                }
     204                this.stats.accepts++;
     205
    201206                if(this.done) return;
    202207
     
    210215                                if(p) break;
    211216                                yield();
     217                                this.stats.creates++;
    212218                        };
    213219
     
    216222                }
    217223
     224                if (stats_thrd) {
     225                        unsigned long long next = rdtscl();
     226                        if(next > (last + 500000000)) {
     227                                if(try_lock(stats_thrd->stats.lock)) {
     228                                        push(this.stats, stats_thrd->stats.accpt);
     229                                        unlock(stats_thrd->stats.lock);
     230                                        last = next;
     231                                }
     232                        }
     233                }
     234
    218235                if( options.log ) sout | "=== Accepting connection ===";
    219236        }
  • benchmark/io/http/worker.hfa

    r8c58e73 r137974a  
    99}
    1010
     11#include "printer.hfa"
     12
    1113//=============================================================================================
    1214// Worker Thread
    1315//=============================================================================================
    14 
    15 extern const size_t zipf_sizes[];
    16 enum { zipf_cnts = 36, };
    17 
    18 struct sendfile_stats_t {
    19         volatile uint64_t calls;
    20         volatile uint64_t tries;
    21         volatile uint64_t header;
    22         volatile uint64_t splcin;
    23         volatile uint64_t splcot;
    24         struct {
    25                 volatile uint64_t calls;
    26                 volatile uint64_t bytes;
    27         } avgrd[zipf_cnts];
    28 };
    29 
    30 void ?{}( sendfile_stats_t & this );
    3116
    3217struct connection {
     
    8570        int flags;
    8671        volatile bool done;
     72        acceptor_stats_t stats;
    8773};
    8874void ?{}( Acceptor & );
Note: See TracChangeset for help on using the changeset viewer.