Changes in / [a00bc5b:101cc3a]
Files: 14 edited
Legend:
  ' '  unmodified
  '-'  removed (present in a00bc5b only)
  '+'  added (present in 101cc3a only)
benchmark/io/http/Makefile.am
 EXTRA_PROGRAMS = httpforall .dummy_hack
 
-CLEANFILES = httpforall
-
 nodist_httpforall_SOURCES = \
 	filecache.cfa \
benchmark/io/http/main.cfa
 }
 
-extern void init_protocol(void);
-extern void deinit_protocol(void);
-
 //=============================================================================================
 // Main
…
     //===================
     // Open Socket
-    printf("%ld : Listening on port %d\n", getpid(), options.socket.port);
+    printf("Listening on port %d\n", options.socket.port);
     int server_fd = socket(AF_INET, SOCK_STREAM, 0);
     if(server_fd < 0) {
…
     ret = bind( server_fd, (struct sockaddr *)&address, sizeof(address) );
     if(ret < 0) {
-        if(errno == EADDRINUSE) {
+        if(errno == 98) {
             if(waited == 0) {
                 printf("Waiting for port\n");
…
     options.clopts.instance = &cl;
 
-
     int pipe_cnt = options.clopts.nworkers * 2;
     int pipe_off;
…
     {
         ServerProc procs[options.clopts.nprocs];
-
-        init_protocol();
         {
             Worker workers[options.clopts.nworkers];
…
                 printf("Shutting Down\n");
             }
-
-            for(i; options.clopts.nworkers) {
-                printf("Cancelling %p\n", (void*)workers[i].cancel.target);
-                workers[i].done = true;
-                cancel(workers[i].cancel);
-            }
-
-            printf("Shutting down socket\n");
-            int ret = shutdown( server_fd, SHUT_RD );
-            if( ret < 0 ) { abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) ); }
-
-            //===================
-            // Close Socket
-            printf("Closing Socket\n");
-            ret = close( server_fd );
-            if(ret < 0) {
-                abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
-            }
         }
         printf("Workers Closed\n");
-
-        deinit_protocol();
     }
…
         }
         free(fds);
+    }
 
+    //===================
+    // Close Socket
+    printf("Closing Socket\n");
+    ret = close( server_fd );
+    if(ret < 0) {
+        abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
     }
 
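The bind-and-retry behaviour changed above (symbolic EADDRINUSE on one side, the raw value 98 on the other) can be sketched as standalone C; the helper name bind_with_retry and the max_wait parameter are illustrative, not part of the benchmark:

    #include <errno.h>
    #include <stdio.h>
    #include <string.h>
    #include <unistd.h>
    #include <netinet/in.h>
    #include <sys/socket.h>

    // Retry bind() while the port is still held (EADDRINUSE), up to max_wait seconds.
    static int bind_with_retry(int server_fd, struct sockaddr_in * address, int max_wait) {
        for (int waited = 0;; waited++) {
            if (bind(server_fd, (struct sockaddr *)address, sizeof(*address)) == 0) return 0;
            if (errno != EADDRINUSE || waited >= max_wait) {
                fprintf(stderr, "bind error: (%d) %s\n", errno, strerror(errno));
                return -1;
            }
            if (waited == 0) printf("Waiting for port\n");
            sleep(1);   // previous owner is likely still in TIME_WAIT; try again shortly
        }
    }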
benchmark/io/http/options.cfa
 #include <parseargs.hfa>
 
-#include <string.h>
-
 Options options @= {
-    false, // log
-
     { // file_cache
         0, // open_flags;
…
     {'p', "port", "Port the server will listen on", options.socket.port},
     {'c', "cpus", "Number of processors to use", options.clopts.nprocs},
-    {'L', "log", "Enable logs", options.log, parse_settrue},
     {'t', "threads", "Number of worker threads to use", options.clopts.nworkers},
     {'b', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog},
benchmark/io/http/options.hfa
 
 struct Options {
-    bool log;
-
     struct {
         int open_flags;
benchmark/io/http/protocol.cfa
 #include "options.hfa"
 
-const char * volatile date = 0p;
-
 const char * http_msgs[] = {
-    "HTTP/1.1 200 OK\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: %zu\n\n",
-    "HTTP/1.1 400 Bad Request\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0\n\n",
-    "HTTP/1.1 404 Not Found\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0\n\n",
-    "HTTP/1.1 413 Payload Too Large\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0\n\n",
-    "HTTP/1.1 414 URI Too Long\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0\n\n",
+    "HTTP/1.1 200 OK\nContent-Type: text/plain\nContent-Length: %zu\n\n",
+    "HTTP/1.1 400 Bad Request\nContent-Type: text/plain\nContent-Length: 0\n\n",
+    "HTTP/1.1 404 Not Found\nContent-Type: text/plain\nContent-Length: 0\n\n",
+    "HTTP/1.1 413 Payload Too Large\nContent-Type: text/plain\nContent-Length: 0\n\n",
+    "HTTP/1.1 414 URI Too Long\nContent-Type: text/plain\nContent-Length: 0\n\n",
 };
 
…
     while(len > 0) {
         // Call write
-        int ret = cfa_write(fd, it, len, 0, -1`s, 0p, 0p);
-        // int ret = write(fd, it, len);
+        int ret = write(fd, it, len);
         if( ret < 0 ) { if( errno != EAGAIN && errno != EWOULDBLOCK) abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) ); }
 
…
 int answer_header( int fd, size_t size ) {
     const char * fmt = http_msgs[OK200];
-    int len = 200;
+    int len = 100;
     char buffer[len];
-    len = snprintf(buffer, len, fmt, date, size);
+    len = snprintf(buffer, len, fmt, size);
     return answer( fd, buffer, len );
 }
 
-int answer_plain( int fd, char buffer[], size_t size ) {
-    int ret = answer_header(fd, size);
-    if( ret < 0 ) return ret;
-    return answer(fd, buffer, size);
-}
-
-int answer_empty( int fd ) {
-    return answer_header(fd, 0);
-}
-
-
-[HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len, io_cancellation * cancel) {
+[HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) {
     char * it = buffer;
     size_t count = len - 1;
…
     READ:
     for() {
-        int ret = cfa_read(fd, (void*)it, count, 0, -1`s, cancel, 0p);
-        // int ret = read(fd, (void*)it, count);
+        int ret = cfa_read(fd, (void*)it, count, 0, -1`s, 0p, 0p);
         if(ret == 0 ) return [OK200, true, 0, 0];
         if(ret < 0 ) {
             if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ;
-            // if( errno == EINVAL ) return [E400, true, 0, 0];
             abort( "read error: (%d) %s\n", (int)errno, strerror(errno) );
         }
…
     }
 
-    if( options.log ) printf("%.*s\n", rlen, buffer);
+    printf("%.*s\n", rlen, buffer);
 
     it = buffer;
…
 
 void sendfile( int pipe[2], int fd, int ans_fd, size_t count ) {
-    unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE;
     off_t offset = 0;
     ssize_t ret;
     SPLICE1: while(count > 0) {
-        ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, 0, -1`s, 0p, 0p);
-        // ret = splice(ans_fd, &offset, pipe[1], 0p, count, sflags);
+        ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, SPLICE_F_MOVE | SPLICE_F_MORE, 0, -1`s, 0p, 0p);
         if( ret < 0 ) {
             if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE1;
…
         size_t in_pipe = ret;
         SPLICE2: while(in_pipe > 0) {
-            ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, 0, -1`s, 0p, 0p);
-            // ret = splice(pipe[0], 0p, fd, 0p, in_pipe, sflags);
+            ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, SPLICE_F_MOVE | SPLICE_F_MORE, 0, -1`s, 0p, 0p);
             if( ret < 0 ) {
                 if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE2;
…
         }
     }
-
-//=============================================================================================
-
-#include <clock.hfa>
-#include <time.hfa>
-#include <thread.hfa>
-
-struct date_buffer {
-    char buff[100];
-};
-
-thread DateFormater {
-    int idx;
-    date_buffer buffers[2];
-};
-
-void ?{}( DateFormater & this ) {
-    ((thread&)this){ "Server Date Thread", *options.clopts.instance };
-    this.idx = 0;
-    memset( this.buffers[0].buff, 0, sizeof(this.buffers[0]) );
-    memset( this.buffers[1].buff, 0, sizeof(this.buffers[1]) );
-}
-
-void main(DateFormater & this) {
-    LOOP: for() {
-        waitfor( ^?{} : this) {
-            break LOOP;
-        }
-        or else {}
-
-        Time now = getTimeNsec();
-
-        strftime( this.buffers[this.idx].buff, 100, "%a, %d %b %Y %H:%M:%S %Z", now );
-
-        char * next = this.buffers[this.idx].buff;
-        __atomic_exchange_n((char * volatile *)&date, next, __ATOMIC_SEQ_CST);
-        this.idx = (this.idx + 1) % 2;
-
-        sleep(1`s);
-    }
-}
-
-//=============================================================================================
-DateFormater * the_date_formatter;
-
-void init_protocol(void) {
-    the_date_formatter = alloc();
-    (*the_date_formatter){};
-}
-
-void deinit_protocol(void) {
-    ^(*the_date_formatter){};
-    free( the_date_formatter );
-}
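The sendfile() routine above moves data from the cached file to the socket through a pipe with two splice calls per chunk. A minimal plain-C sketch of the same pattern, using splice(2) directly instead of cfa_splice (send_file_via_pipe is an illustrative name):

    #define _GNU_SOURCE
    #include <errno.h>
    #include <fcntl.h>
    #include <unistd.h>

    // Move `count` bytes from file `ans_fd` to socket `fd` through `pipefd`,
    // without copying the data through user space.
    static int send_file_via_pipe(int pipefd[2], int fd, int ans_fd, size_t count) {
        off_t offset = 0;
        while (count > 0) {
            // file -> pipe
            ssize_t in = splice(ans_fd, &offset, pipefd[1], NULL, count, SPLICE_F_MOVE | SPLICE_F_MORE);
            if (in < 0) {
                if (errno == EAGAIN || errno == EWOULDBLOCK) continue;
                return -1;
            }
            if (in == 0) break;   // unexpected end of file
            count -= (size_t)in;

            // pipe -> socket
            size_t in_pipe = (size_t)in;
            while (in_pipe > 0) {
                ssize_t out = splice(pipefd[0], NULL, fd, NULL, in_pipe, SPLICE_F_MOVE | SPLICE_F_MORE);
                if (out < 0) {
                    if (errno == EAGAIN || errno == EWOULDBLOCK) continue;
                    return -1;
                }
                in_pipe -= (size_t)out;
            }
        }
        return 0;
    }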
benchmark/io/http/protocol.hfa
 #pragma once
-
-struct io_cancellation;
 
 enum HttpCode {
…
 int answer_error( int fd, HttpCode code );
 int answer_header( int fd, size_t size );
-int answer_plain( int fd, char buffer [], size_t size );
-int answer_empty( int fd );
 
-[HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len, io_cancellation *);
+[HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len);
 
 void sendfile( int pipe[2], int fd, int ans_fd, size_t count );
benchmark/io/http/worker.cfa
     this.pipe[0] = -1;
     this.pipe[1] = -1;
-    this.done = false;
-}
-
-extern "C" {
-    extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
 }
 
…
     CONNECTION:
     for() {
-        if( options.log ) printf("=== Accepting connection ===\n");
-        int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], 0, -1`s, &this.cancel, 0p );
-        // int fd = accept4( this.[sockfd, addr, addrlen, flags] );
+        int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], 0, -1`s, 0p, 0p );
         if(fd < 0) {
             if( errno == ECONNABORTED ) break;
-            if( errno == EINVAL && this.done ) break;
             abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
         }
 
-        if( options.log ) printf("=== New connection %d, waiting for requests ===\n", fd);
+        printf("New connection %d, waiting for requests\n", fd);
         REQUEST:
         for() {
…
             size_t len = options.socket.buflen;
             char buffer[len];
-            if( options.log ) printf("=== Reading request ===\n");
-            [code, closed, file, name_size] = http_read(fd, buffer, len, &this.cancel);
+            printf("Reading request\n");
+            [code, closed, file, name_size] = http_read(fd, buffer, len);
 
             // if we are done, break out of the loop
             if( closed ) {
-                if( options.log ) printf("=== Connection closed ===\n");
-                close(fd);
+                printf("Connection closed\n");
                 continue CONNECTION;
             }
…
             // If this wasn't a request retrun 400
             if( code != OK200 ) {
-                printf("=== Invalid Request : %d ===\n", code_val(code));
+                printf("Invalid Request : %d\n", code_val(code));
                 answer_error(fd, code);
                 continue REQUEST;
             }
 
-            if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
-                if( options.log ) printf("=== Request for /plaintext ===\n");
-
-                char text[] = "Hello, World!\n";
-
-                // Send the header
-                answer_plain(fd, text, sizeof(text));
-
-                if( options.log ) printf("=== Answer sent ===\n");
-                continue REQUEST;
-            }
-
-            if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
-                if( options.log ) printf("=== Request for /ping ===\n");
-
-                // Send the header
-                answer_empty(fd);
-
-                if( options.log ) printf("=== Answer sent ===\n");
-                continue REQUEST;
-            }
-
-            if( options.log ) printf("=== Request for file %.*s ===\n", (int)name_size, file);
+            printf("Request for file %.*s\n", (int)name_size, file);
 
             // Get the fd from the file cache
…
             // If we can't find the file, return 404
             if( ans_fd < 0 ) {
-                printf("=== File Not Found ===\n");
+                printf("File Not Found\n");
                 answer_error(fd, E404);
                 continue REQUEST;
…
             sendfile( this.pipe, fd, ans_fd, count);
 
-            if( options.log ) printf("=== Answer sent ===\n");
+            printf("File sent\n");
         }
     }
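The worker's outer loop above is an accept loop that treats ECONNABORTED as its shutdown signal. A stripped-down plain-C sketch of that shape (accept_loop and serve_requests are illustrative stand-ins for the CONNECTION and REQUEST loops):

    #define _GNU_SOURCE
    #include <errno.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <sys/socket.h>

    extern void serve_requests(int fd);   // assumed handler, stands in for the REQUEST loop

    static void accept_loop(int sockfd) {
        for (;;) {
            int fd = accept4(sockfd, NULL, NULL, 0);
            if (fd < 0) {
                if (errno == ECONNABORTED) break;   // listening socket was shut down
                fprintf(stderr, "accept error: (%d) %s\n", errno, strerror(errno));
                exit(1);
            }
            printf("New connection %d, waiting for requests\n", fd);
            serve_requests(fd);
        }
    }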
benchmark/io/http/worker.hfa
     socklen_t * addrlen;
     int flags;
-    io_cancellation cancel;
-    volatile bool done;
 };
 void ?{}( Worker & this);
benchmark/io/readv.cfa
 
     char **left;
-    parse_args( opt, opt_cnt, "[OPTIONS]...\ncforall readv benchmark", left );
+    parse_args( opt, opt_cnt, "[OPTIONS]...\ncforall yield benchmark", left );
 
     if(kpollcp || odirect) {
         if( (buflen % 512) != 0 ) {
             fprintf(stderr, "Buffer length must be a multiple of 512 when using O_DIRECT, was %lu\n\n", buflen);
-            print_args_usage(opt, opt_cnt, "[OPTIONS]...\ncforall readv benchmark", true);
+            print_args_usage(opt, opt_cnt, "[OPTIONS]...\ncforall yield benchmark", true);
         }
     }
example/io/simple/server_epoll.c
 }
 
-	ev.events = EPOLLOUT | EPOLLIN | EPOLLONESHOT;
+	ev.events = EPOLLIN | EPOLLONESHOT;
 	ev.data.u64 = (uint64_t)&ring;
 	if (epoll_ctl(epollfd, EPOLL_CTL_ADD, ring.ring_fd, &ev) == -1) {
…
 
 	while(1) {
-		BLOCK: ;
+		BLOCK:
 		int nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
 		if (nfds == -1) {
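server_epoll.c registers the io_uring file descriptor with epoll in one-shot mode, so every wake-up must be re-armed before the next wait. A small C sketch of that arm/re-arm step (arm_ring is an illustrative name):

    #include <errno.h>
    #include <stdint.h>
    #include <stdio.h>
    #include <string.h>
    #include <sys/epoll.h>

    static int arm_ring(int epollfd, int op, int ring_fd, void * tag) {
        struct epoll_event ev;
        ev.events = EPOLLIN | EPOLLONESHOT;          // one wake-up per arming
        ev.data.u64 = (uint64_t)(uintptr_t)tag;      // recovered on wake-up to find the ring
        if (epoll_ctl(epollfd, op, ring_fd, &ev) == -1) {
            fprintf(stderr, "epoll_ctl: %s\n", strerror(errno));
            return -1;
        }
        return 0;
    }

    // First registration uses EPOLL_CTL_ADD; each later re-arm uses EPOLL_CTL_MOD:
    //     arm_ring(epollfd, EPOLL_CTL_ADD, ring_fd, &ring);
    //     ... epoll_wait(), drain completions ...
    //     arm_ring(epollfd, EPOLL_CTL_MOD, ring_fd, &ring);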
libcfa/src/concurrency/io.cfa
 
 extern "C" {
+    #include <sys/epoll.h>
     #include <sys/syscall.h>
 
…
 #include "kernel/fwd.hfa"
 #include "io/types.hfa"
-
-static const char * opcodes[] = {
-    "OP_NOP",
-    "OP_READV",
-    "OP_WRITEV",
-    "OP_FSYNC",
-    "OP_READ_FIXED",
-    "OP_WRITE_FIXED",
-    "OP_POLL_ADD",
-    "OP_POLL_REMOVE",
-    "OP_SYNC_FILE_RANGE",
-    "OP_SENDMSG",
-    "OP_RECVMSG",
-    "OP_TIMEOUT",
-    "OP_TIMEOUT_REMOVE",
-    "OP_ACCEPT",
-    "OP_ASYNC_CANCEL",
-    "OP_LINK_TIMEOUT",
-    "OP_CONNECT",
-    "OP_FALLOCATE",
-    "OP_OPENAT",
-    "OP_CLOSE",
-    "OP_FILES_UPDATE",
-    "OP_STATX",
-    "OP_READ",
-    "OP_WRITE",
-    "OP_FADVISE",
-    "OP_MADVISE",
-    "OP_SEND",
-    "OP_RECV",
-    "OP_OPENAT2",
-    "OP_EPOLL_CTL",
-    "OP_SPLICE",
-    "OP_PROVIDE_BUFFERS",
-    "OP_REMOVE_BUFFERS",
-    "OP_TEE",
-    "INVALID_OP"
-};
 
 // returns true of acquired as leader or second leader
…
     int ret = 0;
     if( need_sys_to_submit || need_sys_to_complete ) {
-        __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ring.fd, to_submit, flags);
         ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8);
         if( ret < 0 ) {
…
 static unsigned __collect_submitions( struct __io_data & ring );
 static __u32 __release_consumed_submission( struct __io_data & ring );
-static inline void __clean( volatile struct io_uring_sqe * sqe );
+
+static inline void process(struct io_uring_cqe & cqe ) {
+    struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
+    __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
+
+    fulfil( *future, cqe.res );
+}
 
 // Process a single completion message from the io_uring
 // This is NOT thread-safe
-static inline void process( volatile struct io_uring_cqe & cqe ) {
-    struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
-    __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
-
-    fulfil( *future, cqe.res );
-}
-
 static [int, bool] __drain_io( & struct __io_data ring ) {
     /* paranoid */ verify( ! __preemption_enabled() );
…
     }
 
-    __atomic_thread_fence( __ATOMIC_SEQ_CST );
-
     // Release the consumed SQEs
     __release_consumed_submission( ring );
…
     for(i; count) {
         unsigned idx = (head + i) & mask;
-        volatile struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
+        struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
 
         /* paranoid */ verify(&cqe);
…
     // Mark to the kernel that the cqe has been seen
     // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
-    __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_SEQ_CST );
+    __atomic_thread_fence( __ATOMIC_SEQ_CST );
+    __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
 
     return [count, count > 0 || to_submit > 0];
…
 void main( $io_ctx_thread & this ) {
-    __ioctx_register( this );
-
-    __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %d (%p) ready\n", this.ring->fd, &this);
-
-    const int reset_cnt = 5;
-    int reset = reset_cnt;
+    epoll_event ev;
+    __ioctx_register( this, ev );
+
+    __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %p for ring %p ready\n", &this, &this.ring);
+
+    int reset = 0;
     // Then loop until we need to start
-    LOOP:
     while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) {
         // Drain the io
…
         [count, again] = __drain_io( *this.ring );
 
-        if(!again) reset --;
+        if(!again) reset++;
 
         // Update statistics
…
 
         // If we got something, just yield and check again
-        if(reset > 1) {
+        if(reset < 5) {
             yield();
-            continue LOOP;
-        }
-
-        // We alread failed to find completed entries a few time.
-        if(reset == 1) {
-            // Rearm the context so it can block
-            // but don't block right away
-            // we need to retry one last time in case
-            // something completed *just now*
-            __ioctx_prepare_block( this );
-            continue LOOP;
-        }
-
+        }
+        // We didn't get anything baton pass to the slow poller
+        else {
             __STATS__( false,
                 io.complete_q.blocks += 1;
             )
-            __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %d (%p)\n", this.ring->fd, &this);
+            __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self);
+            reset = 0;
 
             // block this thread
+            __ioctx_prepare_block( this, ev );
             wait( this.sem );
-
-            // restore counter
-            reset = reset_cnt;
-    }
-
-    __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.ring->fd, &this);
+        }
+    }
+
+    __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
 }
…
 //
 
-// Allocate an submit queue entry.
-// The kernel cannot see these entries until they are submitted, but other threads must be
-// able to see which entries can be used and which are already un used by an other thread
-// for convenience, return both the index and the pointer to the sqe
-// sqe == &sqes[idx]
-[* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
+[* struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
     /* paranoid */ verify( data != 0 );
…
             // Look through the list starting at some offset
             for(i; cnt) {
-                __u64 expected = 3;
-                __u32 idx = (i + off) & mask;   // Get an index from a random
-                volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
+                __u64 expected = 0;
+                __u32 idx = (i + off) & mask;
+                struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
                 volatile __u64 * udata = &sqe->user_data;
 
-                // Allocate the entry by CASing the user_data field from 0 to the future address
                 if( *udata == expected &&
                     __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
…
                     )
 
-                    // debug log
-                    __cfadbg_print_safe( io, "Kernel I/O : allocated [%p, %u] for %p (%p)\n", sqe, idx, active_thread(), (void*)data );
 
                     // Success return the data
…
                 verify(expected != data);
 
-                // This one was used
                 len ++;
             }
 
             block++;
-
-            abort( "Kernel I/O : all submit queue entries used, yielding\n" );
-
             yield();
         }
…
 void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) {
     __io_data & ring = *ctx->thrd.ring;
-
-    {
-        __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
-        __cfadbg_print_safe( io,
-            "Kernel I/O : submitting %u (%p) for %p\n"
-            " data: %p\n"
-            " opcode: %s\n"
-            " fd: %d\n"
-            " flags: %d\n"
-            " prio: %d\n"
-            " off: %p\n"
-            " addr: %p\n"
-            " len: %d\n"
-            " other flags: %d\n"
-            " splice fd: %d\n"
-            " pad[0]: %llu\n"
-            " pad[1]: %llu\n"
-            " pad[2]: %llu\n",
-            idx, sqe,
-            active_thread(),
-            (void*)sqe->user_data,
-            opcodes[sqe->opcode],
-            sqe->fd,
-            sqe->flags,
-            sqe->ioprio,
-            sqe->off,
-            sqe->addr,
-            sqe->len,
-            sqe->accept_flags,
-            sqe->splice_fd_in,
-            sqe->__pad2[0],
-            sqe->__pad2[1],
-            sqe->__pad2[2]
-        );
-    }
-
     // Get now the data we definetely need
     volatile __u32 * const tail = ring.submit_q.tail;
…
         unlock(ring.submit_q.submit_lock);
         #endif
-        if( ret < 0 ) {
-            return;
-        }
+        if( ret < 0 ) return;
 
         // Release the consumed SQEs
…
             io.submit_q.submit_avg.cnt += 1;
         )
-
-        __cfadbg_print_safe( io, "Kernel I/O : submitted %u (among %u) for %p\n", idx, ret, active_thread() );
-    }
-    else
-    {
+    }
+    else {
         // get mutual exclusion
         #if defined(LEADER_LOCK)
…
         #endif
 
-        /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 3ul64,
+        /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,
         /* paranoid */          "index %u already reclaimed\n"
         /* paranoid */          "head %u, prev %u, tail %u\n"
…
         }
 
-        /* paranoid */ verify(ret == 1);
-
         // update statistics
         __STATS__( false,
…
         )
 
-        {
-            __attribute__((unused)) volatile __u32 * const head = ring.submit_q.head;
-            __attribute__((unused)) __u32 last_idx = ring.submit_q.array[ ((*head) - 1) & mask ];
-            __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[last_idx];
-
-            __cfadbg_print_safe( io,
-                "Kernel I/O : last submitted is %u (%p)\n"
-                " data: %p\n"
-                " opcode: %s\n"
-                " fd: %d\n"
-                " flags: %d\n"
-                " prio: %d\n"
-                " off: %p\n"
-                " addr: %p\n"
-                " len: %d\n"
-                " other flags: %d\n"
-                " splice fd: %d\n"
-                " pad[0]: %llu\n"
-                " pad[1]: %llu\n"
-                " pad[2]: %llu\n",
-                last_idx, sqe,
-                (void*)sqe->user_data,
-                opcodes[sqe->opcode],
-                sqe->fd,
-                sqe->flags,
-                sqe->ioprio,
-                sqe->off,
-                sqe->addr,
-                sqe->len,
-                sqe->accept_flags,
-                sqe->splice_fd_in,
-                sqe->__pad2[0],
-                sqe->__pad2[1],
-                sqe->__pad2[2]
-            );
-        }
-
-        __atomic_thread_fence( __ATOMIC_SEQ_CST );
         // Release the consumed SQEs
         __release_consumed_submission( ring );
-        // ring.submit_q.sqes[idx].user_data = 3ul64;
 
         #if defined(LEADER_LOCK)
…
         #endif
 
-        __cfadbg_print_safe( io, "Kernel I/O : submitted %u for %p\n", idx, active_thread() );
+        __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
     }
 }
 
 // #define PARTIAL_SUBMIT 32
-
-// go through the list of submissions in the ready array and moved them into
-// the ring's submit queue
 static unsigned __collect_submitions( struct __io_data & ring ) {
     /* paranoid */ verify( ring.submit_q.ready != 0p );
…
 }
 
-// Go through the ring's submit queue and release everything that has already been consumed
-// by io_uring
 static __u32 __release_consumed_submission( struct __io_data & ring ) {
     const __u32 smask = *ring.submit_q.mask;
 
-    // We need to get the lock to copy the old head and new head
     if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
-    __attribute__((unused))
-    __u32 ctail = *ring.submit_q.tail;        // get the current tail of the queue
-    __u32 chead = *ring.submit_q.head;        // get the current head of the queue
-    __u32 phead = ring.submit_q.prev_head;    // get the head the last time we were here
-    ring.submit_q.prev_head = chead;          // note up to were we processed
+    __u32 chead = *ring.submit_q.head;
+    __u32 phead = ring.submit_q.prev_head;
+    ring.submit_q.prev_head = chead;
     unlock(ring.submit_q.release_lock);
 
-    // the 3 fields are organized like this diagram
-    // except it's are ring
-    // ---+--------+--------+----
-    // ---+--------+--------+----
-    //    ^        ^        ^
-    // phead    chead    ctail
-
-    // make sure ctail doesn't wrap around and reach phead
-    /* paranoid */ verify(
-           (ctail >= chead && chead >= phead)
-        || (chead >= phead && phead >= ctail)
-        || (phead >= ctail && ctail >= chead)
-    );
-
-    // find the range we need to clear
     __u32 count = chead - phead;
-
-    // We acquired an previous-head/current-head range
-    // go through the range and release the sqes
     for( i; count ) {
         __u32 idx = ring.submit_q.array[ (phead + i) & smask ];
-
-        /* paranoid */ verify( 0 != ring.submit_q.sqes[ idx ].user_data );
-        __clean( &ring.submit_q.sqes[ idx ] );
+        ring.submit_q.sqes[ idx ].user_data = 0;
     }
     return count;
 }
-
-void __sqe_clean( volatile struct io_uring_sqe * sqe ) {
-    __clean( sqe );
-}
-
-static inline void __clean( volatile struct io_uring_sqe * sqe ) {
-    // If we are in debug mode, thrash the fields to make sure we catch reclamation errors
-    __cfaabi_dbg_debug_do(
-        memset(sqe, 0xde, sizeof(*sqe));
-        sqe->opcode = IORING_OP_LAST;
-    );
-
-    // Mark the entry as unused
-    __atomic_store_n(&sqe->user_data, 3ul64, __ATOMIC_SEQ_CST);
-}
 #endif
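In __submit_alloc above, a submit-queue entry is claimed by compare-and-swapping its user_data field from a "free" sentinel to the address of the caller's future. A simplified standalone model in C with GCC atomics (fake_sqe, claim_slot, and the sentinel parameter are illustrative, not the library's real types):

    #include <stdbool.h>
    #include <stddef.h>
    #include <stdint.h>

    struct fake_sqe { volatile uint64_t user_data; };   // stand-in for struct io_uring_sqe

    // Return the claimed index, or -1 if every slot in this pass was already taken;
    // the caller is expected to yield and retry on -1.
    static int claim_slot(struct fake_sqe * sqes, uint32_t num, uint32_t off,
                          uint64_t data, uint64_t free_sentinel) {
        const uint32_t mask = num - 1;                   // num assumed to be a power of two
        for (uint32_t i = 0; i < num; i++) {
            uint32_t idx = (i + off) & mask;
            uint64_t expected = free_sentinel;
            if (sqes[idx].user_data == expected &&
                __atomic_compare_exchange_n(&sqes[idx].user_data, &expected, data,
                                            true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED)) {
                return (int)idx;                         // slot now owned by `data`
            }
        }
        return -1;
    }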
libcfa/src/concurrency/io/call.cfa.in
 ;
 
-extern [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data );
+extern [* struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data );
 extern void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1)));
 
…
         __u32 idx;
         struct io_uring_sqe * sqe;
-        [(volatile struct io_uring_sqe *) sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
-
+        [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
+
+        sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
         sqe->opcode = IORING_OP_{op};
-        sqe->flags = sflags;
-        sqe->ioprio = 0;
-        sqe->fd = 0;
-        sqe->off = 0;
-        sqe->addr = 0;
-        sqe->len = 0;
-        sqe->accept_flags = 0;
-        sqe->__pad2[0] = 0;
-        sqe->__pad2[1] = 0;
-        sqe->__pad2[2] = 0;{body}
-
-        asm volatile("": : :"memory");
+        sqe->flags = sflags;{body}
 
         verify( sqe->user_data == (__u64)(uintptr_t)&future );
…
     }),
     # CFA_HAVE_IORING_OP_ACCEPT
-    Call('ACCEPT', 'int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags)', {
-        'fd': 'sockfd',
-        'addr': '(__u64)addr',
-        'addr2': '(__u64)addrlen',
+    Call('ACCEPT4', 'int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags)', {
+        'fd': 'sockfd',
+        'addr': 'addr',
+        'addr2': 'addrlen',
         'accept_flags': 'flags'
     }),
…
 
 print("""
-//-----------------------------------------------------------------------------
-bool cancel(io_cancellation & this) {
-    #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ASYNC_CANCEL)
-        return false;
-    #else
-        io_future_t future;
-
-        io_context * context = __get_io_context();
-
-        __u8 sflags = 0;
-        struct __io_data & ring = *context->thrd.ring;
-
-        __u32 idx;
-        volatile struct io_uring_sqe * sqe;
-        [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
-
-        sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
-        sqe->opcode = IORING_OP_ASYNC_CANCEL;
-        sqe->flags = sflags;
-        sqe->addr = this.target;
-
-        verify( sqe->user_data == (__u64)(uintptr_t)&future );
-        __submit( context, idx );
-
-        wait(future);
-
-        if( future.result == 0 ) return true; // Entry found
-        if( future.result == -EALREADY) return true; // Entry found but in progress
-        if( future.result == -ENOENT ) return false; // Entry not found
-        return false;
-    #endif
-}
-
 //-----------------------------------------------------------------------------
 // Check if a function is has asynchronous
libcfa/src/concurrency/io/setup.cfa
 #include <pthread.h>
 #include <sys/epoll.h>
-#include <sys/eventfd.h>
 #include <sys/mman.h>
 #include <sys/syscall.h>
…
     // Main loop
     while( iopoll.run ) {
-        __cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n");
-
         // Wait for events
         int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask );
-
-        __cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io contexts events, waking up\n", nfds);
 
         // Check if an error occured
…
             $io_ctx_thread * io_ctx = ($io_ctx_thread *)(uintptr_t)events[i].data.u64;
             /* paranoid */ verify( io_ctx );
-            __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->ring->fd, io_ctx);
+            __cfadbg_print_safe(io_core, "Kernel I/O : Unparking io poller %p\n", io_ctx);
             #if !defined( __CFA_NO_STATISTICS__ )
                 __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats;
             #endif
-
-            eventfd_t v;
-            eventfd_read(io_ctx->ring->efd, &v);
-
             post( io_ctx->sem );
         }
…
     $thread & thrd = this.thrd.self;
     if( cluster_context ) {
-        // We are about to do weird things with the threads
-        // we don't need interrupts to complicate everything
-        disable_interrupts();
-
-        // Get cluster info
         cluster & cltr = *thrd.curr_cluster;
         /* paranoid */ verify( cltr.idles.total == 0 || &cltr == mainCluster );
…
         // We need to adjust the clean-up based on where the thread is
         if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
-            // This is the tricky case
-            // The thread was preempted or ready to run and now it is on the ready queue
-            // but the cluster is shutting down, so there aren't any processors to run the ready queue
-            // the solution is to steal the thread from the ready-queue and pretend it was blocked all along
 
             ready_schedule_lock();
-            // The thread should on the list
+
+            // This is the tricky case
+            // The thread was preempted and now it is on the ready queue
+            // The thread should be the last on the list
             /* paranoid */ verify( thrd.link.next != 0p );
 
             // Remove the thread from the ready queue of this cluster
-            // The thread should be the last on the list
             __attribute__((unused)) bool removed = remove_head( &cltr, &thrd );
             /* paranoid */ verify( removed );
…
         }
         // !!! This is not an else if !!!
-        // Ok, now the thread is blocked (whether we cheated to get here or not)
         if( thrd.state == Blocked ) {
+
             // This is the "easy case"
             // The thread is parked and can easily be moved to active cluster
…
         }
         else {
+
             // The thread is in a weird state
             // I don't know what to do here
             abort("io_context poller thread is in unexpected state, cannot clean-up correctly\n");
         }
-
-        // The weird thread kidnapping stuff is over, restore interrupts.
-        enable_interrupts( __cfaabi_dbg_ctx );
     } else {
         post( this.thrd.sem );
…
     }
 
-    // Step 3 : Initialize the data structure
     // Get the pointers from the kernel to fill the structure
     // submit queue
…
         const __u32 num = *sq.num;
         for( i; num ) {
-            sq.sqes[i].opcode = IORING_OP_LAST;
-            sq.sqes[i].user_data = 3ul64;
+            sq.sqes[i].user_data = 0ul64;
         }
     }
…
     cq.cqes = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
 
-    // Step 4 : eventfd
-    int efd;
-    for() {
-        efd = eventfd(0, 0);
-        if (efd < 0) {
-            if (errno == EINTR) continue;
-            abort("KERNEL ERROR: IO_URING EVENTFD - %s\n", strerror(errno));
-        }
-        break;
-    }
-
-    int ret;
-    for() {
-        ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &efd, 1);
-        if (ret < 0) {
-            if (errno == EINTR) continue;
-            abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno));
-        }
-        break;
-    }
-
     // some paranoid checks
     /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask );
…
     this.ring_flags = params.flags;
     this.fd = fd;
-    this.efd = efd;
     this.eager_submits = params_in.eager_submits;
     this.poller_submits = params_in.poller_submits;
…
     // close the file descriptor
     close(this.fd);
-    close(this.efd);
 
     free( this.submit_q.ready ); // Maybe null, doesn't matter
…
 // I/O Context Sleep
 //=============================================================================================
-#define IOEVENTS EPOLLIN | EPOLLONESHOT
-
-static inline void __ioctx_epoll_ctl($io_ctx_thread & ctx, int op, const char * error) {
-    struct epoll_event ev;
-    ev.events = IOEVENTS;
+
+void __ioctx_register($io_ctx_thread & ctx, struct epoll_event & ev) {
+    ev.events = EPOLLIN | EPOLLONESHOT;
     ev.data.u64 = (__u64)&ctx;
-    int ret = epoll_ctl(iopoll.epollfd, op, ctx.ring->efd, &ev);
+    int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_ADD, ctx.ring->fd, &ev);
     if (ret < 0) {
-        abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) );
-    }
-}
-
-void __ioctx_register($io_ctx_thread & ctx) {
-    __ioctx_epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");
-}
-
-void __ioctx_prepare_block($io_ctx_thread & ctx) {
-    __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.ring->fd, &ctx);
-    __ioctx_epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");
+        abort( "KERNEL ERROR: EPOLL ADD - (%d) %s\n", (int)errno, strerror(errno) );
+    }
+}
+
+void __ioctx_prepare_block($io_ctx_thread & ctx, struct epoll_event & ev) {
+    int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_MOD, ctx.ring->fd, &ev);
+    if (ret < 0) {
+        abort( "KERNEL ERROR: EPOLL REARM - (%d) %s\n", (int)errno, strerror(errno) );
+    }
 }
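One side of the setup.cfa change above wires completions to epoll indirectly: it creates an eventfd, attaches it to the ring with IORING_REGISTER_EVENTFD, and then polls that eventfd rather than the ring fd, draining it with eventfd_read() on each wake-up. A standalone C sketch of that registration step (attach_completion_eventfd is an illustrative name; availability of the io_uring constants depends on the installed kernel headers):

    #include <errno.h>
    #include <stdio.h>
    #include <unistd.h>
    #include <sys/eventfd.h>
    #include <sys/syscall.h>
    #include <linux/io_uring.h>

    // Create an eventfd and register it as the ring's completion notifier.
    static int attach_completion_eventfd(int ring_fd) {
        int efd;
        do { efd = eventfd(0, 0); } while (efd < 0 && errno == EINTR);
        if (efd < 0) { perror("eventfd"); return -1; }

        int ret;
        do {
            ret = syscall(__NR_io_uring_register, ring_fd, IORING_REGISTER_EVENTFD, &efd, 1);
        } while (ret < 0 && errno == EINTR);
        if (ret < 0) { perror("io_uring_register(EVENTFD)"); close(efd); return -1; }

        return efd;   // caller adds efd to epoll and drains it with eventfd_read() on wake-up
    }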
libcfa/src/concurrency/io/types.hfa
 
     // A buffer of sqes (not the actual ring)
-    volatile struct io_uring_sqe * sqes;
+    struct io_uring_sqe * sqes;
 
     // The location and size of the mmaped area
…
 
     // the kernel ring
-    volatile struct io_uring_cqe * cqes;
+    struct io_uring_cqe * cqes;
 
     // The location and size of the mmaped area
…
     __u32 ring_flags;
     int fd;
-    int efd;
     bool eager_submits:1;
     bool poller_submits:1;
…
     #endif
 
+    struct epoll_event;
     struct $io_ctx_thread;
-    void __ioctx_register($io_ctx_thread & ctx);
-    void __ioctx_prepare_block($io_ctx_thread & ctx);
-    void __sqe_clean( volatile struct io_uring_sqe * sqe );
+    void __ioctx_register($io_ctx_thread & ctx, struct epoll_event & ev);
+    void __ioctx_prepare_block($io_ctx_thread & ctx, struct epoll_event & ev);
     #endif
 