source: benchmark/io/http/worker.cfa @ 519f11c

ADTarm-ehast-experimentalenumforall-pointer-decayjacob/cs343-translationnew-astnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since 519f11c was 7f389a5c, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Added support for argument parsing.
Removed unnecessary headers.
Fixed magic number in splice.

  • Property mode set to 100644
File size: 3.7 KB
Line 
1#include "worker.hfa"
2
3#define __USE_GNU
4
5#include <errno.h>
6#include <stdio.h>
7#include <string.h>
8extern "C" {
9        #include <fcntl.h>
10        #include <sys/socket.h>
11        #include <sys/types.h>
12        #include <netinet/in.h>
13}
14
15#include <iofwd.hfa>
16
17#include "options.hfa"
18#include "protocol.hfa"
19#include "filecache.hfa"
20
21extern "C" {
22// extern ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);
23extern ssize_t splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags);
24}
25
26ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count) {
27        return splice(in_fd, offset, out_fd, 0p, count, 0);
28}
29
30
31//=============================================================================================
32// Worker Thread
33//=============================================================================================
34void sendfile( Worker & this, int fd, int ans_fd, size_t count ) {
35        off_t offset = 0;
36        ssize_t ret;
37        SPLICE1: while(count > 0) {
38                ret = cfa_splice(ans_fd, &offset, this.pipe[1], 0p, count, SPLICE_F_MOVE | SPLICE_F_MORE);
39                if( ret < 0 ) {
40                        if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE1;
41                        abort( "splice [0] error: (%d) %s\n", (int)errno, strerror(errno) );
42                }
43
44                count -= ret;
45                offset += ret;
46                size_t in_pipe = ret;
47                SPLICE2: while(in_pipe > 0) {
48                        ret = cfa_splice(this.pipe[0], 0p, fd, 0p, in_pipe, SPLICE_F_MOVE | SPLICE_F_MORE);
49                        if( ret < 0 ) {
50                                if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE2;
51                                abort( "splice [1] error: (%d) %s\n", (int)errno, strerror(errno) );
52                        }
53                        in_pipe -= ret;
54                }
55
56        }
57}
58
59void ?{}( Worker & this ) {
60        ((thread&)this){ "Server Worker Thread", *options.the_cluster };
61        int ret = pipe(this.pipe);
62        if( ret < 0 ) { abort( "pipe error: (%d) %s\n", (int)errno, strerror(errno) ); }
63}
64
65void main( Worker & this ) {
66        CONNECTION:
67        while( int fd = take(wait_connect); fd >= 0) {
68            printf("New connection, waiting for requests\n");
69                REQUEST:
70                for() {
71                        bool closed;
72                        HttpCode code;
73                        const char * file;
74                        size_t name_size;
75
76                        // Read the http request
77                        size_t len = 1024;
78                        char buffer[len];
79                        printf("Reading request\n");
80                        [code, closed, file, name_size] = http_read(fd, buffer, len);
81
82                        // if we are done, break out of the loop
83                        if( closed ) {
84                                printf("Connection closed\n");
85                                continue CONNECTION;
86                        }
87
88                        // If this wasn't a request retrun 400
89                        if( code != OK200 ) {
90                                printf("Invalid Request\n");
91                                answer_error(fd, code);
92                                continue REQUEST;
93                        }
94
95                        printf("Request for file %.*s\n", name_size, file);
96
97                        // Get the fd from the file cache
98                        int ans_fd;
99                        size_t count;
100                        [ans_fd, count] = get_file( file, name_size );
101
102                        // If we can't find the file, return 404
103                        if( ans_fd < 0 ) {
104                                printf("File Not Found\n");
105                                answer_error(fd, E404);
106                                continue REQUEST;
107                        }
108
109                        // Send the header
110                        answer_header(fd, count);
111
112                        // Send the desired file
113                        sendfile( this, fd, ans_fd, count);
114
115                        printf("File sent\n");
116                }
117        }
118}
119
120//=============================================================================================
121// Acceptor Thread
122//=============================================================================================
123void ?{}( Acceptor & this, int sockfd, struct sockaddr * addr, socklen_t * addrlen, int flags ) {
124        ((thread&)this){ "Acceptor Thread", *options.the_cluster };
125        this.sockfd  = sockfd;
126        this.addr    = addr;
127        this.addrlen = addrlen;
128        this.flags   = flags;
129}
130
131void main( Acceptor & this ) {
132        for() {
133                int ret = cfa_accept4( this.[sockfd, addr, addrlen, flags] );
134                if(ret < 0) {
135                        if( errno == ECONNABORTED ) break;
136                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
137                }
138
139            printf("New connection accepted\n");
140                put( wait_connect, ret );
141      }
142}
Note: See TracBrowser for help on using the repository browser.