Index: libcfa/src/concurrency/clib/cfathread.cfa
===================================================================
--- libcfa/src/concurrency/clib/cfathread.cfa	(revision db614d00deb5a401a1eae82834730b55a1446e7b)
+++ libcfa/src/concurrency/clib/cfathread.cfa	(revision 4500e5256e164106f39ba3ca0b1c8f11635711c2)
@@ -14,4 +14,6 @@
 //
 
+#define EPOLL_FOR_SOCKETS
+
 #include "fstream.hfa"
 #include "locks.hfa"
@@ -23,13 +25,202 @@
 #include "cfathread.h"
 
+extern "C" {
+		#include <string.h>
+		#include <errno.h>
+}
+
 extern void ?{}(processor &, const char[], cluster &, thread$ *);
 extern "C" {
       extern void __cfactx_invoke_thread(void (*main)(void *), void * this);
+	extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
 }
 
 extern Time __kernel_get_time();
+extern unsigned register_proc_id( void );
 
 //================================================================================
-// Thread run y the C Interface
+// Epoll support for sockets
+
+#if defined(EPOLL_FOR_SOCKETS)
+	extern "C" {
+		#include <sys/epoll.h>
+		#include <sys/resource.h>
+	}
+
+	static pthread_t master_poller; // raw pthread running master_epoll()
+	static int master_epollfd = 0; // epoll instance that watches the leaf epoll fds
+	static size_t poller_cnt = 0; // number of leaf pollers, set in setup_epoll()
+	static int * poller_fds = 0p; // one epoll fd per leaf poller
+	static struct leaf_poller * pollers = 0p; // the leaf poller thread objects
+
+	struct __attribute__((aligned)) fd_info_t { // per-fd registration state; over-aligned to reduce false sharing
+		int pollid; // index into poller_fds: which leaf epoll instance owns this fd
+		size_t rearms; // number of times this fd has been re-armed (EPOLLONESHOT)
+	};
+	rlim_t fd_limit = 0; // soft RLIMIT_NOFILE; valid fds are always < fd_limit
+	static fd_info_t * volatile * fd_map = 0p; // fd -> info; 0p = unregistered, 1p = registration in progress
+
+	void * master_epoll( __attribute__((unused)) void * args ) { // pthread entry: block on the master epoll and wake idle leaf pollers
+		unsigned id = register_proc_id(); // register this raw pthread with the runtime so unpark() is legal here; NOTE(review): id is otherwise unused
+
+		enum { MAX_EVENTS = 5 };
+		struct epoll_event events[MAX_EVENTS];
+		for() { // CFA infinite loop
+			int ret = epoll_wait(master_epollfd, events, MAX_EVENTS, -1); // block until some leaf epollfd becomes readable
+			if ( ret < 0 ) {
+				abort | "Master epoll error: " | strerror(errno);
+			}
+
+			for(i; ret) { // one event per ready leaf poller
+				thread$ * thrd = (thread$ *)events[i].data.u64; // data.u64 carries the parked leaf_poller's thread handle
+				unpark( thrd );
+			}
+		}
+
+		return 0p;
+	}
+
+	static inline int epoll_rearm(int epollfd, int fd, uint32_t event) { // re-arm a ONESHOT fd and park until it fires; returns -1 if fd is not registered
+		struct epoll_event eevent;
+		eevent.events = event | EPOLLET | EPOLLONESHOT; // edge-triggered one-shot: must be re-armed after every wake-up
+		eevent.data.u64 = (uint64_t)active_thread(); // the poller unparks this thread when the event fires
+
+		if(0 != epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &eevent))
+		{
+			if(errno == ENOENT) return -1; // fd was never added to this epoll instance; caller must handle
+			abort | acquire | "epoll" | epollfd | "ctl rearm" | fd | "error: " | errno | strerror(errno);
+		}
+
+		park(); // sleep until the poller observes the event and unparks us
+		return 0;
+	}
+
+	thread leaf_poller { // CFA thread type: one instance per leaf epoll instance
+		int epollfd; // the epoll fd this poller drains
+	};
+
+	void ?{}(leaf_poller & this, int fd) { this.epollfd = fd; } // constructor: adopt a pre-created epoll fd
+
+	void main(leaf_poller & this) { // thread body: drain this leaf epoll, parking behind the master epoll when idle
+		enum { MAX_EVENTS = 1024 };
+		struct epoll_event events[MAX_EVENTS];
+		const int max_retries = 5;
+		int retries = max_retries; // empty polls tolerated before parking behind the master epoll
+
+		struct epoll_event event;
+		event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
+		event.data.u64 = (uint64_t)&(thread&)this; // master_epoll() unparks this thread handle
+
+		if(0 != epoll_ctl(master_epollfd, EPOLL_CTL_ADD, this.epollfd, &event)) // register our epollfd with the master
+		{
+			abort | "master epoll ctl add leaf: " | errno | strerror(errno);
+		}
+
+		park(); // wait until the runtime/master releases us
+
+		for() {
+			yield(); // give worker threads a chance to run between polls
+			int ret = epoll_wait(this.epollfd, events, MAX_EVENTS, 0); // non-blocking poll of our leaf epoll
+			if ( ret < 0 ) {
+				abort | "Leaf epoll error: " | errno | strerror(errno);
+			}
+
+			if(ret) {
+				for(i; ret) {
+					thread$ * thrd = (thread$ *)events[i].data.u64; // a thread parked in epoll_rearm()/epoll_wait()
+					unpark( thrd );
+				}
+			}
+			else if(0 >= --retries) { // NOTE(review): retries is never reset, so after the first park every idle poll re-parks immediately — confirm intended
+				epoll_rearm(master_epollfd, this.epollfd, EPOLLIN); // parks this poller until the master epoll reports activity
+			}
+		}
+	}
+
+	void setup_epoll( void ) __attribute__(( constructor ));
+	void setup_epoll( void ) { // runs before main: build the master/leaf epoll hierarchy and start the pollers
+		if(master_epollfd) abort | "Master epoll already setup";
+
+		master_epollfd = epoll_create1(0);
+		if(master_epollfd == -1) {
+			abort | "failed to create master epoll: " | errno | strerror(errno);
+		}
+
+		struct rlimit rlim;
+		if(int ret = getrlimit(RLIMIT_NOFILE, &rlim); 0 != ret) {
+			abort | "failed to get nofile limit: " | errno | strerror(errno);
+		}
+
+		fd_limit = rlim.rlim_cur; // size fd_map to the soft fd limit: every valid fd indexes in range
+		fd_map = alloc(fd_limit);
+		for(i;fd_limit) {
+			fd_map[i] = 0p; // 0p = fd not yet registered with any leaf poller
+		}
+
+		poller_cnt = 24; // NOTE(review): hard-coded leaf-poller count — presumably tuned for a specific machine, consider deriving from core count
+		poller_fds = alloc(poller_cnt);
+		pollers    = alloc(poller_cnt);
+		for(i; poller_cnt) {
+			poller_fds[i] = epoll_create1(0);
+			if(poller_fds[i] == -1) {
+				abort | "failed to create leaf epoll [" | i | "]: " | errno | strerror(errno);
+			}
+
+			(pollers[i]){ poller_fds[i] }; // construct in place: starts the leaf_poller thread on this fd
+		}
+
+		pthread_attr_t attr;
+		if (int ret = pthread_attr_init(&attr); 0 != ret) {
+			abort | "failed to create master epoll thread attr: " | ret | strerror(ret);
+		}
+
+		if (int ret = pthread_create(&master_poller, &attr, master_epoll, 0p); 0 != ret) { // NOTE(review): attr is never destroyed — minor resource leak
+			abort | "failed to create master epoll thread: " | ret | strerror(ret);
+		}
+	}
+
+	static inline int epoll_wait(int fd, uint32_t event) { // park the current thread until 'event' fires on 'fd'
+		if(fd_map[fd] > 1p) { // fast path: fd already registered (1p is the install sentinel and must never be dereferenced)
+			fd_map[fd]->rearms++;
+			epoll_rearm(poller_fds[fd_map[fd]->pollid], fd, event);
+			return 0;
+		}
+
+		for() {
+			fd_info_t * expected = 0p;
+			fd_info_t * sentinel = 1p; // placeholder published while registration is in progress
+			if(__atomic_compare_exchange_n( &(fd_map[fd]), &expected, sentinel, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED)) {
+				struct epoll_event eevent;
+				eevent.events = event | EPOLLET | EPOLLONESHOT;
+				eevent.data.u64 = (uint64_t)active_thread(); // the leaf poller unparks this thread on wake-up
+
+				int id = thread_rand() % poller_cnt; // spread fds across the leaf pollers at random
+				if(0 != epoll_ctl(poller_fds[id], EPOLL_CTL_ADD, fd, &eevent))
+				{
+					abort | "epoll ctl add" | poller_fds[id] | fd | fd_map[fd] | expected | "error: " | errno | strerror(errno);
+				}
+
+				fd_info_t * ninfo = alloc();
+				ninfo->pollid = id;
+				ninfo->rearms = 0;
+				__atomic_store_n( &fd_map[fd], ninfo, __ATOMIC_SEQ_CST); // publish the real record, replacing the 1p sentinel
+
+				park(); // sleep until the leaf poller sees the event
+				return 0;
+			}
+
+			if(expected > 1p) { // lost the race to a *completed* registration: fd is usable, just re-arm
+				fd_map[fd]->rearms++;
+				epoll_rearm(poller_fds[fd_map[fd]->pollid], fd, event);
+				return 0;
+			}
+
+			Pause(); // expected is 0p (spurious weak-CAS failure) or 1p (install in flight): spin and retry
+		}
+	}
+#endif
+
+//================================================================================
+// Thread run by the C Interface
 
 struct cfathread_object {
@@ -288,5 +479,9 @@
 	// IO operations
 	int cfathread_socket(int domain, int type, int protocol) {
-		return socket(domain, type, protocol);
+		return socket(domain, type
+		#if defined(EPOLL_FOR_SOCKETS)
+			| SOCK_NONBLOCK // every socket is nonblocking so I/O parks via epoll instead of blocking a kernel thread
+		#endif
+		, protocol);
 	}
 	int cfathread_bind(int socket, const struct sockaddr *address, socklen_t address_len) {
@@ -299,9 +494,34 @@
 
 	int cfathread_accept(int socket, struct sockaddr *restrict address, socklen_t *restrict address_len) {
-		return cfa_accept4(socket, address, address_len, 0, CFA_IO_LAZY);
+		#if defined(EPOLL_FOR_SOCKETS)
+			int ret;
+			for() {
+				yield(); // let other work run before (re)trying the syscall
+				ret = accept4(socket, address, address_len, SOCK_NONBLOCK); // accepted fds are created nonblocking too
+				if(ret >= 0) break;
+				if(errno != EAGAIN && errno != EWOULDBLOCK) break; // real error: report to caller
+
+				epoll_wait(socket, EPOLLIN); // park until a connection is pending
+			}
+			return ret;
+		#else
+			return cfa_accept4(socket, address, address_len, 0, CFA_IO_LAZY);
+		#endif
 	}
 
 	int cfathread_connect(int socket, const struct sockaddr *address, socklen_t address_len) {
-		return cfa_connect(socket, address, address_len, CFA_IO_LAZY);
+		#if defined(EPOLL_FOR_SOCKETS)
+			int ret;
+			for() {
+				ret = connect(socket, address, address_len);
+				if(ret >= 0) break;
+				if(errno == EISCONN) { ret = 0; break; } // connect completed while we were parked: success
+				if(errno != EAGAIN && errno != EWOULDBLOCK && errno != EINPROGRESS && errno != EALREADY) break; // real error
+				epoll_wait(socket, EPOLLOUT); // nonblocking connect signals completion as writability, not readability
+			}
+			return ret;
+		#else
+			return cfa_connect(socket, address, address_len, CFA_IO_LAZY);
+		#endif
 	}
 
@@ -315,10 +535,38 @@
 
 	ssize_t cfathread_sendmsg(int socket, const struct msghdr *message, int flags) {
-		return cfa_sendmsg(socket, message, flags, CFA_IO_LAZY);
+		#if defined(EPOLL_FOR_SOCKETS)
+			ssize_t ret;
+			__STATS__( false, io.ops.sockwrite++; ) // runtime I/O statistics (no-op unless stats are enabled)
+			for() {
+				ret = sendmsg(socket, message, flags);
+				if(ret >= 0) break;
+				if(errno != EAGAIN && errno != EWOULDBLOCK) break; // real error: report to caller
+
+				__STATS__( false, io.ops.epllwrite++; )
+				epoll_wait(socket, EPOLLOUT); // park until the socket is writable again
+			}
+		#else
+			ssize_t ret = cfa_sendmsg(socket, message, flags, CFA_IO_LAZY);
+		#endif
+		return ret;
+	}
 
 	ssize_t cfathread_write(int fildes, const void *buf, size_t nbyte) {
 		// Use send rather then write for socket since it's faster
-		return cfa_send(fildes, buf, nbyte, 0, CFA_IO_LAZY);
+		#if defined(EPOLL_FOR_SOCKETS)
+			ssize_t ret;
+			// __STATS__( false, io.ops.sockwrite++; )
+			for() {
+				ret = send(fildes, buf, nbyte, 0); // send instead of write: fildes is assumed to be a socket
+				if(ret >= 0) break;
+				if(errno != EAGAIN && errno != EWOULDBLOCK) break; // real error: report to caller
+
+				// __STATS__( false, io.ops.epllwrite++; )
+				epoll_wait(fildes, EPOLLOUT); // park until writable
+			}
+		#else
+			ssize_t ret = cfa_send(fildes, buf, nbyte, 0, CFA_IO_LAZY);
+		#endif
+		return ret;
+	}
 
@@ -336,5 +584,17 @@
 		msg.msg_controllen = 0;
 
-		ssize_t ret = cfa_recvmsg(socket, &msg, flags, CFA_IO_LAZY);
+		#if defined(EPOLL_FOR_SOCKETS)
+			ssize_t ret;
+			yield();
+			for() {
+				ret = recvmsg(socket, &msg, flags);
+				if(ret >= 0) break;
+				if(errno != EAGAIN && errno != EWOULDBLOCK) break;
+
+				epoll_wait(socket, EPOLLIN);
+			}
+		#else
+			ssize_t ret = cfa_recvmsg(socket, &msg, flags, CFA_IO_LAZY);
+		#endif
 
 		if(address_len) *address_len = msg.msg_namelen;
@@ -344,6 +604,21 @@
 	ssize_t cfathread_read(int fildes, void *buf, size_t nbyte) {
 		// Use recv rather then read for socket since it's faster
-		return cfa_recv(fildes, buf, nbyte, 0, CFA_IO_LAZY);
-	}
-
-}
+		#if defined(EPOLL_FOR_SOCKETS)
+			ssize_t ret;
+			__STATS__( false, io.ops.sockread++; )
+			yield(); // let pending work run before the first recv attempt
+			for() {
+				ret = recv(fildes, buf, nbyte, 0); // recv instead of read: fildes is assumed to be a socket
+				if(ret >= 0) break;
+				if(errno != EAGAIN && errno != EWOULDBLOCK) break; // real error: report to caller
+
+				__STATS__( false, io.ops.epllread++; )
+				epoll_wait(fildes, EPOLLIN); // park until readable
+			}
+		#else
+			ssize_t ret = cfa_recv(fildes, buf, nbyte, 0, CFA_IO_LAZY);
+		#endif
+		return ret;
+	}
+
+}
