#include "protocol.hfa" #define _GNU_SOURCE extern "C" { #include } #define xstr(s) str(s) #define str(s) #s #include #include #include #include #include // #include // Don't use stdio.h, too slow to compile extern "C" { int snprintf ( char * s, size_t n, const char * format, ... ); ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count); // #include } #include #include #include "options.hfa" #include "worker.hfa" #define PLAINTEXT_1WRITE #define PLAINTEXT_MEMCPY #define PLAINTEXT_NOCOPY // #define LINKED_IO #define TRUE_SENDFILE static inline __s32 wait_res( io_future_t & this ) { wait( this ); if( this.result < 0 ) {{ errno = -this.result; return -1; }} return this.result; } struct https_msg_str { char msg[512]; size_t len; }; const https_msg_str * volatile http_msgs[KNOWN_CODES] = { 0 }; _Static_assert( KNOWN_CODES == (sizeof(http_msgs ) / sizeof(http_msgs [0]))); const int http_codes[KNOWN_CODES] = { 200, 200, 400, 404, 405, 408, 413, 414, }; _Static_assert( KNOWN_CODES == (sizeof(http_codes) / sizeof(http_codes[0]))); int code_val(HttpCode code) { return http_codes[code]; } static inline int answer( int fd, const char * it, int len ) { while(len > 0) { // Call write int ret = cfa_send(fd, it, len, 0, CFA_IO_LAZY); if( ret < 0 ) { if( errno == ECONNRESET || errno == EPIPE || errno == EBADF ) { ret = close(fd); if( ret != 0 ) abort( "close in 'answer' error: (%d) %s\n", (int)errno, strerror(errno) ); return -ECONNRESET; } abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) ); } // update it/len it += ret; len -= ret; } return 0; } // int answer_error( int fd, HttpCode code ) { // /* paranoid */ assert( code < KNOWN_CODES && code != OK200 ); // int idx = (int)code; // return answer( fd, http_msgs[idx]->msg, http_msgs[idx]->len ); // } static int fill_header(char * it, size_t size) { memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len); it += http_msgs[OK200]->len; int len = http_msgs[OK200]->len; len += snprintf(it, 512 - len, "%d \n\n", size); return len; } static int answer_header( int fd, size_t size ) { char buffer[512]; int len = fill_header(buffer, size); return answer( fd, buffer, len ); } // #if defined(PLAINTEXT_NOCOPY) // int answer_plaintext( int fd ) { // return answer(fd, http_msgs[OK200_PlainText]->msg, http_msgs[OK200_PlainText]->len); // +1 cause snprintf doesn't count nullterminator // } // #elif defined(PLAINTEXT_MEMCPY) // #define TEXTSIZE 15 // int answer_plaintext( int fd ) { // char text[] = "Hello, World!\n\n"; // char ts[] = xstr(TEXTSIZE) " \n\n"; // _Static_assert(sizeof(text) - 1 == TEXTSIZE); // char buffer[512 + TEXTSIZE]; // char * it = buffer; // memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len); // it += http_msgs[OK200]->len; // int len = http_msgs[OK200]->len; // memcpy(it, ts, sizeof(ts) - 1); // it += sizeof(ts) - 1; // len += sizeof(ts) - 1; // memcpy(it, text, TEXTSIZE); // return answer(fd, buffer, len + TEXTSIZE); // } // #elif defined(PLAINTEXT_1WRITE) // int answer_plaintext( int fd ) { // char text[] = "Hello, World!\n\n"; // char buffer[512 + sizeof(text)]; // char * it = buffer; // memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len); // it += http_msgs[OK200]->len; // int len = http_msgs[OK200]->len; // int r = snprintf(it, 512 - len, "%d \n\n", sizeof(text)); // it += r; // len += r; // memcpy(it, text, sizeof(text)); // return answer(fd, buffer, len + sizeof(text)); // } // #else // int answer_plaintext( int fd ) { // char text[] = "Hello, World!\n\n"; // int ret = answer_header(fd, sizeof(text)); // if( ret < 0 ) return ret; // return answer(fd, text, sizeof(text)); // } // #endif // int answer_empty( int fd ) { // return answer_header(fd, 0); // } static int sendfile( int pipe[2], int fd, int ans_fd, size_t count, sendfile_stats_t & stats ) { int zipf_idx = -1; STATS: for(i; zipf_cnts) { if(count <= zipf_sizes[i]) { zipf_idx = i; break STATS; } } if(zipf_idx < 0) mutex(serr) serr | "SENDFILE" | count | " greated than biggest zipf file"; #if defined(TRUE_SENDFILE) off_t offset = 0; ssize_t ret; int flags = fcntl(fd, F_GETFL); if(flags < 0) abort("getfl in 'true sendfile' error: (%d) %s\n", (int)errno, strerror(errno) ); ret = fcntl(fd, F_SETFL, flags | O_NONBLOCK); if(ret < 0) abort("setfl in 'true sendfile' error: (%d) %s\n", (int)errno, strerror(errno) ); while(count) { ret = sendfile(fd, ans_fd, &offset, count); if( ret <= 0 ) { if( errno == EAGAIN || errno == EWOULDBLOCK ) { stats.eagain++; yield(); continue; } if( errno == ECONNRESET || errno == EPIPE ) { ret = close(fd); if( ret != 0 ) abort( "close in 'true sendfile' error: (%d) %s\n", (int)errno, strerror(errno) ); return -ECONNRESET; } abort( "sendfile error: %d (%d) %s\n", ret, (int)errno, strerror(errno) ); } count -= ret; stats.splcin++; if(count > 0) stats.avgrd[zipf_idx].calls++; stats.avgrd[zipf_idx].bytes += ret; } ret = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK); if(ret < 0) abort("resetfl in 'true sendfile' error: (%d) %s\n", (int)errno, strerror(errno) ); #else #error not implemented // unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE; // off_t offset = 0; // ssize_t ret; // SPLICE1: while(count > 0) { // stats.tries++; // // ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY); // ret = splice(ans_fd, &offset, pipe[1], 0p, count, sflags); // if( ret <= 0 ) { // if( errno == ECONNRESET || errno == EPIPE ) { // ret = close(fd); // if( ret != 0 ) abort( "close in 'sendfile splice in' error: (%d) %s\n", (int)errno, strerror(errno) ); // return -ECONNRESET; // } // abort( "splice [0] error: %d (%d) %s\n", ret, (int)errno, strerror(errno) ); // } // count -= ret; // stats.splcin++; // if(count > 0) stats.avgrd[zipf_idx].calls++; // stats.avgrd[zipf_idx].bytes += ret; // size_t in_pipe = ret; // SPLICE2: while(in_pipe > 0) { // ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY); // // ret = splice(pipe[0], 0p, fd, 0p, in_pipe, sflags); // if( ret <= 0 ) { // if( errno == ECONNRESET || errno == EPIPE ) { // ret = close(fd); // if( ret != 0 ) abort( "close in 'sendfile splice out' error: (%d) %s\n", (int)errno, strerror(errno) ); // return -ECONNRESET; // } // abort( "splice [1] error: %d (%d) %s\n", ret, (int)errno, strerror(errno) ); // } // stats.splcot++; // in_pipe -= ret; // } // } #endif return count; } // enum FSM_STATE { // Initial, // Retry, // Error, // Done, // }; // struct FSM_Result { // FSM_STATE state; // int error; // }; // static inline void ?{}(FSM_Result & this) { this.state = Initial; this.error = 0; } // static inline bool is_error(FSM_Result & this) { return Error == this.state; } // static inline bool is_done(FSM_Result & this) { return Done == this.state; } // static inline int error(FSM_Result & this, int error) { // this.error = error; // this.state = Error; // return error; // } // static inline int done(FSM_Result & this) { // this.state = Done; // return 0; // } // static inline int retry(FSM_Result & this) { // this.state = Retry; // return 0; // } // static inline int need(FSM_Result & this) { // switch(this.state) { // case Initial: // case Retry: // return 1; // case Error: // if(this.error == 0) mutex(serr) serr | "State marked error but code is 0"; // case Done: // return 0; // } // } // // Generator that handles sending the header // generator header_g { // io_future_t f; // const char * next; // int fd; size_t len; // FSM_Result res; // }; // static inline void ?{}(header_g & this, int fd, const char * it, size_t len ) { // this.next = it; // this.fd = fd; // this.len = len; // } // static inline void fill(header_g & this, struct io_uring_sqe * sqe) { // zero_sqe(sqe); // sqe->opcode = IORING_OP_SEND; // sqe->user_data = (uintptr_t)&this.f; // sqe->flags = IOSQE_IO_LINK; // sqe->fd = this.fd; // sqe->addr = (uintptr_t)this.next; // sqe->len = this.len; // } // static inline int error(header_g & this, int error) { // int ret = close(this.fd); // if( ret != 0 ) { // mutex(serr) serr | "Failed to close fd" | errno; // } // return error(this.res, error); // } // static inline int wait_and_process(header_g & this, sendfile_stats_t & stats) { // wait(this.f); // // Did something crazy happen? // if(this.f.result > this.len) { // mutex(serr) serr | "HEADER sent too much!"; // return error(this, -ERANGE); // } // // Something failed? // if(this.f.result < 0) { // int error = -this.f.result; // if( error == ECONNRESET ) return error(this, -ECONNRESET); // if( error == EPIPE ) return error(this, -EPIPE); // if( error == ECANCELED ) { // mutex(serr) serr | "HEADER was cancelled, WTF!"; // return error(this, -ECONNRESET); // } // if( error == EAGAIN || error == EWOULDBLOCK) { // mutex(serr) serr | "HEADER got eagain, WTF!"; // return error(this, -ECONNRESET); // } // } // // Done? // if(this.f.result == this.len) { // return done(this.res); // } // stats.header++; // // It must be a Short read // this.len -= this.f.result; // this.next += this.f.result; // reset(this.f); // return retry(this.res); // } // // Generator that handles splicing in a file // struct splice_in_t { // io_future_t f; // int fd; int pipe; size_t len; off_t off; // short zipf_idx; // FSM_Result res; // }; // static inline void ?{}(splice_in_t & this, int fd, int pipe, size_t len) { // this.fd = fd; // this.pipe = pipe; // this.len = len; // this.off = 0; // this.zipf_idx = -1; // STATS: for(i; zipf_cnts) { // if(len <= zipf_sizes[i]) { // this.zipf_idx = i; // break STATS; // } // } // if(this.zipf_idx < 0) mutex(serr) serr | "SPLICE IN" | len | " greated than biggest zipf file"; // } // static inline void fill(splice_in_t & this, struct io_uring_sqe * sqe) { // zero_sqe(sqe); // sqe->opcode = IORING_OP_SPLICE; // sqe->user_data = (uintptr_t)&this.f; // sqe->flags = 0; // sqe->splice_fd_in = this.fd; // sqe->splice_off_in = this.off; // sqe->fd = this.pipe; // sqe->off = (__u64)-1; // sqe->len = this.len; // sqe->splice_flags = SPLICE_F_MOVE; // } // static inline int wait_and_process(splice_in_t & this, sendfile_stats_t & stats ) { // wait(this.f); // // Something failed? // if(this.f.result < 0) { // int error = -this.f.result; // if( error == ECONNRESET ) return error(this.res, -ECONNRESET); // if( error == EPIPE ) return error(this.res, -EPIPE); // if( error == ECANCELED ) { // mutex(serr) serr | "SPLICE IN was cancelled, WTF!"; // return error(this.res, -ECONNRESET); // } // if( error == EAGAIN || error == EWOULDBLOCK) { // mutex(serr) serr | "SPLICE IN got eagain, WTF!"; // return error(this.res, -ECONNRESET); // } // mutex(serr) serr | "SPLICE IN got" | error | ", WTF!"; // return error(this.res, -ECONNRESET); // } // // Did something crazy happen? // if(this.f.result > this.len) { // mutex(serr) serr | "SPLICE IN spliced too much!"; // return error(this.res, -ERANGE); // } // // Done? // if(this.f.result == this.len) { // return done(this.res); // } // stats.splcin++; // stats.avgrd[this.zipf_idx].calls++; // stats.avgrd[this.zipf_idx].bytes += this.f.result; // // It must be a Short read // this.len -= this.f.result; // this.off += this.f.result; // reset(this.f); // return retry(this.res); // } // generator splice_out_g { // io_future_t f; // int pipe; int fd; size_t len; // FSM_Result res; // }; // static inline void ?{}(splice_out_g & this, int pipe, int fd, size_t len) { // this.pipe = pipe; // this.fd = fd; // this.len = len; // } // static inline void fill(splice_out_g & this, struct io_uring_sqe * sqe) { // zero_sqe(sqe); // sqe->opcode = IORING_OP_SPLICE; // sqe->user_data = (uintptr_t)&this.f; // sqe->flags = 0; // sqe->splice_fd_in = this.pipe; // sqe->splice_off_in = (__u64)-1; // sqe->fd = this.fd; // sqe->off = (__u64)-1; // sqe->len = this.len; // sqe->splice_flags = SPLICE_F_MOVE; // } // static inline int error(splice_out_g & this, int error) { // int ret = close(this.fd); // if( ret != 0 ) { // mutex(serr) serr | "Failed to close fd" | errno; // } // return error(this.res, error); // } // static inline void wait_and_process(splice_out_g & this, sendfile_stats_t & stats ) { // wait(this.f); // // Something failed? // if(this.f.result < 0) { // int error = -this.f.result; // if( error == ECONNRESET ) return error(this, -ECONNRESET); // if( error == EPIPE ) return error(this, -EPIPE); // if( error == ECANCELED ) { // this.f.result = 0; // goto SHORT_WRITE; // } // if( error == EAGAIN || error == EWOULDBLOCK) { // mutex(serr) serr | "SPLICE OUT got eagain, WTF!"; // return error(this, -ECONNRESET); // } // mutex(serr) serr | "SPLICE OUT got" | error | ", WTF!"; // return error(this, -ECONNRESET); // } // // Did something crazy happen? // if(this.f.result > this.len) { // mutex(serr) serr | "SPLICE OUT spliced too much!" | this.f.result | ">" | this.len; // return error(this.res, -ERANGE); // } // // Done? // if(this.f.result == this.len) { // return done(this.res); // } // SHORT_WRITE: // stats.splcot++; // // It must be a Short Write // this.len -= this.f.result; // reset(this.f); // return retry(this.res); // } int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t fsize, sendfile_stats_t & stats ) { stats.calls++; #if defined(LINKED_IO) // char buffer[512]; // int len = fill_header(buffer, fsize); // header_g header = { fd, buffer, len }; // splice_in_t splice_in = { ans_fd, pipe[1], fsize }; // splice_out_g splice_out = { pipe[0], fd, fsize }; // RETRY_LOOP: for() { // stats.tries++; // int have = need(header.res) + need(splice_in.res) + 1; // int idx = 0; // struct io_uring_sqe * sqes[3]; // __u32 idxs[3]; // struct io_context$ * ctx = cfa_io_allocate(sqes, idxs, have); // if(need(splice_in.res)) { fill(splice_in, sqes[idx++]); } // if(need( header.res)) { fill(header , sqes[idx++]); } // fill(splice_out, sqes[idx]); // // Submit everything // asm volatile("": : :"memory"); // cfa_io_submit( ctx, idxs, have, false ); // // wait for the results // // Always wait for splice-in to complete as // // we may need to kill the connection if it fails // // If it already completed, this is a no-op // wait_and_process(splice_in, stats); // if(is_error(splice_in.res)) { // if(splice_in.res.error == -EPIPE) return -ECONNRESET; // mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error; // int ret = close(fd); // if( ret != 0 ) abort( "close in 'answer sendfile' error: (%d) %s\n", (int)errno, strerror(errno) ); // } // // Process the other 2 // wait_and_process(header, stats); // wait_and_process(splice_out, stats); // if(is_done(splice_out.res)) { // break RETRY_LOOP; // } // // We need to wait for the completion if // // - both completed // // - the header failed // // - // if( is_error(header.res) // || is_error(splice_in.res) // || is_error(splice_out.res)) { // return -ECONNRESET; // } // } // return len + fsize; #else int ret = answer_header(fd, fsize); if( ret < 0 ) { return ret; } return sendfile(pipe, fd, ans_fd, fsize, stats); #endif } [HttpCode code, bool closed, * const char file, size_t len] http_read(volatile int & fd, []char buffer, size_t len, io_future_t * f) { char * it = buffer; size_t count = len - 1; int rlen = 0; READ: for() { int ret; if( f ) { ret = wait_res(*f); reset(*f); f = 0p; } else { ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY); } // int ret = read(fd, (void*)it, count); if(ret == 0 ) { ret = close(fd); if( ret != 0 ) abort( "close in 'http read good' error: (%d) %s\n", (int)errno, strerror(errno) ); return [OK200, true, 0, 0]; } if(ret < 0 ) { if( errno == ECONNRESET || errno == EPIPE ) { ret = close(fd); if( ret != 0 ) abort( "close in 'http read bad' error: (%d) %s\n", (int)errno, strerror(errno) ); return [E408, true, 0, 0]; } abort( "read error: (%d) %s\n", (int)errno, strerror(errno) ); } it[ret + 1] = '\0'; rlen += ret; if( strstr( it, "\r\n\r\n" ) ) break; it += ret; count -= ret; if( count < 1 ) return [E414, false, 0, 0]; } if( options.log ) { write(sout, buffer, rlen); sout | nl; } it = buffer; int ret = memcmp(it, "GET /", 5); if( ret != 0 ) return [E400, false, 0, 0]; it += 5; char * end = strstr( it, " " ); return [OK200, false, it, end - it]; } //============================================================================================= #include #include #include const char * original_http_msgs[] = { "HTTP/1.1 200 OK\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: ", "HTTP/1.1 200 OK\r\nServer: HttpForall\r\nConnection: keep-alive\r\nContent-Length: 15\r\nContent-Type: text/html\r\nDate: %s \r\n\r\nHello, World!\r\n", "HTTP/1.1 400 Bad Request\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", "HTTP/1.1 404 Not Found\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", "HTTP/1.1 405 Method Not Allowed\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", "HTTP/1.1 408 Request Timeout\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", "HTTP/1.1 413 Payload Too Large\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", "HTTP/1.1 414 URI Too Long\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n", }; struct date_buffer { https_msg_str strs[KNOWN_CODES]; }; thread DateFormater { int idx; date_buffer buffers[2]; }; void ?{}( DateFormater & this ) { ((thread&)this){ "Server Date Thread" }; this.idx = 0; memset( &this.buffers[0], 0, sizeof(this.buffers[0]) ); memset( &this.buffers[1], 0, sizeof(this.buffers[1]) ); } void main(DateFormater & this) { LOOP: for() { waitfor( ^?{} : this) { break LOOP; } or else {} char buff[100]; Time now = timeHiRes(); strftime( buff, 100, "%a, %d %b %Y %H:%M:%S %Z", now ); // if( options.log ) sout | "Updated date to '" | buff | "'"; for(i; KNOWN_CODES) { size_t len = snprintf( this.buffers[this.idx].strs[i].msg, 512, original_http_msgs[i], buff ); this.buffers[this.idx].strs[i].len = len; } for(i; KNOWN_CODES) { https_msg_str * next = &this.buffers[this.idx].strs[i]; __atomic_exchange_n((https_msg_str * volatile *)&http_msgs[i], next, __ATOMIC_SEQ_CST); } this.idx = (this.idx + 1) % 2; // if( options.log ) sout | "Date thread sleeping"; sleep(1`s); } } //============================================================================================= DateFormater * the_date_formatter; void init_protocol(void) { the_date_formatter = alloc(); (*the_date_formatter){}; } void deinit_protocol(void) { ^(*the_date_formatter){}; free( the_date_formatter ); }