Changeset 3f95dab
- Timestamp:
- Jul 28, 2022, 12:04:19 PM (2 years ago)
- Branches:
- ADT, ast-experimental, master, pthread-emulation
- Children:
- c4c8571
- Parents:
- 7ce8873
- Location:
- benchmark/io/http
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/http/main.cfa
r7ce8873 r3f95dab 302 302 sout | "done"; 303 303 304 //=================== 305 // Close Files 306 if( options.file_cache.path ) { 307 sout | "Closing open files..." | nonl; flush( sout ); 308 close_cache(); 309 sout | "done"; 310 } 311 304 312 sout | "Stopping accept threads..." | nonl; flush( sout ); 305 313 for(i; nacceptors) { … … 346 354 } 347 355 sout | "done"; 356 357 //=================== 358 // Close Files 359 if( options.file_cache.path ) { 360 sout | "Closing open files..." | nonl; flush( sout ); 361 close_cache(); 362 sout | "done"; 363 } 348 364 349 365 sout | "Stopping connection threads..." | nonl; flush( sout ); … … 386 402 } 387 403 sout | "done"; 388 389 //===================390 // Close Files391 if( options.file_cache.path ) {392 sout | "Closing open files..." | nonl; flush( sout );393 close_cache();394 sout | "done";395 }396 404 } 397 405 -
benchmark/io/http/protocol.cfa
r7ce8873 r3f95dab 29 29 #define PLAINTEXT_MEMCPY 30 30 #define PLAINTEXT_NOCOPY 31 #define LINKED_IO31 // #define LINKED_IO 32 32 33 33 static inline __s32 wait_res( io_future_t & this ) { … … 72 72 if( ret < 0 ) { 73 73 if( errno == ECONNRESET || errno == EPIPE ) { close(fd); return -ECONNRESET; } 74 if( errno == EAGAIN || errno == EWOULDBLOCK) return -EAGAIN;75 74 76 75 abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) ); … … 152 151 } 153 152 154 static int sendfile( int pipe[2], int fd, int ans_fd, size_t count ) { 153 static int sendfile( int pipe[2], int fd, int ans_fd, size_t count, sendfile_stats_t & stats ) { 154 int zipf_idx = -1; 155 STATS: for(i; zipf_cnts) { 156 if(count <= zipf_sizes[i]) { 157 zipf_idx = i; 158 break STATS; 159 } 160 } 161 if(zipf_idx < 0) mutex(serr) serr | "SENDFILE" | count | " greated than biggest zipf file"; 162 163 155 164 unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE; 156 165 off_t offset = 0; 157 166 ssize_t ret; 158 167 SPLICE1: while(count > 0) { 159 ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY); 160 if( ret < 0 ) { 161 if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE1; 168 stats.tries++; 169 // ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY); 170 ret = splice(ans_fd, &offset, pipe[1], 0p, count, sflags); 171 if( ret <= 0 ) { 162 172 if( errno == ECONNRESET ) return -ECONNRESET; 163 173 if( errno == EPIPE ) return -EPIPE; 164 abort( "splice [0] error: (%d) %s\n", (int)errno, strerror(errno) ); 165 } 166 174 abort( "splice [0] error: %d (%d) %s\n", ret, (int)errno, strerror(errno) ); 175 } 167 176 count -= ret; 177 stats.splcin++; 178 if(count > 0) stats.avgrd[zipf_idx].calls++; 179 stats.avgrd[zipf_idx].bytes += ret; 180 168 181 size_t in_pipe = ret; 169 182 SPLICE2: while(in_pipe > 0) { 170 ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY);171 if( ret < 0 ) {172 if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE2;183 // ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY); 184 ret = splice(pipe[0], 0p, fd, 0p, in_pipe, sflags); 185 if( ret <= 0 ) { 173 186 if( errno == ECONNRESET ) return -ECONNRESET; 174 187 if( errno == EPIPE ) return -EPIPE; 175 abort( "splice [1] error: (%d) %s\n", (int)errno, strerror(errno) );188 abort( "splice [1] error: %d (%d) %s\n", ret, (int)errno, strerror(errno) ); 176 189 } 190 stats.splcot++; 177 191 in_pipe -= ret; 178 192 } … … 506 520 return len + fsize; 507 521 #else 508 stats.tries++;509 522 int ret = answer_header(fd, fsize); 510 523 if( ret < 0 ) { close(fd); return ret; } 511 return sendfile(pipe, fd, ans_fd, fsize );524 return sendfile(pipe, fd, ans_fd, fsize, stats); 512 525 #endif 513 526 } … … 528 541 } 529 542 // int ret = read(fd, (void*)it, count); 530 if(ret == 0 ) return [OK200, true, 0, 0];543 if(ret == 0 ) { close(fd); return [OK200, true, 0, 0]; } 531 544 if(ret < 0 ) { 532 if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ;533 545 if( errno == ECONNRESET ) { close(fd); return [E408, true, 0, 0]; } 534 546 if( errno == EPIPE ) { close(fd); return [E408, true, 0, 0]; } -
benchmark/io/http/worker.cfa
r7ce8873 r3f95dab 13 13 #include "protocol.hfa" 14 14 #include "filecache.hfa" 15 16 static const unsigned long long period = 50_000_000; 15 17 16 18 //============================================================================================= … … 95 97 // Send the desired file 96 98 int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile ); 97 if( ret == -ECONNRESET ) break REQUEST; 99 if(ret < 0) { 100 if( ret == -ECONNABORTED ) break REQUEST; 101 if( ret == -ECONNRESET ) break REQUEST; 102 if( ret == -EPIPE ) break REQUEST; 103 abort( "sendfile error: %d (%d) %s\n", ret, (int)errno, strerror(errno) ); 104 } 98 105 99 106 if( options.log ) mutex(sout) sout | "=== Answer sent ==="; … … 102 109 if (stats_thrd) { 103 110 unsigned long long next = rdtscl(); 104 if(next > (last + 500000000)) {111 if(next > (last + period)) { 105 112 if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) { 106 113 push(this.stats.sendfile, stats_thrd->stats.send); … … 141 148 char buffer[len]; 142 149 handle_connection( this.conn, fd, buffer, len, 0p, last ); 150 this.conn.stats.sendfile.maxfd = max(this.conn.stats.sendfile.maxfd, fd); 151 this.conn.stats.sendfile.close++; 143 152 144 153 if( options.log ) mutex(sout) sout | "=== Connection closed ==="; … … 162 171 /* paranoid */ assert( this.conn.pipe[0] != -1 ); 163 172 /* paranoid */ assert( this.conn.pipe[1] != -1 ); 173 this.conn.stats.sendfile.maxfd = max(this.conn.pipe[0], this.conn.pipe[1]); 164 174 for() { 165 175 size_t len = options.socket.buflen; … … 173 183 if( options.log ) mutex(sout) sout | "=== Waiting new connection ==="; 174 184 handle_connection( this.conn, p.out.fd, buffer, len, &p.f, last ); 185 if(this.done) break; 186 this.conn.stats.sendfile.maxfd = max(this.conn.stats.sendfile.maxfd, p.out.fd); 187 this.conn.stats.sendfile.close++; 175 188 176 189 if( options.log ) mutex(sout) sout | "=== Connection closed ==="; 177 if(this.done) break; 178 } 190 } 191 192 lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2); 193 push(this.conn.stats.sendfile, stats_thrd->stats.send); 194 unlock(stats_thrd->stats.lock); 179 195 } 180 196 … … 198 214 199 215 static inline void push_connection( Acceptor & this, int fd ) { 216 this.stats.accepts++; 200 217 PendingRead * p = 0p; 201 218 for() { … … 212 229 213 230 // #define ACCEPT_SPIN 214 #define ACCEPT_MANY 231 #define ACCEPT_ONE 232 // #define ACCEPT_MANY 215 233 216 234 void main( Acceptor & this ) { … … 232 250 abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) ); 233 251 } 234 this.stats.accepts++;235 252 236 253 if(this.done) return; … … 242 259 if (stats_thrd) { 243 260 unsigned long long next = rdtscl(); 244 if(next > (last + 500000000)) { 261 if(next > (last + period)) { 262 if(try_lock(stats_thrd->stats.lock)) { 263 push(this.stats, stats_thrd->stats.accpt); 264 unlock(stats_thrd->stats.lock); 265 last = next; 266 } 267 } 268 } 269 270 if( options.log ) sout | "=== Accepting connection ==="; 271 } 272 273 #elif defined(ACCEPT_ONE) 274 if( options.log ) sout | "=== Accepting connection ==="; 275 for() { 276 int fd = cfa_accept4(this.sockfd, this.[addr, addrlen, flags], 0); 277 if(fd < 0) { 278 if( errno == ECONNABORTED ) break; 279 if( this.done && (errno == EINVAL || errno == EBADF) ) break; 280 abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) ); 281 } 282 283 if(this.done) return; 284 285 if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ==="; 286 287 if(fd) push_connection(this, fd); 288 289 if (stats_thrd) { 290 unsigned long long next = rdtscl(); 291 if(next > (last + period)) { 245 292 if(try_lock(stats_thrd->stats.lock)) { 246 293 push(this.stats, stats_thrd->stats.accpt); … … 269 316 if (stats_thrd) { 270 317 unsigned long long next = rdtscl(); 271 if(next > (last + 500000000)) {318 if(next > (last + period)) { 272 319 if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) { 273 320 push(this.stats, stats_thrd->stats.accpt); … … 284 331 int fd = get_res(res); 285 332 reset(res); 286 this.stats.accepts++;287 333 if(fd < 0) { 288 334 if( errno == ECONNABORTED ) continue; … … 319 365 #error no accept algorithm specified 320 366 #endif 321 } 367 lock(stats_thrd->stats.lock); 368 push(this.stats, stats_thrd->stats.accpt); 369 unlock(stats_thrd->stats.lock); 370 }
Note: See TracChangeset
for help on using the changeset viewer.