- Timestamp: Jun 10, 2022, 5:02:03 PM (3 years ago)
- Branches: ADT, ast-experimental, master, pthread-emulation, qualifiedEnum
- Children: 8419b76
- Parents: bf7c7ea
- Location: benchmark/io/http
- Files: 7 edited
Legend:
- Unmodified (context lines, shown with a leading space)
- Added (prefixed with +)
- Removed (prefixed with -)
benchmark/io/http/main.cfa
rbf7c7ea → r329e26a
 // Globals
 //=============================================================================================
-struct ServerCluster {
-	cluster self;
-	processor * procs;
-
-};
-
 void ?{}( ServerCluster & this ) {
 	(this.self){ "Server Cluster", options.clopts.params };
…
 		(this.procs[i]){ "Benchmark Processor", this.self };
 
-		int c = 0;
-		int n = 1 + (i % cnt);
-		for(int j = 0; j < CPU_SETSIZE; j++) {
-			if(CPU_ISSET(j, &fullset)) n--;
-			if(n == 0) {
-				c = j;
-				break;
-			}
-		}
-		cpu_set_t localset;
-		CPU_ZERO(&localset);
-		CPU_SET(c, &localset);
-		ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset);
-		if( ret != 0 ) abort | "sched_getaffinity failed with" | ret | strerror( ret );
+		// int c = 0;
+		// int n = 1 + (i % cnt);
+		// for(int j = 0; j < CPU_SETSIZE; j++) {
+		// 	if(CPU_ISSET(j, &fullset)) n--;
+		// 	if(n == 0) {
+		// 		c = j;
+		// 		break;
+		// 	}
+		// }
+		// cpu_set_t localset;
+		// CPU_ZERO(&localset);
+		// CPU_SET(c, &localset);
+		// ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset);
+		// if( ret != 0 ) abort | "sched_getaffinity failed with" | ret | strerror( ret );
 
 	#if !defined(__CFA_NO_STATISTICS__)
…
 	#endif
 
-	options.clopts.instance = &this.self;
+	options.clopts.instance[options.clopts.cltr_cnt] = &this.self;
+	options.clopts.cltr_cnt++;
 }
…
 	Acceptor * acceptors = 0p;
 	Q * queues = 0p;
-	ServerCluster cl;
+	ServerCluster cl[options.clopts.nclusters];
 
 	if(options.stats) {
 		stats_thrd = alloc();
-		(*stats_thrd){ cl.self };
+		(*stats_thrd){ cl };
 	} else {
 		stats_thrd = 0p;
…
 	init_protocol();
 	{
+		int nacceptors = options.clopts.nprocs * options.clopts.nclusters;
 		conns = alloc(options.clopts.nworkers);
 		if(options.socket.reuseport) {
-			queues = alloc(options.clopts.nprocs);
-			acceptors = anew(options.clopts.nprocs);
-			for(i; options.clopts.nprocs) {
+			queues = alloc(nacceptors);
+			acceptors = alloc(nacceptors);
+			sout | "Creating" | nacceptors | "Acceptors";
+			for(i; nacceptors) {
+				(acceptors[i]){ i % options.clopts.nclusters };
+			}
+			for(i; nacceptors) {
 				(queues[i]){};
 				{
…
 				cworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0];
 				cworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1];
-				cworkers[i].queue = &queues[i % options.clopts.nprocs].q;
+				cworkers[i].queue = &queues[i % nacceptors].q;
 				conns[i] = &cworkers[i].conn;
 			}
…
 			}
 		}
-
-		sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors";
+		sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters";
+		for(i; options.clopts.nclusters) {
+			sout | options.clopts.thrd_cnt[i] | nonl;
+		}
 		sout | nl;
 		{
…
 		if(options.socket.reuseport) {
 			sout | "Notifying connections..." | nonl; flush( sout );
-			for(i; options.clopts.nprocs) {
+			for(i; nacceptors) {
 				acceptors[i].done = true;
 			}
…
 
 		sout | "Shutting down Socket..." | nonl; flush( sout );
-		for(i; options.clopts.nprocs) {
+		for(i; nacceptors) {
 			ret = shutdown( acceptors[i].sockfd, SHUT_RD );
 			if( ret < 0 ) {
…
 
 		sout | "Closing Socket..." | nonl; flush( sout );
-		for(i; options.clopts.nprocs) {
+		for(i; nacceptors) {
 			ret = close( acceptors[i].sockfd );
 			if( ret < 0) {
…
 
 		sout | "Stopping accept threads..." | nonl; flush( sout );
-		for(i; options.clopts.nprocs) {
+		for(i; nacceptors) {
 			join(acceptors[i]);
 		}
…
 
 		sout | "Draining worker queues..." | nonl; flush( sout );
-		for(i; options.clopts.nprocs) {
+		for(i; nacceptors) {
 			PendingRead * p = 0p;
 			while(p = pop(queues[i].q)) {
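The core of the main.cfa change is that the server can now spread its acceptors over several clusters: main() creates nacceptors = nprocs * nclusters acceptor threads, hands acceptor i to cluster i % nclusters, and maps worker i onto acceptor queue i % nacceptors. A minimal plain-C sketch of that bookkeeping (illustrative values only, not the Cforall source):

	#include <stdio.h>

	int main(void) {
		int nprocs = 2, nclusters = 3, nworkers = 7;   /* example values */
		int nacceptors = nprocs * nclusters;           /* as computed in main() */
		for (int i = 0; i < nacceptors; i++)           /* acceptor i is constructed on cluster i % nclusters */
			printf("acceptor %d -> cluster %d\n", i, i % nclusters);
		for (int i = 0; i < nworkers; i++)             /* worker i drains queue i % nacceptors */
			printf("worker %d -> queue %d\n", i, i % nacceptors);
		return 0;
	}

With these example values the six acceptors cycle through clusters 0, 1, 2, and the seven workers cycle through queues 0 through 5.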
benchmark/io/http/options.cfa
rbf7c7ea → r329e26a
 
 	{ // cluster
+		1, // nclusters;
 		1, // nprocs;
 		1, // nworkers;
…
 void parse_options( int argc, char * argv[] ) {
 	unsigned nentries = 0;
+	bool isolate = false;
+
+
 	static cfa_option opt[] = {
 		{ 'p', "port", "Port the server will listen on", options.socket.port},
 		{ 'c', "cpus", "Number of processors to use", options.clopts.nprocs},
 		{ 't', "threads", "Number of worker threads to use", options.clopts.nworkers},
+		{'\0', "isolate", "Create one cluster per processor", isolate, parse_settrue},
 		{'\0', "log", "Enable logs", options.log, parse_settrue},
 		{'\0', "sout", "Redirect standard out to file", options.reopen_stdout},
…
 		nentries = v;
 	}
+	if(isolate) {
+		options.clopts.nclusters = options.clopts.nprocs;
+		options.clopts.nprocs = 1;
+	}
 	options.clopts.params.num_entries = nentries;
-	options.clopts.instance = 0p;
-	options.clopts.thrd_cnt = 0;
+	options.clopts.instance = alloc(options.clopts.nclusters);
+	options.clopts.thrd_cnt = alloc(options.clopts.nclusters);
+	options.clopts.cltr_cnt = 0;
+	for(i; options.clopts.nclusters) {
+		options.clopts.thrd_cnt[i] = 0;
+	}
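The new --isolate flag turns the processor count into a cluster count: when it is set, parse_options creates one cluster per requested processor and leaves each cluster with a single processor. A minimal plain-C sketch of that remapping (example values, not the Cforall source):

	#include <stdio.h>

	int main(void) {
		int nprocs = 4, nclusters = 1;   /* as if "-c 4" were passed with the default single cluster */
		int isolate = 1;                 /* as if "--isolate" were passed */
		if (isolate) {
			nclusters = nprocs;          /* one cluster per requested processor */
			nprocs = 1;                  /* ...each running a single processor */
		}
		printf("%d cluster(s) with %d processor(s) each\n", nclusters, nprocs);
		return 0;
	}

So "-c 4 --isolate" yields 4 clusters of 1 processor instead of 1 cluster of 4 processors.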
benchmark/io/http/options.hfa
rbf7c7ea → r329e26a
 
 	struct {
+		int nclusters;
 		int nprocs;
 		int nworkers;
…
 		bool procstats;
 		bool viewhalts;
-		cluster * instance;
-		size_t thrd_cnt;
+		cluster ** instance;
+		size_t * thrd_cnt;
+		size_t cltr_cnt;
 	} clopts;
 };
benchmark/io/http/printer.cfa
rbf7c7ea → r329e26a
 #include "printer.hfa"
+#include "options.hfa"
 
 #include <fstream.hfa>
…
 }
 
-void ?{}( StatsPrinter & this, cluster & cl ) {
+void ?{}( StatsPrinter & this, ServerCluster * cl ) {
 	((thread&)this){ "Stats Printer Thread" };
-	&this.cl = &cl;
+	this.cl = cl;
 	memset(&this.stats, 0, sizeof(this.stats));;
 }
…
 		wait(this.var, 10`s);
 
-		print_stats_now( this.cl, CFA_STATS_READY_Q | CFA_STATS_IO );
+		for(i; options.clopts.nclusters) print_stats_now( this.cl[i].self, CFA_STATS_READY_Q | CFA_STATS_IO );
 		{
 			struct {
benchmark/io/http/printer.hfa
rbf7c7ea → r329e26a
 void push(acceptor_stats_t & from, acceptor_stats_t & to);
 
+struct ServerCluster {
+	cluster self;
+	processor * procs;
+};
+
 thread StatsPrinter {
 	struct {
…
 	} stats;
 	condition_variable(fast_block_lock) var;
-	cluster & cl;
+	ServerCluster * cl;
 };
 
-void ?{}( StatsPrinter & this, cluster & cl );
+void ?{}( StatsPrinter & this, ServerCluster * cl );
 void ^?{}( StatsPrinter & mutex this );
benchmark/io/http/worker.cfa
rbf7c7ea → r329e26a
 //=============================================================================================
 void ?{}( AcceptWorker & this ) {
-	((thread&)this){ "Server Worker Thread", *options.clopts.instance, 64000 };
-	options.clopts.thrd_cnt++;
+	size_t cli = rand() % options.clopts.cltr_cnt;
+	((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
+	options.clopts.thrd_cnt[cli]++;
 	this.done = false;
 }
…
 //=============================================================================================
 void ?{}( ChannelWorker & this ) {
-	((thread&)this){ "Server Worker Thread", *options.clopts.instance, 64000 };
-	options.clopts.thrd_cnt++;
+	size_t cli = rand() % options.clopts.cltr_cnt;
+	((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
+	options.clopts.thrd_cnt[cli]++;
 	this.done = false;
 }
…
 }
 
-void ?{}( Acceptor & this ) {
-	((thread&)this){ "Server Acceptor Thread", *options.clopts.instance, 64000 };
-	options.clopts.thrd_cnt++;
+void ?{}( Acceptor & this, int cli ) {
+	((thread&)this){ "Server Acceptor Thread", *options.clopts.instance[cli], 64000 };
+	options.clopts.thrd_cnt[cli]++;
 	this.done = false;
 }
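The worker constructors no longer target a single global cluster: each AcceptWorker and ChannelWorker now picks a cluster index at random and bumps that cluster's thread counter, while Acceptors are told their cluster explicitly. A plain-C sketch of the random placement (example values, not the Cforall source):

	#include <stdio.h>
	#include <stdlib.h>

	#define CLTR_CNT 4                        /* stand-in for options.clopts.cltr_cnt */

	int main(void) {
		int nworkers = 10;                    /* example value */
		size_t thrd_cnt[CLTR_CNT] = { 0 };    /* stand-in for options.clopts.thrd_cnt */
		for (int w = 0; w < nworkers; w++) {
			size_t cli = rand() % CLTR_CNT;   /* same pick as the worker constructors */
			thrd_cnt[cli]++;                  /* per-cluster count, which main.cfa prints at startup */
		}
		for (int c = 0; c < CLTR_CNT; c++)
			printf("cluster %d: %zu worker(s)\n", c, thrd_cnt[c]);
		return 0;
	}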
benchmark/io/http/worker.hfa
rbf7c7ea → r329e26a
 	acceptor_stats_t stats;
 };
-void ?{}( Acceptor & );
+void ?{}( Acceptor &, int cli );
 void main( Acceptor & );