- Timestamp:
- Jun 8, 2022, 4:23:41 PM (4 years ago)
- Branches:
- ADT, ast-experimental, master, pthread-emulation, qualifiedEnum
- Children:
- 7f0ac12, db7a3ad
- Parents:
- 55422cf (diff), 720f2fe2 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the(diff)links above to see all the changes relative to each parent. - Location:
- benchmark/io/http
- Files:
-
- 2 added
- 5 edited
-
Makefile.am (modified) (2 diffs)
-
main.cfa (modified) (15 diffs)
-
options.cfa (modified) (2 diffs)
-
options.hfa (modified) (1 diff)
-
socket.cfa (added)
-
socket.hfa (added)
-
worker.cfa (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/http/Makefile.am
r55422cf r6e2b04e 21 21 include $(top_srcdir)/tools/build/cfa.make 22 22 23 AM_CFLAGS = -O3 -Wall -Wextra -I$(srcdir) -lrt -pthread # -Werror23 AM_CFLAGS = -O3 -Wall -Wextra -I$(srcdir) -lrt -pthread -g # -Werror 24 24 AM_CFAFLAGS = -quiet -nodebug 25 25 AM_LDFLAGS = -quiet -nodebug … … 39 39 protocol.cfa \ 40 40 protocol.hfa \ 41 socket.cfa \ 42 socket.hfa \ 41 43 worker.cfa \ 42 44 worker.hfa -
benchmark/io/http/main.cfa
r55422cf r6e2b04e 2 2 3 3 #include <errno.h> 4 #include <signal.h> 4 5 #include <stdio.h> 5 6 #include <string.h> … … 8 9 #include <sched.h> 9 10 #include <signal.h> 11 #include <sys/eventfd.h> 10 12 #include <sys/socket.h> 11 13 #include <netinet/in.h> … … 14 16 #include <fstream.hfa> 15 17 #include <kernel.hfa> 18 #include <locks.hfa> 16 19 #include <iofwd.hfa> 17 20 #include <stats.hfa> … … 21 24 #include "filecache.hfa" 22 25 #include "options.hfa" 26 #include "socket.hfa" 23 27 #include "worker.hfa" 24 28 … … 36 40 Worker * workers; 37 41 int worker_cnt; 42 condition_variable(fast_block_lock) var; 38 43 }; 39 44 … … 54 59 or else {} 55 60 56 sleep(10`s);61 wait(this.var, 10`s); 57 62 58 63 print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO ); … … 177 182 178 183 //============================================================================================= 184 // Termination 185 //============================================================================================= 186 187 int closefd; 188 void cleanstop(int) { 189 eventfd_t buffer = 1; 190 char * buffer_s = (char*)&buffer; 191 int ret = write(closefd, buffer_s, sizeof(buffer)); 192 if(ret < 0) abort( "eventfd write error: (%d) %s\n", (int)errno, strerror(errno) ); 193 return; 194 } 195 196 //============================================================================================= 179 197 // Main 180 198 //=============================================================================================' 181 199 int main( int argc, char * argv[] ) { 200 int ret; 182 201 __sighandler_t s = 1p; 183 202 signal(SIGPIPE, s); … … 186 205 // Parse args 187 206 parse_options(argc, argv); 207 208 //=================== 209 // Setup non-interactive termination 210 if(!options.interactive) { 211 closefd = eventfd(0, 0); 212 if(closefd < 0) abort( "eventfd error: (%d) %s\n", (int)errno, strerror(errno) ); 213 214 sighandler_t prev = signal(SIGTERM, cleanstop); 215 intptr_t prev_workaround = (intptr_t) prev; 216 // can't use SIG_ERR it crashes the compiler 217 if(prev_workaround == -1) abort( "signal setup error: (%d) %s\n", (int)errno, strerror(errno) ); 218 219 sout | "Signal termination ready"; 220 } 188 221 189 222 //=================== … … 197 230 // Open Socket 198 231 sout | getpid() | ": Listening on port" | options.socket.port; 199 int server_fd = socket(AF_INET, SOCK_STREAM, 0); 200 if(server_fd < 0) { 201 abort( "socket error: (%d) %s\n", (int)errno, strerror(errno) ); 202 } 203 204 int ret = 0; 232 205 233 struct sockaddr_in address; 206 int addrlen = sizeof(address); 207 memset( (char *)&address, '\0' ); 208 address.sin_family = AF_INET; 209 address.sin_addr.s_addr = htonl(INADDR_ANY); 210 address.sin_port = htons( options.socket.port ); 211 212 int waited = 0; 213 for() { 214 int sockfd = server_fd; 215 __CONST_SOCKADDR_ARG addr; 216 addr.__sockaddr__ = (struct sockaddr *)&address; 217 socklen_t addrlen = sizeof(address); 218 ret = bind( sockfd, addr, addrlen ); 219 if(ret < 0) { 220 if(errno == EADDRINUSE) { 221 if(waited == 0) { 222 if(!options.interactive) abort | "Port already in use in non-interactive mode. Aborting"; 223 sout | "Waiting for port"; 224 } else { 225 sout | "\r" | waited | nonl; 226 flush( sout ); 227 } 228 waited ++; 229 sleep( 1`s ); 230 continue; 231 } 232 abort( "bind error: (%d) %s\n", (int)errno, strerror(errno) ); 233 } 234 break; 235 } 236 237 ret = listen( server_fd, options.socket.backlog ); 238 if(ret < 0) { 239 abort( "listen error: (%d) %s\n", (int)errno, strerror(errno) ); 234 int addrlen = prepaddr(address); 235 236 int server_fd; 237 if(!options.socket.manyreuse) { 238 server_fd = listener(address, addrlen); 240 239 } 241 240 … … 257 256 258 257 { 258 // Stats printer makes a copy so this needs to persist longer than normal 259 Worker * workers; 259 260 ServerCluster cl[options.clopts.nclusters]; 260 261 261 262 init_protocol(); 262 263 { 263 Worker *workers = anew(options.clopts.nworkers);264 workers = anew(options.clopts.nworkers); 264 265 cl[0].prnt->workers = workers; 265 266 cl[0].prnt->worker_cnt = options.clopts.nworkers; … … 273 274 workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0]; 274 275 workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1]; 275 workers[i].sockfd = server_fd;276 workers[i].sockfd = options.socket.manyreuse ? listener(address, addrlen) : server_fd; 276 277 workers[i].addr = (struct sockaddr *)&address; 277 278 workers[i].addrlen = (socklen_t*)&addrlen; … … 285 286 } 286 287 sout | nl; 287 if(!options.interactive) park();288 288 { 289 char buffer[128]; 290 for() { 291 int ret = cfa_read(0, buffer, 128, 0); 292 if(ret == 0) break; 289 if(options.interactive) { 290 char buffer[128]; 291 for() { 292 int ret = cfa_read(0, buffer, 128, 0); 293 if(ret == 0) break; 294 if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) ); 295 sout | "User wrote '" | "" | nonl; 296 write(sout, buffer, ret - 1); 297 sout | "'"; 298 } 299 } 300 else { 301 char buffer[sizeof(eventfd_t)]; 302 int ret = cfa_read(closefd, buffer, sizeof(eventfd_t), 0); 293 303 if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) ); 294 sout | "User wrote '" | "" | nonl;295 write(sout, buffer, ret - 1);296 sout | "'";297 304 } 298 305 … … 307 314 308 315 sout | "Shutting down socket..." | nonl; flush( sout ); 309 int ret = shutdown( server_fd, SHUT_RD ); 310 if( ret < 0 ) { 311 abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) ); 316 if(options.socket.manyreuse) { 317 for(i; options.clopts.nworkers) { 318 ret = shutdown( workers[i].sockfd, SHUT_RD ); 319 if(ret < 0) abort( "close socket %d error: (%d) %s\n", i, (int)errno, strerror(errno) ); 320 } 321 } 322 else { 323 ret = shutdown( server_fd, SHUT_RD ); 324 if( ret < 0 ) { 325 abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) ); 326 } 312 327 } 313 328 sout | "done"; … … 316 331 // Close Socket 317 332 sout | "Closing Socket..." | nonl; flush( sout ); 318 ret = close( server_fd ); 319 if(ret < 0) { 320 abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) ); 333 if(options.socket.manyreuse) { 334 for(i; options.clopts.nworkers) { 335 ret = close(workers[i].sockfd); 336 if(ret < 0) abort( "close socket %d error: (%d) %s\n", i, (int)errno, strerror(errno) ); 337 } 338 } 339 else { 340 ret = close( server_fd ); 341 if(ret < 0) { 342 abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) ); 343 } 321 344 } 322 345 sout | "done"; 323 346 324 347 sout | "Stopping connection threads..." | nonl; flush( sout ); 325 adelete(workers); 348 for(i; options.clopts.nworkers) { 349 for(j; 2) { 350 ret = close(workers[i].pipe[j]); 351 if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) ); 352 } 353 join(workers[i]); 354 } 326 355 } 327 356 sout | "done"; … … 331 360 sout | "done"; 332 361 362 sout | "Stopping printer threads..." | nonl; flush( sout ); 363 for(i; options.clopts.nclusters) { 364 StatsPrinter * p = cl[i].prnt; 365 if(p) { 366 notify_one(p->var); 367 join(*p); 368 } 369 } 370 sout | "done"; 371 372 // Now that the stats printer is stopped, we can reclaim this 373 adelete(workers); 374 333 375 sout | "Stopping processors/clusters..." | nonl; flush( sout ); 334 376 } 335 377 sout | "done"; 336 378 337 sout | "Closing splice fds..." | nonl; flush( sout );338 for(i; pipe_cnt) {339 ret = close( fds[pipe_off + i] );340 if(ret < 0) {341 abort( "close pipe error: (%d) %s\n", (int)errno, strerror(errno) );342 }343 }379 // sout | "Closing splice fds..." | nonl; flush( sout ); 380 // for(i; pipe_cnt) { 381 // ret = close( fds[pipe_off + i] ); 382 // if(ret < 0) { 383 // abort( "close pipe error: (%d) %s\n", (int)errno, strerror(errno) ); 384 // } 385 // } 344 386 free(fds); 345 387 sout | "done"; -
benchmark/io/http/options.cfa
r55422cf r6e2b04e 35 35 36 36 { // socket 37 8080, // port 38 10, // backlog 39 1024 // buflen 37 8080, // port 38 10, // backlog 39 1024, // buflen 40 false, // onereuse 41 false // manyreuse 40 42 }, 41 43 … … 70 72 {'\0', "shell", "Disable interactive mode", options.interactive, parse_setfalse}, 71 73 {'\0', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog}, 74 {'\0', "reuseport-one", "Create a single listen socket with SO_REUSEPORT", options.socket.onereuse, parse_settrue}, 75 {'\0', "reuseport", "Use many listen sockets with SO_REUSEPORT", options.socket.manyreuse, parse_settrue}, 72 76 {'\0', "request_len", "Maximum number of bytes in the http request, requests with more data will be answered with Http Code 414", options.socket.buflen}, 73 77 {'\0', "seed", "seed to use for hashing", options.file_cache.hash_seed }, -
benchmark/io/http/options.hfa
r55422cf r6e2b04e 27 27 int backlog; 28 28 int buflen; 29 bool onereuse; 30 bool manyreuse; 29 31 } socket; 30 32 -
benchmark/io/http/worker.cfa
r55422cf r6e2b04e 43 43 /* paranoid */ assert( this.pipe[0] != -1 ); 44 44 /* paranoid */ assert( this.pipe[1] != -1 ); 45 46 const bool reuse = options.socket.manyreuse; 45 47 46 48 CONNECTION:
Note:
See TracChangeset
for help on using the changeset viewer.