Changes in / [8e48fca4:e58e423]
- Files:
-
- 11 edited
-
benchmark/io/http/filecache.cfa (modified) (1 diff)
-
benchmark/io/http/http_ring.cpp (modified) (1 diff)
-
benchmark/io/http/main.cfa (modified) (1 diff)
-
benchmark/io/http/options.cfa (modified) (5 diffs)
-
benchmark/io/http/options.hfa (modified) (1 diff)
-
benchmark/io/http/protocol.cfa (modified) (9 diffs)
-
benchmark/io/http/protocol.hfa (modified) (1 diff)
-
benchmark/io/http/worker.cfa (modified) (2 diffs)
-
libcfa/src/concurrency/io.cfa (modified) (2 diffs)
-
libcfa/src/concurrency/io/types.hfa (modified) (1 diff)
-
libcfa/src/concurrency/kernel.cfa (modified) (6 diffs)
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/http/filecache.cfa
r8e48fca4 re58e423 185 185 sout | "Filled cache from path \"" | path | "\" with" | fcount | "files"; 186 186 if( conflicts > 0 ) { 187 sout | "Found" | conflicts | "conflicts (s eed: " | options.file_cache.hash_seed | ")";187 sout | "Found" | conflicts | "conflicts (size: " | file_cache.size | ", seed: " | options.file_cache.hash_seed | ")"; 188 188 #if defined(REJECT_CONFLICTS) 189 189 abort("Conflicts found in the cache"); -
benchmark/io/http/http_ring.cpp
r8e48fca4 re58e423 118 118 // Get a fix reply based on the return code 119 119 const char * http_msgs[] = { 120 "HTTP/1.1 200 OK\r\nServer: Htt oForall\r\nContent-Type: text/plain\r\nContent-Length: 15\r\nConnection: keep-alive\r\n\r\nHello, World!\r\n",121 "HTTP/1.1 400 Bad Request\r\nServer: Htt oForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",122 "HTTP/1.1 404 Not Found\r\nServer: Htt oForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",123 "HTTP/1.1 405 Method Not \r\nServer: Htt oForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",124 "HTTP/1.1 408 Request Timeout\r\nServer: Htt oForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",125 "HTTP/1.1 413 Payload Too Large\r\nServer: Htt oForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",126 "HTTP/1.1 414 URI Too Long\r\nServer: Htt oForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n",120 "HTTP/1.1 200 OK\r\nServer: HttpForall\r\nContent-Type: text/plain\r\nContent-Length: 15\r\nConnection: keep-alive\r\n\r\nHello, World!\r\n", 121 "HTTP/1.1 400 Bad Request\r\nServer: HttpForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n", 122 "HTTP/1.1 404 Not Found\r\nServer: HttpForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n", 123 "HTTP/1.1 405 Method Not \r\nServer: HttpForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n", 124 "HTTP/1.1 408 Request Timeout\r\nServer: HttpForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n", 125 "HTTP/1.1 413 Payload Too Large\r\nServer: HttpForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n", 126 "HTTP/1.1 414 URI Too Long\r\nServer: HttpForall\r\nContent-Type: text/plain\r\nContent-Length: 0 \r\n\r\n", 127 127 }; 128 128 static_assert( KNOWN_CODES == (sizeof(http_msgs) / sizeof(http_msgs[0])) ); -
benchmark/io/http/main.cfa
r8e48fca4 re58e423 212 212 } 213 213 sout | nl; 214 if(!options.interactive) park(); 214 215 { 215 216 char buffer[128]; -
benchmark/io/http/options.cfa
r8e48fca4 re58e423 21 21 false, // log 22 22 false, // stats 23 true, // interactive 24 0, // redirect 25 0, // redirect 23 26 24 27 { // file_cache … … 52 55 // bool sqkpoll = false; 53 56 // bool iokpoll = false; 54 unsigned nentries = 16;57 unsigned nentries = 0; 55 58 bool isolate = false; 56 59 … … 62 65 {'\0', "isolate", "Create one cluster per processor", isolate, parse_settrue}, 63 66 {'\0', "log", "Enable logs", options.log, parse_settrue}, 67 {'\0', "sout", "Redirect standard out to file", options.reopen_stdout}, 68 {'\0', "serr", "Redirect standard error to file", options.reopen_stderr}, 64 69 {'\0', "stats", "Enable statistics", options.stats, parse_settrue}, 70 {'\0', "shell", "Disable interactive mode", options.interactive, parse_setfalse}, 65 71 {'\0', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog}, 66 72 {'\0', "request_len", "Maximum number of bytes in the http request, requests with more data will be answered with Http Code 414", options.socket.buflen}, … … 79 85 parse_args( argc, argv, opt, opt_cnt, "[OPTIONS]... [PATH]\ncforall http server", left ); 80 86 81 if( !is_pow2(nentries) ) {87 if( nentries != 0 && !is_pow2(nentries) ) { 82 88 unsigned v = nentries; 83 89 v--; … … 131 137 132 138 options.file_cache.path = path; 139 140 if( options.reopen_stdout && options.reopen_stderr && 0 == strcmp(options.reopen_stdout, options.reopen_stderr) ) { 141 serr | "Redirect sout and serr to the same file is not supported"; 142 exit(EXIT_FAILURE); 143 } 144 145 if( options.reopen_stdout ) { 146 sout | "redirecting sout to '" | options.reopen_stdout | "'"; 147 FILE * ret = freopen( options.reopen_stdout, "w", stdout); 148 if( ret == 0p ) { 149 serr | "Failed to redirect sout to '" | options.reopen_stdout | "'"; 150 exit(EXIT_FAILURE); 151 } 152 } 153 154 if( options.reopen_stderr ) { 155 sout | "redirecting serr to '" | options.reopen_stderr | "'"; 156 FILE * ret = freopen( options.reopen_stderr, "w", stderr); 157 if( ret == 0p ) { 158 serr | "Failed to redirect serr to '" | options.reopen_stderr | "'"; 159 exit(EXIT_FAILURE); 160 } 161 } 133 162 } -
benchmark/io/http/options.hfa
r8e48fca4 re58e423 10 10 bool log; 11 11 bool stats; 12 bool interactive; 13 const char * reopen_stdout; 14 const char * reopen_stderr; 12 15 13 16 struct { -
benchmark/io/http/protocol.cfa
r8e48fca4 re58e423 11 11 #include <fstream.hfa> 12 12 #include <iofwd.hfa> 13 #include <io/types.hfa> 14 #include <mutex_stmt.hfa> 13 15 14 16 #include <assert.h> … … 26 28 #define PLAINTEXT_MEMCPY 27 29 #define PLAINTEXT_NOCOPY 30 #define LINKED_IO 28 31 29 32 struct https_msg_str { … … 53 56 } 54 57 55 static inline int answer( int fd, const char * it, int len ) {58 static inline int answer( int fd, const char * it, int len ) { 56 59 while(len > 0) { 57 60 // Call write 58 61 int ret = cfa_send(fd, it, len, 0, CFA_IO_LAZY); 59 62 if( ret < 0 ) { 60 if( errno == ECONNRESET || errno == EPIPE ) return -ECONNRESET;63 if( errno == ECONNRESET || errno == EPIPE ) { close(fd); return -ECONNRESET; } 61 64 if( errno == EAGAIN || errno == EWOULDBLOCK) return -EAGAIN; 62 65 … … 77 80 } 78 81 79 int answer_header( int fd, size_t size ) { 80 char buffer[512]; 81 char * it = buffer; 82 static int fill_header(char * it, size_t size) { 82 83 memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len); 83 84 it += http_msgs[OK200]->len; 84 85 int len = http_msgs[OK200]->len; 85 86 len += snprintf(it, 512 - len, "%d \n\n", size); 87 return len; 88 } 89 90 static int answer_header( int fd, size_t size ) { 91 char buffer[512]; 92 int len = fill_header(buffer, size); 86 93 return answer( fd, buffer, len ); 87 94 } … … 135 142 } 136 143 137 138 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) { 139 char * it = buffer; 140 size_t count = len - 1; 141 int rlen = 0; 142 READ: 143 for() { 144 int ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY); 145 // int ret = read(fd, (void*)it, count); 146 if(ret == 0 ) return [OK200, true, 0, 0]; 147 if(ret < 0 ) { 148 if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ; 149 if( errno == ECONNRESET ) return [E408, true, 0, 0]; 150 if( errno == EPIPE ) return [E408, true, 0, 0]; 151 abort( "read error: (%d) %s\n", (int)errno, strerror(errno) ); 152 } 153 it[ret + 1] = '\0'; 154 rlen += ret; 155 156 if( strstr( it, "\r\n\r\n" ) ) break; 157 158 it += ret; 159 count -= ret; 160 161 if( count < 1 ) return [E414, false, 0, 0]; 162 } 163 164 if( options.log ) { 165 write(sout, buffer, rlen); 166 sout | nl; 167 } 168 169 it = buffer; 170 int ret = memcmp(it, "GET /", 5); 171 if( ret != 0 ) return [E400, false, 0, 0]; 172 it += 5; 173 174 char * end = strstr( it, " " ); 175 return [OK200, false, it, end - it]; 176 } 177 178 int sendfile( int pipe[2], int fd, int ans_fd, size_t count ) { 144 static int sendfile( int pipe[2], int fd, int ans_fd, size_t count ) { 179 145 unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE; 180 146 off_t offset = 0; … … 207 173 } 208 174 175 static void zero_sqe(struct io_uring_sqe * sqe) { 176 sqe->flags = 0; 177 sqe->ioprio = 0; 178 sqe->fd = 0; 179 sqe->off = 0; 180 sqe->addr = 0; 181 sqe->len = 0; 182 sqe->fsync_flags = 0; 183 sqe->__pad2[0] = 0; 184 sqe->__pad2[1] = 0; 185 sqe->__pad2[2] = 0; 186 sqe->fd = 0; 187 sqe->off = 0; 188 sqe->addr = 0; 189 sqe->len = 0; 190 } 191 192 enum FSM_STATE { 193 Initial, 194 Retry, 195 Error, 196 Done, 197 }; 198 199 struct FSM_Result { 200 FSM_STATE state; 201 int error; 202 }; 203 204 static inline void ?{}(FSM_Result & this) { this.state = Initial; this.error = 0; } 205 static inline bool is_error(FSM_Result & this) { return Error == this.state; } 206 static inline bool is_done(FSM_Result & this) { return Done == this.state; } 207 208 static inline int error(FSM_Result & this, int error) { 209 this.error = error; 210 this.state = Error; 211 return error; 212 } 213 214 static inline int done(FSM_Result & this) { 215 this.state = Done; 216 return 0; 217 } 218 219 static inline int retry(FSM_Result & this) { 220 this.state = Retry; 221 return 0; 222 } 223 224 static inline int need(FSM_Result & this) { 225 switch(this.state) { 226 case Initial: 227 case Retry: 228 return 1; 229 case Error: 230 if(this.error == 0) mutex(serr) serr | "State marked error but code is 0"; 231 case Done: 232 return 0; 233 } 234 } 235 236 // Generator that handles sending the header 237 generator header_g { 238 io_future_t f; 239 const char * next; 240 int fd; size_t len; 241 FSM_Result res; 242 }; 243 244 static inline void ?{}(header_g & this, int fd, const char * it, size_t len ) { 245 this.next = it; 246 this.fd = fd; 247 this.len = len; 248 } 249 250 static inline void fill(header_g & this, struct io_uring_sqe * sqe) { 251 zero_sqe(sqe); 252 sqe->opcode = IORING_OP_SEND; 253 sqe->user_data = (uintptr_t)&this.f; 254 sqe->flags = IOSQE_IO_LINK; 255 sqe->fd = this.fd; 256 sqe->addr = (uintptr_t)this.next; 257 sqe->len = this.len; 258 } 259 260 static inline int error(header_g & this, int error) { 261 int ret = close(this.fd); 262 if( ret != 0 ) { 263 mutex(serr) serr | "Failed to close fd" | errno; 264 } 265 return error(this.res, error); 266 } 267 268 static inline int wait_and_process(header_g & this) { 269 wait(this.f); 270 271 // Did something crazy happen? 272 if(this.f.result > this.len) { 273 mutex(serr) serr | "HEADER sent too much!"; 274 return error(this, -ERANGE); 275 } 276 277 // Something failed? 278 if(this.f.result < 0) { 279 int error = -this.f.result; 280 if( error == ECONNRESET ) return error(this, -ECONNRESET); 281 if( error == EPIPE ) return error(this, -EPIPE); 282 if( error == ECANCELED ) { 283 mutex(serr) serr | "HEADER was cancelled, WTF!"; 284 return error(this, -ECONNRESET); 285 } 286 if( error == EAGAIN || error == EWOULDBLOCK) { 287 mutex(serr) serr | "HEADER got eagain, WTF!"; 288 return error(this, -ECONNRESET); 289 } 290 } 291 292 // Done? 293 if(this.f.result == this.len) { 294 return done(this.res); 295 } 296 297 // It must be a Short read 298 this.len -= this.f.result; 299 this.next += this.f.result; 300 reset(this.f); 301 return retry(this.res); 302 } 303 304 // Generator that handles splicing in a file 305 struct splice_in_t { 306 io_future_t f; 307 int fd; int pipe; size_t len; off_t off; 308 FSM_Result res; 309 }; 310 311 static inline void ?{}(splice_in_t & this, int fd, int pipe, size_t len) { 312 this.fd = fd; 313 this.pipe = pipe; 314 this.len = len; 315 this.off = 0; 316 } 317 318 static inline void fill(splice_in_t & this, struct io_uring_sqe * sqe) { 319 zero_sqe(sqe); 320 sqe->opcode = IORING_OP_SPLICE; 321 sqe->user_data = (uintptr_t)&this.f; 322 sqe->flags = 0; 323 sqe->splice_fd_in = this.fd; 324 sqe->splice_off_in = this.off; 325 sqe->fd = this.pipe; 326 sqe->off = (__u64)-1; 327 sqe->len = this.len; 328 sqe->splice_flags = SPLICE_F_MOVE; 329 } 330 331 static inline int wait_and_process(splice_in_t & this) { 332 wait(this.f); 333 334 // Did something crazy happen? 335 if(this.f.result > this.len) { 336 mutex(serr) serr | "SPLICE IN spliced too much!"; 337 return error(this.res, -ERANGE); 338 } 339 340 // Something failed? 341 if(this.f.result < 0) { 342 int error = -this.f.result; 343 if( error == ECONNRESET ) return error(this.res, -ECONNRESET); 344 if( error == EPIPE ) return error(this.res, -EPIPE); 345 if( error == ECANCELED ) { 346 mutex(serr) serr | "SPLICE IN was cancelled, WTF!"; 347 return error(this.res, -ECONNRESET); 348 } 349 if( error == EAGAIN || error == EWOULDBLOCK) { 350 mutex(serr) serr | "SPLICE IN got eagain, WTF!"; 351 return error(this.res, -ECONNRESET); 352 } 353 } 354 355 // Done? 356 if(this.f.result == this.len) { 357 return done(this.res); 358 } 359 360 // It must be a Short read 361 this.len -= this.f.result; 362 this.off += this.f.result; 363 reset(this.f); 364 return retry(this.res); 365 } 366 367 generator splice_out_g { 368 io_future_t f; 369 int pipe; int fd; size_t len; 370 FSM_Result res; 371 }; 372 373 static inline void ?{}(splice_out_g & this, int pipe, int fd, size_t len) { 374 this.pipe = pipe; 375 this.fd = fd; 376 this.len = len; 377 } 378 379 static inline void fill(splice_out_g & this, struct io_uring_sqe * sqe) { 380 zero_sqe(sqe); 381 sqe->opcode = IORING_OP_SPLICE; 382 sqe->user_data = (uintptr_t)&this.f; 383 sqe->flags = 0; 384 sqe->splice_fd_in = this.pipe; 385 sqe->splice_off_in = (__u64)-1; 386 sqe->fd = this.fd; 387 sqe->off = (__u64)-1; 388 sqe->len = this.len; 389 sqe->splice_flags = SPLICE_F_MOVE; 390 } 391 392 static inline int error(splice_out_g & this, int error) { 393 int ret = close(this.fd); 394 if( ret != 0 ) { 395 mutex(serr) serr | "Failed to close fd" | errno; 396 } 397 return error(this.res, error); 398 } 399 400 static inline void wait_and_process(splice_out_g & this) { 401 wait(this.f); 402 403 // Did something crazy happen? 404 if(this.f.result > this.len) { 405 mutex(serr) serr | "SPLICE OUT spliced too much!"; 406 return error(this.res, -ERANGE); 407 } 408 409 // Something failed? 410 if(this.f.result < 0) { 411 int error = -this.f.result; 412 if( error == ECONNRESET ) return error(this, -ECONNRESET); 413 if( error == EPIPE ) return error(this, -EPIPE); 414 if( error == ECANCELED ) { 415 this.f.result = 0; 416 goto SHORT_WRITE; 417 } 418 if( error == EAGAIN || error == EWOULDBLOCK) { 419 mutex(serr) serr | "SPLICE OUT got eagain, WTF!"; 420 return error(this, -ECONNRESET); 421 } 422 } 423 424 // Done? 425 if(this.f.result == this.len) { 426 return done(this.res); 427 } 428 429 SHORT_WRITE: 430 // It must be a Short Write 431 this.len -= this.f.result; 432 reset(this.f); 433 return retry(this.res); 434 } 435 436 int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t fsize ) { 437 #if defined(LINKED_IO) 438 char buffer[512]; 439 int len = fill_header(buffer, fsize); 440 header_g header = { fd, buffer, len }; 441 splice_in_t splice_in = { ans_fd, pipe[1], fsize }; 442 splice_out_g splice_out = { pipe[0], fd, fsize }; 443 444 RETRY_LOOP: for() { 445 int have = need(header.res) + need(splice_in.res) + 1; 446 int idx = 0; 447 struct io_uring_sqe * sqes[3]; 448 __u32 idxs[3]; 449 struct $io_context * ctx = cfa_io_allocate(sqes, idxs, have); 450 451 if(need(splice_in.res)) { fill(splice_in, sqes[idx++]); } 452 if(need( header.res)) { fill(header , sqes[idx++]); } 453 fill(splice_out, sqes[idx]); 454 455 // Submit everything 456 asm volatile("": : :"memory"); 457 cfa_io_submit( ctx, idxs, have, false ); 458 459 // wait for the results 460 // Always wait for splice-in to complete as 461 // we may need to kill the connection if it fails 462 // If it already completed, this is a no-op 463 wait_and_process(splice_in); 464 465 if(is_error(splice_in.res)) { 466 mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error; 467 close(fd); 468 } 469 470 // Process the other 2 471 wait_and_process(header); 472 wait_and_process(splice_out); 473 474 if(is_done(splice_out.res)) { 475 break RETRY_LOOP; 476 } 477 478 // We need to wait for the completion if 479 // - both completed 480 // - the header failed 481 // - 482 483 if( is_error(header.res) 484 || is_error(splice_in.res) 485 || is_error(splice_out.res)) { 486 return -ECONNRESET; 487 } 488 } 489 490 return len + fsize; 491 #else 492 int ret = answer_header(fd, fsize); 493 if( ret < 0 ) { close(fd); return ret; } 494 return sendfile(pipe, fd, ans_fd, fsize); 495 #endif 496 } 497 498 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) { 499 char * it = buffer; 500 size_t count = len - 1; 501 int rlen = 0; 502 READ: 503 for() { 504 int ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY); 505 // int ret = read(fd, (void*)it, count); 506 if(ret == 0 ) return [OK200, true, 0, 0]; 507 if(ret < 0 ) { 508 if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ; 509 if( errno == ECONNRESET ) { close(fd); return [E408, true, 0, 0]; } 510 if( errno == EPIPE ) { close(fd); return [E408, true, 0, 0]; } 511 abort( "read error: (%d) %s\n", (int)errno, strerror(errno) ); 512 } 513 it[ret + 1] = '\0'; 514 rlen += ret; 515 516 if( strstr( it, "\r\n\r\n" ) ) break; 517 518 it += ret; 519 count -= ret; 520 521 if( count < 1 ) return [E414, false, 0, 0]; 522 } 523 524 if( options.log ) { 525 write(sout, buffer, rlen); 526 sout | nl; 527 } 528 529 it = buffer; 530 int ret = memcmp(it, "GET /", 5); 531 if( ret != 0 ) return [E400, false, 0, 0]; 532 it += 5; 533 534 char * end = strstr( it, " " ); 535 return [OK200, false, it, end - it]; 536 } 537 209 538 //============================================================================================= 210 539 … … 214 543 215 544 const char * original_http_msgs[] = { 216 "HTTP/1.1 200 OK\nServer: Htt oForall\nDate: %s \nContent-Type: text/plain\nContent-Length: ",217 "HTTP/1.1 200 OK\nServer: Htt oForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 15\n\nHello, World!\n\n",218 "HTTP/1.1 400 Bad Request\nServer: Htt oForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",219 "HTTP/1.1 404 Not Found\nServer: Htt oForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",220 "HTTP/1.1 405 Method Not Allowed\nServer: Htt oForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",221 "HTTP/1.1 408 Request Timeout\nServer: Htt oForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",222 "HTTP/1.1 413 Payload Too Large\nServer: Htt oForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",223 "HTTP/1.1 414 URI Too Long\nServer: Htt oForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",545 "HTTP/1.1 200 OK\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: ", 546 "HTTP/1.1 200 OK\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 15\n\nHello, World!\n\n", 547 "HTTP/1.1 400 Bad Request\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 548 "HTTP/1.1 404 Not Found\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 549 "HTTP/1.1 405 Method Not Allowed\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 550 "HTTP/1.1 408 Request Timeout\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 551 "HTTP/1.1 413 Payload Too Large\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 552 "HTTP/1.1 414 URI Too Long\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 224 553 }; 225 554 … … 251 580 Time now = timeHiRes(); 252 581 strftime( buff, 100, "%a, %d %b %Y %H:%M:%S %Z", now ); 253 sout | "Updated date to '" | buff | "'";582 // if( options.log ) sout | "Updated date to '" | buff | "'"; 254 583 255 584 for(i; KNOWN_CODES) { … … 264 593 this.idx = (this.idx + 1) % 2; 265 594 266 sout | "Date thread sleeping";595 // if( options.log ) sout | "Date thread sleeping"; 267 596 268 597 sleep(1`s); -
benchmark/io/http/protocol.hfa
r8e48fca4 re58e423 16 16 17 17 int answer_error( int fd, HttpCode code ); 18 int answer_header( int fd, size_t size );19 18 int answer_plaintext( int fd ); 20 19 int answer_empty( int fd ); 20 int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t count ); 21 21 22 22 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len); 23 24 int sendfile( int pipe[2], int fd, int ans_fd, size_t count ); -
benchmark/io/http/worker.cfa
r8e48fca4 re58e423 122 122 } 123 123 124 // Send the header125 int ret = answer_header(fd, count);126 if( ret == -ECONNRESET ) break REQUEST;127 128 124 // Send the desired file 129 ret =sendfile( this.pipe, fd, ans_fd, count);125 int ret = answer_sendfile( this.pipe, fd, ans_fd, count); 130 126 if( ret == -ECONNRESET ) break REQUEST; 131 127 … … 134 130 135 131 if( options.log ) sout | "=== Connection closed ==="; 136 close(fd);137 132 continue CONNECTION; 138 133 } -
libcfa/src/concurrency/io.cfa
r8e48fca4 re58e423 183 183 ctx.proc->io.pending = false; 184 184 185 ready_schedule_lock();186 185 __cfa_io_drain( proc ); 187 ready_schedule_unlock();188 186 // for(i; 2) { 189 187 // unsigned idx = proc->rdq.id + i; … … 311 309 // Make the sqes visible to the submitter 312 310 __atomic_store_n(sq.kring.tail, tail + have, __ATOMIC_RELEASE); 313 sq.to_submit ++;311 sq.to_submit += have; 314 312 315 313 ctx->proc->io.pending = true; 316 314 ctx->proc->io.dirty = true; 317 315 if(sq.to_submit > 30 || !lazy) { 316 ready_schedule_lock(); 318 317 __cfa_io_flush( ctx->proc ); 318 ready_schedule_unlock(); 319 319 } 320 320 } -
libcfa/src/concurrency/io/types.hfa
r8e48fca4 re58e423 188 188 return wait(this.self); 189 189 } 190 191 void reset( io_future_t & this ) { 192 return reset(this.self); 193 } 190 194 } -
libcfa/src/concurrency/kernel.cfa
r8e48fca4 re58e423 196 196 197 197 if( !readyThread ) { 198 ready_schedule_lock(); 198 199 __cfa_io_flush( this ); 200 ready_schedule_unlock(); 201 199 202 readyThread = __next_thread_slow( this->cltr ); 200 203 } … … 277 280 278 281 if(this->io.pending && !this->io.dirty) { 282 ready_schedule_lock(); 279 283 __cfa_io_flush( this ); 284 ready_schedule_unlock(); 280 285 } 281 286 … … 317 322 318 323 // Don't block if we are done 319 if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP; 324 if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) { 325 ready_schedule_unlock(); 326 break MAIN_LOOP; 327 } 320 328 321 329 __STATS( __tls_stats()->ready.sleep.halts++; ) … … 939 947 /* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count); 940 948 /* paranoid */ verify( it->local_data->this_stats ); 949 // __print_stats( it->local_data->this_stats, cltr->print_stats, "Processor", it->name, (void*)it ); 941 950 __tally_stats( cltr->stats, it->local_data->this_stats ); 942 951 it = &(*it)`next; … … 948 957 // this doesn't solve all problems but does solve many 949 958 // so it's probably good enough 959 disable_interrupts(); 950 960 uint_fast32_t last_size = ready_mutate_lock(); 951 961 … … 955 965 // Unlock the RWlock 956 966 ready_mutate_unlock( last_size ); 967 enable_interrupts(); 957 968 } 958 969
Note:
See TracChangeset
for help on using the changeset viewer.