Changeset 8e4aa05 for benchmark/io/http/main.cfa
- Timestamp:
- Mar 4, 2021, 7:40:25 PM (5 years ago)
- Branches:
- ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children:
- 77d601f
- Parents:
- 342af53 (diff), a5040fe (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the(diff)
links above to see all the changes relative to each parent. - File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/http/main.cfa
r342af53 r8e4aa05 6 6 #include <unistd.h> 7 7 extern "C" { 8 #include <signal.h> 8 9 #include <sys/socket.h> 9 10 #include <netinet/in.h> 10 11 } 11 12 13 #include <fstream.hfa> 12 14 #include <kernel.hfa> 15 #include <iofwd.hfa> 13 16 #include <stats.hfa> 14 17 #include <time.hfa> … … 26 29 27 30 //============================================================================================= 31 // Stats Printer 32 //=============================================================================================' 33 34 thread StatsPrinter {}; 35 36 void ?{}( StatsPrinter & this, cluster & cl ) { 37 ((thread&)this){ "Stats Printer Thread", cl }; 38 } 39 40 void ^?{}( StatsPrinter & mutex this ) {} 41 42 void main(StatsPrinter & this) { 43 LOOP: for() { 44 waitfor( ^?{} : this) { 45 break LOOP; 46 } 47 or else {} 48 49 sleep(10`s); 50 51 print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO ); 52 } 53 } 54 55 //============================================================================================= 28 56 // Globals 29 57 //============================================================================================= 30 struct ServerProc { 31 processor self; 58 struct ServerCluster { 59 cluster self; 60 processor * procs; 61 // io_context * ctxs; 62 StatsPrinter * prnt; 63 32 64 }; 33 65 34 void ?{}( ServerProc & this ) { 35 /* paranoid */ assert( options.clopts.instance != 0p ); 36 (this.self){ "Benchmark Processor", *options.clopts.instance }; 66 void ?{}( ServerCluster & this ) { 67 (this.self){ "Server Cluster", options.clopts.params }; 68 69 this.procs = alloc(options.clopts.nprocs); 70 for(i; options.clopts.nprocs) { 71 (this.procs[i]){ "Benchmark Processor", this.self }; 72 73 #if !defined(__CFA_NO_STATISTICS__) 74 if( options.clopts.procstats ) { 75 print_stats_at_exit( *this.procs, this.self.print_stats ); 76 } 77 if( options.clopts.viewhalts ) { 78 print_halts( *this.procs ); 79 } 80 #endif 81 } 82 83 if(options.stats) { 84 this.prnt = alloc(); 85 (*this.prnt){ this.self }; 86 } else { 87 this.prnt = 0p; 88 } 37 89 38 90 #if !defined(__CFA_NO_STATISTICS__) 39 if( options.clopts.procstats ) { 40 print_stats_at_exit( this.self, options.clopts.instance->print_stats ); 41 } 42 if( options.clopts.viewhalts ) { 43 print_halts( this.self ); 44 } 91 print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO ); 45 92 #endif 93 94 options.clopts.instance[options.clopts.cltr_cnt] = &this.self; 95 options.clopts.cltr_cnt++; 96 } 97 98 void ^?{}( ServerCluster & this ) { 99 delete(this.prnt); 100 101 for(i; options.clopts.nprocs) { 102 ^(this.procs[i]){}; 103 } 104 free(this.procs); 105 106 ^(this.self){}; 46 107 } 47 108 … … 53 114 //=============================================================================================' 54 115 int main( int argc, char * argv[] ) { 116 __sighandler_t s = 1p; 117 signal(SIGPIPE, s); 118 55 119 //=================== 56 120 // Parse args 57 const char * path =parse_options(argc, argv);121 parse_options(argc, argv); 58 122 59 123 //=================== 60 124 // Open Files 61 printf("Filling cache from %s\n", path); 62 fill_cache( path ); 125 if( options.file_cache.path ) { 126 sout | "Filling cache from" | options.file_cache.path; 127 fill_cache( options.file_cache.path ); 128 } 63 129 64 130 //=================== 65 131 // Open Socket 66 printf("%ld : Listening on port %d\n", getpid(), options.socket.port);132 sout | getpid() | ": Listening on port" | options.socket.port; 67 133 int server_fd = socket(AF_INET, SOCK_STREAM, 0); 68 134 if(server_fd < 0) { … … 84 150 if(errno == EADDRINUSE) { 85 151 if(waited == 0) { 86 printf("Waiting for port\n");152 sout | "Waiting for port"; 87 153 } else { 88 printf("\r%d", waited);89 f flush(stdout);154 sout | "\r" | waited | nonl; 155 flush( sout ); 90 156 } 91 157 waited ++; … … 106 172 // Run Server Cluster 107 173 { 108 cluster cl = { "Server Cluster", options.clopts.params };109 #if !defined(__CFA_NO_STATISTICS__)110 print_stats_at_exit( cl, CFA_STATS_READY_Q | CFA_STATS_IO );111 #endif112 options.clopts.instance = &cl;113 114 115 174 int pipe_cnt = options.clopts.nworkers * 2; 116 175 int pipe_off; … … 122 181 } 123 182 124 if(options.file_cache.fixed_fds) {125 register_fixed_files(cl, fds, pipe_off);126 }183 // if(options.file_cache.path && options.file_cache.fixed_fds) { 184 // register_fixed_files(cl, fds, pipe_off); 185 // } 127 186 128 187 { 129 Server Proc procs[options.clopts.nprocs];188 ServerCluster cl[options.clopts.nclusters]; 130 189 131 190 init_protocol(); … … 148 207 unpark( workers[i] ); 149 208 } 150 printf("%d workers started on %d processors\n", options.clopts.nworkers, options.clopts.nprocs); 209 sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters"; 210 for(i; options.clopts.nclusters) { 211 sout | options.clopts.thrd_cnt[i] | nonl; 212 } 213 sout | nl; 151 214 { 152 215 char buffer[128]; 153 while(!feof(stdin)) { 154 fgets(buffer, 128, stdin); 216 for() { 217 int ret = cfa_read(0, buffer, 128, 0); 218 if(ret == 0) break; 219 if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) ); 220 sout | "User wrote '" | "" | nonl; 221 write(sout, buffer, ret - 1); 222 sout | "'"; 155 223 } 156 224 157 printf("Shutting Down\n"); 158 } 159 225 sout | "Shutdown received"; 226 } 227 228 sout | "Notifying connections..." | nonl; flush( sout ); 160 229 for(i; options.clopts.nworkers) { 161 printf("Cancelling %p\n", (void*)workers[i].cancel.target);162 230 workers[i].done = true; 163 cancel(workers[i].cancel);164 }165 166 printf("Shutting down socket\n");231 } 232 sout | "done"; 233 234 sout | "Shutting down socket..." | nonl; flush( sout ); 167 235 int ret = shutdown( server_fd, SHUT_RD ); 168 if( ret < 0 ) { abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) ); } 236 if( ret < 0 ) { 237 abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) ); 238 } 239 sout | "done"; 169 240 170 241 //=================== 171 242 // Close Socket 172 printf("Closing Socket\n");243 sout | "Closing Socket..." | nonl; flush( sout ); 173 244 ret = close( server_fd ); 174 245 if(ret < 0) { 175 246 abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) ); 176 247 } 177 } 178 printf("Workers Closed\n"); 179 248 sout | "done"; 249 250 sout | "Stopping connection threads..." | nonl; flush( sout ); 251 } 252 sout | "done"; 253 254 sout | "Stopping protocol threads..." | nonl; flush( sout ); 180 255 deinit_protocol(); 181 } 182 256 sout | "done"; 257 258 sout | "Stopping processors/clusters..." | nonl; flush( sout ); 259 } 260 sout | "done"; 261 262 sout | "Closing splice fds..." | nonl; flush( sout ); 183 263 for(i; pipe_cnt) { 184 264 ret = close( fds[pipe_off + i] ); … … 188 268 } 189 269 free(fds); 190 191 } 270 sout | "done"; 271 272 sout | "Stopping processors..." | nonl; flush( sout ); 273 } 274 sout | "done"; 192 275 193 276 //=================== 194 277 // Close Files 195 printf("Closing Files\n"); 196 close_cache(); 197 } 278 if( options.file_cache.path ) { 279 sout | "Closing open files..." | nonl; flush( sout ); 280 close_cache(); 281 sout | "done"; 282 } 283 }
Note:
See TracChangeset
for help on using the changeset viewer.