// HTTP protocol helpers: canned responses, header/file sending (plain and
// io_uring-linked), request reading, and the background thread that refreshes
// the "Date:" field of the canned responses.
//
// NOTE(review): several `#include` directives in this file carry no header name
// in the text under review. They are kept exactly as found and must be restored
// before this file can be built.

#include "protocol.hfa"

#define _GNU_SOURCE
extern "C" {
	#include
}

#define xstr(s) str(s)
#define str(s) #s

#include
#include
#include
#include
#include
// #include // Don't use stdio.h, too slow to compile
extern "C" {
	int snprintf ( char * s, size_t n, const char * format, ... );
	// #include
}
#include
#include

#include "options.hfa"

// Build-time switches. answer_plaintext() picks the first branch whose macro is
// defined (NOCOPY, then MEMCPY, then 1WRITE); LINKED_IO selects the io_uring
// path in answer_sendfile().
#define PLAINTEXT_1WRITE
#define PLAINTEXT_MEMCPY
#define PLAINTEXT_NOCOPY
#define LINKED_IO

// A pre-formatted response (or response prefix) and its length in bytes.
struct https_msg_str {
	char msg[512];
	size_t len;
};

// Currently published messages, one per known code. Entries start out null and
// are replaced atomically by the DateFormater thread below.
const https_msg_str * volatile http_msgs[KNOWN_CODES] = { 0 };

_Static_assert( KNOWN_CODES == (sizeof(http_msgs ) / sizeof(http_msgs [0])));

// Numeric status for each HttpCode, indexed by the enumerator value.
const int http_codes[KNOWN_CODES] = {
	200,
	200,
	400,
	404,
	405,
	408,
	413,
	414,
};

_Static_assert( KNOWN_CODES == (sizeof(http_codes) / sizeof(http_codes[0])));

// Returns the numeric HTTP status associated with `code`.
int code_val(HttpCode code) {
	return http_codes[code];
}

// Sends `len` bytes starting at `it` on `fd`, looping over partial sends.
// Returns 0 once everything was sent, -ECONNRESET (after closing `fd`) when the
// send fails with ECONNRESET or EPIPE, and -EAGAIN on EAGAIN/EWOULDBLOCK.
// Any other failure aborts the program.
static inline int answer( int fd, const char * it, int len ) {
	while(len > 0) {
		// Call write
		int ret = cfa_send(fd, it, len, 0, CFA_IO_LAZY);
		if( ret < 0 ) {
			if( errno == ECONNRESET || errno == EPIPE ) { close(fd); return -ECONNRESET; }
			if( errno == EAGAIN || errno == EWOULDBLOCK) return -EAGAIN;

			abort( "'answer error' error: (%d) %s\n", (int)errno, strerror(errno) );
		}

		// update it/len
		it += ret;
		len -= ret;
	}
	return 0;
}

// Sends the canned response for an error `code` (anything but OK200).
// Returns whatever answer() returns.
int answer_error( int fd, HttpCode code ) {
	/* paranoid */ assert( code < KNOWN_CODES && code != OK200 );
	int idx = (int)code;
	return answer( fd, http_msgs[idx]->msg, http_msgs[idx]->len );
}

// Writes the OK200 prefix followed by "<size> \n\n" into `it`, which must hold
// at least 512 bytes. Returns the prefix length plus snprintf's return value.
static int fill_header(char * it, size_t size) {
	memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
	it += http_msgs[OK200]->len;
	int len = http_msgs[OK200]->len;
	// size is a size_t, so it must be printed with %zu rather than %d
	len += snprintf(it, 512 - len, "%zu \n\n", size);
	return len;
}

// Builds a 200 header announcing `size` bytes of content and sends it on `fd`.
static int answer_header( int fd, size_t size ) {
	char buffer[512];
	int len = fill_header(buffer, size);
	return answer( fd, buffer, len );
}

