#include "worker.hfa"

// NOTE(review): the header names of the seven angle-bracket includes were
// missing from the reviewed text. The list below is reconstructed from the
// names this file uses (errno/strerror, sout/serr, mutex statement, io
// futures); confirm it against version control before merging.
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

#include <fstream.hfa>
#include <iofwd.hfa>
#include <mutex_stmt.hfa>

#include "options.hfa"
#include "protocol.hfa"
#include "filecache.hfa"

// Minimum rdtscl() distance between two pushes of local statistics to the
// stats thread. Only the ACCEPT_SPIN and ACCEPT_MANY acceptor variants use it;
// everywhere else the throttle is commented out.
static const unsigned long long period = 5_000_000;

//=============================================================================================
// Generic connection handling
//=============================================================================================
// Serve requests on one connection until it is closed or a send error occurs.
//
//   this   - per-worker connection state: the pipe handed to answer_sendfile
//            and the sendfile statistics updated here
//   fd     - socket of the connection being served
//   buffer - scratch buffer of `len` bytes handed to http_read
//   len    - size of `buffer`
//   f      - future passed to the first http_read call only; every later
//            iteration passes 0p
//   last   - timestamp for the stats throttle; only referenced by the
//            commented-out throttle below
//
// On exit the close counter is incremented and, if a stats thread exists and
// its lock can be taken without blocking, the sendfile statistics are pushed.
static void handle_connection( connection & this, volatile int & fd, char * buffer, size_t len, io_future_t * f, unsigned long long & last ) {
	REQUEST:
	for() {
		bool closed;
		HttpCode code;
		const char * file;
		size_t name_size;

		// Read the http request
		if( options.log ) mutex(sout) sout | "=== Reading request ===";
		[code, closed, file, name_size] = http_read(fd, buffer, len, f);
		f = 0p;

		// if we are done, break out of the loop
		if( closed ) {
			if( code != OK200 ) this.stats.sendfile.error++;
			break REQUEST;
		}

		// If this wasn't a request return 400
		// (currently this aborts the process; the error answer is disabled)
		if( code != OK200 ) {
			abort | "=== Invalid Request :" | code_val(code) | "===";
			// answer_error(fd, code);
			// continue REQUEST;
		}

		// if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
		// 	if( options.log ) mutex(sout) sout | "=== Request for /plaintext ===";

		// 	int ret = answer_plaintext(fd);
		// 	if( ret == -ECONNRESET ) { this.stats.sendfile.error++; break REQUEST; }

		// 	if( options.log ) mutex(sout) sout | "=== Answer sent ===";
		// 	continue REQUEST;
		// }

		// if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
		// 	if( options.log ) mutex(sout) sout | "=== Request for /ping ===";

		// 	// Send the header
		// 	int ret = answer_empty(fd);
		// 	if( ret == -ECONNRESET ) { this.stats.sendfile.error++; break REQUEST; }

		// 	if( options.log ) mutex(sout) sout | "=== Answer sent ===";
		// 	continue REQUEST;
		// }

		// The message is emitted by three separate statements, so hold the
		// stream for the whole group to keep the line in one piece.
		if( options.log ) mutex(sout) {
			sout | "=== Request for file " | nonl;
			write(sout, file, name_size);
			sout | " ===";
		}

		// No file cache configured: report and abort (the 405 answer is disabled)
		if( !options.file_cache.path ) {
			// if( options.log ) {
				serr | "=== File Not Found (" | nonl;
				write(serr, file, name_size);
				serr | ") ===";
				abort();
			// }
			// answer_error(fd, E405);
			// continue REQUEST;
		}

		// Get the fd from the file cache
		int ans_fd;
		size_t count;
		[ans_fd, count] = get_file( file, name_size );

		// If we can't find the file, report and abort (the 404 answer is disabled)
		if( ans_fd < 0 ) {
			// if( options.log ) {
				serr | "=== File Not Found 2 (" | nonl;
				write(serr, file, name_size);
				serr | ") ===";
				abort();
			// }
			// answer_error(fd, E404);
			// continue REQUEST;
		}

		// Send the desired file
		int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile );
		if( ret < 0 ) {
			// Errors that only mean this connection is unusable: count and stop serving it
			if( ret == -ECONNABORTED || ret == -ECONNRESET || ret == -EPIPE || ret == -EBADF ) {
				this.stats.sendfile.error++;
				break REQUEST;
			}

			// NOTE(review): the error code is carried in `ret`; errno may be
			// stale at this point — confirm whether answer_sendfile sets it.
			abort( "answer sendfile error: %d (%d) %s\n", ret, (int)errno, strerror(errno) );
		}

		if( options.log ) mutex(sout) sout | "=== Answer sent ===";
	}

	this.stats.sendfile.close++;

	// Best effort: skip the push when the stats lock is contended
	if( stats_thrd ) {
		// unsigned long long next = rdtscl();
		// if(next > (last + period)) {
			if( try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2) ) {
				push(this.stats.sendfile, stats_thrd->stats.send);
				unlock(stats_thrd->stats.lock);
				// last = next;
			}
		// }
	}
}

