Changeset 329e26a for benchmark/io/http/main.cfa
- Timestamp:
- Jun 10, 2022, 5:02:03 PM (2 years ago)
- Branches:
- ADT, ast-experimental, master, pthread-emulation, qualifiedEnum
- Children:
- 8419b76
- Parents:
- bf7c7ea
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/http/main.cfa
rbf7c7ea r329e26a 37 37 // Globals 38 38 //============================================================================================= 39 struct ServerCluster {40 cluster self;41 processor * procs;42 43 };44 45 39 void ?{}( ServerCluster & this ) { 46 40 (this.self){ "Server Cluster", options.clopts.params }; … … 56 50 (this.procs[i]){ "Benchmark Processor", this.self }; 57 51 58 int c = 0;59 int n = 1 + (i % cnt);60 for(int j = 0; j < CPU_SETSIZE; j++) {61 if(CPU_ISSET(j, &fullset)) n--;62 if(n == 0) {63 c = j;64 break;65 }66 }67 cpu_set_t localset;68 CPU_ZERO(&localset);69 CPU_SET(c, &localset);70 ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset);71 if( ret != 0 ) abort | "sched_getaffinity failed with" | ret | strerror( ret );52 // int c = 0; 53 // int n = 1 + (i % cnt); 54 // for(int j = 0; j < CPU_SETSIZE; j++) { 55 // if(CPU_ISSET(j, &fullset)) n--; 56 // if(n == 0) { 57 // c = j; 58 // break; 59 // } 60 // } 61 // cpu_set_t localset; 62 // CPU_ZERO(&localset); 63 // CPU_SET(c, &localset); 64 // ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset); 65 // if( ret != 0 ) abort | "sched_getaffinity failed with" | ret | strerror( ret ); 72 66 73 67 #if !defined(__CFA_NO_STATISTICS__) … … 85 79 #endif 86 80 87 options.clopts.instance = &this.self; 81 options.clopts.instance[options.clopts.cltr_cnt] = &this.self; 82 options.clopts.cltr_cnt++; 88 83 } 89 84 … … 187 182 Acceptor * acceptors = 0p; 188 183 Q * queues = 0p; 189 ServerCluster cl ;184 ServerCluster cl[options.clopts.nclusters]; 190 185 191 186 if(options.stats) { 192 187 stats_thrd = alloc(); 193 (*stats_thrd){ cl .self};188 (*stats_thrd){ cl }; 194 189 } else { 195 190 stats_thrd = 0p; … … 198 193 init_protocol(); 199 194 { 195 int nacceptors = options.clopts.nprocs * options.clopts.nclusters; 200 196 conns = alloc(options.clopts.nworkers); 201 197 if(options.socket.reuseport) { 202 queues = alloc(options.clopts.nprocs); 203 acceptors = anew(options.clopts.nprocs); 204 for(i; options.clopts.nprocs) { 198 queues = alloc(nacceptors); 199 acceptors = alloc(nacceptors); 200 sout | "Creating" | nacceptors | "Acceptors"; 201 for(i; nacceptors) { 202 (acceptors[i]){ i % options.clopts.nclusters }; 203 } 204 for(i; nacceptors) { 205 205 (queues[i]){}; 206 206 { … … 219 219 cworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0]; 220 220 cworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1]; 221 cworkers[i].queue = &queues[i % options.clopts.nprocs].q;221 cworkers[i].queue = &queues[i % nacceptors].q; 222 222 conns[i] = &cworkers[i].conn; 223 223 } … … 246 246 } 247 247 } 248 249 sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors"; 248 sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters"; 249 for(i; options.clopts.nclusters) { 250 sout | options.clopts.thrd_cnt[i] | nonl; 251 } 250 252 sout | nl; 251 253 { … … 274 276 if(options.socket.reuseport) { 275 277 sout | "Notifying connections..." | nonl; flush( sout ); 276 for(i; options.clopts.nprocs) {278 for(i; nacceptors) { 277 279 acceptors[i].done = true; 278 280 } … … 283 285 284 286 sout | "Shutting down Socket..." | nonl; flush( sout ); 285 for(i; options.clopts.nprocs) {287 for(i; nacceptors) { 286 288 ret = shutdown( acceptors[i].sockfd, SHUT_RD ); 287 289 if( ret < 0 ) { … … 292 294 293 295 sout | "Closing Socket..." | nonl; flush( sout ); 294 for(i; options.clopts.nprocs) {296 for(i; nacceptors) { 295 297 ret = close( acceptors[i].sockfd ); 296 298 if( ret < 0) { … … 301 303 302 304 sout | "Stopping accept threads..." | nonl; flush( sout ); 303 for(i; options.clopts.nprocs) {305 for(i; nacceptors) { 304 306 join(acceptors[i]); 305 307 } … … 307 309 308 310 sout | "Draining worker queues..." | nonl; flush( sout ); 309 for(i; options.clopts.nprocs) {311 for(i; nacceptors) { 310 312 PendingRead * p = 0p; 311 313 while(p = pop(queues[i].q)) {
Note: See TracChangeset
for help on using the changeset viewer.