Changes in / [dfc13bb:14d8a9b]
- Location:
- benchmark/io/http
- Files:
-
- 7 edited
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/http/filecache.cfa
rdfc13bb r14d8a9b 73 73 cache_line * entries; 74 74 size_t size; 75 int * rawfds; 76 int nfds; 75 77 } file_cache; 76 78 … … 98 100 } 99 101 100 int put_file( cache_line & entry ) {102 int put_file( cache_line & entry, int fd ) { 101 103 uint32_t idx = murmur3_32( (const uint8_t *)entry.file, strlen(entry.file), options.file_cache.hash_seed ) % file_cache.size; 102 104 … … 108 110 109 111 file_cache.entries[idx] = entry; 112 file_cache.entries[idx].fd = fd; 110 113 return i > 0 ? 1 : 0; 111 114 } … … 121 124 size_t fcount = 0; 122 125 size_t fsize = 16; 123 cache_line * raw = 0p; 124 raw = alloc(raw, fsize, true); 126 cache_line * raw = alloc(fsize); 125 127 // Step 1 get a dense array of all files 126 128 int walk(const char *fpath, const struct stat *sb, int typeflag) { … … 131 133 if(fcount > fsize) { 132 134 fsize *= 2; 133 raw = alloc( raw, fsize, true);135 raw = alloc(fsize, raw`realloc); 134 136 } 135 137 … … 162 164 file_cache.entries = anew(file_cache.size); 163 165 166 if(options.file_cache.fixed_fds) { 167 file_cache.nfds = fcount; 168 file_cache.rawfds = alloc(fcount); 169 } 170 164 171 // Step 3 fill the cache 165 172 int conflicts = 0; 166 173 for(i; fcount) { 167 conflicts += put_file( raw[i] ); 174 int fd; 175 if(options.file_cache.fixed_fds) { 176 file_cache.rawfds[i] = raw[i].fd; 177 fd = i; 178 } 179 else { 180 fd = raw[i].fd; 181 } 182 conflicts += put_file( raw[i], fd ); 168 183 } 169 184 printf("Filled cache from path \"%s\" with %zu files\n", path, fcount); … … 197 212 } 198 213 199 return [aalloc(extra), 0]; 214 size_t s = file_cache.nfds + extra; 215 int * data = alloc(s, file_cache.rawfds`realloc); 216 return [data, file_cache.nfds]; 200 217 } 201 218 -
benchmark/io/http/main.cfa
rdfc13bb r14d8a9b 12 12 #include <kernel.hfa> 13 13 #include <stats.hfa> 14 #include <time.hfa> 14 15 #include <thread.hfa> 15 16 16 #include "channel.hfa"17 17 #include "filecache.hfa" 18 18 #include "options.hfa" 19 19 #include "worker.hfa" 20 20 21 extern void register_fixed_files( cluster &, int *, unsigned count ); 22 23 Duration default_preemption() { 24 return 0; 25 } 26 21 27 //============================================================================================= 22 28 // Globals 23 29 //============================================================================================= 24 channel & wait_connect;25 26 30 struct ServerProc { 27 31 processor self; … … 84 88 // Run Server Cluster 85 89 { 86 cluster cl = { "Server Cluster", options.clopts. flags };90 cluster cl = { "Server Cluster", options.clopts.params }; 87 91 #if !defined(__CFA_NO_STATISTICS__) 88 92 print_stats_at_exit( cl, CFA_STATS_READY_Q | CFA_STATS_IO ); 89 93 #endif 90 94 options.clopts.instance = &cl; 91 92 channel chan = { options.clopts.chan_size };93 &wait_connect = &chan;94 95 95 96 int pipe_cnt = options.clopts.nworkers * 2; … … 102 103 } 103 104 105 if(options.file_cache.fixed_fds) { 106 register_fixed_files(cl, fds, pipe_off); 107 } 108 104 109 { 105 110 ServerProc procs[options.clopts.nprocs]; … … 107 112 Worker workers[options.clopts.nworkers]; 108 113 for(i; options.clopts.nworkers) { 109 if( options.file_cache.fixed_fds ) { 110 workers[i].pipe[0] = pipe_off + (i * 2) + 0; 111 workers[i].pipe[1] = pipe_off + (i * 2) + 1; 112 } 113 else { 114 // if( options.file_cache.fixed_fds ) { 115 // workers[i].pipe[0] = pipe_off + (i * 2) + 0; 116 // workers[i].pipe[1] = pipe_off + (i * 2) + 1; 117 // } 118 // else 119 { 114 120 workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0]; 115 121 workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1]; 122 workers[i].sockfd = server_fd; 123 workers[i].addr = (struct sockaddr *)&address; 124 workers[i].addrlen = (socklen_t*)&addrlen; 125 workers[i].flags = 0; 116 126 } 117 127 unpark( workers[i] __cfaabi_dbg_ctx2 ); … … 119 129 printf("%d workers started on %d processors\n", options.clopts.nworkers, options.clopts.nprocs); 120 130 { 121 Acceptor acceptor = { server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen, 0 };122 123 131 char buffer[128]; 124 132 while(!feof(stdin)) { … … 127 135 128 136 printf("Shutting Down\n"); 129 }130 printf("Acceptor Closed\n");131 132 // Clean-up the workers133 for(options.clopts.nworkers) {134 put( wait_connect, -1 );135 137 } 136 138 } -
benchmark/io/http/options.cfa
rdfc13bb r14d8a9b 31 31 1, // nworkers; 32 32 0, // flags; 33 10, // chan_size;34 33 false, // procstats 35 34 false, // viewhalts … … 39 38 40 39 const char * parse_options( int argc, char * argv[] ) { 41 bool uthrdpo = false;42 40 bool subthrd = false; 43 41 bool eagrsub = false; … … 52 50 {'t', "threads", "Number of worker threads to use", options.clopts.nworkers}, 53 51 {'b', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog}, 54 {'B', "channel-size", "Maximum number of accepted connection pending", options.clopts.chan_size},55 52 {'r', "request_len", "Maximum number of bytes in the http request, requests with more data will be answered with Http Code 414", options.socket.buflen}, 56 53 {'S', "seed", "seed to use for hashing", options.file_cache.hash_seed }, 57 54 {'C', "cache-size", "Size of the cache to use, if set to small, will uses closes power of 2", options.file_cache.size }, 58 55 {'l', "list-files", "List the files in the specified path and exit", options.file_cache.list, parse_settrue }, 59 {'u', "userthread", "If set, cluster uses user-thread to poll I/O", uthrdpo, parse_settrue },60 56 {'s', "submitthread", "If set, cluster uses polling thread to submit I/O", subthrd, parse_settrue }, 61 57 {'e', "eagersubmit", "If set, cluster submits I/O eagerly but still aggregates submits", eagrsub, parse_settrue}, … … 71 67 parse_args( argc, argv, opt, opt_cnt, "[OPTIONS]... [PATH]\ncforall http server", left ); 72 68 73 if( uthrdpo ) { 74 options.clopts.flags |= CFA_CLUSTER_IO_POLLER_USER_THREAD; 75 } 76 77 if( subthrd ) { 78 options.clopts.flags |= CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS; 79 } 80 81 if( eagrsub ) { 82 options.clopts.flags |= CFA_CLUSTER_IO_EAGER_SUBMITS; 83 } 69 options.clopts.params.poller_submits = subthrd; 70 options.clopts.params.eager_submits = eagrsub; 84 71 85 72 if( fixedfd ) { … … 88 75 89 76 if( sqkpoll ) { 90 options.clopts. flags |= CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS;77 options.clopts.params.poll_submit = true; 91 78 options.file_cache.fixed_fds = true; 92 79 } 93 80 94 81 if( iokpoll ) { 95 options.clopts. flags |= CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES;82 options.clopts.params.poll_complete = true; 96 83 options.file_cache.open_flags |= O_DIRECT; 97 84 } 98 85 99 options.clopts. flags |= (sublen << CFA_CLUSTER_IO_BUFFLEN_OFFSET);86 options.clopts.params.num_ready = sublen; 100 87 101 88 if( left[0] == 0p ) { return "."; } -
benchmark/io/http/options.hfa
rdfc13bb r14d8a9b 2 2 3 3 #include <stdint.h> 4 5 #include <kernel.hfa> 4 6 5 7 struct cluster; … … 23 25 int nprocs; 24 26 int nworkers; 25 int flags; 26 int chan_size; 27 io_context_params params; 27 28 bool procstats; 28 29 bool viewhalts; -
benchmark/io/http/protocol.cfa
rdfc13bb r14d8a9b 11 11 extern "C" { 12 12 int snprintf ( char * s, size_t n, const char * format, ... ); 13 #include <linux/io_uring.h> 13 14 } 14 15 #include <string.h> 15 16 16 #include <errno.h> 17 17 18 #include "options.hfa" 18 19 19 20 const char * http_msgs[] = { … … 74 75 READ: 75 76 for() { 76 int ret = cfa_read(fd, it, count);77 if(ret == 0 ) return [OK200, true, 0 p, 0];77 int ret = cfa_read(fd, (void*)it, count, 0, -1`s, 0p, 0p); 78 if(ret == 0 ) return [OK200, true, 0, 0]; 78 79 if(ret < 0 ) { 79 80 if( errno == EAGAIN || errno == EWOULDBLOCK) continue READ; … … 88 89 count -= ret; 89 90 90 if( count < 1 ) return [E414, false, 0 p, 0];91 if( count < 1 ) return [E414, false, 0, 0]; 91 92 } 92 93 … … 95 96 it = buffer; 96 97 int ret = memcmp(it, "GET /", 5); 97 if( ret != 0 ) return [E400, false, 0 p, 0];98 if( ret != 0 ) return [E400, false, 0, 0]; 98 99 it += 5; 99 100 … … 106 107 ssize_t ret; 107 108 SPLICE1: while(count > 0) { 108 ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, SPLICE_F_MOVE | SPLICE_F_MORE );109 ret = cfa_splice(ans_fd, &offset, pipe[1], 0p, count, SPLICE_F_MOVE | SPLICE_F_MORE, 0, -1`s, 0p, 0p); 109 110 if( ret < 0 ) { 110 111 if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE1; … … 116 117 size_t in_pipe = ret; 117 118 SPLICE2: while(in_pipe > 0) { 118 ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, SPLICE_F_MOVE | SPLICE_F_MORE );119 ret = cfa_splice(pipe[0], 0p, fd, 0p, in_pipe, SPLICE_F_MOVE | SPLICE_F_MORE, 0, -1`s, 0p, 0p); 119 120 if( ret < 0 ) { 120 121 if( errno != EAGAIN && errno != EWOULDBLOCK) continue SPLICE2; -
benchmark/io/http/worker.cfa
rdfc13bb r14d8a9b 28 28 CONNECTION: 29 29 for() { 30 int fd = take(wait_connect); 31 if (fd < 0) break; 30 int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], 0, -1`s, 0p, 0p ); 31 if(fd < 0) { 32 if( errno == ECONNABORTED ) break; 33 abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) ); 34 } 32 35 33 36 printf("New connection %d, waiting for requests\n", fd); … … 82 85 } 83 86 } 84 85 //=============================================================================================86 // Acceptor Thread87 //=============================================================================================88 void ?{}( Acceptor & this, int sockfd, struct sockaddr * addr, socklen_t * addrlen, int flags ) {89 ((thread&)this){ "Acceptor Thread", *options.clopts.instance };90 this.sockfd = sockfd;91 this.addr = addr;92 this.addrlen = addrlen;93 this.flags = flags;94 }95 96 void main( Acceptor & this ) {97 for() {98 int ret = cfa_accept4( this.[sockfd, addr, addrlen, flags] );99 if(ret < 0) {100 if( errno == ECONNABORTED ) break;101 abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );102 }103 104 printf("New connection accepted\n");105 put( wait_connect, ret );106 }107 } -
benchmark/io/http/worker.hfa
rdfc13bb r14d8a9b 7 7 } 8 8 9 #include "channel.hfa"10 11 extern channel & wait_connect;12 13 9 //============================================================================================= 14 10 // Worker Thread … … 17 13 thread Worker { 18 14 int pipe[2]; 19 };20 void ?{}( Worker & this );21 void main( Worker & );22 23 //=============================================================================================24 // Acceptor Thread25 //=============================================================================================26 thread Acceptor {27 15 int sockfd; 28 16 struct sockaddr * addr; … … 30 18 int flags; 31 19 }; 32 33 void ?{}( Acceptor & this, int sockfd, struct sockaddr * addr, socklen_t * addrlen, int flags ); 34 void main( Acceptor & this ); 20 void ?{}( Worker & this); 21 void main( Worker & );
Note: See TracChangeset
for help on using the changeset viewer.