//=============================================================================================
// Self Accepting Worker Thread
//=============================================================================================
// Construct the worker thread on a randomly chosen cluster and bump that
// cluster's thread count.
void ?{}( AcceptWorker & this ) {
	size_t cli = rand() % options.clopts.cltr_cnt;
	((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
	options.clopts.thrd_cnt[cli]++;
	this.done = false;
}

// Thread body: park until released, then accept connections one at a time and
// serve each one to completion with handle_connection.
void main( AcceptWorker & this ) {
	park();
	unsigned long long last = rdtscl();
	/* paranoid */ assert( this.conn.pipe[0] != -1 );
	/* paranoid */ assert( this.conn.pipe[1] != -1 );
	for() {
		if( options.log ) mutex(sout) sout | "=== Accepting connection ===";
		int fd = cfa_accept4( this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY );
		// NOTE(review): `<= 0` also treats descriptor 0 as a failure, while the
		// Acceptor variants test `< 0` — confirm which is intended.
		if( fd <= 0 ) {
			if( errno == ECONNABORTED ) break;
			// EINVAL / EBADF are tolerated only once shutdown was requested
			if( this.done && (errno == EINVAL || errno == EBADF) ) break;
			abort( "accept error %d: (%d) %s\n", fd, (int)errno, strerror(errno) );
		}
		if( this.done ) break;

		this.stats.accepts++;
		// Best effort: skip the push when there is no stats thread or its lock is contended
		if( stats_thrd && try_lock(stats_thrd->stats.lock) ) {
			push(this.stats, stats_thrd->stats.accpt);
			unlock(stats_thrd->stats.lock);
		}

		if( options.log ) mutex(sout) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
		size_t len = options.socket.buflen;
		char buffer[len];
		handle_connection( this.conn, fd, buffer, len, 0p, last );
		this.conn.stats.sendfile.maxfd = max(this.conn.stats.sendfile.maxfd, fd);

		if( options.log ) mutex(sout) sout | "=== Connection closed ===";
	}
}

//=============================================================================================
// Channel Worker Thread
//=============================================================================================
// Construct the worker thread on a randomly chosen cluster and bump that
// cluster's thread count.
void ?{}( ChannelWorker & this ) {
	size_t cli = rand() % options.clopts.cltr_cnt;
	((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
	options.clopts.thrd_cnt[cli]++;
	this.done = false;
}

// Thread body: park until released, then repeatedly publish a PendingRead on
// the shared queue and serve the connection recorded in it.
void main( ChannelWorker & this ) {
	park();
	unsigned long long last = rdtscl();
	/* paranoid */ assert( this.conn.pipe[0] != -1 );
	/* paranoid */ assert( this.conn.pipe[1] != -1 );
	this.conn.stats.sendfile.maxfd = max(this.conn.pipe[0], this.conn.pipe[1]);
	// this.conn.stats.sendfile.maxfd = 0;
	for() {
		size_t len = options.socket.buflen;
		char buffer[len];

		// Advertise this worker's buffer; push_connection fills in out.fd and
		// starts the first read on p.f
		PendingRead p;
		p.next = 0p;
		p.in.buf = (void*)buffer;
		p.in.len = len;
		push(*this.queue, &p);

		if( options.log ) mutex(sout) sout | "=== Waiting new connection ===";
		handle_connection( this.conn, p.out.fd, buffer, len, &p.f, last );
		if( this.done ) break;
		this.conn.stats.sendfile.maxfd = max(this.conn.stats.sendfile.maxfd, p.out.fd);
		// NOTE(review): handle_connection already increments `close`, so each
		// connection is counted twice here — confirm this is intended.
		this.conn.stats.sendfile.close++;

		if( options.log ) mutex(sout) sout | "=== Connection closed ===";
	}

	// Final blocking flush. The stats thread is optional everywhere else in
	// this file, so guard the dereference here as well.
	if( stats_thrd ) {
		lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2);
		push(this.conn.stats.sendfile, stats_thrd->stats.send);
		unlock(stats_thrd->stats.lock);
	}
}

//=============================================================================================
// Acceptor Thread
//=============================================================================================
extern "C" {
extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
}

// Construct the acceptor thread on cluster `cli` and bump that cluster's
// thread count.
void ?{}( Acceptor & this, int cli ) {
	((thread&)this){ "Server Acceptor Thread", *options.clopts.instance[cli], 64000 };
	options.clopts.thrd_cnt[cli]++;
	this.done = false;
}

// Convert a future's result to the errno convention: a negative result is
// stored (negated) in errno and -1 is returned, otherwise the result is returned.
static inline __s32 get_res( io_future_t & this ) {
	if( this.result < 0 ) {
		errno = -this.result;
		return -1;
	}
	return this.result;
}

// Hand an accepted connection to a waiting worker: pop a PendingRead from the
// queue (yielding while it is empty), record the fd in it and start the first
// receive into the worker's buffer.
// NOTE(review): when `done` is set the function returns without closing `fd`.
static inline void push_connection( Acceptor & this, int fd ) {
	this.stats.accepts++;
	PendingRead * p = 0p;
	for() {
		if( this.done ) return;
		p = pop(*this.queue);
		if( p ) break;
		// abort( "Too few threads" );
		yield();
		this.stats.creates++;
	};

	p->out.fd = fd;
	async_recv(p->f, p->out.fd, p->in.buf, p->in.len, 0, CFA_IO_LAZY);
}

// Exactly one accept strategy must be selected:
//   ACCEPT_SPIN - plain accept4, yielding on EWOULDBLOCK
//   ACCEPT_ONE  - one cfa_accept4 call at a time
//   ACCEPT_MANY - several asynchronous accepts kept in flight
// #define ACCEPT_SPIN
#define ACCEPT_ONE
// #define ACCEPT_MANY

// Thread body: park until released, accept connections with the selected
// strategy and pass each one to push_connection.
void main( Acceptor & this ) {
	park();
	unsigned long long last = rdtscl();

#if defined(ACCEPT_SPIN)
	if( options.log ) sout | "=== Accepting connection ===";
	for() {
		int fd = accept4(this.sockfd, this.[addr, addrlen, flags]);
		if(fd < 0) {
			if( errno == EWOULDBLOCK) {
				this.stats.eagains++;
				yield();
				continue;
			}
			if( errno == ECONNABORTED ) break;
			if( this.done && (errno == EINVAL || errno == EBADF) ) break;
			abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
		}
		if(this.done) return;

		if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";

		if(fd) push_connection(this, fd);

		// Throttled, best-effort push of the accept statistics
		if (stats_thrd) {
			unsigned long long next = rdtscl();
			if(next > (last + period)) {
				if(try_lock(stats_thrd->stats.lock)) {
					push(this.stats, stats_thrd->stats.accpt);
					unlock(stats_thrd->stats.lock);
					last = next;
				}
			}
		}

		if( options.log ) sout | "=== Accepting connection ===";
	}

#elif defined(ACCEPT_ONE)
	if( options.log ) sout | "=== Accepting connection ===";
	for() {
		int fd = cfa_accept4(this.sockfd, this.[addr, addrlen, flags], 0);
		if(fd < 0) {
			if( errno == ECONNABORTED ) break;
			// EINVAL / EBADF are tolerated only once shutdown was requested
			if( this.done && (errno == EINVAL || errno == EBADF) ) break;
			abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
		}
		// NOTE(review): this return skips the final stats push below and
		// leaves `fd` open.
		if(this.done) return;

		if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";

		// NOTE(review): a descriptor equal to 0 is dropped without being closed
		if(fd) push_connection(this, fd);

		// Best-effort push of the accept statistics (throttle disabled)
		if (stats_thrd) {
			// unsigned long long next = rdtscl();
			// if(next > (last + period)) {
				if(try_lock(stats_thrd->stats.lock)) {
					push(this.stats, stats_thrd->stats.accpt);
					unlock(stats_thrd->stats.lock);
					// last = next;
				}
			// }
		}

		if( options.log ) sout | "=== Accepting connection ===";
	}

#elif defined(ACCEPT_MANY)
	const int nacc = 10;
	io_future_t results[nacc];

	// Arm every accept slot once before entering the service loop
	for(i; nacc) {
		io_future_t & res = results[i];
		reset(res);
		/* paranoid */ assert(!available(res));
		if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ===";
		async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY);
	}

	for() {
		// Throttled, best-effort push of the accept statistics
		if (stats_thrd) {
			unsigned long long next = rdtscl();
			if(next > (last + period)) {
				if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
					push(this.stats, stats_thrd->stats.accpt);
					unlock(stats_thrd->stats.lock);
					last = next;
				}
			}
		}

		// Harvest every completed accept, then re-arm its slot
		for(i; nacc) {
			io_future_t & res = results[i];
			if(available(res)) {
				if( options.log ) mutex(sout) sout | "=== Accept no " | i | "completed with result" | res.result | "===";
				int fd = get_res(res);
				reset(res);
				if(fd < 0) {
					// NOTE(review): these `continue`s leave the slot reset but not re-armed
					if( errno == ECONNABORTED ) continue;
					if( this.done && (errno == EINVAL || errno == EBADF) ) continue;
					abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
				}
				push_connection( this, fd );

				/* paranoid */ assert(!available(res));
				if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ===";
				async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY);
			}
		}
		if(this.done) return;

		if( options.log ) mutex(sout) sout | "=== Waiting for any accept ===";
		this.stats.eagains++;
		wait_any(results, nacc);

		if( options.log ) mutex(sout) {
			sout | "=== Acceptor wake-up ===";
			for(i; nacc) {
				io_future_t & res = results[i];
				sout | i | "available:" | available(res);
			}
		}
	}

	// NOTE(review): the loop above only leaves through `return`, so this
	// drain loop is never reached.
	for(i; nacc) {
		wait(results[i]);
	}
#else
#error no accept algorithm specified
#endif

	// Final blocking flush. The stats thread is optional everywhere else in
	// this file, so guard the dereference here as well.
	if( stats_thrd ) {
		lock(stats_thrd->stats.lock);
		push(this.stats, stats_thrd->stats.accpt);
		unlock(stats_thrd->stats.lock);
	}
}