#include "worker.hfa"

// NOTE(review): the names of the five system/library headers below were not
// present in the reviewed text; they are reconstructed from the symbols this
// file uses (errno, printf, strerror, pipe, cfa_accept4) -- confirm against
// the build before merging.
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

#include <iofwd.hfa>

#include "options.hfa"
#include "protocol.hfa"
#include "filecache.hfa"

extern "C" {
// extern ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);
extern ssize_t splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags);
}

// sendfile-shaped wrapper implemented on top of splice.
// Forwards to splice(in_fd, offset, out_fd, 0p, count, 0) and returns
// whatever splice returns; no retry or error handling is done here.
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count) {
	return splice(in_fd, offset, out_fd, 0p, count, 0);
}

//=============================================================================================
// Worker Thread
//=============================================================================================
// Constructs a worker: initialises the thread part with a fixed name on
// options.the_cluster, then creates the worker's pipe (this.pipe).
// Aborts the program with the errno text if pipe() fails.
void ?{}( Worker & this ) {
	((thread&)this){ "Server Worker Thread", *options.the_cluster };
	int ret = pipe(this.pipe);
	if( ret < 0 ) { abort( "pipe error: (%d) %s\n", (int)errno, strerror(errno) ); }
}

// Worker thread body.
// Repeatedly takes a connection descriptor from wait_connect and serves
// requests on it until http_read reports the connection closed; the outer
// loop ends when take() yields a negative descriptor.
void main( Worker & this ) {
	CONNECTION:
	while( int fd = take(wait_connect); fd >= 0) {
		printf("New connection, waiting for requests\n");
		REQUEST:
		for() {
			bool closed;
			HttpCode code;
			const char * file;
			size_t name_size;

			// Read the http request
			size_t len = 1024;
			char buffer[len];
			printf("Reading request\n");
			[code, closed, file, name_size] = http_read(fd, buffer, len);

			// If we are done, move on to the next connection
			// NOTE(review): fd is not closed on this path; confirm whether
			// http_read or another component owns closing it.
			if( closed ) {
				printf("Connection closed\n");
				continue CONNECTION;
			}

			// If this wasn't a valid request, return the error code (e.g. 400)
			if( code != OK200 ) {
				printf("Invalid Request\n");
				answer_error(fd, code);
				continue REQUEST;
			}

			// The '*' precision of "%.*s" must be an int, not a size_t.
			printf("Request for file %.*s\n", (int)name_size, file);

			// Get the fd from the file cache
			int ans_fd;
			size_t count;
			[ans_fd, count] = get_file( file, name_size );

			// If we can't find the file, return 404
			if( ans_fd < 0 ) {
				printf("File Not Found\n");
				answer_error(fd, E404);
				continue REQUEST;
			}

			// Send the header
			answer_header(fd, count);

			// Send the desired file
			sendfile( this.pipe, fd, ans_fd, count);

			printf("File sent\n");
		}
	}
}
//=============================================================================================
// Acceptor Thread
//=============================================================================================
// Builds an acceptor: sets up the thread part under a fixed name on
// options.the_cluster and records the arguments later handed to cfa_accept4.
void ?{}( Acceptor & this, int sockfd, struct sockaddr * addr, socklen_t * addrlen, int flags ) {
	((thread&)this){ "Acceptor Thread", *options.the_cluster };
	this.flags   = flags;
	this.addrlen = addrlen;
	this.addr    = addr;
	this.sockfd  = sockfd;
}

// Acceptor thread body.
// Accepts connections forever and puts each accepted descriptor onto
// wait_connect. An ECONNABORTED failure ends the loop; any other accept
// failure aborts the program with the errno text.
void main( Acceptor & this ) {
	for() {
		int conn_fd = cfa_accept4( this.sockfd, this.addr, this.addrlen, this.flags );

		if( conn_fd < 0 ) {
			if( errno != ECONNABORTED ) {
				abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
			}
			else {
				break;
			}
		}

		printf("New connection accepted\n");
		put( wait_connect, conn_fd );
	}
}