- Timestamp: Jun 8, 2022, 7:07:51 PM
- Branches: ADT, ast-experimental, master, pthread-emulation, qualifiedEnum
- Children: bbf61838
- Parents: 6e2b04e
- Location: benchmark/io/http
- Files: 8 edited
Legend:
- Unmodified lines are shown with no prefix.
- Added lines are prefixed with "+".
- Removed lines are prefixed with "-".
- "…" marks an elided run of unmodified lines.
benchmark/io/http/main.cfa (r6e2b04e → r7f0ac12)

 thread StatsPrinter {
-    Worker * workers;
-    int worker_cnt;
+    connection ** conns;
+    volatile int conn_cnt;
     condition_variable(fast_block_lock) var;
 };
…
 void ?{}( StatsPrinter & this, cluster & cl ) {
     ((thread&)this){ "Stats Printer Thread", cl };
-    this.worker_cnt = 0;
+    this.conn_cnt = 0;
 }
…
     print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );
-    if(this.worker_cnt != 0) {
+    if(this.conn_cnt != 0) {
         uint64_t tries = 0;
         uint64_t calls = 0;
…
         memset(avgrd, 0, sizeof(avgrd));

-        for(i; this.worker_cnt) {
-            tries += this.workers[i].stats.sendfile.tries;
-            calls += this.workers[i].stats.sendfile.calls;
-            header += this.workers[i].stats.sendfile.header;
-            splcin += this.workers[i].stats.sendfile.splcin;
-            splcot += this.workers[i].stats.sendfile.splcot;
+        for(i; this.conn_cnt) {
+            tries += this.conns[i]->stats.sendfile.tries;
+            calls += this.conns[i]->stats.sendfile.calls;
+            header += this.conns[i]->stats.sendfile.header;
+            splcin += this.conns[i]->stats.sendfile.splcin;
+            splcot += this.conns[i]->stats.sendfile.splcot;
             for(j; zipf_cnts) {
-                avgrd[j].calls += this.workers[i].stats.sendfile.avgrd[j].calls;
-                avgrd[j].bytes += this.workers[i].stats.sendfile.avgrd[j].bytes;
+                avgrd[j].calls += this.conns[i]->stats.sendfile.avgrd[j].calls;
+                avgrd[j].bytes += this.conns[i]->stats.sendfile.avgrd[j].bytes;
             }
         }
…
         double ratio = ((double)tries) / calls;

-        sout | "----- WorkerStats -----";
+        sout | "----- Connection Stats -----";
         sout | "sendfile : " | calls | "calls," | tries | "tries (" | ratio | " try/call)";
         sout | " " | header | "header," | splcin | "splice in," | splcot | "splice out";
…
     }
     else {
-        sout | "No Workers!";
+        sout | "No Connections!";
     }
 }
…
 //=============================================================================================
+// REUSEPORT
+//=============================================================================================
+
+size_t sockarr_size;
+struct __attribute__((aligned(128))) Q {
+    mpsc_queue(PendingRead) q;
+};
+
+//=============================================================================================
 // Termination
 //=============================================================================================
…
     int server_fd;
-    if(!options.socket.manyreuse) {
-        server_fd = listener(address, addrlen);
-    }

     //===================
…
     {
         // Stats printer makes a copy so this needs to persist longer than normal
-        Worker * workers;
+        connection ** conns;
+        AcceptWorker * aworkers = 0p;
+        ChannelWorker * cworkers = 0p;
+        Acceptor * acceptors = 0p;
+        Q * queues = 0p;
         ServerCluster cl[options.clopts.nclusters];

         init_protocol();
         {
-            workers = anew(options.clopts.nworkers);
-            cl[0].prnt->workers = workers;
-            cl[0].prnt->worker_cnt = options.clopts.nworkers;
-            for(i; options.clopts.nworkers) {
-                // if( options.file_cache.fixed_fds ) {
-                // workers[i].pipe[0] = pipe_off + (i * 2) + 0;
-                // workers[i].pipe[1] = pipe_off + (i * 2) + 1;
-                // }
-                // else
-                {
-                    workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0];
-                    workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1];
-                    workers[i].sockfd = options.socket.manyreuse ? listener(address, addrlen) : server_fd;
-                    workers[i].addr = (struct sockaddr *)&address;
-                    workers[i].addrlen = (socklen_t*)&addrlen;
-                    workers[i].flags = 0;
-                }
-                unpark( workers[i] );
-            }
+            conns = alloc(options.clopts.nworkers);
+            if(options.socket.reuseport) {
+                queues = alloc(options.clopts.nprocs);
+                acceptors = anew(options.clopts.nprocs);
+                for(i; options.clopts.nprocs) {
+                    (queues[i]){};
+                    {
+                        acceptors[i].sockfd = listener(address, addrlen);
+                        acceptors[i].addr = (struct sockaddr *)&address;
+                        acceptors[i].addrlen = (socklen_t*)&addrlen;
+                        acceptors[i].flags = 0;
+                        acceptors[i].queue = &queues[i].q;
+                    }
+                    unpark( acceptors[i] );
+                }
+
+                cworkers = anew(options.clopts.nworkers);
+                for(i; options.clopts.nworkers) {
+                    {
+                        cworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0];
+                        cworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1];
+                        cworkers[i].queue = &queues[i % options.clopts.nprocs].q;
+                        conns[i] = &cworkers[i].conn;
+                    }
+                    unpark( cworkers[i] );
+                }
+            }
+            else {
+                server_fd = listener(address, addrlen);
+                aworkers = anew(options.clopts.nworkers);
+                for(i; options.clopts.nworkers) {
+                    // if( options.file_cache.fixed_fds ) {
+                    // workers[i].pipe[0] = pipe_off + (i * 2) + 0;
+                    // workers[i].pipe[1] = pipe_off + (i * 2) + 1;
+                    // }
+                    // else
+                    {
+                        aworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0];
+                        aworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1];
+                        aworkers[i].sockfd = server_fd;
+                        aworkers[i].addr = (struct sockaddr *)&address;
+                        aworkers[i].addrlen = (socklen_t*)&addrlen;
+                        aworkers[i].flags = 0;
+                        conns[i] = &aworkers[i].conn;
+                    }
+                    unpark( aworkers[i] );
+                }
+            }
+            cl[0].prnt->conns = conns;
+            cl[0].prnt->conn_cnt = options.clopts.nworkers;
             sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters";
             for(i; options.clopts.nclusters) {
…
             }

-            sout | "Notifying connections..." | nonl; flush( sout );
-            for(i; options.clopts.nworkers) {
-                workers[i].done = true;
-            }
-            sout | "done";
-
-            sout | "Shutting down socket..." | nonl; flush( sout );
-            if(options.socket.manyreuse) {
-                for(i; options.clopts.nworkers) {
-                    ret = shutdown( workers[i].sockfd, SHUT_RD );
-                    if(ret < 0) abort( "close socket %d error: (%d) %s\n", i, (int)errno, strerror(errno) );
+            //===================
+            // Close Socket and join
+            if(options.socket.reuseport) {
+                sout | "Notifying connections..." | nonl; flush( sout );
+                for(i; options.clopts.nprocs) {
+                    acceptors[i].done = true;
+                }
+                for(i; options.clopts.nworkers) {
+                    cworkers[i].done = true;
+                }
+                sout | "done";
+
+                sout | "Shutting down Socket..." | nonl; flush( sout );
+                for(i; options.clopts.nprocs) {
+                    ret = shutdown( acceptors[i].sockfd, SHUT_RD );
+                    if( ret < 0 ) {
+                        abort( "shutdown1 error: (%d) %s\n", (int)errno, strerror(errno) );
+                    }
+                }
+                sout | "done";
+
+                sout | "Closing Socket..." | nonl; flush( sout );
+                for(i; options.clopts.nprocs) {
+                    ret = close( acceptors[i].sockfd );
+                    if( ret < 0) {
+                        abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
+                    }
+                }
+                sout | "done";
+
+                sout | "Stopping accept threads..." | nonl; flush( sout );
+                for(i; options.clopts.nprocs) {
+                    join(acceptors[i]);
+                }
+                sout | "done";
+
+                sout | "Draining worker queues..." | nonl; flush( sout );
+                for(i; options.clopts.nprocs) {
+                    PendingRead * p = 0p;
+                    while(p = pop(queues[i].q)) {
+                        fulfil(p->f, -ECONNRESET);
+                    }
+                }
+                sout | "done";
+
+                sout | "Stopping worker threads..." | nonl; flush( sout );
+                for(i; options.clopts.nworkers) {
+                    for(j; 2) {
+                        ret = close(cworkers[i].conn.pipe[j]);
+                        if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
+                    }
+                    join(cworkers[i]);
                 }
             }
             else {
+                sout | "Notifying connections..." | nonl; flush( sout );
+                for(i; options.clopts.nworkers) {
+                    aworkers[i].done = true;
+                }
+                sout | "done";
+
+                sout | "Shutting down Socket..." | nonl; flush( sout );
                 ret = shutdown( server_fd, SHUT_RD );
                 if( ret < 0 ) {
-                    abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) );
-                }
-            }
-            sout | "done";
-
-            //===================
-            // Close Socket
-            sout | "Closing Socket..." | nonl; flush( sout );
-            if(options.socket.manyreuse) {
-                for(i; options.clopts.nworkers) {
-                    ret = close(workers[i].sockfd);
-                    if(ret < 0) abort( "close socket %d error: (%d) %s\n", i, (int)errno, strerror(errno) );
-                }
-            }
-            else {
+                    abort( "shutdown2 error: (%d) %s\n", (int)errno, strerror(errno) );
+                }
+                sout | "done";
+
+                sout | "Closing Socket..." | nonl; flush( sout );
                 ret = close( server_fd );
                 if(ret < 0) {
                     abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
                 }
-            }
-            sout | "done";
-
-            sout | "Stopping connection threads..." | nonl; flush( sout );
-            for(i; options.clopts.nworkers) {
-                for(j; 2) {
-                    ret = close(workers[i].pipe[j]);
-                    if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
-                }
-                join(workers[i]);
+                sout | "done";
+
+                sout | "Stopping connection threads..." | nonl; flush( sout );
+                for(i; options.clopts.nworkers) {
+                    for(j; 2) {
+                        ret = close(aworkers[i].conn.pipe[j]);
+                        if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
+                    }
+                    join(aworkers[i]);
+                }
             }
         }
…

         // Now that the stats printer is stopped, we can reclaim this
-        adelete(workers);
+        adelete(aworkers);
+        adelete(cworkers);
+        adelete(acceptors);
+        adelete(queues);
+        free(conns);

         sout | "Stopping processors/clusters..." | nonl; flush( sout );
…
         sout | "done";

-        // sout | "Closing splice fds..." | nonl; flush( sout );
-        // for(i; pipe_cnt) {
-        // ret = close( fds[pipe_off + i] );
-        // if(ret < 0) {
-        // abort( "close pipe error: (%d) %s\n", (int)errno, strerror(errno) );
-        // }
-        // }
         free(fds);
-        sout | "done";

         sout | "Stopping processors..." | nonl; flush( sout );
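A note on the shutdown sequence added above: setting the done flags alone cannot stop a thread blocked in accept, so the teardown also calls shutdown( sockfd, SHUT_RD ) on every listening socket, and the Acceptor loop (see worker.cfa below) treats the resulting EINVAL/EBADF as a request to stop; PendingRead entries still parked in the queues are then drained by fulfilling their futures with -ECONNRESET. The following is a minimal plain-C sketch of that notify/shutdown/join pattern, not the CFA code from this changeset; run_acceptor, stop_acceptor and g_done are hypothetical names.

    #include <errno.h>
    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdio.h>
    #include <sys/socket.h>
    #include <unistd.h>

    static atomic_bool g_done = false;              /* hypothetical "done" flag */

    static void * run_acceptor(void * arg) {        /* hypothetical acceptor thread */
        int listen_fd = *(int *)arg;
        for (;;) {
            int fd = accept(listen_fd, NULL, NULL);
            if (fd < 0) {
                /* shutdown(SHUT_RD) on the listening socket wakes this accept();
                 * once the flag is set, the resulting error means "stop". */
                if (atomic_load(&g_done) && (errno == EINVAL || errno == EBADF)) break;
                if (errno == ECONNABORTED) break;
                perror("accept");
                break;
            }
            close(fd);                              /* a real server hands fd to a worker */
        }
        return NULL;
    }

    /* caller side: one workable notify / shutdown / join / close ordering */
    static void stop_acceptor(pthread_t thr, int listen_fd) {
        atomic_store(&g_done, true);                /* 1. notify */
        shutdown(listen_fd, SHUT_RD);               /* 2. unblock the pending accept */
        pthread_join(thr, NULL);                    /* 3. join the acceptor */
        close(listen_fd);                           /* 4. now the socket can be closed */
    }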
benchmark/io/http/options.cfa (r6e2b04e → r7f0ac12)

     10, // backlog
     1024, // buflen
-    false, // onereuse
-    false // manyreuse
+    false // reuseport
 },

…
     {'\0', "shell", "Disable interactive mode", options.interactive, parse_setfalse},
     {'\0', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog},
-    {'\0', "reuseport-one", "Create a single listen socket with SO_REUSEPORT", options.socket.onereuse, parse_settrue},
-    {'\0', "reuseport", "Use many listen sockets with SO_REUSEPORT", options.socket.manyreuse, parse_settrue},
+    {'\0', "reuseport", "Use acceptor threads with reuse port SO_REUSEPORT", options.socket.reuseport, parse_settrue},
     {'\0', "request_len", "Maximum number of bytes in the http request, requests with more data will be answered with Http Code 414", options.socket.buflen},
     {'\0', "seed", "seed to use for hashing", options.file_cache.hash_seed },
benchmark/io/http/options.hfa (r6e2b04e → r7f0ac12)

     int backlog;
     int buflen;
-    bool onereuse;
-    bool manyreuse;
+    bool reuseport;
 } socket;
benchmark/io/http/protocol.cfa (r6e2b04e → r7f0ac12)

 #define PLAINTEXT_NOCOPY
 #define LINKED_IO
+
+static inline __s32 wait_res( io_future_t & this ) {
+    wait( this );
+    if( this.result < 0 ) {{
+        errno = -this.result;
+        return -1;
+    }}
+    return this.result;
+}

 struct https_msg_str {
…
     if(is_error(splice_in.res)) {
+        if(splice_in.res.error == -EPIPE) return -ECONNRESET;
         mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error;
         close(fd);
…
 }

-[HttpCode code, bool closed, * const char file, size_t len] http_read( int fd, []char buffer, size_t len) {
+[HttpCode code, bool closed, * const char file, size_t len] http_read(volatile int & fd, []char buffer, size_t len, io_future_t * f) {
     char * it = buffer;
     size_t count = len - 1;
…
     READ:
     for() {
-        int ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY);
+        int ret;
+        if( f ) {
+            ret = wait_res(*f);
+            reset(*f);
+            f = 0p;
+        } else {
+            ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY);
+        }
         // int ret = read(fd, (void*)it, count);
         if(ret == 0 ) return [OK200, true, 0, 0];
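The new wait_res helper bridges two error conventions: the io future reports failures as a negative errno value (io_uring style), while the rest of http_read expects the classic "-1 and set errno" convention. The added EPIPE check in the splice path follows the same idea, mapping a broken pipe to -ECONNRESET so the connection loop simply drops the client. A small plain-C illustration of the conversion (res_to_posix is a hypothetical name, not part of the benchmark):

    #include <errno.h>
    #include <sys/types.h>

    /* Convert an io_uring-style result (>= 0 on success, -errno on failure)
     * into the traditional POSIX convention (-1 with errno set). */
    static ssize_t res_to_posix(ssize_t res) {
        if (res < 0) {
            errno = (int)-res;   /* e.g. a result of -ECONNRESET sets errno = ECONNRESET */
            return -1;
        }
        return res;
    }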
benchmark/io/http/protocol.hfa (r6e2b04e → r7f0ac12)

 #pragma once

+struct io_future_t;
 struct sendfile_stats_t;

…
 int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t count, struct sendfile_stats_t & );

-[HttpCode code, bool closed, * const char file, size_t len] http_read( int fd, []char buffer, size_t len);
+[HttpCode code, bool closed, * const char file, size_t len] http_read(volatile int & fd, []char buffer, size_t len, io_future_t * f);
benchmark/io/http/socket.cfa (r6e2b04e → r7f0ac12)

 int listener(struct sockaddr_in & address, int addrlen) {
-    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
+    int type = SOCK_STREAM;
+    if(options.socket.reuseport) type |= SOCK_NONBLOCK;
+    int sockfd = socket(AF_INET, type, 0);
     if(sockfd < 0) {
         abort( "socket error: (%d) %s\n", (int)errno, strerror(errno) );
     }

-    if(options.socket.onereuse || options.socket.manyreuse) {
+    if(options.socket.reuseport) {
         int value = 1;
         // if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const void*)&on, sizeof(on)))
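With the merged flag, listener() is now called once per acceptor, so several sockets end up bound to the same address and port; SO_REUSEPORT is what allows that, and the kernel load-balances incoming connections across the listeners. SOCK_NONBLOCK matches the polling accept4()/yield() loop in the new Acceptor thread. Below is a self-contained plain-C sketch of such a listener; make_listener, port and backlog are illustrative names, not the benchmark's API.

    #include <arpa/inet.h>
    #include <netinet/in.h>
    #include <stdint.h>
    #include <string.h>
    #include <sys/socket.h>
    #include <unistd.h>

    /* One listening socket per acceptor thread: every call binds to the same
     * port, which only works because each socket sets SO_REUSEPORT first. */
    int make_listener(uint16_t port, int backlog) {
        int fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
        if (fd < 0) return -1;

        int one = 1;
        if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)) < 0) {
            close(fd);
            return -1;
        }

        struct sockaddr_in addr;
        memset(&addr, 0, sizeof(addr));
        addr.sin_family      = AF_INET;
        addr.sin_addr.s_addr = htonl(INADDR_ANY);
        addr.sin_port        = htons(port);

        if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0 || listen(fd, backlog) < 0) {
            close(fd);
            return -1;
        }
        return fd;
    }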
benchmark/io/http/worker.cfa (r6e2b04e → r7f0ac12)

 #include <fstream.hfa>
 #include <iofwd.hfa>
+#include <mutex_stmt.hfa>

 #include "options.hfa"
…
 #include "filecache.hfa"

-//=============================================================================================
-// Worker Thread
-//=============================================================================================
-void ?{}( Worker & this ) {
+void ?{}( sendfile_stats_t & this ) {
+    this.calls = 0;
+    this.tries = 0;
+    this.header = 0;
+    this.splcin = 0;
+    this.splcot = 0;
+    for(i; zipf_cnts) {
+        this.avgrd[i].calls = 0;
+        this.avgrd[i].bytes = 0;
+    }
+}
+
+//=============================================================================================
+// Generic connection handling
+//=============================================================================================
+static void handle_connection( connection & this, volatile int & fd, char * buffer, size_t len, io_future_t * f ) {
+    REQUEST:
+    for() {
+        bool closed;
+        HttpCode code;
+        const char * file;
+        size_t name_size;
+
+        // Read the http request
+        if( options.log ) sout | "=== Reading request ===";
+        [code, closed, file, name_size] = http_read(fd, buffer, len, f);
+        f = 0p;
+
+        // if we are done, break out of the loop
+        if( closed ) break REQUEST;
+
+        // If this wasn't a request retrun 400
+        if( code != OK200 ) {
+            sout | "=== Invalid Request :" | code_val(code) | "===";
+            answer_error(fd, code);
+            continue REQUEST;
+        }
+
+        if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
+            if( options.log ) sout | "=== Request for /plaintext ===";
+
+            int ret = answer_plaintext(fd);
+            if( ret == -ECONNRESET ) break REQUEST;
+
+            if( options.log ) sout | "=== Answer sent ===";
+            continue REQUEST;
+        }
+
+        if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
+            if( options.log ) sout | "=== Request for /ping ===";
+
+            // Send the header
+            int ret = answer_empty(fd);
+            if( ret == -ECONNRESET ) break REQUEST;
+
+            if( options.log ) sout | "=== Answer sent ===";
+            continue REQUEST;
+        }
+
+        if( options.log ) {
+            sout | "=== Request for file " | nonl;
+            write(sout, file, name_size);
+            sout | " ===";
+        }
+
+        if( !options.file_cache.path ) {
+            if( options.log ) {
+                sout | "=== File Not Found (" | nonl;
+                write(sout, file, name_size);
+                sout | ") ===";
+            }
+            answer_error(fd, E405);
+            continue REQUEST;
+        }
+
+        // Get the fd from the file cache
+        int ans_fd;
+        size_t count;
+        [ans_fd, count] = get_file( file, name_size );
+
+        // If we can't find the file, return 404
+        if( ans_fd < 0 ) {
+            if( options.log ) {
+                sout | "=== File Not Found (" | nonl;
+                write(sout, file, name_size);
+                sout | ") ===";
+            }
+            answer_error(fd, E404);
+            continue REQUEST;
+        }
+
+        // Send the desired file
+        int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile );
+        if( ret == -ECONNRESET ) break REQUEST;
+
+        if( options.log ) sout | "=== Answer sent ===";
+    }
+}
+
+//=============================================================================================
+// Self Accepting Worker Thread
+//=============================================================================================
+void ?{}( AcceptWorker & this ) {
     size_t cli = rand() % options.clopts.cltr_cnt;
     ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
     options.clopts.thrd_cnt[cli]++;
-    this.pipe[0] = -1;
-    this.pipe[1] = -1;
     this.done = false;
-
-    this.stats.sendfile.calls = 0;
-    this.stats.sendfile.tries = 0;
-    this.stats.sendfile.header = 0;
-    this.stats.sendfile.splcin = 0;
-    this.stats.sendfile.splcot = 0;
-    for(i; zipf_cnts) {
-        this.stats.sendfile.avgrd[i].calls = 0;
-        this.stats.sendfile.avgrd[i].bytes = 0;
-    }
-}
-
-extern "C" {
-    extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
-}
-
-void main( Worker & this ) {
+}
+
+void main( AcceptWorker & this ) {
     park();
-    /* paranoid */ assert( this.pipe[0] != -1 );
-    /* paranoid */ assert( this.pipe[1] != -1 );
-
-    const bool reuse = options.socket.manyreuse;
-
-    CONNECTION:
+    /* paranoid */ assert( this.conn.pipe[0] != -1 );
+    /* paranoid */ assert( this.conn.pipe[1] != -1 );
     for() {
         if( options.log ) sout | "=== Accepting connection ===";
-        int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], CFA_IO_LAZY );
+        int fd = cfa_accept4( this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY );
         if(fd < 0) {
             if( errno == ECONNABORTED ) break;
…

         if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
-        REQUEST:
-        for() {
-            bool closed;
-            HttpCode code;
-            const char * file;
-            size_t name_size;
-
-            // Read the http request
-            size_t len = options.socket.buflen;
-            char buffer[len];
-            if( options.log ) sout | "=== Reading request ===";
-            [code, closed, file, name_size] = http_read(fd, buffer, len);
-
-            // if we are done, break out of the loop
-            if( closed ) break REQUEST;
-
-            // If this wasn't a request retrun 400
-            if( code != OK200 ) {
-                sout | "=== Invalid Request :" | code_val(code) | "===";
-                answer_error(fd, code);
-                continue REQUEST;
-            }
-
-            if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
-                if( options.log ) sout | "=== Request for /plaintext ===";
-
-                int ret = answer_plaintext(fd);
-                if( ret == -ECONNRESET ) break REQUEST;
-
-                if( options.log ) sout | "=== Answer sent ===";
-                continue REQUEST;
-            }
-
-            if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
-                if( options.log ) sout | "=== Request for /ping ===";
-
-                // Send the header
-                int ret = answer_empty(fd);
-                if( ret == -ECONNRESET ) break REQUEST;
-
-                if( options.log ) sout | "=== Answer sent ===";
-                continue REQUEST;
-            }
-
-            if( options.log ) {
-                sout | "=== Request for file " | nonl;
-                write(sout, file, name_size);
-                sout | " ===";
-            }
-
-            if( !options.file_cache.path ) {
-                if( options.log ) {
-                    sout | "=== File Not Found (" | nonl;
-                    write(sout, file, name_size);
-                    sout | ") ===";
-                }
-                answer_error(fd, E405);
-                continue REQUEST;
-            }
-
-            // Get the fd from the file cache
-            int ans_fd;
-            size_t count;
-            [ans_fd, count] = get_file( file, name_size );
-
-            // If we can't find the file, return 404
-            if( ans_fd < 0 ) {
-                if( options.log ) {
-                    sout | "=== File Not Found (" | nonl;
-                    write(sout, file, name_size);
-                    sout | ") ===";
-                }
-                answer_error(fd, E404);
-                continue REQUEST;
-            }
-
-            // Send the desired file
-            int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile );
-            if( ret == -ECONNRESET ) break REQUEST;
-
-            if( options.log ) sout | "=== Answer sent ===";
-        }
-
-        if( options.log ) sout | "=== Connection closed ===";
-        continue CONNECTION;
-    }
-}
+        size_t len = options.socket.buflen;
+        char buffer[len];
+        handle_connection( this.conn, fd, buffer, len, 0p );
+
+        if( options.log ) sout | "=== Connection closed ===";
+    }
+}
+
+
+//=============================================================================================
+// Channel Worker Thread
+//=============================================================================================
+void ?{}( ChannelWorker & this ) {
+    size_t cli = rand() % options.clopts.cltr_cnt;
+    ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
+    options.clopts.thrd_cnt[cli]++;
+    this.done = false;
+}
+
+void main( ChannelWorker & this ) {
+    park();
+    /* paranoid */ assert( this.conn.pipe[0] != -1 );
+    /* paranoid */ assert( this.conn.pipe[1] != -1 );
+    for() {
+        size_t len = options.socket.buflen;
+        char buffer[len];
+        PendingRead p;
+        p.in.buf = (void*)buffer;
+        p.in.len = len;
+        push(*this.queue, &p);
+
+        if( options.log ) sout | "=== Waiting new connection ===";
+        handle_connection( this.conn, p.out.fd, buffer, len, &p.f );
+
+        if( options.log ) sout | "=== Connection closed ===";
+        if(this.done) break;
+    }
+}
+
+extern "C" {
+    extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
+}
+
+void ?{}( Acceptor & this ) {
+    size_t cli = rand() % options.clopts.cltr_cnt;
+    ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
+    options.clopts.thrd_cnt[cli]++;
+    this.done = false;
+}
+
+void main( Acceptor & this ) {
+    park();
+    if( options.log ) sout | "=== Accepting connection ===";
+    for() {
+        int fd = accept4(this.sockfd, this.[addr, addrlen, flags]);
+        if(fd < 0) {
+            if( errno == EWOULDBLOCK) {
+                yield();
+                continue;
+            }
+            if( errno == ECONNABORTED ) break;
+            if( this.done && (errno == EINVAL || errno == EBADF) ) break;
+            abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
+        }
+        if(this.done) return;
+
+        if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
+
+        if(fd) {
+            PendingRead * p = 0p;
+            for() {
+                if(this.done) return;
+                p = pop(*this.queue);
+                if(p) break;
+                yield();
+            };
+
+            p->out.fd = fd;
+            async_recv(p->f, p->out.fd, p->in.buf, p->in.len, 0, CFA_IO_LAZY);
+        }
+
+        if( options.log ) sout | "=== Accepting connection ===";
+    }
+}
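worker.cfa now factors all per-request logic into handle_connection and provides two ways to feed it a connection: AcceptWorker keeps accepting on its own socket (the old behaviour), while ChannelWorker parks a PendingRead in an mpsc_queue and an Acceptor thread later fills in the accepted fd and starts the first read with async_recv, completing the future the worker is waiting on inside http_read. The sketch below shows the same hand-off in plain C, with a mutex and condition variable standing in for the lock-free queue and the io future; all names (pending_read, wait_for_connection, fulfil_connection) are illustrative.

    #include <pthread.h>
    #include <stdbool.h>
    #include <stddef.h>

    /* Hypothetical stand-in for the acceptor/worker hand-off: the worker parks
     * on a "pending read" slot, the acceptor fills in the accepted fd and wakes it. */
    struct pending_read {
        void * buf;               /* worker-owned buffer, like PendingRead.in.buf  */
        size_t len;
        int    fd;                /* filled in by the acceptor, like PendingRead.out.fd */
        bool   ready;
        pthread_mutex_t lock;
        pthread_cond_t  cond;
    };

    /* worker side: block until an acceptor has attached a connection */
    static int wait_for_connection(struct pending_read * p) {
        pthread_mutex_lock(&p->lock);
        while (!p->ready) pthread_cond_wait(&p->cond, &p->lock);
        p->ready = false;
        int fd = p->fd;
        pthread_mutex_unlock(&p->lock);
        return fd;                /* the worker then reads requests from fd into p->buf */
    }

    /* acceptor side: hand a freshly accepted fd to the parked worker */
    static void fulfil_connection(struct pending_read * p, int fd) {
        pthread_mutex_lock(&p->lock);
        p->fd = fd;
        p->ready = true;
        pthread_cond_signal(&p->cond);
        pthread_mutex_unlock(&p->lock);
    }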
benchmark/io/http/worker.hfa (r6e2b04e → r7f0ac12)

 #pragma once

+#include <iofwd.hfa>
+#include <queueLockFree.hfa>
 #include <thread.hfa>

…
 };

-thread Worker {
+void ?{}( sendfile_stats_t & this );
+
+struct connection {
     int pipe[2];
+    struct {
+        sendfile_stats_t sendfile;
+    } stats;
+};
+
+static inline void ?{}( connection & this ) {
+    this.pipe[0] = -1;
+    this.pipe[1] = -1;
+}
+
+thread AcceptWorker {
+    connection conn;
     int sockfd;
     struct sockaddr * addr;
…
     int flags;
     volatile bool done;
+};
+void ?{}( AcceptWorker & this);
+void main( AcceptWorker & );
+
+
+struct PendingRead {
+    PendingRead * volatile next;
+    io_future_t f;
     struct {
-        sendfile_stats_t sendfile;
-    } stats;
+        void * buf;
+        size_t len;
+    } in;
+    struct {
+        volatile int fd;
+    } out;
 };
-void ?{}( Worker & this);
-void main( Worker & );
+
+static inline PendingRead * volatile & ?`next ( PendingRead * node ) {
+    return node->next;
+}
+
+thread ChannelWorker {
+    connection conn;
+    volatile bool done;
+    mpsc_queue(PendingRead) * queue;
+};
+void ?{}( ChannelWorker & );
+void main( ChannelWorker & );
+
+thread Acceptor {
+    mpsc_queue(PendingRead) * queue;
+    int sockfd;
+    struct sockaddr * addr;
+    socklen_t * addrlen;
+    int flags;
+    volatile bool done;
+};
+void ?{}( Acceptor & );
+void main( Acceptor & );
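The header now splits the old Worker into AcceptWorker, ChannelWorker and Acceptor, and introduces PendingRead as an intrusive queue node: the link (PendingRead * volatile next, exposed through the ?`next helper) lives inside the element itself, so the mpsc_queue from <queueLockFree.hfa> can chain parked workers without allocating anything on the hot path. A rough plain-C illustration of that intrusive idea follows, using a mutex instead of the lock-free queue; fifo_push/fifo_pop and the node/intrusive_fifo types are hypothetical.

    #include <pthread.h>
    #include <stddef.h>

    /* The link field is embedded in the element, so push/pop never allocate. */
    struct node {
        struct node * next;       /* intrusive link, like "PendingRead * volatile next" */
        /* payload (buffer pointer, future, fd, ...) would follow here */
    };

    struct intrusive_fifo {
        struct node * head;       /* oldest element, taken by the single consumer */
        struct node * tail;       /* newest element, appended by the producers   */
        pthread_mutex_t lock;
    };
    #define INTRUSIVE_FIFO_INIT { NULL, NULL, PTHREAD_MUTEX_INITIALIZER }

    static void fifo_push(struct intrusive_fifo * q, struct node * n) {
        n->next = NULL;
        pthread_mutex_lock(&q->lock);
        if (q->tail) q->tail->next = n; else q->head = n;
        q->tail = n;
        pthread_mutex_unlock(&q->lock);
    }

    static struct node * fifo_pop(struct intrusive_fifo * q) {
        pthread_mutex_lock(&q->lock);
        struct node * n = q->head;
        if (n) {
            q->head = n->next;
            if (!q->head) q->tail = NULL;
        }
        pthread_mutex_unlock(&q->lock);
        return n;                 /* NULL when no worker is currently parked */
    }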