Ignore:
Timestamp:
Jun 8, 2022, 7:07:51 PM (2 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, ast-experimental, master, pthread-emulation, qualifiedEnum
Children:
bbf61838
Parents:
6e2b04e
Message:

First draft at acceptor thread webserver

File:
1 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/http/main.cfa

    r6e2b04e r7f0ac12  
    3838
    3939thread StatsPrinter {
    40         Worker * workers;
    41         int worker_cnt;
     40        connection ** conns;
     41        volatile int conn_cnt;
    4242        condition_variable(fast_block_lock) var;
    4343};
     
    4545void ?{}( StatsPrinter & this, cluster & cl ) {
    4646        ((thread&)this){ "Stats Printer Thread", cl };
    47         this.worker_cnt = 0;
     47        this.conn_cnt = 0;
    4848}
    4949
     
    6262
    6363                print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );
    64                 if(this.worker_cnt != 0) {
     64                if(this.conn_cnt != 0) {
    6565                        uint64_t tries = 0;
    6666                        uint64_t calls = 0;
     
    7474                        memset(avgrd, 0, sizeof(avgrd));
    7575
    76                         for(i; this.worker_cnt) {
    77                                 tries += this.workers[i].stats.sendfile.tries;
    78                                 calls += this.workers[i].stats.sendfile.calls;
    79                                 header += this.workers[i].stats.sendfile.header;
    80                                 splcin += this.workers[i].stats.sendfile.splcin;
    81                                 splcot += this.workers[i].stats.sendfile.splcot;
     76                        for(i; this.conn_cnt) {
     77                                tries += this.conns[i]->stats.sendfile.tries;
     78                                calls += this.conns[i]->stats.sendfile.calls;
     79                                header += this.conns[i]->stats.sendfile.header;
     80                                splcin += this.conns[i]->stats.sendfile.splcin;
     81                                splcot += this.conns[i]->stats.sendfile.splcot;
    8282                                for(j; zipf_cnts) {
    83                                         avgrd[j].calls += this.workers[i].stats.sendfile.avgrd[j].calls;
    84                                         avgrd[j].bytes += this.workers[i].stats.sendfile.avgrd[j].bytes;
     83                                        avgrd[j].calls += this.conns[i]->stats.sendfile.avgrd[j].calls;
     84                                        avgrd[j].bytes += this.conns[i]->stats.sendfile.avgrd[j].bytes;
    8585                                }
    8686                        }
     
    8888                        double ratio = ((double)tries) / calls;
    8989
    90                         sout | "----- Worker Stats -----";
     90                        sout | "----- Connection Stats -----";
    9191                        sout | "sendfile  : " | calls | "calls," | tries | "tries (" | ratio | " try/call)";
    9292                        sout | "            " | header | "header," | splcin | "splice in," | splcot | "splice out";
     
    9898                }
    9999                else {
    100                         sout | "No Workers!";
     100                        sout | "No Connections!";
    101101                }
    102102        }
     
    182182
    183183//=============================================================================================
     184// REUSEPORT
     185//=============================================================================================
     186
     187size_t sockarr_size;
     188struct __attribute__((aligned(128))) Q {
     189        mpsc_queue(PendingRead) q;
     190};
     191
     192//=============================================================================================
    184193// Termination
    185194//=============================================================================================
     
    235244
    236245        int server_fd;
    237         if(!options.socket.manyreuse) {
    238                 server_fd = listener(address, addrlen);
    239         }
    240246
    241247        //===================
     
    257263                {
    258264                        // Stats printer makes a copy so this needs to persist longer than normal
    259                         Worker * workers;
     265                        connection ** conns;
     266                        AcceptWorker  * aworkers = 0p;
     267                        ChannelWorker * cworkers = 0p;
     268                        Acceptor * acceptors = 0p;
     269                        Q * queues = 0p;
    260270                        ServerCluster cl[options.clopts.nclusters];
    261271
    262272                        init_protocol();
    263273                        {
    264                                 workers = anew(options.clopts.nworkers);
    265                                 cl[0].prnt->workers = workers;
    266                                 cl[0].prnt->worker_cnt = options.clopts.nworkers;
    267                                 for(i; options.clopts.nworkers) {
    268                                         // if( options.file_cache.fixed_fds ) {
    269                                         //      workers[i].pipe[0] = pipe_off + (i * 2) + 0;
    270                                         //      workers[i].pipe[1] = pipe_off + (i * 2) + 1;
    271                                         // }
    272                                         // else
    273                                         {
    274                                                 workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0];
    275                                                 workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1];
    276                                                 workers[i].sockfd  = options.socket.manyreuse ?  listener(address, addrlen) : server_fd;
    277                                                 workers[i].addr    = (struct sockaddr *)&address;
    278                                                 workers[i].addrlen = (socklen_t*)&addrlen;
    279                                                 workers[i].flags   = 0;
    280                                         }
    281                                         unpark( workers[i] );
    282                                 }
     274                                conns = alloc(options.clopts.nworkers);
     275                                if(options.socket.reuseport) {
     276                                        queues = alloc(options.clopts.nprocs);
     277                                        acceptors = anew(options.clopts.nprocs);
     278                                        for(i; options.clopts.nprocs) {
     279                                                (queues[i]){};
     280                                                {
     281                                                        acceptors[i].sockfd  = listener(address, addrlen);
     282                                                        acceptors[i].addr    = (struct sockaddr *)&address;
     283                                                        acceptors[i].addrlen = (socklen_t*)&addrlen;
     284                                                        acceptors[i].flags   = 0;
     285                                                        acceptors[i].queue   = &queues[i].q;
     286                                                }
     287                                                unpark( acceptors[i] );
     288                                        }
     289
     290                                        cworkers = anew(options.clopts.nworkers);
     291                                        for(i; options.clopts.nworkers) {
     292                                                {
     293                                                        cworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0];
     294                                                        cworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1];
     295                                                        cworkers[i].queue = &queues[i % options.clopts.nprocs].q;
     296                                                        conns[i] = &cworkers[i].conn;
     297                                                }
     298                                                unpark( cworkers[i] );
     299                                        }
     300                                }
     301                                else {
     302                                        server_fd = listener(address, addrlen);
     303                                        aworkers = anew(options.clopts.nworkers);
     304                                        for(i; options.clopts.nworkers) {
     305                                                // if( options.file_cache.fixed_fds ) {
     306                                                //      workers[i].pipe[0] = pipe_off + (i * 2) + 0;
     307                                                //      workers[i].pipe[1] = pipe_off + (i * 2) + 1;
     308                                                // }
     309                                                // else
     310                                                {
     311                                                        aworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0];
     312                                                        aworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1];
     313                                                        aworkers[i].sockfd = server_fd;
     314                                                        aworkers[i].addr    = (struct sockaddr *)&address;
     315                                                        aworkers[i].addrlen = (socklen_t*)&addrlen;
     316                                                        aworkers[i].flags   = 0;
     317                                                        conns[i] = &aworkers[i].conn;
     318                                                }
     319                                                unpark( aworkers[i] );
     320                                        }
     321                                }
     322                                cl[0].prnt->conns = conns;
     323                                cl[0].prnt->conn_cnt = options.clopts.nworkers;
    283324                                sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters";
    284325                                for(i; options.clopts.nclusters) {
     
    307348                                }
    308349
    309                                 sout | "Notifying connections..." | nonl; flush( sout );
    310                                 for(i; options.clopts.nworkers) {
    311                                         workers[i].done = true;
    312                                 }
    313                                 sout | "done";
    314 
    315                                 sout | "Shutting down socket..." | nonl; flush( sout );
    316                                 if(options.socket.manyreuse) {
    317                                         for(i; options.clopts.nworkers) {
    318                                                 ret = shutdown( workers[i].sockfd, SHUT_RD );
    319                                                 if(ret < 0) abort( "close socket %d error: (%d) %s\n", i, (int)errno, strerror(errno) );
     350                                //===================
     351                                // Close Socket and join
     352                                if(options.socket.reuseport) {
     353                                        sout | "Notifying connections..." | nonl; flush( sout );
     354                                        for(i; options.clopts.nprocs) {
     355                                                acceptors[i].done = true;
     356                                        }
     357                                        for(i; options.clopts.nworkers) {
     358                                                cworkers[i].done = true;
     359                                        }
     360                                        sout | "done";
     361
     362                                        sout | "Shutting down Socket..." | nonl; flush( sout );
     363                                        for(i; options.clopts.nprocs) {
     364                                                ret = shutdown( acceptors[i].sockfd, SHUT_RD );
     365                                                if( ret < 0 ) {
     366                                                        abort( "shutdown1 error: (%d) %s\n", (int)errno, strerror(errno) );
     367                                                }
     368                                        }
     369                                        sout | "done";
     370
     371                                        sout | "Closing Socket..." | nonl; flush( sout );
     372                                        for(i; options.clopts.nprocs) {
     373                                                ret = close( acceptors[i].sockfd );
     374                                                if( ret < 0) {
     375                                                        abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
     376                                                }
     377                                        }
     378                                        sout | "done";
     379
     380                                        sout | "Stopping accept threads..." | nonl; flush( sout );
     381                                        for(i; options.clopts.nprocs) {
     382                                                join(acceptors[i]);
     383                                        }
     384                                        sout | "done";
     385
     386                                        sout | "Draining worker queues..." | nonl; flush( sout );
     387                                        for(i; options.clopts.nprocs) {
     388                                                PendingRead * p = 0p;
     389                                                while(p = pop(queues[i].q)) {
     390                                                        fulfil(p->f, -ECONNRESET);
     391                                                }
     392                                        }
     393                                        sout | "done";
     394
     395                                        sout | "Stopping worker threads..." | nonl; flush( sout );
     396                                        for(i; options.clopts.nworkers) {
     397                                                for(j; 2) {
     398                                                        ret = close(cworkers[i].conn.pipe[j]);
     399                                                        if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
     400                                                }
     401                                                join(cworkers[i]);
    320402                                        }
    321403                                }
    322404                                else {
     405                                        sout | "Notifying connections..." | nonl; flush( sout );
     406                                        for(i; options.clopts.nworkers) {
     407                                                aworkers[i].done = true;
     408                                        }
     409                                        sout | "done";
     410
     411                                        sout | "Shutting down Socket..." | nonl; flush( sout );
    323412                                        ret = shutdown( server_fd, SHUT_RD );
    324413                                        if( ret < 0 ) {
    325                                                 abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) );
    326                                         }
    327                                 }
    328                                 sout | "done";
    329 
    330                                 //===================
    331                                 // Close Socket
    332                                 sout | "Closing Socket..." | nonl; flush( sout );
    333                                 if(options.socket.manyreuse) {
    334                                         for(i; options.clopts.nworkers) {
    335                                                 ret = close(workers[i].sockfd);
    336                                                 if(ret < 0) abort( "close socket %d error: (%d) %s\n", i, (int)errno, strerror(errno) );
    337                                         }
    338                                 }
    339                                 else {
     414                                                abort( "shutdown2 error: (%d) %s\n", (int)errno, strerror(errno) );
     415                                        }
     416                                        sout | "done";
     417
     418                                        sout | "Closing Socket..." | nonl; flush( sout );
    340419                                        ret = close( server_fd );
    341420                                        if(ret < 0) {
    342421                                                abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
    343422                                        }
    344                                 }
    345                                 sout | "done";
    346 
    347                                 sout | "Stopping connection threads..." | nonl; flush( sout );
    348                                 for(i; options.clopts.nworkers) {
    349                                         for(j; 2) {
    350                                                 ret = close(workers[i].pipe[j]);
    351                                                 if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
    352                                         }
    353                                         join(workers[i]);
     423                                        sout | "done";
     424
     425                                        sout | "Stopping connection threads..." | nonl; flush( sout );
     426                                        for(i; options.clopts.nworkers) {
     427                                                for(j; 2) {
     428                                                        ret = close(aworkers[i].conn.pipe[j]);
     429                                                        if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
     430                                                }
     431                                                join(aworkers[i]);
     432                                        }
    354433                                }
    355434                        }
     
    371450
    372451                        // Now that the stats printer is stopped, we can reclaim this
    373                         adelete(workers);
     452                        adelete(aworkers);
     453                        adelete(cworkers);
     454                        adelete(acceptors);
     455                        adelete(queues);
     456                        free(conns);
    374457
    375458                        sout | "Stopping processors/clusters..." | nonl; flush( sout );
     
    377460                sout | "done";
    378461
    379                 // sout | "Closing splice fds..." | nonl; flush( sout );
    380                 // for(i; pipe_cnt) {
    381                 //      ret = close( fds[pipe_off + i] );
    382                 //      if(ret < 0) {
    383                 //              abort( "close pipe error: (%d) %s\n", (int)errno, strerror(errno) );
    384                 //      }
    385                 // }
    386462                free(fds);
    387                 sout | "done";
    388463
    389464                sout | "Stopping processors..." | nonl; flush( sout );
Note: See TracChangeset for help on using the changeset viewer.