Changeset 7f0ac12 for benchmark/io/http/worker.cfa
- Timestamp:
- Jun 8, 2022, 7:07:51 PM (23 months ago)
- Branches:
- ADT, ast-experimental, master, pthread-emulation, qualifiedEnum
- Children:
- bbf61838
- Parents:
- 6e2b04e
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/http/worker.cfa
r6e2b04e r7f0ac12 8 8 #include <fstream.hfa> 9 9 #include <iofwd.hfa> 10 #include <mutex_stmt.hfa> 10 11 11 12 #include "options.hfa" … … 13 14 #include "filecache.hfa" 14 15 15 //============================================================================================= 16 // Worker Thread 17 //============================================================================================= 18 void ?{}( Worker & this ) { 16 void ?{}( sendfile_stats_t & this ) { 17 this.calls = 0; 18 this.tries = 0; 19 this.header = 0; 20 this.splcin = 0; 21 this.splcot = 0; 22 for(i; zipf_cnts) { 23 this.avgrd[i].calls = 0; 24 this.avgrd[i].bytes = 0; 25 } 26 } 27 28 //============================================================================================= 29 // Generic connection handling 30 //============================================================================================= 31 static void handle_connection( connection & this, volatile int & fd, char * buffer, size_t len, io_future_t * f ) { 32 REQUEST: 33 for() { 34 bool closed; 35 HttpCode code; 36 const char * file; 37 size_t name_size; 38 39 // Read the http request 40 if( options.log ) sout | "=== Reading request ==="; 41 [code, closed, file, name_size] = http_read(fd, buffer, len, f); 42 f = 0p; 43 44 // if we are done, break out of the loop 45 if( closed ) break REQUEST; 46 47 // If this wasn't a request retrun 400 48 if( code != OK200 ) { 49 sout | "=== Invalid Request :" | code_val(code) | "==="; 50 answer_error(fd, code); 51 continue REQUEST; 52 } 53 54 if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) { 55 if( options.log ) sout | "=== Request for /plaintext ==="; 56 57 int ret = answer_plaintext(fd); 58 if( ret == -ECONNRESET ) break REQUEST; 59 60 if( options.log ) sout | "=== Answer sent ==="; 61 continue REQUEST; 62 } 63 64 if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) { 65 if( options.log ) sout | "=== Request for /ping ==="; 66 67 // Send the header 68 int ret = answer_empty(fd); 69 if( ret == -ECONNRESET ) break REQUEST; 70 71 if( options.log ) sout | "=== Answer sent ==="; 72 continue REQUEST; 73 } 74 75 if( options.log ) { 76 sout | "=== Request for file " | nonl; 77 write(sout, file, name_size); 78 sout | " ==="; 79 } 80 81 if( !options.file_cache.path ) { 82 if( options.log ) { 83 sout | "=== File Not Found (" | nonl; 84 write(sout, file, name_size); 85 sout | ") ==="; 86 } 87 answer_error(fd, E405); 88 continue REQUEST; 89 } 90 91 // Get the fd from the file cache 92 int ans_fd; 93 size_t count; 94 [ans_fd, count] = get_file( file, name_size ); 95 96 // If we can't find the file, return 404 97 if( ans_fd < 0 ) { 98 if( options.log ) { 99 sout | "=== File Not Found (" | nonl; 100 write(sout, file, name_size); 101 sout | ") ==="; 102 } 103 answer_error(fd, E404); 104 continue REQUEST; 105 } 106 107 // Send the desired file 108 int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile ); 109 if( ret == -ECONNRESET ) break REQUEST; 110 111 if( options.log ) sout | "=== Answer sent ==="; 112 } 113 } 114 115 //============================================================================================= 116 // Self Accepting Worker Thread 117 //============================================================================================= 118 void ?{}( AcceptWorker & this ) { 19 119 size_t cli = rand() % options.clopts.cltr_cnt; 20 120 ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 }; 21 121 options.clopts.thrd_cnt[cli]++; 22 this.pipe[0] = -1;23 this.pipe[1] = -1;24 122 this.done = false; 25 26 this.stats.sendfile.calls = 0; 27 this.stats.sendfile.tries = 0; 28 this.stats.sendfile.header = 0; 29 this.stats.sendfile.splcin = 0; 30 this.stats.sendfile.splcot = 0; 31 for(i; zipf_cnts) { 32 this.stats.sendfile.avgrd[i].calls = 0; 33 this.stats.sendfile.avgrd[i].bytes = 0; 34 } 35 } 36 37 extern "C" { 38 extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags); 39 } 40 41 void main( Worker & this ) { 123 } 124 125 void main( AcceptWorker & this ) { 42 126 park(); 43 /* paranoid */ assert( this.pipe[0] != -1 ); 44 /* paranoid */ assert( this.pipe[1] != -1 ); 45 46 const bool reuse = options.socket.manyreuse; 47 48 CONNECTION: 127 /* paranoid */ assert( this.conn.pipe[0] != -1 ); 128 /* paranoid */ assert( this.conn.pipe[1] != -1 ); 49 129 for() { 50 130 if( options.log ) sout | "=== Accepting connection ==="; 51 int fd = cfa_accept4( this. [sockfd,addr, addrlen, flags], CFA_IO_LAZY );131 int fd = cfa_accept4( this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY ); 52 132 if(fd < 0) { 53 133 if( errno == ECONNABORTED ) break; … … 58 138 59 139 if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ==="; 60 REQUEST: 61 for() { 62 bool closed; 63 HttpCode code; 64 const char * file; 65 size_t name_size; 66 67 // Read the http request 68 size_t len = options.socket.buflen; 69 char buffer[len]; 70 if( options.log ) sout | "=== Reading request ==="; 71 [code, closed, file, name_size] = http_read(fd, buffer, len); 72 73 // if we are done, break out of the loop 74 if( closed ) break REQUEST; 75 76 // If this wasn't a request retrun 400 77 if( code != OK200 ) { 78 sout | "=== Invalid Request :" | code_val(code) | "==="; 79 answer_error(fd, code); 80 continue REQUEST; 140 size_t len = options.socket.buflen; 141 char buffer[len]; 142 handle_connection( this.conn, fd, buffer, len, 0p ); 143 144 if( options.log ) sout | "=== Connection closed ==="; 145 } 146 } 147 148 149 //============================================================================================= 150 // Channel Worker Thread 151 //============================================================================================= 152 void ?{}( ChannelWorker & this ) { 153 size_t cli = rand() % options.clopts.cltr_cnt; 154 ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 }; 155 options.clopts.thrd_cnt[cli]++; 156 this.done = false; 157 } 158 159 void main( ChannelWorker & this ) { 160 park(); 161 /* paranoid */ assert( this.conn.pipe[0] != -1 ); 162 /* paranoid */ assert( this.conn.pipe[1] != -1 ); 163 for() { 164 size_t len = options.socket.buflen; 165 char buffer[len]; 166 PendingRead p; 167 p.in.buf = (void*)buffer; 168 p.in.len = len; 169 push(*this.queue, &p); 170 171 if( options.log ) sout | "=== Waiting new connection ==="; 172 handle_connection( this.conn, p.out.fd, buffer, len, &p.f ); 173 174 if( options.log ) sout | "=== Connection closed ==="; 175 if(this.done) break; 176 } 177 } 178 179 extern "C" { 180 extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags); 181 } 182 183 void ?{}( Acceptor & this ) { 184 size_t cli = rand() % options.clopts.cltr_cnt; 185 ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 }; 186 options.clopts.thrd_cnt[cli]++; 187 this.done = false; 188 } 189 190 void main( Acceptor & this ) { 191 park(); 192 if( options.log ) sout | "=== Accepting connection ==="; 193 for() { 194 int fd = accept4(this.sockfd, this.[addr, addrlen, flags]); 195 if(fd < 0) { 196 if( errno == EWOULDBLOCK) { 197 yield(); 198 continue; 81 199 } 82 83 if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) { 84 if( options.log ) sout | "=== Request for /plaintext ==="; 85 86 int ret = answer_plaintext(fd); 87 if( ret == -ECONNRESET ) break REQUEST; 88 89 if( options.log ) sout | "=== Answer sent ==="; 90 continue REQUEST; 91 } 92 93 if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) { 94 if( options.log ) sout | "=== Request for /ping ==="; 95 96 // Send the header 97 int ret = answer_empty(fd); 98 if( ret == -ECONNRESET ) break REQUEST; 99 100 if( options.log ) sout | "=== Answer sent ==="; 101 continue REQUEST; 102 } 103 104 if( options.log ) { 105 sout | "=== Request for file " | nonl; 106 write(sout, file, name_size); 107 sout | " ==="; 108 } 109 110 if( !options.file_cache.path ) { 111 if( options.log ) { 112 sout | "=== File Not Found (" | nonl; 113 write(sout, file, name_size); 114 sout | ") ==="; 115 } 116 answer_error(fd, E405); 117 continue REQUEST; 118 } 119 120 // Get the fd from the file cache 121 int ans_fd; 122 size_t count; 123 [ans_fd, count] = get_file( file, name_size ); 124 125 // If we can't find the file, return 404 126 if( ans_fd < 0 ) { 127 if( options.log ) { 128 sout | "=== File Not Found (" | nonl; 129 write(sout, file, name_size); 130 sout | ") ==="; 131 } 132 answer_error(fd, E404); 133 continue REQUEST; 134 } 135 136 // Send the desired file 137 int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile ); 138 if( ret == -ECONNRESET ) break REQUEST; 139 140 if( options.log ) sout | "=== Answer sent ==="; 141 } 142 143 if( options.log ) sout | "=== Connection closed ==="; 144 continue CONNECTION; 145 } 146 } 200 if( errno == ECONNABORTED ) break; 201 if( this.done && (errno == EINVAL || errno == EBADF) ) break; 202 abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) ); 203 } 204 if(this.done) return; 205 206 if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ==="; 207 208 if(fd) { 209 PendingRead * p = 0p; 210 for() { 211 if(this.done) return; 212 p = pop(*this.queue); 213 if(p) break; 214 yield(); 215 }; 216 217 p->out.fd = fd; 218 async_recv(p->f, p->out.fd, p->in.buf, p->in.len, 0, CFA_IO_LAZY); 219 } 220 221 if( options.log ) sout | "=== Accepting connection ==="; 222 } 223 }
Note: See TracChangeset
for help on using the changeset viewer.