Changeset ece0e80
- Timestamp:
- Jan 9, 2021, 4:27:57 PM (4 years ago)
- Branches:
- ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children:
- 561dd26
- Parents:
- 35fd2c4
- Files:
-
- 8 edited
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/http/main.cfa
r35fd2c4 rece0e80 114 114 options.clopts.instance = &cl; 115 115 116 init_protocol();117 116 118 117 int pipe_cnt = options.clopts.nworkers * 2; … … 131 130 { 132 131 ServerProc procs[options.clopts.nprocs]; 132 133 init_protocol(); 133 134 { 134 135 Worker workers[options.clopts.nworkers]; … … 158 159 printf("Shutting Down\n"); 159 160 } 161 162 for(i; options.clopts.nworkers) { 163 printf("Cancelling %p\n", (void*)workers[i].cancel.target); 164 cancel(workers[i].cancel); 165 } 166 167 printf("Shutting down socket\n"); 168 int ret = shutdown( server_fd, SHUT_RD ); 169 if( ret < 0 ) { abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) ); } 170 171 //=================== 172 // Close Socket 173 printf("Closing Socket\n"); 174 ret = close( server_fd ); 175 if(ret < 0) { 176 abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) ); 177 } 160 178 } 161 179 printf("Workers Closed\n"); 180 181 deinit_protocol(); 162 182 } 163 183 … … 170 190 free(fds); 171 191 172 deinit_protocol();173 }174 175 //===================176 // Close Socket177 printf("Closing Socket\n");178 ret = close( server_fd );179 if(ret < 0) {180 abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );181 192 } 182 193 -
benchmark/io/http/protocol.cfa
r35fd2c4 rece0e80 71 71 } 72 72 73 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len ) {73 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len, io_cancellation * cancel) { 74 74 char * it = buffer; 75 75 size_t count = len - 1; … … 77 77 READ: 78 78 for() { 79 int ret = cfa_read(fd, (void*)it, count, 0, -1`s, 0p, 0p);79 int ret = cfa_read(fd, (void*)it, count, 0, -1`s, cancel, 0p); 80 80 // int ret = read(fd, (void*)it, count); 81 81 if(ret == 0 ) return [OK200, true, 0, 0]; … … 148 148 149 149 void ?{}( DateFormater & this ) { 150 ((thread&)this){ *options.clopts.instance };150 ((thread&)this){ "Server Date Thread", *options.clopts.instance }; 151 151 this.idx = 0; 152 152 memset( this.buffers[0].buff, 0, sizeof(this.buffers[0]) ); … … 162 162 163 163 Time now = getTimeNsec(); 164 // Date: Wed, 17 Apr 2013 12:00:00 GMT 164 165 165 strftime( this.buffers[this.idx].buff, 100, "%a, %d %b %Y %H:%M:%S %Z", now ); 166 printf("Changing date to %s\n", this.buffers[this.idx].buff);167 166 168 167 char * next = this.buffers[this.idx].buff; -
benchmark/io/http/protocol.hfa
r35fd2c4 rece0e80 1 1 #pragma once 2 3 struct io_cancellation; 2 4 3 5 enum HttpCode { … … 15 17 int answer_header( int fd, size_t size ); 16 18 17 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len );19 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len, io_cancellation *); 18 20 19 21 void sendfile( int pipe[2], int fd, int ans_fd, size_t count ); -
benchmark/io/http/worker.cfa
r35fd2c4 rece0e80 28 28 CONNECTION: 29 29 for() { 30 int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], 0, -1`s, 0p, 0p ); 30 printf("=== Accepting connection ===\n"); 31 int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], 0, -1`s, &this.cancel, 0p ); 31 32 if(fd < 0) { 32 33 if( errno == ECONNABORTED ) break; 34 if( errno == EINVAL ) break; 33 35 abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) ); 34 36 } 35 37 36 printf(" New connection %d, waiting for requests\n", fd);38 printf("=== New connection %d, waiting for requests ===\n", fd); 37 39 REQUEST: 38 40 for() { … … 45 47 size_t len = options.socket.buflen; 46 48 char buffer[len]; 47 printf(" Reading request\n");48 [code, closed, file, name_size] = http_read(fd, buffer, len );49 printf("=== Reading request ===\n"); 50 [code, closed, file, name_size] = http_read(fd, buffer, len, &this.cancel); 49 51 50 52 // if we are done, break out of the loop 51 53 if( closed ) { 52 printf(" Connection closed\n");54 printf("=== Connection closed ===\n"); 53 55 continue CONNECTION; 54 56 } … … 56 58 // If this wasn't a request retrun 400 57 59 if( code != OK200 ) { 58 printf(" Invalid Request : %d\n", code_val(code));60 printf("=== Invalid Request : %d ===\n", code_val(code)); 59 61 answer_error(fd, code); 60 62 continue REQUEST; 61 63 } 62 64 63 printf(" Request for file %.*s\n", (int)name_size, file);65 printf("=== Request for file %.*s ===\n", (int)name_size, file); 64 66 65 67 // Get the fd from the file cache … … 70 72 // If we can't find the file, return 404 71 73 if( ans_fd < 0 ) { 72 printf(" File Not Found\n");74 printf("=== File Not Found ===\n"); 73 75 answer_error(fd, E404); 74 76 continue REQUEST; … … 81 83 sendfile( this.pipe, fd, ans_fd, count); 82 84 83 printf(" File sent\n");85 printf("=== File sent ===\n"); 84 86 } 85 87 } -
benchmark/io/http/worker.hfa
r35fd2c4 rece0e80 17 17 socklen_t * addrlen; 18 18 int flags; 19 io_cancellation cancel; 19 20 }; 20 21 void ?{}( Worker & this); -
libcfa/src/concurrency/io.cfa
r35fd2c4 rece0e80 134 134 int ret = 0; 135 135 if( need_sys_to_submit || need_sys_to_complete ) { 136 __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ring.fd, to_submit, flags); 136 137 ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8); 137 138 if( ret < 0 ) { … … 230 231 __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %p for ring %p ready\n", &this, &this.ring); 231 232 232 int reset = 0; 233 const int reset_cnt = 5; 234 int reset = reset_cnt; 233 235 // Then loop until we need to start 236 LOOP: 234 237 while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) { 235 238 // Drain the io … … 239 242 [count, again] = __drain_io( *this.ring ); 240 243 241 if(!again) reset ++;244 if(!again) reset--; 242 245 243 246 // Update statistics … … 249 252 250 253 // If we got something, just yield and check again 251 if(reset < 5) {254 if(reset > 1) { 252 255 yield(); 253 } 254 // We didn't get anything baton pass to the slow poller 255 else { 256 continue LOOP; 257 } 258 259 // We alread failed to find events a few time. 260 if(reset == 1) { 261 // Rearm the context so it can block 262 // but don't block right away 263 // we need to retry one last time in case 264 // something completed *just now* 265 __ioctx_prepare_block( this, ev ); 266 continue LOOP; 267 } 268 256 269 __STATS__( false, 257 270 io.complete_q.blocks += 1; 258 271 ) 259 272 __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self); 260 reset = 0;261 273 262 274 // block this thread 263 __ioctx_prepare_block( this, ev );264 275 wait( this.sem ); 265 } 276 277 // restore counter 278 reset = reset_cnt; 266 279 } 267 280 … … 319 332 ) 320 333 334 __cfadbg_print_safe( io, "Kernel I/O : allocated [%p, %u] for %p (%p)\n", sqe, idx, active_thread(), (void*)data ); 321 335 322 336 // Success return the data … … 376 390 377 391 void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) { 392 __cfadbg_print_safe( io, "Kernel I/O : submitting %u for %p\n", idx, active_thread() ); 393 378 394 __io_data & ring = *ctx->thrd.ring; 379 395 // Get now the data we definetely need … … 443 459 unlock(ring.submit_q.submit_lock); 444 460 #endif 445 if( ret < 0 ) return; 461 if( ret < 0 ) { 462 return; 463 } 446 464 447 465 // Release the consumed SQEs … … 454 472 io.submit_q.submit_avg.cnt += 1; 455 473 ) 474 475 __cfadbg_print_safe( io, "Kernel I/O : submitted %u (among %u) for %p\n", idx, ret, active_thread() ); 456 476 } 457 477 else { -
libcfa/src/concurrency/io/call.cfa.in
r35fd2c4 rece0e80 464 464 465 465 print(""" 466 //----------------------------------------------------------------------------- 467 bool cancel(io_cancellation & this) { 468 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ASYNC_CANCEL) 469 return false; 470 #else 471 io_future_t future; 472 473 io_context * context = __get_io_context(); 474 475 __u8 sflags = 0; 476 struct __io_data & ring = *context->thrd.ring; 477 478 __u32 idx; 479 struct io_uring_sqe * sqe; 480 [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future ); 481 482 sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0; 483 sqe->opcode = IORING_OP_ASYNC_CANCEL; 484 sqe->flags = sflags; 485 sqe->addr = this.target; 486 487 verify( sqe->user_data == (__u64)(uintptr_t)&future ); 488 __submit( context, idx ); 489 490 wait(future); 491 492 if( future.result == 0 ) return true; // Entry found 493 if( future.result == -EALREADY) return true; // Entry found but in progress 494 if( future.result == -ENOENT ) return false; // Entry not found 495 return false; 496 #endif 497 } 498 466 499 //----------------------------------------------------------------------------- 467 500 // Check if a function is has asynchronous -
libcfa/src/concurrency/io/setup.cfa
r35fd2c4 rece0e80 169 169 // Main loop 170 170 while( iopoll.run ) { 171 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n"); 172 171 173 // Wait for events 172 174 int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask ); 175 176 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io contexts events, waking up\n", nfds); 173 177 174 178 // Check if an error occured … … 181 185 $io_ctx_thread * io_ctx = ($io_ctx_thread *)(uintptr_t)events[i].data.u64; 182 186 /* paranoid */ verify( io_ctx ); 183 __cfadbg_print_safe(io_core, "Kernel I/O : Unparking io poller %p\n", io_ctx);187 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %p\n", io_ctx); 184 188 #if !defined( __CFA_NO_STATISTICS__ ) 185 189 __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats; … … 233 237 $thread & thrd = this.thrd.self; 234 238 if( cluster_context ) { 239 // We are about to do weird things with the threads 240 // we don't need interrupts to complicate everything 241 disable_interrupts(); 242 243 // Get cluster info 235 244 cluster & cltr = *thrd.curr_cluster; 236 245 /* paranoid */ verify( cltr.idles.total == 0 || &cltr == mainCluster ); … … 239 248 // We need to adjust the clean-up based on where the thread is 240 249 if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) { 250 // This is the tricky case 251 // The thread was preempted or ready to run and now it is on the ready queue 252 // but the cluster is shutting down, so there aren't any processors to run the ready queue 253 // the solution is to steal the thread from the ready-queue and pretend it was blocked all along 241 254 242 255 ready_schedule_lock(); 243 244 // This is the tricky case 245 // The thread was preempted and now it is on the ready queue 256 // The thread should on the list 257 /* paranoid */ verify( thrd.link.next != 0p ); 258 259 // Remove the thread from the ready queue of this cluster 246 260 // The thread should be the last on the list 247 /* paranoid */ verify( thrd.link.next != 0p );248 249 // Remove the thread from the ready queue of this cluster250 261 __attribute__((unused)) bool removed = remove_head( &cltr, &thrd ); 251 262 /* paranoid */ verify( removed ); … … 263 274 } 264 275 // !!! This is not an else if !!! 276 // Ok, now the thread is blocked (whether we cheated to get here or not) 265 277 if( thrd.state == Blocked ) { 266 267 278 // This is the "easy case" 268 279 // The thread is parked and can easily be moved to active cluster … … 274 285 } 275 286 else { 276 277 287 // The thread is in a weird state 278 288 // I don't know what to do here 279 289 abort("io_context poller thread is in unexpected state, cannot clean-up correctly\n"); 280 290 } 291 292 // The weird thread kidnapping stuff is over, restore interrupts. 293 enable_interrupts( __cfaabi_dbg_ctx ); 281 294 } else { 282 295 post( this.thrd.sem ); … … 463 476 464 477 void __ioctx_prepare_block($io_ctx_thread & ctx, struct epoll_event & ev) { 478 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %p\n", &ctx); 465 479 int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_MOD, ctx.ring->fd, &ev); 466 480 if (ret < 0) {
Note: See TracChangeset
for help on using the changeset viewer.