Changes in / [14d8a9b:dfc13bb]
- Location:
- benchmark/io/http
- Files:
-
- 7 edited
-
filecache.cfa (modified) (7 diffs)
-
main.cfa (modified) (6 diffs)
-
options.cfa (modified) (5 diffs)
-
options.hfa (modified) (2 diffs)
-
protocol.cfa (modified) (6 diffs)
-
worker.cfa (modified) (2 diffs)
-
worker.hfa (modified) (3 diffs)
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/http/filecache.cfa
r14d8a9b rdfc13bb 73 73 cache_line * entries; 74 74 size_t size; 75 int * rawfds;76 int nfds;77 75 } file_cache; 78 76 … … 100 98 } 101 99 102 int put_file( cache_line & entry , int fd) {100 int put_file( cache_line & entry ) { 103 101 uint32_t idx = murmur3_32( (const uint8_t *)entry.file, strlen(entry.file), options.file_cache.hash_seed ) % file_cache.size; 104 102 … … 110 108 111 109 file_cache.entries[idx] = entry; 112 file_cache.entries[idx].fd = fd;113 110 return i > 0 ? 1 : 0; 114 111 } … … 124 121 size_t fcount = 0; 125 122 size_t fsize = 16; 126 cache_line * raw = alloc(fsize); 123 cache_line * raw = 0p; 124 raw = alloc(raw, fsize, true); 127 125 // Step 1 get a dense array of all files 128 126 int walk(const char *fpath, const struct stat *sb, int typeflag) { … … 133 131 if(fcount > fsize) { 134 132 fsize *= 2; 135 raw = alloc( fsize, raw`realloc);133 raw = alloc(raw, fsize, true); 136 134 } 137 135 … … 164 162 file_cache.entries = anew(file_cache.size); 165 163 166 if(options.file_cache.fixed_fds) {167 file_cache.nfds = fcount;168 file_cache.rawfds = alloc(fcount);169 }170 171 164 // Step 3 fill the cache 172 165 int conflicts = 0; 173 166 for(i; fcount) { 174 int fd; 175 if(options.file_cache.fixed_fds) { 176 file_cache.rawfds[i] = raw[i].fd; 177 fd = i; 178 } 179 else { 180 fd = raw[i].fd; 181 } 182 conflicts += put_file( raw[i], fd ); 167 conflicts += put_file( raw[i] ); 183 168 } 184 169 printf("Filled cache from path \"%s\" with %zu files\n", path, fcount); … … 212 197 } 213 198 214 size_t s = file_cache.nfds + extra; 215 int * data = alloc(s, file_cache.rawfds`realloc); 216 return [data, file_cache.nfds]; 199 return [aalloc(extra), 0]; 217 200 } 218 201 -
benchmark/io/http/main.cfa
r14d8a9b rdfc13bb 12 12 #include <kernel.hfa> 13 13 #include <stats.hfa> 14 #include <time.hfa>15 14 #include <thread.hfa> 16 15 16 #include "channel.hfa" 17 17 #include "filecache.hfa" 18 18 #include "options.hfa" 19 19 #include "worker.hfa" 20 20 21 extern void register_fixed_files( cluster &, int *, unsigned count );22 23 Duration default_preemption() {24 return 0;25 }26 27 21 //============================================================================================= 28 22 // Globals 29 23 //============================================================================================= 24 channel & wait_connect; 25 30 26 struct ServerProc { 31 27 processor self; … … 88 84 // Run Server Cluster 89 85 { 90 cluster cl = { "Server Cluster", options.clopts. params };86 cluster cl = { "Server Cluster", options.clopts.flags }; 91 87 #if !defined(__CFA_NO_STATISTICS__) 92 88 print_stats_at_exit( cl, CFA_STATS_READY_Q | CFA_STATS_IO ); 93 89 #endif 94 90 options.clopts.instance = &cl; 91 92 channel chan = { options.clopts.chan_size }; 93 &wait_connect = &chan; 95 94 96 95 int pipe_cnt = options.clopts.nworkers * 2; … … 103 102 } 104 103 105 if(options.file_cache.fixed_fds) {106 register_fixed_files(cl, fds, pipe_off);107 }108 109 104 { 110 105 ServerProc procs[options.clopts.nprocs]; … … 112 107 Worker workers[options.clopts.nworkers]; 113 108 for(i; options.clopts.nworkers) { 114 // if( options.file_cache.fixed_fds ) { 115 // workers[i].pipe[0] = pipe_off + (i * 2) + 0; 116 // workers[i].pipe[1] = pipe_off + (i * 2) + 1; 117 // } 118 // else 119 { 109 if( options.file_cache.fixed_fds ) { 110 workers[i].pipe[0] = pipe_off + (i * 2) + 0; 111 workers[i].pipe[1] = pipe_off + (i * 2) + 1; 112 } 113 else { 120 114 workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0]; 121 115 workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1]; 122 workers[i].sockfd = server_fd;123 workers[i].addr = (struct sockaddr *)&address;124 workers[i].addrlen = (socklen_t*)&addrlen;125 workers[i].flags = 0;126 116 } 127 117 unpark( workers[i] __cfaabi_dbg_ctx2 ); … … 129 119 printf("%d workers started on %d processors\n", options.clopts.nworkers, options.clopts.nprocs); 130 120 { 121 Acceptor acceptor = { server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen, 0 }; 122 131 123 char buffer[128]; 132 124 while(!feof(stdin)) { … … 135 127 136 128 printf("Shutting Down\n"); 129 } 130 printf("Acceptor Closed\n"); 131 132 // Clean-up the workers 133 for(options.clopts.nworkers) { 134 put( wait_connect, -1 ); 137 135 } 138 136 } -
benchmark/io/http/options.cfa
r14d8a9b rdfc13bb 31 31 1, // nworkers; 32 32 0, // flags; 33 10, // chan_size; 33 34 false, // procstats 34 35 false, // viewhalts … … 38 39 39 40 const char * parse_options( int argc, char * argv[] ) { 41 bool uthrdpo = false; 40 42 bool subthrd = false; 41 43 bool eagrsub = false; … … 50 52 {'t', "threads", "Number of worker threads to use", options.clopts.nworkers}, 51 53 {'b', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog}, 54 {'B', "channel-size", "Maximum number of accepted connection pending", options.clopts.chan_size}, 52 55 {'r', "request_len", "Maximum number of bytes in the http request, requests with more data will be answered with Http Code 414", options.socket.buflen}, 53 56 {'S', "seed", "seed to use for hashing", options.file_cache.hash_seed }, 54 57 {'C', "cache-size", "Size of the cache to use, if set to small, will uses closes power of 2", options.file_cache.size }, 55 58 {'l', "list-files", "List the files in the specified path and exit", options.file_cache.list, parse_settrue }, 59 {'u', "userthread", "If set, cluster uses user-thread to poll I/O", uthrdpo, parse_settrue }, 56 60 {'s', "submitthread", "If set, cluster uses polling thread to submit I/O", subthrd, parse_settrue }, 57 61 {'e', "eagersubmit", "If set, cluster submits I/O eagerly but still aggregates submits", eagrsub, parse_settrue}, … … 67 71 parse_args( argc, argv, opt, opt_cnt, "[OPTIONS]... [PATH]\ncforall http server", left ); 68 72 69 options.clopts.params.poller_submits = subthrd; 70 options.clopts.params.eager_submits = eagrsub; 73 if( uthrdpo ) { 74 options.clopts.flags |= CFA_CLUSTER_IO_POLLER_USER_THREAD; 75 } 76 77 if( subthrd ) { 78 options.clopts.flags |= CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS; 79 } 80 81 if( eagrsub ) { 82 options.clopts.flags |= CFA_CLUSTER_IO_EAGER_SUBMITS; 83 } 71 84 72 85 if( fixedfd ) { … … 75 88 76 89 if( sqkpoll ) { 77 options.clopts. params.poll_submit = true;90 options.clopts.flags |= CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS; 78 91 options.file_cache.fixed_fds = true; 79 92 } 80 93 81 94 if( iokpoll ) { 82 options.clopts. params.poll_complete = true;95 options.clopts.flags |= CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES; 83 96 options.file_cache.open_flags |= O_DIRECT; 84 97 } 85 98 86 options.clopts. params.num_ready = sublen;99 options.clopts.flags |= (sublen << CFA_CLUSTER_IO_BUFFLEN_OFFSET); 87 100 88 101 if( left[0] == 0p ) { return "."; } -
benchmark/io/http/options.hfa
r14d8a9b rdfc13bb 2 2 3 3 #include <stdint.h> 4 5 #include <kernel.hfa>6 4 7 5 struct cluster; … … 25 23 int nprocs; 26 24 int nworkers; 27 io_context_params params; 25 int flags; 26 int chan_size; 28 27 bool procstats; 29 28 bool viewhalts; -
benchmark/io/http/protocol.cfa
r14d8a9b rdfc13bb 11 11 extern "C" { 12 12 int snprintf ( char * s, size_t n, const char * format, ... ); 13 #include <linux/io_uring.h>14 13 } 15 14 #include <string.h> 15 16 16 #include <errno.h> 17 17 18 #include "options.hfa"19 18 20 19 const char * http_msgs[] = { … … 75 74 READ: 76 75 for() { 77 int ret = cfa_read(fd, (void*)it, count, 0, -1`s, 0p, 0p);78 if(ret == 0 ) return [OK200, true, 0 , 0];76 int ret = cfa_read(fd, it, count); 77 if(ret == 0 ) return [OK200, true, 0p, 0]; 79 78 if(ret < 0 ) { 80 79 if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ; … … 89 88 count -= ret; 90 89 91 if( count < 1 ) return [E414, false, 0 , 0];90 if( count < 1 ) return [E414, false, 0p, 0]; 92 91 } 93 92 … … 96 95 it = buffer; 97 96 int ret = memcmp(it, "GET /", 5); 98 if( ret != 0 ) return [E400, false, 0 , 0];97 if( ret != 0 ) return [E400, false, 0p, 0]; 99 98 it += 5; 100 99 … … 107 106 ssize_t ret; 108 107 SPLICE1: while(count > 0) { 109 ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, SPLICE_F_MOVE | SPLICE_F_MORE , 0, -1`s, 0p, 0p);108 ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, SPLICE_F_MOVE | SPLICE_F_MORE); 110 109 if( ret < 0 ) { 111 110 if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE1; … … 117 116 size_t in_pipe = ret; 118 117 SPLICE2: while(in_pipe > 0) { 119 ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, SPLICE_F_MOVE | SPLICE_F_MORE , 0, -1`s, 0p, 0p);118 ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, SPLICE_F_MOVE | SPLICE_F_MORE); 120 119 if( ret < 0 ) { 121 120 if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE2; -
benchmark/io/http/worker.cfa
r14d8a9b rdfc13bb 28 28 CONNECTION: 29 29 for() { 30 int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], 0, -1`s, 0p, 0p ); 31 if(fd < 0) { 32 if( errno == ECONNABORTED ) break; 33 abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) ); 34 } 30 int fd = take(wait_connect); 31 if (fd < 0) break; 35 32 36 33 printf("New connection %d, waiting for requests\n", fd); … … 85 82 } 86 83 } 84 85 //============================================================================================= 86 // Acceptor Thread 87 //============================================================================================= 88 void ?{}( Acceptor & this, int sockfd, struct sockaddr * addr, socklen_t * addrlen, int flags ) { 89 ((thread&)this){ "Acceptor Thread", *options.clopts.instance }; 90 this.sockfd = sockfd; 91 this.addr = addr; 92 this.addrlen = addrlen; 93 this.flags = flags; 94 } 95 96 void main( Acceptor & this ) { 97 for() { 98 int ret = cfa_accept4( this.[sockfd, addr, addrlen, flags] ); 99 if(ret < 0) { 100 if( errno == ECONNABORTED ) break; 101 abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) ); 102 } 103 104 printf("New connection accepted\n"); 105 put( wait_connect, ret ); 106 } 107 } -
benchmark/io/http/worker.hfa
r14d8a9b rdfc13bb 7 7 } 8 8 9 #include "channel.hfa" 10 11 extern channel & wait_connect; 12 9 13 //============================================================================================= 10 14 // Worker Thread … … 13 17 thread Worker { 14 18 int pipe[2]; 19 }; 20 void ?{}( Worker & this ); 21 void main( Worker & ); 22 23 //============================================================================================= 24 // Acceptor Thread 25 //============================================================================================= 26 thread Acceptor { 15 27 int sockfd; 16 28 struct sockaddr * addr; … … 18 30 int flags; 19 31 }; 20 void ?{}( Worker & this); 21 void main( Worker & ); 32 33 void ?{}( Acceptor & this, int sockfd, struct sockaddr * addr, socklen_t * addrlen, int flags ); 34 void main( Acceptor & this );
Note:
See TracChangeset
for help on using the changeset viewer.