Changeset 4500e52 for libcfa/src/concurrency/clib
- Timestamp: Sep 23, 2021, 12:44:19 PM
- Branches: ADT, ast-experimental, enum, forall-pointer-decay, master, pthread-emulation, qualifiedEnum
- Children: bc4a433
- Parents: db614d0
- File: 1 edited
Legend:
- Unmodified (no prefix)
- Added (prefixed with +)
- Removed (prefixed with -)
libcfa/src/concurrency/clib/cfathread.cfa
--- libcfa/src/concurrency/clib/cfathread.cfa (db614d0)
+++ libcfa/src/concurrency/clib/cfathread.cfa (4500e52)

 //
 
+#define EPOLL_FOR_SOCKETS
+
 #include "fstream.hfa"
 #include "locks.hfa"
…
 #include "cfathread.h"
 
+extern "C" {
+	#include <string.h>
+	#include <errno.h>
+}
+
 extern void ?{}(processor &, const char[], cluster &, thread$ *);
 extern "C" {
 	extern void __cfactx_invoke_thread(void (*main)(void *), void * this);
+	extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
 }
 
 extern Time __kernel_get_time();
+extern unsigned register_proc_id( void );
 
 //================================================================================
-// Thread run y the C Interface
+// Epoll support for sockets
+
+#if defined(EPOLL_FOR_SOCKETS)
+	extern "C" {
+		#include <sys/epoll.h>
+		#include <sys/resource.h>
+	}
+
+	static pthread_t master_poller;
+	static int master_epollfd = 0;
+	static size_t poller_cnt = 0;
+	static int * poller_fds = 0p;
+	static struct leaf_poller * pollers = 0p;
+
+	struct __attribute__((aligned)) fd_info_t {
+		int pollid;
+		size_t rearms;
+	};
+	rlim_t fd_limit = 0;
+	static fd_info_t * volatile * fd_map = 0p;
+
+	void * master_epoll( __attribute__((unused)) void * args ) {
+		unsigned id = register_proc_id();
+
+		enum { MAX_EVENTS = 5 };
+		struct epoll_event events[MAX_EVENTS];
+		for() {
+			int ret = epoll_wait(master_epollfd, events, MAX_EVENTS, -1);
+			if ( ret < 0 ) {
+				abort | "Master epoll error: " | strerror(errno);
+			}
+
+			for(i; ret) {
+				thread$ * thrd = (thread$ *)events[i].data.u64;
+				unpark( thrd );
+			}
+		}
+
+		return 0p;
+	}
+
+	static inline int epoll_rearm(int epollfd, int fd, uint32_t event) {
+		struct epoll_event eevent;
+		eevent.events = event | EPOLLET | EPOLLONESHOT;
+		eevent.data.u64 = (uint64_t)active_thread();
+
+		if(0 != epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &eevent))
+		{
+			if(errno == ENOENT) return -1;
+			abort | acquire | "epoll" | epollfd | "ctl rearm" | fd | "error: " | errno | strerror(errno);
+		}
+
+		park();
+		return 0;
+	}
+
+	thread leaf_poller {
+		int epollfd;
+	};
+
+	void ?{}(leaf_poller & this, int fd) { this.epollfd = fd; }
+
+	void main(leaf_poller & this) {
+		enum { MAX_EVENTS = 1024 };
+		struct epoll_event events[MAX_EVENTS];
+		const int max_retries = 5;
+		int retries = max_retries;
+
+		struct epoll_event event;
+		event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
+		event.data.u64 = (uint64_t)&(thread&)this;
+
+		if(0 != epoll_ctl(master_epollfd, EPOLL_CTL_ADD, this.epollfd, &event))
+		{
+			abort | "master epoll ctl add leaf: " | errno | strerror(errno);
+		}
+
+		park();
+
+		for() {
+			yield();
+			int ret = epoll_wait(this.epollfd, events, MAX_EVENTS, 0);
+			if ( ret < 0 ) {
+				abort | "Leaf epoll error: " | errno | strerror(errno);
+			}
+
+			if(ret) {
+				for(i; ret) {
+					thread$ * thrd = (thread$ *)events[i].data.u64;
+					unpark( thrd );
+				}
+			}
+			else if(0 >= --retries) {
+				epoll_rearm(master_epollfd, this.epollfd, EPOLLIN);
+			}
+		}
+	}
+
+	void setup_epoll( void ) __attribute__(( constructor ));
+	void setup_epoll( void ) {
+		if(master_epollfd) abort | "Master epoll already setup";
+
+		master_epollfd = epoll_create1(0);
+		if(master_epollfd == -1) {
+			abort | "failed to create master epoll: " | errno | strerror(errno);
+		}
+
+		struct rlimit rlim;
+		if(int ret = getrlimit(RLIMIT_NOFILE, &rlim); 0 != ret) {
+			abort | "failed to get nofile limit: " | errno | strerror(errno);
+		}
+
+		fd_limit = rlim.rlim_cur;
+		fd_map = alloc(fd_limit);
+		for(i;fd_limit) {
+			fd_map[i] = 0p;
+		}
+
+		poller_cnt = 24;
+		poller_fds = alloc(poller_cnt);
+		pollers = alloc(poller_cnt);
+		for(i; poller_cnt) {
+			poller_fds[i] = epoll_create1(0);
+			if(poller_fds[i] == -1) {
+				abort | "failed to create leaf epoll [" | i | "]: " | errno | strerror(errno);
+			}
+
+			(pollers[i]){ poller_fds[i] };
+		}
+
+		pthread_attr_t attr;
+		if (int ret = pthread_attr_init(&attr); 0 != ret) {
+			abort | "failed to create master epoll thread attr: " | ret | strerror(ret);
+		}
+
+		if (int ret = pthread_create(&master_poller, &attr, master_epoll, 0p); 0 != ret) {
+			abort | "failed to create master epoll thread: " | ret | strerror(ret);
+		}
+	}
+
+	static inline int epoll_wait(int fd, uint32_t event) {
+		if(fd_map[fd] >= 1p) {
+			fd_map[fd]->rearms++;
+			epoll_rearm(poller_fds[fd_map[fd]->pollid], fd, event);
+			return 0;
+		}
+
+		for() {
+			fd_info_t * expected = 0p;
+			fd_info_t * sentinel = 1p;
+			if(__atomic_compare_exchange_n( &(fd_map[fd]), &expected, sentinel, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED)) {
+				struct epoll_event eevent;
+				eevent.events = event | EPOLLET | EPOLLONESHOT;
+				eevent.data.u64 = (uint64_t)active_thread();
+
+				int id = thread_rand() % poller_cnt;
+				if(0 != epoll_ctl(poller_fds[id], EPOLL_CTL_ADD, fd, &eevent))
+				{
+					abort | "epoll ctl add" | poller_fds[id] | fd | fd_map[fd] | expected | "error: " | errno | strerror(errno);
+				}
+
+				fd_info_t * ninfo = alloc();
+				ninfo->pollid = id;
+				ninfo->rearms = 0;
+				__atomic_store_n( &fd_map[fd], ninfo, __ATOMIC_SEQ_CST);
+
+				park();
+				return 0;
+			}
+
+			if(expected >= 0) {
+				fd_map[fd]->rearms++;
+				epoll_rearm(poller_fds[fd_map[fd]->pollid], fd, event);
+				return 0;
+			}
+
+			Pause();
+		}
+	}
+#endif
+
+//================================================================================
+// Thread run by the C Interface
 
 struct cfathread_object {
…
 // IO operations
 int cfathread_socket(int domain, int type, int protocol) {
-	return socket(domain, type, protocol);
+	return socket(domain, type
+		#if defined(EPOLL_FOR_SOCKETS)
+			| SOCK_NONBLOCK
+		#endif
+		, protocol);
 }
 int cfathread_bind(int socket, const struct sockaddr *address, socklen_t address_len) {
…
 
 int cfathread_accept(int socket, struct sockaddr *restrict address, socklen_t *restrict address_len) {
-	return cfa_accept4(socket, address, address_len, 0, CFA_IO_LAZY);
+	#if defined(EPOLL_FOR_SOCKETS)
+		int ret;
+		for() {
+			yield();
+			ret = accept4(socket, address, address_len, SOCK_NONBLOCK);
+			if(ret >= 0) break;
+			if(errno != EAGAIN && errno != EWOULDBLOCK) break;
+
+			epoll_wait(socket, EPOLLIN);
+		}
+		return ret;
+	#else
+		return cfa_accept4(socket, address, address_len, 0, CFA_IO_LAZY);
+	#endif
 }
 
 int cfathread_connect(int socket, const struct sockaddr *address, socklen_t address_len) {
-	return cfa_connect(socket, address, address_len, CFA_IO_LAZY);
+	#if defined(EPOLL_FOR_SOCKETS)
+		int ret;
+		for() {
+			ret = connect(socket, address, address_len);
+			if(ret >= 0) break;
+			if(errno != EAGAIN && errno != EWOULDBLOCK) break;
+
+			epoll_wait(socket, EPOLLIN);
+		}
+		return ret;
+	#else
+		return cfa_connect(socket, address, address_len, CFA_IO_LAZY);
+	#endif
 }
 
…
 
 ssize_t cfathread_sendmsg(int socket, const struct msghdr *message, int flags) {
-	return cfa_sendmsg(socket, message, flags, CFA_IO_LAZY);
+	#if defined(EPOLL_FOR_SOCKETS)
+		ssize_t ret;
+		__STATS__( false, io.ops.sockwrite++; )
+		for() {
+			ret = sendmsg(socket, message, flags);
+			if(ret >= 0) break;
+			if(errno != EAGAIN && errno != EWOULDBLOCK) break;
+
+			__STATS__( false, io.ops.epllwrite++; )
+			epoll_wait(socket, EPOLLOUT);
+		}
+	#else
+		ssize_t ret = cfa_sendmsg(socket, message, flags, CFA_IO_LAZY);
+	#endif
+	return ret;
 }
 
 ssize_t cfathread_write(int fildes, const void *buf, size_t nbyte) {
 	// Use send rather then write for socket since it's faster
-	return cfa_send(fildes, buf, nbyte, 0, CFA_IO_LAZY);
+	#if defined(EPOLL_FOR_SOCKETS)
+		ssize_t ret;
+		// __STATS__( false, io.ops.sockwrite++; )
+		for() {
+			ret = send(fildes, buf, nbyte, 0);
+			if(ret >= 0) break;
+			if(errno != EAGAIN && errno != EWOULDBLOCK) break;
+
+			// __STATS__( false, io.ops.epllwrite++; )
+			epoll_wait(fildes, EPOLLOUT);
+		}
+	#else
+		ssize_t ret = cfa_send(fildes, buf, nbyte, 0, CFA_IO_LAZY);
+	#endif
+	return ret;
 }
 
…
 	msg.msg_controllen = 0;
 
-	ssize_t ret = cfa_recvmsg(socket, &msg, flags, CFA_IO_LAZY);
+	#if defined(EPOLL_FOR_SOCKETS)
+		ssize_t ret;
+		yield();
+		for() {
+			ret = recvmsg(socket, &msg, flags);
+			if(ret >= 0) break;
+			if(errno != EAGAIN && errno != EWOULDBLOCK) break;
+
+			epoll_wait(socket, EPOLLIN);
+		}
+	#else
+		ssize_t ret = cfa_recvmsg(socket, &msg, flags, CFA_IO_LAZY);
+	#endif
 
 	if(address_len) *address_len = msg.msg_namelen;
…
 ssize_t cfathread_read(int fildes, void *buf, size_t nbyte) {
 	// Use recv rather then read for socket since it's faster
-	return cfa_recv(fildes, buf, nbyte, 0, CFA_IO_LAZY);
-}
-
-}
+	#if defined(EPOLL_FOR_SOCKETS)
+		ssize_t ret;
+		__STATS__( false, io.ops.sockread++; )
+		yield();
+		for() {
+			ret = recv(fildes, buf, nbyte, 0);
+			if(ret >= 0) break;
+			if(errno != EAGAIN && errno != EWOULDBLOCK) break;
+
+			__STATS__( false, io.ops.epllread++; )
+			epoll_wait(fildes, EPOLLIN);
+		}
+	#else
+		ssize_t ret = cfa_recv(fildes, buf, nbyte, 0, CFA_IO_LAZY);
+	#endif
+	return ret;
+}
+
+}
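
The pattern behind each new cfathread_* body is the same: the socket is created nonblocking (SOCK_NONBLOCK), the I/O call is attempted directly, and on EAGAIN/EWOULDBLOCK the user thread parks on an edge-triggered, one-shot epoll registration until the fd becomes ready, then retries. Below is a minimal plain-C sketch of that loop; it uses an ordinary blocking epoll_wait where the CFA runtime would park()/unpark() the user thread, and epfd/recv_when_ready are illustrative names, not part of this changeset.

#include <errno.h>
#include <sys/epoll.h>
#include <sys/socket.h>

/* Retry a recv on a nonblocking socket, sleeping in epoll between
 * attempts.  epfd is an epoll instance created with epoll_create1(0). */
static ssize_t recv_when_ready(int epfd, int fd, void * buf, size_t len) {
	struct epoll_event ev;
	ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT;  /* edge-triggered, one-shot */
	ev.data.fd = fd;
	int registered = 0;
	for (;;) {
		ssize_t ret = recv(fd, buf, len, 0);
		if (ret >= 0) return ret;                               /* success */
		if (errno != EAGAIN && errno != EWOULDBLOCK) return -1; /* real error */

		/* Arm (first time) or re-arm (the one-shot fired) the registration. */
		if (epoll_ctl(epfd, registered ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &ev) != 0)
			return -1;
		registered = 1;

		struct epoll_event out;
		if (epoll_wait(epfd, &out, 1, -1) < 0 && errno != EINTR)
			return -1;
	}
}

Because EPOLLONESHOT disables a registration after each delivered event, the loop re-arms with EPOLL_CTL_MOD before every wait; the changeset's epoll_rearm helper does the same against the runtime's leaf pollers.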
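Polling itself is two-level: setup_epoll creates 24 leaf epoll instances, each owned by a leaf_poller thread that spins on zero-timeout epoll_wait calls and, after a few empty polls, arms its own epoll fd in a master epoll set and parks; a dedicated pthread blocks on the master set and unparks whichever leaf has pending events. The nesting works because a Linux epoll fd is itself pollable. A small C sketch of just the nesting step, assuming a hypothetical make_leaf helper:

#include <sys/epoll.h>
#include <unistd.h>

/* Create a leaf epoll instance and register it with a master instance;
 * the master then reports EPOLLIN whenever the leaf has pending events. */
int make_leaf(int master_epfd) {
	int leaf = epoll_create1(0);
	if (leaf == -1) return -1;

	struct epoll_event ev;
	ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT;  /* leaf must re-arm after each wakeup */
	ev.data.fd = leaf;
	if (epoll_ctl(master_epfd, EPOLL_CTL_ADD, leaf, &ev) != 0) {
		close(leaf);
		return -1;
	}
	return leaf;
}

Sharding fds across several small epoll sets keeps each zero-timeout poll cheap, while the master set gives the runtime a single place to block when every leaf is idle.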
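The overloaded epoll_wait(int fd, uint32_t event) registers an fd with a randomly chosen leaf poller at most once, guarding the one-time epoll_ctl(EPOLL_CTL_ADD) with a per-fd slot that moves from 0p (never seen) through the sentinel 1p (registration in progress) to a real fd_info_t pointer; threads that lose the compare-and-swap spin with Pause() until the winner publishes. A plain-C11 sketch of that claim protocol, with illustrative names (slot, fd_info, get_or_register):

#include <stdatomic.h>
#include <stdint.h>
#include <stdlib.h>

typedef struct { int pollid; size_t rearms; } fd_info;

#define UNSET   ((fd_info *)0)  /* fd never registered */
#define CLAIMED ((fd_info *)1)  /* registration in progress by another thread */

/* One slot of a per-fd table: perform the one-time registration exactly
 * once, even when many threads race on the same fd. */
fd_info * get_or_register(fd_info * _Atomic * slot) {
	for (;;) {
		fd_info * cur = atomic_load(slot);
		if ((uintptr_t)cur > 1) return cur;          /* already registered */
		fd_info * expected = UNSET;
		if (cur == UNSET &&
		    atomic_compare_exchange_strong(slot, &expected, CLAIMED)) {
			/* Won the race: this is where the one-time
			 * epoll_ctl(EPOLL_CTL_ADD) would happen. */
			fd_info * info = malloc(sizeof(*info));
			info->pollid = 0;          /* leaf poller chosen for this fd */
			info->rearms = 0;
			atomic_store(slot, info);  /* publish the real record */
			return info;
		}
		/* Saw the sentinel or lost the race: spin until published. */
	}
}

The sentinel lets latecomers distinguish "registration still in progress" (keep spinning) from "already registered" (just re-arm and park), so the common path never takes a lock.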