#if defined(PLAINTEXT_NOCOPY)
// Sends the complete pre-formatted plain-text response straight from the
// published message, without copying it.
int answer_plaintext( int fd ) {
	return answer(fd, http_msgs[OK200_PlainText]->msg, http_msgs[OK200_PlainText]->len);
}
#elif defined(PLAINTEXT_MEMCPY)
#define TEXTSIZE 15
// Assembles header prefix, content length and body into one buffer with
// memcpy, then sends it with a single answer() call.
int answer_plaintext( int fd ) {
	char text[] = "Hello, World!\n\n";
	char ts[] = xstr(TEXTSIZE) " \n\n";
	_Static_assert(sizeof(text) - 1 == TEXTSIZE);
	char buffer[512 + TEXTSIZE];
	char * it = buffer;
	memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
	it += http_msgs[OK200]->len;
	int len = http_msgs[OK200]->len;
	memcpy(it, ts, sizeof(ts) - 1);
	it += sizeof(ts) - 1;
	len += sizeof(ts) - 1;
	memcpy(it, text, TEXTSIZE);
	return answer(fd, buffer, len + TEXTSIZE);
}
#elif defined(PLAINTEXT_1WRITE)
// Assembles header prefix, formatted content length and body into one buffer,
// then sends it with a single answer() call.
int answer_plaintext( int fd ) {
	char text[] = "Hello, World!\n\n";
	char buffer[512 + sizeof(text)];
	char * it = buffer;
	memcpy(it, http_msgs[OK200]->msg, http_msgs[OK200]->len);
	it += http_msgs[OK200]->len;
	int len = http_msgs[OK200]->len;
	// sizeof yields a size_t, so it must be printed with %zu rather than %d
	int r = snprintf(it, 512 - len, "%zu \n\n", sizeof(text));
	it += r;
	len += r;
	memcpy(it, text, sizeof(text));
	return answer(fd, buffer, len + sizeof(text));
}
#else
// Sends the header and the body with two separate answer() calls.
int answer_plaintext( int fd ) {
	char text[] = "Hello, World!\n\n";
	int ret = answer_header(fd, sizeof(text));
	if( ret < 0 ) return ret;
	return answer(fd, text, sizeof(text));
}
#endif

// Sends a 200 header announcing an empty body.
int answer_empty( int fd ) {
	return answer_header(fd, 0);
}

// Copies `count` bytes from `ans_fd` to `fd` through `pipe` using two splices:
// file -> pipe[1], then pipe[0] -> fd. EAGAIN/EWOULDBLOCK is retried;
// ECONNRESET and EPIPE are returned negated; any other failure aborts.
// Returns the remaining count, which is 0 when the outer loop completes.
static int sendfile( int pipe[2], int fd, int ans_fd, size_t count ) {
	unsigned sflags = SPLICE_F_MOVE; // | SPLICE_F_MORE;
	off_t offset = 0;
	ssize_t ret;
	SPLICE1: while(count > 0) {
		ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, sflags, CFA_IO_LAZY);
		if( ret < 0 ) {
			// Only a would-block condition is worth retrying
			if( errno == EAGAIN || errno == EWOULDBLOCK) continue SPLICE1;
			if( errno == ECONNRESET ) return -ECONNRESET;
			if( errno == EPIPE ) return -EPIPE;
			abort( "splice [0] error: (%d) %s\n", (int)errno, strerror(errno) );
		}

		count -= ret;
		offset += ret;
		size_t in_pipe = ret;
		SPLICE2: while(in_pipe > 0) {
			ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, sflags, CFA_IO_LAZY);
			if( ret < 0 ) {
				// Only a would-block condition is worth retrying
				if( errno == EAGAIN || errno == EWOULDBLOCK) continue SPLICE2;
				if( errno == ECONNRESET ) return -ECONNRESET;
				if( errno == EPIPE ) return -EPIPE;
				abort( "splice [1] error: (%d) %s\n", (int)errno, strerror(errno) );
			}
			in_pipe -= ret;
		}
	}
	return count;
}

