Changeset 402658b1


Ignore:
Timestamp:
Jan 13, 2021, 10:23:07 PM (4 years ago)
Author:
Peter A. Buhr <pabuhr@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
9153e53
Parents:
bace538 (diff), a00bc5b (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

Files:
14 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/http/Makefile.am

    rbace538 r402658b1  
    2929EXTRA_PROGRAMS = httpforall .dummy_hack
    3030
     31CLEANFILES = httpforall
     32
    3133nodist_httpforall_SOURCES = \
    3234        filecache.cfa \
  • benchmark/io/http/main.cfa

    rbace538 r402658b1  
    4646}
    4747
     48extern void init_protocol(void);
     49extern void deinit_protocol(void);
     50
    4851//=============================================================================================
    4952// Main
     
    6164        //===================
    6265        // Open Socket
    63         printf("Listening on port %d\n", options.socket.port);
     66        printf("%ld : Listening on port %d\n", getpid(), options.socket.port);
    6467        int server_fd = socket(AF_INET, SOCK_STREAM, 0);
    6568        if(server_fd < 0) {
     
    7982                ret = bind( server_fd, (struct sockaddr *)&address, sizeof(address) );
    8083                if(ret < 0) {
    81                         if(errno == 98) {
     84                        if(errno == EADDRINUSE) {
    8285                                if(waited == 0) {
    8386                                        printf("Waiting for port\n");
     
    109112                options.clopts.instance = &cl;
    110113
     114
    111115                int pipe_cnt = options.clopts.nworkers * 2;
    112116                int pipe_off;
     
    124128                {
    125129                        ServerProc procs[options.clopts.nprocs];
     130
     131                        init_protocol();
    126132                        {
    127133                                Worker workers[options.clopts.nworkers];
     
    151157                                        printf("Shutting Down\n");
    152158                                }
     159
     160                                for(i; options.clopts.nworkers) {
     161                                        printf("Cancelling %p\n", (void*)workers[i].cancel.target);
     162                                        workers[i].done = true;
     163                                        cancel(workers[i].cancel);
     164                                }
     165
     166                                printf("Shutting down socket\n");
     167                                int ret = shutdown( server_fd, SHUT_RD );
     168                                if( ret < 0 ) { abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) ); }
     169
     170                                //===================
     171                                // Close Socket
     172                                printf("Closing Socket\n");
     173                                ret = close( server_fd );
     174                                if(ret < 0) {
     175                                        abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
     176                                }
    153177                        }
    154178                        printf("Workers Closed\n");
     179
     180                        deinit_protocol();
    155181                }
    156182
     
    162188                }
    163189                free(fds);
    164         }
    165190
    166         //===================
    167         // Close Socket
    168         printf("Closing Socket\n");
    169         ret = close( server_fd );
    170         if(ret < 0) {
    171                 abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
    172191        }
    173192
  • benchmark/io/http/options.cfa

    rbace538 r402658b1  
    1212#include <parseargs.hfa>
    1313
     14#include <string.h>
     15
    1416Options options @= {
     17        false, // log
     18
    1519        { // file_cache
    1620                0,     // open_flags;
     
    4852                {'p', "port",           "Port the server will listen on", options.socket.port},
    4953                {'c', "cpus",           "Number of processors to use", options.clopts.nprocs},
     54                {'L', "log",            "Enable logs", options.log, parse_settrue},
    5055                {'t', "threads",        "Number of worker threads to use", options.clopts.nworkers},
    5156                {'b', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog},
  • benchmark/io/http/options.hfa

    rbace538 r402658b1  
    88
    99struct Options {
     10        bool log;
     11
    1012        struct {
    1113                int open_flags;
  • benchmark/io/http/protocol.cfa

    rbace538 r402658b1  
    1818#include "options.hfa"
    1919
     20const char * volatile date = 0p;
     21
    2022const char * http_msgs[] = {
    21         "HTTP/1.1 200 OK\nContent-Type: text/plain\nContent-Length: %zu\n\n",
    22         "HTTP/1.1 400 Bad Request\nContent-Type: text/plain\nContent-Length: 0\n\n",
    23         "HTTP/1.1 404 Not Found\nContent-Type: text/plain\nContent-Length: 0\n\n",
    24         "HTTP/1.1 413 Payload Too Large\nContent-Type: text/plain\nContent-Length: 0\n\n",
    25         "HTTP/1.1 414 URI Too Long\nContent-Type: text/plain\nContent-Length: 0\n\n",
     23        "HTTP/1.1 200 OK\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: %zu \n\n",
     24        "HTTP/1.1 400 Bad Request\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
     25        "HTTP/1.1 404 Not Found\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
     26        "HTTP/1.1 413 Payload Too Large\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
     27        "HTTP/1.1 414 URI Too Long\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
    2628};
    2729
     
    4547        while(len > 0) {
    4648                // Call write
    47                 int ret = write(fd, it, len);
     49                int ret = cfa_write(fd, it, len, 0, -1`s, 0p, 0p);
     50                // int ret = write(fd, it, len);
    4851                if( ret < 0 ) { if( errno != EAGAIN && errno != EWOULDBLOCK) abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) ); }
    4952
     
    6366int answer_header( int fd, size_t size ) {
    6467        const char * fmt = http_msgs[OK200];
    65         int len = 100;
     68        int len = 200;
    6669        char buffer[len];
    67         len = snprintf(buffer, len, fmt, size);
     70        len = snprintf(buffer, len, fmt, date, size);
    6871        return answer( fd, buffer, len );
    6972}
    7073
    71 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) {
     74int answer_plain( int fd, char buffer[], size_t size ) {
     75        int ret = answer_header(fd, size);
     76        if( ret < 0 ) return ret;
     77        return answer(fd, buffer, size);
     78}
     79
     80int answer_empty( int fd ) {
     81        return answer_header(fd, 0);
     82}
     83
     84
     85[HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len, io_cancellation * cancel) {
    7286        char * it = buffer;
    7387        size_t count = len - 1;
     
    7589        READ:
    7690        for() {
    77                 int ret = cfa_read(fd, (void*)it, count, 0, -1`s, 0p, 0p);
     91                int ret = cfa_read(fd, (void*)it, count, 0, -1`s, cancel, 0p);
     92                // int ret = read(fd, (void*)it, count);
    7893                if(ret == 0 ) return [OK200, true, 0, 0];
    7994                if(ret < 0 ) {
    8095                        if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ;
     96                        // if( errno == EINVAL ) return [E400, true, 0, 0];
    8197                        abort( "read error: (%d) %s\n", (int)errno, strerror(errno) );
    8298                }
     
    92108        }
    93109
    94         printf("%.*s\n", rlen, buffer);
     110        if( options.log ) printf("%.*s\n", rlen, buffer);
    95111
    96112        it = buffer;
     
    104120
    105121void sendfile( int pipe[2], int fd, int ans_fd, size_t count ) {
     122        unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE;
    106123        off_t offset = 0;
    107124        ssize_t ret;
    108125        SPLICE1: while(count > 0) {
    109                 ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, SPLICE_F_MOVE | SPLICE_F_MORE, 0, -1`s, 0p, 0p);
     126                ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, 0, -1`s, 0p, 0p);
     127                // ret = splice(ans_fd, &offset, pipe[1], 0p, count, sflags);
    110128                if( ret < 0 ) {
    111129                        if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE1;
     
    117135                size_t in_pipe = ret;
    118136                SPLICE2: while(in_pipe > 0) {
    119                         ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, SPLICE_F_MOVE | SPLICE_F_MORE, 0, -1`s, 0p, 0p);
     137                        ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, 0, -1`s, 0p, 0p);
     138                        // ret = splice(pipe[0], 0p, fd, 0p, in_pipe, sflags);
    120139                        if( ret < 0 ) {
    121140                                if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE2;
     
    127146        }
    128147}
     148
     149//=============================================================================================
     150
     151#include <clock.hfa>
     152#include <time.hfa>
     153#include <thread.hfa>
     154
     155struct date_buffer {
     156        char buff[100];
     157};
     158
     159thread DateFormater {
     160        int idx;
     161        date_buffer buffers[2];
     162};
     163
     164void ?{}( DateFormater & this ) {
     165        ((thread&)this){ "Server Date Thread", *options.clopts.instance };
     166        this.idx = 0;
     167        memset( this.buffers[0].buff, 0, sizeof(this.buffers[0]) );
     168        memset( this.buffers[1].buff, 0, sizeof(this.buffers[1]) );
     169}
     170
     171void main(DateFormater & this) {
     172        LOOP: for() {
     173                waitfor( ^?{} : this) {
     174                        break LOOP;
     175                }
     176                or else {}
     177
     178                Time now = getTimeNsec();
     179
     180                strftime( this.buffers[this.idx].buff, 100, "%a, %d %b %Y %H:%M:%S %Z", now );
     181
     182                char * next = this.buffers[this.idx].buff;
     183                __atomic_exchange_n((char * volatile *)&date, next, __ATOMIC_SEQ_CST);
     184                this.idx = (this.idx + 1) % 2;
     185
     186                sleep(1`s);
     187        }
     188}
     189
     190//=============================================================================================
     191DateFormater * the_date_formatter;
     192
     193void init_protocol(void) {
     194        the_date_formatter = alloc();
     195        (*the_date_formatter){};
     196}
     197
     198void deinit_protocol(void) {
     199        ^(*the_date_formatter){};
     200        free( the_date_formatter );
     201}
  • benchmark/io/http/protocol.hfa

    rbace538 r402658b1  
    11#pragma once
     2
     3struct io_cancellation;
    24
    35enum HttpCode {
     
    1416int answer_error( int fd, HttpCode code );
    1517int answer_header( int fd, size_t size );
     18int answer_plain( int fd, char buffer [], size_t size );
     19int answer_empty( int fd );
    1620
    17 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len);
     21[HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len, io_cancellation *);
    1822
    1923void sendfile( int pipe[2], int fd, int ans_fd, size_t count );
  • benchmark/io/http/worker.cfa

    rbace538 r402658b1  
    1919        this.pipe[0] = -1;
    2020        this.pipe[1] = -1;
     21        this.done = false;
     22}
     23
     24extern "C" {
     25extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
    2126}
    2227
     
    2833        CONNECTION:
    2934        for() {
    30                 int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], 0, -1`s, 0p, 0p );
     35                if( options.log ) printf("=== Accepting connection ===\n");
     36                int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], 0, -1`s, &this.cancel, 0p );
     37                // int fd = accept4( this.[sockfd, addr, addrlen, flags] );
    3138                if(fd < 0) {
    3239                        if( errno == ECONNABORTED ) break;
     40                        if( errno == EINVAL && this.done ) break;
    3341                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
    3442                }
    3543
    36                 printf("New connection %d, waiting for requests\n", fd);
     44                if( options.log ) printf("=== New connection %d, waiting for requests ===\n", fd);
    3745                REQUEST:
    3846                for() {
     
    4553                        size_t len = options.socket.buflen;
    4654                        char buffer[len];
    47                         printf("Reading request\n");
    48                         [code, closed, file, name_size] = http_read(fd, buffer, len);
     55                        if( options.log ) printf("=== Reading request ===\n");
     56                        [code, closed, file, name_size] = http_read(fd, buffer, len, &this.cancel);
    4957
    5058                        // if we are done, break out of the loop
    5159                        if( closed ) {
    52                                 printf("Connection closed\n");
     60                                if( options.log ) printf("=== Connection closed ===\n");
     61                                close(fd);
    5362                                continue CONNECTION;
    5463                        }
     
    5665                        // If this wasn't a request retrun 400
    5766                        if( code != OK200 ) {
    58                                 printf("Invalid Request : %d\n", code_val(code));
     67                                printf("=== Invalid Request : %d ===\n", code_val(code));
    5968                                answer_error(fd, code);
    6069                                continue REQUEST;
    6170                        }
    6271
    63                         printf("Request for file %.*s\n", (int)name_size, file);
     72                        if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
     73                                if( options.log ) printf("=== Request for /plaintext ===\n");
     74
     75                                char text[] = "Hello, World!\n";
     76
     77                                // Send the header
     78                                answer_plain(fd, text, sizeof(text));
     79
     80                                if( options.log ) printf("=== Answer sent ===\n");
     81                                continue REQUEST;
     82                        }
     83
     84                        if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
     85                                if( options.log ) printf("=== Request for /ping ===\n");
     86
     87                                // Send the header
     88                                answer_empty(fd);
     89
     90                                if( options.log ) printf("=== Answer sent ===\n");
     91                                continue REQUEST;
     92                        }
     93
     94                        if( options.log ) printf("=== Request for file %.*s ===\n", (int)name_size, file);
    6495
    6596                        // Get the fd from the file cache
     
    70101                        // If we can't find the file, return 404
    71102                        if( ans_fd < 0 ) {
    72                                 printf("File Not Found\n");
     103                                printf("=== File Not Found ===\n");
    73104                                answer_error(fd, E404);
    74105                                continue REQUEST;
     
    81112                        sendfile( this.pipe, fd, ans_fd, count);
    82113
    83                         printf("File sent\n");
     114                        if( options.log ) printf("=== Answer sent ===\n");
    84115                }
    85116        }
  • benchmark/io/http/worker.hfa

    rbace538 r402658b1  
    1717        socklen_t * addrlen;
    1818        int flags;
     19        io_cancellation cancel;
     20        volatile bool done;
    1921};
    2022void ?{}( Worker & this);
  • benchmark/io/readv.cfa

    rbace538 r402658b1  
    9696
    9797        char **left;
    98         parse_args( opt, opt_cnt, "[OPTIONS]...\ncforall yield benchmark", left );
     98        parse_args( opt, opt_cnt, "[OPTIONS]...\ncforall readv benchmark", left );
    9999
    100100        if(kpollcp || odirect) {
    101101                if( (buflen % 512) != 0 ) {
    102102                        fprintf(stderr, "Buffer length must be a multiple of 512 when using O_DIRECT, was %lu\n\n", buflen);
    103                         print_args_usage(opt, opt_cnt, "[OPTIONS]...\ncforall yield benchmark", true);
     103                        print_args_usage(opt, opt_cnt, "[OPTIONS]...\ncforall readv benchmark", true);
    104104                }
    105105        }
  • example/io/simple/server_epoll.c

    rbace538 r402658b1  
    8888      }
    8989
    90       ev.events = EPOLLIN | EPOLLONESHOT;
     90      ev.events = EPOLLOUT | EPOLLIN | EPOLLONESHOT;
    9191      ev.data.u64 = (uint64_t)&ring;
    9292      if (epoll_ctl(epollfd, EPOLL_CTL_ADD, ring.ring_fd, &ev) == -1) {
     
    9999
    100100        while(1) {
    101             BLOCK:
     101            BLOCK:;
    102102            int nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
    103103            if (nfds == -1) {
  • libcfa/src/concurrency/io.cfa

    rbace538 r402658b1  
    3131
    3232        extern "C" {
    33                 #include <sys/epoll.h>
    3433                #include <sys/syscall.h>
    3534
     
    4140        #include "kernel/fwd.hfa"
    4241        #include "io/types.hfa"
     42
     43        static const char * opcodes[] = {
     44                "OP_NOP",
     45                "OP_READV",
     46                "OP_WRITEV",
     47                "OP_FSYNC",
     48                "OP_READ_FIXED",
     49                "OP_WRITE_FIXED",
     50                "OP_POLL_ADD",
     51                "OP_POLL_REMOVE",
     52                "OP_SYNC_FILE_RANGE",
     53                "OP_SENDMSG",
     54                "OP_RECVMSG",
     55                "OP_TIMEOUT",
     56                "OP_TIMEOUT_REMOVE",
     57                "OP_ACCEPT",
     58                "OP_ASYNC_CANCEL",
     59                "OP_LINK_TIMEOUT",
     60                "OP_CONNECT",
     61                "OP_FALLOCATE",
     62                "OP_OPENAT",
     63                "OP_CLOSE",
     64                "OP_FILES_UPDATE",
     65                "OP_STATX",
     66                "OP_READ",
     67                "OP_WRITE",
     68                "OP_FADVISE",
     69                "OP_MADVISE",
     70                "OP_SEND",
     71                "OP_RECV",
     72                "OP_OPENAT2",
     73                "OP_EPOLL_CTL",
     74                "OP_SPLICE",
     75                "OP_PROVIDE_BUFFERS",
     76                "OP_REMOVE_BUFFERS",
     77                "OP_TEE",
     78                "INVALID_OP"
     79        };
    4380
    4481        // returns true of acquired as leader or second leader
     
    134171                int ret = 0;
    135172                if( need_sys_to_submit || need_sys_to_complete ) {
     173                        __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ring.fd, to_submit, flags);
    136174                        ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8);
    137175                        if( ret < 0 ) {
     
    157195        static unsigned __collect_submitions( struct __io_data & ring );
    158196        static __u32 __release_consumed_submission( struct __io_data & ring );
    159 
    160         static inline void process(struct io_uring_cqe & cqe ) {
     197        static inline void __clean( volatile struct io_uring_sqe * sqe );
     198
     199        // Process a single completion message from the io_uring
     200        // This is NOT thread-safe
     201        static inline void process( volatile struct io_uring_cqe & cqe ) {
    161202                struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
    162203                __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
     
    165206        }
    166207
    167         // Process a single completion message from the io_uring
    168         // This is NOT thread-safe
    169208        static [int, bool] __drain_io( & struct __io_data ring ) {
    170209                /* paranoid */ verify( ! __preemption_enabled() );
     
    192231                }
    193232
     233                __atomic_thread_fence( __ATOMIC_SEQ_CST );
     234
    194235                // Release the consumed SQEs
    195236                __release_consumed_submission( ring );
     
    209250                for(i; count) {
    210251                        unsigned idx = (head + i) & mask;
    211                         struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
     252                        volatile struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
    212253
    213254                        /* paranoid */ verify(&cqe);
     
    218259                // Mark to the kernel that the cqe has been seen
    219260                // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
    220                 __atomic_thread_fence( __ATOMIC_SEQ_CST );
    221                 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
     261                __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_SEQ_CST );
    222262
    223263                return [count, count > 0 || to_submit > 0];
     
    225265
    226266        void main( $io_ctx_thread & this ) {
    227                 epoll_event ev;
    228                 __ioctx_register( this, ev );
    229 
    230                 __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %p for ring %p ready\n", &this, &this.ring);
    231 
    232                 int reset = 0;
     267                __ioctx_register( this );
     268
     269                __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %d (%p) ready\n", this.ring->fd, &this);
     270
     271                const int reset_cnt = 5;
     272                int reset = reset_cnt;
    233273                // Then loop until we need to start
     274                LOOP:
    234275                while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) {
    235276                        // Drain the io
     
    239280                                [count, again] = __drain_io( *this.ring );
    240281
    241                                 if(!again) reset++;
     282                                if(!again) reset--;
    242283
    243284                                // Update statistics
     
    249290
    250291                        // If we got something, just yield and check again
    251                         if(reset < 5) {
     292                        if(reset > 1) {
    252293                                yield();
    253                         }
    254                         // We didn't get anything baton pass to the slow poller
    255                         else {
     294                                continue LOOP;
     295                        }
     296
     297                        // We alread failed to find completed entries a few time.
     298                        if(reset == 1) {
     299                                // Rearm the context so it can block
     300                                // but don't block right away
     301                                // we need to retry one last time in case
     302                                // something completed *just now*
     303                                __ioctx_prepare_block( this );
     304                                continue LOOP;
     305                        }
     306
    256307                                __STATS__( false,
    257308                                        io.complete_q.blocks += 1;
    258309                                )
    259                                 __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self);
    260                                 reset = 0;
     310                                __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %d (%p)\n", this.ring->fd, &this);
    261311
    262312                                // block this thread
    263                                 __ioctx_prepare_block( this, ev );
    264313                                wait( this.sem );
    265                         }
    266                 }
    267 
    268                 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
     314
     315                        // restore counter
     316                        reset = reset_cnt;
     317                }
     318
     319                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.ring->fd, &this);
    269320        }
    270321
     
    289340//
    290341
    291         [* struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
     342        // Allocate an submit queue entry.
     343        // The kernel cannot see these entries until they are submitted, but other threads must be
     344        // able to see which entries can be used and which are already un used by an other thread
     345        // for convenience, return both the index and the pointer to the sqe
     346        // sqe == &sqes[idx]
     347        [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
    292348                /* paranoid */ verify( data != 0 );
    293349
     
    304360                        // Look through the list starting at some offset
    305361                        for(i; cnt) {
    306                                 __u64 expected = 0;
    307                                 __u32 idx = (i + off) & mask;
    308                                 struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
     362                                __u64 expected = 3;
     363                                __u32 idx = (i + off) & mask; // Get an index from a random
     364                                volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
    309365                                volatile __u64 * udata = &sqe->user_data;
    310366
     367                                // Allocate the entry by CASing the user_data field from 0 to the future address
    311368                                if( *udata == expected &&
    312369                                        __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
     
    319376                                        )
    320377
     378                                        // debug log
     379                                        __cfadbg_print_safe( io, "Kernel I/O : allocated [%p, %u] for %p (%p)\n", sqe, idx, active_thread(), (void*)data );
    321380
    322381                                        // Success return the data
     
    325384                                verify(expected != data);
    326385
     386                                // This one was used
    327387                                len ++;
    328388                        }
    329389
    330390                        block++;
     391
     392                        abort( "Kernel I/O : all submit queue entries used, yielding\n" );
     393
    331394                        yield();
    332395                }
     
    377440        void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) {
    378441                __io_data & ring = *ctx->thrd.ring;
     442
     443                {
     444                        __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
     445                        __cfadbg_print_safe( io,
     446                                "Kernel I/O : submitting %u (%p) for %p\n"
     447                                "    data: %p\n"
     448                                "    opcode: %s\n"
     449                                "    fd: %d\n"
     450                                "    flags: %d\n"
     451                                "    prio: %d\n"
     452                                "    off: %p\n"
     453                                "    addr: %p\n"
     454                                "    len: %d\n"
     455                                "    other flags: %d\n"
     456                                "    splice fd: %d\n"
     457                                "    pad[0]: %llu\n"
     458                                "    pad[1]: %llu\n"
     459                                "    pad[2]: %llu\n",
     460                                idx, sqe,
     461                                active_thread(),
     462                                (void*)sqe->user_data,
     463                                opcodes[sqe->opcode],
     464                                sqe->fd,
     465                                sqe->flags,
     466                                sqe->ioprio,
     467                                sqe->off,
     468                                sqe->addr,
     469                                sqe->len,
     470                                sqe->accept_flags,
     471                                sqe->splice_fd_in,
     472                                sqe->__pad2[0],
     473                                sqe->__pad2[1],
     474                                sqe->__pad2[2]
     475                        );
     476                }
     477
     478
    379479                // Get now the data we definetely need
    380480                volatile __u32 * const tail = ring.submit_q.tail;
     
    443543                                unlock(ring.submit_q.submit_lock);
    444544                        #endif
    445                         if( ret < 0 ) return;
     545                        if( ret < 0 ) {
     546                                return;
     547                        }
    446548
    447549                        // Release the consumed SQEs
     
    454556                                io.submit_q.submit_avg.cnt += 1;
    455557                        )
    456                 }
    457                 else {
     558
     559                        __cfadbg_print_safe( io, "Kernel I/O : submitted %u (among %u) for %p\n", idx, ret, active_thread() );
     560                }
     561                else
     562                {
    458563                        // get mutual exclusion
    459564                        #if defined(LEADER_LOCK)
     
    463568                        #endif
    464569
    465                         /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,
     570                        /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 3ul64,
    466571                        /* paranoid */  "index %u already reclaimed\n"
    467572                        /* paranoid */  "head %u, prev %u, tail %u\n"
     
    490595                        }
    491596
     597                        /* paranoid */ verify(ret == 1);
     598
    492599                        // update statistics
    493600                        __STATS__( false,
     
    496603                        )
    497604
     605                        {
     606                                __attribute__((unused)) volatile __u32 * const head = ring.submit_q.head;
     607                                __attribute__((unused)) __u32 last_idx = ring.submit_q.array[ ((*head) - 1) & mask ];
     608                                __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[last_idx];
     609
     610                                __cfadbg_print_safe( io,
     611                                        "Kernel I/O : last submitted is %u (%p)\n"
     612                                        "    data: %p\n"
     613                                        "    opcode: %s\n"
     614                                        "    fd: %d\n"
     615                                        "    flags: %d\n"
     616                                        "    prio: %d\n"
     617                                        "    off: %p\n"
     618                                        "    addr: %p\n"
     619                                        "    len: %d\n"
     620                                        "    other flags: %d\n"
     621                                        "    splice fd: %d\n"
     622                                        "    pad[0]: %llu\n"
     623                                        "    pad[1]: %llu\n"
     624                                        "    pad[2]: %llu\n",
     625                                        last_idx, sqe,
     626                                        (void*)sqe->user_data,
     627                                        opcodes[sqe->opcode],
     628                                        sqe->fd,
     629                                        sqe->flags,
     630                                        sqe->ioprio,
     631                                        sqe->off,
     632                                        sqe->addr,
     633                                        sqe->len,
     634                                        sqe->accept_flags,
     635                                        sqe->splice_fd_in,
     636                                        sqe->__pad2[0],
     637                                        sqe->__pad2[1],
     638                                        sqe->__pad2[2]
     639                                );
     640                        }
     641
     642                        __atomic_thread_fence( __ATOMIC_SEQ_CST );
    498643                        // Release the consumed SQEs
    499644                        __release_consumed_submission( ring );
     645                        // ring.submit_q.sqes[idx].user_data = 3ul64;
    500646
    501647                        #if defined(LEADER_LOCK)
     
    505651                        #endif
    506652
    507                         __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
     653                        __cfadbg_print_safe( io, "Kernel I/O : submitted %u for %p\n", idx, active_thread() );
    508654                }
    509655        }
    510656
    511657        // #define PARTIAL_SUBMIT 32
     658
     659        // go through the list of submissions in the ready array and moved them into
     660        // the ring's submit queue
    512661        static unsigned __collect_submitions( struct __io_data & ring ) {
    513662                /* paranoid */ verify( ring.submit_q.ready != 0p );
     
    550699        }
    551700
     701        // Go through the ring's submit queue and release everything that has already been consumed
     702        // by io_uring
    552703        static __u32 __release_consumed_submission( struct __io_data & ring ) {
    553704                const __u32 smask = *ring.submit_q.mask;
    554705
     706                // We need to get the lock to copy the old head and new head
    555707                if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
    556                 __u32 chead = *ring.submit_q.head;
    557                 __u32 phead = ring.submit_q.prev_head;
    558                 ring.submit_q.prev_head = chead;
     708                __attribute__((unused))
     709                __u32 ctail = *ring.submit_q.tail;        // get the current tail of the queue
     710                __u32 chead = *ring.submit_q.head;              // get the current head of the queue
     711                __u32 phead = ring.submit_q.prev_head;  // get the head the last time we were here
     712                ring.submit_q.prev_head = chead;                // note up to were we processed
    559713                unlock(ring.submit_q.release_lock);
    560714
     715                // the 3 fields are organized like this diagram
     716                // except it's are ring
     717                // ---+--------+--------+----
     718                // ---+--------+--------+----
     719                //    ^        ^        ^
     720                // phead    chead    ctail
     721
     722                // make sure ctail doesn't wrap around and reach phead
     723                /* paranoid */ verify(
     724                           (ctail >= chead && chead >= phead)
     725                        || (chead >= phead && phead >= ctail)
     726                        || (phead >= ctail && ctail >= chead)
     727                );
     728
     729                // find the range we need to clear
    561730                __u32 count = chead - phead;
     731
     732                // We acquired an previous-head/current-head range
     733                // go through the range and release the sqes
    562734                for( i; count ) {
    563735                        __u32 idx = ring.submit_q.array[ (phead + i) & smask ];
    564                         ring.submit_q.sqes[ idx ].user_data = 0;
     736
     737                        /* paranoid */ verify( 0 != ring.submit_q.sqes[ idx ].user_data );
     738                        __clean( &ring.submit_q.sqes[ idx ] );
    565739                }
    566740                return count;
    567741        }
     742
     743        void __sqe_clean( volatile struct io_uring_sqe * sqe ) {
     744                __clean( sqe );
     745        }
     746
     747        static inline void __clean( volatile struct io_uring_sqe * sqe ) {
     748                // If we are in debug mode, thrash the fields to make sure we catch reclamation errors
     749                __cfaabi_dbg_debug_do(
     750                        memset(sqe, 0xde, sizeof(*sqe));
     751                        sqe->opcode = IORING_OP_LAST;
     752                );
     753
     754                // Mark the entry as unused
     755                __atomic_store_n(&sqe->user_data, 3ul64, __ATOMIC_SEQ_CST);
     756        }
    568757#endif
  • libcfa/src/concurrency/io/call.cfa.in

    rbace538 r402658b1  
    7474        ;
    7575
    76         extern [* struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data );
     76        extern [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data );
    7777        extern void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1)));
    7878
     
    222222                __u32 idx;
    223223                struct io_uring_sqe * sqe;
    224                 [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
    225 
    226                 sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
     224                [(volatile struct io_uring_sqe *) sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
     225
    227226                sqe->opcode = IORING_OP_{op};
    228                 sqe->flags = sflags;{body}
     227                sqe->flags = sflags;
     228                sqe->ioprio = 0;
     229                sqe->fd = 0;
     230                sqe->off = 0;
     231                sqe->addr = 0;
     232                sqe->len = 0;
     233                sqe->accept_flags = 0;
     234                sqe->__pad2[0] = 0;
     235                sqe->__pad2[1] = 0;
     236                sqe->__pad2[2] = 0;{body}
     237
     238                asm volatile("": : :"memory");
    229239
    230240                verify( sqe->user_data == (__u64)(uintptr_t)&future );
     
    312322        }),
    313323        # CFA_HAVE_IORING_OP_ACCEPT
    314         Call('ACCEPT4', 'int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags)', {
     324        Call('ACCEPT', 'int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags)', {
    315325                'fd': 'sockfd',
    316                 'addr': 'addr',
    317                 'addr2': 'addrlen',
     326                'addr': '(__u64)addr',
     327                'addr2': '(__u64)addrlen',
    318328                'accept_flags': 'flags'
    319329        }),
     
    464474
    465475print("""
     476//-----------------------------------------------------------------------------
     477bool cancel(io_cancellation & this) {
     478        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ASYNC_CANCEL)
     479                return false;
     480        #else
     481                io_future_t future;
     482
     483                io_context * context = __get_io_context();
     484
     485                __u8 sflags = 0;
     486                struct __io_data & ring = *context->thrd.ring;
     487
     488                __u32 idx;
     489                volatile struct io_uring_sqe * sqe;
     490                [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
     491
     492                sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
     493                sqe->opcode = IORING_OP_ASYNC_CANCEL;
     494                sqe->flags = sflags;
     495                sqe->addr = this.target;
     496
     497                verify( sqe->user_data == (__u64)(uintptr_t)&future );
     498                __submit( context, idx );
     499
     500                wait(future);
     501
     502                if( future.result == 0 ) return true; // Entry found
     503                if( future.result == -EALREADY) return true; // Entry found but in progress
     504                if( future.result == -ENOENT ) return false; // Entry not found
     505                return false;
     506        #endif
     507}
     508
    466509//-----------------------------------------------------------------------------
    467510// Check if a function is has asynchronous
  • libcfa/src/concurrency/io/setup.cfa

    rbace538 r402658b1  
    5252                #include <pthread.h>
    5353                #include <sys/epoll.h>
     54                #include <sys/eventfd.h>
    5455                #include <sys/mman.h>
    5556                #include <sys/syscall.h>
     
    169170                // Main loop
    170171                while( iopoll.run ) {
     172                        __cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n");
     173
    171174                        // Wait for events
    172175                        int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask );
     176
     177                        __cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io contexts events, waking up\n", nfds);
    173178
    174179                        // Check if an error occured
     
    181186                                $io_ctx_thread * io_ctx = ($io_ctx_thread *)(uintptr_t)events[i].data.u64;
    182187                                /* paranoid */ verify( io_ctx );
    183                                 __cfadbg_print_safe(io_core, "Kernel I/O : Unparking io poller %p\n", io_ctx);
     188                                __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->ring->fd, io_ctx);
    184189                                #if !defined( __CFA_NO_STATISTICS__ )
    185190                                        __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats;
    186191                                #endif
     192
     193                                eventfd_t v;
     194                                eventfd_read(io_ctx->ring->efd, &v);
     195
    187196                                post( io_ctx->sem );
    188197                        }
     
    233242                $thread & thrd = this.thrd.self;
    234243                if( cluster_context ) {
     244                        // We are about to do weird things with the threads
     245                        // we don't need interrupts to complicate everything
     246                        disable_interrupts();
     247
     248                        // Get cluster info
    235249                        cluster & cltr = *thrd.curr_cluster;
    236250                        /* paranoid */ verify( cltr.idles.total == 0 || &cltr == mainCluster );
     
    239253                        // We need to adjust the clean-up based on where the thread is
    240254                        if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
     255                                // This is the tricky case
     256                                // The thread was preempted or ready to run and now it is on the ready queue
     257                                // but the cluster is shutting down, so there aren't any processors to run the ready queue
     258                                // the solution is to steal the thread from the ready-queue and pretend it was blocked all along
    241259
    242260                                ready_schedule_lock();
    243 
    244                                         // This is the tricky case
    245                                         // The thread was preempted and now it is on the ready queue
     261                                        // The thread should on the list
     262                                        /* paranoid */ verify( thrd.link.next != 0p );
     263
     264                                        // Remove the thread from the ready queue of this cluster
    246265                                        // The thread should be the last on the list
    247                                         /* paranoid */ verify( thrd.link.next != 0p );
    248 
    249                                         // Remove the thread from the ready queue of this cluster
    250266                                        __attribute__((unused)) bool removed = remove_head( &cltr, &thrd );
    251267                                        /* paranoid */ verify( removed );
     
    263279                        }
    264280                        // !!! This is not an else if !!!
     281                        // Ok, now the thread is blocked (whether we cheated to get here or not)
    265282                        if( thrd.state == Blocked ) {
    266 
    267283                                // This is the "easy case"
    268284                                // The thread is parked and can easily be moved to active cluster
     
    274290                        }
    275291                        else {
    276 
    277292                                // The thread is in a weird state
    278293                                // I don't know what to do here
    279294                                abort("io_context poller thread is in unexpected state, cannot clean-up correctly\n");
    280295                        }
     296
     297                        // The weird thread kidnapping stuff is over, restore interrupts.
     298                        enable_interrupts( __cfaabi_dbg_ctx );
    281299                } else {
    282300                        post( this.thrd.sem );
     
    365383                }
    366384
     385                // Step 3 : Initialize the data structure
    367386                // Get the pointers from the kernel to fill the structure
    368387                // submit queue
     
    379398                        const __u32 num = *sq.num;
    380399                        for( i; num ) {
    381                                 sq.sqes[i].user_data = 0ul64;
     400                                sq.sqes[i].opcode = IORING_OP_LAST;
     401                                sq.sqes[i].user_data = 3ul64;
    382402                        }
    383403                }
     
    409429                cq.cqes = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
    410430
     431                // Step 4 : eventfd
     432                int efd;
     433                for() {
     434                        efd = eventfd(0, 0);
     435                        if (efd < 0) {
     436                                if (errno == EINTR) continue;
     437                                abort("KERNEL ERROR: IO_URING EVENTFD - %s\n", strerror(errno));
     438                        }
     439                        break;
     440                }
     441
     442                int ret;
     443                for() {
     444                        ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &efd, 1);
     445                        if (ret < 0) {
     446                                if (errno == EINTR) continue;
     447                                abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno));
     448                        }
     449                        break;
     450                }
     451
    411452                // some paranoid checks
    412453                /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask  );
     
    423464                this.ring_flags = params.flags;
    424465                this.fd         = fd;
     466                this.efd        = efd;
    425467                this.eager_submits  = params_in.eager_submits;
    426468                this.poller_submits = params_in.poller_submits;
     
    445487                // close the file descriptor
    446488                close(this.fd);
     489                close(this.efd);
    447490
    448491                free( this.submit_q.ready ); // Maybe null, doesn't matter
     
    452495// I/O Context Sleep
    453496//=============================================================================================
    454 
    455         void __ioctx_register($io_ctx_thread & ctx, struct epoll_event & ev) {
    456                 ev.events = EPOLLIN | EPOLLONESHOT;
     497        #define IOEVENTS EPOLLIN | EPOLLONESHOT
     498
     499        static inline void __ioctx_epoll_ctl($io_ctx_thread & ctx, int op, const char * error) {
     500                struct epoll_event ev;
     501                ev.events = IOEVENTS;
    457502                ev.data.u64 = (__u64)&ctx;
    458                 int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_ADD, ctx.ring->fd, &ev);
     503                int ret = epoll_ctl(iopoll.epollfd, op, ctx.ring->efd, &ev);
    459504                if (ret < 0) {
    460                         abort( "KERNEL ERROR: EPOLL ADD - (%d) %s\n", (int)errno, strerror(errno) );
    461                 }
    462         }
    463 
    464         void __ioctx_prepare_block($io_ctx_thread & ctx, struct epoll_event & ev) {
    465                 int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_MOD, ctx.ring->fd, &ev);
    466                 if (ret < 0) {
    467                         abort( "KERNEL ERROR: EPOLL REARM - (%d) %s\n", (int)errno, strerror(errno) );
    468                 }
     505                        abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) );
     506                }
     507        }
     508
     509        void __ioctx_register($io_ctx_thread & ctx) {
     510                __ioctx_epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");
     511        }
     512
     513        void __ioctx_prepare_block($io_ctx_thread & ctx) {
     514                __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.ring->fd, &ctx);
     515                __ioctx_epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");
    469516        }
    470517
  • libcfa/src/concurrency/io/types.hfa

    rbace538 r402658b1  
    6565
    6666                // A buffer of sqes (not the actual ring)
    67                 struct io_uring_sqe * sqes;
     67                volatile struct io_uring_sqe * sqes;
    6868
    6969                // The location and size of the mmaped area
     
    8585
    8686                // the kernel ring
    87                 struct io_uring_cqe * cqes;
     87                volatile struct io_uring_cqe * cqes;
    8888
    8989                // The location and size of the mmaped area
     
    9797                __u32 ring_flags;
    9898                int fd;
     99                int efd;
    99100                bool eager_submits:1;
    100101                bool poller_submits:1;
     
    130131        #endif
    131132
    132         struct epoll_event;
    133133        struct $io_ctx_thread;
    134         void __ioctx_register($io_ctx_thread & ctx, struct epoll_event & ev);
    135         void __ioctx_prepare_block($io_ctx_thread & ctx, struct epoll_event & ev);
     134        void __ioctx_register($io_ctx_thread & ctx);
     135        void __ioctx_prepare_block($io_ctx_thread & ctx);
     136        void __sqe_clean( volatile struct io_uring_sqe * sqe );
    136137#endif
    137138
Note: See TracChangeset for help on using the changeset viewer.