Changeset ece0e80


Ignore:
Timestamp:
Jan 9, 2021, 4:27:57 PM (3 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
561dd26
Parents:
35fd2c4
Message:

Added prints.
Naive implementation of cancel.
Server now shutdown cleanly.

Files:
8 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/http/main.cfa

    r35fd2c4 rece0e80  
    114114                options.clopts.instance = &cl;
    115115
    116                 init_protocol();
    117116
    118117                int pipe_cnt = options.clopts.nworkers * 2;
     
    131130                {
    132131                        ServerProc procs[options.clopts.nprocs];
     132
     133                        init_protocol();
    133134                        {
    134135                                Worker workers[options.clopts.nworkers];
     
    158159                                        printf("Shutting Down\n");
    159160                                }
     161
     162                                for(i; options.clopts.nworkers) {
     163                                        printf("Cancelling %p\n", (void*)workers[i].cancel.target);
     164                                        cancel(workers[i].cancel);
     165                                }
     166
     167                                printf("Shutting down socket\n");
     168                                int ret = shutdown( server_fd, SHUT_RD );
     169                                if( ret < 0 ) { abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) ); }
     170
     171                                //===================
     172                                // Close Socket
     173                                printf("Closing Socket\n");
     174                                ret = close( server_fd );
     175                                if(ret < 0) {
     176                                        abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
     177                                }
    160178                        }
    161179                        printf("Workers Closed\n");
     180
     181                        deinit_protocol();
    162182                }
    163183
     
    170190                free(fds);
    171191
    172                 deinit_protocol();
    173         }
    174 
    175         //===================
    176         // Close Socket
    177         printf("Closing Socket\n");
    178         ret = close( server_fd );
    179         if(ret < 0) {
    180                 abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
    181192        }
    182193
  • benchmark/io/http/protocol.cfa

    r35fd2c4 rece0e80  
    7171}
    7272
    73 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) {
     73[HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len, io_cancellation * cancel) {
    7474        char * it = buffer;
    7575        size_t count = len - 1;
     
    7777        READ:
    7878        for() {
    79                 int ret = cfa_read(fd, (void*)it, count, 0, -1`s, 0p, 0p);
     79                int ret = cfa_read(fd, (void*)it, count, 0, -1`s, cancel, 0p);
    8080                // int ret = read(fd, (void*)it, count);
    8181                if(ret == 0 ) return [OK200, true, 0, 0];
     
    148148
    149149void ?{}( DateFormater & this ) {
    150         ((thread&)this){ *options.clopts.instance };
     150        ((thread&)this){ "Server Date Thread", *options.clopts.instance };
    151151        this.idx = 0;
    152152        memset( this.buffers[0].buff, 0, sizeof(this.buffers[0]) );
     
    162162
    163163                Time now = getTimeNsec();
    164                 // Date: Wed, 17 Apr 2013 12:00:00 GMT
     164
    165165                strftime( this.buffers[this.idx].buff, 100, "%a, %d %b %Y %H:%M:%S %Z", now );
    166                 printf("Changing date to %s\n", this.buffers[this.idx].buff);
    167166
    168167                char * next = this.buffers[this.idx].buff;
  • benchmark/io/http/protocol.hfa

    r35fd2c4 rece0e80  
    11#pragma once
     2
     3struct io_cancellation;
    24
    35enum HttpCode {
     
    1517int answer_header( int fd, size_t size );
    1618
    17 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len);
     19[HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len, io_cancellation *);
    1820
    1921void sendfile( int pipe[2], int fd, int ans_fd, size_t count );
  • benchmark/io/http/worker.cfa

    r35fd2c4 rece0e80  
    2828        CONNECTION:
    2929        for() {
    30                 int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], 0, -1`s, 0p, 0p );
     30                printf("=== Accepting connection ===\n");
     31                int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], 0, -1`s, &this.cancel, 0p );
    3132                if(fd < 0) {
    3233                        if( errno == ECONNABORTED ) break;
     34                        if( errno == EINVAL ) break;
    3335                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
    3436                }
    3537
    36                 printf("New connection %d, waiting for requests\n", fd);
     38                printf("=== New connection %d, waiting for requests ===\n", fd);
    3739                REQUEST:
    3840                for() {
     
    4547                        size_t len = options.socket.buflen;
    4648                        char buffer[len];
    47                         printf("Reading request\n");
    48                         [code, closed, file, name_size] = http_read(fd, buffer, len);
     49                        printf("=== Reading request ===\n");
     50                        [code, closed, file, name_size] = http_read(fd, buffer, len, &this.cancel);
    4951
    5052                        // if we are done, break out of the loop
    5153                        if( closed ) {
    52                                 printf("Connection closed\n");
     54                                printf("=== Connection closed ===\n");
    5355                                continue CONNECTION;
    5456                        }
     
    5658                        // If this wasn't a request retrun 400
    5759                        if( code != OK200 ) {
    58                                 printf("Invalid Request : %d\n", code_val(code));
     60                                printf("=== Invalid Request : %d ===\n", code_val(code));
    5961                                answer_error(fd, code);
    6062                                continue REQUEST;
    6163                        }
    6264
    63                         printf("Request for file %.*s\n", (int)name_size, file);
     65                        printf("=== Request for file %.*s ===\n", (int)name_size, file);
    6466
    6567                        // Get the fd from the file cache
     
    7072                        // If we can't find the file, return 404
    7173                        if( ans_fd < 0 ) {
    72                                 printf("File Not Found\n");
     74                                printf("=== File Not Found ===\n");
    7375                                answer_error(fd, E404);
    7476                                continue REQUEST;
     
    8183                        sendfile( this.pipe, fd, ans_fd, count);
    8284
    83                         printf("File sent\n");
     85                        printf("=== File sent ===\n");
    8486                }
    8587        }
  • benchmark/io/http/worker.hfa

    r35fd2c4 rece0e80  
    1717        socklen_t * addrlen;
    1818        int flags;
     19        io_cancellation cancel;
    1920};
    2021void ?{}( Worker & this);
  • libcfa/src/concurrency/io.cfa

    r35fd2c4 rece0e80  
    134134                int ret = 0;
    135135                if( need_sys_to_submit || need_sys_to_complete ) {
     136                        __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ring.fd, to_submit, flags);
    136137                        ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8);
    137138                        if( ret < 0 ) {
     
    230231                __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %p for ring %p ready\n", &this, &this.ring);
    231232
    232                 int reset = 0;
     233                const int reset_cnt = 5;
     234                int reset = reset_cnt;
    233235                // Then loop until we need to start
     236                LOOP:
    234237                while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) {
    235238                        // Drain the io
     
    239242                                [count, again] = __drain_io( *this.ring );
    240243
    241                                 if(!again) reset++;
     244                                if(!again) reset--;
    242245
    243246                                // Update statistics
     
    249252
    250253                        // If we got something, just yield and check again
    251                         if(reset < 5) {
     254                        if(reset > 1) {
    252255                                yield();
    253                         }
    254                         // We didn't get anything baton pass to the slow poller
    255                         else {
     256                                continue LOOP;
     257                        }
     258
     259                        // We alread failed to find events a few time.
     260                        if(reset == 1) {
     261                                // Rearm the context so it can block
     262                                // but don't block right away
     263                                // we need to retry one last time in case
     264                                // something completed *just now*
     265                                __ioctx_prepare_block( this, ev );
     266                                continue LOOP;
     267                        }
     268
    256269                                __STATS__( false,
    257270                                        io.complete_q.blocks += 1;
    258271                                )
    259272                                __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self);
    260                                 reset = 0;
    261273
    262274                                // block this thread
    263                                 __ioctx_prepare_block( this, ev );
    264275                                wait( this.sem );
    265                         }
     276
     277                        // restore counter
     278                        reset = reset_cnt;
    266279                }
    267280
     
    319332                                        )
    320333
     334                                        __cfadbg_print_safe( io, "Kernel I/O : allocated [%p, %u] for %p (%p)\n", sqe, idx, active_thread(), (void*)data );
    321335
    322336                                        // Success return the data
     
    376390
    377391        void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) {
     392                __cfadbg_print_safe( io, "Kernel I/O : submitting %u for %p\n", idx, active_thread() );
     393
    378394                __io_data & ring = *ctx->thrd.ring;
    379395                // Get now the data we definetely need
     
    443459                                unlock(ring.submit_q.submit_lock);
    444460                        #endif
    445                         if( ret < 0 ) return;
     461                        if( ret < 0 ) {
     462                                return;
     463                        }
    446464
    447465                        // Release the consumed SQEs
     
    454472                                io.submit_q.submit_avg.cnt += 1;
    455473                        )
     474
     475                        __cfadbg_print_safe( io, "Kernel I/O : submitted %u (among %u) for %p\n", idx, ret, active_thread() );
    456476                }
    457477                else {
  • libcfa/src/concurrency/io/call.cfa.in

    r35fd2c4 rece0e80  
    464464
    465465print("""
     466//-----------------------------------------------------------------------------
     467bool cancel(io_cancellation & this) {
     468        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ASYNC_CANCEL)
     469                return false;
     470        #else
     471                io_future_t future;
     472
     473                io_context * context = __get_io_context();
     474
     475                __u8 sflags = 0;
     476                struct __io_data & ring = *context->thrd.ring;
     477
     478                __u32 idx;
     479                struct io_uring_sqe * sqe;
     480                [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
     481
     482                sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
     483                sqe->opcode = IORING_OP_ASYNC_CANCEL;
     484                sqe->flags = sflags;
     485                sqe->addr = this.target;
     486
     487                verify( sqe->user_data == (__u64)(uintptr_t)&future );
     488                __submit( context, idx );
     489
     490                wait(future);
     491
     492                if( future.result == 0 ) return true; // Entry found
     493                if( future.result == -EALREADY) return true; // Entry found but in progress
     494                if( future.result == -ENOENT ) return false; // Entry not found
     495                return false;
     496        #endif
     497}
     498
    466499//-----------------------------------------------------------------------------
    467500// Check if a function is has asynchronous
  • libcfa/src/concurrency/io/setup.cfa

    r35fd2c4 rece0e80  
    169169                // Main loop
    170170                while( iopoll.run ) {
     171                        __cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n");
     172
    171173                        // Wait for events
    172174                        int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask );
     175
     176                        __cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io contexts events, waking up\n", nfds);
    173177
    174178                        // Check if an error occured
     
    181185                                $io_ctx_thread * io_ctx = ($io_ctx_thread *)(uintptr_t)events[i].data.u64;
    182186                                /* paranoid */ verify( io_ctx );
    183                                 __cfadbg_print_safe(io_core, "Kernel I/O : Unparking io poller %p\n", io_ctx);
     187                                __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %p\n", io_ctx);
    184188                                #if !defined( __CFA_NO_STATISTICS__ )
    185189                                        __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats;
     
    233237                $thread & thrd = this.thrd.self;
    234238                if( cluster_context ) {
     239                        // We are about to do weird things with the threads
     240                        // we don't need interrupts to complicate everything
     241                        disable_interrupts();
     242
     243                        // Get cluster info
    235244                        cluster & cltr = *thrd.curr_cluster;
    236245                        /* paranoid */ verify( cltr.idles.total == 0 || &cltr == mainCluster );
     
    239248                        // We need to adjust the clean-up based on where the thread is
    240249                        if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
     250                                // This is the tricky case
     251                                // The thread was preempted or ready to run and now it is on the ready queue
     252                                // but the cluster is shutting down, so there aren't any processors to run the ready queue
     253                                // the solution is to steal the thread from the ready-queue and pretend it was blocked all along
    241254
    242255                                ready_schedule_lock();
    243 
    244                                         // This is the tricky case
    245                                         // The thread was preempted and now it is on the ready queue
     256                                        // The thread should on the list
     257                                        /* paranoid */ verify( thrd.link.next != 0p );
     258
     259                                        // Remove the thread from the ready queue of this cluster
    246260                                        // The thread should be the last on the list
    247                                         /* paranoid */ verify( thrd.link.next != 0p );
    248 
    249                                         // Remove the thread from the ready queue of this cluster
    250261                                        __attribute__((unused)) bool removed = remove_head( &cltr, &thrd );
    251262                                        /* paranoid */ verify( removed );
     
    263274                        }
    264275                        // !!! This is not an else if !!!
     276                        // Ok, now the thread is blocked (whether we cheated to get here or not)
    265277                        if( thrd.state == Blocked ) {
    266 
    267278                                // This is the "easy case"
    268279                                // The thread is parked and can easily be moved to active cluster
     
    274285                        }
    275286                        else {
    276 
    277287                                // The thread is in a weird state
    278288                                // I don't know what to do here
    279289                                abort("io_context poller thread is in unexpected state, cannot clean-up correctly\n");
    280290                        }
     291
     292                        // The weird thread kidnapping stuff is over, restore interrupts.
     293                        enable_interrupts( __cfaabi_dbg_ctx );
    281294                } else {
    282295                        post( this.thrd.sem );
     
    463476
    464477        void __ioctx_prepare_block($io_ctx_thread & ctx, struct epoll_event & ev) {
     478                __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %p\n", &ctx);
    465479                int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_MOD, ctx.ring->fd, &ev);
    466480                if (ret < 0) {
Note: See TracChangeset for help on using the changeset viewer.