// Clears the submission-queue-entry fields that the fill() routines below do
// not always set themselves.
static void zero_sqe(struct io_uring_sqe * sqe) {
	sqe->flags = 0;
	sqe->ioprio = 0;
	sqe->fd = 0;
	sqe->off = 0;
	sqe->addr = 0;
	sqe->len = 0;
	sqe->fsync_flags = 0;
	sqe->__pad2[0] = 0;
	sqe->__pad2[1] = 0;
	sqe->__pad2[2] = 0;
}

// Progress of one asynchronous operation.
enum FSM_STATE {
	Initial,
	Retry,
	Error,
	Done,
};

// State of an operation plus the error code recorded when it failed.
struct FSM_Result {
	FSM_STATE state;
	int error;
};

static inline void ?{}(FSM_Result & this) {
	this.state = Initial;
	this.error = 0;
}

static inline bool is_error(FSM_Result & this) {
	return Error == this.state;
}

static inline bool is_done(FSM_Result & this) {
	return Done == this.state;
}

// Marks the result as failed with `error` and returns `error`.
static inline int error(FSM_Result & this, int error) {
	this.error = error;
	this.state = Error;
	return error;
}

// Marks the result as complete; returns 0.
static inline int done(FSM_Result & this) {
	this.state = Done;
	return 0;
}

// Marks the result as needing another submission; returns 0.
static inline int retry(FSM_Result & this) {
	this.state = Retry;
	return 0;
}

// Returns 1 when the operation still has to be submitted (Initial or Retry),
// 0 otherwise. An Error state with a zero code is reported on serr.
static inline int need(FSM_Result & this) {
	switch(this.state) {
		case Initial:
		case Retry:
			return 1;
		case Error:
			if(this.error == 0) mutex(serr) serr | "State marked error but code is 0";
			// fallthrough
		case Done:
			return 0;
	}
	// Not reached for valid states; keeps every path returning a value.
	return 0;
}

// Generator that handles sending the header
generator header_g {
	io_future_t f;
	const char * next;
	int fd;
	size_t len;
	FSM_Result res;
};

static inline void ?{}(header_g & this, int fd, const char * it, size_t len ) {
	this.next = it;
	this.fd = fd;
	this.len = len;
}

// Fills `sqe` with a send of the remaining header bytes, linked to the next
// submission (IOSQE_IO_LINK).
static inline void fill(header_g & this, struct io_uring_sqe * sqe) {
	zero_sqe(sqe);
	sqe->opcode = IORING_OP_SEND;
	sqe->user_data = (uintptr_t)&this.f;
	sqe->flags = IOSQE_IO_LINK;
	sqe->fd = this.fd;
	sqe->addr = (uintptr_t)this.next;
	sqe->len = this.len;
}

// Closes the header's fd (reporting a failed close on serr) and records `error`.
static inline int error(header_g & this, int error) {
	int ret = close(this.fd);
	if( ret != 0 ) {
		mutex(serr) serr | "Failed to close fd" | errno;
	}
	return error(this.res, error);
}

// Waits for the header send and updates this.res: Done when everything was
// sent, Retry after a short send, Error otherwise.
static inline int wait_and_process(header_g & this) {
	wait(this.f);

	// Something failed?
	// Checked before the range test below: that test compares against a size_t,
	// where a negative result could compare as a very large value.
	if(this.f.result < 0) {
		int error = -this.f.result;
		if( error == ECONNRESET ) return error(this, -ECONNRESET);
		if( error == EPIPE ) return error(this, -EPIPE);
		if( error == ECANCELED ) {
			mutex(serr) serr | "HEADER was cancelled, WTF!";
			return error(this, -ECONNRESET);
		}
		if( error == EAGAIN || error == EWOULDBLOCK) {
			mutex(serr) serr | "HEADER got eagain, WTF!";
			return error(this, -ECONNRESET);
		}
	}

	// Did something crazy happen?
	if(this.f.result > this.len) {
		mutex(serr) serr | "HEADER sent too much!";
		return error(this, -ERANGE);
	}

	// Done?
	if(this.f.result == this.len) {
		return done(this.res);
	}

	// It must be a Short read
	this.len -= this.f.result;
	this.next += this.f.result;
	reset(this.f);
	return retry(this.res);
}

