- Timestamp:
- Mar 2, 2021, 5:28:32 PM (5 years ago)
- Branches:
- ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum, stuck-waitfor-destruct
- Children:
- 6083392
- Parents:
- 182256b (diff), 9eb7a532 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the(diff)links above to see all the changes relative to each parent. - Location:
- benchmark/io/http
- Files:
-
- 8 edited
-
http_ring.cpp (modified) (15 diffs)
-
main.cfa (modified) (8 diffs)
-
options.cfa (modified) (7 diffs)
-
options.hfa (modified) (3 diffs)
-
protocol.cfa (modified) (10 diffs)
-
protocol.hfa (modified) (2 diffs)
-
worker.cfa (modified) (5 diffs)
-
worker.hfa (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/http/http_ring.cpp
r182256b r266ecf1 20 20 socklen_t *addrlen; 21 21 int flags; 22 unsigned cnt; 22 23 } acpt; 23 24 … … 67 68 thread_local stats_block_t stats; 68 69 stats_block_t global_stats; 70 71 thread_local struct __attribute__((aligned(128))) { 72 size_t to_submit = 0; 73 } local; 69 74 70 75 // Get an array of current connections … … 192 197 static void submit(struct io_uring * ring, struct io_uring_sqe * sqe, connection * conn) { 193 198 (void)ring; 199 local.to_submit++; 194 200 #ifdef USE_ASYNC 195 201 io_uring_sqe_set_flags(sqe, IOSQE_ASYNC); … … 406 412 switch(state) { 407 413 case ACCEPTING: 408 connection::accept(ring, opt);414 // connection::accept(ring, opt); 409 415 newconn(ring, res); 410 416 break; … … 420 426 421 427 //========================================================= 428 extern "C" { 429 #include <sys/eventfd.h> // use for termination 430 } 431 422 432 // Main loop of the WebServer 423 433 // Effectively uses one thread_local copy of everything per kernel thread … … 427 437 struct io_uring * ring = opt.ring; 428 438 439 int blockfd = eventfd(0, 0); 440 if (blockfd < 0) { 441 fprintf( stderr, "eventfd create error: (%d) %s\n", (int)errno, strerror(errno) ); 442 exit(EXIT_FAILURE); 443 } 444 445 int ret = io_uring_register_eventfd(ring, blockfd); 446 if (ret < 0) { 447 fprintf( stderr, "io_uring S&W error: (%d) %s\n", (int)-ret, strerror(-ret) ); 448 exit(EXIT_FAILURE); 449 } 450 429 451 // Track the shutdown using a event_fd 430 452 char endfd_buf[8]; … … 433 455 // Accept our first connection 434 456 // May not take effect until io_uring_submit_and_wait 435 connection::accept(ring, opt); 457 for(unsigned i = 0; i < opt.acpt.cnt; i++) { 458 connection::accept(ring, opt); 459 } 436 460 437 461 int reset = 1; // Counter to print stats once in a while … … 441 465 while(!done) { 442 466 // Submit all the answers we have and wait for responses 443 int ret = io_uring_submit_and_wait(ring, 1); 467 int ret = io_uring_submit(ring); 468 local.to_submit = 0; 444 469 445 470 // check errors … … 452 477 sqes += ret; 453 478 call++; 479 480 481 eventfd_t val; 482 ret = eventfd_read(blockfd, &val); 483 484 // check errors 485 if (ret < 0) { 486 fprintf( stderr, "eventfd read error: (%d) %s\n", (int)errno, strerror(errno) ); 487 exit(EXIT_FAILURE); 488 } 454 489 455 490 struct io_uring_cqe *cqe; … … 463 498 break; 464 499 } 500 501 if(local.to_submit > 30) break; 465 502 466 503 auto req = (class connection *)cqe->user_data; … … 509 546 #include <pthread.h> // for pthreads 510 547 #include <signal.h> // for signal(SIGPIPE, SIG_IGN); 511 #include <sys/eventfd.h> // use for termination512 548 #include <sys/socket.h> // for sockets in general 513 549 #include <netinet/in.h> // for sockaddr_in, AF_INET … … 528 564 unsigned entries = 256; // number of entries per ring/kernel thread 529 565 unsigned backlog = 262144; // backlog argument to listen 566 unsigned preaccept = 1; // start by accepting X per threads 530 567 bool attach = false; // Whether or not to attach all the rings 531 568 bool sqpoll = false; // Whether or not to use SQ Polling … … 534 571 // Arguments Parsing 535 572 int c; 536 while ((c = getopt (argc, argv, "t:p:e:b: aS")) != -1) {573 while ((c = getopt (argc, argv, "t:p:e:b:c:aS")) != -1) { 537 574 switch (c) 538 575 { … … 548 585 case 'b': 549 586 backlog = atoi(optarg); 587 break; 588 case 'c': 589 preaccept = atoi(optarg); 550 590 break; 551 591 case 'a': … … 681 721 thrd_opts[i].acpt.addrlen = (socklen_t*)&addrlen; 682 722 thrd_opts[i].acpt.flags = 0; 723 thrd_opts[i].acpt.cnt = preaccept; 683 724 thrd_opts[i].endfd = efd; 684 725 thrd_opts[i].ring = &thrd_rings[i].storage; -
benchmark/io/http/main.cfa
r182256b r266ecf1 29 29 30 30 //============================================================================================= 31 // Globals32 //=============================================================================================33 struct ServerProc {34 processor self;35 };36 37 void ?{}( ServerProc & this ) {38 /* paranoid */ assert( options.clopts.instance != 0p );39 (this.self){ "Benchmark Processor", *options.clopts.instance };40 41 #if !defined(__CFA_NO_STATISTICS__)42 if( options.clopts.procstats ) {43 print_stats_at_exit( this.self, options.clopts.instance->print_stats );44 }45 if( options.clopts.viewhalts ) {46 print_halts( this.self );47 }48 #endif49 }50 51 extern void init_protocol(void);52 extern void deinit_protocol(void);53 54 //=============================================================================================55 31 // Stats Printer 56 32 //=============================================================================================' … … 58 34 thread StatsPrinter {}; 59 35 60 void ?{}( StatsPrinter & this ) { 61 ((thread&)this){ "Stats Printer Thread" }; 62 } 36 void ?{}( StatsPrinter & this, cluster & cl ) { 37 ((thread&)this){ "Stats Printer Thread", cl }; 38 } 39 40 void ^?{}( StatsPrinter & mutex this ) {} 63 41 64 42 void main(StatsPrinter & this) { … … 71 49 sleep(10`s); 72 50 73 print_stats_now( *options.clopts.instance, CFA_STATS_READY_Q | CFA_STATS_IO ); 74 } 75 } 51 print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO ); 52 } 53 } 54 55 //============================================================================================= 56 // Globals 57 //============================================================================================= 58 struct ServerCluster { 59 cluster self; 60 processor * procs; 61 // io_context * ctxs; 62 StatsPrinter * prnt; 63 64 }; 65 66 void ?{}( ServerCluster & this ) { 67 (this.self){ "Server Cluster", options.clopts.params }; 68 69 this.procs = alloc(options.clopts.nprocs); 70 for(i; options.clopts.nprocs) { 71 (this.procs[i]){ "Benchmark Processor", this.self }; 72 73 #if !defined(__CFA_NO_STATISTICS__) 74 if( options.clopts.procstats ) { 75 print_stats_at_exit( *this.procs, this.self.print_stats ); 76 } 77 if( options.clopts.viewhalts ) { 78 print_halts( *this.procs ); 79 } 80 #endif 81 } 82 83 if(options.stats) { 84 this.prnt = alloc(); 85 (*this.prnt){ this.self }; 86 } else { 87 this.prnt = 0p; 88 } 89 90 #if !defined(__CFA_NO_STATISTICS__) 91 print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO ); 92 #endif 93 94 options.clopts.instance[options.clopts.cltr_cnt] = &this.self; 95 options.clopts.cltr_cnt++; 96 } 97 98 void ^?{}( ServerCluster & this ) { 99 delete(this.prnt); 100 101 for(i; options.clopts.nprocs) { 102 ^(this.procs[i]){}; 103 } 104 free(this.procs); 105 106 ^(this.self){}; 107 } 108 109 extern void init_protocol(void); 110 extern void deinit_protocol(void); 76 111 77 112 //============================================================================================= … … 137 172 // Run Server Cluster 138 173 { 139 cluster cl = { "Server Cluster", options.clopts.params };140 #if !defined(__CFA_NO_STATISTICS__)141 print_stats_at_exit( cl, CFA_STATS_READY_Q | CFA_STATS_IO );142 #endif143 options.clopts.instance = &cl;144 145 146 174 int pipe_cnt = options.clopts.nworkers * 2; 147 175 int pipe_off; … … 153 181 } 154 182 155 if(options.file_cache.path && options.file_cache.fixed_fds) {156 register_fixed_files(cl, fds, pipe_off);157 }183 // if(options.file_cache.path && options.file_cache.fixed_fds) { 184 // register_fixed_files(cl, fds, pipe_off); 185 // } 158 186 159 187 { 160 ServerProc procs[options.clopts.nprocs]; 161 StatsPrinter printer; 188 ServerCluster cl[options.clopts.nclusters]; 162 189 163 190 init_protocol(); … … 180 207 unpark( workers[i] ); 181 208 } 182 sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors"; 209 sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters"; 210 for(i; options.clopts.nclusters) { 211 sout | options.clopts.thrd_cnt[i] | nonl; 212 } 213 sout | nl; 183 214 { 184 215 char buffer[128]; 185 while(int ret = cfa_read(0, buffer, 128, 0, -1`s, 0p, 0p); ret != 0) { 216 for() { 217 int ret = cfa_read(0, buffer, 128, 0); 218 if(ret == 0) break; 186 219 if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) ); 220 sout | "User wrote '" | "" | nonl; 221 write(sout, buffer, ret - 1); 222 sout | "'"; 187 223 } 188 224 … … 193 229 for(i; options.clopts.nworkers) { 194 230 workers[i].done = true; 195 cancel(workers[i].cancel);196 231 } 197 232 sout | "done"; … … 221 256 sout | "done"; 222 257 223 sout | "Stopping processors ..." | nonl; flush( sout );258 sout | "Stopping processors/clusters..." | nonl; flush( sout ); 224 259 } 225 260 sout | "done"; -
benchmark/io/http/options.cfa
r182256b r266ecf1 13 13 #include <kernel.hfa> 14 14 #include <parseargs.hfa> 15 #include <stdlib.hfa> 15 16 16 17 #include <stdlib.h> … … 19 20 Options options @= { 20 21 false, // log 22 false, // stats 21 23 22 24 { // file_cache … … 36 38 37 39 { // cluster 40 1, // nclusters; 38 41 1, // nprocs; 39 42 1, // nworkers; … … 46 49 47 50 void parse_options( int argc, char * argv[] ) { 48 bool subthrd = false; 49 bool eagrsub = false; 50 bool fixedfd = false; 51 bool sqkpoll = false; 52 bool iokpoll = false; 53 unsigned sublen = 16; 51 // bool fixedfd = false; 52 // bool sqkpoll = false; 53 // bool iokpoll = false; 54 54 unsigned nentries = 16; 55 bool isolate = false; 55 56 56 57 … … 59 60 { 'c', "cpus", "Number of processors to use", options.clopts.nprocs}, 60 61 { 't', "threads", "Number of worker threads to use", options.clopts.nworkers}, 62 {'\0', "isolate", "Create one cluster per processor", isolate, parse_settrue}, 61 63 {'\0', "log", "Enable logs", options.log, parse_settrue}, 64 {'\0', "stats", "Enable statistics", options.stats, parse_settrue}, 62 65 {'\0', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog}, 63 66 {'\0', "request_len", "Maximum number of bytes in the http request, requests with more data will be answered with Http Code 414", options.socket.buflen}, … … 65 68 {'\0', "cache-size", "Size of the cache to use, if set to small, will uses closes power of 2", options.file_cache.size }, 66 69 {'\0', "list-files", "List the files in the specified path and exit", options.file_cache.list, parse_settrue }, 67 { 's', "submitthread", "If set, cluster uses polling thread to submit I/O", subthrd, parse_settrue }, 68 { 'e', "eagersubmit", "If set, cluster submits I/O eagerly but still aggregates submits", eagrsub, parse_settrue}, 69 { 'f', "fixed-fds", "If set, files are open eagerly and pre-registered with the cluster", fixedfd, parse_settrue}, 70 { 'k', "kpollsubmit", "If set, cluster uses IORING_SETUP_SQPOLL, implies -f", sqkpoll, parse_settrue }, 71 { 'i', "kpollcomplete", "If set, cluster uses IORING_SETUP_IOPOLL", iokpoll, parse_settrue }, 72 {'\0', "submitlength", "Max number of submitions that can be submitted together", sublen }, 73 {'\0', "numentries", "Number of I/O entries", nentries }, 70 // { 'f', "fixed-fds", "If set, files are open eagerly and pre-registered with the cluster", fixedfd, parse_settrue}, 71 // { 'k', "kpollsubmit", "If set, cluster uses IORING_SETUP_SQPOLL, implies -f", sqkpoll, parse_settrue }, 72 // { 'i', "kpollcomplete", "If set, cluster uses IORING_SETUP_IOPOLL", iokpoll, parse_settrue }, 73 {'e', "numentries", "Number of I/O entries", nentries }, 74 74 75 75 }; … … 91 91 nentries = v; 92 92 } 93 if(isolate) { 94 options.clopts.nclusters = options.clopts.nprocs; 95 options.clopts.nprocs = 1; 96 } 93 97 options.clopts.params.num_entries = nentries; 94 95 options.clopts.params.poller_submits = subthrd; 96 options.clopts.params.eager_submits = eagrsub; 97 98 if( fixedfd ) { 99 options.file_cache.fixed_fds = true; 98 options.clopts.instance = alloc(options.clopts.nclusters); 99 options.clopts.thrd_cnt = alloc(options.clopts.nclusters); 100 options.clopts.cltr_cnt = 0; 101 for(i; options.clopts.nclusters) { 102 options.clopts.thrd_cnt[i] = 0; 100 103 } 101 104 102 if( sqkpoll ) {103 options.clopts.params.poll_submit = true;104 options.file_cache.fixed_fds = true;105 }106 105 107 if( iokpoll ) { 108 options.clopts.params.poll_complete = true; 109 options.file_cache.open_flags |= O_DIRECT; 110 } 106 // if( fixedfd ) { 107 // options.file_cache.fixed_fds = true; 108 // } 111 109 112 options.clopts.params.num_ready = sublen; 110 // if( sqkpoll ) { 111 // options.file_cache.fixed_fds = true; 112 // } 113 114 // if( iokpoll ) { 115 // options.file_cache.open_flags |= O_DIRECT; 116 // } 113 117 114 118 if( left[0] == 0p ) { return; } -
benchmark/io/http/options.hfa
r182256b r266ecf1 9 9 struct Options { 10 10 bool log; 11 bool stats; 11 12 12 13 struct { … … 26 27 27 28 struct { 29 int nclusters; 28 30 int nprocs; 29 31 int nworkers; … … 31 33 bool procstats; 32 34 bool viewhalts; 33 cluster * instance; 35 cluster ** instance; 36 size_t * thrd_cnt; 37 size_t cltr_cnt; 34 38 } clopts; 35 39 }; -
benchmark/io/http/protocol.cfa
r182256b r266ecf1 20 20 #include "options.hfa" 21 21 22 const char * volatile date = 0p; 23 24 const char * http_msgs[] = { 25 "HTTP/1.1 200 OK\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: %zu \n\n", 26 "HTTP/1.1 400 Bad Request\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 27 "HTTP/1.1 404 Not Found\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 28 "HTTP/1.1 405 Method Not Allowed\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 29 "HTTP/1.1 408 Request Timeout\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 30 "HTTP/1.1 413 Payload Too Large\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 31 "HTTP/1.1 414 URI Too Long\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 32 }; 22 #define PLAINTEXT_1WRITE 23 #define PLAINTEXT_NOCOPY 24 25 struct https_msg_str { 26 char msg[512]; 27 size_t len; 28 }; 29 30 const https_msg_str * volatile http_msgs[KNOWN_CODES] = { 0 }; 33 31 34 32 _Static_assert( KNOWN_CODES == (sizeof(http_msgs ) / sizeof(http_msgs [0]))); 35 33 36 const int http_codes[] = { 34 const int http_codes[KNOWN_CODES] = { 35 200, 37 36 200, 38 37 400, … … 53 52 while(len > 0) { 54 53 // Call write 55 int ret = cfa_write(fd, it, len, 0, -1`s, 0p, 0p); 56 // int ret = write(fd, it, len); 54 int ret = cfa_send(fd, it, len, 0, CFA_IO_LAZY); 57 55 if( ret < 0 ) { 58 56 if( errno == ECONNRESET || errno == EPIPE ) return -ECONNRESET; … … 72 70 /* paranoid */ assert( code < KNOWN_CODES && code != OK200 ); 73 71 int idx = (int)code; 74 return answer( fd, http_msgs[idx] , strlen( http_msgs[idx] ));72 return answer( fd, http_msgs[idx]->msg, http_msgs[idx]->len ); 75 73 } 76 74 77 75 int answer_header( int fd, size_t size ) { 78 const char * fmt = http_msgs[OK200]; 79 int len = 200; 80 char buffer[len]; 81 len = snprintf(buffer, len, fmt, date, size); 76 char buffer[512]; 77 char * it = buffer; 78 memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len); 79 it += http_msgs[OK200]->len; 80 int len = http_msgs[OK200]->len; 81 len += snprintf(it, 512 - len, "%d \n\n", size); 82 82 return answer( fd, buffer, len ); 83 83 } 84 84 85 int answer_plain( int fd, char buffer[], size_t size ) { 86 int ret = answer_header(fd, size); 85 #if defined(PLAINTEXT_NOCOPY) 86 int answer_plaintext( int fd ) { 87 return answer(fd, http_msgs[OK200_PlainText]->msg, http_msgs[OK200_PlainText]->len + 1); // +1 cause snprintf doesn't count nullterminator 88 } 89 #elif defined(PLAINTEXT_1WRITE) 90 int answer_plaintext( int fd ) { 91 char text[] = "Hello, World!\n"; 92 char buffer[512 + sizeof(text)]; 93 char * it = buffer; 94 memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len); 95 it += http_msgs[OK200]->len; 96 int len = http_msgs[OK200]->len; 97 int r = snprintf(it, 512 - len, "%d \n\n", sizeof(text)); 98 it += r; 99 len += r; 100 memcpy(it, text, sizeof(text)); 101 return answer(fd, buffer, len + sizeof(text)); 102 } 103 #else 104 int answer_plaintext( int fd ) { 105 char text[] = "Hello, World!\n"; 106 int ret = answer_header(fd, sizeof(text)); 87 107 if( ret < 0 ) return ret; 88 return answer(fd, buffer, size); 89 } 108 return answer(fd, text, sizeof(text)); 109 } 110 #endif 90 111 91 112 int answer_empty( int fd ) { … … 94 115 95 116 96 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len , io_cancellation * cancel) {117 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) { 97 118 char * it = buffer; 98 119 size_t count = len - 1; … … 100 121 READ: 101 122 for() { 102 int ret = cfa_re ad(fd, (void*)it, count, 0, -1`s, cancel, 0p);123 int ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY); 103 124 // int ret = read(fd, (void*)it, count); 104 125 if(ret == 0 ) return [OK200, true, 0, 0]; … … 139 160 ssize_t ret; 140 161 SPLICE1: while(count > 0) { 141 ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, 0, -1`s, 0p, 0p); 142 // ret = splice(ans_fd, &offset, pipe[1], 0p, count, sflags); 162 ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY); 143 163 if( ret < 0 ) { 144 164 if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE1; … … 152 172 size_t in_pipe = ret; 153 173 SPLICE2: while(in_pipe > 0) { 154 ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, 0, -1`s, 0p, 0p); 155 // ret = splice(pipe[0], 0p, fd, 0p, in_pipe, sflags); 174 ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY); 156 175 if( ret < 0 ) { 157 176 if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE2; … … 173 192 #include <thread.hfa> 174 193 194 const char * original_http_msgs[] = { 195 "HTTP/1.1 200 OK\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: ", 196 "HTTP/1.1 200 OK\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 15\n\nHello, World!\n", 197 "HTTP/1.1 400 Bad Request\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 198 "HTTP/1.1 404 Not Found\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 199 "HTTP/1.1 405 Method Not Allowed\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 200 "HTTP/1.1 408 Request Timeout\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 201 "HTTP/1.1 413 Payload Too Large\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 202 "HTTP/1.1 414 URI Too Long\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 203 }; 204 175 205 struct date_buffer { 176 char buff[100];206 https_msg_str strs[KNOWN_CODES]; 177 207 }; 178 208 … … 183 213 184 214 void ?{}( DateFormater & this ) { 185 ((thread&)this){ "Server Date Thread", *options.clopts.instance };215 ((thread&)this){ "Server Date Thread", *options.clopts.instance[0] }; 186 216 this.idx = 0; 187 memset( this.buffers[0].buff, 0, sizeof(this.buffers[0]) );188 memset( this.buffers[1].buff, 0, sizeof(this.buffers[1]) );217 memset( &this.buffers[0], 0, sizeof(this.buffers[0]) ); 218 memset( &this.buffers[1], 0, sizeof(this.buffers[1]) ); 189 219 } 190 220 … … 196 226 or else {} 197 227 228 229 char buff[100]; 198 230 Time now = getTimeNsec(); 199 200 strftime( this.buffers[this.idx].buff, 100, "%a, %d %b %Y %H:%M:%S %Z", now ); 201 202 char * next = this.buffers[this.idx].buff; 203 __atomic_exchange_n((char * volatile *)&date, next, __ATOMIC_SEQ_CST); 231 strftime( buff, 100, "%a, %d %b %Y %H:%M:%S %Z", now ); 232 sout | "Updated date to '" | buff | "'"; 233 234 for(i; KNOWN_CODES) { 235 size_t len = snprintf( this.buffers[this.idx].strs[i].msg, 512, original_http_msgs[i], buff ); 236 this.buffers[this.idx].strs[i].len = len; 237 } 238 239 for(i; KNOWN_CODES) { 240 https_msg_str * next = &this.buffers[this.idx].strs[i]; 241 __atomic_exchange_n((https_msg_str * volatile *)&http_msgs[i], next, __ATOMIC_SEQ_CST); 242 } 204 243 this.idx = (this.idx + 1) % 2; 244 245 sout | "Date thread sleeping"; 205 246 206 247 sleep(1`s); -
benchmark/io/http/protocol.hfa
r182256b r266ecf1 1 1 #pragma once 2 3 struct io_cancellation;4 2 5 3 enum HttpCode { 6 4 OK200 = 0, 5 OK200_PlainText, 7 6 E400, 8 7 E404, … … 18 17 int answer_error( int fd, HttpCode code ); 19 18 int answer_header( int fd, size_t size ); 20 int answer_plain ( int fd, char buffer [], size_t size);19 int answer_plaintext( int fd ); 21 20 int answer_empty( int fd ); 22 21 23 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len , io_cancellation *);22 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len); 24 23 25 24 int sendfile( int pipe[2], int fd, int ans_fd, size_t count ); -
benchmark/io/http/worker.cfa
r182256b r266ecf1 17 17 //============================================================================================= 18 18 void ?{}( Worker & this ) { 19 ((thread&)this){ "Server Worker Thread", *options.clopts.instance }; 19 size_t cli = rand() % options.clopts.cltr_cnt; 20 ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli] }; 21 options.clopts.thrd_cnt[cli]++; 20 22 this.pipe[0] = -1; 21 23 this.pipe[1] = -1; … … 35 37 for() { 36 38 if( options.log ) sout | "=== Accepting connection ==="; 37 int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], 0, -1`s, &this.cancel, 0p ); 38 // int fd = accept4( this.[sockfd, addr, addrlen, flags] ); 39 int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], CFA_IO_LAZY ); 39 40 if(fd < 0) { 40 41 if( errno == ECONNABORTED ) break; … … 42 43 abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) ); 43 44 } 45 if(this.done) break; 44 46 45 47 if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ==="; … … 55 57 char buffer[len]; 56 58 if( options.log ) sout | "=== Reading request ==="; 57 [code, closed, file, name_size] = http_read(fd, buffer, len , &this.cancel);59 [code, closed, file, name_size] = http_read(fd, buffer, len); 58 60 59 61 // if we are done, break out of the loop … … 70 72 if( options.log ) sout | "=== Request for /plaintext ==="; 71 73 72 char text[] = "Hello, World!\n"; 73 74 // Send the header 75 int ret = answer_plain(fd, text, sizeof(text)); 74 int ret = answer_plaintext(fd); 76 75 if( ret == -ECONNRESET ) break REQUEST; 77 76 -
benchmark/io/http/worker.hfa
r182256b r266ecf1 17 17 socklen_t * addrlen; 18 18 int flags; 19 io_cancellation cancel;20 19 volatile bool done; 21 20 };
Note:
See TracChangeset
for help on using the changeset viewer.