Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/clib/cfathread.cfa

    r75c7252 re84ab3d  
    1313// Update Count     :
    1414//
    15 
    16 // #define EPOLL_FOR_SOCKETS
    1715
    1816#include "fstream.hfa"
     
    2523#include "cfathread.h"
    2624
    27 extern "C" {
    28                 #include <string.h>
    29                 #include <errno.h>
    30 }
    31 
    3225extern void ?{}(processor &, const char[], cluster &, thread$ *);
    3326extern "C" {
    3427      extern void __cfactx_invoke_thread(void (*main)(void *), void * this);
    35         extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
    3628}
    3729
    3830extern Time __kernel_get_time();
    39 extern unsigned register_proc_id( void );
    4031
    4132//================================================================================
    42 // Epoll support for sockets
    43 
    44 #if defined(EPOLL_FOR_SOCKETS)
    45         extern "C" {
    46                 #include <sys/epoll.h>
    47                 #include <sys/resource.h>
    48         }
    49 
    50         static pthread_t master_poller;
    51         static int master_epollfd = 0;
    52         static size_t poller_cnt = 0;
    53         static int * poller_fds = 0p;
    54         static struct leaf_poller * pollers = 0p;
    55 
    56         struct __attribute__((aligned)) fd_info_t {
    57                 int pollid;
    58                 size_t rearms;
    59         };
    60         rlim_t fd_limit = 0;
    61         static fd_info_t * volatile * fd_map = 0p;
    62 
    63         void * master_epoll( __attribute__((unused)) void * args ) {
    64                 unsigned id = register_proc_id();
    65 
    66                 enum { MAX_EVENTS = 5 };
    67                 struct epoll_event events[MAX_EVENTS];
    68                 for() {
    69                         int ret = epoll_wait(master_epollfd, events, MAX_EVENTS, -1);
    70                         if ( ret < 0 ) {
    71                                 abort | "Master epoll error: " | strerror(errno);
    72                         }
    73 
    74                         for(i; ret) {
    75                                 thread$ * thrd = (thread$ *)events[i].data.u64;
    76                                 unpark( thrd );
    77                         }
    78                 }
    79 
    80                 return 0p;
    81         }
    82 
    83         static inline int epoll_rearm(int epollfd, int fd, uint32_t event) {
    84                 struct epoll_event eevent;
    85                 eevent.events = event | EPOLLET | EPOLLONESHOT;
    86                 eevent.data.u64 = (uint64_t)active_thread();
    87 
    88                 if(0 != epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &eevent))
    89                 {
    90                         if(errno == ENOENT) return -1;
    91                         abort | acquire | "epoll" | epollfd | "ctl rearm" | fd | "error: " | errno | strerror(errno);
    92                 }
    93 
    94                 park();
    95                 return 0;
    96         }
    97 
    98         thread leaf_poller {
    99                 int epollfd;
    100         };
    101 
    102         void ?{}(leaf_poller & this, int fd) { this.epollfd = fd; }
    103 
    104         void main(leaf_poller & this) {
    105                 enum { MAX_EVENTS = 1024 };
    106                 struct epoll_event events[MAX_EVENTS];
    107                 const int max_retries = 5;
    108                 int retries = max_retries;
    109 
    110                 struct epoll_event event;
    111                 event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    112                 event.data.u64 = (uint64_t)&(thread&)this;
    113 
    114                 if(0 != epoll_ctl(master_epollfd, EPOLL_CTL_ADD, this.epollfd, &event))
    115                 {
    116                         abort | "master epoll ctl add leaf: " | errno | strerror(errno);
    117                 }
    118 
    119                 park();
    120 
    121                 for() {
    122                         yield();
    123                         int ret = epoll_wait(this.epollfd, events, MAX_EVENTS, 0);
    124                         if ( ret < 0 ) {
    125                                 abort | "Leaf epoll error: " | errno | strerror(errno);
    126                         }
    127 
    128                         if(ret) {
    129                                 for(i; ret) {
    130                                         thread$ * thrd = (thread$ *)events[i].data.u64;
    131                                         unpark( thrd, UNPARK_REMOTE );
    132                                 }
    133                         }
    134                         else if(0 >= --retries) {
    135                                 epoll_rearm(master_epollfd, this.epollfd, EPOLLIN);
    136                         }
    137                 }
    138         }
    139 
    140         void setup_epoll( void ) __attribute__(( constructor ));
    141         void setup_epoll( void ) {
    142                 if(master_epollfd) abort | "Master epoll already setup";
    143 
    144                 master_epollfd = epoll_create1(0);
    145                 if(master_epollfd == -1) {
    146                         abort | "failed to create master epoll: " | errno | strerror(errno);
    147                 }
    148 
    149                 struct rlimit rlim;
    150                 if(int ret = getrlimit(RLIMIT_NOFILE, &rlim); 0 != ret) {
    151                         abort | "failed to get nofile limit: " | errno | strerror(errno);
    152                 }
    153 
    154                 fd_limit = rlim.rlim_cur;
    155                 fd_map = alloc(fd_limit);
    156                 for(i;fd_limit) {
    157                         fd_map[i] = 0p;
    158                 }
    159 
    160                 poller_cnt = 2;
    161                 poller_fds = alloc(poller_cnt);
    162                 pollers    = alloc(poller_cnt);
    163                 for(i; poller_cnt) {
    164                         poller_fds[i] = epoll_create1(0);
    165                         if(poller_fds[i] == -1) {
    166                                 abort | "failed to create leaf epoll [" | i | "]: " | errno | strerror(errno);
    167                         }
    168 
    169                         (pollers[i]){ poller_fds[i] };
    170                 }
    171 
    172                 pthread_attr_t attr;
    173                 if (int ret = pthread_attr_init(&attr); 0 != ret) {
    174                         abort | "failed to create master epoll thread attr: " | ret | strerror(ret);
    175                 }
    176 
    177                 if (int ret = pthread_create(&master_poller, &attr, master_epoll, 0p); 0 != ret) {
    178                         abort | "failed to create master epoll thread: " | ret | strerror(ret);
    179                 }
    180         }
    181 
    182         static inline int epoll_wait(int fd, uint32_t event) {
    183                 if(fd_map[fd] >= 1p) {
    184                         fd_map[fd]->rearms++;
    185                         epoll_rearm(poller_fds[fd_map[fd]->pollid], fd, event);
    186                         return 0;
    187                 }
    188 
    189                 for() {
    190                         fd_info_t * expected = 0p;
    191                         fd_info_t * sentinel = 1p;
    192                         if(__atomic_compare_exchange_n( &(fd_map[fd]), &expected, sentinel, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED)) {
    193                                 struct epoll_event eevent;
    194                                 eevent.events = event | EPOLLET | EPOLLONESHOT;
    195                                 eevent.data.u64 = (uint64_t)active_thread();
    196 
    197                                 int id = thread_rand() % poller_cnt;
    198                                 if(0 != epoll_ctl(poller_fds[id], EPOLL_CTL_ADD, fd, &eevent))
    199                                 {
    200                                         abort | "epoll ctl add" | poller_fds[id] | fd | fd_map[fd] | expected | "error: " | errno | strerror(errno);
    201                                 }
    202 
    203                                 fd_info_t * ninfo = alloc();
    204                                 ninfo->pollid = id;
    205                                 ninfo->rearms = 0;
    206                                 __atomic_store_n( &fd_map[fd], ninfo, __ATOMIC_SEQ_CST);
    207 
    208                                 park();
    209                                 return 0;
    210                         }
    211 
    212                         if(expected >= 0) {
    213                                 fd_map[fd]->rearms++;
    214                                 epoll_rearm(poller_fds[fd_map[fd]->pollid], fd, event);
    215                                 return 0;
    216                         }
    217 
    218                         Pause();
    219                 }
    220         }
    221 #endif
    222 
    223 //================================================================================
    224 // Thread run by the C Interface
     33// Thread run y the C Interface
    22534
    22635struct cfathread_object {
     
    436245        // Mutex
    437246        struct cfathread_mutex {
    438                 linear_backoff_then_block_lock impl;
     247                fast_lock impl;
    439248        };
    440249        int cfathread_mutex_init(cfathread_mutex_t *restrict mut, const cfathread_mutexattr_t *restrict) __attribute__((nonnull (1))) { *mut = new(); return 0; }
     
    451260        // Condition
    452261        struct cfathread_condition {
    453                 condition_variable(linear_backoff_then_block_lock) impl;
     262                condition_variable(fast_lock) impl;
    454263        };
    455264        int cfathread_cond_init(cfathread_cond_t *restrict cond, const cfathread_condattr_t *restrict) __attribute__((nonnull (1))) { *cond = new(); return 0; }
     
    479288        // IO operations
    480289        int cfathread_socket(int domain, int type, int protocol) {
    481                 return socket(domain, type
    482                 #if defined(EPOLL_FOR_SOCKETS)
    483                         | SOCK_NONBLOCK
    484                 #endif
    485                 , protocol);
     290                return socket(domain, type, protocol);
    486291        }
    487292        int cfathread_bind(int socket, const struct sockaddr *address, socklen_t address_len) {
     
    494299
    495300        int cfathread_accept(int socket, struct sockaddr *restrict address, socklen_t *restrict address_len) {
    496                 #if defined(EPOLL_FOR_SOCKETS)
    497                         int ret;
    498                         for() {
    499                                 yield();
    500                                 ret = accept4(socket, address, address_len, SOCK_NONBLOCK);
    501                                 if(ret >= 0) break;
    502                                 if(errno != EAGAIN && errno != EWOULDBLOCK) break;
    503 
    504                                 epoll_wait(socket, EPOLLIN);
    505                         }
    506                         return ret;
    507                 #else
    508                         return cfa_accept4(socket, address, address_len, 0, CFA_IO_LAZY);
    509                 #endif
     301                return cfa_accept4(socket, address, address_len, 0, CFA_IO_LAZY);
    510302        }
    511303
    512304        int cfathread_connect(int socket, const struct sockaddr *address, socklen_t address_len) {
    513                 #if defined(EPOLL_FOR_SOCKETS)
    514                         int ret;
    515                         for() {
    516                                 ret = connect(socket, address, address_len);
    517                                 if(ret >= 0) break;
    518                                 if(errno != EAGAIN && errno != EWOULDBLOCK) break;
    519 
    520                                 epoll_wait(socket, EPOLLIN);
    521                         }
    522                         return ret;
    523                 #else
    524                         return cfa_connect(socket, address, address_len, CFA_IO_LAZY);
    525                 #endif
     305                return cfa_connect(socket, address, address_len, CFA_IO_LAZY);
    526306        }
    527307
     
    535315
    536316        ssize_t cfathread_sendmsg(int socket, const struct msghdr *message, int flags) {
    537                 #if defined(EPOLL_FOR_SOCKETS)
    538                         ssize_t ret;
    539                         __STATS__( false, io.ops.sockwrite++; )
    540                         for() {
    541                                 ret = sendmsg(socket, message, flags);
    542                                 if(ret >= 0) break;
    543                                 if(errno != EAGAIN && errno != EWOULDBLOCK) break;
    544 
    545                                 __STATS__( false, io.ops.epllwrite++; )
    546                                 epoll_wait(socket, EPOLLOUT);
    547                         }
    548                 #else
    549                         ssize_t ret = cfa_sendmsg(socket, message, flags, CFA_IO_LAZY);
    550                 #endif
    551                 return ret;
     317                return cfa_sendmsg(socket, message, flags, CFA_IO_LAZY);
    552318        }
    553319
    554320        ssize_t cfathread_write(int fildes, const void *buf, size_t nbyte) {
    555321                // Use send rather then write for socket since it's faster
    556                 #if defined(EPOLL_FOR_SOCKETS)
    557                         ssize_t ret;
    558                         // __STATS__( false, io.ops.sockwrite++; )
    559                         for() {
    560                                 ret = send(fildes, buf, nbyte, 0);
    561                                 if(ret >= 0) break;
    562                                 if(errno != EAGAIN && errno != EWOULDBLOCK) break;
    563 
    564                                 // __STATS__( false, io.ops.epllwrite++; )
    565                                 epoll_wait(fildes, EPOLLOUT);
    566                         }
    567                 #else
    568                         ssize_t ret = cfa_send(fildes, buf, nbyte, 0, CFA_IO_LAZY);
    569                 #endif
    570                 return ret;
     322                return cfa_send(fildes, buf, nbyte, 0, CFA_IO_LAZY);
    571323        }
    572324
     
    584336                msg.msg_controllen = 0;
    585337
    586                 #if defined(EPOLL_FOR_SOCKETS)
    587                         ssize_t ret;
    588                         yield();
    589                         for() {
    590                                 ret = recvmsg(socket, &msg, flags);
    591                                 if(ret >= 0) break;
    592                                 if(errno != EAGAIN && errno != EWOULDBLOCK) break;
    593 
    594                                 epoll_wait(socket, EPOLLIN);
    595                         }
    596                 #else
    597                         ssize_t ret = cfa_recvmsg(socket, &msg, flags, CFA_IO_LAZY);
    598                 #endif
     338                ssize_t ret = cfa_recvmsg(socket, &msg, flags, CFA_IO_LAZY);
    599339
    600340                if(address_len) *address_len = msg.msg_namelen;
     
    604344        ssize_t cfathread_read(int fildes, void *buf, size_t nbyte) {
    605345                // Use recv rather then read for socket since it's faster
    606                 #if defined(EPOLL_FOR_SOCKETS)
    607                         ssize_t ret;
    608                         __STATS__( false, io.ops.sockread++; )
    609                         yield();
    610                         for() {
    611                                 ret = recv(fildes, buf, nbyte, 0);
    612                                 if(ret >= 0) break;
    613                                 if(errno != EAGAIN && errno != EWOULDBLOCK) break;
    614 
    615                                 __STATS__( false, io.ops.epllread++; )
    616                                 epoll_wait(fildes, EPOLLIN);
    617                         }
    618                 #else
    619                         ssize_t ret = cfa_recv(fildes, buf, nbyte, 0, CFA_IO_LAZY);
    620                 #endif
    621                 return ret;
    622         }
    623 
    624 }
     346                return cfa_recv(fildes, buf, nbyte, 0, CFA_IO_LAZY);
     347        }
     348
     349}
Note: See TracChangeset for help on using the changeset viewer.