Changes in / [266ecf1:182256b]
- Files:
-
- 4 deleted
- 25 edited
-
Jenkins/FullBuild (modified) (2 diffs)
-
benchmark/io/http/http_ring.cpp (modified) (15 diffs)
-
benchmark/io/http/main.cfa (modified) (8 diffs)
-
benchmark/io/http/options.cfa (modified) (7 diffs)
-
benchmark/io/http/options.hfa (modified) (3 diffs)
-
benchmark/io/http/protocol.cfa (modified) (10 diffs)
-
benchmark/io/http/protocol.hfa (modified) (2 diffs)
-
benchmark/io/http/worker.cfa (modified) (5 diffs)
-
benchmark/io/http/worker.hfa (modified) (1 diff)
-
example/io/tty-echo.cfa (deleted)
-
libcfa/configure.ac (modified) (2 diffs)
-
libcfa/prelude/defines.hfa.in (modified) (1 diff)
-
libcfa/src/bits/defs.hfa (modified) (1 diff)
-
libcfa/src/concurrency/io.cfa (modified) (5 diffs)
-
libcfa/src/concurrency/io/call.cfa.in (modified) (10 diffs)
-
libcfa/src/concurrency/io/setup.cfa (modified) (12 diffs)
-
libcfa/src/concurrency/io/types.hfa (modified) (6 diffs)
-
libcfa/src/concurrency/iofwd.hfa (modified) (4 diffs)
-
libcfa/src/concurrency/kernel.cfa (modified) (15 diffs)
-
libcfa/src/concurrency/kernel.hfa (modified) (5 diffs)
-
libcfa/src/concurrency/kernel/startup.cfa (modified) (14 diffs)
-
libcfa/src/concurrency/kernel_private.hfa (modified) (1 diff)
-
libcfa/src/concurrency/preemption.cfa (modified) (1 diff)
-
libcfa/src/concurrency/stats.cfa (modified) (5 diffs)
-
libcfa/src/concurrency/stats.hfa (modified) (1 diff)
-
tests/Makefile.am (modified) (3 diffs)
-
tests/io/.expect/many_read.txt (deleted)
-
tests/io/.in/io.data (deleted)
-
tests/io/many_read.cfa (deleted)
Legend:
- Unmodified
- Added
- Removed
-
Jenkins/FullBuild
r266ecf1 r182256b 21 21 gcc_7_x86_old: { trigger_build( 'gcc-7', 'x86', false ) }, 22 22 gcc_6_x86_old: { trigger_build( 'gcc-6', 'x86', false ) }, 23 gcc_9_x64_ new: { trigger_build( 'gcc-9', 'x64', true ) },24 gcc_8_x64_ new: { trigger_build( 'gcc-8', 'x64', true ) },25 gcc_7_x64_ new: { trigger_build( 'gcc-7', 'x64', true ) },26 gcc_6_x64_ new: { trigger_build( 'gcc-6', 'x64', true ) },27 gcc_5_x64_ new: { trigger_build( 'gcc-5', 'x64', true ) },28 clang_x64_ new: { trigger_build( 'clang', 'x64', true ) },29 clang_x64_ old: { trigger_build( 'clang', 'x64', false ) },23 gcc_9_x64_old: { trigger_build( 'gcc-9', 'x64', true ) }, 24 gcc_8_x64_old: { trigger_build( 'gcc-8', 'x64', true ) }, 25 gcc_7_x64_old: { trigger_build( 'gcc-7', 'x64', true ) }, 26 gcc_6_x64_old: { trigger_build( 'gcc-6', 'x64', true ) }, 27 gcc_5_x64_old: { trigger_build( 'gcc-5', 'x64', true ) }, 28 clang_x64_old: { trigger_build( 'clang', 'x64', true ) }, 29 clang_x64_new: { trigger_build( 'clang', 'x64', false ) }, 30 30 ) 31 31 } … … 66 66 67 67 def trigger_build(String cc, String arch, boolean new_ast) { 68 // Randomly delay the builds by a random amount to avoid hitting the SC server to hard69 sleep(time: 5 * Math.random(), unit:"MINUTES")70 71 // Run the build72 // Don't propagate, it doesn't play nice with our email setup73 68 def result = build job: 'Cforall/master', \ 74 69 parameters: [ \ -
benchmark/io/http/http_ring.cpp
r266ecf1 r182256b 20 20 socklen_t *addrlen; 21 21 int flags; 22 unsigned cnt;23 22 } acpt; 24 23 … … 68 67 thread_local stats_block_t stats; 69 68 stats_block_t global_stats; 70 71 thread_local struct __attribute__((aligned(128))) {72 size_t to_submit = 0;73 } local;74 69 75 70 // Get an array of current connections … … 197 192 static void submit(struct io_uring * ring, struct io_uring_sqe * sqe, connection * conn) { 198 193 (void)ring; 199 local.to_submit++;200 194 #ifdef USE_ASYNC 201 195 io_uring_sqe_set_flags(sqe, IOSQE_ASYNC); … … 412 406 switch(state) { 413 407 case ACCEPTING: 414 //connection::accept(ring, opt);408 connection::accept(ring, opt); 415 409 newconn(ring, res); 416 410 break; … … 426 420 427 421 //========================================================= 428 extern "C" {429 #include <sys/eventfd.h> // use for termination430 }431 432 422 // Main loop of the WebServer 433 423 // Effectively uses one thread_local copy of everything per kernel thread … … 437 427 struct io_uring * ring = opt.ring; 438 428 439 int blockfd = eventfd(0, 0);440 if (blockfd < 0) {441 fprintf( stderr, "eventfd create error: (%d) %s\n", (int)errno, strerror(errno) );442 exit(EXIT_FAILURE);443 }444 445 int ret = io_uring_register_eventfd(ring, blockfd);446 if (ret < 0) {447 fprintf( stderr, "io_uring S&W error: (%d) %s\n", (int)-ret, strerror(-ret) );448 exit(EXIT_FAILURE);449 }450 451 429 // Track the shutdown using a event_fd 452 430 char endfd_buf[8]; … … 455 433 // Accept our first connection 456 434 // May not take effect until io_uring_submit_and_wait 457 for(unsigned i = 0; i < opt.acpt.cnt; i++) { 458 connection::accept(ring, opt); 459 } 435 connection::accept(ring, opt); 460 436 461 437 int reset = 1; // Counter to print stats once in a while … … 465 441 while(!done) { 466 442 // Submit all the answers we have and wait for responses 467 int ret = io_uring_submit(ring); 468 local.to_submit = 0; 443 int ret = io_uring_submit_and_wait(ring, 1); 469 444 470 445 // check errors … … 477 452 sqes += ret; 478 453 call++; 479 480 481 eventfd_t val;482 ret = eventfd_read(blockfd, &val);483 484 // check errors485 if (ret < 0) {486 fprintf( stderr, "eventfd read error: (%d) %s\n", (int)errno, strerror(errno) );487 exit(EXIT_FAILURE);488 }489 454 490 455 struct io_uring_cqe *cqe; … … 498 463 break; 499 464 } 500 501 if(local.to_submit > 30) break;502 465 503 466 auto req = (class connection *)cqe->user_data; … … 546 509 #include <pthread.h> // for pthreads 547 510 #include <signal.h> // for signal(SIGPIPE, SIG_IGN); 511 #include <sys/eventfd.h> // use for termination 548 512 #include <sys/socket.h> // for sockets in general 549 513 #include <netinet/in.h> // for sockaddr_in, AF_INET … … 564 528 unsigned entries = 256; // number of entries per ring/kernel thread 565 529 unsigned backlog = 262144; // backlog argument to listen 566 unsigned preaccept = 1; // start by accepting X per threads567 530 bool attach = false; // Whether or not to attach all the rings 568 531 bool sqpoll = false; // Whether or not to use SQ Polling … … 571 534 // Arguments Parsing 572 535 int c; 573 while ((c = getopt (argc, argv, "t:p:e:b: c:aS")) != -1) {536 while ((c = getopt (argc, argv, "t:p:e:b:aS")) != -1) { 574 537 switch (c) 575 538 { … … 585 548 case 'b': 586 549 backlog = atoi(optarg); 587 break;588 case 'c':589 preaccept = atoi(optarg);590 550 break; 591 551 case 'a': … … 721 681 thrd_opts[i].acpt.addrlen = (socklen_t*)&addrlen; 722 682 thrd_opts[i].acpt.flags = 0; 723 thrd_opts[i].acpt.cnt = preaccept;724 683 thrd_opts[i].endfd = efd; 725 684 thrd_opts[i].ring = &thrd_rings[i].storage; -
benchmark/io/http/main.cfa
r266ecf1 r182256b 29 29 30 30 //============================================================================================= 31 // Globals 32 //============================================================================================= 33 struct ServerProc { 34 processor self; 35 }; 36 37 void ?{}( ServerProc & this ) { 38 /* paranoid */ assert( options.clopts.instance != 0p ); 39 (this.self){ "Benchmark Processor", *options.clopts.instance }; 40 41 #if !defined(__CFA_NO_STATISTICS__) 42 if( options.clopts.procstats ) { 43 print_stats_at_exit( this.self, options.clopts.instance->print_stats ); 44 } 45 if( options.clopts.viewhalts ) { 46 print_halts( this.self ); 47 } 48 #endif 49 } 50 51 extern void init_protocol(void); 52 extern void deinit_protocol(void); 53 54 //============================================================================================= 31 55 // Stats Printer 32 56 //=============================================================================================' … … 34 58 thread StatsPrinter {}; 35 59 36 void ?{}( StatsPrinter & this, cluster & cl ) { 37 ((thread&)this){ "Stats Printer Thread", cl }; 38 } 39 40 void ^?{}( StatsPrinter & mutex this ) {} 60 void ?{}( StatsPrinter & this ) { 61 ((thread&)this){ "Stats Printer Thread" }; 62 } 41 63 42 64 void main(StatsPrinter & this) { … … 49 71 sleep(10`s); 50 72 51 print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO ); 52 } 53 } 54 55 //============================================================================================= 56 // Globals 57 //============================================================================================= 58 struct ServerCluster { 59 cluster self; 60 processor * procs; 61 // io_context * ctxs; 62 StatsPrinter * prnt; 63 64 }; 65 66 void ?{}( ServerCluster & this ) { 67 (this.self){ "Server Cluster", options.clopts.params }; 68 69 this.procs = alloc(options.clopts.nprocs); 70 for(i; options.clopts.nprocs) { 71 (this.procs[i]){ "Benchmark Processor", this.self }; 72 73 #if !defined(__CFA_NO_STATISTICS__) 74 if( options.clopts.procstats ) { 75 print_stats_at_exit( *this.procs, this.self.print_stats ); 76 } 77 if( options.clopts.viewhalts ) { 78 print_halts( *this.procs ); 79 } 80 #endif 81 } 82 83 if(options.stats) { 84 this.prnt = alloc(); 85 (*this.prnt){ this.self }; 86 } else { 87 this.prnt = 0p; 88 } 89 90 #if !defined(__CFA_NO_STATISTICS__) 91 print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO ); 92 #endif 93 94 options.clopts.instance[options.clopts.cltr_cnt] = &this.self; 95 options.clopts.cltr_cnt++; 96 } 97 98 void ^?{}( ServerCluster & this ) { 99 delete(this.prnt); 100 101 for(i; options.clopts.nprocs) { 102 ^(this.procs[i]){}; 103 } 104 free(this.procs); 105 106 ^(this.self){}; 107 } 108 109 extern void init_protocol(void); 110 extern void deinit_protocol(void); 73 print_stats_now( *options.clopts.instance, CFA_STATS_READY_Q | CFA_STATS_IO ); 74 } 75 } 111 76 112 77 //============================================================================================= … … 172 137 // Run Server Cluster 173 138 { 139 cluster cl = { "Server Cluster", options.clopts.params }; 140 #if !defined(__CFA_NO_STATISTICS__) 141 print_stats_at_exit( cl, CFA_STATS_READY_Q | CFA_STATS_IO ); 142 #endif 143 options.clopts.instance = &cl; 144 145 174 146 int pipe_cnt = options.clopts.nworkers * 2; 175 147 int pipe_off; … … 181 153 } 182 154 183 //if(options.file_cache.path && options.file_cache.fixed_fds) {184 //register_fixed_files(cl, fds, pipe_off);185 //}155 if(options.file_cache.path && options.file_cache.fixed_fds) { 156 register_fixed_files(cl, fds, pipe_off); 157 } 186 158 187 159 { 188 ServerCluster cl[options.clopts.nclusters]; 160 ServerProc procs[options.clopts.nprocs]; 161 StatsPrinter printer; 189 162 190 163 init_protocol(); … … 207 180 unpark( workers[i] ); 208 181 } 209 sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters"; 210 for(i; options.clopts.nclusters) { 211 sout | options.clopts.thrd_cnt[i] | nonl; 212 } 213 sout | nl; 182 sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors"; 214 183 { 215 184 char buffer[128]; 216 for() { 217 int ret = cfa_read(0, buffer, 128, 0); 218 if(ret == 0) break; 185 while(int ret = cfa_read(0, buffer, 128, 0, -1`s, 0p, 0p); ret != 0) { 219 186 if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) ); 220 sout | "User wrote '" | "" | nonl;221 write(sout, buffer, ret - 1);222 sout | "'";223 187 } 224 188 … … 229 193 for(i; options.clopts.nworkers) { 230 194 workers[i].done = true; 195 cancel(workers[i].cancel); 231 196 } 232 197 sout | "done"; … … 256 221 sout | "done"; 257 222 258 sout | "Stopping processors /clusters..." | nonl; flush( sout );223 sout | "Stopping processors..." | nonl; flush( sout ); 259 224 } 260 225 sout | "done"; -
benchmark/io/http/options.cfa
r266ecf1 r182256b 13 13 #include <kernel.hfa> 14 14 #include <parseargs.hfa> 15 #include <stdlib.hfa>16 15 17 16 #include <stdlib.h> … … 20 19 Options options @= { 21 20 false, // log 22 false, // stats23 21 24 22 { // file_cache … … 38 36 39 37 { // cluster 40 1, // nclusters;41 38 1, // nprocs; 42 39 1, // nworkers; … … 49 46 50 47 void parse_options( int argc, char * argv[] ) { 51 // bool fixedfd = false; 52 // bool sqkpoll = false; 53 // bool iokpoll = false; 48 bool subthrd = false; 49 bool eagrsub = false; 50 bool fixedfd = false; 51 bool sqkpoll = false; 52 bool iokpoll = false; 53 unsigned sublen = 16; 54 54 unsigned nentries = 16; 55 bool isolate = false;56 55 57 56 … … 60 59 { 'c', "cpus", "Number of processors to use", options.clopts.nprocs}, 61 60 { 't', "threads", "Number of worker threads to use", options.clopts.nworkers}, 62 {'\0', "isolate", "Create one cluster per processor", isolate, parse_settrue},63 61 {'\0', "log", "Enable logs", options.log, parse_settrue}, 64 {'\0', "stats", "Enable statistics", options.stats, parse_settrue},65 62 {'\0', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog}, 66 63 {'\0', "request_len", "Maximum number of bytes in the http request, requests with more data will be answered with Http Code 414", options.socket.buflen}, … … 68 65 {'\0', "cache-size", "Size of the cache to use, if set to small, will uses closes power of 2", options.file_cache.size }, 69 66 {'\0', "list-files", "List the files in the specified path and exit", options.file_cache.list, parse_settrue }, 70 // { 'f', "fixed-fds", "If set, files are open eagerly and pre-registered with the cluster", fixedfd, parse_settrue}, 71 // { 'k', "kpollsubmit", "If set, cluster uses IORING_SETUP_SQPOLL, implies -f", sqkpoll, parse_settrue }, 72 // { 'i', "kpollcomplete", "If set, cluster uses IORING_SETUP_IOPOLL", iokpoll, parse_settrue }, 73 {'e', "numentries", "Number of I/O entries", nentries }, 67 { 's', "submitthread", "If set, cluster uses polling thread to submit I/O", subthrd, parse_settrue }, 68 { 'e', "eagersubmit", "If set, cluster submits I/O eagerly but still aggregates submits", eagrsub, parse_settrue}, 69 { 'f', "fixed-fds", "If set, files are open eagerly and pre-registered with the cluster", fixedfd, parse_settrue}, 70 { 'k', "kpollsubmit", "If set, cluster uses IORING_SETUP_SQPOLL, implies -f", sqkpoll, parse_settrue }, 71 { 'i', "kpollcomplete", "If set, cluster uses IORING_SETUP_IOPOLL", iokpoll, parse_settrue }, 72 {'\0', "submitlength", "Max number of submitions that can be submitted together", sublen }, 73 {'\0', "numentries", "Number of I/O entries", nentries }, 74 74 75 75 }; … … 91 91 nentries = v; 92 92 } 93 if(isolate) {94 options.clopts.nclusters = options.clopts.nprocs;95 options.clopts.nprocs = 1;96 }97 93 options.clopts.params.num_entries = nentries; 98 options.clopts.instance = alloc(options.clopts.nclusters); 99 options.clopts.thrd_cnt = alloc(options.clopts.nclusters); 100 options.clopts.cltr_cnt = 0; 101 for(i; options.clopts.nclusters) { 102 options.clopts.thrd_cnt[i] = 0; 94 95 options.clopts.params.poller_submits = subthrd; 96 options.clopts.params.eager_submits = eagrsub; 97 98 if( fixedfd ) { 99 options.file_cache.fixed_fds = true; 103 100 } 104 101 102 if( sqkpoll ) { 103 options.clopts.params.poll_submit = true; 104 options.file_cache.fixed_fds = true; 105 } 105 106 106 // if( fixedfd ) { 107 // options.file_cache.fixed_fds = true; 108 // } 107 if( iokpoll ) { 108 options.clopts.params.poll_complete = true; 109 options.file_cache.open_flags |= O_DIRECT; 110 } 109 111 110 // if( sqkpoll ) { 111 // options.file_cache.fixed_fds = true; 112 // } 113 114 // if( iokpoll ) { 115 // options.file_cache.open_flags |= O_DIRECT; 116 // } 112 options.clopts.params.num_ready = sublen; 117 113 118 114 if( left[0] == 0p ) { return; } -
benchmark/io/http/options.hfa
r266ecf1 r182256b 9 9 struct Options { 10 10 bool log; 11 bool stats;12 11 13 12 struct { … … 27 26 28 27 struct { 29 int nclusters;30 28 int nprocs; 31 29 int nworkers; … … 33 31 bool procstats; 34 32 bool viewhalts; 35 cluster ** instance; 36 size_t * thrd_cnt; 37 size_t cltr_cnt; 33 cluster * instance; 38 34 } clopts; 39 35 }; -
benchmark/io/http/protocol.cfa
r266ecf1 r182256b 20 20 #include "options.hfa" 21 21 22 #define PLAINTEXT_1WRITE 23 #define PLAINTEXT_NOCOPY 24 25 struct https_msg_str { 26 char msg[512]; 27 size_t len; 28 }; 29 30 const https_msg_str * volatile http_msgs[KNOWN_CODES] = { 0 }; 22 const char * volatile date = 0p; 23 24 const char * http_msgs[] = { 25 "HTTP/1.1 200 OK\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: %zu \n\n", 26 "HTTP/1.1 400 Bad Request\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 27 "HTTP/1.1 404 Not Found\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 28 "HTTP/1.1 405 Method Not Allowed\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 29 "HTTP/1.1 408 Request Timeout\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 30 "HTTP/1.1 413 Payload Too Large\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 31 "HTTP/1.1 414 URI Too Long\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", 32 }; 31 33 32 34 _Static_assert( KNOWN_CODES == (sizeof(http_msgs ) / sizeof(http_msgs [0]))); 33 35 34 const int http_codes[KNOWN_CODES] = { 35 200, 36 const int http_codes[] = { 36 37 200, 37 38 400, … … 52 53 while(len > 0) { 53 54 // Call write 54 int ret = cfa_send(fd, it, len, 0, CFA_IO_LAZY); 55 int ret = cfa_write(fd, it, len, 0, -1`s, 0p, 0p); 56 // int ret = write(fd, it, len); 55 57 if( ret < 0 ) { 56 58 if( errno == ECONNRESET || errno == EPIPE ) return -ECONNRESET; … … 70 72 /* paranoid */ assert( code < KNOWN_CODES && code != OK200 ); 71 73 int idx = (int)code; 72 return answer( fd, http_msgs[idx] ->msg, http_msgs[idx]->len);74 return answer( fd, http_msgs[idx], strlen( http_msgs[idx] ) ); 73 75 } 74 76 75 77 int answer_header( int fd, size_t size ) { 76 char buffer[512]; 77 char * it = buffer; 78 memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len); 79 it += http_msgs[OK200]->len; 80 int len = http_msgs[OK200]->len; 81 len += snprintf(it, 512 - len, "%d \n\n", size); 78 const char * fmt = http_msgs[OK200]; 79 int len = 200; 80 char buffer[len]; 81 len = snprintf(buffer, len, fmt, date, size); 82 82 return answer( fd, buffer, len ); 83 83 } 84 84 85 #if defined(PLAINTEXT_NOCOPY) 86 int answer_plaintext( int fd ) { 87 return answer(fd, http_msgs[OK200_PlainText]->msg, http_msgs[OK200_PlainText]->len + 1); // +1 cause snprintf doesn't count nullterminator 88 } 89 #elif defined(PLAINTEXT_1WRITE) 90 int answer_plaintext( int fd ) { 91 char text[] = "Hello, World!\n"; 92 char buffer[512 + sizeof(text)]; 93 char * it = buffer; 94 memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len); 95 it += http_msgs[OK200]->len; 96 int len = http_msgs[OK200]->len; 97 int r = snprintf(it, 512 - len, "%d \n\n", sizeof(text)); 98 it += r; 99 len += r; 100 memcpy(it, text, sizeof(text)); 101 return answer(fd, buffer, len + sizeof(text)); 102 } 103 #else 104 int answer_plaintext( int fd ) { 105 char text[] = "Hello, World!\n"; 106 int ret = answer_header(fd, sizeof(text)); 85 int answer_plain( int fd, char buffer[], size_t size ) { 86 int ret = answer_header(fd, size); 107 87 if( ret < 0 ) return ret; 108 return answer(fd, text, sizeof(text)); 109 } 110 #endif 88 return answer(fd, buffer, size); 89 } 111 90 112 91 int answer_empty( int fd ) { … … 115 94 116 95 117 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len ) {96 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len, io_cancellation * cancel) { 118 97 char * it = buffer; 119 98 size_t count = len - 1; … … 121 100 READ: 122 101 for() { 123 int ret = cfa_re cv(fd, (void*)it, count, 0, CFA_IO_LAZY);102 int ret = cfa_read(fd, (void*)it, count, 0, -1`s, cancel, 0p); 124 103 // int ret = read(fd, (void*)it, count); 125 104 if(ret == 0 ) return [OK200, true, 0, 0]; … … 160 139 ssize_t ret; 161 140 SPLICE1: while(count > 0) { 162 ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY); 141 ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, 0, -1`s, 0p, 0p); 142 // ret = splice(ans_fd, &offset, pipe[1], 0p, count, sflags); 163 143 if( ret < 0 ) { 164 144 if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE1; … … 172 152 size_t in_pipe = ret; 173 153 SPLICE2: while(in_pipe > 0) { 174 ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY); 154 ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, 0, -1`s, 0p, 0p); 155 // ret = splice(pipe[0], 0p, fd, 0p, in_pipe, sflags); 175 156 if( ret < 0 ) { 176 157 if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE2; … … 192 173 #include <thread.hfa> 193 174 194 const char * original_http_msgs[] = {195 "HTTP/1.1 200 OK\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: ",196 "HTTP/1.1 200 OK\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 15\n\nHello, World!\n",197 "HTTP/1.1 400 Bad Request\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",198 "HTTP/1.1 404 Not Found\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",199 "HTTP/1.1 405 Method Not Allowed\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",200 "HTTP/1.1 408 Request Timeout\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",201 "HTTP/1.1 413 Payload Too Large\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",202 "HTTP/1.1 414 URI Too Long\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",203 };204 205 175 struct date_buffer { 206 https_msg_str strs[KNOWN_CODES];176 char buff[100]; 207 177 }; 208 178 … … 213 183 214 184 void ?{}( DateFormater & this ) { 215 ((thread&)this){ "Server Date Thread", *options.clopts.instance [0]};185 ((thread&)this){ "Server Date Thread", *options.clopts.instance }; 216 186 this.idx = 0; 217 memset( &this.buffers[0], 0, sizeof(this.buffers[0]) );218 memset( &this.buffers[1], 0, sizeof(this.buffers[1]) );187 memset( this.buffers[0].buff, 0, sizeof(this.buffers[0]) ); 188 memset( this.buffers[1].buff, 0, sizeof(this.buffers[1]) ); 219 189 } 220 190 … … 226 196 or else {} 227 197 228 229 char buff[100];230 198 Time now = getTimeNsec(); 231 strftime( buff, 100, "%a, %d %b %Y %H:%M:%S %Z", now ); 232 sout | "Updated date to '" | buff | "'"; 233 234 for(i; KNOWN_CODES) { 235 size_t len = snprintf( this.buffers[this.idx].strs[i].msg, 512, original_http_msgs[i], buff ); 236 this.buffers[this.idx].strs[i].len = len; 237 } 238 239 for(i; KNOWN_CODES) { 240 https_msg_str * next = &this.buffers[this.idx].strs[i]; 241 __atomic_exchange_n((https_msg_str * volatile *)&http_msgs[i], next, __ATOMIC_SEQ_CST); 242 } 199 200 strftime( this.buffers[this.idx].buff, 100, "%a, %d %b %Y %H:%M:%S %Z", now ); 201 202 char * next = this.buffers[this.idx].buff; 203 __atomic_exchange_n((char * volatile *)&date, next, __ATOMIC_SEQ_CST); 243 204 this.idx = (this.idx + 1) % 2; 244 245 sout | "Date thread sleeping";246 205 247 206 sleep(1`s); -
benchmark/io/http/protocol.hfa
r266ecf1 r182256b 1 1 #pragma once 2 3 struct io_cancellation; 2 4 3 5 enum HttpCode { 4 6 OK200 = 0, 5 OK200_PlainText,6 7 E400, 7 8 E404, … … 17 18 int answer_error( int fd, HttpCode code ); 18 19 int answer_header( int fd, size_t size ); 19 int answer_plain text( int fd);20 int answer_plain( int fd, char buffer [], size_t size ); 20 21 int answer_empty( int fd ); 21 22 22 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len );23 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len, io_cancellation *); 23 24 24 25 int sendfile( int pipe[2], int fd, int ans_fd, size_t count ); -
benchmark/io/http/worker.cfa
r266ecf1 r182256b 17 17 //============================================================================================= 18 18 void ?{}( Worker & this ) { 19 size_t cli = rand() % options.clopts.cltr_cnt; 20 ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli] }; 21 options.clopts.thrd_cnt[cli]++; 19 ((thread&)this){ "Server Worker Thread", *options.clopts.instance }; 22 20 this.pipe[0] = -1; 23 21 this.pipe[1] = -1; … … 37 35 for() { 38 36 if( options.log ) sout | "=== Accepting connection ==="; 39 int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], CFA_IO_LAZY ); 37 int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], 0, -1`s, &this.cancel, 0p ); 38 // int fd = accept4( this.[sockfd, addr, addrlen, flags] ); 40 39 if(fd < 0) { 41 40 if( errno == ECONNABORTED ) break; … … 43 42 abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) ); 44 43 } 45 if(this.done) break;46 44 47 45 if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ==="; … … 57 55 char buffer[len]; 58 56 if( options.log ) sout | "=== Reading request ==="; 59 [code, closed, file, name_size] = http_read(fd, buffer, len );57 [code, closed, file, name_size] = http_read(fd, buffer, len, &this.cancel); 60 58 61 59 // if we are done, break out of the loop … … 72 70 if( options.log ) sout | "=== Request for /plaintext ==="; 73 71 74 int ret = answer_plaintext(fd); 72 char text[] = "Hello, World!\n"; 73 74 // Send the header 75 int ret = answer_plain(fd, text, sizeof(text)); 75 76 if( ret == -ECONNRESET ) break REQUEST; 76 77 -
benchmark/io/http/worker.hfa
r266ecf1 r182256b 17 17 socklen_t * addrlen; 18 18 int flags; 19 io_cancellation cancel; 19 20 volatile bool done; 20 21 }; -
libcfa/configure.ac
r266ecf1 r182256b 169 169 AH_TEMPLATE([CFA_HAVE_IOSQE_FIXED_FILE],[Defined if io_uring support is present when compiling libcfathread and supports the flag FIXED_FILE.]) 170 170 AH_TEMPLATE([CFA_HAVE_IOSQE_IO_DRAIN],[Defined if io_uring support is present when compiling libcfathread and supports the flag IO_DRAIN.]) 171 AH_TEMPLATE([CFA_HAVE_IOSQE_ASYNC],[Defined if io_uring support is present when compiling libcfathread and supports the flag ASYNC.]) 171 172 AH_TEMPLATE([CFA_HAVE_IOSQE_IO_LINK],[Defined if io_uring support is present when compiling libcfathread and supports the flag IO_LINK.]) 172 173 AH_TEMPLATE([CFA_HAVE_IOSQE_IO_HARDLINK],[Defined if io_uring support is present when compiling libcfathread and supports the flag IO_HARDLINK.]) 173 AH_TEMPLATE([CFA_HAVE_IOSQE_ASYNC],[Defined if io_uring support is present when compiling libcfathread and supports the flag ASYNC.])174 AH_TEMPLATE([CFA_HAVE_IOSQE_BUFFER_SELECT],[Defined if io_uring support is present when compiling libcfathread and supports the flag BUFFER_SELEC.])175 174 AH_TEMPLATE([CFA_HAVE_SPLICE_F_FD_IN_FIXED],[Defined if io_uring support is present when compiling libcfathread and supports the flag SPLICE_F_FD_IN_FIXED.]) 176 175 AH_TEMPLATE([CFA_HAVE_IORING_SETUP_ATTACH_WQ],[Defined if io_uring support is present when compiling libcfathread and supports the flag IORING_SETUP_ATTACH_WQ.]) … … 183 182 184 183 define(ioring_ops, [IORING_OP_NOP,IORING_OP_READV,IORING_OP_WRITEV,IORING_OP_FSYNC,IORING_OP_READ_FIXED,IORING_OP_WRITE_FIXED,IORING_OP_POLL_ADD,IORING_OP_POLL_REMOVE,IORING_OP_SYNC_FILE_RANGE,IORING_OP_SENDMSG,IORING_OP_RECVMSG,IORING_OP_TIMEOUT,IORING_OP_TIMEOUT_REMOVE,IORING_OP_ACCEPT,IORING_OP_ASYNC_CANCEL,IORING_OP_LINK_TIMEOUT,IORING_OP_CONNECT,IORING_OP_FALLOCATE,IORING_OP_OPENAT,IORING_OP_CLOSE,IORING_OP_FILES_UPDATE,IORING_OP_STATX,IORING_OP_READ,IORING_OP_WRITE,IORING_OP_FADVISE,IORING_OP_MADVISE,IORING_OP_SEND,IORING_OP_RECV,IORING_OP_OPENAT2,IORING_OP_EPOLL_CTL,IORING_OP_SPLICE,IORING_OP_PROVIDE_BUFFERS,IORING_OP_REMOVE_BUFFER,IORING_OP_TEE]) 185 define(ioring_flags, [IOSQE_FIXED_FILE,IOSQE_IO_DRAIN,IOSQE_ IO_LINK,IOSQE_IO_HARDLINK,IOSQE_ASYNC,IOSQE_BUFFER_SELECT,SPLICE_F_FD_IN_FIXED,IORING_SETUP_ATTACH_WQ])184 define(ioring_flags, [IOSQE_FIXED_FILE,IOSQE_IO_DRAIN,IOSQE_ASYNC,IOSQE_IO_LINK,IOSQE_IO_HARDLINK,SPLICE_F_FD_IN_FIXED,IORING_SETUP_ATTACH_WQ]) 186 185 187 186 define(ioring_from_decls, [ -
libcfa/prelude/defines.hfa.in
r266ecf1 r182256b 149 149 150 150 /* Defined if io_uring support is present when compiling libcfathread and 151 supports the flag BUFFER_SELEC. */152 #undef CFA_HAVE_IOSQE_BUFFER_SELECT153 154 /* Defined if io_uring support is present when compiling libcfathread and155 151 supports the flag FIXED_FILE. */ 156 152 #undef CFA_HAVE_IOSQE_FIXED_FILE -
libcfa/src/bits/defs.hfa
r266ecf1 r182256b 74 74 #error unsupported architecture 75 75 #endif 76 77 #define CFA_IO_LAZY (1_l64u << 32_l64u) -
libcfa/src/concurrency/io.cfa
r266ecf1 r182256b 32 32 extern "C" { 33 33 #include <sys/syscall.h> 34 #include <sys/eventfd.h>35 34 36 35 #include <linux/io_uring.h> … … 80 79 }; 81 80 82 static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor *, __u32 idxs[], __u32 want ); 83 static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * , __u32 idxs[], __u32 have, bool lazy ); 84 static void __ioarbiter_flush ( $io_arbiter & mutex this, $io_context * ); 85 static inline void __ioarbiter_notify( $io_context & ctx ); 81 // returns true of acquired as leader or second leader 82 static inline bool try_lock( __leaderlock_t & this ) { 83 const uintptr_t thrd = 1z | (uintptr_t)active_thread(); 84 bool block; 85 disable_interrupts(); 86 for() { 87 struct $thread * expected = this.value; 88 if( 1p != expected && 0p != expected ) { 89 /* paranoid */ verify( thrd != (uintptr_t)expected ); // We better not already be the next leader 90 enable_interrupts( __cfaabi_dbg_ctx ); 91 return false; 92 } 93 struct $thread * desired; 94 if( 0p == expected ) { 95 // If the lock isn't locked acquire it, no need to block 96 desired = 1p; 97 block = false; 98 } 99 else { 100 // If the lock is already locked try becomming the next leader 101 desired = (struct $thread *)thrd; 102 block = true; 103 } 104 if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break; 105 } 106 if( block ) { 107 enable_interrupts( __cfaabi_dbg_ctx ); 108 park(); 109 disable_interrupts(); 110 } 111 return true; 112 } 113 114 static inline bool next( __leaderlock_t & this ) { 115 /* paranoid */ verify( ! __preemption_enabled() ); 116 struct $thread * nextt; 117 for() { 118 struct $thread * expected = this.value; 119 /* paranoid */ verify( (1 & (uintptr_t)expected) == 1 ); // The lock better be locked 120 121 struct $thread * desired; 122 if( 1p == expected ) { 123 // No next leader, just unlock 124 desired = 0p; 125 nextt = 0p; 126 } 127 else { 128 // There is a next leader, remove but keep locked 129 desired = 1p; 130 nextt = (struct $thread *)(~1z & (uintptr_t)expected); 131 } 132 if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break; 133 } 134 135 if(nextt) { 136 unpark( nextt ); 137 enable_interrupts( __cfaabi_dbg_ctx ); 138 return true; 139 } 140 enable_interrupts( __cfaabi_dbg_ctx ); 141 return false; 142 } 143 144 //============================================================================================= 145 // I/O Syscall 146 //============================================================================================= 147 static int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get ) { 148 bool need_sys_to_submit = false; 149 bool need_sys_to_complete = false; 150 unsigned flags = 0; 151 152 TO_SUBMIT: 153 if( to_submit > 0 ) { 154 if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) { 155 need_sys_to_submit = true; 156 break TO_SUBMIT; 157 } 158 if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) { 159 need_sys_to_submit = true; 160 flags |= IORING_ENTER_SQ_WAKEUP; 161 } 162 } 163 164 if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) { 165 flags |= IORING_ENTER_GETEVENTS; 166 if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) { 167 need_sys_to_complete = true; 168 } 169 } 170 171 int ret = 0; 172 if( need_sys_to_submit || need_sys_to_complete ) { 173 __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ring.fd, to_submit, flags); 174 ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8); 175 __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING %d returned %d\n", ring.fd, ret); 176 177 if( ret < 0 ) { 178 switch((int)errno) { 179 case EAGAIN: 180 case EINTR: 181 case EBUSY: 182 ret = -1; 183 break; 184 default: 185 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) ); 186 } 187 } 188 } 189 190 // Memory barrier 191 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 192 return ret; 193 } 194 86 195 //============================================================================================= 87 196 // I/O Polling 88 197 //============================================================================================= 89 static inline unsigned __flush( struct $io_context & ); 90 static inline __u32 __release_sqes( struct $io_context & ); 91 92 void __cfa_io_drain( processor * proc ) { 198 static unsigned __collect_submitions( struct __io_data & ring ); 199 static __u32 __release_consumed_submission( struct __io_data & ring ); 200 static inline void __clean( volatile struct io_uring_sqe * sqe ); 201 202 // Process a single completion message from the io_uring 203 // This is NOT thread-safe 204 static inline void process( volatile struct io_uring_cqe & cqe ) { 205 struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data; 206 __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future ); 207 208 fulfil( *future, cqe.res ); 209 } 210 211 static [int, bool] __drain_io( & struct __io_data ring ) { 93 212 /* paranoid */ verify( ! __preemption_enabled() ); 94 /* paranoid */ verify( proc ); 95 /* paranoid */ verify( proc->io.ctx ); 213 214 unsigned to_submit = 0; 215 if( ring.poller_submits ) { 216 // If the poller thread also submits, then we need to aggregate the submissions which are ready 217 to_submit = __collect_submitions( ring ); 218 } 219 220 int ret = __io_uring_enter(ring, to_submit, true); 221 if( ret < 0 ) { 222 return [0, true]; 223 } 224 225 // update statistics 226 if (to_submit > 0) { 227 __STATS__( true, 228 if( to_submit > 0 ) { 229 io.submit_q.submit_avg.rdy += to_submit; 230 io.submit_q.submit_avg.csm += ret; 231 io.submit_q.submit_avg.cnt += 1; 232 } 233 ) 234 } 235 236 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 237 238 // Release the consumed SQEs 239 __release_consumed_submission( ring ); 96 240 97 241 // Drain the queue 98 $io_context * ctx = proc->io.ctx; 99 unsigned head = *ctx->cq.head; 100 unsigned tail = *ctx->cq.tail; 101 const __u32 mask = *ctx->cq.mask; 242 unsigned head = *ring.completion_q.head; 243 unsigned tail = *ring.completion_q.tail; 244 const __u32 mask = *ring.completion_q.mask; 245 246 // Nothing was new return 0 247 if (head == tail) { 248 return [0, to_submit > 0]; 249 } 102 250 103 251 __u32 count = tail - head; 104 __STATS__( false, io.calls.drain++; io.calls.completed += count; ) 105 252 /* paranoid */ verify( count != 0 ); 106 253 for(i; count) { 107 254 unsigned idx = (head + i) & mask; 108 volatile struct io_uring_cqe & cqe = ctx->cq.cqes[idx];255 volatile struct io_uring_cqe & cqe = ring.completion_q.cqes[idx]; 109 256 110 257 /* paranoid */ verify(&cqe); 111 258 112 struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data; 113 __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future ); 114 115 fulfil( *future, cqe.res ); 116 } 117 118 __cfadbg_print_safe(io, "Kernel I/O : %u completed\n", count); 259 process( cqe ); 260 } 119 261 120 262 // Mark to the kernel that the cqe has been seen 121 263 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read. 122 __atomic_store_n( ctx->cq.head, head + count, __ATOMIC_SEQ_CST ); 123 124 /* paranoid */ verify( ! __preemption_enabled() ); 125 126 return; 127 } 128 129 void __cfa_io_flush( processor * proc ) { 130 /* paranoid */ verify( ! __preemption_enabled() ); 131 /* paranoid */ verify( proc ); 132 /* paranoid */ verify( proc->io.ctx ); 133 134 $io_context & ctx = *proc->io.ctx; 135 136 if(!ctx.ext_sq.empty) { 137 __ioarbiter_flush( *ctx.arbiter, &ctx ); 138 } 139 140 __STATS__( true, io.calls.flush++; ) 141 int ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, 0, 0, (sigset_t *)0p, _NSIG / 8); 142 if( ret < 0 ) { 143 switch((int)errno) { 144 case EAGAIN: 145 case EINTR: 146 case EBUSY: 264 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_SEQ_CST ); 265 266 return [count, count > 0 || to_submit > 0]; 267 } 268 269 void main( $io_ctx_thread & this ) { 270 __ioctx_register( this ); 271 272 __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %d (%p) ready\n", this.ring->fd, &this); 273 274 const int reset_cnt = 5; 275 int reset = reset_cnt; 276 // Then loop until we need to start 277 LOOP: 278 while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) { 279 // Drain the io 280 int count; 281 bool again; 282 disable_interrupts(); 283 [count, again] = __drain_io( *this.ring ); 284 285 if(!again) reset--; 286 147 287 // Update statistics 148 __STATS__( false, io.calls.errors.busy ++; ) 149 return; 150 default: 151 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) ); 152 } 153 } 154 155 __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring %d\n", ret, ctx.fd); 156 __STATS__( true, io.calls.submitted += ret; ) 157 /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num ); 158 /* paranoid */ verify( ctx.sq.to_submit >= ret ); 159 160 ctx.sq.to_submit -= ret; 161 162 /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num ); 163 164 // Release the consumed SQEs 165 __release_sqes( ctx ); 166 167 /* paranoid */ verify( ! __preemption_enabled() ); 168 169 ctx.proc->io.pending = false; 288 __STATS__( true, 289 io.complete_q.completed_avg.val += count; 290 io.complete_q.completed_avg.cnt += 1; 291 ) 292 enable_interrupts( __cfaabi_dbg_ctx ); 293 294 // If we got something, just yield and check again 295 if(reset > 1) { 296 yield(); 297 continue LOOP; 298 } 299 300 // We alread failed to find completed entries a few time. 301 if(reset == 1) { 302 // Rearm the context so it can block 303 // but don't block right away 304 // we need to retry one last time in case 305 // something completed *just now* 306 __ioctx_prepare_block( this ); 307 continue LOOP; 308 } 309 310 __STATS__( false, 311 io.complete_q.blocks += 1; 312 ) 313 __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %d (%p)\n", this.ring->fd, &this); 314 315 // block this thread 316 wait( this.sem ); 317 318 // restore counter 319 reset = reset_cnt; 320 } 321 322 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.ring->fd, &this); 323 324 __ioctx_unregister( this ); 170 325 } 171 326 … … 189 344 // head and tail must be fully filled and shouldn't ever be touched again. 190 345 // 191 //=============================================================================================192 // Allocation193 // for user's convenience fill the sqes from the indexes194 static inline void __fill(struct io_uring_sqe * out_sqes[], __u32 want, __u32 idxs[], struct $io_context * ctx) {195 struct io_uring_sqe * sqes = ctx->sq.sqes;196 for(i; want) {197 __cfadbg_print_safe(io, "Kernel I/O : filling loop\n");198 out_sqes[i] = &sqes[idxs[i]];199 }200 }201 202 // Try to directly allocate from the a given context203 // Not thread-safe204 static inline bool __alloc(struct $io_context * ctx, __u32 idxs[], __u32 want) {205 __sub_ring_t & sq = ctx->sq;206 const __u32 mask = *sq.mask;207 __u32 fhead = sq.free_ring.head; // get the current head of the queue208 __u32 ftail = sq.free_ring.tail; // get the current tail of the queue209 210 // If we don't have enough sqes, fail211 if((ftail - fhead) < want) { return false; }212 213 // copy all the indexes we want from the available list214 for(i; want) {215 __cfadbg_print_safe(io, "Kernel I/O : allocating loop\n");216 idxs[i] = sq.free_ring.array[(fhead + i) & mask];217 }218 219 // Advance the head to mark the indexes as consumed220 __atomic_store_n(&sq.free_ring.head, fhead + want, __ATOMIC_RELEASE);221 222 // return success223 return true;224 }225 346 226 347 // Allocate an submit queue entry. … … 229 350 // for convenience, return both the index and the pointer to the sqe 230 351 // sqe == &sqes[idx] 231 struct $io_context * cfa_io_allocate(struct io_uring_sqe * sqes[], __u32 idxs[], __u32 want) { 232 __cfadbg_print_safe(io, "Kernel I/O : attempting to allocate %u\n", want); 233 234 disable_interrupts(); 235 processor * proc = __cfaabi_tls.this_processor; 236 $io_context * ctx = proc->io.ctx; 237 /* paranoid */ verify( __cfaabi_tls.this_processor ); 238 /* paranoid */ verify( ctx ); 239 240 __cfadbg_print_safe(io, "Kernel I/O : attempting to fast allocation\n"); 241 242 // We can proceed to the fast path 243 if( __alloc(ctx, idxs, want) ) { 244 // Allocation was successful 245 __STATS__( true, io.alloc.fast += 1; ) 246 enable_interrupts( __cfaabi_dbg_ctx ); 247 248 __cfadbg_print_safe(io, "Kernel I/O : fast allocation successful from ring %d\n", ctx->fd); 249 250 __fill( sqes, want, idxs, ctx ); 251 return ctx; 252 } 253 // The fast path failed, fallback 254 __STATS__( true, io.alloc.fail += 1; ) 255 256 // Fast path failed, fallback on arbitration 257 __STATS__( true, io.alloc.slow += 1; ) 258 enable_interrupts( __cfaabi_dbg_ctx ); 259 260 $io_arbiter * ioarb = proc->cltr->io.arbiter; 261 /* paranoid */ verify( ioarb ); 262 263 __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for allocation\n"); 264 265 struct $io_context * ret = __ioarbiter_allocate(*ioarb, proc, idxs, want); 266 267 __cfadbg_print_safe(io, "Kernel I/O : slow allocation completed from ring %d\n", ret->fd); 268 269 __fill( sqes, want, idxs,ret ); 270 return ret; 271 } 272 273 274 //============================================================================================= 275 // submission 276 static inline void __submit( struct $io_context * ctx, __u32 idxs[], __u32 have, bool lazy) { 277 // We can proceed to the fast path 278 // Get the right objects 279 __sub_ring_t & sq = ctx->sq; 280 const __u32 mask = *sq.mask; 281 __u32 tail = *sq.kring.tail; 282 283 // Add the sqes to the array 284 for( i; have ) { 285 __cfadbg_print_safe(io, "Kernel I/O : __submit loop\n"); 286 sq.kring.array[ (tail + i) & mask ] = idxs[i]; 287 } 288 289 // Make the sqes visible to the submitter 290 __atomic_store_n(sq.kring.tail, tail + have, __ATOMIC_RELEASE); 291 sq.to_submit++; 292 293 ctx->proc->io.pending = true; 294 ctx->proc->io.dirty = true; 295 if(sq.to_submit > 30 || !lazy) { 296 __cfa_io_flush( ctx->proc ); 297 } 298 } 299 300 void cfa_io_submit( struct $io_context * inctx, __u32 idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1))) { 301 __cfadbg_print_safe(io, "Kernel I/O : attempting to submit %u (%s)\n", have, lazy ? "lazy" : "eager"); 302 303 disable_interrupts(); 304 processor * proc = __cfaabi_tls.this_processor; 305 $io_context * ctx = proc->io.ctx; 306 /* paranoid */ verify( __cfaabi_tls.this_processor ); 307 /* paranoid */ verify( ctx ); 308 309 // Can we proceed to the fast path 310 if( ctx == inctx ) // We have the right instance? 352 [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) { 353 /* paranoid */ verify( data != 0 ); 354 355 // Prepare the data we need 356 __attribute((unused)) int len = 0; 357 __attribute((unused)) int block = 0; 358 __u32 cnt = *ring.submit_q.num; 359 __u32 mask = *ring.submit_q.mask; 360 361 __u32 off = thread_rand(); 362 363 // Loop around looking for an available spot 364 for() { 365 // Look through the list starting at some offset 366 for(i; cnt) { 367 __u64 expected = 3; 368 __u32 idx = (i + off) & mask; // Get an index from a random 369 volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx]; 370 volatile __u64 * udata = &sqe->user_data; 371 372 // Allocate the entry by CASing the user_data field from 0 to the future address 373 if( *udata == expected && 374 __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) 375 { 376 // update statistics 377 __STATS__( false, 378 io.submit_q.alloc_avg.val += len; 379 io.submit_q.alloc_avg.block += block; 380 io.submit_q.alloc_avg.cnt += 1; 381 ) 382 383 // debug log 384 __cfadbg_print_safe( io, "Kernel I/O : allocated [%p, %u] for %p (%p)\n", sqe, idx, active_thread(), (void*)data ); 385 386 // Success return the data 387 return [sqe, idx]; 388 } 389 verify(expected != data); 390 391 // This one was used 392 len ++; 393 } 394 395 block++; 396 397 yield(); 398 } 399 } 400 401 static inline __u32 __submit_to_ready_array( struct __io_data & ring, __u32 idx, const __u32 mask ) { 402 /* paranoid */ verify( idx <= mask ); 403 /* paranoid */ verify( idx != -1ul32 ); 404 405 // We need to find a spot in the ready array 406 __attribute((unused)) int len = 0; 407 __attribute((unused)) int block = 0; 408 __u32 ready_mask = ring.submit_q.ready_cnt - 1; 409 410 __u32 off = thread_rand(); 411 412 __u32 picked; 413 LOOKING: for() { 414 for(i; ring.submit_q.ready_cnt) { 415 picked = (i + off) & ready_mask; 416 __u32 expected = -1ul32; 417 if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) { 418 break LOOKING; 419 } 420 verify(expected != idx); 421 422 len ++; 423 } 424 425 block++; 426 427 __u32 released = __release_consumed_submission( ring ); 428 if( released == 0 ) { 429 yield(); 430 } 431 } 432 433 // update statistics 434 __STATS__( false, 435 io.submit_q.look_avg.val += len; 436 io.submit_q.look_avg.block += block; 437 io.submit_q.look_avg.cnt += 1; 438 ) 439 440 return picked; 441 } 442 443 void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) { 444 __io_data & ring = *ctx->thrd.ring; 445 311 446 { 312 __submit(ctx, idxs, have, lazy); 313 314 // Mark the instance as no longer in-use, re-enable interrupts and return 315 __STATS__( true, io.submit.fast += 1; ) 316 enable_interrupts( __cfaabi_dbg_ctx ); 317 318 __cfadbg_print_safe(io, "Kernel I/O : submitted on fast path\n"); 319 return; 320 } 321 322 // Fast path failed, fallback on arbitration 323 __STATS__( true, io.submit.slow += 1; ) 324 enable_interrupts( __cfaabi_dbg_ctx ); 325 326 __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for submission\n"); 327 328 __ioarbiter_submit(*inctx->arbiter, inctx, idxs, have, lazy); 329 } 330 331 //============================================================================================= 332 // Flushing 447 __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx]; 448 __cfadbg_print_safe( io, 449 "Kernel I/O : submitting %u (%p) for %p\n" 450 " data: %p\n" 451 " opcode: %s\n" 452 " fd: %d\n" 453 " flags: %d\n" 454 " prio: %d\n" 455 " off: %p\n" 456 " addr: %p\n" 457 " len: %d\n" 458 " other flags: %d\n" 459 " splice fd: %d\n" 460 " pad[0]: %llu\n" 461 " pad[1]: %llu\n" 462 " pad[2]: %llu\n", 463 idx, sqe, 464 active_thread(), 465 (void*)sqe->user_data, 466 opcodes[sqe->opcode], 467 sqe->fd, 468 sqe->flags, 469 sqe->ioprio, 470 (void*)sqe->off, 471 (void*)sqe->addr, 472 sqe->len, 473 sqe->accept_flags, 474 sqe->splice_fd_in, 475 sqe->__pad2[0], 476 sqe->__pad2[1], 477 sqe->__pad2[2] 478 ); 479 } 480 481 482 // Get now the data we definetely need 483 volatile __u32 * const tail = ring.submit_q.tail; 484 const __u32 mask = *ring.submit_q.mask; 485 486 // There are 2 submission schemes, check which one we are using 487 if( ring.poller_submits ) { 488 // If the poller thread submits, then we just need to add this to the ready array 489 __submit_to_ready_array( ring, idx, mask ); 490 491 post( ctx->thrd.sem ); 492 493 __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() ); 494 } 495 else if( ring.eager_submits ) { 496 __attribute__((unused)) __u32 picked = __submit_to_ready_array( ring, idx, mask ); 497 498 #if defined(LEADER_LOCK) 499 if( !try_lock(ring.submit_q.submit_lock) ) { 500 __STATS__( false, 501 io.submit_q.helped += 1; 502 ) 503 return; 504 } 505 /* paranoid */ verify( ! __preemption_enabled() ); 506 __STATS__( true, 507 io.submit_q.leader += 1; 508 ) 509 #else 510 for() { 511 yield(); 512 513 if( try_lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2) ) { 514 __STATS__( false, 515 io.submit_q.leader += 1; 516 ) 517 break; 518 } 519 520 // If some one else collected our index, we are done 521 #warning ABA problem 522 if( ring.submit_q.ready[picked] != idx ) { 523 __STATS__( false, 524 io.submit_q.helped += 1; 525 ) 526 return; 527 } 528 529 __STATS__( false, 530 io.submit_q.busy += 1; 531 ) 532 } 533 #endif 534 535 // We got the lock 536 // Collect the submissions 537 unsigned to_submit = __collect_submitions( ring ); 538 539 // Actually submit 540 int ret = __io_uring_enter( ring, to_submit, false ); 541 542 #if defined(LEADER_LOCK) 543 /* paranoid */ verify( ! __preemption_enabled() ); 544 next(ring.submit_q.submit_lock); 545 #else 546 unlock(ring.submit_q.submit_lock); 547 #endif 548 if( ret < 0 ) { 549 return; 550 } 551 552 // Release the consumed SQEs 553 __release_consumed_submission( ring ); 554 555 // update statistics 556 __STATS__( false, 557 io.submit_q.submit_avg.rdy += to_submit; 558 io.submit_q.submit_avg.csm += ret; 559 io.submit_q.submit_avg.cnt += 1; 560 ) 561 562 __cfadbg_print_safe( io, "Kernel I/O : submitted %u (among %u) for %p\n", idx, ret, active_thread() ); 563 } 564 else 565 { 566 // get mutual exclusion 567 #if defined(LEADER_LOCK) 568 while(!try_lock(ring.submit_q.submit_lock)); 569 #else 570 lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2); 571 #endif 572 573 /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 3ul64, 574 /* paranoid */ "index %u already reclaimed\n" 575 /* paranoid */ "head %u, prev %u, tail %u\n" 576 /* paranoid */ "[-0: %u,-1: %u,-2: %u,-3: %u]\n", 577 /* paranoid */ idx, 578 /* paranoid */ *ring.submit_q.head, ring.submit_q.prev_head, *tail 579 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ] 580 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ] 581 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ] 582 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ] 583 /* paranoid */ ); 584 585 // Append to the list of ready entries 586 587 /* paranoid */ verify( idx <= mask ); 588 ring.submit_q.array[ (*tail) & mask ] = idx; 589 __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST); 590 591 // Submit however, many entries need to be submitted 592 int ret = __io_uring_enter( ring, 1, false ); 593 if( ret < 0 ) { 594 switch((int)errno) { 595 default: 596 abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) ); 597 } 598 } 599 600 /* paranoid */ verify(ret == 1); 601 602 // update statistics 603 __STATS__( false, 604 io.submit_q.submit_avg.csm += 1; 605 io.submit_q.submit_avg.cnt += 1; 606 ) 607 608 { 609 __attribute__((unused)) volatile __u32 * const head = ring.submit_q.head; 610 __attribute__((unused)) __u32 last_idx = ring.submit_q.array[ ((*head) - 1) & mask ]; 611 __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[last_idx]; 612 613 __cfadbg_print_safe( io, 614 "Kernel I/O : last submitted is %u (%p)\n" 615 " data: %p\n" 616 " opcode: %s\n" 617 " fd: %d\n" 618 " flags: %d\n" 619 " prio: %d\n" 620 " off: %p\n" 621 " addr: %p\n" 622 " len: %d\n" 623 " other flags: %d\n" 624 " splice fd: %d\n" 625 " pad[0]: %llu\n" 626 " pad[1]: %llu\n" 627 " pad[2]: %llu\n", 628 last_idx, sqe, 629 (void*)sqe->user_data, 630 opcodes[sqe->opcode], 631 sqe->fd, 632 sqe->flags, 633 sqe->ioprio, 634 (void*)sqe->off, 635 (void*)sqe->addr, 636 sqe->len, 637 sqe->accept_flags, 638 sqe->splice_fd_in, 639 sqe->__pad2[0], 640 sqe->__pad2[1], 641 sqe->__pad2[2] 642 ); 643 } 644 645 __atomic_thread_fence( __ATOMIC_SEQ_CST ); 646 // Release the consumed SQEs 647 648 __release_consumed_submission( ring ); 649 // ring.submit_q.sqes[idx].user_data = 3ul64; 650 651 #if defined(LEADER_LOCK) 652 next(ring.submit_q.submit_lock); 653 #else 654 unlock(ring.submit_q.submit_lock); 655 #endif 656 657 __cfadbg_print_safe( io, "Kernel I/O : submitted %u for %p\n", idx, active_thread() ); 658 } 659 } 660 661 // #define PARTIAL_SUBMIT 32 662 663 // go through the list of submissions in the ready array and moved them into 664 // the ring's submit queue 665 static unsigned __collect_submitions( struct __io_data & ring ) { 666 /* paranoid */ verify( ring.submit_q.ready != 0p ); 667 /* paranoid */ verify( ring.submit_q.ready_cnt > 0 ); 668 669 unsigned to_submit = 0; 670 __u32 tail = *ring.submit_q.tail; 671 const __u32 mask = *ring.submit_q.mask; 672 #if defined(PARTIAL_SUBMIT) 673 #if defined(LEADER_LOCK) 674 #error PARTIAL_SUBMIT and LEADER_LOCK cannot co-exist 675 #endif 676 const __u32 cnt = ring.submit_q.ready_cnt > PARTIAL_SUBMIT ? PARTIAL_SUBMIT : ring.submit_q.ready_cnt; 677 const __u32 offset = ring.submit_q.prev_ready; 678 ring.submit_q.prev_ready += cnt; 679 #else 680 const __u32 cnt = ring.submit_q.ready_cnt; 681 const __u32 offset = 0; 682 #endif 683 684 // Go through the list of ready submissions 685 for( c; cnt ) { 686 __u32 i = (offset + c) % ring.submit_q.ready_cnt; 687 688 // replace any submission with the sentinel, to consume it. 689 __u32 idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED); 690 691 // If it was already the sentinel, then we are done 692 if( idx == -1ul32 ) continue; 693 694 // If we got a real submission, append it to the list 695 ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask; 696 to_submit++; 697 } 698 699 // Increment the tail based on how many we are ready to submit 700 __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST); 701 702 return to_submit; 703 } 704 333 705 // Go through the ring's submit queue and release everything that has already been consumed 334 706 // by io_uring 335 // This cannot be done by multiple threads 336 static __u32 __release_sqes( struct $io_context & ctx ) { 337 const __u32 mask = *ctx.sq.mask; 338 707 static __u32 __release_consumed_submission( struct __io_data & ring ) { 708 const __u32 smask = *ring.submit_q.mask; 709 710 // We need to get the lock to copy the old head and new head 711 if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0; 339 712 __attribute__((unused)) 340 __u32 ctail = * ctx.sq.kring.tail;// get the current tail of the queue341 __u32 chead = * ctx.sq.kring.head;// get the current head of the queue342 __u32 phead = ctx.sq.kring.released;// get the head the last time we were here343 344 __u32 ftail = ctx.sq.free_ring.tail; // get the current tail of the queue713 __u32 ctail = *ring.submit_q.tail; // get the current tail of the queue 714 __u32 chead = *ring.submit_q.head; // get the current head of the queue 715 __u32 phead = ring.submit_q.prev_head; // get the head the last time we were here 716 ring.submit_q.prev_head = chead; // note up to were we processed 717 unlock(ring.submit_q.release_lock); 345 718 346 719 // the 3 fields are organized like this diagram … … 361 734 __u32 count = chead - phead; 362 735 363 if(count == 0) {364 return 0;365 }366 367 736 // We acquired an previous-head/current-head range 368 737 // go through the range and release the sqes 369 738 for( i; count ) { 370 __cfadbg_print_safe(io, "Kernel I/O : release loop\n"); 371 __u32 idx = ctx.sq.kring.array[ (phead + i) & mask ]; 372 ctx.sq.free_ring.array[ (ftail + i) & mask ] = idx; 373 } 374 375 ctx.sq.kring.released = chead; // note up to were we processed 376 __atomic_store_n(&ctx.sq.free_ring.tail, ftail + count, __ATOMIC_SEQ_CST); 377 378 __ioarbiter_notify(ctx); 379 739 __u32 idx = ring.submit_q.array[ (phead + i) & smask ]; 740 741 /* paranoid */ verify( 0 != ring.submit_q.sqes[ idx ].user_data ); 742 __clean( &ring.submit_q.sqes[ idx ] ); 743 } 380 744 return count; 381 745 } 382 746 383 //============================================================================================= 384 // I/O Arbiter 385 //============================================================================================= 386 static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor * proc, __u32 idxs[], __u32 want ) { 387 __cfadbg_print_safe(io, "Kernel I/O : arbiter allocating\n"); 388 389 __STATS__( false, io.alloc.block += 1; ) 390 391 // No one has any resources left, wait for something to finish 392 // Mark as pending 393 __atomic_store_n( &this.pending.flag, true, __ATOMIC_SEQ_CST ); 394 395 // Wait for our turn to submit 396 wait( this.pending.blocked, want ); 397 398 __attribute((unused)) bool ret = 399 __alloc( this.pending.ctx, idxs, want); 400 /* paranoid */ verify( ret ); 401 402 return this.pending.ctx; 403 404 } 405 406 static void __ioarbiter_notify( $io_arbiter & mutex this, $io_context * ctx ) { 407 /* paranoid */ verify( !is_empty(this.pending.blocked) ); 408 this.pending.ctx = ctx; 409 410 while( !is_empty(this.pending.blocked) ) { 411 __cfadbg_print_safe(io, "Kernel I/O : notifying\n"); 412 __u32 have = ctx->sq.free_ring.tail - ctx->sq.free_ring.head; 413 __u32 want = front( this.pending.blocked ); 414 415 if( have > want ) return; 416 417 signal_block( this.pending.blocked ); 418 } 419 420 this.pending.flag = false; 421 } 422 423 static void __ioarbiter_notify( $io_context & ctx ) { 424 if(__atomic_load_n( &ctx.arbiter->pending.flag, __ATOMIC_SEQ_CST)) { 425 __ioarbiter_notify( *ctx.arbiter, &ctx ); 426 } 427 } 428 429 // Simply append to the pending 430 static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * ctx, __u32 idxs[], __u32 have, bool lazy ) { 431 __cfadbg_print_safe(io, "Kernel I/O : submitting %u from the arbiter to context %u\n", have, ctx->fd); 432 433 /* paranoid */ verify( &this == ctx->arbiter ); 434 435 // Mark as pending 436 __atomic_store_n( &ctx->ext_sq.empty, false, __ATOMIC_SEQ_CST ); 437 438 __cfadbg_print_safe(io, "Kernel I/O : waiting to submit %u\n", have); 439 440 // Wait for our turn to submit 441 wait( ctx->ext_sq.blocked ); 442 443 // Submit our indexes 444 __submit(ctx, idxs, have, lazy); 445 446 __cfadbg_print_safe(io, "Kernel I/O : %u submitted from arbiter\n", have); 447 } 448 449 static void __ioarbiter_flush( $io_arbiter & mutex this, $io_context * ctx ) { 450 /* paranoid */ verify( &this == ctx->arbiter ); 451 452 __STATS__( false, io.flush.external += 1; ) 453 454 __cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n"); 455 456 condition & blcked = ctx->ext_sq.blocked; 457 /* paranoid */ verify( ctx->ext_sq.empty == is_empty( blcked ) ); 458 while(!is_empty( blcked )) { 459 signal_block( blcked ); 460 } 461 462 ctx->ext_sq.empty = true; 747 void __sqe_clean( volatile struct io_uring_sqe * sqe ) { 748 __clean( sqe ); 749 } 750 751 static inline void __clean( volatile struct io_uring_sqe * sqe ) { 752 // If we are in debug mode, thrash the fields to make sure we catch reclamation errors 753 __cfaabi_dbg_debug_do( 754 memset(sqe, 0xde, sizeof(*sqe)); 755 sqe->opcode = (sizeof(opcodes) / sizeof(const char *)) - 1; 756 ); 757 758 // Mark the entry as unused 759 __atomic_store_n(&sqe->user_data, 3ul64, __ATOMIC_SEQ_CST); 463 760 } 464 761 #endif -
libcfa/src/concurrency/io/call.cfa.in
r266ecf1 r182256b 54 54 | IOSQE_IO_DRAIN 55 55 #endif 56 #if defined(CFA_HAVE_IOSQE_ASYNC) 57 | IOSQE_ASYNC 58 #endif 59 ; 60 61 static const __u32 LINK_FLAGS = 0 56 62 #if defined(CFA_HAVE_IOSQE_IO_LINK) 57 63 | IOSQE_IO_LINK … … 60 66 | IOSQE_IO_HARDLINK 61 67 #endif 62 #if defined(CFA_HAVE_IOSQE_ASYNC)63 | IOSQE_ASYNC64 #endif65 #if defined(CFA_HAVE_IOSQE_BUFFER_SELECTED)66 | IOSQE_BUFFER_SELECTED67 #endif68 68 ; 69 69 … … 74 74 ; 75 75 76 extern struct $io_context * cfa_io_allocate(struct io_uring_sqe * out_sqes[], __u32 out_idxs[], __u32 want) __attribute__((nonnull (1,2))); 77 extern void cfa_io_submit( struct $io_context * in_ctx, __u32 in_idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1,2))); 76 extern [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ); 77 extern void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))); 78 79 static inline io_context * __get_io_context( void ) { 80 cluster * cltr = active_cluster(); 81 82 /* paranoid */ verifyf( cltr, "No active cluster for io operation\\n"); 83 assertf( cltr->io.cnt > 0, "Cluster %p has no default io contexts and no context was specified\\n", cltr ); 84 85 /* paranoid */ verifyf( cltr->io.ctxs, "default io contexts for cluster %p are missing\\n", cltr); 86 return &cltr->io.ctxs[ thread_rand() % cltr->io.cnt ]; 87 } 78 88 #endif 79 89 … … 88 98 89 99 extern "C" { 90 #include < asm/types.h>100 #include <sys/types.h> 91 101 #include <sys/socket.h> 92 102 #include <sys/syscall.h> … … 185 195 return ', '.join(args_a) 186 196 187 AsyncTemplate = """inline void async_{name}(io_future_t & future, {params}, __u64 submit_flags) {{197 AsyncTemplate = """inline void async_{name}(io_future_t & future, {params}, int submit_flags, io_cancellation * cancellation, io_context * context) {{ 188 198 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_{op}) 189 199 ssize_t res = {name}({args}); … … 195 205 }} 196 206 #else 207 // we don't support LINK yet 208 if( 0 != (submit_flags & LINK_FLAGS) ) {{ 209 errno = ENOTSUP; return -1; 210 }} 211 212 if( !context ) {{ 213 context = __get_io_context(); 214 }} 215 if(cancellation) {{ 216 cancellation->target = (__u64)(uintptr_t)&future; 217 }} 218 197 219 __u8 sflags = REGULAR_FLAGS & submit_flags; 220 struct __io_data & ring = *context->thrd.ring; 221 198 222 __u32 idx; 199 223 struct io_uring_sqe * sqe; 200 struct $io_context * ctx = cfa_io_allocate( &sqe, &idx, 1);224 [(volatile struct io_uring_sqe *) sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future ); 201 225 202 226 sqe->opcode = IORING_OP_{op}; 203 sqe->user_data = (__u64)(uintptr_t)&future;204 227 sqe->flags = sflags; 205 228 sqe->ioprio = 0; … … 216 239 217 240 verify( sqe->user_data == (__u64)(uintptr_t)&future ); 218 cfa_io_submit( ctx, &idx, 1, 0 != (submit_flags & CFA_IO_LAZY));241 __submit( context, idx ); 219 242 #endif 220 243 }}""" 221 244 222 SyncTemplate = """{ret} cfa_{name}({params}, __u64 submit_flags) {{ 245 SyncTemplate = """{ret} cfa_{name}({params}, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {{ 246 if( timeout >= 0 ) {{ 247 errno = ENOTSUP; 248 return -1; 249 }} 223 250 io_future_t future; 224 251 225 async_{name}( future, {args}, submit_flags );252 async_{name}( future, {args}, submit_flags, cancellation, context ); 226 253 227 254 wait( future ); … … 388 415 if c.define: 389 416 print("""#if defined({define}) 390 {ret} cfa_{name}({params}, __u64 submit_flags);417 {ret} cfa_{name}({params}, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context); 391 418 #endif""".format(define=c.define,ret=c.ret, name=c.name, params=c.params)) 392 419 else: 393 print("{ret} cfa_{name}({params}, __u64 submit_flags);"420 print("{ret} cfa_{name}({params}, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);" 394 421 .format(ret=c.ret, name=c.name, params=c.params)) 395 422 … … 399 426 if c.define: 400 427 print("""#if defined({define}) 401 void async_{name}(io_future_t & future, {params}, __u64 submit_flags);428 void async_{name}(io_future_t & future, {params}, int submit_flags, io_cancellation * cancellation, io_context * context); 402 429 #endif""".format(define=c.define,name=c.name, params=c.params)) 403 430 else: 404 print("void async_{name}(io_future_t & future, {params}, __u64 submit_flags);"431 print("void async_{name}(io_future_t & future, {params}, int submit_flags, io_cancellation * cancellation, io_context * context);" 405 432 .format(name=c.name, params=c.params)) 406 433 print("\n") … … 447 474 448 475 print(""" 476 //----------------------------------------------------------------------------- 477 bool cancel(io_cancellation & this) { 478 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ASYNC_CANCEL) 479 return false; 480 #else 481 io_future_t future; 482 483 io_context * context = __get_io_context(); 484 485 __u8 sflags = 0; 486 struct __io_data & ring = *context->thrd.ring; 487 488 __u32 idx; 489 volatile struct io_uring_sqe * sqe; 490 [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future ); 491 492 sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0; 493 sqe->opcode = IORING_OP_ASYNC_CANCEL; 494 sqe->flags = sflags; 495 sqe->addr = this.target; 496 497 verify( sqe->user_data == (__u64)(uintptr_t)&future ); 498 __submit( context, idx ); 499 500 wait(future); 501 502 if( future.result == 0 ) return true; // Entry found 503 if( future.result == -EALREADY) return true; // Entry found but in progress 504 if( future.result == -ENOENT ) return false; // Entry not found 505 return false; 506 #endif 507 } 508 449 509 //----------------------------------------------------------------------------- 450 510 // Check if a function is has asynchronous -
libcfa/src/concurrency/io/setup.cfa
r266ecf1 r182256b 26 26 27 27 #if !defined(CFA_HAVE_LINUX_IO_URING_H) 28 void __kernel_io_startup() { 29 // Nothing to do without io_uring 30 } 31 32 void __kernel_io_shutdown() { 33 // Nothing to do without io_uring 34 } 35 28 36 void ?{}(io_context_params & this) {} 29 37 30 void ?{}($io_context & this, struct cluster & cl) {} 31 void ^?{}($io_context & this) {} 32 33 void __cfa_io_start( processor * proc ) {} 34 void __cfa_io_flush( processor * proc ) {} 35 void __cfa_io_stop ( processor * proc ) {} 36 37 $io_arbiter * create(void) { return 0p; } 38 void destroy($io_arbiter *) {} 38 void ?{}(io_context & this, struct cluster & cl) {} 39 void ?{}(io_context & this, struct cluster & cl, const io_context_params & params) {} 40 41 void ^?{}(io_context & this) {} 42 void ^?{}(io_context & this, bool cluster_context) {} 43 44 void register_fixed_files( io_context &, int *, unsigned ) {} 45 void register_fixed_files( cluster &, int *, unsigned ) {} 39 46 40 47 #else … … 61 68 void ?{}(io_context_params & this) { 62 69 this.num_entries = 256; 70 this.num_ready = 256; 71 this.submit_aff = -1; 72 this.eager_submits = false; 73 this.poller_submits = false; 74 this.poll_submit = false; 75 this.poll_complete = false; 63 76 } 64 77 … … 93 106 94 107 //============================================================================================= 108 // I/O Startup / Shutdown logic + Master Poller 109 //============================================================================================= 110 111 // IO Master poller loop forward 112 static void * iopoll_loop( __attribute__((unused)) void * args ); 113 114 static struct { 115 pthread_t thrd; // pthread handle to io poller thread 116 void * stack; // pthread stack for io poller thread 117 int epollfd; // file descriptor to the epoll instance 118 volatile bool run; // Whether or not to continue 119 volatile bool stopped; // Whether the poller has finished running 120 volatile uint64_t epoch; // Epoch used for memory reclamation 121 } iopoll; 122 123 void __kernel_io_startup(void) { 124 __cfadbg_print_safe(io_core, "Kernel : Creating EPOLL instance\n" ); 125 126 iopoll.epollfd = epoll_create1(0); 127 if (iopoll.epollfd == -1) { 128 abort( "internal error, epoll_create1\n"); 129 } 130 131 __cfadbg_print_safe(io_core, "Kernel : Starting io poller thread\n" ); 132 133 iopoll.stack = __create_pthread( &iopoll.thrd, iopoll_loop, 0p ); 134 iopoll.run = true; 135 iopoll.stopped = false; 136 iopoll.epoch = 0; 137 } 138 139 void __kernel_io_shutdown(void) { 140 // Notify the io poller thread of the shutdown 141 iopoll.run = false; 142 sigval val = { 1 }; 143 pthread_sigqueue( iopoll.thrd, SIGUSR1, val ); 144 145 // Wait for the io poller thread to finish 146 147 __destroy_pthread( iopoll.thrd, iopoll.stack, 0p ); 148 149 int ret = close(iopoll.epollfd); 150 if (ret == -1) { 151 abort( "internal error, close epoll\n"); 152 } 153 154 // Io polling is now fully stopped 155 156 __cfadbg_print_safe(io_core, "Kernel : IO poller stopped\n" ); 157 } 158 159 static void * iopoll_loop( __attribute__((unused)) void * args ) { 160 __processor_id_t id; 161 id.full_proc = false; 162 id.id = doregister(&id); 163 __cfaabi_tls.this_proc_id = &id; 164 __cfadbg_print_safe(io_core, "Kernel : IO poller thread starting\n" ); 165 166 // Block signals to control when they arrive 167 sigset_t mask; 168 sigfillset(&mask); 169 if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) { 170 abort( "internal error, pthread_sigmask" ); 171 } 172 173 sigdelset( &mask, SIGUSR1 ); 174 175 // Create sufficient events 176 struct epoll_event events[10]; 177 // Main loop 178 while( iopoll.run ) { 179 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n"); 180 181 // increment the epoch to notify any deleters we are starting a new cycle 182 __atomic_fetch_add(&iopoll.epoch, 1, __ATOMIC_SEQ_CST); 183 184 // Wait for events 185 int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask ); 186 187 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io contexts events, waking up\n", nfds); 188 189 // Check if an error occured 190 if (nfds == -1) { 191 if( errno == EINTR ) continue; 192 abort( "internal error, pthread_sigmask" ); 193 } 194 195 for(i; nfds) { 196 $io_ctx_thread * io_ctx = ($io_ctx_thread *)(uintptr_t)events[i].data.u64; 197 /* paranoid */ verify( io_ctx ); 198 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->ring->fd, io_ctx); 199 #if !defined( __CFA_NO_STATISTICS__ ) 200 __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats; 201 #endif 202 203 eventfd_t v; 204 eventfd_read(io_ctx->ring->efd, &v); 205 206 post( io_ctx->sem ); 207 } 208 } 209 210 __atomic_store_n(&iopoll.stopped, true, __ATOMIC_SEQ_CST); 211 212 __cfadbg_print_safe(io_core, "Kernel : IO poller thread stopping\n" ); 213 unregister(&id); 214 return 0p; 215 } 216 217 //============================================================================================= 95 218 // I/O Context Constrution/Destruction 96 219 //============================================================================================= 97 220 98 99 100 static void __io_uring_setup ( $io_context & this, const io_context_params & params_in, int procfd ); 101 static void __io_uring_teardown( $io_context & this ); 102 static void __epoll_register($io_context & ctx); 103 static void __epoll_unregister($io_context & ctx); 104 void __ioarbiter_register( $io_arbiter & mutex, $io_context & ctx ); 105 void __ioarbiter_unregister( $io_arbiter & mutex, $io_context & ctx ); 106 107 void ?{}($io_context & this, processor * proc, struct cluster & cl) { 108 /* paranoid */ verify( cl.io.arbiter ); 109 this.proc = proc; 110 this.arbiter = cl.io.arbiter; 111 this.ext_sq.empty = true; 112 (this.ext_sq.blocked){}; 113 __io_uring_setup( this, cl.io.params, proc->idle ); 114 __cfadbg_print_safe(io_core, "Kernel I/O : Created ring for io_context %u (%p)\n", this.fd, &this); 115 } 116 117 void ^?{}($io_context & this) { 118 __cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %u\n", this.fd); 119 120 __io_uring_teardown( this ); 121 __cfadbg_print_safe(io_core, "Kernel I/O : Destroyed ring for io_context %u\n", this.fd); 221 void ?{}($io_ctx_thread & this, struct cluster & cl) { (this.self){ "IO Poller", cl }; } 222 void main( $io_ctx_thread & this ); 223 static inline $thread * get_thread( $io_ctx_thread & this ) { return &this.self; } 224 void ^?{}( $io_ctx_thread & mutex this ) {} 225 226 static void __io_create ( __io_data & this, const io_context_params & params_in ); 227 static void __io_destroy( __io_data & this ); 228 229 void ?{}(io_context & this, struct cluster & cl, const io_context_params & params) { 230 (this.thrd){ cl }; 231 this.thrd.ring = malloc(); 232 __cfadbg_print_safe(io_core, "Kernel I/O : Creating ring for io_context %p\n", &this); 233 __io_create( *this.thrd.ring, params ); 234 235 __cfadbg_print_safe(io_core, "Kernel I/O : Starting poller thread for io_context %p\n", &this); 236 this.thrd.done = false; 237 __thrd_start( this.thrd, main ); 238 239 __cfadbg_print_safe(io_core, "Kernel I/O : io_context %p ready\n", &this); 240 } 241 242 void ?{}(io_context & this, struct cluster & cl) { 243 io_context_params params; 244 (this){ cl, params }; 245 } 246 247 void ^?{}(io_context & this, bool cluster_context) { 248 __cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %p\n", &this); 249 250 // Notify the thread of the shutdown 251 __atomic_store_n(&this.thrd.done, true, __ATOMIC_SEQ_CST); 252 253 // If this is an io_context within a cluster, things get trickier 254 $thread & thrd = this.thrd.self; 255 if( cluster_context ) { 256 // We are about to do weird things with the threads 257 // we don't need interrupts to complicate everything 258 disable_interrupts(); 259 260 // Get cluster info 261 cluster & cltr = *thrd.curr_cluster; 262 /* paranoid */ verify( cltr.idles.total == 0 || &cltr == mainCluster ); 263 /* paranoid */ verify( !ready_mutate_islocked() ); 264 265 // We need to adjust the clean-up based on where the thread is 266 if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) { 267 // This is the tricky case 268 // The thread was preempted or ready to run and now it is on the ready queue 269 // but the cluster is shutting down, so there aren't any processors to run the ready queue 270 // the solution is to steal the thread from the ready-queue and pretend it was blocked all along 271 272 ready_schedule_lock(); 273 // The thread should on the list 274 /* paranoid */ verify( thrd.link.next != 0p ); 275 276 // Remove the thread from the ready queue of this cluster 277 // The thread should be the last on the list 278 __attribute__((unused)) bool removed = remove_head( &cltr, &thrd ); 279 /* paranoid */ verify( removed ); 280 thrd.link.next = 0p; 281 thrd.link.prev = 0p; 282 283 // Fixup the thread state 284 thrd.state = Blocked; 285 thrd.ticket = TICKET_BLOCKED; 286 thrd.preempted = __NO_PREEMPTION; 287 288 ready_schedule_unlock(); 289 290 // Pretend like the thread was blocked all along 291 } 292 // !!! This is not an else if !!! 293 // Ok, now the thread is blocked (whether we cheated to get here or not) 294 if( thrd.state == Blocked ) { 295 // This is the "easy case" 296 // The thread is parked and can easily be moved to active cluster 297 verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster ); 298 thrd.curr_cluster = active_cluster(); 299 300 // unpark the fast io_poller 301 unpark( &thrd ); 302 } 303 else { 304 // The thread is in a weird state 305 // I don't know what to do here 306 abort("io_context poller thread is in unexpected state, cannot clean-up correctly\n"); 307 } 308 309 // The weird thread kidnapping stuff is over, restore interrupts. 310 enable_interrupts( __cfaabi_dbg_ctx ); 311 } else { 312 post( this.thrd.sem ); 313 } 314 315 ^(this.thrd){}; 316 __cfadbg_print_safe(io_core, "Kernel I/O : Stopped poller thread for io_context %p\n", &this); 317 318 __io_destroy( *this.thrd.ring ); 319 __cfadbg_print_safe(io_core, "Kernel I/O : Destroyed ring for io_context %p\n", &this); 320 321 free(this.thrd.ring); 322 } 323 324 void ^?{}(io_context & this) { 325 ^(this){ false }; 122 326 } 123 327 … … 125 329 extern void __enable_interrupts_hard(); 126 330 127 static void __io_ uring_setup( $io_context & this, const io_context_params & params_in, int procfd) {331 static void __io_create( __io_data & this, const io_context_params & params_in ) { 128 332 // Step 1 : call to setup 129 333 struct io_uring_params params; 130 334 memset(¶ms, 0, sizeof(params)); 131 //if( params_in.poll_submit ) params.flags |= IORING_SETUP_SQPOLL;132 //if( params_in.poll_complete ) params.flags |= IORING_SETUP_IOPOLL;335 if( params_in.poll_submit ) params.flags |= IORING_SETUP_SQPOLL; 336 if( params_in.poll_complete ) params.flags |= IORING_SETUP_IOPOLL; 133 337 134 338 __u32 nentries = params_in.num_entries != 0 ? params_in.num_entries : 256; … … 136 340 abort("ERROR: I/O setup 'num_entries' must be a power of 2\n"); 137 341 } 342 if( params_in.poller_submits && params_in.eager_submits ) { 343 abort("ERROR: I/O setup 'poller_submits' and 'eager_submits' cannot be used together\n"); 344 } 138 345 139 346 int fd = syscall(__NR_io_uring_setup, nentries, ¶ms ); … … 143 350 144 351 // Step 2 : mmap result 145 struct __sub_ring_t & sq = this.sq; 146 struct __cmp_ring_t & cq = this.cq; 352 memset( &this, 0, sizeof(struct __io_data) ); 353 struct __submition_data & sq = this.submit_q; 354 struct __completion_data & cq = this.completion_q; 147 355 148 356 // calculate the right ring size … … 193 401 // Get the pointers from the kernel to fill the structure 194 402 // submit queue 195 sq.kring.head = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.head); 196 sq.kring.tail = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail); 197 sq.kring.array = ( __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.array); 198 sq.mask = ( const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask); 199 sq.num = ( const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries); 200 sq.flags = ( __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags); 201 sq.dropped = ( __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped); 202 203 sq.kring.released = 0; 204 205 sq.free_ring.head = 0; 206 sq.free_ring.tail = *sq.num; 207 sq.free_ring.array = alloc( *sq.num, 128`align ); 208 for(i; (__u32)*sq.num) { 209 sq.free_ring.array[i] = i; 210 } 211 212 sq.to_submit = 0; 403 sq.head = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.head); 404 sq.tail = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail); 405 sq.mask = ( const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask); 406 sq.num = ( const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries); 407 sq.flags = ( __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags); 408 sq.dropped = ( __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped); 409 sq.array = ( __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.array); 410 sq.prev_head = *sq.head; 411 412 { 413 const __u32 num = *sq.num; 414 for( i; num ) { 415 __sqe_clean( &sq.sqes[i] ); 416 } 417 } 418 419 (sq.submit_lock){}; 420 (sq.release_lock){}; 421 422 if( params_in.poller_submits || params_in.eager_submits ) { 423 /* paranoid */ verify( is_pow2( params_in.num_ready ) || (params_in.num_ready < 8) ); 424 sq.ready_cnt = max( params_in.num_ready, 8 ); 425 sq.ready = alloc( sq.ready_cnt, 64`align ); 426 for(i; sq.ready_cnt) { 427 sq.ready[i] = -1ul32; 428 } 429 sq.prev_ready = 0; 430 } 431 else { 432 sq.ready_cnt = 0; 433 sq.ready = 0p; 434 sq.prev_ready = 0; 435 } 213 436 214 437 // completion queue … … 223 446 // io_uring_register is so f*cking slow on some machine that it 224 447 // will never succeed if preemption isn't hard blocked 225 __cfadbg_print_safe(io_core, "Kernel I/O : registering %d for completion with ring %d\n", procfd, fd);226 227 448 __disable_interrupts_hard(); 228 449 229 int ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &procfd, 1); 450 int efd = eventfd(0, 0); 451 if (efd < 0) { 452 abort("KERNEL ERROR: IO_URING EVENTFD - %s\n", strerror(errno)); 453 } 454 455 int ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &efd, 1); 230 456 if (ret < 0) { 231 457 abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno)); … … 233 459 234 460 __enable_interrupts_hard(); 235 236 __cfadbg_print_safe(io_core, "Kernel I/O : registered %d for completion with ring %d\n", procfd, fd);237 461 238 462 // some paranoid checks … … 244 468 /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask ); 245 469 /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num ); 246 /* paranoid */ verifyf( (*sq. kring.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.kring.head );247 /* paranoid */ verifyf( (*sq. kring.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.kring.tail );470 /* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head ); 471 /* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail ); 248 472 249 473 // Update the global ring info 250 this.ring_flags = 0;474 this.ring_flags = params.flags; 251 475 this.fd = fd; 252 } 253 254 static void __io_uring_teardown( $io_context & this ) { 476 this.efd = efd; 477 this.eager_submits = params_in.eager_submits; 478 this.poller_submits = params_in.poller_submits; 479 } 480 481 static void __io_destroy( __io_data & this ) { 255 482 // Shutdown the io rings 256 struct __sub _ring_t & sq = this.sq;257 struct __c mp_ring_t & cq = this.cq;483 struct __submition_data & sq = this.submit_q; 484 struct __completion_data & cq = this.completion_q; 258 485 259 486 // unmap the submit queue entries … … 270 497 // close the file descriptor 271 498 close(this.fd); 272 273 free( this.sq.free_ring.array ); // Maybe null, doesn't matter 274 } 275 276 void __cfa_io_start( processor * proc ) { 277 proc->io.ctx = alloc(); 278 (*proc->io.ctx){proc, *proc->cltr}; 279 } 280 void __cfa_io_stop ( processor * proc ) { 281 ^(*proc->io.ctx){}; 282 free(proc->io.ctx); 499 close(this.efd); 500 501 free( this.submit_q.ready ); // Maybe null, doesn't matter 283 502 } 284 503 … … 286 505 // I/O Context Sleep 287 506 //============================================================================================= 288 // static inline void __epoll_ctl($io_context & ctx, int op, const char * error) { 289 // struct epoll_event ev; 290 // ev.events = EPOLLIN | EPOLLONESHOT; 291 // ev.data.u64 = (__u64)&ctx; 292 // int ret = epoll_ctl(iopoll.epollfd, op, ctx.efd, &ev); 293 // if (ret < 0) { 294 // abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) ); 295 // } 296 // } 297 298 // static void __epoll_register($io_context & ctx) { 299 // __epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD"); 300 // } 301 302 // static void __epoll_unregister($io_context & ctx) { 303 // // Read the current epoch so we know when to stop 304 // size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST); 305 306 // // Remove the fd from the iopoller 307 // __epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE"); 308 309 // // Notify the io poller thread of the shutdown 310 // iopoll.run = false; 311 // sigval val = { 1 }; 312 // pthread_sigqueue( iopoll.thrd, SIGUSR1, val ); 313 314 // // Make sure all this is done 315 // __atomic_thread_fence(__ATOMIC_SEQ_CST); 316 317 // // Wait for the next epoch 318 // while(curr == iopoll.epoch && !iopoll.stopped) Pause(); 319 // } 320 321 // void __ioctx_prepare_block($io_context & ctx) { 322 // __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.fd, &ctx); 323 // __epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM"); 324 // } 325 507 static inline void __ioctx_epoll_ctl($io_ctx_thread & ctx, int op, const char * error) { 508 struct epoll_event ev; 509 ev.events = EPOLLIN | EPOLLONESHOT; 510 ev.data.u64 = (__u64)&ctx; 511 int ret = epoll_ctl(iopoll.epollfd, op, ctx.ring->efd, &ev); 512 if (ret < 0) { 513 abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) ); 514 } 515 } 516 517 void __ioctx_register($io_ctx_thread & ctx) { 518 __ioctx_epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD"); 519 } 520 521 void __ioctx_prepare_block($io_ctx_thread & ctx) { 522 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.ring->fd, &ctx); 523 __ioctx_epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM"); 524 } 525 526 void __ioctx_unregister($io_ctx_thread & ctx) { 527 // Read the current epoch so we know when to stop 528 size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST); 529 530 // Remove the fd from the iopoller 531 __ioctx_epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE"); 532 533 // Notify the io poller thread of the shutdown 534 iopoll.run = false; 535 sigval val = { 1 }; 536 pthread_sigqueue( iopoll.thrd, SIGUSR1, val ); 537 538 // Make sure all this is done 539 __atomic_thread_fence(__ATOMIC_SEQ_CST); 540 541 // Wait for the next epoch 542 while(curr == iopoll.epoch && !iopoll.stopped) Pause(); 543 } 326 544 327 545 //============================================================================================= 328 546 // I/O Context Misc Setup 329 547 //============================================================================================= 330 void ?{}( $io_arbiter & this ) { 331 this.pending.flag = false; 332 } 333 334 void ^?{}( $io_arbiter & mutex this ) { 335 // /* paranoid */ verify( empty(this.assigned) ); 336 // /* paranoid */ verify( empty(this.available) ); 337 /* paranoid */ verify( is_empty(this.pending.blocked) ); 338 } 339 340 $io_arbiter * create(void) { 341 return new(); 342 } 343 void destroy($io_arbiter * arbiter) { 344 delete(arbiter); 345 } 346 347 //============================================================================================= 348 // I/O Context Misc Setup 349 //============================================================================================= 350 548 void register_fixed_files( io_context & ctx, int * files, unsigned count ) { 549 int ret = syscall( __NR_io_uring_register, ctx.thrd.ring->fd, IORING_REGISTER_FILES, files, count ); 550 if( ret < 0 ) { 551 abort( "KERNEL ERROR: IO_URING REGISTER - (%d) %s\n", (int)errno, strerror(errno) ); 552 } 553 554 __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret ); 555 } 556 557 void register_fixed_files( cluster & cltr, int * files, unsigned count ) { 558 for(i; cltr.io.cnt) { 559 register_fixed_files( cltr.io.ctxs[i], files, count ); 560 } 561 } 351 562 #endif -
libcfa/src/concurrency/io/types.hfa
r266ecf1 r182256b 25 25 26 26 #if defined(CFA_HAVE_LINUX_IO_URING_H) 27 #include "bits/sequence.hfa" 28 #include "monitor.hfa" 27 #define LEADER_LOCK 28 struct __leaderlock_t { 29 struct $thread * volatile value; // ($thread) next_leader | (bool:1) is_locked 30 }; 29 31 30 struct processor; 31 monitor $io_arbiter; 32 static inline void ?{}( __leaderlock_t & this ) { this.value = 0p; } 32 33 33 34 //----------------------------------------------------------------------- 34 35 // Ring Data structure 35 struct __sub_ring_t { 36 struct { 37 // Head and tail of the ring (associated with array) 38 volatile __u32 * head; // one passed last index consumed by the kernel 39 volatile __u32 * tail; // one passed last index visible to the kernel 40 volatile __u32 released; // one passed last index released back to the free list 36 struct __submition_data { 37 // Head and tail of the ring (associated with array) 38 volatile __u32 * head; 39 volatile __u32 * tail; 40 volatile __u32 prev_head; 41 41 42 // The actual kernel ring which uses head/tail 43 // indexes into the sqes arrays 44 __u32 * array; 45 } kring; 46 47 struct { 48 volatile __u32 head; 49 volatile __u32 tail; 50 // The ring which contains free allocations 51 // indexes into the sqes arrays 52 __u32 * array; 53 } free_ring; 54 55 // number of sqes to submit on next system call. 56 __u32 to_submit; 42 // The actual kernel ring which uses head/tail 43 // indexes into the sqes arrays 44 __u32 * array; 57 45 58 46 // number of entries and mask to go with it … … 60 48 const __u32 * mask; 61 49 62 // Submission flags , currently only IORING_SETUP_SQPOLL50 // Submission flags (Not sure what for) 63 51 __u32 * flags; 64 52 65 // number of sqes not submitted 66 // From documentation : [dropped] is incremented for each invalid submission queue entry encountered in the ring buffer. 53 // number of sqes not submitted (whatever that means) 67 54 __u32 * dropped; 68 55 56 // Like head/tail but not seen by the kernel 57 volatile __u32 * ready; 58 __u32 ready_cnt; 59 __u32 prev_ready; 60 61 #if defined(LEADER_LOCK) 62 __leaderlock_t submit_lock; 63 #else 64 __spinlock_t submit_lock; 65 #endif 66 __spinlock_t release_lock; 67 69 68 // A buffer of sqes (not the actual ring) 70 struct io_uring_sqe * sqes;69 volatile struct io_uring_sqe * sqes; 71 70 72 71 // The location and size of the mmaped area … … 75 74 }; 76 75 77 struct __c mp_ring_t{76 struct __completion_data { 78 77 // Head and tail of the ring 79 78 volatile __u32 * head; … … 84 83 const __u32 * num; 85 84 86 // I don't know what this value is for85 // number of cqes not submitted (whatever that means) 87 86 __u32 * overflow; 88 87 … … 95 94 }; 96 95 97 struct __attribute__((aligned(128))) $io_context { 98 $io_arbiter * arbiter; 99 processor * proc; 100 101 struct { 102 volatile bool empty; 103 condition blocked; 104 } ext_sq; 105 106 struct __sub_ring_t sq; 107 struct __cmp_ring_t cq; 96 struct __io_data { 97 struct __submition_data submit_q; 98 struct __completion_data completion_q; 108 99 __u32 ring_flags; 109 100 int fd; 110 }; 111 112 monitor __attribute__((aligned(128))) $io_arbiter { 113 struct { 114 condition blocked; 115 $io_context * ctx; 116 volatile bool flag; 117 } pending; 101 int efd; 102 bool eager_submits:1; 103 bool poller_submits:1; 118 104 }; 119 105 … … 147 133 #endif 148 134 149 // void __ioctx_prepare_block($io_context & ctx); 135 struct $io_ctx_thread; 136 void __ioctx_register($io_ctx_thread & ctx); 137 void __ioctx_unregister($io_ctx_thread & ctx); 138 void __ioctx_prepare_block($io_ctx_thread & ctx); 139 void __sqe_clean( volatile struct io_uring_sqe * sqe ); 150 140 #endif 151 141 -
libcfa/src/concurrency/iofwd.hfa
r266ecf1 r182256b 18 18 #include <unistd.h> 19 19 extern "C" { 20 #include < asm/types.h>20 #include <sys/types.h> 21 21 #if CFA_HAVE_LINUX_IO_URING_H 22 22 #include <linux/io_uring.h> … … 48 48 struct cluster; 49 49 struct io_future_t; 50 struct $io_context; 50 struct io_context; 51 struct io_cancellation; 51 52 52 53 struct iovec; … … 54 55 struct sockaddr; 55 56 struct statx; 56 struct epoll_event;57 58 //----------59 // underlying calls60 extern struct $io_context * cfa_io_allocate(struct io_uring_sqe * out_sqes[], __u32 out_idxs[], __u32 want) __attribute__((nonnull (1,2)));61 extern void cfa_io_submit( struct $io_context * in_ctx, __u32 in_idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1,2)));62 57 63 58 //---------- 64 59 // synchronous calls 65 60 #if defined(CFA_HAVE_PREADV2) 66 extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, __u64 submit_flags);61 extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context); 67 62 #endif 68 63 #if defined(CFA_HAVE_PWRITEV2) 69 extern ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, __u64 submit_flags);64 extern ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context); 70 65 #endif 71 extern int cfa_fsync(int fd, __u64 submit_flags);72 extern int cfa_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event, __u64 submit_flags);73 extern int cfa_sync_file_range(int fd, off64_t offset, off64_t nbytes, unsigned int flags, __u64 submit_flags);74 extern ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags, __u64 submit_flags);75 extern ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags, __u64 submit_flags);76 extern ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags, __u64 submit_flags);77 extern ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags, __u64 submit_flags);78 extern int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, __u64 submit_flags);79 extern int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen, __u64 submit_flags);80 extern int cfa_fallocate(int fd, int mode, off_t offset, off_t len, __u64 submit_flags);81 extern int cfa_posix_fadvise(int fd, off_t offset, off_t len, int advice, __u64 submit_flags);82 extern int cfa_madvise(void *addr, size_t length, int advice, __u64 submit_flags);83 extern int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode, __u64 submit_flags);66 extern int cfa_fsync(int fd, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context); 67 extern int cfa_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context); 68 extern int cfa_sync_file_range(int fd, off64_t offset, off64_t nbytes, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context); 69 extern ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context); 70 extern ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context); 71 extern ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context); 72 extern ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context); 73 extern int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context); 74 extern int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context); 75 extern int cfa_fallocate(int fd, int mode, off_t offset, off_t len, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context); 76 extern int cfa_posix_fadvise(int fd, off_t offset, off_t len, int advice, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context); 77 extern int cfa_madvise(void *addr, size_t length, int advice, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context); 78 extern int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context); 84 79 #if defined(CFA_HAVE_OPENAT2) 85 extern int cfa_openat2(int dirfd, const char *pathname, struct open_how * how, size_t size, __u64 submit_flags);80 extern int cfa_openat2(int dirfd, const char *pathname, struct open_how * how, size_t size, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context); 86 81 #endif 87 extern int cfa_close(int fd, __u64 submit_flags);82 extern int cfa_close(int fd, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context); 88 83 #if defined(CFA_HAVE_STATX) 89 extern int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, __u64 submit_flags);84 extern int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context); 90 85 #endif 91 extern ssize_t cfa_read(int fd, void * buf, size_t count, __u64 submit_flags);92 extern ssize_t cfa_write(int fd, void * buf, size_t count, __u64 submit_flags);93 extern ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, __u64 submit_flags);94 extern ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, __u64 submit_flags);86 extern ssize_t cfa_read(int fd, void * buf, size_t count, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context); 87 extern ssize_t cfa_write(int fd, void * buf, size_t count, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context); 88 extern ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context); 89 extern ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context); 95 90 96 91 //---------- 97 92 // asynchronous calls 98 93 #if defined(CFA_HAVE_PREADV2) 99 extern void async_preadv2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, __u64 submit_flags);94 extern void async_preadv2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, io_cancellation * cancellation, io_context * context); 100 95 #endif 101 96 #if defined(CFA_HAVE_PWRITEV2) 102 extern void async_pwritev2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, __u64 submit_flags);97 extern void async_pwritev2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, io_cancellation * cancellation, io_context * context); 103 98 #endif 104 extern void async_fsync(io_future_t & future, int fd, __u64 submit_flags);105 extern void async_epoll_ctl(io_future_t & future, int epfd, int op, int fd, struct epoll_event *event, __u64 submit_flags);106 extern void async_sync_file_range(io_future_t & future, int fd, off64_t offset, off64_t nbytes, unsigned int flags, __u64 submit_flags);107 extern void async_sendmsg(io_future_t & future, int sockfd, const struct msghdr *msg, int flags, __u64 submit_flags);108 extern void async_recvmsg(io_future_t & future, int sockfd, struct msghdr *msg, int flags, __u64 submit_flags);109 extern void async_send(io_future_t & future, int sockfd, const void *buf, size_t len, int flags, __u64 submit_flags);110 extern void async_recv(io_future_t & future, int sockfd, void *buf, size_t len, int flags, __u64 submit_flags);111 extern void async_accept4(io_future_t & future, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, __u64 submit_flags);112 extern void async_connect(io_future_t & future, int sockfd, const struct sockaddr *addr, socklen_t addrlen, __u64 submit_flags);113 extern void async_fallocate(io_future_t & future, int fd, int mode, off_t offset, off_t len, __u64 submit_flags);114 extern void async_posix_fadvise(io_future_t & future, int fd, off_t offset, off_t len, int advice, __u64 submit_flags);115 extern void async_madvise(io_future_t & future, void *addr, size_t length, int advice, __u64 submit_flags);116 extern void async_openat(io_future_t & future, int dirfd, const char *pathname, int flags, mode_t mode, __u64 submit_flags);99 extern void async_fsync(io_future_t & future, int fd, int submit_flags, io_cancellation * cancellation, io_context * context); 100 extern void async_epoll_ctl(io_future_t & future, int epfd, int op, int fd, struct epoll_event *event, int submit_flags, io_cancellation * cancellation, io_context * context); 101 extern void async_sync_file_range(io_future_t & future, int fd, off64_t offset, off64_t nbytes, unsigned int flags, int submit_flags, io_cancellation * cancellation, io_context * context); 102 extern void async_sendmsg(io_future_t & future, int sockfd, const struct msghdr *msg, int flags, int submit_flags, io_cancellation * cancellation, io_context * context); 103 extern void async_recvmsg(io_future_t & future, int sockfd, struct msghdr *msg, int flags, int submit_flags, io_cancellation * cancellation, io_context * context); 104 extern void async_send(io_future_t & future, int sockfd, const void *buf, size_t len, int flags, int submit_flags, io_cancellation * cancellation, io_context * context); 105 extern void async_recv(io_future_t & future, int sockfd, void *buf, size_t len, int flags, int submit_flags, io_cancellation * cancellation, io_context * context); 106 extern void async_accept4(io_future_t & future, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags, io_cancellation * cancellation, io_context * context); 107 extern void async_connect(io_future_t & future, int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags, io_cancellation * cancellation, io_context * context); 108 extern void async_fallocate(io_future_t & future, int fd, int mode, off_t offset, off_t len, int submit_flags, io_cancellation * cancellation, io_context * context); 109 extern void async_posix_fadvise(io_future_t & future, int fd, off_t offset, off_t len, int advice, int submit_flags, io_cancellation * cancellation, io_context * context); 110 extern void async_madvise(io_future_t & future, void *addr, size_t length, int advice, int submit_flags, io_cancellation * cancellation, io_context * context); 111 extern void async_openat(io_future_t & future, int dirfd, const char *pathname, int flags, mode_t mode, int submit_flags, io_cancellation * cancellation, io_context * context); 117 112 #if defined(CFA_HAVE_OPENAT2) 118 extern void async_openat2(io_future_t & future, int dirfd, const char *pathname, struct open_how * how, size_t size, __u64 submit_flags);113 extern void async_openat2(io_future_t & future, int dirfd, const char *pathname, struct open_how * how, size_t size, int submit_flags, io_cancellation * cancellation, io_context * context); 119 114 #endif 120 extern void async_close(io_future_t & future, int fd, __u64 submit_flags);115 extern void async_close(io_future_t & future, int fd, int submit_flags, io_cancellation * cancellation, io_context * context); 121 116 #if defined(CFA_HAVE_STATX) 122 extern void async_statx(io_future_t & future, int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, __u64 submit_flags);117 extern void async_statx(io_future_t & future, int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags, io_cancellation * cancellation, io_context * context); 123 118 #endif 124 void async_read(io_future_t & future, int fd, void * buf, size_t count, __u64 submit_flags);125 extern void async_write(io_future_t & future, int fd, void * buf, size_t count, __u64 submit_flags);126 extern void async_splice(io_future_t & future, int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, __u64 submit_flags);127 extern void async_tee(io_future_t & future, int fd_in, int fd_out, size_t len, unsigned int flags, __u64 submit_flags);119 void async_read(io_future_t & future, int fd, void * buf, size_t count, int submit_flags, io_cancellation * cancellation, io_context * context); 120 extern void async_write(io_future_t & future, int fd, void * buf, size_t count, int submit_flags, io_cancellation * cancellation, io_context * context); 121 extern void async_splice(io_future_t & future, int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags, io_cancellation * cancellation, io_context * context); 122 extern void async_tee(io_future_t & future, int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags, io_cancellation * cancellation, io_context * context); 128 123 129 124 … … 131 126 // Check if a function is blocks a only the user thread 132 127 bool has_user_level_blocking( fptr_t func ); 128 129 //----------------------------------------------------------------------------- 130 void register_fixed_files( io_context & ctx , int * files, unsigned count ); 131 void register_fixed_files( cluster & cltr, int * files, unsigned count ); -
libcfa/src/concurrency/kernel.cfa
r266ecf1 r182256b 22 22 #include <signal.h> 23 23 #include <unistd.h> 24 extern "C" {25 #include <sys/eventfd.h>26 }27 24 28 25 //CFA Includes … … 112 109 static void __run_thread(processor * this, $thread * dst); 113 110 static void __wake_one(cluster * cltr); 111 static void wait(__bin_sem_t & this); 114 112 115 113 static void push (__cluster_idles & idles, processor & proc); … … 117 115 static [unsigned idle, unsigned total, * processor] query( & __cluster_idles idles ); 118 116 119 extern void __cfa_io_start( processor * );120 extern void __cfa_io_drain( processor * );121 extern void __cfa_io_flush( processor * );122 extern void __cfa_io_stop ( processor * );123 static inline void __maybe_io_drain( processor * );124 125 extern void __disable_interrupts_hard();126 extern void __enable_interrupts_hard();127 117 128 118 //============================================================================================= … … 140 130 verify(this); 141 131 142 __cfa_io_start( this );143 144 132 __cfadbg_print_safe(runtime_core, "Kernel : core %p starting\n", this); 145 133 #if !defined(__CFA_NO_STATISTICS__) … … 163 151 MAIN_LOOP: 164 152 for() { 165 // Check if there is pending io166 __maybe_io_drain( this );167 168 153 // Try to get the next thread 169 154 readyThread = __next_thread( this->cltr ); 170 155 171 156 if( !readyThread ) { 172 __cfa_io_flush( this );173 157 readyThread = __next_thread_slow( this->cltr ); 174 158 } … … 206 190 #endif 207 191 208 __cfadbg_print_safe(runtime_core, "Kernel : core %p waiting on eventfd %d\n", this, this->idle); 209 210 __disable_interrupts_hard(); 211 eventfd_t val; 212 eventfd_read( this->idle, &val ); 213 __enable_interrupts_hard(); 192 wait( this->idle ); 214 193 215 194 #if !defined(__CFA_NO_STATISTICS__) … … 227 206 228 207 /* paranoid */ verify( readyThread ); 229 230 // Reset io dirty bit231 this->io.dirty = false;232 208 233 209 // We found a thread run it … … 244 220 } 245 221 #endif 246 247 if(this->io.pending && !this->io.dirty) {248 __cfa_io_flush( this );249 }250 222 } 251 223 … … 253 225 } 254 226 255 __cfa_io_stop( this );256 257 227 post( this->terminated ); 258 259 228 260 229 if(this == mainProcessor) { … … 279 248 /* paranoid */ verifyf( thrd_dst->link.next == 0p, "Expected null got %p", thrd_dst->link.next ); 280 249 __builtin_prefetch( thrd_dst->context.SP ); 281 282 __cfadbg_print_safe(runtime_core, "Kernel : core %p running thread %p (%s)\n", this, thrd_dst, thrd_dst->self_cor.name);283 250 284 251 $coroutine * proc_cor = get_coroutine(this->runner); … … 363 330 // Just before returning to the processor, set the processor coroutine to active 364 331 proc_cor->state = Active; 365 366 __cfadbg_print_safe(runtime_core, "Kernel : core %p finished running thread %p\n", this, thrd_dst);367 332 368 333 /* paranoid */ verify( ! __preemption_enabled() ); … … 584 549 // Kernel Idle Sleep 585 550 //============================================================================================= 551 extern "C" { 552 char * strerror(int); 553 } 554 #define CHECKED(x) { int err = x; if( err != 0 ) abort("KERNEL ERROR: Operation \"" #x "\" return error %d - %s\n", err, strerror(err)); } 555 556 static void wait(__bin_sem_t & this) with( this ) { 557 verify(__cfaabi_dbg_in_kernel()); 558 CHECKED( pthread_mutex_lock(&lock) ); 559 while(val < 1) { 560 pthread_cond_wait(&cond, &lock); 561 } 562 val -= 1; 563 CHECKED( pthread_mutex_unlock(&lock) ); 564 } 565 566 static bool post(__bin_sem_t & this) with( this ) { 567 bool needs_signal = false; 568 569 CHECKED( pthread_mutex_lock(&lock) ); 570 if(val < 1) { 571 val += 1; 572 pthread_cond_signal(&cond); 573 needs_signal = true; 574 } 575 CHECKED( pthread_mutex_unlock(&lock) ); 576 577 return needs_signal; 578 } 579 580 #undef CHECKED 581 586 582 // Wake a thread from the front if there are any 587 583 static void __wake_one(cluster * this) { … … 599 595 600 596 // We found a processor, wake it up 601 eventfd_t val; 602 val = 1; 603 eventfd_write( p->idle, val ); 597 post( p->idle ); 604 598 605 599 #if !defined(__CFA_NO_STATISTICS__) … … 619 613 disable_interrupts(); 620 614 /* paranoid */ verify( ! __preemption_enabled() ); 621 eventfd_t val; 622 val = 1; 623 eventfd_write( this->idle, val ); 615 post( this->idle ); 624 616 enable_interrupts( __cfaabi_dbg_ctx ); 625 617 } … … 704 696 // Kernel Utilities 705 697 //============================================================================================= 706 #if defined(CFA_HAVE_LINUX_IO_URING_H)707 #include "io/types.hfa"708 #endif709 710 static inline void __maybe_io_drain( processor * proc ) {711 #if defined(CFA_HAVE_LINUX_IO_URING_H)712 __cfadbg_print_safe(runtime_core, "Kernel : core %p checking io for ring %d\n", proc, proc->io.ctx->fd);713 714 // Check if we should drain the queue715 $io_context * ctx = proc->io.ctx;716 unsigned head = *ctx->cq.head;717 unsigned tail = *ctx->cq.tail;718 if(head != tail) __cfa_io_drain( proc );719 #endif720 }721 722 698 //----------------------------------------------------------------------------- 723 699 // Debug -
libcfa/src/concurrency/kernel.hfa
r266ecf1 r182256b 28 28 } 29 29 30 //----------------------------------------------------------------------------- 31 // Underlying Locks 30 32 #ifdef __CFA_WITH_VERIFY__ 31 33 extern bool __cfaabi_dbg_in_kernel(); 32 34 #endif 33 35 34 //----------------------------------------------------------------------------- 35 // I/O 36 struct cluster; 37 struct $io_context; 38 struct $io_arbiter; 39 40 struct io_context_params { 41 int num_entries; 42 }; 43 44 void ?{}(io_context_params & this); 36 struct __bin_sem_t { 37 pthread_mutex_t lock; 38 pthread_cond_t cond; 39 int val; 40 }; 45 41 46 42 //----------------------------------------------------------------------------- … … 82 78 pthread_t kernel_thread; 83 79 84 struct {85 $io_context * ctx;86 bool pending;87 bool dirty;88 } io;89 90 80 // Preemption data 91 81 // Node which is added in the discrete event simulaiton … … 96 86 97 87 // Idle lock (kernel semaphore) 98 int idle;88 __bin_sem_t idle; 99 89 100 90 // Termination synchronisation (user semaphore) … … 126 116 127 117 DLISTED_MGD_IMPL_OUT(processor) 118 119 //----------------------------------------------------------------------------- 120 // I/O 121 struct __io_data; 122 123 // IO poller user-thread 124 // Not using the "thread" keyword because we want to control 125 // more carefully when to start/stop it 126 struct $io_ctx_thread { 127 struct __io_data * ring; 128 single_sem sem; 129 volatile bool done; 130 $thread self; 131 }; 132 133 134 struct io_context { 135 $io_ctx_thread thrd; 136 }; 137 138 struct io_context_params { 139 int num_entries; 140 int num_ready; 141 int submit_aff; 142 bool eager_submits:1; 143 bool poller_submits:1; 144 bool poll_submit:1; 145 bool poll_complete:1; 146 }; 147 148 void ?{}(io_context_params & this); 149 150 void ?{}(io_context & this, struct cluster & cl); 151 void ?{}(io_context & this, struct cluster & cl, const io_context_params & params); 152 void ^?{}(io_context & this); 153 154 struct io_cancellation { 155 __u64 target; 156 }; 157 158 static inline void ?{}(io_cancellation & this) { this.target = -1u; } 159 static inline void ^?{}(io_cancellation &) {} 160 bool cancel(io_cancellation & this); 128 161 129 162 //----------------------------------------------------------------------------- … … 211 244 212 245 struct { 213 $io_arbiter * arbiter;214 io_context_params params;246 io_context * ctxs; 247 unsigned cnt; 215 248 } io; 216 249 -
libcfa/src/concurrency/kernel/startup.cfa
r266ecf1 r182256b 22 22 extern "C" { 23 23 #include <limits.h> // PTHREAD_STACK_MIN 24 #include <sys/eventfd.h> // eventfd25 24 #include <sys/mman.h> // mprotect 26 25 #include <sys/resource.h> // getrlimit … … 81 80 static void ?{}(processorCtx_t & this) {} 82 81 static void ?{}(processorCtx_t & this, processor * proc, current_stack_info_t * info); 82 static void ?{}(__bin_sem_t & this); 83 static void ^?{}(__bin_sem_t & this); 83 84 84 85 #if defined(__CFA_WITH_VERIFY__) … … 90 91 extern void __kernel_alarm_startup(void); 91 92 extern void __kernel_alarm_shutdown(void); 93 extern void __kernel_io_startup (void); 94 extern void __kernel_io_shutdown(void); 92 95 93 96 //----------------------------------------------------------------------------- … … 101 104 KERNEL_STORAGE($thread, mainThread); 102 105 KERNEL_STORAGE(__stack_t, mainThreadCtx); 106 KERNEL_STORAGE(io_context, mainPollerThread); 103 107 KERNEL_STORAGE(__scheduler_RWLock_t, __scheduler_lock); 104 108 #if !defined(__CFA_NO_STATISTICS__) … … 196 200 197 201 void ?{}(processor & this) with( this ) { 202 ( this.idle ){}; 198 203 ( this.terminated ){}; 199 204 ( this.runner ){}; … … 223 228 __kernel_alarm_startup(); 224 229 230 // Start IO 231 __kernel_io_startup(); 232 225 233 // Add the main thread to the ready queue 226 234 // once resume is called on mainProcessor->runner the mainThread needs to be scheduled like any normal thread … … 235 243 // THE SYSTEM IS NOW COMPLETELY RUNNING 236 244 245 246 // SKULLDUGGERY: The constructor for the mainCluster will call alloc with a dimension of 0 247 // malloc *can* return a non-null value, we should free it if that is the case 248 free( mainCluster->io.ctxs ); 249 250 // Now that the system is up, finish creating systems that need threading 251 mainCluster->io.ctxs = (io_context *)&storage_mainPollerThread; 252 mainCluster->io.cnt = 1; 253 (*mainCluster->io.ctxs){ *mainCluster }; 254 237 255 __cfadbg_print_safe(runtime_core, "Kernel : Started\n--------------------------------------------------\n\n"); 238 256 … … 244 262 245 263 static void __kernel_shutdown(void) { 264 //Before we start shutting things down, wait for systems that need threading to shutdown 265 ^(*mainCluster->io.ctxs){}; 266 mainCluster->io.cnt = 0; 267 mainCluster->io.ctxs = 0p; 268 246 269 /* paranoid */ verify( __preemption_enabled() ); 247 270 disable_interrupts(); … … 261 284 // Disable preemption 262 285 __kernel_alarm_shutdown(); 286 287 // Stop IO 288 __kernel_io_shutdown(); 263 289 264 290 // Destroy the main processor and its context in reverse order of construction … … 460 486 pending_preemption = false; 461 487 462 this.io.ctx = 0p;463 this.io.pending = false;464 this.io.dirty = false;465 466 this.idle = eventfd(0, 0);467 if (idle < 0) {468 abort("KERNEL ERROR: PROCESSOR EVENTFD - %s\n", strerror(errno));469 }470 471 488 #if !defined(__CFA_NO_STATISTICS__) 472 489 print_stats = 0; … … 509 526 // Finally we don't need the read_lock any more 510 527 unregister((__processor_id_t*)&this); 511 512 close(this.idle);513 528 } 514 529 515 530 void ?{}(processor & this, const char name[], cluster & _cltr) { 531 ( this.idle ){}; 516 532 ( this.terminated ){}; 517 533 ( this.runner ){}; … … 568 584 threads{ __get }; 569 585 570 io.arbiter = create();571 io.params = io_params;572 573 586 doregister(this); 574 587 … … 583 596 ready_mutate_unlock( last_size ); 584 597 enable_interrupts_noPoll(); // Don't poll, could be in main cluster 598 599 600 this.io.cnt = num_io; 601 this.io.ctxs = aalloc(num_io); 602 for(i; this.io.cnt) { 603 (this.io.ctxs[i]){ this, io_params }; 604 } 585 605 } 586 606 587 607 void ^?{}(cluster & this) { 588 destroy(this.io.arbiter); 608 for(i; this.io.cnt) { 609 ^(this.io.ctxs[i]){ true }; 610 } 611 free(this.io.ctxs); 589 612 590 613 // Lock the RWlock so no-one pushes/pops while we are changing the queue … … 715 738 } 716 739 740 extern "C" { 741 char * strerror(int); 742 } 743 #define CHECKED(x) { int err = x; if( err != 0 ) abort("KERNEL ERROR: Operation \"" #x "\" return error %d - %s\n", err, strerror(err)); } 744 745 static void ?{}(__bin_sem_t & this) with( this ) { 746 // Create the mutex with error checking 747 pthread_mutexattr_t mattr; 748 pthread_mutexattr_init( &mattr ); 749 pthread_mutexattr_settype( &mattr, PTHREAD_MUTEX_ERRORCHECK_NP); 750 pthread_mutex_init(&lock, &mattr); 751 752 pthread_cond_init (&cond, (const pthread_condattr_t *)0p); // workaround trac#208: cast should not be required 753 val = 0; 754 } 755 756 static void ^?{}(__bin_sem_t & this) with( this ) { 757 CHECKED( pthread_mutex_destroy(&lock) ); 758 CHECKED( pthread_cond_destroy (&cond) ); 759 } 760 761 #undef CHECKED 762 717 763 #if defined(__CFA_WITH_VERIFY__) 718 764 static bool verify_fwd_bck_rng(void) { -
libcfa/src/concurrency/kernel_private.hfa
r266ecf1 r182256b 77 77 //----------------------------------------------------------------------------- 78 78 // I/O 79 $io_arbiter * create(void); 80 void destroy($io_arbiter *); 79 void ^?{}(io_context & this, bool ); 81 80 82 81 //======================================================================= -
libcfa/src/concurrency/preemption.cfa
r266ecf1 r182256b 585 585 586 586 // Setup proper signal handlers 587 __cfaabi_sigaction( SIGUSR1, sigHandler_ctxSwitch, SA_SIGINFO ); // __cfactx_switch handler588 __cfaabi_sigaction( SIGALRM, sigHandler_alarm , SA_SIGINFO ); // debug handler587 __cfaabi_sigaction( SIGUSR1, sigHandler_ctxSwitch, SA_SIGINFO | SA_RESTART ); // __cfactx_switch handler 588 __cfaabi_sigaction( SIGALRM, sigHandler_alarm , SA_SIGINFO | SA_RESTART ); // debug handler 589 589 590 590 signal_block( SIGALRM ); -
libcfa/src/concurrency/stats.cfa
r266ecf1 r182256b 25 25 26 26 #if defined(CFA_HAVE_LINUX_IO_URING_H) 27 stats->io.alloc.fast = 0; 28 stats->io.alloc.slow = 0; 29 stats->io.alloc.fail = 0; 30 stats->io.alloc.revoke = 0; 31 stats->io.alloc.block = 0; 32 stats->io.submit.fast = 0; 33 stats->io.submit.slow = 0; 34 stats->io.flush.external = 0; 35 stats->io.calls.flush = 0; 36 stats->io.calls.submitted = 0; 37 stats->io.calls.drain = 0; 38 stats->io.calls.completed = 0; 39 stats->io.calls.errors.busy = 0; 40 stats->io.poller.sleeps = 0; 27 stats->io.submit_q.submit_avg.rdy = 0; 28 stats->io.submit_q.submit_avg.csm = 0; 29 stats->io.submit_q.submit_avg.cnt = 0; 30 stats->io.submit_q.look_avg.val = 0; 31 stats->io.submit_q.look_avg.cnt = 0; 32 stats->io.submit_q.look_avg.block = 0; 33 stats->io.submit_q.alloc_avg.val = 0; 34 stats->io.submit_q.alloc_avg.cnt = 0; 35 stats->io.submit_q.alloc_avg.block = 0; 36 stats->io.submit_q.helped = 0; 37 stats->io.submit_q.leader = 0; 38 stats->io.submit_q.busy = 0; 39 stats->io.complete_q.completed_avg.val = 0; 40 stats->io.complete_q.completed_avg.cnt = 0; 41 stats->io.complete_q.blocks = 0; 41 42 #endif 42 43 } … … 59 60 60 61 #if defined(CFA_HAVE_LINUX_IO_URING_H) 61 __atomic_fetch_add( &cltr->io.alloc.fast , proc->io.alloc.fast , __ATOMIC_SEQ_CST ); proc->io.alloc.fast = 0; 62 __atomic_fetch_add( &cltr->io.alloc.slow , proc->io.alloc.slow , __ATOMIC_SEQ_CST ); proc->io.alloc.slow = 0; 63 __atomic_fetch_add( &cltr->io.alloc.fail , proc->io.alloc.fail , __ATOMIC_SEQ_CST ); proc->io.alloc.fail = 0; 64 __atomic_fetch_add( &cltr->io.alloc.revoke , proc->io.alloc.revoke , __ATOMIC_SEQ_CST ); proc->io.alloc.revoke = 0; 65 __atomic_fetch_add( &cltr->io.alloc.block , proc->io.alloc.block , __ATOMIC_SEQ_CST ); proc->io.alloc.block = 0; 66 __atomic_fetch_add( &cltr->io.submit.fast , proc->io.submit.fast , __ATOMIC_SEQ_CST ); proc->io.submit.fast = 0; 67 __atomic_fetch_add( &cltr->io.submit.slow , proc->io.submit.slow , __ATOMIC_SEQ_CST ); proc->io.submit.slow = 0; 68 __atomic_fetch_add( &cltr->io.flush.external , proc->io.flush.external , __ATOMIC_SEQ_CST ); proc->io.flush.external = 0; 69 __atomic_fetch_add( &cltr->io.calls.flush , proc->io.calls.flush , __ATOMIC_SEQ_CST ); proc->io.calls.flush = 0; 70 __atomic_fetch_add( &cltr->io.calls.submitted , proc->io.calls.submitted , __ATOMIC_SEQ_CST ); proc->io.calls.submitted = 0; 71 __atomic_fetch_add( &cltr->io.calls.drain , proc->io.calls.drain , __ATOMIC_SEQ_CST ); proc->io.calls.drain = 0; 72 __atomic_fetch_add( &cltr->io.calls.completed , proc->io.calls.completed , __ATOMIC_SEQ_CST ); proc->io.calls.completed = 0; 73 __atomic_fetch_add( &cltr->io.calls.errors.busy, proc->io.calls.errors.busy, __ATOMIC_SEQ_CST ); proc->io.calls.errors.busy = 0; 74 __atomic_fetch_add( &cltr->io.poller.sleeps , proc->io.poller.sleeps , __ATOMIC_SEQ_CST ); proc->io.poller.sleeps = 0; 62 __atomic_fetch_add( &cltr->io.submit_q.submit_avg.rdy , proc->io.submit_q.submit_avg.rdy , __ATOMIC_SEQ_CST ); proc->io.submit_q.submit_avg.rdy = 0; 63 __atomic_fetch_add( &cltr->io.submit_q.submit_avg.csm , proc->io.submit_q.submit_avg.csm , __ATOMIC_SEQ_CST ); proc->io.submit_q.submit_avg.csm = 0; 64 __atomic_fetch_add( &cltr->io.submit_q.submit_avg.avl , proc->io.submit_q.submit_avg.avl , __ATOMIC_SEQ_CST ); proc->io.submit_q.submit_avg.avl = 0; 65 __atomic_fetch_add( &cltr->io.submit_q.submit_avg.cnt , proc->io.submit_q.submit_avg.cnt , __ATOMIC_SEQ_CST ); proc->io.submit_q.submit_avg.cnt = 0; 66 __atomic_fetch_add( &cltr->io.submit_q.look_avg.val , proc->io.submit_q.look_avg.val , __ATOMIC_SEQ_CST ); proc->io.submit_q.look_avg.val = 0; 67 __atomic_fetch_add( &cltr->io.submit_q.look_avg.cnt , proc->io.submit_q.look_avg.cnt , __ATOMIC_SEQ_CST ); proc->io.submit_q.look_avg.cnt = 0; 68 __atomic_fetch_add( &cltr->io.submit_q.look_avg.block , proc->io.submit_q.look_avg.block , __ATOMIC_SEQ_CST ); proc->io.submit_q.look_avg.block = 0; 69 __atomic_fetch_add( &cltr->io.submit_q.alloc_avg.val , proc->io.submit_q.alloc_avg.val , __ATOMIC_SEQ_CST ); proc->io.submit_q.alloc_avg.val = 0; 70 __atomic_fetch_add( &cltr->io.submit_q.alloc_avg.cnt , proc->io.submit_q.alloc_avg.cnt , __ATOMIC_SEQ_CST ); proc->io.submit_q.alloc_avg.cnt = 0; 71 __atomic_fetch_add( &cltr->io.submit_q.alloc_avg.block , proc->io.submit_q.alloc_avg.block , __ATOMIC_SEQ_CST ); proc->io.submit_q.alloc_avg.block = 0; 72 __atomic_fetch_add( &cltr->io.submit_q.helped , proc->io.submit_q.helped , __ATOMIC_SEQ_CST ); proc->io.submit_q.helped = 0; 73 __atomic_fetch_add( &cltr->io.submit_q.leader , proc->io.submit_q.leader , __ATOMIC_SEQ_CST ); proc->io.submit_q.leader = 0; 74 __atomic_fetch_add( &cltr->io.submit_q.busy , proc->io.submit_q.busy , __ATOMIC_SEQ_CST ); proc->io.submit_q.busy = 0; 75 __atomic_fetch_add( &cltr->io.complete_q.completed_avg.val, proc->io.complete_q.completed_avg.val, __ATOMIC_SEQ_CST ); proc->io.complete_q.completed_avg.val = 0; 76 __atomic_fetch_add( &cltr->io.complete_q.completed_avg.cnt, proc->io.complete_q.completed_avg.cnt, __ATOMIC_SEQ_CST ); proc->io.complete_q.completed_avg.cnt = 0; 77 __atomic_fetch_add( &cltr->io.complete_q.blocks , proc->io.complete_q.blocks , __ATOMIC_SEQ_CST ); proc->io.complete_q.blocks = 0; 75 78 #endif 76 79 } … … 79 82 80 83 if( flags & CFA_STATS_READY_Q ) { 84 double push_sur = (100.0 * ((double)ready.pick.push.success) / ready.pick.push.attempt); 85 double pop_sur = (100.0 * ((double)ready.pick.pop .success) / ready.pick.pop .attempt); 86 81 87 double push_len = ((double)ready.pick.push.attempt) / ready.pick.push.success; 82 88 double pop_len = ((double)ready.pick.pop .attempt) / ready.pick.pop .success; 89 90 double lpush_sur = (100.0 * ((double)ready.pick.push.lsuccess) / ready.pick.push.local); 91 double lpop_sur = (100.0 * ((double)ready.pick.pop .lsuccess) / ready.pick.pop .local); 83 92 84 93 double lpush_len = ((double)ready.pick.push.local) / ready.pick.push.lsuccess; … … 87 96 __cfaabi_bits_print_safe( STDOUT_FILENO, 88 97 "----- %s \"%s\" (%p) - Ready Q Stats -----\n" 89 "- total threads : %'15" PRIu64 "run, %'15" PRIu64 "schd (%'" PRIu64 "mig )\n" 90 "- push avg probe : %'3.2lf, %'3.2lfl (%'15" PRIu64 " attempts, %'15" PRIu64 " locals)\n" 91 "- pop avg probe : %'3.2lf, %'3.2lfl (%'15" PRIu64 " attempts, %'15" PRIu64 " locals)\n" 92 "- Idle Sleep : %'15" PRIu64 "h, %'15" PRIu64 "c, %'15" PRIu64 "w, %'15" PRIu64 "e\n" 98 "- total threads run : %'15" PRIu64 "\n" 99 "- total threads scheduled: %'15" PRIu64 "\n" 100 "- push average probe len : %'18.2lf, %'18.2lf%% (%'15" PRIu64 " attempts)\n" 101 "- pop average probe len : %'18.2lf, %'18.2lf%% (%'15" PRIu64 " attempts)\n" 102 "- local push avg prb len : %'18.2lf, %'18.2lf%% (%'15" PRIu64 " attempts)\n" 103 "- local pop avg prb len : %'18.2lf, %'18.2lf%% (%'15" PRIu64 " attempts)\n" 104 "- thread migrations : %'15" PRIu64 "\n" 105 "- Idle Sleep -\n" 106 "-- halts : %'15" PRIu64 "\n" 107 "-- cancelled halts : %'15" PRIu64 "\n" 108 "-- schedule wake : %'15" PRIu64 "\n" 109 "-- wake on exit : %'15" PRIu64 "\n" 93 110 "\n" 94 111 , type, name, id 95 112 , ready.pick.pop.success 96 113 , ready.pick.push.success 114 , push_len, push_sur, ready.pick.push.attempt 115 , pop_len , pop_sur , ready.pick.pop .attempt 116 , lpush_len, lpush_sur, ready.pick.push.local 117 , lpop_len , lpop_sur , ready.pick.pop .local 97 118 , ready.threads.migration 98 , push_len, lpush_len, ready.pick.push.attempt, ready.pick.push.local99 , pop_len , lpop_len , ready.pick.pop .attempt, ready.pick.pop .local100 119 , ready.sleep.halts, ready.sleep.cancels, ready.sleep.wakes, ready.sleep.exits 101 120 ); … … 104 123 #if defined(CFA_HAVE_LINUX_IO_URING_H) 105 124 if( flags & CFA_STATS_IO ) { 106 uint64_t total_allocs = io.alloc.fast + io.alloc.slow;107 double avg fasta = ((double)io.alloc.fast) / total_allocs;125 double avgrdy = ((double)io.submit_q.submit_avg.rdy) / io.submit_q.submit_avg.cnt; 126 double avgcsm = ((double)io.submit_q.submit_avg.csm) / io.submit_q.submit_avg.cnt; 108 127 109 uint64_t total_submits = io.submit.fast + io.submit.slow; 110 double avgfasts = ((double)io.submit.fast) / total_submits; 128 double lavgv = 0; 129 double lavgb = 0; 130 if(io.submit_q.look_avg.cnt != 0) { 131 lavgv = ((double)io.submit_q.look_avg.val ) / io.submit_q.look_avg.cnt; 132 lavgb = ((double)io.submit_q.look_avg.block) / io.submit_q.look_avg.cnt; 133 } 111 134 112 double avgsubs = ((double)io.calls.submitted) / io.calls.flush; 113 double avgcomp = ((double)io.calls.completed) / io.calls.drain; 135 double aavgv = 0; 136 double aavgb = 0; 137 if(io.submit_q.alloc_avg.cnt != 0) { 138 aavgv = ((double)io.submit_q.alloc_avg.val ) / io.submit_q.alloc_avg.cnt; 139 aavgb = ((double)io.submit_q.alloc_avg.block) / io.submit_q.alloc_avg.cnt; 140 } 114 141 115 142 __cfaabi_bits_print_safe( STDOUT_FILENO, 116 143 "----- %s \"%s\" (%p) - I/O Stats -----\n" 117 "- total allocations : %'" PRIu64 "f, %'" PRIu64 "s (%'2.2lff) \n" 118 "- failures : %'" PRIu64 "oom, %'" PRIu64 "rvk, %'" PRIu64 "blk\n" 119 "- total submits : %'" PRIu64 "f, %'" PRIu64 "s (%'2.2lf) \n" 120 "- flush external : %'" PRIu64 "\n" 121 "- io_uring_enter : %'" PRIu64 " (%'" PRIu64 ", %'" PRIu64 " EBUSY)\n" 122 "- submits : %'" PRIu64 " (%'.2lf) \n" 123 "- completes : %'" PRIu64 " (%'.2lf) \n" 124 "- poller sleeping : %'" PRIu64 "\n" 144 "- total submit calls : %'15" PRIu64 "\n" 145 "- avg ready entries : %'18.2lf\n" 146 "- avg submitted entries : %'18.2lf\n" 147 "- total helped entries : %'15" PRIu64 "\n" 148 "- total leader entries : %'15" PRIu64 "\n" 149 "- total busy submit : %'15" PRIu64 "\n" 150 "- total ready search : %'15" PRIu64 "\n" 151 "- avg ready search len : %'18.2lf\n" 152 "- avg ready search block : %'18.2lf\n" 153 "- total alloc search : %'15" PRIu64 "\n" 154 "- avg alloc search len : %'18.2lf\n" 155 "- avg alloc search block : %'18.2lf\n" 156 "- total wait calls : %'15" PRIu64 "\n" 157 "- avg completion/wait : %'18.2lf\n" 158 "- total completion blocks: %'15" PRIu64 "\n" 125 159 "\n" 126 160 , type, name, id 127 , io.alloc.fast, io.alloc.slow, avgfasta 128 , io.alloc.fail, io.alloc.revoke, io.alloc.block 129 , io.submit.fast, io.submit.slow, avgfasts 130 , io.flush.external 131 , io.calls.flush, io.calls.drain, io.calls.errors.busy 132 , io.calls.submitted, avgsubs 133 , io.calls.completed, avgcomp 134 , io.poller.sleeps 161 , io.submit_q.submit_avg.cnt 162 , avgrdy, avgcsm 163 , io.submit_q.helped, io.submit_q.leader, io.submit_q.busy 164 , io.submit_q.look_avg.cnt 165 , lavgv, lavgb 166 , io.submit_q.alloc_avg.cnt 167 , aavgv, aavgb 168 , io.complete_q.completed_avg.cnt 169 , ((double)io.complete_q.completed_avg.val) / io.complete_q.completed_avg.cnt 170 , io.complete_q.blocks 135 171 ); 136 172 } -
libcfa/src/concurrency/stats.hfa
r266ecf1 r182256b 66 66 struct __attribute__((aligned(64))) __stats_io_t{ 67 67 struct { 68 volatile uint64_t fast; 69 volatile uint64_t slow; 70 volatile uint64_t fail; 71 volatile uint64_t revoke; 72 volatile uint64_t block; 73 } alloc; 68 struct { 69 volatile uint64_t rdy; 70 volatile uint64_t csm; 71 volatile uint64_t avl; 72 volatile uint64_t cnt; 73 } submit_avg; 74 struct { 75 volatile uint64_t val; 76 volatile uint64_t cnt; 77 volatile uint64_t block; 78 } look_avg; 79 struct { 80 volatile uint64_t val; 81 volatile uint64_t cnt; 82 volatile uint64_t block; 83 } alloc_avg; 84 volatile uint64_t helped; 85 volatile uint64_t leader; 86 volatile uint64_t busy; 87 } submit_q; 74 88 struct { 75 volatile uint64_t fast;76 volatile uint64_t slow;77 } submit;78 struct {79 volatile uint64_t external;80 } flush;81 struct {82 volatile uint64_t drain;83 volatile uint64_t completed;84 volatile uint64_t flush;85 volatile uint64_t submitted;86 89 struct { 87 volatile uint64_t busy; 88 } errors; 89 } calls; 90 struct { 91 volatile uint64_t sleeps; 92 } poller; 90 volatile uint64_t val; 91 volatile uint64_t cnt; 92 } completed_avg; 93 volatile uint64_t blocks; 94 } complete_q; 93 95 }; 94 96 #endif -
tests/Makefile.am
r266ecf1 r182256b 44 44 -Wall \ 45 45 -Wno-unused-function \ 46 -quiet @CFA_FLAGS@ 46 -quiet @CFA_FLAGS@ \ 47 -DIN_DIR="${abs_srcdir}/.in/" 47 48 48 49 AM_CFAFLAGS = -XCFA --deterministic-out … … 75 76 long_tests.hfa \ 76 77 .in/io.data \ 77 io/.in/io.data \78 78 avltree/avl.h \ 79 79 avltree/avl-private.h \ … … 142 142 # don't use distcc to do the linking because distcc doesn't do linking 143 143 % : %.cfa $(CFACCBIN) 144 $(CFACOMPILETEST) -c -o $(abspath ${@}).o -DIN_DIR="$(abspath $(dir ${<}))/.in/"144 $(CFACOMPILETEST) -c -o $(abspath ${@}).o 145 145 $(CFACCLINK) ${@}.o -o $(abspath ${@}) 146 146 rm $(abspath ${@}).o
Note:
See TracChangeset
for help on using the changeset viewer.