Changeset 4500e52


Ignore:
Timestamp:
Sep 23, 2021, 12:44:19 PM (2 months ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
master
Children:
bc4a433
Parents:
db614d0
Message:

Added cfathread version that uses epoll

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/clib/cfathread.cfa

    rdb614d0 r4500e52  
    1414//
    1515
     16#define EPOLL_FOR_SOCKETS
     17
    1618#include "fstream.hfa"
    1719#include "locks.hfa"
     
    2325#include "cfathread.h"
    2426
     27extern "C" {
     28                #include <string.h>
     29                #include <errno.h>
     30}
     31
    2532extern void ?{}(processor &, const char[], cluster &, thread$ *);
    2633extern "C" {
    2734      extern void __cfactx_invoke_thread(void (*main)(void *), void * this);
     35        extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
    2836}
    2937
    3038extern Time __kernel_get_time();
     39extern unsigned register_proc_id( void );
    3140
    3241//================================================================================
    33 // Thread run y the C Interface
     42// Epoll support for sockets
     43
     44#if defined(EPOLL_FOR_SOCKETS)
     45        extern "C" {
     46                #include <sys/epoll.h>
     47                #include <sys/resource.h>
     48        }
     49
     50        static pthread_t master_poller;
     51        static int master_epollfd = 0;
     52        static size_t poller_cnt = 0;
     53        static int * poller_fds = 0p;
     54        static struct leaf_poller * pollers = 0p;
     55
     56        struct __attribute__((aligned)) fd_info_t {
     57                int pollid;
     58                size_t rearms;
     59        };
     60        rlim_t fd_limit = 0;
     61        static fd_info_t * volatile * fd_map = 0p;
     62
     63        void * master_epoll( __attribute__((unused)) void * args ) {
     64                unsigned id = register_proc_id();
     65
     66                enum { MAX_EVENTS = 5 };
     67                struct epoll_event events[MAX_EVENTS];
     68                for() {
     69                        int ret = epoll_wait(master_epollfd, events, MAX_EVENTS, -1);
     70                        if ( ret < 0 ) {
     71                                abort | "Master epoll error: " | strerror(errno);
     72                        }
     73
     74                        for(i; ret) {
     75                                thread$ * thrd = (thread$ *)events[i].data.u64;
     76                                unpark( thrd );
     77                        }
     78                }
     79
     80                return 0p;
     81        }
     82
     83        static inline int epoll_rearm(int epollfd, int fd, uint32_t event) {
     84                struct epoll_event eevent;
     85                eevent.events = event | EPOLLET | EPOLLONESHOT;
     86                eevent.data.u64 = (uint64_t)active_thread();
     87
     88                if(0 != epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &eevent))
     89                {
     90                        if(errno == ENOENT) return -1;
     91                        abort | acquire | "epoll" | epollfd | "ctl rearm" | fd | "error: " | errno | strerror(errno);
     92                }
     93
     94                park();
     95                return 0;
     96        }
     97
     98        thread leaf_poller {
     99                int epollfd;
     100        };
     101
     102        void ?{}(leaf_poller & this, int fd) { this.epollfd = fd; }
     103
     104        void main(leaf_poller & this) {
     105                enum { MAX_EVENTS = 1024 };
     106                struct epoll_event events[MAX_EVENTS];
     107                const int max_retries = 5;
     108                int retries = max_retries;
     109
     110                struct epoll_event event;
     111                event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
     112                event.data.u64 = (uint64_t)&(thread&)this;
     113
     114                if(0 != epoll_ctl(master_epollfd, EPOLL_CTL_ADD, this.epollfd, &event))
     115                {
     116                        abort | "master epoll ctl add leaf: " | errno | strerror(errno);
     117                }
     118
     119                park();
     120
     121                for() {
     122                        yield();
     123                        int ret = epoll_wait(this.epollfd, events, MAX_EVENTS, 0);
     124                        if ( ret < 0 ) {
     125                                abort | "Leaf epoll error: " | errno | strerror(errno);
     126                        }
     127
     128                        if(ret) {
     129                                for(i; ret) {
     130                                        thread$ * thrd = (thread$ *)events[i].data.u64;
     131                                        unpark( thrd );
     132                                }
     133                        }
     134                        else if(0 >= --retries) {
     135                                epoll_rearm(master_epollfd, this.epollfd, EPOLLIN);
     136                        }
     137                }
     138        }
     139
     140        void setup_epoll( void ) __attribute__(( constructor ));
     141        void setup_epoll( void ) {
     142                if(master_epollfd) abort | "Master epoll already setup";
     143
     144                master_epollfd = epoll_create1(0);
     145                if(master_epollfd == -1) {
     146                        abort | "failed to create master epoll: " | errno | strerror(errno);
     147                }
     148
     149                struct rlimit rlim;
     150                if(int ret = getrlimit(RLIMIT_NOFILE, &rlim); 0 != ret) {
     151                        abort | "failed to get nofile limit: " | errno | strerror(errno);
     152                }
     153
     154                fd_limit = rlim.rlim_cur;
     155                fd_map = alloc(fd_limit);
     156                for(i;fd_limit) {
     157                        fd_map[i] = 0p;
     158                }
     159
     160                poller_cnt = 24;
     161                poller_fds = alloc(poller_cnt);
     162                pollers    = alloc(poller_cnt);
     163                for(i; poller_cnt) {
     164                        poller_fds[i] = epoll_create1(0);
     165                        if(poller_fds[i] == -1) {
     166                                abort | "failed to create leaf epoll [" | i | "]: " | errno | strerror(errno);
     167                        }
     168
     169                        (pollers[i]){ poller_fds[i] };
     170                }
     171
     172                pthread_attr_t attr;
     173                if (int ret = pthread_attr_init(&attr); 0 != ret) {
     174                        abort | "failed to create master epoll thread attr: " | ret | strerror(ret);
     175                }
     176
     177                if (int ret = pthread_create(&master_poller, &attr, master_epoll, 0p); 0 != ret) {
     178                        abort | "failed to create master epoll thread: " | ret | strerror(ret);
     179                }
     180        }
     181
     182        static inline int epoll_wait(int fd, uint32_t event) {
     183                if(fd_map[fd] >= 1p) {
     184                        fd_map[fd]->rearms++;
     185                        epoll_rearm(poller_fds[fd_map[fd]->pollid], fd, event);
     186                        return 0;
     187                }
     188
     189                for() {
     190                        fd_info_t * expected = 0p;
     191                        fd_info_t * sentinel = 1p;
     192                        if(__atomic_compare_exchange_n( &(fd_map[fd]), &expected, sentinel, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED)) {
     193                                struct epoll_event eevent;
     194                                eevent.events = event | EPOLLET | EPOLLONESHOT;
     195                                eevent.data.u64 = (uint64_t)active_thread();
     196
     197                                int id = thread_rand() % poller_cnt;
     198                                if(0 != epoll_ctl(poller_fds[id], EPOLL_CTL_ADD, fd, &eevent))
     199                                {
     200                                        abort | "epoll ctl add" | poller_fds[id] | fd | fd_map[fd] | expected | "error: " | errno | strerror(errno);
     201                                }
     202
     203                                fd_info_t * ninfo = alloc();
     204                                ninfo->pollid = id;
     205                                ninfo->rearms = 0;
     206                                __atomic_store_n( &fd_map[fd], ninfo, __ATOMIC_SEQ_CST);
     207
     208                                park();
     209                                return 0;
     210                        }
     211
     212                        if(expected >= 0) {
     213                                fd_map[fd]->rearms++;
     214                                epoll_rearm(poller_fds[fd_map[fd]->pollid], fd, event);
     215                                return 0;
     216                        }
     217
     218                        Pause();
     219                }
     220        }
     221#endif
     222
     223//================================================================================
     224// Thread run by the C Interface
    34225
    35226struct cfathread_object {
     
    288479        // IO operations
    289480        int cfathread_socket(int domain, int type, int protocol) {
    290                 return socket(domain, type, protocol);
     481                return socket(domain, type
     482                #if defined(EPOLL_FOR_SOCKETS)
     483                        | SOCK_NONBLOCK
     484                #endif
     485                , protocol);
    291486        }
    292487        int cfathread_bind(int socket, const struct sockaddr *address, socklen_t address_len) {
     
    299494
    300495        int cfathread_accept(int socket, struct sockaddr *restrict address, socklen_t *restrict address_len) {
    301                 return cfa_accept4(socket, address, address_len, 0, CFA_IO_LAZY);
     496                #if defined(EPOLL_FOR_SOCKETS)
     497                        int ret;
     498                        for() {
     499                                yield();
     500                                ret = accept4(socket, address, address_len, SOCK_NONBLOCK);
     501                                if(ret >= 0) break;
     502                                if(errno != EAGAIN && errno != EWOULDBLOCK) break;
     503
     504                                epoll_wait(socket, EPOLLIN);
     505                        }
     506                        return ret;
     507                #else
     508                        return cfa_accept4(socket, address, address_len, 0, CFA_IO_LAZY);
     509                #endif
    302510        }
    303511
    304512        int cfathread_connect(int socket, const struct sockaddr *address, socklen_t address_len) {
    305                 return cfa_connect(socket, address, address_len, CFA_IO_LAZY);
     513                #if defined(EPOLL_FOR_SOCKETS)
     514                        int ret;
     515                        for() {
     516                                ret = connect(socket, address, address_len);
     517                                if(ret >= 0) break;
     518                                if(errno != EAGAIN && errno != EWOULDBLOCK) break;
     519
     520                                epoll_wait(socket, EPOLLIN);
     521                        }
     522                        return ret;
     523                #else
     524                        return cfa_connect(socket, address, address_len, CFA_IO_LAZY);
     525                #endif
    306526        }
    307527
     
    315535
    316536        ssize_t cfathread_sendmsg(int socket, const struct msghdr *message, int flags) {
    317                 return cfa_sendmsg(socket, message, flags, CFA_IO_LAZY);
     537                #if defined(EPOLL_FOR_SOCKETS)
     538                        ssize_t ret;
     539                        __STATS__( false, io.ops.sockwrite++; )
     540                        for() {
     541                                ret = sendmsg(socket, message, flags);
     542                                if(ret >= 0) break;
     543                                if(errno != EAGAIN && errno != EWOULDBLOCK) break;
     544
     545                                __STATS__( false, io.ops.epllwrite++; )
     546                                epoll_wait(socket, EPOLLOUT);
     547                        }
     548                #else
     549                        ssize_t ret = cfa_sendmsg(socket, message, flags, CFA_IO_LAZY);
     550                #endif
     551                return ret;
    318552        }
    319553
    320554        ssize_t cfathread_write(int fildes, const void *buf, size_t nbyte) {
    321555                // Use send rather then write for socket since it's faster
    322                 return cfa_send(fildes, buf, nbyte, 0, CFA_IO_LAZY);
     556                #if defined(EPOLL_FOR_SOCKETS)
     557                        ssize_t ret;
     558                        // __STATS__( false, io.ops.sockwrite++; )
     559                        for() {
     560                                ret = send(fildes, buf, nbyte, 0);
     561                                if(ret >= 0) break;
     562                                if(errno != EAGAIN && errno != EWOULDBLOCK) break;
     563
     564                                // __STATS__( false, io.ops.epllwrite++; )
     565                                epoll_wait(fildes, EPOLLOUT);
     566                        }
     567                #else
     568                        ssize_t ret = cfa_send(fildes, buf, nbyte, 0, CFA_IO_LAZY);
     569                #endif
     570                return ret;
    323571        }
    324572
     
    336584                msg.msg_controllen = 0;
    337585
    338                 ssize_t ret = cfa_recvmsg(socket, &msg, flags, CFA_IO_LAZY);
     586                #if defined(EPOLL_FOR_SOCKETS)
     587                        ssize_t ret;
     588                        yield();
     589                        for() {
     590                                ret = recvmsg(socket, &msg, flags);
     591                                if(ret >= 0) break;
     592                                if(errno != EAGAIN && errno != EWOULDBLOCK) break;
     593
     594                                epoll_wait(socket, EPOLLIN);
     595                        }
     596                #else
     597                        ssize_t ret = cfa_recvmsg(socket, &msg, flags, CFA_IO_LAZY);
     598                #endif
    339599
    340600                if(address_len) *address_len = msg.msg_namelen;
     
    344604        ssize_t cfathread_read(int fildes, void *buf, size_t nbyte) {
    345605                // Use recv rather then read for socket since it's faster
    346                 return cfa_recv(fildes, buf, nbyte, 0, CFA_IO_LAZY);
    347         }
    348 
    349 }
     606                #if defined(EPOLL_FOR_SOCKETS)
     607                        ssize_t ret;
     608                        __STATS__( false, io.ops.sockread++; )
     609                        yield();
     610                        for() {
     611                                ret = recv(fildes, buf, nbyte, 0);
     612                                if(ret >= 0) break;
     613                                if(errno != EAGAIN && errno != EWOULDBLOCK) break;
     614
     615                                __STATS__( false, io.ops.epllread++; )
     616                                epoll_wait(fildes, EPOLLIN);
     617                        }
     618                #else
     619                        ssize_t ret = cfa_recv(fildes, buf, nbyte, 0, CFA_IO_LAZY);
     620                #endif
     621                return ret;
     622        }
     623
     624}
Note: See TracChangeset for help on using the changeset viewer.