Changes in / [c4c8571:2af1943]
- Files:
-
- 8 edited
-
benchmark/io/http/main.cfa (modified) (3 diffs)
-
benchmark/io/http/protocol.cfa (modified) (5 diffs)
-
benchmark/io/http/worker.cfa (modified) (13 diffs)
-
libcfa/src/concurrency/io.cfa (modified) (1 diff)
-
libcfa/src/concurrency/io/call.cfa.in (modified) (2 diffs)
-
libcfa/src/concurrency/io/setup.cfa (modified) (2 diffs)
-
libcfa/src/concurrency/stats.cfa (modified) (3 diffs)
-
libcfa/src/concurrency/stats.hfa (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/http/main.cfa
rc4c8571 r2af1943 302 302 sout | "done"; 303 303 304 //===================305 // Close Files306 if( options.file_cache.path ) {307 sout | "Closing open files..." | nonl; flush( sout );308 close_cache();309 sout | "done";310 }311 312 304 sout | "Stopping accept threads..." | nonl; flush( sout ); 313 305 for(i; nacceptors) { … … 354 346 } 355 347 sout | "done"; 356 357 //===================358 // Close Files359 if( options.file_cache.path ) {360 sout | "Closing open files..." | nonl; flush( sout );361 close_cache();362 sout | "done";363 }364 348 365 349 sout | "Stopping connection threads..." | nonl; flush( sout ); … … 402 386 } 403 387 sout | "done"; 388 389 //=================== 390 // Close Files 391 if( options.file_cache.path ) { 392 sout | "Closing open files..." | nonl; flush( sout ); 393 close_cache(); 394 sout | "done"; 395 } 404 396 } 405 397 -
benchmark/io/http/protocol.cfa
rc4c8571 r2af1943 29 29 #define PLAINTEXT_MEMCPY 30 30 #define PLAINTEXT_NOCOPY 31 //#define LINKED_IO31 #define LINKED_IO 32 32 33 33 static inline __s32 wait_res( io_future_t & this ) { … … 72 72 if( ret < 0 ) { 73 73 if( errno == ECONNRESET || errno == EPIPE ) { close(fd); return -ECONNRESET; } 74 if( errno == EAGAIN || errno == EWOULDBLOCK) return -EAGAIN; 74 75 75 76 abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) ); … … 151 152 } 152 153 153 static int sendfile( int pipe[2], int fd, int ans_fd, size_t count, sendfile_stats_t & stats ) { 154 int zipf_idx = -1; 155 STATS: for(i; zipf_cnts) { 156 if(count <= zipf_sizes[i]) { 157 zipf_idx = i; 158 break STATS; 159 } 160 } 161 if(zipf_idx < 0) mutex(serr) serr | "SENDFILE" | count | " greated than biggest zipf file"; 162 163 154 static int sendfile( int pipe[2], int fd, int ans_fd, size_t count ) { 164 155 unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE; 165 156 off_t offset = 0; 166 157 ssize_t ret; 167 158 SPLICE1: while(count > 0) { 168 stats.tries++; 169 // ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY); 170 ret = splice(ans_fd, &offset, pipe[1], 0p, count, sflags); 171 if( ret <= 0 ) { 159 ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY); 160 if( ret < 0 ) { 161 if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE1; 172 162 if( errno == ECONNRESET ) return -ECONNRESET; 173 163 if( errno == EPIPE ) return -EPIPE; 174 abort( "splice [0] error: %d (%d) %s\n", ret, (int)errno, strerror(errno) ); 175 } 164 abort( "splice [0] error: (%d) %s\n", (int)errno, strerror(errno) ); 165 } 166 176 167 count -= ret; 177 stats.splcin++;178 if(count > 0) stats.avgrd[zipf_idx].calls++;179 stats.avgrd[zipf_idx].bytes += ret;180 181 168 size_t in_pipe = ret; 182 169 SPLICE2: while(in_pipe > 0) { 183 //ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY);184 ret = splice(pipe[0], 0p, fd, 0p, in_pipe, sflags);185 if( ret <= 0 ) {170 ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY); 171 if( ret < 0 ) { 172 if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE2; 186 173 if( errno == ECONNRESET ) return -ECONNRESET; 187 174 if( errno == EPIPE ) return -EPIPE; 188 abort( "splice [1] error: %d (%d) %s\n", ret, (int)errno, strerror(errno) );175 abort( "splice [1] error: (%d) %s\n", (int)errno, strerror(errno) ); 189 176 } 190 stats.splcot++;191 177 in_pipe -= ret; 192 178 } … … 520 506 return len + fsize; 521 507 #else 508 stats.tries++; 522 509 int ret = answer_header(fd, fsize); 523 510 if( ret < 0 ) { close(fd); return ret; } 524 return sendfile(pipe, fd, ans_fd, fsize , stats);511 return sendfile(pipe, fd, ans_fd, fsize); 525 512 #endif 526 513 } … … 541 528 } 542 529 // int ret = read(fd, (void*)it, count); 543 if(ret == 0 ) { close(fd); return [OK200, true, 0, 0]; }530 if(ret == 0 ) return [OK200, true, 0, 0]; 544 531 if(ret < 0 ) { 532 if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ; 545 533 if( errno == ECONNRESET ) { close(fd); return [E408, true, 0, 0]; } 546 534 if( errno == EPIPE ) { close(fd); return [E408, true, 0, 0]; } -
benchmark/io/http/worker.cfa
rc4c8571 r2af1943 13 13 #include "protocol.hfa" 14 14 #include "filecache.hfa" 15 16 static const unsigned long long period = 50_000_000;17 15 18 16 //============================================================================================= … … 97 95 // Send the desired file 98 96 int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile ); 99 if(ret < 0) { 100 if( ret == -ECONNABORTED ) break REQUEST; 101 if( ret == -ECONNRESET ) break REQUEST; 102 if( ret == -EPIPE ) break REQUEST; 103 abort( "sendfile error: %d (%d) %s\n", ret, (int)errno, strerror(errno) ); 104 } 97 if( ret == -ECONNRESET ) break REQUEST; 105 98 106 99 if( options.log ) mutex(sout) sout | "=== Answer sent ==="; … … 109 102 if (stats_thrd) { 110 103 unsigned long long next = rdtscl(); 111 if(next > (last + period)) {104 if(next > (last + 500000000)) { 112 105 if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) { 113 106 push(this.stats.sendfile, stats_thrd->stats.send); … … 148 141 char buffer[len]; 149 142 handle_connection( this.conn, fd, buffer, len, 0p, last ); 150 this.conn.stats.sendfile.maxfd = max(this.conn.stats.sendfile.maxfd, fd);151 this.conn.stats.sendfile.close++;152 143 153 144 if( options.log ) mutex(sout) sout | "=== Connection closed ==="; … … 171 162 /* paranoid */ assert( this.conn.pipe[0] != -1 ); 172 163 /* paranoid */ assert( this.conn.pipe[1] != -1 ); 173 this.conn.stats.sendfile.maxfd = max(this.conn.pipe[0], this.conn.pipe[1]);174 164 for() { 175 165 size_t len = options.socket.buflen; … … 183 173 if( options.log ) mutex(sout) sout | "=== Waiting new connection ==="; 184 174 handle_connection( this.conn, p.out.fd, buffer, len, &p.f, last ); 175 176 if( options.log ) mutex(sout) sout | "=== Connection closed ==="; 185 177 if(this.done) break; 186 this.conn.stats.sendfile.maxfd = max(this.conn.stats.sendfile.maxfd, p.out.fd); 187 this.conn.stats.sendfile.close++; 188 189 if( options.log ) mutex(sout) sout | "=== Connection closed ==="; 190 } 191 192 lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2); 193 push(this.conn.stats.sendfile, stats_thrd->stats.send); 194 unlock(stats_thrd->stats.lock); 178 } 195 179 } 196 180 … … 214 198 215 199 static inline void push_connection( Acceptor & this, int fd ) { 216 this.stats.accepts++;217 200 PendingRead * p = 0p; 218 201 for() { … … 229 212 230 213 // #define ACCEPT_SPIN 231 #define ACCEPT_ONE 232 // #define ACCEPT_MANY 214 #define ACCEPT_MANY 233 215 234 216 void main( Acceptor & this ) { … … 250 232 abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) ); 251 233 } 234 this.stats.accepts++; 252 235 253 236 if(this.done) return; … … 259 242 if (stats_thrd) { 260 243 unsigned long long next = rdtscl(); 261 if(next > (last + period)) { 262 if(try_lock(stats_thrd->stats.lock)) { 263 push(this.stats, stats_thrd->stats.accpt); 264 unlock(stats_thrd->stats.lock); 265 last = next; 266 } 267 } 268 } 269 270 if( options.log ) sout | "=== Accepting connection ==="; 271 } 272 273 #elif defined(ACCEPT_ONE) 274 if( options.log ) sout | "=== Accepting connection ==="; 275 for() { 276 int fd = cfa_accept4(this.sockfd, this.[addr, addrlen, flags], 0); 277 if(fd < 0) { 278 if( errno == ECONNABORTED ) break; 279 if( this.done && (errno == EINVAL || errno == EBADF) ) break; 280 abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) ); 281 } 282 283 if(this.done) return; 284 285 if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ==="; 286 287 if(fd) push_connection(this, fd); 288 289 if (stats_thrd) { 290 unsigned long long next = rdtscl(); 291 if(next > (last + period)) { 244 if(next > (last + 500000000)) { 292 245 if(try_lock(stats_thrd->stats.lock)) { 293 246 push(this.stats, stats_thrd->stats.accpt); … … 316 269 if (stats_thrd) { 317 270 unsigned long long next = rdtscl(); 318 if(next > (last + period)) {271 if(next > (last + 500000000)) { 319 272 if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) { 320 273 push(this.stats, stats_thrd->stats.accpt); … … 331 284 int fd = get_res(res); 332 285 reset(res); 286 this.stats.accepts++; 333 287 if(fd < 0) { 334 288 if( errno == ECONNABORTED ) continue; … … 365 319 #error no accept algorithm specified 366 320 #endif 367 lock(stats_thrd->stats.lock); 368 push(this.stats, stats_thrd->stats.accpt); 369 unlock(stats_thrd->stats.lock); 370 } 321 } -
libcfa/src/concurrency/io.cfa
rc4c8571 r2af1943 432 432 433 433 disable_interrupts(); 434 __STATS__( true, if(!lazy) io.submit.eagr += 1; )435 434 processor * proc = __cfaabi_tls.this_processor; 436 435 $io_context * ctx = proc->io.ctx; -
libcfa/src/concurrency/io/call.cfa.in
rc4c8571 r2af1943 34 34 #include "kernel.hfa" 35 35 #include "io/types.hfa" 36 #include "stats.hfa"37 36 38 37 //============================================================================================= … … 227 226 async_{name}( future, {args}, submit_flags ); 228 227 229 __attribute__((unused)) bool parked; 230 parked = wait( future ); 231 __STATS__(false, if(!parked) io.submit.nblk += 1; ) 228 wait( future ); 232 229 if( future.result < 0 ) {{ 233 230 errno = -future.result; -
libcfa/src/concurrency/io/setup.cfa
rc4c8571 r2af1943 229 229 #if !defined(CFA_WITH_IO_URING_IDLE) 230 230 // Step 4 : eventfd 231 // io_uring_register is so f*cking slow on some machine that it 232 // will never succeed if preemption isn't hard blocked 231 233 __cfadbg_print_safe(io_core, "Kernel I/O : registering %d for completion with ring %d\n", procfd, fd); 232 234 … … 238 240 __cfadbg_print_safe(io_core, "Kernel I/O : registered %d for completion with ring %d\n", procfd, fd); 239 241 #endif 240 241 // #if defined(CFA_HAVE_IORING_REGISTER_IOWQ_MAX_WORKERS)242 // // Step 5 : max worker count243 // __cfadbg_print_safe(io_core, "Kernel I/O : lmiting max workers for ring %d\n", fd);244 245 // unsigned int maxes[2];246 // maxes[0] = 64; // max number of bounded workers (Regular files / block)247 // maxes[1] = 64; // max number of unbounded workers (IOSQE_ASYNC)248 // int ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_IOWQ_MAX_WORKERS, maxes, 2);249 // if (ret < 0) {250 // abort("KERNEL ERROR: IO_URING MAX WORKER REGISTER - %s\n", strerror(errno));251 // }252 253 // __cfadbg_print_safe(io_core, "Kernel I/O : lmited max workers for ring %d\n", fd);254 // #endif255 242 256 243 // some paranoid checks -
libcfa/src/concurrency/stats.cfa
rc4c8571 r2af1943 46 46 stats->io.submit.fast = 0; 47 47 stats->io.submit.slow = 0; 48 stats->io.submit.eagr = 0;49 stats->io.submit.nblk = 0;50 48 stats->io.flush.external = 0; 51 49 stats->io.flush.dirty = 0; … … 118 116 tally_one( &cltr->io.submit.fast , &proc->io.submit.fast ); 119 117 tally_one( &cltr->io.submit.slow , &proc->io.submit.slow ); 120 tally_one( &cltr->io.submit.eagr , &proc->io.submit.eagr );121 tally_one( &cltr->io.submit.nblk , &proc->io.submit.nblk );122 118 tally_one( &cltr->io.flush.external , &proc->io.flush.external ); 123 119 tally_one( &cltr->io.flush.dirty , &proc->io.flush.dirty ); … … 201 197 sstr | "fast," | eng3(io.submit.slow) | "slow (" | ws(3, 3, avgfasts) | "%)" | nonl; 202 198 } 203 sstr | " - eager" | eng3(io.submit.eagr) | nonl;204 sstr | " - no-wait" | eng3(io.submit.nblk) | nonl;205 199 sstr | nl; 206 200 -
libcfa/src/concurrency/stats.hfa
rc4c8571 r2af1943 92 92 volatile uint64_t fast; 93 93 volatile uint64_t slow; 94 volatile uint64_t eagr;95 volatile uint64_t nblk;96 94 } submit; 97 95 struct {
Note:
See TracChangeset
for help on using the changeset viewer.