Changeset 329e26a


Ignore:
Timestamp:
Jun 10, 2022, 5:02:03 PM (4 months ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
master, pthread-emulation, qualifiedEnum
Children:
8419b76
Parents:
bf7c7ea
Message:

Re-instated the isolate/multi-cluster option.

Location:
benchmark/io/http
Files:
7 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/http/main.cfa

    rbf7c7ea r329e26a  
    3737// Globals
    3838//=============================================================================================
    39 struct ServerCluster {
    40         cluster self;
    41         processor    * procs;
    42 
    43 };
    44 
    4539void ?{}( ServerCluster & this ) {
    4640        (this.self){ "Server Cluster", options.clopts.params };
     
    5650                (this.procs[i]){ "Benchmark Processor", this.self };
    5751
    58                 int c = 0;
    59                 int n = 1 + (i % cnt);
    60                 for(int j = 0; j < CPU_SETSIZE; j++) {
    61                         if(CPU_ISSET(j, &fullset)) n--;
    62                         if(n == 0) {
    63                                 c = j;
    64                                 break;
    65                         }
    66                 }
    67                 cpu_set_t localset;
    68                 CPU_ZERO(&localset);
    69                 CPU_SET(c, &localset);
    70                 ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset);
    71                 if( ret != 0 ) abort | "sched_getaffinity failed with" | ret | strerror( ret );
     52                // int c = 0;
     53                // int n = 1 + (i % cnt);
     54                // for(int j = 0; j < CPU_SETSIZE; j++) {
     55                //      if(CPU_ISSET(j, &fullset)) n--;
     56                //      if(n == 0) {
     57                //              c = j;
     58                //              break;
     59                //      }
     60                // }
     61                // cpu_set_t localset;
     62                // CPU_ZERO(&localset);
     63                // CPU_SET(c, &localset);
     64                // ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset);
     65                // if( ret != 0 ) abort | "sched_getaffinity failed with" | ret | strerror( ret );
    7266
    7367                #if !defined(__CFA_NO_STATISTICS__)
     
    8579        #endif
    8680
    87         options.clopts.instance = &this.self;
     81        options.clopts.instance[options.clopts.cltr_cnt] = &this.self;
     82        options.clopts.cltr_cnt++;
    8883}
    8984
     
    187182                        Acceptor * acceptors = 0p;
    188183                        Q * queues = 0p;
    189                         ServerCluster cl;
     184                        ServerCluster cl[options.clopts.nclusters];
    190185
    191186                        if(options.stats) {
    192187                                stats_thrd = alloc();
    193                                 (*stats_thrd){ cl.self };
     188                                (*stats_thrd){ cl };
    194189                        } else {
    195190                                stats_thrd = 0p;
     
    198193                        init_protocol();
    199194                        {
     195                                int nacceptors = options.clopts.nprocs * options.clopts.nclusters;
    200196                                conns = alloc(options.clopts.nworkers);
    201197                                if(options.socket.reuseport) {
    202                                         queues = alloc(options.clopts.nprocs);
    203                                         acceptors = anew(options.clopts.nprocs);
    204                                         for(i; options.clopts.nprocs) {
     198                                        queues = alloc(nacceptors);
     199                                        acceptors = alloc(nacceptors);
     200                                        sout | "Creating" | nacceptors | "Acceptors";
     201                                        for(i; nacceptors) {
     202                                                (acceptors[i]){ i % options.clopts.nclusters };
     203                                        }
     204                                        for(i; nacceptors) {
    205205                                                (queues[i]){};
    206206                                                {
     
    219219                                                        cworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0];
    220220                                                        cworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1];
    221                                                         cworkers[i].queue = &queues[i % options.clopts.nprocs].q;
     221                                                        cworkers[i].queue = &queues[i % nacceptors].q;
    222222                                                        conns[i] = &cworkers[i].conn;
    223223                                                }
     
    246246                                        }
    247247                                }
    248 
    249                                 sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors";
     248                                sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters";
     249                                for(i; options.clopts.nclusters) {
     250                                        sout | options.clopts.thrd_cnt[i] | nonl;
     251                                }
    250252                                sout | nl;
    251253                                {
     
    274276                                if(options.socket.reuseport) {
    275277                                        sout | "Notifying connections..." | nonl; flush( sout );
    276                                         for(i; options.clopts.nprocs) {
     278                                        for(i; nacceptors) {
    277279                                                acceptors[i].done = true;
    278280                                        }
     
    283285
    284286                                        sout | "Shutting down Socket..." | nonl; flush( sout );
    285                                         for(i; options.clopts.nprocs) {
     287                                        for(i; nacceptors) {
    286288                                                ret = shutdown( acceptors[i].sockfd, SHUT_RD );
    287289                                                if( ret < 0 ) {
     
    292294
    293295                                        sout | "Closing Socket..." | nonl; flush( sout );
    294                                         for(i; options.clopts.nprocs) {
     296                                        for(i; nacceptors) {
    295297                                                ret = close( acceptors[i].sockfd );
    296298                                                if( ret < 0) {
     
    301303
    302304                                        sout | "Stopping accept threads..." | nonl; flush( sout );
    303                                         for(i; options.clopts.nprocs) {
     305                                        for(i; nacceptors) {
    304306                                                join(acceptors[i]);
    305307                                        }
     
    307309
    308310                                        sout | "Draining worker queues..." | nonl; flush( sout );
    309                                         for(i; options.clopts.nprocs) {
     311                                        for(i; nacceptors) {
    310312                                                PendingRead * p = 0p;
    311313                                                while(p = pop(queues[i].q)) {
  • benchmark/io/http/options.cfa

    rbf7c7ea r329e26a  
    4242
    4343        { // cluster
     44                1,     // nclusters;
    4445                1,     // nprocs;
    4546                1,     // nworkers;
     
    5354void parse_options( int argc, char * argv[] ) {
    5455        unsigned nentries = 0;
     56        bool isolate = false;
     57
     58
    5559        static cfa_option opt[] = {
    5660                { 'p', "port",           "Port the server will listen on", options.socket.port},
    5761                { 'c', "cpus",           "Number of processors to use", options.clopts.nprocs},
    5862                { 't', "threads",        "Number of worker threads to use", options.clopts.nworkers},
     63                {'\0', "isolate",        "Create one cluster per processor", isolate, parse_settrue},
    5964                {'\0', "log",            "Enable logs", options.log, parse_settrue},
    6065                {'\0', "sout",           "Redirect standard out to file", options.reopen_stdout},
     
    9196                nentries = v;
    9297        }
     98        if(isolate) {
     99                options.clopts.nclusters = options.clopts.nprocs;
     100                options.clopts.nprocs = 1;
     101        }
    93102        options.clopts.params.num_entries = nentries;
    94         options.clopts.instance = 0p;
    95         options.clopts.thrd_cnt = 0;
     103        options.clopts.instance = alloc(options.clopts.nclusters);
     104        options.clopts.thrd_cnt = alloc(options.clopts.nclusters);
     105        options.clopts.cltr_cnt = 0;
     106        for(i; options.clopts.nclusters) {
     107                options.clopts.thrd_cnt[i] = 0;
     108        }
    96109
    97110
  • benchmark/io/http/options.hfa

    rbf7c7ea r329e26a  
    3131
    3232        struct {
     33                int nclusters;
    3334                int nprocs;
    3435                int nworkers;
     
    3637                bool procstats;
    3738                bool viewhalts;
    38                 cluster * instance;
    39                 size_t    thrd_cnt;
     39                cluster ** instance;
     40                size_t   * thrd_cnt;
     41                size_t     cltr_cnt;
    4042        } clopts;
    4143};
  • benchmark/io/http/printer.cfa

    rbf7c7ea r329e26a  
    11#include "printer.hfa"
     2#include "options.hfa"
    23
    34#include <fstream.hfa>
     
    4041}
    4142
    42 void ?{}( StatsPrinter & this, cluster & cl ) {
     43void ?{}( StatsPrinter & this, ServerCluster * cl ) {
    4344        ((thread&)this){ "Stats Printer Thread" };
    44         &this.cl = &cl;
     45        this.cl = cl;
    4546        memset(&this.stats, 0, sizeof(this.stats));;
    4647}
     
    5960                wait(this.var, 10`s);
    6061
    61                 print_stats_now( this.cl, CFA_STATS_READY_Q | CFA_STATS_IO );
     62                for(i; options.clopts.nclusters) print_stats_now( this.cl[i].self, CFA_STATS_READY_Q | CFA_STATS_IO );
    6263                {
    6364                        struct {
  • benchmark/io/http/printer.hfa

    rbf7c7ea r329e26a  
    3535void push(acceptor_stats_t & from, acceptor_stats_t & to);
    3636
     37struct ServerCluster {
     38        cluster self;
     39        processor    * procs;
     40};
     41
    3742thread StatsPrinter {
    3843        struct {
     
    4247        } stats;
    4348        condition_variable(fast_block_lock) var;
    44         cluster & cl;
     49        ServerCluster * cl;
    4550};
    4651
    47 void ?{}( StatsPrinter & this, cluster & cl );
     52void ?{}( StatsPrinter & this, ServerCluster * cl );
    4853void ^?{}( StatsPrinter & mutex this );
    4954
  • benchmark/io/http/worker.cfa

    rbf7c7ea r329e26a  
    116116//=============================================================================================
    117117void ?{}( AcceptWorker & this ) {
    118         ((thread&)this){ "Server Worker Thread", *options.clopts.instance, 64000 };
    119         options.clopts.thrd_cnt++;
     118        size_t cli = rand() % options.clopts.cltr_cnt;
     119        ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
     120        options.clopts.thrd_cnt[cli]++;
    120121        this.done = false;
    121122}
     
    150151//=============================================================================================
    151152void ?{}( ChannelWorker & this ) {
    152         ((thread&)this){ "Server Worker Thread", *options.clopts.instance, 64000 };
    153         options.clopts.thrd_cnt++;
     153        size_t cli = rand() % options.clopts.cltr_cnt;
     154        ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
     155        options.clopts.thrd_cnt[cli]++;
    154156        this.done = false;
    155157}
     
    181183}
    182184
    183 void ?{}( Acceptor & this ) {
    184         ((thread&)this){ "Server Acceptor Thread", *options.clopts.instance, 64000 };
    185         options.clopts.thrd_cnt++;
     185void ?{}( Acceptor & this, int cli ) {
     186        ((thread&)this){ "Server Acceptor Thread", *options.clopts.instance[cli], 64000 };
     187        options.clopts.thrd_cnt[cli]++;
    186188        this.done = false;
    187189}
  • benchmark/io/http/worker.hfa

    rbf7c7ea r329e26a  
    7272        acceptor_stats_t stats;
    7373};
    74 void ?{}( Acceptor & );
     74void ?{}( Acceptor &, int cli );
    7575void main( Acceptor & );
Note: See TracChangeset for help on using the changeset viewer.