Changeset aeb20a4 for benchmark/io/http/main.cfa
- Timestamp:
- Jun 9, 2022, 2:26:43 PM (22 months ago)
- Branches:
- ADT, ast-experimental, master, pthread-emulation, qualifiedEnum
- Children:
- c06551b
- Parents:
- db7a3ad (diff), 430ce61 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff)
links above to see all the changes relative to each parent. - File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/http/main.cfa
rdb7a3ad raeb20a4 25 25 #include "options.hfa" 26 26 #include "socket.hfa" 27 #include "printer.hfa" 27 28 #include "worker.hfa" 28 29 … … 31 32 Duration default_preemption() { 32 33 return 0; 33 }34 35 //=============================================================================================36 // Stats Printer37 //============================================================================================='38 39 thread StatsPrinter {40 Worker * workers;41 int worker_cnt;42 condition_variable(fast_block_lock) var;43 };44 45 void ?{}( StatsPrinter & this, cluster & cl ) {46 ((thread&)this){ "Stats Printer Thread", cl };47 this.worker_cnt = 0;48 }49 50 void ^?{}( StatsPrinter & mutex this ) {}51 52 #define eng3(X) (ws(3, 3, unit(eng( X ))))53 54 void main(StatsPrinter & this) {55 LOOP: for() {56 waitfor( ^?{} : this) {57 break LOOP;58 }59 or else {}60 61 wait(this.var, 10`s);62 63 print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );64 if(this.worker_cnt != 0) {65 uint64_t tries = 0;66 uint64_t calls = 0;67 uint64_t header = 0;68 uint64_t splcin = 0;69 uint64_t splcot = 0;70 struct {71 volatile uint64_t calls;72 volatile uint64_t bytes;73 } avgrd[zipf_cnts];74 memset(avgrd, 0, sizeof(avgrd));75 76 for(i; this.worker_cnt) {77 tries += this.workers[i].stats.sendfile.tries;78 calls += this.workers[i].stats.sendfile.calls;79 header += this.workers[i].stats.sendfile.header;80 splcin += this.workers[i].stats.sendfile.splcin;81 splcot += this.workers[i].stats.sendfile.splcot;82 for(j; zipf_cnts) {83 avgrd[j].calls += this.workers[i].stats.sendfile.avgrd[j].calls;84 avgrd[j].bytes += this.workers[i].stats.sendfile.avgrd[j].bytes;85 }86 }87 88 double ratio = ((double)tries) / calls;89 90 sout | "----- Worker Stats -----";91 sout | "sendfile : " | calls | "calls," | tries | "tries (" | ratio | " try/call)";92 sout | " " | header | "header," | splcin | "splice in," | splcot | "splice out";93 sout | " - zipf sizes:";94 for(i; zipf_cnts) {95 double written 
= avgrd[i].calls > 0 ? ((double)avgrd[i].bytes) / avgrd[i].calls : 0;96 sout | " " | zipf_sizes[i] | "bytes," | avgrd[i].calls | "shorts," | written | "written";97 }98 }99 else {100 sout | "No Workers!";101 }102 }103 34 } 104 35 … … 109 40 cluster self; 110 41 processor * procs; 111 // io_context * ctxs;112 StatsPrinter * prnt;113 42 114 43 }; … … 152 81 } 153 82 154 if(options.stats) {155 this.prnt = alloc();156 (*this.prnt){ this.self };157 } else {158 this.prnt = 0p;159 }160 161 83 #if !defined(__CFA_NO_STATISTICS__) 162 84 print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO ); 163 85 #endif 164 86 165 options.clopts.instance[options.clopts.cltr_cnt] = &this.self; 166 options.clopts.cltr_cnt++; 87 options.clopts.instance = &this.self; 167 88 } 168 89 169 90 void ^?{}( ServerCluster & this ) { 170 delete(this.prnt);171 172 91 for(i; options.clopts.nprocs) { 173 92 ^(this.procs[i]){}; … … 180 99 extern void init_protocol(void); 181 100 extern void deinit_protocol(void); 101 102 //============================================================================================= 103 // REUSEPORT 104 //============================================================================================= 105 106 size_t sockarr_size; 107 struct __attribute__((aligned(128))) Q { 108 mpsc_queue(PendingRead) q; 109 }; 182 110 183 111 //============================================================================================= … … 235 163 236 164 int server_fd; 237 if(!options.socket.manyreuse) {238 server_fd = listener(address, addrlen);239 }240 165 241 166 //=================== … … 257 182 { 258 183 // Stats printer makes a copy so this needs to persist longer than normal 259 Worker * workers; 260 ServerCluster cl[options.clopts.nclusters]; 184 connection ** conns; 185 AcceptWorker * aworkers = 0p; 186 ChannelWorker * cworkers = 0p; 187 Acceptor * acceptors = 0p; 188 Q * queues = 0p; 189 ServerCluster cl; 190 191 if(options.stats) { 192 stats_thrd = alloc(); 193 
(*stats_thrd){ cl.self }; 194 } else { 195 stats_thrd = 0p; 196 } 261 197 262 198 init_protocol(); 263 199 { 264 workers = anew(options.clopts.nworkers); 265 cl[0].prnt->workers = workers; 266 cl[0].prnt->worker_cnt = options.clopts.nworkers; 267 for(i; options.clopts.nworkers) { 268 // if( options.file_cache.fixed_fds ) { 269 // workers[i].pipe[0] = pipe_off + (i * 2) + 0; 270 // workers[i].pipe[1] = pipe_off + (i * 2) + 1; 271 // } 272 // else 273 { 274 workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0]; 275 workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1]; 276 workers[i].sockfd = options.socket.manyreuse ? listener(address, addrlen) : server_fd; 277 workers[i].addr = (struct sockaddr *)&address; 278 workers[i].addrlen = (socklen_t*)&addrlen; 279 workers[i].flags = 0; 280 } 281 unpark( workers[i] ); 282 } 283 sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters"; 284 for(i; options.clopts.nclusters) { 285 sout | options.clopts.thrd_cnt[i] | nonl; 286 } 200 conns = alloc(options.clopts.nworkers); 201 if(options.socket.reuseport) { 202 queues = alloc(options.clopts.nprocs); 203 acceptors = anew(options.clopts.nprocs); 204 for(i; options.clopts.nprocs) { 205 (queues[i]){}; 206 { 207 acceptors[i].sockfd = listener(address, addrlen); 208 acceptors[i].addr = (struct sockaddr *)&address; 209 acceptors[i].addrlen = (socklen_t*)&addrlen; 210 acceptors[i].flags = 0; 211 acceptors[i].queue = &queues[i].q; 212 } 213 unpark( acceptors[i] ); 214 } 215 216 cworkers = anew(options.clopts.nworkers); 217 for(i; options.clopts.nworkers) { 218 { 219 cworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0]; 220 cworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1]; 221 cworkers[i].queue = &queues[i % options.clopts.nprocs].q; 222 conns[i] = &cworkers[i].conn; 223 } 224 unpark( cworkers[i] ); 225 } 226 } 227 else { 228 server_fd = listener(address, addrlen); 229 aworkers = anew(options.clopts.nworkers); 230 
for(i; options.clopts.nworkers) { 231 // if( options.file_cache.fixed_fds ) { 232 // workers[i].pipe[0] = pipe_off + (i * 2) + 0; 233 // workers[i].pipe[1] = pipe_off + (i * 2) + 1; 234 // } 235 // else 236 { 237 aworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0]; 238 aworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1]; 239 aworkers[i].sockfd = server_fd; 240 aworkers[i].addr = (struct sockaddr *)&address; 241 aworkers[i].addrlen = (socklen_t*)&addrlen; 242 aworkers[i].flags = 0; 243 conns[i] = &aworkers[i].conn; 244 } 245 unpark( aworkers[i] ); 246 } 247 } 248 249 sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors"; 287 250 sout | nl; 288 251 { … … 307 270 } 308 271 309 sout | "Notifying connections..." | nonl; flush( sout ); 310 for(i; options.clopts.nworkers) { 311 workers[i].done = true; 312 } 313 sout | "done"; 314 315 sout | "Shutting down socket..." | nonl; flush( sout ); 316 if(options.socket.manyreuse) { 317 for(i; options.clopts.nworkers) { 318 ret = shutdown( workers[i].sockfd, SHUT_RD ); 319 if(ret < 0) abort( "close socket %d error: (%d) %s\n", i, (int)errno, strerror(errno) ); 272 //=================== 273 // Close Socket and join 274 if(options.socket.reuseport) { 275 sout | "Notifying connections..." | nonl; flush( sout ); 276 for(i; options.clopts.nprocs) { 277 acceptors[i].done = true; 278 } 279 for(i; options.clopts.nworkers) { 280 cworkers[i].done = true; 281 } 282 sout | "done"; 283 284 sout | "Shutting down Socket..." | nonl; flush( sout ); 285 for(i; options.clopts.nprocs) { 286 ret = shutdown( acceptors[i].sockfd, SHUT_RD ); 287 if( ret < 0 ) { 288 abort( "shutdown1 error: (%d) %s\n", (int)errno, strerror(errno) ); 289 } 290 } 291 sout | "done"; 292 293 sout | "Closing Socket..." 
| nonl; flush( sout ); 294 for(i; options.clopts.nprocs) { 295 ret = close( acceptors[i].sockfd ); 296 if( ret < 0) { 297 abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) ); 298 } 299 } 300 sout | "done"; 301 302 sout | "Stopping accept threads..." | nonl; flush( sout ); 303 for(i; options.clopts.nprocs) { 304 join(acceptors[i]); 305 } 306 sout | "done"; 307 308 sout | "Draining worker queues..." | nonl; flush( sout ); 309 for(i; options.clopts.nprocs) { 310 PendingRead * p = 0p; 311 while(p = pop(queues[i].q)) { 312 fulfil(p->f, -ECONNRESET); 313 } 314 } 315 sout | "done"; 316 317 sout | "Stopping worker threads..." | nonl; flush( sout ); 318 for(i; options.clopts.nworkers) { 319 for(j; 2) { 320 ret = close(cworkers[i].conn.pipe[j]); 321 if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) ); 322 } 323 join(cworkers[i]); 320 324 } 321 325 } 322 326 else { 327 sout | "Notifying connections..." | nonl; flush( sout ); 328 for(i; options.clopts.nworkers) { 329 aworkers[i].done = true; 330 } 331 sout | "done"; 332 333 sout | "Shutting down Socket..." | nonl; flush( sout ); 323 334 ret = shutdown( server_fd, SHUT_RD ); 324 335 if( ret < 0 ) { 325 abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) ); 326 } 327 } 328 sout | "done"; 329 330 //=================== 331 // Close Socket 332 sout | "Closing Socket..." | nonl; flush( sout ); 333 if(options.socket.manyreuse) { 334 for(i; options.clopts.nworkers) { 335 ret = close(workers[i].sockfd); 336 if(ret < 0) abort( "close socket %d error: (%d) %s\n", i, (int)errno, strerror(errno) ); 337 } 338 } 339 else { 336 abort( "shutdown2 error: (%d) %s\n", (int)errno, strerror(errno) ); 337 } 338 sout | "done"; 339 340 sout | "Closing Socket..." 
| nonl; flush( sout ); 340 341 ret = close( server_fd ); 341 342 if(ret < 0) { 342 343 abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) ); 343 344 } 344 }345 sout | "done"; 346 347 sout | "Stopping connection threads..." | nonl; flush( sout );348 for(i; options.clopts.nworkers) {349 for(j; 2) {350 ret = close(workers[i].pipe[j]);351 if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );352 }353 join(workers[i]);345 sout | "done"; 346 347 sout | "Stopping connection threads..." | nonl; flush( sout ); 348 for(i; options.clopts.nworkers) { 349 for(j; 2) { 350 ret = close(aworkers[i].conn.pipe[j]); 351 if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) ); 352 } 353 join(aworkers[i]); 354 } 354 355 } 355 356 } … … 361 362 362 363 sout | "Stopping printer threads..." | nonl; flush( sout ); 363 for(i; options.clopts.nclusters) { 364 StatsPrinter * p = cl[i].prnt; 365 if(p) { 366 notify_one(p->var); 367 join(*p); 368 } 369 } 364 if(stats_thrd) { 365 notify_one(stats_thrd->var); 366 } 367 delete(stats_thrd); 370 368 sout | "done"; 371 369 372 370 // Now that the stats printer is stopped, we can reclaim this 373 adelete(workers); 371 adelete(aworkers); 372 adelete(cworkers); 373 adelete(acceptors); 374 adelete(queues); 375 free(conns); 374 376 375 377 sout | "Stopping processors/clusters..." | nonl; flush( sout ); … … 377 379 sout | "done"; 378 380 379 // sout | "Closing splice fds..." | nonl; flush( sout );380 // for(i; pipe_cnt) {381 // ret = close( fds[pipe_off + i] );382 // if(ret < 0) {383 // abort( "close pipe error: (%d) %s\n", (int)errno, strerror(errno) );384 // }385 // }386 381 free(fds); 387 sout | "done";388 382 389 383 sout | "Stopping processors..." | nonl; flush( sout );
Note: See TracChangeset
for help on using the changeset viewer.