Ignore:
Timestamp:
Jun 9, 2022, 2:26:43 PM (22 months ago)
Author:
caparsons <caparson@…>
Branches:
ADT, ast-experimental, master, pthread-emulation, qualifiedEnum
Children:
c06551b
Parents:
db7a3ad (diff), 430ce61 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

File:
1 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/http/main.cfa

    rdb7a3ad raeb20a4  
    2525#include "options.hfa"
    2626#include "socket.hfa"
     27#include "printer.hfa"
    2728#include "worker.hfa"
    2829
     
    3132Duration default_preemption() {
    3233        return 0;
    33 }
    34 
    35 //=============================================================================================
    36 // Stats Printer
    37 //============================================================================================='
    38 
    39 thread StatsPrinter {
    40         Worker * workers;
    41         int worker_cnt;
    42         condition_variable(fast_block_lock) var;
    43 };
    44 
    45 void ?{}( StatsPrinter & this, cluster & cl ) {
    46         ((thread&)this){ "Stats Printer Thread", cl };
    47         this.worker_cnt = 0;
    48 }
    49 
    50 void ^?{}( StatsPrinter & mutex this ) {}
    51 
    52 #define eng3(X) (ws(3, 3, unit(eng( X ))))
    53 
    54 void main(StatsPrinter & this) {
    55         LOOP: for() {
    56                 waitfor( ^?{} : this) {
    57                         break LOOP;
    58                 }
    59                 or else {}
    60 
    61                 wait(this.var, 10`s);
    62 
    63                 print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );
    64                 if(this.worker_cnt != 0) {
    65                         uint64_t tries = 0;
    66                         uint64_t calls = 0;
    67                         uint64_t header = 0;
    68                         uint64_t splcin = 0;
    69                         uint64_t splcot = 0;
    70                         struct {
    71                                 volatile uint64_t calls;
    72                                 volatile uint64_t bytes;
    73                         } avgrd[zipf_cnts];
    74                         memset(avgrd, 0, sizeof(avgrd));
    75 
    76                         for(i; this.worker_cnt) {
    77                                 tries += this.workers[i].stats.sendfile.tries;
    78                                 calls += this.workers[i].stats.sendfile.calls;
    79                                 header += this.workers[i].stats.sendfile.header;
    80                                 splcin += this.workers[i].stats.sendfile.splcin;
    81                                 splcot += this.workers[i].stats.sendfile.splcot;
    82                                 for(j; zipf_cnts) {
    83                                         avgrd[j].calls += this.workers[i].stats.sendfile.avgrd[j].calls;
    84                                         avgrd[j].bytes += this.workers[i].stats.sendfile.avgrd[j].bytes;
    85                                 }
    86                         }
    87 
    88                         double ratio = ((double)tries) / calls;
    89 
    90                         sout | "----- Worker Stats -----";
    91                         sout | "sendfile  : " | calls | "calls," | tries | "tries (" | ratio | " try/call)";
    92                         sout | "            " | header | "header," | splcin | "splice in," | splcot | "splice out";
    93                         sout | " - zipf sizes:";
    94                         for(i; zipf_cnts) {
    95                                 double written = avgrd[i].calls > 0 ? ((double)avgrd[i].bytes) / avgrd[i].calls : 0;
    96                                 sout | "        " | zipf_sizes[i] | "bytes," | avgrd[i].calls | "shorts," | written | "written";
    97                         }
    98                 }
    99                 else {
    100                         sout | "No Workers!";
    101                 }
    102         }
    10334}
    10435
     
    10940        cluster self;
    11041        processor    * procs;
    111         // io_context   * ctxs;
    112         StatsPrinter * prnt;
    11342
    11443};
     
    15281        }
    15382
    154         if(options.stats) {
    155                 this.prnt = alloc();
    156                 (*this.prnt){ this.self };
    157         } else {
    158                 this.prnt = 0p;
    159         }
    160 
    16183        #if !defined(__CFA_NO_STATISTICS__)
    16284                print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO );
    16385        #endif
    16486
    165         options.clopts.instance[options.clopts.cltr_cnt] = &this.self;
    166         options.clopts.cltr_cnt++;
     87        options.clopts.instance = &this.self;
    16788}
    16889
    16990void ^?{}( ServerCluster & this ) {
    170         delete(this.prnt);
    171 
    17291        for(i; options.clopts.nprocs) {
    17392                ^(this.procs[i]){};
     
    18099extern void init_protocol(void);
    181100extern void deinit_protocol(void);
     101
     102//=============================================================================================
     103// REUSEPORT
     104//=============================================================================================
     105
     106size_t sockarr_size;
     107struct __attribute__((aligned(128))) Q {
     108        mpsc_queue(PendingRead) q;
     109};
    182110
    183111//=============================================================================================
     
    235163
    236164        int server_fd;
    237         if(!options.socket.manyreuse) {
    238                 server_fd = listener(address, addrlen);
    239         }
    240165
    241166        //===================
     
    257182                {
    258183                        // Stats printer makes a copy so this needs to persist longer than normal
    259                         Worker * workers;
    260                         ServerCluster cl[options.clopts.nclusters];
     184                        connection ** conns;
     185                        AcceptWorker  * aworkers = 0p;
     186                        ChannelWorker * cworkers = 0p;
     187                        Acceptor * acceptors = 0p;
     188                        Q * queues = 0p;
     189                        ServerCluster cl;
     190
     191                        if(options.stats) {
     192                                stats_thrd = alloc();
     193                                (*stats_thrd){ cl.self };
     194                        } else {
     195                                stats_thrd = 0p;
     196                        }
    261197
    262198                        init_protocol();
    263199                        {
    264                                 workers = anew(options.clopts.nworkers);
    265                                 cl[0].prnt->workers = workers;
    266                                 cl[0].prnt->worker_cnt = options.clopts.nworkers;
    267                                 for(i; options.clopts.nworkers) {
    268                                         // if( options.file_cache.fixed_fds ) {
    269                                         //      workers[i].pipe[0] = pipe_off + (i * 2) + 0;
    270                                         //      workers[i].pipe[1] = pipe_off + (i * 2) + 1;
    271                                         // }
    272                                         // else
    273                                         {
    274                                                 workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0];
    275                                                 workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1];
    276                                                 workers[i].sockfd  = options.socket.manyreuse ?  listener(address, addrlen) : server_fd;
    277                                                 workers[i].addr    = (struct sockaddr *)&address;
    278                                                 workers[i].addrlen = (socklen_t*)&addrlen;
    279                                                 workers[i].flags   = 0;
    280                                         }
    281                                         unpark( workers[i] );
    282                                 }
    283                                 sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters";
    284                                 for(i; options.clopts.nclusters) {
    285                                         sout | options.clopts.thrd_cnt[i] | nonl;
    286                                 }
     200                                conns = alloc(options.clopts.nworkers);
     201                                if(options.socket.reuseport) {
     202                                        queues = alloc(options.clopts.nprocs);
     203                                        acceptors = anew(options.clopts.nprocs);
     204                                        for(i; options.clopts.nprocs) {
     205                                                (queues[i]){};
     206                                                {
     207                                                        acceptors[i].sockfd  = listener(address, addrlen);
     208                                                        acceptors[i].addr    = (struct sockaddr *)&address;
     209                                                        acceptors[i].addrlen = (socklen_t*)&addrlen;
     210                                                        acceptors[i].flags   = 0;
     211                                                        acceptors[i].queue   = &queues[i].q;
     212                                                }
     213                                                unpark( acceptors[i] );
     214                                        }
     215
     216                                        cworkers = anew(options.clopts.nworkers);
     217                                        for(i; options.clopts.nworkers) {
     218                                                {
     219                                                        cworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0];
     220                                                        cworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1];
     221                                                        cworkers[i].queue = &queues[i % options.clopts.nprocs].q;
     222                                                        conns[i] = &cworkers[i].conn;
     223                                                }
     224                                                unpark( cworkers[i] );
     225                                        }
     226                                }
     227                                else {
     228                                        server_fd = listener(address, addrlen);
     229                                        aworkers = anew(options.clopts.nworkers);
     230                                        for(i; options.clopts.nworkers) {
     231                                                // if( options.file_cache.fixed_fds ) {
     232                                                //      workers[i].pipe[0] = pipe_off + (i * 2) + 0;
     233                                                //      workers[i].pipe[1] = pipe_off + (i * 2) + 1;
     234                                                // }
     235                                                // else
     236                                                {
     237                                                        aworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0];
     238                                                        aworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1];
     239                                                        aworkers[i].sockfd = server_fd;
     240                                                        aworkers[i].addr    = (struct sockaddr *)&address;
     241                                                        aworkers[i].addrlen = (socklen_t*)&addrlen;
     242                                                        aworkers[i].flags   = 0;
     243                                                        conns[i] = &aworkers[i].conn;
     244                                                }
     245                                                unpark( aworkers[i] );
     246                                        }
     247                                }
     248
     249                                sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors";
    287250                                sout | nl;
    288251                                {
     
    307270                                }
    308271
    309                                 sout | "Notifying connections..." | nonl; flush( sout );
    310                                 for(i; options.clopts.nworkers) {
    311                                         workers[i].done = true;
    312                                 }
    313                                 sout | "done";
    314 
    315                                 sout | "Shutting down socket..." | nonl; flush( sout );
    316                                 if(options.socket.manyreuse) {
    317                                         for(i; options.clopts.nworkers) {
    318                                                 ret = shutdown( workers[i].sockfd, SHUT_RD );
    319                                                 if(ret < 0) abort( "close socket %d error: (%d) %s\n", i, (int)errno, strerror(errno) );
     272                                //===================
     273                                // Close Socket and join
     274                                if(options.socket.reuseport) {
     275                                        sout | "Notifying connections..." | nonl; flush( sout );
     276                                        for(i; options.clopts.nprocs) {
     277                                                acceptors[i].done = true;
     278                                        }
     279                                        for(i; options.clopts.nworkers) {
     280                                                cworkers[i].done = true;
     281                                        }
     282                                        sout | "done";
     283
     284                                        sout | "Shutting down Socket..." | nonl; flush( sout );
     285                                        for(i; options.clopts.nprocs) {
     286                                                ret = shutdown( acceptors[i].sockfd, SHUT_RD );
     287                                                if( ret < 0 ) {
     288                                                        abort( "shutdown1 error: (%d) %s\n", (int)errno, strerror(errno) );
     289                                                }
     290                                        }
     291                                        sout | "done";
     292
     293                                        sout | "Closing Socket..." | nonl; flush( sout );
     294                                        for(i; options.clopts.nprocs) {
     295                                                ret = close( acceptors[i].sockfd );
     296                                                if( ret < 0) {
     297                                                        abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
     298                                                }
     299                                        }
     300                                        sout | "done";
     301
     302                                        sout | "Stopping accept threads..." | nonl; flush( sout );
     303                                        for(i; options.clopts.nprocs) {
     304                                                join(acceptors[i]);
     305                                        }
     306                                        sout | "done";
     307
     308                                        sout | "Draining worker queues..." | nonl; flush( sout );
     309                                        for(i; options.clopts.nprocs) {
     310                                                PendingRead * p = 0p;
     311                                                while(p = pop(queues[i].q)) {
     312                                                        fulfil(p->f, -ECONNRESET);
     313                                                }
     314                                        }
     315                                        sout | "done";
     316
     317                                        sout | "Stopping worker threads..." | nonl; flush( sout );
     318                                        for(i; options.clopts.nworkers) {
     319                                                for(j; 2) {
     320                                                        ret = close(cworkers[i].conn.pipe[j]);
     321                                                        if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
     322                                                }
     323                                                join(cworkers[i]);
    320324                                        }
    321325                                }
    322326                                else {
     327                                        sout | "Notifying connections..." | nonl; flush( sout );
     328                                        for(i; options.clopts.nworkers) {
     329                                                aworkers[i].done = true;
     330                                        }
     331                                        sout | "done";
     332
     333                                        sout | "Shutting down Socket..." | nonl; flush( sout );
    323334                                        ret = shutdown( server_fd, SHUT_RD );
    324335                                        if( ret < 0 ) {
    325                                                 abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) );
    326                                         }
    327                                 }
    328                                 sout | "done";
    329 
    330                                 //===================
    331                                 // Close Socket
    332                                 sout | "Closing Socket..." | nonl; flush( sout );
    333                                 if(options.socket.manyreuse) {
    334                                         for(i; options.clopts.nworkers) {
    335                                                 ret = close(workers[i].sockfd);
    336                                                 if(ret < 0) abort( "close socket %d error: (%d) %s\n", i, (int)errno, strerror(errno) );
    337                                         }
    338                                 }
    339                                 else {
     336                                                abort( "shutdown2 error: (%d) %s\n", (int)errno, strerror(errno) );
     337                                        }
     338                                        sout | "done";
     339
     340                                        sout | "Closing Socket..." | nonl; flush( sout );
    340341                                        ret = close( server_fd );
    341342                                        if(ret < 0) {
    342343                                                abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
    343344                                        }
    344                                 }
    345                                 sout | "done";
    346 
    347                                 sout | "Stopping connection threads..." | nonl; flush( sout );
    348                                 for(i; options.clopts.nworkers) {
    349                                         for(j; 2) {
    350                                                 ret = close(workers[i].pipe[j]);
    351                                                 if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
    352                                         }
    353                                         join(workers[i]);
     345                                        sout | "done";
     346
     347                                        sout | "Stopping connection threads..." | nonl; flush( sout );
     348                                        for(i; options.clopts.nworkers) {
     349                                                for(j; 2) {
     350                                                        ret = close(aworkers[i].conn.pipe[j]);
     351                                                        if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
     352                                                }
     353                                                join(aworkers[i]);
     354                                        }
    354355                                }
    355356                        }
     
    361362
    362363                        sout | "Stopping printer threads..." | nonl; flush( sout );
    363                         for(i; options.clopts.nclusters) {
    364                                 StatsPrinter * p = cl[i].prnt;
    365                                 if(p) {
    366                                         notify_one(p->var);
    367                                         join(*p);
    368                                 }
    369                         }
     364                        if(stats_thrd) {
     365                                notify_one(stats_thrd->var);
     366                        }
     367                        delete(stats_thrd);
    370368                        sout | "done";
    371369
    372370                        // Now that the stats printer is stopped, we can reclaim this
    373                         adelete(workers);
     371                        adelete(aworkers);
     372                        adelete(cworkers);
     373                        adelete(acceptors);
     374                        adelete(queues);
     375                        free(conns);
    374376
    375377                        sout | "Stopping processors/clusters..." | nonl; flush( sout );
     
    377379                sout | "done";
    378380
    379                 // sout | "Closing splice fds..." | nonl; flush( sout );
    380                 // for(i; pipe_cnt) {
    381                 //      ret = close( fds[pipe_off + i] );
    382                 //      if(ret < 0) {
    383                 //              abort( "close pipe error: (%d) %s\n", (int)errno, strerror(errno) );
    384                 //      }
    385                 // }
    386381                free(fds);
    387                 sout | "done";
    388382
    389383                sout | "Stopping processors..." | nonl; flush( sout );
Note: See TracChangeset for help on using the changeset viewer.