Changes in / [a00bc5b:101cc3a]


Ignore:
Files:
14 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/http/Makefile.am

    ra00bc5b r101cc3a  
    2929EXTRA_PROGRAMS = httpforall .dummy_hack
    3030
    31 CLEANFILES = httpforall
    32 
    3331nodist_httpforall_SOURCES = \
    3432        filecache.cfa \
  • benchmark/io/http/main.cfa

    ra00bc5b r101cc3a  
    4646}
    4747
    48 extern void init_protocol(void);
    49 extern void deinit_protocol(void);
    50 
    5148//=============================================================================================
    5249// Main
     
    6461        //===================
    6562        // Open Socket
    66         printf("%ld : Listening on port %d\n", getpid(), options.socket.port);
     63        printf("Listening on port %d\n", options.socket.port);
    6764        int server_fd = socket(AF_INET, SOCK_STREAM, 0);
    6865        if(server_fd < 0) {
     
    8279                ret = bind( server_fd, (struct sockaddr *)&address, sizeof(address) );
    8380                if(ret < 0) {
    84                         if(errno == EADDRINUSE) {
     81                        if(errno == 98) {
    8582                                if(waited == 0) {
    8683                                        printf("Waiting for port\n");
     
    112109                options.clopts.instance = &cl;
    113110
    114 
    115111                int pipe_cnt = options.clopts.nworkers * 2;
    116112                int pipe_off;
     
    128124                {
    129125                        ServerProc procs[options.clopts.nprocs];
    130 
    131                         init_protocol();
    132126                        {
    133127                                Worker workers[options.clopts.nworkers];
     
    157151                                        printf("Shutting Down\n");
    158152                                }
    159 
    160                                 for(i; options.clopts.nworkers) {
    161                                         printf("Cancelling %p\n", (void*)workers[i].cancel.target);
    162                                         workers[i].done = true;
    163                                         cancel(workers[i].cancel);
    164                                 }
    165 
    166                                 printf("Shutting down socket\n");
    167                                 int ret = shutdown( server_fd, SHUT_RD );
    168                                 if( ret < 0 ) { abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) ); }
    169 
    170                                 //===================
    171                                 // Close Socket
    172                                 printf("Closing Socket\n");
    173                                 ret = close( server_fd );
    174                                 if(ret < 0) {
    175                                         abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
    176                                 }
    177153                        }
    178154                        printf("Workers Closed\n");
    179 
    180                         deinit_protocol();
    181155                }
    182156
     
    188162                }
    189163                free(fds);
     164        }
    190165
     166        //===================
     167        // Close Socket
     168        printf("Closing Socket\n");
     169        ret = close( server_fd );
     170        if(ret < 0) {
     171                abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
    191172        }
    192173
  • benchmark/io/http/options.cfa

    ra00bc5b r101cc3a  
    1212#include <parseargs.hfa>
    1313
    14 #include <string.h>
    15 
    1614Options options @= {
    17         false, // log
    18 
    1915        { // file_cache
    2016                0,     // open_flags;
     
    5248                {'p', "port",           "Port the server will listen on", options.socket.port},
    5349                {'c', "cpus",           "Number of processors to use", options.clopts.nprocs},
    54                 {'L', "log",            "Enable logs", options.log, parse_settrue},
    5550                {'t', "threads",        "Number of worker threads to use", options.clopts.nworkers},
    5651                {'b', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog},
  • benchmark/io/http/options.hfa

    ra00bc5b r101cc3a  
    88
    99struct Options {
    10         bool log;
    11 
    1210        struct {
    1311                int open_flags;
  • benchmark/io/http/protocol.cfa

    ra00bc5b r101cc3a  
    1818#include "options.hfa"
    1919
    20 const char * volatile date = 0p;
    21 
    2220const char * http_msgs[] = {
    23         "HTTP/1.1 200 OK\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: %zu \n\n",
    24         "HTTP/1.1 400 Bad Request\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
    25         "HTTP/1.1 404 Not Found\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
    26         "HTTP/1.1 413 Payload Too Large\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
    27         "HTTP/1.1 414 URI Too Long\nServer: HttoForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
     21        "HTTP/1.1 200 OK\nContent-Type: text/plain\nContent-Length: %zu\n\n",
     22        "HTTP/1.1 400 Bad Request\nContent-Type: text/plain\nContent-Length: 0\n\n",
     23        "HTTP/1.1 404 Not Found\nContent-Type: text/plain\nContent-Length: 0\n\n",
     24        "HTTP/1.1 413 Payload Too Large\nContent-Type: text/plain\nContent-Length: 0\n\n",
     25        "HTTP/1.1 414 URI Too Long\nContent-Type: text/plain\nContent-Length: 0\n\n",
    2826};
    2927
     
    4745        while(len > 0) {
    4846                // Call write
    49                 int ret = cfa_write(fd, it, len, 0, -1`s, 0p, 0p);
    50                 // int ret = write(fd, it, len);
     47                int ret = write(fd, it, len);
    5148                if( ret < 0 ) { if( errno != EAGAIN && errno != EWOULDBLOCK) abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) ); }
    5249
     
    6663int answer_header( int fd, size_t size ) {
    6764        const char * fmt = http_msgs[OK200];
    68         int len = 200;
     65        int len = 100;
    6966        char buffer[len];
    70         len = snprintf(buffer, len, fmt, date, size);
     67        len = snprintf(buffer, len, fmt, size);
    7168        return answer( fd, buffer, len );
    7269}
    7370
    74 int answer_plain( int fd, char buffer[], size_t size ) {
    75         int ret = answer_header(fd, size);
    76         if( ret < 0 ) return ret;
    77         return answer(fd, buffer, size);
    78 }
    79 
    80 int answer_empty( int fd ) {
    81         return answer_header(fd, 0);
    82 }
    83 
    84 
    85 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len, io_cancellation * cancel) {
     71[HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) {
    8672        char * it = buffer;
    8773        size_t count = len - 1;
     
    8975        READ:
    9076        for() {
    91                 int ret = cfa_read(fd, (void*)it, count, 0, -1`s, cancel, 0p);
    92                 // int ret = read(fd, (void*)it, count);
     77                int ret = cfa_read(fd, (void*)it, count, 0, -1`s, 0p, 0p);
    9378                if(ret == 0 ) return [OK200, true, 0, 0];
    9479                if(ret < 0 ) {
    9580                        if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ;
    96                         // if( errno == EINVAL ) return [E400, true, 0, 0];
    9781                        abort( "read error: (%d) %s\n", (int)errno, strerror(errno) );
    9882                }
     
    10892        }
    10993
    110         if( options.log ) printf("%.*s\n", rlen, buffer);
     94        printf("%.*s\n", rlen, buffer);
    11195
    11296        it = buffer;
     
    120104
    121105void sendfile( int pipe[2], int fd, int ans_fd, size_t count ) {
    122         unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE;
    123106        off_t offset = 0;
    124107        ssize_t ret;
    125108        SPLICE1: while(count > 0) {
    126                 ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, 0, -1`s, 0p, 0p);
    127                 // ret = splice(ans_fd, &offset, pipe[1], 0p, count, sflags);
     109                ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, SPLICE_F_MOVE | SPLICE_F_MORE, 0, -1`s, 0p, 0p);
    128110                if( ret < 0 ) {
    129111                        if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE1;
     
    135117                size_t in_pipe = ret;
    136118                SPLICE2: while(in_pipe > 0) {
    137                         ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, 0, -1`s, 0p, 0p);
    138                         // ret = splice(pipe[0], 0p, fd, 0p, in_pipe, sflags);
     119                        ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, SPLICE_F_MOVE | SPLICE_F_MORE, 0, -1`s, 0p, 0p);
    139120                        if( ret < 0 ) {
    140121                                if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE2;
     
    146127        }
    147128}
    148 
    149 //=============================================================================================
    150 
    151 #include <clock.hfa>
    152 #include <time.hfa>
    153 #include <thread.hfa>
    154 
    155 struct date_buffer {
    156         char buff[100];
    157 };
    158 
    159 thread DateFormater {
    160         int idx;
    161         date_buffer buffers[2];
    162 };
    163 
    164 void ?{}( DateFormater & this ) {
    165         ((thread&)this){ "Server Date Thread", *options.clopts.instance };
    166         this.idx = 0;
    167         memset( this.buffers[0].buff, 0, sizeof(this.buffers[0]) );
    168         memset( this.buffers[1].buff, 0, sizeof(this.buffers[1]) );
    169 }
    170 
    171 void main(DateFormater & this) {
    172         LOOP: for() {
    173                 waitfor( ^?{} : this) {
    174                         break LOOP;
    175                 }
    176                 or else {}
    177 
    178                 Time now = getTimeNsec();
    179 
    180                 strftime( this.buffers[this.idx].buff, 100, "%a, %d %b %Y %H:%M:%S %Z", now );
    181 
    182                 char * next = this.buffers[this.idx].buff;
    183                 __atomic_exchange_n((char * volatile *)&date, next, __ATOMIC_SEQ_CST);
    184                 this.idx = (this.idx + 1) % 2;
    185 
    186                 sleep(1`s);
    187         }
    188 }
    189 
    190 //=============================================================================================
    191 DateFormater * the_date_formatter;
    192 
    193 void init_protocol(void) {
    194         the_date_formatter = alloc();
    195         (*the_date_formatter){};
    196 }
    197 
    198 void deinit_protocol(void) {
    199         ^(*the_date_formatter){};
    200         free( the_date_formatter );
    201 }
  • benchmark/io/http/protocol.hfa

    ra00bc5b r101cc3a  
    11#pragma once
    2 
    3 struct io_cancellation;
    42
    53enum HttpCode {
     
    1614int answer_error( int fd, HttpCode code );
    1715int answer_header( int fd, size_t size );
    18 int answer_plain( int fd, char buffer [], size_t size );
    19 int answer_empty( int fd );
    2016
    21 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len, io_cancellation *);
     17[HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len);
    2218
    2319void sendfile( int pipe[2], int fd, int ans_fd, size_t count );
  • benchmark/io/http/worker.cfa

    ra00bc5b r101cc3a  
    1919        this.pipe[0] = -1;
    2020        this.pipe[1] = -1;
    21         this.done = false;
    22 }
    23 
    24 extern "C" {
    25 extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
    2621}
    2722
     
    3328        CONNECTION:
    3429        for() {
    35                 if( options.log ) printf("=== Accepting connection ===\n");
    36                 int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], 0, -1`s, &this.cancel, 0p );
    37                 // int fd = accept4( this.[sockfd, addr, addrlen, flags] );
     30                int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], 0, -1`s, 0p, 0p );
    3831                if(fd < 0) {
    3932                        if( errno == ECONNABORTED ) break;
    40                         if( errno == EINVAL && this.done ) break;
    4133                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
    4234                }
    4335
    44                 if( options.log ) printf("=== New connection %d, waiting for requests ===\n", fd);
     36                printf("New connection %d, waiting for requests\n", fd);
    4537                REQUEST:
    4638                for() {
     
    5345                        size_t len = options.socket.buflen;
    5446                        char buffer[len];
    55                         if( options.log ) printf("=== Reading request ===\n");
    56                         [code, closed, file, name_size] = http_read(fd, buffer, len, &this.cancel);
     47                        printf("Reading request\n");
     48                        [code, closed, file, name_size] = http_read(fd, buffer, len);
    5749
    5850                        // if we are done, break out of the loop
    5951                        if( closed ) {
    60                                 if( options.log ) printf("=== Connection closed ===\n");
    61                                 close(fd);
     52                                printf("Connection closed\n");
    6253                                continue CONNECTION;
    6354                        }
     
    6556                        // If this wasn't a request retrun 400
    6657                        if( code != OK200 ) {
    67                                 printf("=== Invalid Request : %d ===\n", code_val(code));
     58                                printf("Invalid Request : %d\n", code_val(code));
    6859                                answer_error(fd, code);
    6960                                continue REQUEST;
    7061                        }
    7162
    72                         if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
    73                                 if( options.log ) printf("=== Request for /plaintext ===\n");
    74 
    75                                 char text[] = "Hello, World!\n";
    76 
    77                                 // Send the header
    78                                 answer_plain(fd, text, sizeof(text));
    79 
    80                                 if( options.log ) printf("=== Answer sent ===\n");
    81                                 continue REQUEST;
    82                         }
    83 
    84                         if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
    85                                 if( options.log ) printf("=== Request for /ping ===\n");
    86 
    87                                 // Send the header
    88                                 answer_empty(fd);
    89 
    90                                 if( options.log ) printf("=== Answer sent ===\n");
    91                                 continue REQUEST;
    92                         }
    93 
    94                         if( options.log ) printf("=== Request for file %.*s ===\n", (int)name_size, file);
     63                        printf("Request for file %.*s\n", (int)name_size, file);
    9564
    9665                        // Get the fd from the file cache
     
    10170                        // If we can't find the file, return 404
    10271                        if( ans_fd < 0 ) {
    103                                 printf("=== File Not Found ===\n");
     72                                printf("File Not Found\n");
    10473                                answer_error(fd, E404);
    10574                                continue REQUEST;
     
    11281                        sendfile( this.pipe, fd, ans_fd, count);
    11382
    114                         if( options.log ) printf("=== Answer sent ===\n");
     83                        printf("File sent\n");
    11584                }
    11685        }
  • benchmark/io/http/worker.hfa

    ra00bc5b r101cc3a  
    1717        socklen_t * addrlen;
    1818        int flags;
    19         io_cancellation cancel;
    20         volatile bool done;
    2119};
    2220void ?{}( Worker & this);
  • benchmark/io/readv.cfa

    ra00bc5b r101cc3a  
    9696
    9797        char **left;
    98         parse_args( opt, opt_cnt, "[OPTIONS]...\ncforall readv benchmark", left );
     98        parse_args( opt, opt_cnt, "[OPTIONS]...\ncforall yield benchmark", left );
    9999
    100100        if(kpollcp || odirect) {
    101101                if( (buflen % 512) != 0 ) {
    102102                        fprintf(stderr, "Buffer length must be a multiple of 512 when using O_DIRECT, was %lu\n\n", buflen);
    103                         print_args_usage(opt, opt_cnt, "[OPTIONS]...\ncforall readv benchmark", true);
     103                        print_args_usage(opt, opt_cnt, "[OPTIONS]...\ncforall yield benchmark", true);
    104104                }
    105105        }
  • example/io/simple/server_epoll.c

    ra00bc5b r101cc3a  
    8888      }
    8989
    90       ev.events = EPOLLOUT | EPOLLIN | EPOLLONESHOT;
     90      ev.events = EPOLLIN | EPOLLONESHOT;
    9191      ev.data.u64 = (uint64_t)&ring;
    9292      if (epoll_ctl(epollfd, EPOLL_CTL_ADD, ring.ring_fd, &ev) == -1) {
     
    9999
    100100        while(1) {
    101             BLOCK:;
     101            BLOCK:
    102102            int nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
    103103            if (nfds == -1) {
  • libcfa/src/concurrency/io.cfa

    ra00bc5b r101cc3a  
    3131
    3232        extern "C" {
     33                #include <sys/epoll.h>
    3334                #include <sys/syscall.h>
    3435
     
    4041        #include "kernel/fwd.hfa"
    4142        #include "io/types.hfa"
    42 
    43         static const char * opcodes[] = {
    44                 "OP_NOP",
    45                 "OP_READV",
    46                 "OP_WRITEV",
    47                 "OP_FSYNC",
    48                 "OP_READ_FIXED",
    49                 "OP_WRITE_FIXED",
    50                 "OP_POLL_ADD",
    51                 "OP_POLL_REMOVE",
    52                 "OP_SYNC_FILE_RANGE",
    53                 "OP_SENDMSG",
    54                 "OP_RECVMSG",
    55                 "OP_TIMEOUT",
    56                 "OP_TIMEOUT_REMOVE",
    57                 "OP_ACCEPT",
    58                 "OP_ASYNC_CANCEL",
    59                 "OP_LINK_TIMEOUT",
    60                 "OP_CONNECT",
    61                 "OP_FALLOCATE",
    62                 "OP_OPENAT",
    63                 "OP_CLOSE",
    64                 "OP_FILES_UPDATE",
    65                 "OP_STATX",
    66                 "OP_READ",
    67                 "OP_WRITE",
    68                 "OP_FADVISE",
    69                 "OP_MADVISE",
    70                 "OP_SEND",
    71                 "OP_RECV",
    72                 "OP_OPENAT2",
    73                 "OP_EPOLL_CTL",
    74                 "OP_SPLICE",
    75                 "OP_PROVIDE_BUFFERS",
    76                 "OP_REMOVE_BUFFERS",
    77                 "OP_TEE",
    78                 "INVALID_OP"
    79         };
    8043
    8144        // returns true of acquired as leader or second leader
     
    171134                int ret = 0;
    172135                if( need_sys_to_submit || need_sys_to_complete ) {
    173                         __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ring.fd, to_submit, flags);
    174136                        ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8);
    175137                        if( ret < 0 ) {
     
    195157        static unsigned __collect_submitions( struct __io_data & ring );
    196158        static __u32 __release_consumed_submission( struct __io_data & ring );
    197         static inline void __clean( volatile struct io_uring_sqe * sqe );
     159
     160        static inline void process(struct io_uring_cqe & cqe ) {
     161                struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
     162                __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
     163
     164                fulfil( *future, cqe.res );
     165        }
    198166
    199167        // Process a single completion message from the io_uring
    200168        // This is NOT thread-safe
    201         static inline void process( volatile struct io_uring_cqe & cqe ) {
    202                 struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
    203                 __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
    204 
    205                 fulfil( *future, cqe.res );
    206         }
    207 
    208169        static [int, bool] __drain_io( & struct __io_data ring ) {
    209170                /* paranoid */ verify( ! __preemption_enabled() );
     
    231192                }
    232193
    233                 __atomic_thread_fence( __ATOMIC_SEQ_CST );
    234 
    235194                // Release the consumed SQEs
    236195                __release_consumed_submission( ring );
     
    250209                for(i; count) {
    251210                        unsigned idx = (head + i) & mask;
    252                         volatile struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
     211                        struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
    253212
    254213                        /* paranoid */ verify(&cqe);
     
    259218                // Mark to the kernel that the cqe has been seen
    260219                // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
    261                 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_SEQ_CST );
     220                __atomic_thread_fence( __ATOMIC_SEQ_CST );
     221                __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
    262222
    263223                return [count, count > 0 || to_submit > 0];
     
    265225
    266226        void main( $io_ctx_thread & this ) {
    267                 __ioctx_register( this );
    268 
    269                 __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %d (%p) ready\n", this.ring->fd, &this);
    270 
    271                 const int reset_cnt = 5;
    272                 int reset = reset_cnt;
     227                epoll_event ev;
     228                __ioctx_register( this, ev );
     229
     230                __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %p for ring %p ready\n", &this, &this.ring);
     231
     232                int reset = 0;
    273233                // Then loop until we need to start
    274                 LOOP:
    275234                while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) {
    276235                        // Drain the io
     
    280239                                [count, again] = __drain_io( *this.ring );
    281240
    282                                 if(!again) reset--;
     241                                if(!again) reset++;
    283242
    284243                                // Update statistics
     
    290249
    291250                        // If we got something, just yield and check again
    292                         if(reset > 1) {
     251                        if(reset < 5) {
    293252                                yield();
    294                                 continue LOOP;
    295                         }
    296 
    297                         // We alread failed to find completed entries a few time.
    298                         if(reset == 1) {
    299                                 // Rearm the context so it can block
    300                                 // but don't block right away
    301                                 // we need to retry one last time in case
    302                                 // something completed *just now*
    303                                 __ioctx_prepare_block( this );
    304                                 continue LOOP;
    305                         }
    306 
     253                        }
     254                        // We didn't get anything baton pass to the slow poller
     255                        else {
    307256                                __STATS__( false,
    308257                                        io.complete_q.blocks += 1;
    309258                                )
    310                                 __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %d (%p)\n", this.ring->fd, &this);
     259                                __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self);
     260                                reset = 0;
    311261
    312262                                // block this thread
     263                                __ioctx_prepare_block( this, ev );
    313264                                wait( this.sem );
    314 
    315                         // restore counter
    316                         reset = reset_cnt;
    317                 }
    318 
    319                 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.ring->fd, &this);
     265                        }
     266                }
     267
     268                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
    320269        }
    321270
     
    340289//
    341290
    342         // Allocate an submit queue entry.
    343         // The kernel cannot see these entries until they are submitted, but other threads must be
    344         // able to see which entries can be used and which are already un used by an other thread
    345         // for convenience, return both the index and the pointer to the sqe
    346         // sqe == &sqes[idx]
    347         [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
     291        [* struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
    348292                /* paranoid */ verify( data != 0 );
    349293
     
    360304                        // Look through the list starting at some offset
    361305                        for(i; cnt) {
    362                                 __u64 expected = 3;
    363                                 __u32 idx = (i + off) & mask; // Get an index from a random
    364                                 volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
     306                                __u64 expected = 0;
     307                                __u32 idx = (i + off) & mask;
     308                                struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
    365309                                volatile __u64 * udata = &sqe->user_data;
    366310
    367                                 // Allocate the entry by CASing the user_data field from 0 to the future address
    368311                                if( *udata == expected &&
    369312                                        __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
     
    376319                                        )
    377320
    378                                         // debug log
    379                                         __cfadbg_print_safe( io, "Kernel I/O : allocated [%p, %u] for %p (%p)\n", sqe, idx, active_thread(), (void*)data );
    380321
    381322                                        // Success return the data
     
    384325                                verify(expected != data);
    385326
    386                                 // This one was used
    387327                                len ++;
    388328                        }
    389329
    390330                        block++;
    391 
    392                         abort( "Kernel I/O : all submit queue entries used, yielding\n" );
    393 
    394331                        yield();
    395332                }
     
    440377        void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) {
    441378                __io_data & ring = *ctx->thrd.ring;
    442 
    443                 {
    444                         __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
    445                         __cfadbg_print_safe( io,
    446                                 "Kernel I/O : submitting %u (%p) for %p\n"
    447                                 "    data: %p\n"
    448                                 "    opcode: %s\n"
    449                                 "    fd: %d\n"
    450                                 "    flags: %d\n"
    451                                 "    prio: %d\n"
    452                                 "    off: %p\n"
    453                                 "    addr: %p\n"
    454                                 "    len: %d\n"
    455                                 "    other flags: %d\n"
    456                                 "    splice fd: %d\n"
    457                                 "    pad[0]: %llu\n"
    458                                 "    pad[1]: %llu\n"
    459                                 "    pad[2]: %llu\n",
    460                                 idx, sqe,
    461                                 active_thread(),
    462                                 (void*)sqe->user_data,
    463                                 opcodes[sqe->opcode],
    464                                 sqe->fd,
    465                                 sqe->flags,
    466                                 sqe->ioprio,
    467                                 sqe->off,
    468                                 sqe->addr,
    469                                 sqe->len,
    470                                 sqe->accept_flags,
    471                                 sqe->splice_fd_in,
    472                                 sqe->__pad2[0],
    473                                 sqe->__pad2[1],
    474                                 sqe->__pad2[2]
    475                         );
    476                 }
    477 
    478 
    479379                // Get now the data we definetely need
    480380                volatile __u32 * const tail = ring.submit_q.tail;
     
    543443                                unlock(ring.submit_q.submit_lock);
    544444                        #endif
    545                         if( ret < 0 ) {
    546                                 return;
    547                         }
     445                        if( ret < 0 ) return;
    548446
    549447                        // Release the consumed SQEs
     
    556454                                io.submit_q.submit_avg.cnt += 1;
    557455                        )
    558 
    559                         __cfadbg_print_safe( io, "Kernel I/O : submitted %u (among %u) for %p\n", idx, ret, active_thread() );
    560                 }
    561                 else
    562                 {
     456                }
     457                else {
    563458                        // get mutual exclusion
    564459                        #if defined(LEADER_LOCK)
     
    568463                        #endif
    569464
    570                         /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 3ul64,
     465                        /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,
    571466                        /* paranoid */  "index %u already reclaimed\n"
    572467                        /* paranoid */  "head %u, prev %u, tail %u\n"
     
    595490                        }
    596491
    597                         /* paranoid */ verify(ret == 1);
    598 
    599492                        // update statistics
    600493                        __STATS__( false,
     
    603496                        )
    604497
    605                         {
    606                                 __attribute__((unused)) volatile __u32 * const head = ring.submit_q.head;
    607                                 __attribute__((unused)) __u32 last_idx = ring.submit_q.array[ ((*head) - 1) & mask ];
    608                                 __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[last_idx];
    609 
    610                                 __cfadbg_print_safe( io,
    611                                         "Kernel I/O : last submitted is %u (%p)\n"
    612                                         "    data: %p\n"
    613                                         "    opcode: %s\n"
    614                                         "    fd: %d\n"
    615                                         "    flags: %d\n"
    616                                         "    prio: %d\n"
    617                                         "    off: %p\n"
    618                                         "    addr: %p\n"
    619                                         "    len: %d\n"
    620                                         "    other flags: %d\n"
    621                                         "    splice fd: %d\n"
    622                                         "    pad[0]: %llu\n"
    623                                         "    pad[1]: %llu\n"
    624                                         "    pad[2]: %llu\n",
    625                                         last_idx, sqe,
    626                                         (void*)sqe->user_data,
    627                                         opcodes[sqe->opcode],
    628                                         sqe->fd,
    629                                         sqe->flags,
    630                                         sqe->ioprio,
    631                                         sqe->off,
    632                                         sqe->addr,
    633                                         sqe->len,
    634                                         sqe->accept_flags,
    635                                         sqe->splice_fd_in,
    636                                         sqe->__pad2[0],
    637                                         sqe->__pad2[1],
    638                                         sqe->__pad2[2]
    639                                 );
    640                         }
    641 
    642                         __atomic_thread_fence( __ATOMIC_SEQ_CST );
    643498                        // Release the consumed SQEs
    644499                        __release_consumed_submission( ring );
    645                         // ring.submit_q.sqes[idx].user_data = 3ul64;
    646500
    647501                        #if defined(LEADER_LOCK)
     
    651505                        #endif
    652506
    653                         __cfadbg_print_safe( io, "Kernel I/O : submitted %u for %p\n", idx, active_thread() );
     507                        __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
    654508                }
    655509        }
    656510
    657511        // #define PARTIAL_SUBMIT 32
    658 
    659         // go through the list of submissions in the ready array and moved them into
    660         // the ring's submit queue
    661512        static unsigned __collect_submitions( struct __io_data & ring ) {
    662513                /* paranoid */ verify( ring.submit_q.ready != 0p );
     
    699550        }
    700551
    701         // Go through the ring's submit queue and release everything that has already been consumed
    702         // by io_uring
    703552        static __u32 __release_consumed_submission( struct __io_data & ring ) {
    704553                const __u32 smask = *ring.submit_q.mask;
    705554
    706                 // We need to get the lock to copy the old head and new head
    707555                if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
    708                 __attribute__((unused))
    709                 __u32 ctail = *ring.submit_q.tail;        // get the current tail of the queue
    710                 __u32 chead = *ring.submit_q.head;              // get the current head of the queue
    711                 __u32 phead = ring.submit_q.prev_head;  // get the head the last time we were here
    712                 ring.submit_q.prev_head = chead;                // note up to were we processed
     556                __u32 chead = *ring.submit_q.head;
     557                __u32 phead = ring.submit_q.prev_head;
     558                ring.submit_q.prev_head = chead;
    713559                unlock(ring.submit_q.release_lock);
    714560
    715                 // the 3 fields are organized like this diagram
    716                 // except it's are ring
    717                 // ---+--------+--------+----
    718                 // ---+--------+--------+----
    719                 //    ^        ^        ^
    720                 // phead    chead    ctail
    721 
    722                 // make sure ctail doesn't wrap around and reach phead
    723                 /* paranoid */ verify(
    724                            (ctail >= chead && chead >= phead)
    725                         || (chead >= phead && phead >= ctail)
    726                         || (phead >= ctail && ctail >= chead)
    727                 );
    728 
    729                 // find the range we need to clear
    730561                __u32 count = chead - phead;
    731 
    732                 // We acquired an previous-head/current-head range
    733                 // go through the range and release the sqes
    734562                for( i; count ) {
    735563                        __u32 idx = ring.submit_q.array[ (phead + i) & smask ];
    736 
    737                         /* paranoid */ verify( 0 != ring.submit_q.sqes[ idx ].user_data );
    738                         __clean( &ring.submit_q.sqes[ idx ] );
     564                        ring.submit_q.sqes[ idx ].user_data = 0;
    739565                }
    740566                return count;
    741567        }
    742 
    743         void __sqe_clean( volatile struct io_uring_sqe * sqe ) {
    744                 __clean( sqe );
    745         }
    746 
    747         static inline void __clean( volatile struct io_uring_sqe * sqe ) {
    748                 // If we are in debug mode, thrash the fields to make sure we catch reclamation errors
    749                 __cfaabi_dbg_debug_do(
    750                         memset(sqe, 0xde, sizeof(*sqe));
    751                         sqe->opcode = IORING_OP_LAST;
    752                 );
    753 
    754                 // Mark the entry as unused
    755                 __atomic_store_n(&sqe->user_data, 3ul64, __ATOMIC_SEQ_CST);
    756         }
    757568#endif
  • libcfa/src/concurrency/io/call.cfa.in

    ra00bc5b r101cc3a  
    7474        ;
    7575
    76         extern [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data );
     76        extern [* struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data );
    7777        extern void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1)));
    7878
     
    222222                __u32 idx;
    223223                struct io_uring_sqe * sqe;
    224                 [(volatile struct io_uring_sqe *) sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
    225 
     224                [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
     225
     226                sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
    226227                sqe->opcode = IORING_OP_{op};
    227                 sqe->flags = sflags;
    228                 sqe->ioprio = 0;
    229                 sqe->fd = 0;
    230                 sqe->off = 0;
    231                 sqe->addr = 0;
    232                 sqe->len = 0;
    233                 sqe->accept_flags = 0;
    234                 sqe->__pad2[0] = 0;
    235                 sqe->__pad2[1] = 0;
    236                 sqe->__pad2[2] = 0;{body}
    237 
    238                 asm volatile("": : :"memory");
     228                sqe->flags = sflags;{body}
    239229
    240230                verify( sqe->user_data == (__u64)(uintptr_t)&future );
     
    322312        }),
    323313        # CFA_HAVE_IORING_OP_ACCEPT
    324         Call('ACCEPT', 'int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags)', {
    325                 'fd': 'sockfd',
    326                 'addr': '(__u64)addr',
    327                 'addr2': '(__u64)addrlen',
     314        Call('ACCEPT4', 'int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags)', {
     315                'fd': 'sockfd',
     316                'addr': 'addr',
     317                'addr2': 'addrlen',
    328318                'accept_flags': 'flags'
    329319        }),
     
    474464
    475465print("""
    476 //-----------------------------------------------------------------------------
    477 bool cancel(io_cancellation & this) {
    478         #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ASYNC_CANCEL)
    479                 return false;
    480         #else
    481                 io_future_t future;
    482 
    483                 io_context * context = __get_io_context();
    484 
    485                 __u8 sflags = 0;
    486                 struct __io_data & ring = *context->thrd.ring;
    487 
    488                 __u32 idx;
    489                 volatile struct io_uring_sqe * sqe;
    490                 [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
    491 
    492                 sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
    493                 sqe->opcode = IORING_OP_ASYNC_CANCEL;
    494                 sqe->flags = sflags;
    495                 sqe->addr = this.target;
    496 
    497                 verify( sqe->user_data == (__u64)(uintptr_t)&future );
    498                 __submit( context, idx );
    499 
    500                 wait(future);
    501 
    502                 if( future.result == 0 ) return true; // Entry found
    503                 if( future.result == -EALREADY) return true; // Entry found but in progress
    504                 if( future.result == -ENOENT ) return false; // Entry not found
    505                 return false;
    506         #endif
    507 }
    508 
    509466//-----------------------------------------------------------------------------
    510467// Check if a function is has asynchronous
  • libcfa/src/concurrency/io/setup.cfa

    ra00bc5b r101cc3a  
    5252                #include <pthread.h>
    5353                #include <sys/epoll.h>
    54                 #include <sys/eventfd.h>
    5554                #include <sys/mman.h>
    5655                #include <sys/syscall.h>
     
    170169                // Main loop
    171170                while( iopoll.run ) {
    172                         __cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n");
    173 
    174171                        // Wait for events
    175172                        int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask );
    176 
    177                         __cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io contexts events, waking up\n", nfds);
    178173
    179174                        // Check if an error occured
     
    186181                                $io_ctx_thread * io_ctx = ($io_ctx_thread *)(uintptr_t)events[i].data.u64;
    187182                                /* paranoid */ verify( io_ctx );
    188                                 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->ring->fd, io_ctx);
     183                                __cfadbg_print_safe(io_core, "Kernel I/O : Unparking io poller %p\n", io_ctx);
    189184                                #if !defined( __CFA_NO_STATISTICS__ )
    190185                                        __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats;
    191186                                #endif
    192 
    193                                 eventfd_t v;
    194                                 eventfd_read(io_ctx->ring->efd, &v);
    195 
    196187                                post( io_ctx->sem );
    197188                        }
     
    242233                $thread & thrd = this.thrd.self;
    243234                if( cluster_context ) {
    244                         // We are about to do weird things with the threads
    245                         // we don't need interrupts to complicate everything
    246                         disable_interrupts();
    247 
    248                         // Get cluster info
    249235                        cluster & cltr = *thrd.curr_cluster;
    250236                        /* paranoid */ verify( cltr.idles.total == 0 || &cltr == mainCluster );
     
    253239                        // We need to adjust the clean-up based on where the thread is
    254240                        if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
    255                                 // This is the tricky case
    256                                 // The thread was preempted or ready to run and now it is on the ready queue
    257                                 // but the cluster is shutting down, so there aren't any processors to run the ready queue
    258                                 // the solution is to steal the thread from the ready-queue and pretend it was blocked all along
    259241
    260242                                ready_schedule_lock();
    261                                         // The thread should on the list
     243
     244                                        // This is the tricky case
     245                                        // The thread was preempted and now it is on the ready queue
     246                                        // The thread should be the last on the list
    262247                                        /* paranoid */ verify( thrd.link.next != 0p );
    263248
    264249                                        // Remove the thread from the ready queue of this cluster
    265                                         // The thread should be the last on the list
    266250                                        __attribute__((unused)) bool removed = remove_head( &cltr, &thrd );
    267251                                        /* paranoid */ verify( removed );
     
    279263                        }
    280264                        // !!! This is not an else if !!!
    281                         // Ok, now the thread is blocked (whether we cheated to get here or not)
    282265                        if( thrd.state == Blocked ) {
     266
    283267                                // This is the "easy case"
    284268                                // The thread is parked and can easily be moved to active cluster
     
    290274                        }
    291275                        else {
     276
    292277                                // The thread is in a weird state
    293278                                // I don't know what to do here
    294279                                abort("io_context poller thread is in unexpected state, cannot clean-up correctly\n");
    295280                        }
    296 
    297                         // The weird thread kidnapping stuff is over, restore interrupts.
    298                         enable_interrupts( __cfaabi_dbg_ctx );
    299281                } else {
    300282                        post( this.thrd.sem );
     
    383365                }
    384366
    385                 // Step 3 : Initialize the data structure
    386367                // Get the pointers from the kernel to fill the structure
    387368                // submit queue
     
    398379                        const __u32 num = *sq.num;
    399380                        for( i; num ) {
    400                                 sq.sqes[i].opcode = IORING_OP_LAST;
    401                                 sq.sqes[i].user_data = 3ul64;
     381                                sq.sqes[i].user_data = 0ul64;
    402382                        }
    403383                }
     
    429409                cq.cqes = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
    430410
    431                 // Step 4 : eventfd
    432                 int efd;
    433                 for() {
    434                         efd = eventfd(0, 0);
    435                         if (efd < 0) {
    436                                 if (errno == EINTR) continue;
    437                                 abort("KERNEL ERROR: IO_URING EVENTFD - %s\n", strerror(errno));
    438                         }
    439                         break;
    440                 }
    441 
    442                 int ret;
    443                 for() {
    444                         ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &efd, 1);
    445                         if (ret < 0) {
    446                                 if (errno == EINTR) continue;
    447                                 abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno));
    448                         }
    449                         break;
    450                 }
    451 
    452411                // some paranoid checks
    453412                /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask  );
     
    464423                this.ring_flags = params.flags;
    465424                this.fd         = fd;
    466                 this.efd        = efd;
    467425                this.eager_submits  = params_in.eager_submits;
    468426                this.poller_submits = params_in.poller_submits;
     
    487445                // close the file descriptor
    488446                close(this.fd);
    489                 close(this.efd);
    490447
    491448                free( this.submit_q.ready ); // Maybe null, doesn't matter
     
    495452// I/O Context Sleep
    496453//=============================================================================================
    497         #define IOEVENTS EPOLLIN | EPOLLONESHOT
    498 
    499         static inline void __ioctx_epoll_ctl($io_ctx_thread & ctx, int op, const char * error) {
    500                 struct epoll_event ev;
    501                 ev.events = IOEVENTS;
     454
     455        void __ioctx_register($io_ctx_thread & ctx, struct epoll_event & ev) {
     456                ev.events = EPOLLIN | EPOLLONESHOT;
    502457                ev.data.u64 = (__u64)&ctx;
    503                 int ret = epoll_ctl(iopoll.epollfd, op, ctx.ring->efd, &ev);
     458                int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_ADD, ctx.ring->fd, &ev);
    504459                if (ret < 0) {
    505                         abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) );
    506                 }
    507         }
    508 
    509         void __ioctx_register($io_ctx_thread & ctx) {
    510                 __ioctx_epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");
    511         }
    512 
    513         void __ioctx_prepare_block($io_ctx_thread & ctx) {
    514                 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.ring->fd, &ctx);
    515                 __ioctx_epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");
     460                        abort( "KERNEL ERROR: EPOLL ADD - (%d) %s\n", (int)errno, strerror(errno) );
     461                }
     462        }
     463
     464        void __ioctx_prepare_block($io_ctx_thread & ctx, struct epoll_event & ev) {
     465                int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_MOD, ctx.ring->fd, &ev);
     466                if (ret < 0) {
     467                        abort( "KERNEL ERROR: EPOLL REARM - (%d) %s\n", (int)errno, strerror(errno) );
     468                }
    516469        }
    517470
  • libcfa/src/concurrency/io/types.hfa

    ra00bc5b r101cc3a  
    6565
    6666                // A buffer of sqes (not the actual ring)
    67                 volatile struct io_uring_sqe * sqes;
     67                struct io_uring_sqe * sqes;
    6868
    6969                // The location and size of the mmaped area
     
    8585
    8686                // the kernel ring
    87                 volatile struct io_uring_cqe * cqes;
     87                struct io_uring_cqe * cqes;
    8888
    8989                // The location and size of the mmaped area
     
    9797                __u32 ring_flags;
    9898                int fd;
    99                 int efd;
    10099                bool eager_submits:1;
    101100                bool poller_submits:1;
     
    131130        #endif
    132131
     132        struct epoll_event;
    133133        struct $io_ctx_thread;
    134         void __ioctx_register($io_ctx_thread & ctx);
    135         void __ioctx_prepare_block($io_ctx_thread & ctx);
    136         void __sqe_clean( volatile struct io_uring_sqe * sqe );
     134        void __ioctx_register($io_ctx_thread & ctx, struct epoll_event & ev);
     135        void __ioctx_prepare_block($io_ctx_thread & ctx, struct epoll_event & ev);
    137136#endif
    138137
Note: See TracChangeset for help on using the changeset viewer.