Changeset 402658b1
- Timestamp: Jan 13, 2021, 10:23:07 PM (4 years ago)
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: 9153e53
- Parents: bace538 (diff), a00bc5b (diff)
- Files: 14 edited

Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Use the (diff) links above to see all the changes relative to each parent.
Legend: in the per-file diffs below, lines prefixed with + were added, lines prefixed with - were removed, unprefixed lines are unmodified context, and a line containing only … marks elided context between hunks.
benchmark/io/http/Makefile.am
bace538 → 402658b1:
  EXTRA_PROGRAMS = httpforall .dummy_hack

+ CLEANFILES = httpforall
+
  nodist_httpforall_SOURCES = \
      filecache.cfa \
benchmark/io/http/main.cfa
bace538 → 402658b1:
  }

+ extern void init_protocol(void);
+ extern void deinit_protocol(void);
+
  //=============================================================================================
  // Main
…
      //===================
      // Open Socket
-     printf("Listening on port %d\n", options.socket.port);
+     printf("%ld : Listening on port %d\n", getpid(), options.socket.port);
      int server_fd = socket(AF_INET, SOCK_STREAM, 0);
      if(server_fd < 0) {
…
      ret = bind( server_fd, (struct sockaddr *)&address, sizeof(address) );
      if(ret < 0) {
-         if(errno == 98) {
+         if(errno == EADDRINUSE) {
              if(waited == 0) {
                  printf("Waiting for port\n");
…
      options.clopts.instance = &cl;

+
      int pipe_cnt = options.clopts.nworkers * 2;
      int pipe_off;
…
      {
          ServerProc procs[options.clopts.nprocs];
+
+         init_protocol();
          {
              Worker workers[options.clopts.nworkers];
…
              printf("Shutting Down\n");
          }
+
+         for(i; options.clopts.nworkers) {
+             printf("Cancelling %p\n", (void*)workers[i].cancel.target);
+             workers[i].done = true;
+             cancel(workers[i].cancel);
+         }
+
+         printf("Shutting down socket\n");
+         int ret = shutdown( server_fd, SHUT_RD );
+         if( ret < 0 ) { abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) ); }
+
+         //===================
+         // Close Socket
+         printf("Closing Socket\n");
+         ret = close( server_fd );
+         if(ret < 0) {
+             abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
+         }
      }
      printf("Workers Closed\n");
+
+     deinit_protocol();
  }
…
      }
      free(fds);
- }

- //===================
- // Close Socket
- printf("Closing Socket\n");
- ret = close( server_fd );
- if(ret < 0) {
-     abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
  }
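Aside (editorial): the bind() fix above replaces the magic errno value 98 with the symbolic EADDRINUSE and retries until the previous listener releases the port. A minimal standalone C sketch of that retry pattern; the bind_with_retry helper is hypothetical and not part of this changeset:

    #include <errno.h>
    #include <netinet/in.h>
    #include <stdio.h>
    #include <sys/socket.h>
    #include <unistd.h>

    // Keep retrying bind() while the address is still held (e.g. a socket in TIME_WAIT).
    static int bind_with_retry(int fd, struct sockaddr_in *addr) {
        for (;;) {
            if (bind(fd, (struct sockaddr *)addr, sizeof(*addr)) == 0) return 0;
            if (errno != EADDRINUSE) return -1;     // a real error: give up
            fprintf(stderr, "Waiting for port\n");  // port still in use
            sleep(1);                               // back off, then try again
        }
    }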
benchmark/io/http/options.cfa
bace538 → 402658b1:
  #include <parseargs.hfa>

+ #include <string.h>
+
  Options options @= {
+     false, // log
+
      { // file_cache
          0, // open_flags;
…
      {'p', "port", "Port the server will listen on", options.socket.port},
      {'c', "cpus", "Number of processors to use", options.clopts.nprocs},
+     {'L', "log", "Enable logs", options.log, parse_settrue},
      {'t', "threads", "Number of worker threads to use", options.clopts.nworkers},
      {'b', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog},
benchmark/io/http/options.hfa
bace538 → 402658b1:

  struct Options {
+     bool log;
+
      struct {
          int open_flags;
benchmark/io/http/protocol.cfa
bace538 → 402658b1:
  #include "options.hfa"

+ const char * volatile date = 0p;
+
  const char * http_msgs[] = {
-     "HTTP/1.1 200 OK\nContent-Type: text/plain\nContent-Length: %zu\n\n",
-     "HTTP/1.1 400 Bad Request\nContent-Type: text/plain\nContent-Length: 0\n\n",
-     "HTTP/1.1 404 Not Found\nContent-Type: text/plain\nContent-Length: 0\n\n",
-     "HTTP/1.1 413 Payload Too Large\nContent-Type: text/plain\nContent-Length: 0\n\n",
-     "HTTP/1.1 414 URI Too Long\nContent-Type: text/plain\nContent-Length: 0\n\n",
+     "HTTP/1.1 200 OK\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: %zu \n\n",
+     "HTTP/1.1 400 Bad Request\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
+     "HTTP/1.1 404 Not Found\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
+     "HTTP/1.1 413 Payload Too Large\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
+     "HTTP/1.1 414 URI Too Long\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
  };
…
      while(len > 0) {
          // Call write
-         int ret = write(fd, it, len);
+         int ret = cfa_write(fd, it, len, 0, -1`s, 0p, 0p);
+         // int ret = write(fd, it, len);
          if( ret < 0 ) { if( errno != EAGAIN && errno != EWOULDBLOCK) abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) ); }
…
  int answer_header( int fd, size_t size ) {
      const char * fmt = http_msgs[OK200];
-     int len = 100;
+     int len = 200;
      char buffer[len];
-     len = snprintf(buffer, len, fmt, size);
+     len = snprintf(buffer, len, fmt, date, size);
      return answer( fd, buffer, len );
  }

+ int answer_plain( int fd, char buffer[], size_t size ) {
+     int ret = answer_header(fd, size);
+     if( ret < 0 ) return ret;
+     return answer(fd, buffer, size);
+ }
+
+ int answer_empty( int fd ) {
+     return answer_header(fd, 0);
+ }
+
- [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) {
+ [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len, io_cancellation * cancel) {
      char * it = buffer;
      size_t count = len - 1;
      READ:
      for() {
-         int ret = cfa_read(fd, (void*)it, count, 0, -1`s, 0p, 0p);
+         int ret = cfa_read(fd, (void*)it, count, 0, -1`s, cancel, 0p);
+         // int ret = read(fd, (void*)it, count);
          if(ret == 0 ) return [OK200, true, 0, 0];
          if(ret < 0 ) {
              if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ;
+             // if( errno == EINVAL ) return [E400, true, 0, 0];
              abort( "read error: (%d) %s\n", (int)errno, strerror(errno) );
          }
…
      }

-     printf("%.*s\n", rlen, buffer);
+     if( options.log ) printf("%.*s\n", rlen, buffer);

      it = buffer;
…

  void sendfile( int pipe[2], int fd, int ans_fd, size_t count ) {
+     unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE;
      off_t offset = 0;
      ssize_t ret;
      SPLICE1: while(count > 0) {
-         ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, SPLICE_F_MOVE | SPLICE_F_MORE, 0, -1`s, 0p, 0p);
+         ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, 0, -1`s, 0p, 0p);
+         // ret = splice(ans_fd, &offset, pipe[1], 0p, count, sflags);
          if( ret < 0 ) {
              if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE1;
…
          size_t in_pipe = ret;
          SPLICE2: while(in_pipe > 0) {
-             ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, SPLICE_F_MOVE | SPLICE_F_MORE, 0, -1`s, 0p, 0p);
+             ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, 0, -1`s, 0p, 0p);
+             // ret = splice(pipe[0], 0p, fd, 0p, in_pipe, sflags);
              if( ret < 0 ) {
                  if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE2;
…
      }
  }
+
+ //=============================================================================================
+
+ #include <clock.hfa>
+ #include <time.hfa>
+ #include <thread.hfa>
+
+ struct date_buffer {
+     char buff[100];
+ };
+
+ thread DateFormater {
+     int idx;
+     date_buffer buffers[2];
+ };
+
+ void ?{}( DateFormater & this ) {
+     ((thread&)this){ "Server Date Thread", *options.clopts.instance };
+     this.idx = 0;
+     memset( this.buffers[0].buff, 0, sizeof(this.buffers[0]) );
+     memset( this.buffers[1].buff, 0, sizeof(this.buffers[1]) );
+ }
+
+ void main(DateFormater & this) {
+     LOOP: for() {
+         waitfor( ^?{} : this) {
+             break LOOP;
+         }
+         or else {}
+
+         Time now = getTimeNsec();
+
+         strftime( this.buffers[this.idx].buff, 100, "%a, %d %b %Y %H:%M:%S %Z", now );
+
+         char * next = this.buffers[this.idx].buff;
+         __atomic_exchange_n((char * volatile *)&date, next, __ATOMIC_SEQ_CST);
+         this.idx = (this.idx + 1) % 2;
+
+         sleep(1`s);
+     }
+ }
+
+ //=============================================================================================
+ DateFormater * the_date_formatter;
+
+ void init_protocol(void) {
+     the_date_formatter = alloc();
+     (*the_date_formatter){};
+ }
+
+ void deinit_protocol(void) {
+     ^(*the_date_formatter){};
+     free( the_date_formatter );
+ }
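Aside (editorial): the added DateFormater thread keeps the Date: header cheap by formatting the current time once per second into one of two buffers and atomically publishing a pointer to the finished buffer, so request handlers only read the `date` pointer. A rough plain-C, pthread-flavoured sketch of the same double-buffer publication idea (hypothetical names, not the CFA code above):

    #include <pthread.h>
    #include <stdatomic.h>
    #include <time.h>
    #include <unistd.h>

    static char bufs[2][100];
    static _Atomic(char *) date = NULL;   // request handlers load only this pointer

    static void *date_formatter(void *arg) {
        (void)arg;
        int idx = 0;
        for (;;) {
            time_t now = time(NULL);
            struct tm tm;
            gmtime_r(&now, &tm);
            strftime(bufs[idx], sizeof(bufs[idx]), "%a, %d %b %Y %H:%M:%S %Z", &tm);
            atomic_store(&date, bufs[idx]);   // publish the freshly written buffer
            idx = (idx + 1) % 2;              // next update writes the other buffer
            sleep(1);
        }
        return NULL;
    }

    // Usage (sketch): pthread_t tid; pthread_create(&tid, NULL, date_formatter, NULL);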
benchmark/io/http/protocol.hfa
bace538 → 402658b1:
  #pragma once
+
+ struct io_cancellation;

  enum HttpCode {
…
  int answer_error( int fd, HttpCode code );
  int answer_header( int fd, size_t size );
+ int answer_plain( int fd, char buffer [], size_t size );
+ int answer_empty( int fd );

- [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len );
+ [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len, io_cancellation *);

  void sendfile( int pipe[2], int fd, int ans_fd, size_t count );
benchmark/io/http/worker.cfa
bace538 → 402658b1:
      this.pipe[0] = -1;
      this.pipe[1] = -1;
+     this.done = false;
+ }
+
+ extern "C" {
+     extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
  }

…
      CONNECTION:
      for() {
-         int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], 0, -1`s, 0p, 0p );
+         if( options.log ) printf("=== Accepting connection ===\n");
+         int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], 0, -1`s, &this.cancel, 0p );
+         // int fd = accept4( this.[sockfd, addr, addrlen, flags] );
          if(fd < 0) {
              if( errno == ECONNABORTED ) break;
+             if( errno == EINVAL && this.done ) break;
              abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
          }

-         printf("New connection %d, waiting for requests\n", fd);
+         if( options.log ) printf("=== New connection %d, waiting for requests ===\n", fd);
          REQUEST:
          for() {
…
              size_t len = options.socket.buflen;
              char buffer[len];
-             printf("Reading request\n");
-             [code, closed, file, name_size] = http_read(fd, buffer, len);
+             if( options.log ) printf("=== Reading request ===\n");
+             [code, closed, file, name_size] = http_read(fd, buffer, len, &this.cancel);

              // if we are done, break out of the loop
              if( closed ) {
-                 printf("Connection closed\n");
+                 if( options.log ) printf("=== Connection closed ===\n");
+                 close(fd);
                  continue CONNECTION;
              }
…
              // If this wasn't a request retrun 400
              if( code != OK200 ) {
-                 printf("Invalid Request : %d\n", code_val(code));
+                 printf("=== Invalid Request : %d ===\n", code_val(code));
                  answer_error(fd, code);
                  continue REQUEST;
              }

-             printf("Request for file %.*s\n", (int)name_size, file);
+             if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
+                 if( options.log ) printf("=== Request for /plaintext ===\n");
+
+                 char text[] = "Hello, World!\n";
+
+                 // Send the header
+                 answer_plain(fd, text, sizeof(text));
+
+                 if( options.log ) printf("=== Answer sent ===\n");
+                 continue REQUEST;
+             }
+
+             if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
+                 if( options.log ) printf("=== Request for /ping ===\n");
+
+                 // Send the header
+                 answer_empty(fd);
+
+                 if( options.log ) printf("=== Answer sent ===\n");
+                 continue REQUEST;
+             }
+
+             if( options.log ) printf("=== Request for file %.*s ===\n", (int)name_size, file);

              // Get the fd from the file cache
…
              // If we can't find the file, return 404
              if( ans_fd < 0 ) {
-                 printf("File Not Found\n");
+                 printf("=== File Not Found ===\n");
                  answer_error(fd, E404);
                  continue REQUEST;
…
              sendfile( this.pipe, fd, ans_fd, count);

-             printf("File sent\n");
+             if( options.log ) printf("=== Answer sent ===\n");
          }
      }
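Aside (editorial): the new /plaintext and /ping branches dispatch on the leading path component with strncmp before falling back to the file cache. A small C sketch of that kind of prefix dispatch, using a hypothetical route_is helper:

    #include <stddef.h>
    #include <string.h>

    // Returns 1 when the requested path begins with the given route name.
    static int route_is(const char *file, size_t name_size, const char *route) {
        size_t route_len = strlen(route);
        return name_size >= route_len && 0 == strncmp(file, route, route_len);
    }

    // Usage (sketch):
    //   if      (route_is(file, name_size, "plaintext")) { /* answer_plain(...) */ }
    //   else if (route_is(file, name_size, "ping"))      { /* answer_empty(...) */ }
    //   else                                              { /* look up the file cache */ }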
benchmark/io/http/worker.hfa
bace538 → 402658b1:
      socklen_t * addrlen;
      int flags;
+     io_cancellation cancel;
+     volatile bool done;
  };
  void ?{}( Worker & this);
benchmark/io/readv.cfa
bace538 → 402658b1:

      char **left;
-     parse_args( opt, opt_cnt, "[OPTIONS]...\ncforall yield benchmark", left );
+     parse_args( opt, opt_cnt, "[OPTIONS]...\ncforall readv benchmark", left );

      if(kpollcp || odirect) {
          if( (buflen % 512) != 0 ) {
              fprintf(stderr, "Buffer length must be a multiple of 512 when using O_DIRECT, was %lu\n\n", buflen);
-             print_args_usage(opt, opt_cnt, "[OPTIONS]...\ncforall yield benchmark", true);
+             print_args_usage(opt, opt_cnt, "[OPTIONS]...\ncforall readv benchmark", true);
          }
      }
example/io/simple/server_epoll.c
bace538 → 402658b1:
  }

- ev.events = EPOLLIN | EPOLLONESHOT;
+ ev.events = EPOLLOUT | EPOLLIN | EPOLLONESHOT;
  ev.data.u64 = (uint64_t)&ring;
  if (epoll_ctl(epollfd, EPOLL_CTL_ADD, ring.ring_fd, &ev) == -1) {
…

  while(1) {
-     BLOCK:
+     BLOCK:;
      int nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
      if (nfds == -1) {
libcfa/src/concurrency/io.cfa
bace538 → 402658b1:

  extern "C" {
-     #include <sys/epoll.h>
      #include <sys/syscall.h>

…
  #include "kernel/fwd.hfa"
  #include "io/types.hfa"
+
+ static const char * opcodes[] = {
+     "OP_NOP",
+     "OP_READV",
+     "OP_WRITEV",
+     "OP_FSYNC",
+     "OP_READ_FIXED",
+     "OP_WRITE_FIXED",
+     "OP_POLL_ADD",
+     "OP_POLL_REMOVE",
+     "OP_SYNC_FILE_RANGE",
+     "OP_SENDMSG",
+     "OP_RECVMSG",
+     "OP_TIMEOUT",
+     "OP_TIMEOUT_REMOVE",
+     "OP_ACCEPT",
+     "OP_ASYNC_CANCEL",
+     "OP_LINK_TIMEOUT",
+     "OP_CONNECT",
+     "OP_FALLOCATE",
+     "OP_OPENAT",
+     "OP_CLOSE",
+     "OP_FILES_UPDATE",
+     "OP_STATX",
+     "OP_READ",
+     "OP_WRITE",
+     "OP_FADVISE",
+     "OP_MADVISE",
+     "OP_SEND",
+     "OP_RECV",
+     "OP_OPENAT2",
+     "OP_EPOLL_CTL",
+     "OP_SPLICE",
+     "OP_PROVIDE_BUFFERS",
+     "OP_REMOVE_BUFFERS",
+     "OP_TEE",
+     "INVALID_OP"
+ };

  // returns true of acquired as leader or second leader
…
      int ret = 0;
      if( need_sys_to_submit || need_sys_to_complete ) {
+         __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ring.fd, to_submit, flags);
          ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8);
          if( ret < 0 ) {
…
  static unsigned __collect_submitions( struct __io_data & ring );
  static __u32 __release_consumed_submission( struct __io_data & ring );
-
- static inline void process( struct io_uring_cqe & cqe ) {
+ static inline void __clean( volatile struct io_uring_sqe * sqe );
+
+ // Process a single completion message from the io_uring
+ // This is NOT thread-safe
+ static inline void process( volatile struct io_uring_cqe & cqe ) {
      struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
      __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
…
  }

- // Process a single completion message from the io_uring
- // This is NOT thread-safe
  static [int, bool] __drain_io( & struct __io_data ring ) {
      /* paranoid */ verify( ! __preemption_enabled() );
…
      }

+     __atomic_thread_fence( __ATOMIC_SEQ_CST );
+
      // Release the consumed SQEs
      __release_consumed_submission( ring );
…
      for(i; count) {
          unsigned idx = (head + i) & mask;
-         struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
+         volatile struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];

          /* paranoid */ verify(&cqe);
…
      // Mark to the kernel that the cqe has been seen
      // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
-     __atomic_thread_fence( __ATOMIC_SEQ_CST );
-     __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
+     __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_SEQ_CST );

      return [count, count > 0 || to_submit > 0];
…

  void main( $io_ctx_thread & this ) {
-     epoll_event ev;
-     __ioctx_register( this, ev );
-
-     __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %p for ring %p ready\n", &this, &this.ring);
-
-     int reset = 0;
+     __ioctx_register( this );
+
+     __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %d (%p) ready\n", this.ring->fd, &this);
+
+     const int reset_cnt = 5;
+     int reset = reset_cnt;
      // Then loop until we need to start
+     LOOP:
      while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) {
          // Drain the io
…
          [count, again] = __drain_io( *this.ring );

-         if(!again) reset ++;
+         if(!again) reset--;

          // Update statistics
…

          // If we got something, just yield and check again
-         if(reset < 5) {
+         if(reset > 1) {
              yield();
-         }
-         // We didn't get anything baton pass to the slow poller
-         else {
+             continue LOOP;
+         }
+
+         // We alread failed to find completed entries a few time.
+         if(reset == 1) {
+             // Rearm the context so it can block
+             // but don't block right away
+             // we need to retry one last time in case
+             // something completed *just now*
+             __ioctx_prepare_block( this );
+             continue LOOP;
+         }
+
          __STATS__( false,
              io.complete_q.blocks += 1;
          )
-         __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self);
-         reset = 0;
+         __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %d (%p)\n", this.ring->fd, &this);

          // block this thread
-         __ioctx_prepare_block( this, ev );
          wait( this.sem );
-     }
- }
-
- __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
+
+         // restore counter
+         reset = reset_cnt;
+     }
+
+     __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.ring->fd, &this);
  }

…
  //

- [* struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
+ // Allocate an submit queue entry.
+ // The kernel cannot see these entries until they are submitted, but other threads must be
+ // able to see which entries can be used and which are already un used by an other thread
+ // for convenience, return both the index and the pointer to the sqe
+ // sqe == &sqes[idx]
+ [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
      /* paranoid */ verify( data != 0 );

…
      // Look through the list starting at some offset
      for(i; cnt) {
-         __u64 expected = 0;
-         __u32 idx = (i + off) & mask;
-         struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
+         __u64 expected = 3;
+         __u32 idx = (i + off) & mask; // Get an index from a random
+         volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
          volatile __u64 * udata = &sqe->user_data;

+         // Allocate the entry by CASing the user_data field from 0 to the future address
          if( *udata == expected &&
              __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
…
              )

+             // debug log
+             __cfadbg_print_safe( io, "Kernel I/O : allocated [%p, %u] for %p (%p)\n", sqe, idx, active_thread(), (void*)data );

              // Success return the data
…
          verify(expected != data);

+         // This one was used
          len ++;
      }

      block++;
+
+     abort( "Kernel I/O : all submit queue entries used, yielding\n" );
+
      yield();
  }
…
  void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) {
      __io_data & ring = *ctx->thrd.ring;
+
+     {
+         __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
+         __cfadbg_print_safe( io,
+             "Kernel I/O : submitting %u (%p) for %p\n"
+             "    data: %p\n"
+             "    opcode: %s\n"
+             "    fd: %d\n"
+             "    flags: %d\n"
+             "    prio: %d\n"
+             "    off: %p\n"
+             "    addr: %p\n"
+             "    len: %d\n"
+             "    other flags: %d\n"
+             "    splice fd: %d\n"
+             "    pad[0]: %llu\n"
+             "    pad[1]: %llu\n"
+             "    pad[2]: %llu\n",
+             idx, sqe,
+             active_thread(),
+             (void*)sqe->user_data,
+             opcodes[sqe->opcode],
+             sqe->fd,
+             sqe->flags,
+             sqe->ioprio,
+             sqe->off,
+             sqe->addr,
+             sqe->len,
+             sqe->accept_flags,
+             sqe->splice_fd_in,
+             sqe->__pad2[0],
+             sqe->__pad2[1],
+             sqe->__pad2[2]
+         );
+     }
+
+
      // Get now the data we definetely need
      volatile __u32 * const tail = ring.submit_q.tail;
…
          unlock(ring.submit_q.submit_lock);
      #endif
-     if( ret < 0 ) return;
+     if( ret < 0 ) {
+         return;
+     }

      // Release the consumed SQEs
…
          io.submit_q.submit_avg.cnt += 1;
      )
- }
- else {
+
+     __cfadbg_print_safe( io, "Kernel I/O : submitted %u (among %u) for %p\n", idx, ret, active_thread() );
+ }
+ else
+ {
      // get mutual exclusion
…
      #endif

-     /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,
+     /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 3ul64,
      /* paranoid */ "index %u already reclaimed\n"
      /* paranoid */ "head %u, prev %u, tail %u\n"
…
      }

+     /* paranoid */ verify(ret == 1);
+
      // update statistics
      __STATS__( false,
…
      )

+     {
+         __attribute__((unused)) volatile __u32 * const head = ring.submit_q.head;
+         __attribute__((unused)) __u32 last_idx = ring.submit_q.array[ ((*head) - 1) & mask ];
+         __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[last_idx];
+
+         __cfadbg_print_safe( io,
+             "Kernel I/O : last submitted is %u (%p)\n"
+             "    data: %p\n"
+             "    opcode: %s\n"
+             "    fd: %d\n"
+             "    flags: %d\n"
+             "    prio: %d\n"
+             "    off: %p\n"
+             "    addr: %p\n"
+             "    len: %d\n"
+             "    other flags: %d\n"
+             "    splice fd: %d\n"
+             "    pad[0]: %llu\n"
+             "    pad[1]: %llu\n"
+             "    pad[2]: %llu\n",
+             last_idx, sqe,
+             (void*)sqe->user_data,
+             opcodes[sqe->opcode],
+             sqe->fd,
+             sqe->flags,
+             sqe->ioprio,
+             sqe->off,
+             sqe->addr,
+             sqe->len,
+             sqe->accept_flags,
+             sqe->splice_fd_in,
+             sqe->__pad2[0],
+             sqe->__pad2[1],
+             sqe->__pad2[2]
+         );
+     }
+
+     __atomic_thread_fence( __ATOMIC_SEQ_CST );
      // Release the consumed SQEs
      __release_consumed_submission( ring );
+     // ring.submit_q.sqes[idx].user_data = 3ul64;

      #if defined(LEADER_LOCK)
…
      #endif

-     __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret);
+     __cfadbg_print_safe( io, "Kernel I/O : submitted %u for %p\n", idx, active_thread() );
  }
  }

  // #define PARTIAL_SUBMIT 32
+
+ // go through the list of submissions in the ready array and moved them into
+ // the ring's submit queue
  static unsigned __collect_submitions( struct __io_data & ring ) {
      /* paranoid */ verify( ring.submit_q.ready != 0p );
…
  }

+ // Go through the ring's submit queue and release everything that has already been consumed
+ // by io_uring
  static __u32 __release_consumed_submission( struct __io_data & ring ) {
      const __u32 smask = *ring.submit_q.mask;

+     // We need to get the lock to copy the old head and new head
      if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
-     __u32 chead = *ring.submit_q.head;
-     __u32 phead = ring.submit_q.prev_head;
-     ring.submit_q.prev_head = chead;
+     __attribute__((unused))
+     __u32 ctail = *ring.submit_q.tail;        // get the current tail of the queue
+     __u32 chead = *ring.submit_q.head;        // get the current head of the queue
+     __u32 phead = ring.submit_q.prev_head;    // get the head the last time we were here
+     ring.submit_q.prev_head = chead;          // note up to were we processed
      unlock(ring.submit_q.release_lock);

+     // the 3 fields are organized like this diagram
+     // except it's are ring
+     // ---+--------+--------+----
+     // ---+--------+--------+----
+     //    ^        ^        ^
+     // phead    chead    ctail
+
+     // make sure ctail doesn't wrap around and reach phead
+     /* paranoid */ verify(
+            (ctail >= chead && chead >= phead)
+         || (chead >= phead && phead >= ctail)
+         || (phead >= ctail && ctail >= chead)
+     );
+
+     // find the range we need to clear
      __u32 count = chead - phead;
+
+     // We acquired an previous-head/current-head range
+     // go through the range and release the sqes
      for( i; count ) {
          __u32 idx = ring.submit_q.array[ (phead + i) & smask ];
-         ring.submit_q.sqes[ idx ].user_data = 0;
+
+         /* paranoid */ verify( 0 != ring.submit_q.sqes[ idx ].user_data );
+         __clean( &ring.submit_q.sqes[ idx ] );
      }
      return count;
  }
+
+ void __sqe_clean( volatile struct io_uring_sqe * sqe ) {
+     __clean( sqe );
+ }
+
+ static inline void __clean( volatile struct io_uring_sqe * sqe ) {
+     // If we are in debug mode, thrash the fields to make sure we catch reclamation errors
+     __cfaabi_dbg_debug_do(
+         memset(sqe, 0xde, sizeof(*sqe));
+         sqe->opcode = IORING_OP_LAST;
+     );
+
+     // Mark the entry as unused
+     __atomic_store_n(&sqe->user_data, 3ul64, __ATOMIC_SEQ_CST);
+ }
  #endif
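Aside (editorial): after this change each SQE's user_data doubles as its ownership word: the sentinel 3 (an address no io_future_t can occupy) marks a free entry, a thread claims an entry by CASing the sentinel to its future's address, and __clean returns the entry to the sentinel once io_uring has consumed it. A simplified C11 sketch of that claim/release protocol (hypothetical types, outside the CFA runtime):

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdint.h>

    #define SQE_FREE 3ull   // sentinel: no valid future lives at address 3

    struct sqe_slot { _Atomic uint64_t user_data; };

    // Try to claim one of `cnt` slots for the future at address `data`.
    static bool sqe_claim(struct sqe_slot *slots, uint32_t cnt, uint64_t data, uint32_t *out_idx) {
        for (uint32_t i = 0; i < cnt; i++) {
            uint64_t expected = SQE_FREE;
            if (atomic_compare_exchange_strong(&slots[i].user_data, &expected, data)) {
                *out_idx = i;
                return true;    // slot i now belongs to this request
            }
        }
        return false;           // all entries currently in use
    }

    // Release a slot once the kernel has consumed the corresponding SQE.
    static void sqe_release(struct sqe_slot *slot) {
        atomic_store(&slot->user_data, SQE_FREE);
    }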
libcfa/src/concurrency/io/call.cfa.in
bace538 → 402658b1:
  ;

- extern [* struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data );
+ extern [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data );
  extern void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1)));

…
      __u32 idx;
      struct io_uring_sqe * sqe;
-     [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
-
-     sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
+     [(volatile struct io_uring_sqe *) sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
+
      sqe->opcode = IORING_OP_{op};
-     sqe->flags = sflags;{body}
+     sqe->flags = sflags;
+     sqe->ioprio = 0;
+     sqe->fd = 0;
+     sqe->off = 0;
+     sqe->addr = 0;
+     sqe->len = 0;
+     sqe->accept_flags = 0;
+     sqe->__pad2[0] = 0;
+     sqe->__pad2[1] = 0;
+     sqe->__pad2[2] = 0;{body}
+
+     asm volatile("": : :"memory");

      verify( sqe->user_data == (__u64)(uintptr_t)&future );
…
  }),
  # CFA_HAVE_IORING_OP_ACCEPT
- Call('ACCEPT4', 'int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags)', {
+ Call('ACCEPT', 'int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags)', {
      'fd': 'sockfd',
-     'addr': 'addr',
-     'addr2': 'addrlen',
+     'addr': '(__u64)addr',
+     'addr2': '(__u64)addrlen',
      'accept_flags': 'flags'
  }),
…

  print("""
+ //-----------------------------------------------------------------------------
+ bool cancel(io_cancellation & this) {
+     #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ASYNC_CANCEL)
+         return false;
+     #else
+         io_future_t future;
+
+         io_context * context = __get_io_context();
+
+         __u8 sflags = 0;
+         struct __io_data & ring = *context->thrd.ring;
+
+         __u32 idx;
+         volatile struct io_uring_sqe * sqe;
+         [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
+
+         sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
+         sqe->opcode = IORING_OP_ASYNC_CANCEL;
+         sqe->flags = sflags;
+         sqe->addr = this.target;
+
+         verify( sqe->user_data == (__u64)(uintptr_t)&future );
+         __submit( context, idx );
+
+         wait(future);
+
+         if( future.result == 0 ) return true; // Entry found
+         if( future.result == -EALREADY) return true; // Entry found but in progress
+         if( future.result == -ENOENT ) return false; // Entry not found
+         return false;
+     #endif
+ }
+
  //-----------------------------------------------------------------------------
  // Check if a function is has asynchronous
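Aside (editorial): the generated cancel() builds an IORING_OP_ASYNC_CANCEL request whose addr field carries the user_data of the operation to cancel; the completion result then distinguishes found (0), already executing (-EALREADY), and not found (-ENOENT). A raw-SQE C sketch of that preparation, assuming only <linux/io_uring.h>; the helper name is made up:

    #include <linux/io_uring.h>
    #include <string.h>

    // Fill an SQE that cancels a previously submitted request whose user_data == target.
    static void prep_async_cancel(struct io_uring_sqe *sqe, unsigned long long target) {
        memset(sqe, 0, sizeof(*sqe));
        sqe->opcode = IORING_OP_ASYNC_CANCEL;
        sqe->addr = target;       // user_data of the request to cancel
        sqe->user_data = 0;       // completion tag for the cancel request itself
    }

    // Interpreting the cancel's completion (cqe->res):
    //   0         -> the target was found and cancelled
    //   -EALREADY -> the target was found but is already executing
    //   -ENOENT   -> no pending request carried that user_data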
libcfa/src/concurrency/io/setup.cfa
bace538 → 402658b1:
  #include <pthread.h>
  #include <sys/epoll.h>
+ #include <sys/eventfd.h>
  #include <sys/mman.h>
  #include <sys/syscall.h>
…
      // Main loop
      while( iopoll.run ) {
+         __cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n");
+
          // Wait for events
          int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask );

+         __cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io contexts events, waking up\n", nfds);

          // Check if an error occured
…
          $io_ctx_thread * io_ctx = ($io_ctx_thread *)(uintptr_t)events[i].data.u64;
          /* paranoid */ verify( io_ctx );
-         __cfadbg_print_safe(io_core, "Kernel I/O : Unparking io poller %p\n", io_ctx);
+         __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->ring->fd, io_ctx);
          #if !defined( __CFA_NO_STATISTICS__ )
              __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats;
          #endif
+
+         eventfd_t v;
+         eventfd_read(io_ctx->ring->efd, &v);
+
          post( io_ctx->sem );
      }
…
      $thread & thrd = this.thrd.self;
      if( cluster_context ) {
+         // We are about to do weird things with the threads
+         // we don't need interrupts to complicate everything
+         disable_interrupts();
+
+         // Get cluster info
          cluster & cltr = *thrd.curr_cluster;
          /* paranoid */ verify( cltr.idles.total == 0 || &cltr == mainCluster );
…
          // We need to adjust the clean-up based on where the thread is
          if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
+             // This is the tricky case
+             // The thread was preempted or ready to run and now it is on the ready queue
+             // but the cluster is shutting down, so there aren't any processors to run the ready queue
+             // the solution is to steal the thread from the ready-queue and pretend it was blocked all along

              ready_schedule_lock();
-
-             // This is the tricky case
-             // The thread was preempted and now it is on the ready queue
+             // The thread should on the list
+             /* paranoid */ verify( thrd.link.next != 0p );
+
+             // Remove the thread from the ready queue of this cluster
              // The thread should be the last on the list
-             /* paranoid */ verify( thrd.link.next != 0p );
-
-             // Remove the thread from the ready queue of this cluster
              __attribute__((unused)) bool removed = remove_head( &cltr, &thrd );
              /* paranoid */ verify( removed );
…
          }
          // !!! This is not an else if !!!
+         // Ok, now the thread is blocked (whether we cheated to get here or not)
          if( thrd.state == Blocked ) {
-
              // This is the "easy case"
              // The thread is parked and can easily be moved to active cluster
…
          }
          else {
-
              // The thread is in a weird state
              // I don't know what to do here
              abort("io_context poller thread is in unexpected state, cannot clean-up correctly\n");
          }
+
+         // The weird thread kidnapping stuff is over, restore interrupts.
+         enable_interrupts( __cfaabi_dbg_ctx );
      } else {
          post( this.thrd.sem );
…
      }

+     // Step 3 : Initialize the data structure
      // Get the pointers from the kernel to fill the structure
      // submit queue
…
      const __u32 num = *sq.num;
      for( i; num ) {
-         sq.sqes[i].user_data = 0ul64;
+         sq.sqes[i].opcode = IORING_OP_LAST;
+         sq.sqes[i].user_data = 3ul64;
      }
  }
…
  cq.cqes = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);

+ // Step 4 : eventfd
+ int efd;
+ for() {
+     efd = eventfd(0, 0);
+     if (efd < 0) {
+         if (errno == EINTR) continue;
+         abort("KERNEL ERROR: IO_URING EVENTFD - %s\n", strerror(errno));
+     }
+     break;
+ }
+
+ int ret;
+ for() {
+     ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &efd, 1);
+     if (ret < 0) {
+         if (errno == EINTR) continue;
+         abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno));
+     }
+     break;
+ }
+
  // some paranoid checks
  /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask );
…
  this.ring_flags = params.flags;
  this.fd = fd;
+ this.efd = efd;
  this.eager_submits = params_in.eager_submits;
  this.poller_submits = params_in.poller_submits;
…
  // close the file descriptor
  close(this.fd);
+ close(this.efd);

  free( this.submit_q.ready ); // Maybe null, doesn't matter
…
  // I/O Context Sleep
  //=============================================================================================
-
- void __ioctx_register($io_ctx_thread & ctx, struct epoll_event & ev) {
-     ev.events = EPOLLIN | EPOLLONESHOT;
+ #define IOEVENTS EPOLLIN | EPOLLONESHOT
+
+ static inline void __ioctx_epoll_ctl($io_ctx_thread & ctx, int op, const char * error) {
+     struct epoll_event ev;
+     ev.events = IOEVENTS;
      ev.data.u64 = (__u64)&ctx;
-     int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_ADD, ctx.ring->fd, &ev);
+     int ret = epoll_ctl(iopoll.epollfd, op, ctx.ring->efd, &ev);
      if (ret < 0) {
-         abort( "KERNEL ERROR: EPOLL ADD - (%d) %s\n", (int)errno, strerror(errno) );
-     }
- }
-
- void __ioctx_prepare_block($io_ctx_thread & ctx, struct epoll_event & ev) {
-     int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_MOD, ctx.ring->fd, &ev);
-     if (ret < 0) {
-         abort( "KERNEL ERROR: EPOLL REARM - (%d) %s\n", (int)errno, strerror(errno) );
-     }
+         abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) );
+     }
+ }
+
+ void __ioctx_register($io_ctx_thread & ctx) {
+     __ioctx_epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");
+ }
+
+ void __ioctx_prepare_block($io_ctx_thread & ctx) {
+     __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.ring->fd, &ctx);
+     __ioctx_epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");
  }

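Aside (editorial): the new Step 4 makes the poller block on an eventfd rather than on the ring fd itself: the eventfd is registered with IORING_REGISTER_EVENTFD so the kernel signals it on each completion, epoll watches it with EPOLLIN | EPOLLONESHOT, and the woken poller drains it with eventfd_read before re-arming. A bare-bones C sketch of that wiring (error handling trimmed, helper name hypothetical):

    #include <linux/io_uring.h>
    #include <sys/epoll.h>
    #include <sys/eventfd.h>
    #include <sys/syscall.h>
    #include <unistd.h>

    // Returns the eventfd that io_uring will signal whenever a CQE is posted on ring_fd.
    static int attach_completion_eventfd(int ring_fd, int epollfd, void *tag) {
        int efd = eventfd(0, 0);
        // Ask the kernel to write to efd on every completion of this ring.
        syscall(__NR_io_uring_register, ring_fd, IORING_REGISTER_EVENTFD, &efd, 1);

        struct epoll_event ev = { .events = EPOLLIN | EPOLLONESHOT };
        ev.data.ptr = tag;                            // identifies the io context to wake
        epoll_ctl(epollfd, EPOLL_CTL_ADD, efd, &ev);  // the poller blocks on efd, not ring_fd
        return efd;
    }

    // After epoll_wait reports efd readable:
    //   eventfd_t v; eventfd_read(efd, &v);   // clear the counter, then drain the CQ ring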
libcfa/src/concurrency/io/types.hfa
bace538 → 402658b1:

      // A buffer of sqes (not the actual ring)
-     struct io_uring_sqe * sqes;
+     volatile struct io_uring_sqe * sqes;

      // The location and size of the mmaped area
…

      // the kernel ring
-     struct io_uring_cqe * cqes;
+     volatile struct io_uring_cqe * cqes;

      // The location and size of the mmaped area
…
      __u32 ring_flags;
      int fd;
+     int efd;
      bool eager_submits:1;
      bool poller_submits:1;
…
  #endif

- struct epoll_event;
  struct $io_ctx_thread;
- void __ioctx_register($io_ctx_thread & ctx, struct epoll_event & ev);
- void __ioctx_prepare_block($io_ctx_thread & ctx, struct epoll_event & ev);
+ void __ioctx_register($io_ctx_thread & ctx);
+ void __ioctx_prepare_block($io_ctx_thread & ctx);
+ void __sqe_clean( volatile struct io_uring_sqe * sqe );
  #endif