// Generator that handles splicing in a file
struct splice_in_t {
	io_future_t f;
	int fd;
	int pipe;
	size_t len;
	off_t off;
	FSM_Result res;
};

static inline void ?{}(splice_in_t & this, int fd, int pipe, size_t len) {
	this.fd = fd;
	this.pipe = pipe;
	this.len = len;
	this.off = 0;
}

// Fills `sqe` with a splice of the remaining bytes from the file, at the
// current offset, into the pipe.
static inline void fill(splice_in_t & this, struct io_uring_sqe * sqe) {
	zero_sqe(sqe);
	sqe->opcode = IORING_OP_SPLICE;
	sqe->user_data = (uintptr_t)&this.f;
	sqe->flags = 0;
	sqe->splice_fd_in = this.fd;
	sqe->splice_off_in = this.off;
	sqe->fd = this.pipe;
	sqe->off = (__u64)-1;
	sqe->len = this.len;
	sqe->splice_flags = SPLICE_F_MOVE;
}

// Waits for the splice-in and updates this.res: Done when everything was
// spliced, Retry after a short splice, Error otherwise. No fd is closed here.
static inline int wait_and_process(splice_in_t & this) {
	wait(this.f);

	// Something failed?
	// Checked before the range test below: that test compares against a size_t,
	// where a negative result could compare as a very large value.
	if(this.f.result < 0) {
		int error = -this.f.result;
		if( error == ECONNRESET ) return error(this.res, -ECONNRESET);
		if( error == EPIPE ) return error(this.res, -EPIPE);
		if( error == ECANCELED ) {
			mutex(serr) serr | "SPLICE IN was cancelled, WTF!";
			return error(this.res, -ECONNRESET);
		}
		if( error == EAGAIN || error == EWOULDBLOCK) {
			mutex(serr) serr | "SPLICE IN got eagain, WTF!";
			return error(this.res, -ECONNRESET);
		}
	}

	// Did something crazy happen?
	if(this.f.result > this.len) {
		mutex(serr) serr | "SPLICE IN spliced too much!";
		return error(this.res, -ERANGE);
	}

	// Done?
	if(this.f.result == this.len) {
		return done(this.res);
	}

	// It must be a Short read
	this.len -= this.f.result;
	this.off += this.f.result;
	reset(this.f);
	return retry(this.res);
}

// Generator that handles splicing the pipe out to the connection
generator splice_out_g {
	io_future_t f;
	int pipe;
	int fd;
	size_t len;
	FSM_Result res;
};

static inline void ?{}(splice_out_g & this, int pipe, int fd, size_t len) {
	this.pipe = pipe;
	this.fd = fd;
	this.len = len;
}

// Fills `sqe` with a splice of the remaining bytes from the pipe to the
// connection.
static inline void fill(splice_out_g & this, struct io_uring_sqe * sqe) {
	zero_sqe(sqe);
	sqe->opcode = IORING_OP_SPLICE;
	sqe->user_data = (uintptr_t)&this.f;
	sqe->flags = 0;
	sqe->splice_fd_in = this.pipe;
	sqe->splice_off_in = (__u64)-1;
	sqe->fd = this.fd;
	sqe->off = (__u64)-1;
	sqe->len = this.len;
	sqe->splice_flags = SPLICE_F_MOVE;
}

// Closes the connection fd (reporting a failed close on serr) and records `error`.
static inline int error(splice_out_g & this, int error) {
	int ret = close(this.fd);
	if( ret != 0 ) {
		mutex(serr) serr | "Failed to close fd" | errno;
	}
	return error(this.res, error);
}

