- Timestamp:
- Jun 9, 2022, 2:26:43 PM (3 years ago)
- Branches:
- ADT, ast-experimental, master, pthread-emulation, qualifiedEnum
- Children:
- c06551b
- Parents:
- db7a3ad (diff), 430ce61 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the(diff)
links above to see all the changes relative to each parent. - Location:
- benchmark/io
- Files:
-
- 3 added
- 9 edited
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/http/Makefile.am
rdb7a3ad raeb20a4 37 37 options.cfa \ 38 38 options.hfa \ 39 printer.cfa \ 40 printer.hfa \ 39 41 protocol.cfa \ 40 42 protocol.hfa \ -
benchmark/io/http/main.cfa
rdb7a3ad raeb20a4 25 25 #include "options.hfa" 26 26 #include "socket.hfa" 27 #include "printer.hfa" 27 28 #include "worker.hfa" 28 29 … … 31 32 Duration default_preemption() { 32 33 return 0; 33 }34 35 //=============================================================================================36 // Stats Printer37 //============================================================================================='38 39 thread StatsPrinter {40 Worker * workers;41 int worker_cnt;42 condition_variable(fast_block_lock) var;43 };44 45 void ?{}( StatsPrinter & this, cluster & cl ) {46 ((thread&)this){ "Stats Printer Thread", cl };47 this.worker_cnt = 0;48 }49 50 void ^?{}( StatsPrinter & mutex this ) {}51 52 #define eng3(X) (ws(3, 3, unit(eng( X ))))53 54 void main(StatsPrinter & this) {55 LOOP: for() {56 waitfor( ^?{} : this) {57 break LOOP;58 }59 or else {}60 61 wait(this.var, 10`s);62 63 print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );64 if(this.worker_cnt != 0) {65 uint64_t tries = 0;66 uint64_t calls = 0;67 uint64_t header = 0;68 uint64_t splcin = 0;69 uint64_t splcot = 0;70 struct {71 volatile uint64_t calls;72 volatile uint64_t bytes;73 } avgrd[zipf_cnts];74 memset(avgrd, 0, sizeof(avgrd));75 76 for(i; this.worker_cnt) {77 tries += this.workers[i].stats.sendfile.tries;78 calls += this.workers[i].stats.sendfile.calls;79 header += this.workers[i].stats.sendfile.header;80 splcin += this.workers[i].stats.sendfile.splcin;81 splcot += this.workers[i].stats.sendfile.splcot;82 for(j; zipf_cnts) {83 avgrd[j].calls += this.workers[i].stats.sendfile.avgrd[j].calls;84 avgrd[j].bytes += this.workers[i].stats.sendfile.avgrd[j].bytes;85 }86 }87 88 double ratio = ((double)tries) / calls;89 90 sout | "----- Worker Stats -----";91 sout | "sendfile : " | calls | "calls," | tries | "tries (" | ratio | " try/call)";92 sout | " " | header | "header," | splcin | "splice in," | splcot | "splice out";93 sout | " - zipf sizes:";94 for(i; zipf_cnts) {95 double written = avgrd[i].calls > 0 ? ((double)avgrd[i].bytes) / avgrd[i].calls : 0;96 sout | " " | zipf_sizes[i] | "bytes," | avgrd[i].calls | "shorts," | written | "written";97 }98 }99 else {100 sout | "No Workers!";101 }102 }103 34 } 104 35 … … 109 40 cluster self; 110 41 processor * procs; 111 // io_context * ctxs;112 StatsPrinter * prnt;113 42 114 43 }; … … 152 81 } 153 82 154 if(options.stats) {155 this.prnt = alloc();156 (*this.prnt){ this.self };157 } else {158 this.prnt = 0p;159 }160 161 83 #if !defined(__CFA_NO_STATISTICS__) 162 84 print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO ); 163 85 #endif 164 86 165 options.clopts.instance[options.clopts.cltr_cnt] = &this.self; 166 options.clopts.cltr_cnt++; 87 options.clopts.instance = &this.self; 167 88 } 168 89 169 90 void ^?{}( ServerCluster & this ) { 170 delete(this.prnt);171 172 91 for(i; options.clopts.nprocs) { 173 92 ^(this.procs[i]){}; … … 180 99 extern void init_protocol(void); 181 100 extern void deinit_protocol(void); 101 102 //============================================================================================= 103 // REUSEPORT 104 //============================================================================================= 105 106 size_t sockarr_size; 107 struct __attribute__((aligned(128))) Q { 108 mpsc_queue(PendingRead) q; 109 }; 182 110 183 111 //============================================================================================= … … 235 163 236 164 int server_fd; 237 if(!options.socket.manyreuse) {238 server_fd = listener(address, addrlen);239 }240 165 241 166 //=================== … … 257 182 { 258 183 // Stats printer makes a copy so this needs to persist longer than normal 259 Worker * workers; 260 ServerCluster cl[options.clopts.nclusters]; 184 connection ** conns; 185 AcceptWorker * aworkers = 0p; 186 ChannelWorker * cworkers = 0p; 187 Acceptor * acceptors = 0p; 188 Q * queues = 0p; 189 ServerCluster cl; 190 191 if(options.stats) { 192 stats_thrd = alloc(); 193 (*stats_thrd){ cl.self }; 194 } else { 195 stats_thrd = 0p; 196 } 261 197 262 198 init_protocol(); 263 199 { 264 workers = anew(options.clopts.nworkers); 265 cl[0].prnt->workers = workers; 266 cl[0].prnt->worker_cnt = options.clopts.nworkers; 267 for(i; options.clopts.nworkers) { 268 // if( options.file_cache.fixed_fds ) { 269 // workers[i].pipe[0] = pipe_off + (i * 2) + 0; 270 // workers[i].pipe[1] = pipe_off + (i * 2) + 1; 271 // } 272 // else 273 { 274 workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0]; 275 workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1]; 276 workers[i].sockfd = options.socket.manyreuse ? listener(address, addrlen) : server_fd; 277 workers[i].addr = (struct sockaddr *)&address; 278 workers[i].addrlen = (socklen_t*)&addrlen; 279 workers[i].flags = 0; 280 } 281 unpark( workers[i] ); 282 } 283 sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters"; 284 for(i; options.clopts.nclusters) { 285 sout | options.clopts.thrd_cnt[i] | nonl; 286 } 200 conns = alloc(options.clopts.nworkers); 201 if(options.socket.reuseport) { 202 queues = alloc(options.clopts.nprocs); 203 acceptors = anew(options.clopts.nprocs); 204 for(i; options.clopts.nprocs) { 205 (queues[i]){}; 206 { 207 acceptors[i].sockfd = listener(address, addrlen); 208 acceptors[i].addr = (struct sockaddr *)&address; 209 acceptors[i].addrlen = (socklen_t*)&addrlen; 210 acceptors[i].flags = 0; 211 acceptors[i].queue = &queues[i].q; 212 } 213 unpark( acceptors[i] ); 214 } 215 216 cworkers = anew(options.clopts.nworkers); 217 for(i; options.clopts.nworkers) { 218 { 219 cworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0]; 220 cworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1]; 221 cworkers[i].queue = &queues[i % options.clopts.nprocs].q; 222 conns[i] = &cworkers[i].conn; 223 } 224 unpark( cworkers[i] ); 225 } 226 } 227 else { 228 server_fd = listener(address, addrlen); 229 aworkers = anew(options.clopts.nworkers); 230 for(i; options.clopts.nworkers) { 231 // if( options.file_cache.fixed_fds ) { 232 // workers[i].pipe[0] = pipe_off + (i * 2) + 0; 233 // workers[i].pipe[1] = pipe_off + (i * 2) + 1; 234 // } 235 // else 236 { 237 aworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0]; 238 aworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1]; 239 aworkers[i].sockfd = server_fd; 240 aworkers[i].addr = (struct sockaddr *)&address; 241 aworkers[i].addrlen = (socklen_t*)&addrlen; 242 aworkers[i].flags = 0; 243 conns[i] = &aworkers[i].conn; 244 } 245 unpark( aworkers[i] ); 246 } 247 } 248 249 sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors"; 287 250 sout | nl; 288 251 { … … 307 270 } 308 271 309 sout | "Notifying connections..." | nonl; flush( sout ); 310 for(i; options.clopts.nworkers) { 311 workers[i].done = true; 312 } 313 sout | "done"; 314 315 sout | "Shutting down socket..." | nonl; flush( sout ); 316 if(options.socket.manyreuse) { 317 for(i; options.clopts.nworkers) { 318 ret = shutdown( workers[i].sockfd, SHUT_RD ); 319 if(ret < 0) abort( "close socket %d error: (%d) %s\n", i, (int)errno, strerror(errno) ); 272 //=================== 273 // Close Socket and join 274 if(options.socket.reuseport) { 275 sout | "Notifying connections..." | nonl; flush( sout ); 276 for(i; options.clopts.nprocs) { 277 acceptors[i].done = true; 278 } 279 for(i; options.clopts.nworkers) { 280 cworkers[i].done = true; 281 } 282 sout | "done"; 283 284 sout | "Shutting down Socket..." | nonl; flush( sout ); 285 for(i; options.clopts.nprocs) { 286 ret = shutdown( acceptors[i].sockfd, SHUT_RD ); 287 if( ret < 0 ) { 288 abort( "shutdown1 error: (%d) %s\n", (int)errno, strerror(errno) ); 289 } 290 } 291 sout | "done"; 292 293 sout | "Closing Socket..." | nonl; flush( sout ); 294 for(i; options.clopts.nprocs) { 295 ret = close( acceptors[i].sockfd ); 296 if( ret < 0) { 297 abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) ); 298 } 299 } 300 sout | "done"; 301 302 sout | "Stopping accept threads..." | nonl; flush( sout ); 303 for(i; options.clopts.nprocs) { 304 join(acceptors[i]); 305 } 306 sout | "done"; 307 308 sout | "Draining worker queues..." | nonl; flush( sout ); 309 for(i; options.clopts.nprocs) { 310 PendingRead * p = 0p; 311 while(p = pop(queues[i].q)) { 312 fulfil(p->f, -ECONNRESET); 313 } 314 } 315 sout | "done"; 316 317 sout | "Stopping worker threads..." | nonl; flush( sout ); 318 for(i; options.clopts.nworkers) { 319 for(j; 2) { 320 ret = close(cworkers[i].conn.pipe[j]); 321 if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) ); 322 } 323 join(cworkers[i]); 320 324 } 321 325 } 322 326 else { 327 sout | "Notifying connections..." | nonl; flush( sout ); 328 for(i; options.clopts.nworkers) { 329 aworkers[i].done = true; 330 } 331 sout | "done"; 332 333 sout | "Shutting down Socket..." | nonl; flush( sout ); 323 334 ret = shutdown( server_fd, SHUT_RD ); 324 335 if( ret < 0 ) { 325 abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) ); 326 } 327 } 328 sout | "done"; 329 330 //=================== 331 // Close Socket 332 sout | "Closing Socket..." | nonl; flush( sout ); 333 if(options.socket.manyreuse) { 334 for(i; options.clopts.nworkers) { 335 ret = close(workers[i].sockfd); 336 if(ret < 0) abort( "close socket %d error: (%d) %s\n", i, (int)errno, strerror(errno) ); 337 } 338 } 339 else { 336 abort( "shutdown2 error: (%d) %s\n", (int)errno, strerror(errno) ); 337 } 338 sout | "done"; 339 340 sout | "Closing Socket..." | nonl; flush( sout ); 340 341 ret = close( server_fd ); 341 342 if(ret < 0) { 342 343 abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) ); 343 344 } 344 }345 sout | "done"; 346 347 sout | "Stopping connection threads..." | nonl; flush( sout );348 for(i; options.clopts.nworkers) {349 for(j; 2) {350 ret = close(workers[i].pipe[j]);351 if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );352 }353 join(workers[i]);345 sout | "done"; 346 347 sout | "Stopping connection threads..." | nonl; flush( sout ); 348 for(i; options.clopts.nworkers) { 349 for(j; 2) { 350 ret = close(aworkers[i].conn.pipe[j]); 351 if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) ); 352 } 353 join(aworkers[i]); 354 } 354 355 } 355 356 } … … 361 362 362 363 sout | "Stopping printer threads..." | nonl; flush( sout ); 363 for(i; options.clopts.nclusters) { 364 StatsPrinter * p = cl[i].prnt; 365 if(p) { 366 notify_one(p->var); 367 join(*p); 368 } 369 } 364 if(stats_thrd) { 365 notify_one(stats_thrd->var); 366 } 367 delete(stats_thrd); 370 368 sout | "done"; 371 369 372 370 // Now that the stats printer is stopped, we can reclaim this 373 adelete(workers); 371 adelete(aworkers); 372 adelete(cworkers); 373 adelete(acceptors); 374 adelete(queues); 375 free(conns); 374 376 375 377 sout | "Stopping processors/clusters..." | nonl; flush( sout ); … … 377 379 sout | "done"; 378 380 379 // sout | "Closing splice fds..." | nonl; flush( sout );380 // for(i; pipe_cnt) {381 // ret = close( fds[pipe_off + i] );382 // if(ret < 0) {383 // abort( "close pipe error: (%d) %s\n", (int)errno, strerror(errno) );384 // }385 // }386 381 free(fds); 387 sout | "done";388 382 389 383 sout | "Stopping processors..." | nonl; flush( sout ); -
benchmark/io/http/options.cfa
rdb7a3ad raeb20a4 38 38 10, // backlog 39 39 1024, // buflen 40 false, // onereuse 41 false // manyreuse 40 false // reuseport 42 41 }, 43 42 44 43 { // cluster 45 1, // nclusters;46 44 1, // nprocs; 47 45 1, // nworkers; … … 54 52 55 53 void parse_options( int argc, char * argv[] ) { 56 // bool fixedfd = false;57 // bool sqkpoll = false;58 // bool iokpoll = false;59 54 unsigned nentries = 0; 60 bool isolate = false;61 62 63 55 static cfa_option opt[] = { 64 56 { 'p', "port", "Port the server will listen on", options.socket.port}, 65 57 { 'c', "cpus", "Number of processors to use", options.clopts.nprocs}, 66 58 { 't', "threads", "Number of worker threads to use", options.clopts.nworkers}, 67 {'\0', "isolate", "Create one cluster per processor", isolate, parse_settrue},68 59 {'\0', "log", "Enable logs", options.log, parse_settrue}, 69 60 {'\0', "sout", "Redirect standard out to file", options.reopen_stdout}, … … 72 63 {'\0', "shell", "Disable interactive mode", options.interactive, parse_setfalse}, 73 64 {'\0', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog}, 74 {'\0', "reuseport-one", "Create a single listen socket with SO_REUSEPORT", options.socket.onereuse, parse_settrue}, 75 {'\0', "reuseport", "Use many listen sockets with SO_REUSEPORT", options.socket.manyreuse, parse_settrue}, 65 {'\0', "reuseport", "Use acceptor threads with reuse port SO_REUSEPORT", options.socket.reuseport, parse_settrue}, 76 66 {'\0', "request_len", "Maximum number of bytes in the http request, requests with more data will be answered with Http Code 414", options.socket.buflen}, 77 67 {'\0', "seed", "seed to use for hashing", options.file_cache.hash_seed }, … … 101 91 nentries = v; 102 92 } 103 if(isolate) {104 options.clopts.nclusters = options.clopts.nprocs;105 options.clopts.nprocs = 1;106 }107 93 options.clopts.params.num_entries = nentries; 108 options.clopts.instance = alloc(options.clopts.nclusters); 109 options.clopts.thrd_cnt = alloc(options.clopts.nclusters); 110 options.clopts.cltr_cnt = 0; 111 for(i; options.clopts.nclusters) { 112 options.clopts.thrd_cnt[i] = 0; 113 } 94 options.clopts.instance = 0p; 95 options.clopts.thrd_cnt = 0; 114 96 115 97 -
benchmark/io/http/options.hfa
rdb7a3ad raeb20a4 27 27 int backlog; 28 28 int buflen; 29 bool onereuse; 30 bool manyreuse; 29 bool reuseport; 31 30 } socket; 32 31 33 32 struct { 34 int nclusters;35 33 int nprocs; 36 34 int nworkers; … … 38 36 bool procstats; 39 37 bool viewhalts; 40 cluster ** instance; 41 size_t * thrd_cnt; 42 size_t cltr_cnt; 38 cluster * instance; 39 size_t thrd_cnt; 43 40 } clopts; 44 41 }; -
benchmark/io/http/protocol.cfa
rdb7a3ad raeb20a4 30 30 #define PLAINTEXT_NOCOPY 31 31 #define LINKED_IO 32 33 static inline __s32 wait_res( io_future_t & this ) { 34 wait( this ); 35 if( this.result < 0 ) {{ 36 errno = -this.result; 37 return -1; 38 }} 39 return this.result; 40 } 32 41 33 42 struct https_msg_str { … … 470 479 471 480 if(is_error(splice_in.res)) { 481 if(splice_in.res.error == -EPIPE) return -ECONNRESET; 472 482 mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error; 473 483 close(fd); … … 503 513 } 504 514 505 [HttpCode code, bool closed, * const char file, size_t len] http_read( int fd, []char buffer, size_t len) {515 [HttpCode code, bool closed, * const char file, size_t len] http_read(volatile int & fd, []char buffer, size_t len, io_future_t * f) { 506 516 char * it = buffer; 507 517 size_t count = len - 1; … … 509 519 READ: 510 520 for() { 511 int ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY); 521 int ret; 522 if( f ) { 523 ret = wait_res(*f); 524 reset(*f); 525 f = 0p; 526 } else { 527 ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY); 528 } 512 529 // int ret = read(fd, (void*)it, count); 513 530 if(ret == 0 ) return [OK200, true, 0, 0]; … … 570 587 571 588 void ?{}( DateFormater & this ) { 572 ((thread&)this){ "Server Date Thread", *options.clopts.instance [0]};589 ((thread&)this){ "Server Date Thread", *options.clopts.instance }; 573 590 this.idx = 0; 574 591 memset( &this.buffers[0], 0, sizeof(this.buffers[0]) ); -
benchmark/io/http/protocol.hfa
rdb7a3ad raeb20a4 1 1 #pragma once 2 2 3 struct io_future_t; 3 4 struct sendfile_stats_t; 4 5 … … 22 23 int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t count, struct sendfile_stats_t & ); 23 24 24 [HttpCode code, bool closed, * const char file, size_t len] http_read( int fd, []char buffer, size_t len);25 [HttpCode code, bool closed, * const char file, size_t len] http_read(volatile int & fd, []char buffer, size_t len, io_future_t * f); -
benchmark/io/http/socket.cfa
rdb7a3ad raeb20a4 26 26 27 27 int listener(struct sockaddr_in & address, int addrlen) { 28 int sockfd = socket(AF_INET, SOCK_STREAM, 0); 28 int type = SOCK_STREAM; 29 if(options.socket.reuseport) type |= SOCK_NONBLOCK; 30 int sockfd = socket(AF_INET, type, 0); 29 31 if(sockfd < 0) { 30 32 abort( "socket error: (%d) %s\n", (int)errno, strerror(errno) ); 31 33 } 32 34 33 if(options.socket. onereuse || options.socket.manyreuse) {35 if(options.socket.reuseport) { 34 36 int value = 1; 35 37 // if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const void*)&on, sizeof(on))) -
benchmark/io/http/worker.cfa
rdb7a3ad raeb20a4 8 8 #include <fstream.hfa> 9 9 #include <iofwd.hfa> 10 #include <mutex_stmt.hfa> 10 11 11 12 #include "options.hfa" … … 14 15 15 16 //============================================================================================= 16 // Worker Thread 17 //============================================================================================= 18 void ?{}( Worker & this ) { 19 size_t cli = rand() % options.clopts.cltr_cnt; 20 ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 }; 21 options.clopts.thrd_cnt[cli]++; 22 this.pipe[0] = -1; 23 this.pipe[1] = -1; 17 // Generic connection handling 18 //============================================================================================= 19 static void handle_connection( connection & this, volatile int & fd, char * buffer, size_t len, io_future_t * f, unsigned long long & last ) { 20 REQUEST: 21 for() { 22 bool closed; 23 HttpCode code; 24 const char * file; 25 size_t name_size; 26 27 // Read the http request 28 if( options.log ) sout | "=== Reading request ==="; 29 [code, closed, file, name_size] = http_read(fd, buffer, len, f); 30 f = 0p; 31 32 // if we are done, break out of the loop 33 if( closed ) break REQUEST; 34 35 // If this wasn't a request retrun 400 36 if( code != OK200 ) { 37 sout | "=== Invalid Request :" | code_val(code) | "==="; 38 answer_error(fd, code); 39 continue REQUEST; 40 } 41 42 if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) { 43 if( options.log ) sout | "=== Request for /plaintext ==="; 44 45 int ret = answer_plaintext(fd); 46 if( ret == -ECONNRESET ) break REQUEST; 47 48 if( options.log ) sout | "=== Answer sent ==="; 49 continue REQUEST; 50 } 51 52 if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) { 53 if( options.log ) sout | "=== Request for /ping ==="; 54 55 // Send the header 56 int ret = answer_empty(fd); 57 if( ret == -ECONNRESET ) break REQUEST; 58 59 if( options.log ) sout | "=== Answer sent ==="; 60 continue REQUEST; 61 } 62 63 if( options.log ) { 64 sout | "=== Request for file " | nonl; 65 write(sout, file, name_size); 66 sout | " ==="; 67 } 68 69 if( !options.file_cache.path ) { 70 if( options.log ) { 71 sout | "=== File Not Found (" | nonl; 72 write(sout, file, name_size); 73 sout | ") ==="; 74 } 75 answer_error(fd, E405); 76 continue REQUEST; 77 } 78 79 // Get the fd from the file cache 80 int ans_fd; 81 size_t count; 82 [ans_fd, count] = get_file( file, name_size ); 83 84 // If we can't find the file, return 404 85 if( ans_fd < 0 ) { 86 if( options.log ) { 87 sout | "=== File Not Found (" | nonl; 88 write(sout, file, name_size); 89 sout | ") ==="; 90 } 91 answer_error(fd, E404); 92 continue REQUEST; 93 } 94 95 // Send the desired file 96 int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile ); 97 if( ret == -ECONNRESET ) break REQUEST; 98 99 if( options.log ) sout | "=== Answer sent ==="; 100 } 101 102 if (stats_thrd) { 103 unsigned long long next = rdtscl(); 104 if(next > (last + 500000000)) { 105 if(try_lock(stats_thrd->stats.lock)) { 106 push(this.stats.sendfile, stats_thrd->stats.send); 107 unlock(stats_thrd->stats.lock); 108 last = next; 109 } 110 } 111 } 112 } 113 114 //============================================================================================= 115 // Self Accepting Worker Thread 116 //============================================================================================= 117 void ?{}( AcceptWorker & this ) { 118 ((thread&)this){ "Server Worker Thread", *options.clopts.instance, 64000 }; 119 options.clopts.thrd_cnt++; 24 120 this.done = false; 25 26 this.stats.sendfile.calls = 0; 27 this.stats.sendfile.tries = 0; 28 this.stats.sendfile.header = 0; 29 this.stats.sendfile.splcin = 0; 30 this.stats.sendfile.splcot = 0; 31 for(i; zipf_cnts) { 32 this.stats.sendfile.avgrd[i].calls = 0; 33 this.stats.sendfile.avgrd[i].bytes = 0; 34 } 35 } 36 37 extern "C" { 38 extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags); 39 } 40 41 void main( Worker & this ) { 121 } 122 123 void main( AcceptWorker & this ) { 42 124 park(); 43 /* paranoid */ assert( this.pipe[0] != -1 ); 44 /* paranoid */ assert( this.pipe[1] != -1 ); 45 46 const bool reuse = options.socket.manyreuse; 47 48 CONNECTION: 125 unsigned long long last = rdtscl(); 126 /* paranoid */ assert( this.conn.pipe[0] != -1 ); 127 /* paranoid */ assert( this.conn.pipe[1] != -1 ); 49 128 for() { 50 129 if( options.log ) sout | "=== Accepting connection ==="; 51 int fd = cfa_accept4( this. [sockfd,addr, addrlen, flags], CFA_IO_LAZY );130 int fd = cfa_accept4( this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY ); 52 131 if(fd < 0) { 53 132 if( errno == ECONNABORTED ) break; … … 58 137 59 138 if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ==="; 60 REQUEST: 61 for() { 62 bool closed; 63 HttpCode code; 64 const char * file; 65 size_t name_size; 66 67 // Read the http request 68 size_t len = options.socket.buflen; 69 char buffer[len]; 70 if( options.log ) sout | "=== Reading request ==="; 71 [code, closed, file, name_size] = http_read(fd, buffer, len); 72 73 // if we are done, break out of the loop 74 if( closed ) break REQUEST; 75 76 // If this wasn't a request retrun 400 77 if( code != OK200 ) { 78 sout | "=== Invalid Request :" | code_val(code) | "==="; 79 answer_error(fd, code); 80 continue REQUEST; 81 } 82 83 if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) { 84 if( options.log ) sout | "=== Request for /plaintext ==="; 85 86 int ret = answer_plaintext(fd); 87 if( ret == -ECONNRESET ) break REQUEST; 88 89 if( options.log ) sout | "=== Answer sent ==="; 90 continue REQUEST; 91 } 92 93 if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) { 94 if( options.log ) sout | "=== Request for /ping ==="; 95 96 // Send the header 97 int ret = answer_empty(fd); 98 if( ret == -ECONNRESET ) break REQUEST; 99 100 if( options.log ) sout | "=== Answer sent ==="; 101 continue REQUEST; 102 } 103 104 if( options.log ) { 105 sout | "=== Request for file " | nonl; 106 write(sout, file, name_size); 107 sout | " ==="; 108 } 109 110 if( !options.file_cache.path ) { 111 if( options.log ) { 112 sout | "=== File Not Found (" | nonl; 113 write(sout, file, name_size); 114 sout | ") ==="; 139 size_t len = options.socket.buflen; 140 char buffer[len]; 141 handle_connection( this.conn, fd, buffer, len, 0p, last ); 142 143 if( options.log ) sout | "=== Connection closed ==="; 144 } 145 } 146 147 148 //============================================================================================= 149 // Channel Worker Thread 150 //============================================================================================= 151 void ?{}( ChannelWorker & this ) { 152 ((thread&)this){ "Server Worker Thread", *options.clopts.instance, 64000 }; 153 options.clopts.thrd_cnt++; 154 this.done = false; 155 } 156 157 void main( ChannelWorker & this ) { 158 park(); 159 unsigned long long last = rdtscl(); 160 /* paranoid */ assert( this.conn.pipe[0] != -1 ); 161 /* paranoid */ assert( this.conn.pipe[1] != -1 ); 162 for() { 163 size_t len = options.socket.buflen; 164 char buffer[len]; 165 PendingRead p; 166 p.in.buf = (void*)buffer; 167 p.in.len = len; 168 push(*this.queue, &p); 169 170 if( options.log ) sout | "=== Waiting new connection ==="; 171 handle_connection( this.conn, p.out.fd, buffer, len, &p.f, last ); 172 173 if( options.log ) sout | "=== Connection closed ==="; 174 if(this.done) break; 175 } 176 } 177 178 extern "C" { 179 extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags); 180 } 181 182 void ?{}( Acceptor & this ) { 183 ((thread&)this){ "Server Worker Thread", *options.clopts.instance, 64000 }; 184 options.clopts.thrd_cnt++; 185 this.done = false; 186 } 187 188 void main( Acceptor & this ) { 189 park(); 190 unsigned long long last = rdtscl(); 191 if( options.log ) sout | "=== Accepting connection ==="; 192 for() { 193 int fd = accept4(this.sockfd, this.[addr, addrlen, flags]); 194 if(fd < 0) { 195 if( errno == EWOULDBLOCK) { 196 this.stats.eagains++; 197 yield(); 198 continue; 199 } 200 if( errno == ECONNABORTED ) break; 201 if( this.done && (errno == EINVAL || errno == EBADF) ) break; 202 abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) ); 203 } 204 this.stats.accepts++; 205 206 if(this.done) return; 207 208 if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ==="; 209 210 if(fd) { 211 PendingRead * p = 0p; 212 for() { 213 if(this.done) return; 214 p = pop(*this.queue); 215 if(p) break; 216 yield(); 217 this.stats.creates++; 218 }; 219 220 p->out.fd = fd; 221 async_recv(p->f, p->out.fd, p->in.buf, p->in.len, 0, CFA_IO_LAZY); 222 } 223 224 if (stats_thrd) { 225 unsigned long long next = rdtscl(); 226 if(next > (last + 500000000)) { 227 if(try_lock(stats_thrd->stats.lock)) { 228 push(this.stats, stats_thrd->stats.accpt); 229 unlock(stats_thrd->stats.lock); 230 last = next; 115 231 } 116 answer_error(fd, E405); 117 continue REQUEST; 118 } 119 120 // Get the fd from the file cache 121 int ans_fd; 122 size_t count; 123 [ans_fd, count] = get_file( file, name_size ); 124 125 // If we can't find the file, return 404 126 if( ans_fd < 0 ) { 127 if( options.log ) { 128 sout | "=== File Not Found (" | nonl; 129 write(sout, file, name_size); 130 sout | ") ==="; 131 } 132 answer_error(fd, E404); 133 continue REQUEST; 134 } 135 136 // Send the desired file 137 int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile ); 138 if( ret == -ECONNRESET ) break REQUEST; 139 140 if( options.log ) sout | "=== Answer sent ==="; 141 } 142 143 if( options.log ) sout | "=== Connection closed ==="; 144 continue CONNECTION; 145 } 146 } 232 } 233 } 234 235 if( options.log ) sout | "=== Accepting connection ==="; 236 } 237 } -
benchmark/io/http/worker.hfa
rdb7a3ad raeb20a4 1 1 #pragma once 2 2 3 #include <iofwd.hfa> 4 #include <queueLockFree.hfa> 3 5 #include <thread.hfa> 4 6 … … 7 9 } 8 10 11 #include "printer.hfa" 12 9 13 //============================================================================================= 10 14 // Worker Thread 11 15 //============================================================================================= 12 16 13 extern const size_t zipf_sizes[]; 14 enum { zipf_cnts = 36, }; 15 16 struct sendfile_stats_t { 17 volatile uint64_t calls; 18 volatile uint64_t tries; 19 volatile uint64_t header; 20 volatile uint64_t splcin; 21 volatile uint64_t splcot; 17 struct connection { 18 int pipe[2]; 22 19 struct { 23 volatile uint64_t calls; 24 volatile uint64_t bytes; 25 } avgrd[zipf_cnts]; 20 sendfile_stats_t sendfile; 21 } stats; 26 22 }; 27 23 28 thread Worker { 29 int pipe[2]; 24 static inline void ?{}( connection & this ) { 25 this.pipe[0] = -1; 26 this.pipe[1] = -1; 27 } 28 29 thread AcceptWorker { 30 connection conn; 30 31 int sockfd; 31 32 struct sockaddr * addr; … … 33 34 int flags; 34 35 volatile bool done; 36 }; 37 void ?{}( AcceptWorker & this); 38 void main( AcceptWorker & ); 39 40 41 struct PendingRead { 42 PendingRead * volatile next; 43 io_future_t f; 35 44 struct { 36 sendfile_stats_t sendfile; 37 } stats; 45 void * buf; 46 size_t len; 47 } in; 48 struct { 49 volatile int fd; 50 } out; 38 51 }; 39 void ?{}( Worker & this); 40 void main( Worker & ); 52 53 static inline PendingRead * volatile & ?`next ( PendingRead * node ) { 54 return node->next; 55 } 56 57 thread ChannelWorker { 58 connection conn; 59 volatile bool done; 60 mpsc_queue(PendingRead) * queue; 61 }; 62 void ?{}( ChannelWorker & ); 63 void main( ChannelWorker & ); 64 65 thread Acceptor { 66 mpsc_queue(PendingRead) * queue; 67 int sockfd; 68 struct sockaddr * addr; 69 socklen_t * addrlen; 70 int flags; 71 volatile bool done; 72 acceptor_stats_t stats; 73 }; 74 void ?{}( Acceptor & ); 75 void main( Acceptor & );
Note: See TracChangeset
for help on using the changeset viewer.