Ignore:
Timestamp:
Jun 10, 2022, 5:02:03 PM (2 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, ast-experimental, master, pthread-emulation, qualifiedEnum
Children:
8419b76
Parents:
bf7c7ea
Message:

Re-instated the isolate/multi-cluster option.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/http/main.cfa

    rbf7c7ea r329e26a  
    3737// Globals
    3838//=============================================================================================
    39 struct ServerCluster {
    40         cluster self;
    41         processor    * procs;
    42 
    43 };
    44 
    4539void ?{}( ServerCluster & this ) {
    4640        (this.self){ "Server Cluster", options.clopts.params };
     
    5650                (this.procs[i]){ "Benchmark Processor", this.self };
    5751
    58                 int c = 0;
    59                 int n = 1 + (i % cnt);
    60                 for(int j = 0; j < CPU_SETSIZE; j++) {
    61                         if(CPU_ISSET(j, &fullset)) n--;
    62                         if(n == 0) {
    63                                 c = j;
    64                                 break;
    65                         }
    66                 }
    67                 cpu_set_t localset;
    68                 CPU_ZERO(&localset);
    69                 CPU_SET(c, &localset);
    70                 ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset);
    71                 if( ret != 0 ) abort | "sched_getaffinity failed with" | ret | strerror( ret );
     52                // int c = 0;
     53                // int n = 1 + (i % cnt);
     54                // for(int j = 0; j < CPU_SETSIZE; j++) {
     55                //      if(CPU_ISSET(j, &fullset)) n--;
     56                //      if(n == 0) {
     57                //              c = j;
     58                //              break;
     59                //      }
     60                // }
     61                // cpu_set_t localset;
     62                // CPU_ZERO(&localset);
     63                // CPU_SET(c, &localset);
     64                // ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset);
     65                // if( ret != 0 ) abort | "sched_getaffinity failed with" | ret | strerror( ret );
    7266
    7367                #if !defined(__CFA_NO_STATISTICS__)
     
    8579        #endif
    8680
    87         options.clopts.instance = &this.self;
     81        options.clopts.instance[options.clopts.cltr_cnt] = &this.self;
     82        options.clopts.cltr_cnt++;
    8883}
    8984
     
    187182                        Acceptor * acceptors = 0p;
    188183                        Q * queues = 0p;
    189                         ServerCluster cl;
     184                        ServerCluster cl[options.clopts.nclusters];
    190185
    191186                        if(options.stats) {
    192187                                stats_thrd = alloc();
    193                                 (*stats_thrd){ cl.self };
     188                                (*stats_thrd){ cl };
    194189                        } else {
    195190                                stats_thrd = 0p;
     
    198193                        init_protocol();
    199194                        {
     195                                int nacceptors = options.clopts.nprocs * options.clopts.nclusters;
    200196                                conns = alloc(options.clopts.nworkers);
    201197                                if(options.socket.reuseport) {
    202                                         queues = alloc(options.clopts.nprocs);
    203                                         acceptors = anew(options.clopts.nprocs);
    204                                         for(i; options.clopts.nprocs) {
     198                                        queues = alloc(nacceptors);
     199                                        acceptors = alloc(nacceptors);
     200                                        sout | "Creating" | nacceptors | "Acceptors";
     201                                        for(i; nacceptors) {
     202                                                (acceptors[i]){ i % options.clopts.nclusters };
     203                                        }
     204                                        for(i; nacceptors) {
    205205                                                (queues[i]){};
    206206                                                {
     
    219219                                                        cworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0];
    220220                                                        cworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1];
    221                                                         cworkers[i].queue = &queues[i % options.clopts.nprocs].q;
     221                                                        cworkers[i].queue = &queues[i % nacceptors].q;
    222222                                                        conns[i] = &cworkers[i].conn;
    223223                                                }
     
    246246                                        }
    247247                                }
    248 
    249                                 sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors";
     248                                sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters";
     249                                for(i; options.clopts.nclusters) {
     250                                        sout | options.clopts.thrd_cnt[i] | nonl;
     251                                }
    250252                                sout | nl;
    251253                                {
     
    274276                                if(options.socket.reuseport) {
    275277                                        sout | "Notifying connections..." | nonl; flush( sout );
    276                                         for(i; options.clopts.nprocs) {
     278                                        for(i; nacceptors) {
    277279                                                acceptors[i].done = true;
    278280                                        }
     
    283285
    284286                                        sout | "Shutting down Socket..." | nonl; flush( sout );
    285                                         for(i; options.clopts.nprocs) {
     287                                        for(i; nacceptors) {
    286288                                                ret = shutdown( acceptors[i].sockfd, SHUT_RD );
    287289                                                if( ret < 0 ) {
     
    292294
    293295                                        sout | "Closing Socket..." | nonl; flush( sout );
    294                                         for(i; options.clopts.nprocs) {
     296                                        for(i; nacceptors) {
    295297                                                ret = close( acceptors[i].sockfd );
    296298                                                if( ret < 0) {
     
    301303
    302304                                        sout | "Stopping accept threads..." | nonl; flush( sout );
    303                                         for(i; options.clopts.nprocs) {
     305                                        for(i; nacceptors) {
    304306                                                join(acceptors[i]);
    305307                                        }
     
    307309
    308310                                        sout | "Draining worker queues..." | nonl; flush( sout );
    309                                         for(i; options.clopts.nprocs) {
     311                                        for(i; nacceptors) {
    310312                                                PendingRead * p = 0p;
    311313                                                while(p = pop(queues[i].q)) {
Note: See TracChangeset for help on using the changeset viewer.