// Waits for the splice-out and updates this.res: Done when everything was
// spliced, Retry after a short write or a cancellation, Error otherwise.
// Returns int like the other wait_and_process overloads.
static inline int wait_and_process(splice_out_g & this) {
	wait(this.f);

	// Something failed?
	// Checked before the range test below: that test compares against a size_t,
	// where a negative result could compare as a very large value.
	if(this.f.result < 0) {
		int error = -this.f.result;
		if( error == ECONNRESET ) return error(this, -ECONNRESET);
		if( error == EPIPE ) return error(this, -EPIPE);
		if( error == ECANCELED ) {
			// A cancelled splice-out moved nothing: treat it as a zero-byte short write
			this.f.result = 0;
			goto SHORT_WRITE;
		}
		if( error == EAGAIN || error == EWOULDBLOCK) {
			mutex(serr) serr | "SPLICE OUT got eagain, WTF!";
			return error(this, -ECONNRESET);
		}
	}

	// Did something crazy happen?
	if(this.f.result > this.len) {
		mutex(serr) serr | "SPLICE OUT spliced too much!";
		return error(this.res, -ERANGE);
	}

	// Done?
	if(this.f.result == this.len) {
		return done(this.res);
	}

SHORT_WRITE:
	// It must be a Short Write
	this.len -= this.f.result;
	reset(this.f);
	return retry(this.res);
}

// Sends a 200 header followed by `fsize` bytes of `ans_fd` on `fd`, going
// through `pipe`. With LINKED_IO the header send, the splice-in and the
// splice-out are submitted together and resubmitted until the splice-out is
// done; returns header length + fsize on success and -ECONNRESET as soon as
// any of the three operations has failed. Without LINKED_IO it falls back to
// answer_header() followed by sendfile().
int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t fsize ) {
	#if defined(LINKED_IO)
		char buffer[512];
		int len = fill_header(buffer, fsize);
		header_g header = { fd, buffer, len };
		splice_in_t splice_in = { ans_fd, pipe[1], fsize };
		splice_out_g splice_out = { pipe[0], fd, fsize };

		RETRY_LOOP: for() {
			// The splice-out is always submitted; the other two only while they still need to run
			int have = need(header.res) + need(splice_in.res) + 1;
			int idx = 0;
			struct io_uring_sqe * sqes[3];
			__u32 idxs[3];
			struct $io_context * ctx = cfa_io_allocate(sqes, idxs, have);

			if(need(splice_in.res)) { fill(splice_in, sqes[idx++]); }
			if(need(   header.res)) { fill(header   , sqes[idx++]); }
			fill(splice_out, sqes[idx]);

			// Submit everything
			asm volatile("": : :"memory");
			cfa_io_submit( ctx, idxs, have, false );

			// wait for the results
			// Always wait for splice-in to complete as
			// we may need to kill the connection if it fails
			// If it already completed, this is a no-op
			wait_and_process(splice_in);

			if(is_error(splice_in.res)) {
				mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error;
				// NOTE(review): the header/splice-out error paths below also close this
				// fd, so it may be closed twice after a splice-in failure -- confirm.
				close(fd);
			}

			// Process the other 2
			wait_and_process(header);
			wait_and_process(splice_out);

			if(is_done(splice_out.res)) {
				break RETRY_LOOP;
			}

			// Any failed operation ends the exchange; otherwise loop and resubmit
			// whatever is still pending.
			if( is_error(header.res)
			 || is_error(splice_in.res)
			 || is_error(splice_out.res)) {
				return -ECONNRESET;
			}
		}

		return len + fsize;
	#else
		int ret = answer_header(fd, fsize);
		if( ret < 0 ) return ret;
		return sendfile(pipe, fd, ans_fd, fsize);
	#endif
}

// Reads a request from `fd` into `buffer` (capacity `len`) until the blank line
// that ends the headers has been received.
// Returns [code, closed, file, len]:
//  - [OK200, true, 0, 0]  when the receive returns 0
//  - [E408, true, 0, 0]   on ECONNRESET/EPIPE (the fd is closed first)
//  - [E414, false, 0, 0]  when the buffer fills up before the headers end
//  - [E400, false, 0, 0]  when the request does not start with "GET /" or has
//                         no space after the path
//  - [OK200, false, path, path length] otherwise; `path` points into `buffer`
//    just after "GET /".
// Any other receive failure aborts the program.
[HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) {
	char * it = buffer;
	size_t count = len - 1;	// keep one byte for the terminator
	int rlen = 0;
	READ:
	for() {
		int ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY);
		// int ret = read(fd, (void*)it, count);
		if(ret == 0 ) return [OK200, true, 0, 0];
		if(ret < 0 ) {
			if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ;
			if( errno == ECONNRESET ) { close(fd); return [E408, true, 0, 0]; }
			if( errno == EPIPE ) { close(fd); return [E408, true, 0, 0]; }
			abort( "read error: (%d) %s\n", (int)errno, strerror(errno) );
		}
		// Terminate right after the bytes just received
		it[ret] = '\0';
		rlen += ret;

		// Search from the start so a terminator split across two receives is found
		if( strstr( buffer, "\r\n\r\n" ) ) break;

		it += ret;
		count -= ret;

		if( count < 1 ) return [E414, false, 0, 0];
	}

	if( options.log ) {
		write(sout, buffer, rlen);
		sout | nl;
	}

	it = buffer;
	int ret = memcmp(it, "GET /", 5);
	if( ret != 0 ) return [E400, false, 0, 0];
	it += 5;

	char * end = strstr( it, " " );
	// No space after the path: the request line is malformed
	if( !end ) return [E400, false, 0, 0];
	return [OK200, false, it, end - it];
}

//=============================================================================================

#include
#include
#include

// Templates for the canned responses, in the same order as http_msgs; the %s is
// replaced by the current date string.
const char * original_http_msgs[] = {
	"HTTP/1.1 200 OK\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: ",
	"HTTP/1.1 200 OK\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 15\n\nHello, World!\n\n",
	"HTTP/1.1 400 Bad Request\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
	"HTTP/1.1 404 Not Found\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
	"HTTP/1.1 405 Method Not Allowed\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
	"HTTP/1.1 408 Request Timeout\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
	"HTTP/1.1 413 Payload Too Large\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
	"HTTP/1.1 414 URI Too Long\nServer: HttpForall\nDate: %s \nContent-Type: text/plain\nContent-Length: 0 \n\n",
};

// One full set of formatted messages.
struct date_buffer {
	https_msg_str strs[KNOWN_CODES];
};

// Thread that re-formats the messages with the current date; it alternates
// between two buffers, `idx` being the one written next.
thread DateFormater {
	int idx;
	date_buffer buffers[2];
};

void ?{}( DateFormater & this ) {
	((thread&)this){ "Server Date Thread", *options.clopts.instance[0] };
	this.idx = 0;
	memset( &this.buffers[0], 0, sizeof(this.buffers[0]) );
	memset( &this.buffers[1], 0, sizeof(this.buffers[1]) );
}

// Thread body: until the destructor is called, format every message with the
// current time into the buffer selected by idx, publish the new pointers in
// http_msgs, switch buffers and sleep for one second.
void main(DateFormater & this) {
	LOOP: for() {
		waitfor( ^?{} : this) {
			break LOOP;
		}
		or else {}

		char buff[100];
		Time now = timeHiRes();
		strftime( buff, 100, "%a, %d %b %Y %H:%M:%S %Z", now );
		// if( options.log ) sout | "Updated date to '" | buff | "'";

		for(i; KNOWN_CODES) {
			size_t len = snprintf( this.buffers[this.idx].strs[i].msg, 512, original_http_msgs[i], buff );
			this.buffers[this.idx].strs[i].len = len;
		}

		// Publish only after every message of this buffer has been formatted
		for(i; KNOWN_CODES) {
			https_msg_str * next = &this.buffers[this.idx].strs[i];
			__atomic_exchange_n((https_msg_str * volatile *)&http_msgs[i], next, __ATOMIC_SEQ_CST);
		}

		this.idx = (this.idx + 1) % 2;

		// if( options.log ) sout | "Date thread sleeping";

		sleep(1`s);
	}
}

//=============================================================================================
DateFormater * the_date_formatter;

// Allocates and constructs the date-formatting thread.
void init_protocol(void) {
	the_date_formatter = alloc();
	(*the_date_formatter){};
}

// Destroys the date-formatting thread and frees its storage.
void deinit_protocol(void) {
	^(*the_date_formatter){};
	free( the_date_formatter );
}