Changeset eb5962a
- Timestamp: Jun 21, 2022, 1:39:24 PM
- Branches: ADT, ast-experimental, master, pthread-emulation, qualifiedEnum
- Children: b62d1d6
- Parents: 1df492a (diff), 1dbbef6 (diff)
Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Use the (diff) links next to each parent above to see all the changes relative to that parent.
- Files:
  - 22 added
  - 35 edited
Legend:
  - lines prefixed with "  " are unmodified
  - lines prefixed with "+ " were added
  - lines prefixed with "- " were removed
  - "… …" marks unchanged lines elided from the display
benchmark/io/http/Makefile.am (r1df492a → reb5962a)

  include $(top_srcdir)/tools/build/cfa.make
  
- AM_CFLAGS = -O3 -Wall -Wextra -I$(srcdir) -lrt -pthread # -Werror
+ AM_CFLAGS = -O3 -Wall -Wextra -I$(srcdir) -lrt -pthread -g # -Werror
  AM_CFAFLAGS = -quiet -nodebug
  AM_LDFLAGS = -quiet -nodebug
… …
      options.cfa \
      options.hfa \
+     printer.cfa \
+     printer.hfa \
      protocol.cfa \
      protocol.hfa \
+     socket.cfa \
+     socket.hfa \
      worker.cfa \
      worker.hfa
benchmark/io/http/main.cfa (r1df492a → reb5962a)

  
  #include <errno.h>
+ #include <signal.h>
  #include <stdio.h>
  #include <string.h>
… …
  #include <sched.h>
  #include <signal.h>
+ #include <sys/eventfd.h>
  #include <sys/socket.h>
  #include <netinet/in.h>
… …
  #include <fstream.hfa>
  #include <kernel.hfa>
+ #include <locks.hfa>
  #include <iofwd.hfa>
  #include <stats.hfa>
… …
  #include "filecache.hfa"
  #include "options.hfa"
+ #include "socket.hfa"
+ #include "printer.hfa"
  #include "worker.hfa"
  
… …
  
  //=============================================================================================
- // Stats Printer
- //=============================================================================================
- 
- thread StatsPrinter {
-     Worker * workers;
-     int worker_cnt;
- };
- 
- void ?{}( StatsPrinter & this, cluster & cl ) {
-     ((thread&)this){ "Stats Printer Thread", cl };
-     this.worker_cnt = 0;
- }
- 
- void ^?{}( StatsPrinter & mutex this ) {}
- 
- #define eng3(X) (ws(3, 3, unit(eng( X ))))
- 
- void main(StatsPrinter & this) {
-     LOOP: for() {
-         waitfor( ^?{} : this) {
-             break LOOP;
-         }
-         or else {}
- 
-         sleep(10`s);
- 
-         print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );
-         if(this.worker_cnt != 0) {
-             uint64_t tries = 0;
-             uint64_t calls = 0;
-             uint64_t header = 0;
-             uint64_t splcin = 0;
-             uint64_t splcot = 0;
-             struct {
-                 volatile uint64_t calls;
-                 volatile uint64_t bytes;
-             } avgrd[zipf_cnts];
-             memset(avgrd, 0, sizeof(avgrd));
- 
-             for(i; this.worker_cnt) {
-                 tries += this.workers[i].stats.sendfile.tries;
-                 calls += this.workers[i].stats.sendfile.calls;
-                 header += this.workers[i].stats.sendfile.header;
-                 splcin += this.workers[i].stats.sendfile.splcin;
-                 splcot += this.workers[i].stats.sendfile.splcot;
-                 for(j; zipf_cnts) {
-                     avgrd[j].calls += this.workers[i].stats.sendfile.avgrd[j].calls;
-                     avgrd[j].bytes += this.workers[i].stats.sendfile.avgrd[j].bytes;
-                 }
-             }
- 
-             double ratio = ((double)tries) / calls;
- 
-             sout | "----- Worker Stats -----";
-             sout | "sendfile : " | calls | "calls," | tries | "tries (" | ratio | " try/call)";
-             sout | "            " | header | "header," | splcin | "splice in," | splcot | "splice out";
-             sout | " - zipf sizes:";
-             for(i; zipf_cnts) {
-                 double written = avgrd[i].calls > 0 ? ((double)avgrd[i].bytes) / avgrd[i].calls : 0;
-                 sout | "        " | zipf_sizes[i] | "bytes," | avgrd[i].calls | "shorts," | written | "written";
-             }
-         }
-         else {
-             sout | "No Workers!";
-         }
-     }
- }
- 
- //=============================================================================================
  // Globals
  //=============================================================================================
- struct ServerCluster {
-     cluster self;
-     processor * procs;
-     // io_context * ctxs;
-     StatsPrinter * prnt;
- 
- };
- 
  void ?{}( ServerCluster & this ) {
      (this.self){ "Server Cluster", options.clopts.params };
… …
          (this.procs[i]){ "Benchmark Processor", this.self };
  
-         int c = 0;
-         int n = 1 + (i % cnt);
-         for(int j = 0; j < CPU_SETSIZE; j++) {
-             if(CPU_ISSET(j, &fullset)) n--;
-             if(n == 0) {
-                 c = j;
-                 break;
-             }
-         }
-         cpu_set_t localset;
-         CPU_ZERO(&localset);
-         CPU_SET(c, &localset);
-         ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset);
-         if( ret != 0 ) abort | "sched_getaffinity failed with" | ret | strerror( ret );
+         // int c = 0;
+         // int n = 1 + (i % cnt);
+         // for(int j = 0; j < CPU_SETSIZE; j++) {
+         //     if(CPU_ISSET(j, &fullset)) n--;
+         //     if(n == 0) {
+         //         c = j;
+         //         break;
+         //     }
+         // }
+         // cpu_set_t localset;
+         // CPU_ZERO(&localset);
+         // CPU_SET(c, &localset);
+         // ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset);
+         // if( ret != 0 ) abort | "sched_getaffinity failed with" | ret | strerror( ret );
  
          #if !defined(__CFA_NO_STATISTICS__)
… …
      }
  
-     if(options.stats) {
-         this.prnt = alloc();
-         (*this.prnt){ this.self };
-     } else {
-         this.prnt = 0p;
-     }
- 
      #if !defined(__CFA_NO_STATISTICS__)
          print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO );
… …
  
  void ^?{}( ServerCluster & this ) {
-     delete(this.prnt);
- 
      for(i; options.clopts.nprocs) {
          ^(this.procs[i]){};
… …
  extern void init_protocol(void);
  extern void deinit_protocol(void);
+ 
+ //=============================================================================================
+ // REUSEPORT
+ //=============================================================================================
+ 
+ size_t sockarr_size;
+ struct __attribute__((aligned(128))) Q {
+     mpsc_queue(PendingRead) q;
+ };
+ 
+ //=============================================================================================
+ // Termination
+ //=============================================================================================
+ 
+ int closefd;
+ void cleanstop(int) {
+     eventfd_t buffer = 1;
+     char * buffer_s = (char*)&buffer;
+     int ret = write(closefd, buffer_s, sizeof(buffer));
+     if(ret < 0) abort( "eventfd write error: (%d) %s\n", (int)errno, strerror(errno) );
+     return;
+ }
  
… …
  //=============================================================================================
  int main( int argc, char * argv[] ) {
+     int ret;
      __sighandler_t s = 1p;
      signal(SIGPIPE, s);
… …
      // Parse args
      parse_options(argc, argv);
+ 
+     //===================
+     // Setup non-interactive termination
+     if(!options.interactive) {
+         closefd = eventfd(0, 0);
+         if(closefd < 0) abort( "eventfd error: (%d) %s\n", (int)errno, strerror(errno) );
+ 
+         sighandler_t prev = signal(SIGTERM, cleanstop);
+         intptr_t prev_workaround = (intptr_t) prev;
+         // can't use SIG_ERR it crashes the compiler
+         if(prev_workaround == -1) abort( "signal setup error: (%d) %s\n", (int)errno, strerror(errno) );
+ 
+         sout | "Signal termination ready";
+     }
  
… …
      // Open Socket
      sout | getpid() | ": Listening on port" | options.socket.port;
-     int server_fd = socket(AF_INET, SOCK_STREAM, 0);
-     if(server_fd < 0) {
-         abort( "socket error: (%d) %s\n", (int)errno, strerror(errno) );
-     }
- 
-     int ret = 0;
+ 
      struct sockaddr_in address;
-     int addrlen = sizeof(address);
-     memset( (char *)&address, '\0' );
-     address.sin_family = AF_INET;
-     address.sin_addr.s_addr = htonl(INADDR_ANY);
-     address.sin_port = htons( options.socket.port );
- 
-     int waited = 0;
-     for() {
-         int sockfd = server_fd;
-         __CONST_SOCKADDR_ARG addr;
-         addr.__sockaddr__ = (struct sockaddr *)&address;
-         socklen_t addrlen = sizeof(address);
-         ret = bind( sockfd, addr, addrlen );
-         if(ret < 0) {
-             if(errno == EADDRINUSE) {
-                 if(waited == 0) {
-                     if(!options.interactive) abort | "Port already in use in non-interactive mode. Aborting";
-                     sout | "Waiting for port";
-                 } else {
-                     sout | "\r" | waited | nonl;
-                     flush( sout );
-                 }
-                 waited ++;
-                 sleep( 1`s );
-                 continue;
-             }
-             abort( "bind error: (%d) %s\n", (int)errno, strerror(errno) );
-         }
-         break;
-     }
- 
-     ret = listen( server_fd, options.socket.backlog );
-     if(ret < 0) {
-         abort( "listen error: (%d) %s\n", (int)errno, strerror(errno) );
-     }
+     int addrlen = prepaddr(address);
+ 
+     int server_fd;
  
… …
      {
+         // Stats printer makes a copy so this needs to persist longer than normal
+         connection ** conns;
+         AcceptWorker  * aworkers = 0p;
+         ChannelWorker * cworkers = 0p;
+         Acceptor * acceptors = 0p;
+         Q * queues = 0p;
          ServerCluster cl[options.clopts.nclusters];
+ 
+         if(options.stats) {
+             stats_thrd = alloc();
+             (*stats_thrd){ cl };
+         } else {
+             stats_thrd = 0p;
+         }
  
          init_protocol();
          {
-             Worker * workers = anew(options.clopts.nworkers);
-             cl[0].prnt->workers = workers;
-             cl[0].prnt->worker_cnt = options.clopts.nworkers;
-             for(i; options.clopts.nworkers) {
-                 // if( options.file_cache.fixed_fds ) {
-                 //     workers[i].pipe[0] = pipe_off + (i * 2) + 0;
-                 //     workers[i].pipe[1] = pipe_off + (i * 2) + 1;
-                 // }
-                 // else
-                 {
-                     workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0];
-                     workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1];
-                     workers[i].sockfd = server_fd;
-                     workers[i].addr = (struct sockaddr *)&address;
-                     workers[i].addrlen = (socklen_t*)&addrlen;
-                     workers[i].flags = 0;
-                 }
-                 unpark( workers[i] );
+             int nacceptors = options.clopts.nprocs * options.clopts.nclusters;
+             conns = alloc(options.clopts.nworkers);
+             if(options.socket.reuseport) {
+                 queues = alloc(nacceptors);
+                 acceptors = alloc(nacceptors);
+                 sout | "Creating" | nacceptors | "Acceptors";
+                 for(i; nacceptors) {
+                     (acceptors[i]){ i % options.clopts.nclusters };
+                 }
+                 for(i; nacceptors) {
+                     (queues[i]){};
+                     {
+                         acceptors[i].sockfd = listener(address, addrlen);
+                         acceptors[i].addr = (struct sockaddr *)&address;
+                         acceptors[i].addrlen = (socklen_t*)&addrlen;
+                         acceptors[i].flags = 0;
+                         acceptors[i].queue = &queues[i].q;
+                     }
+                     unpark( acceptors[i] );
+                 }
+ 
+                 cworkers = anew(options.clopts.nworkers);
+                 for(i; options.clopts.nworkers) {
+                     {
+                         cworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0];
+                         cworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1];
+                         cworkers[i].queue = &queues[i % nacceptors].q;
+                         conns[i] = &cworkers[i].conn;
+                     }
+                     unpark( cworkers[i] );
+                 }
+             }
+             else {
+                 server_fd = listener(address, addrlen);
+                 aworkers = anew(options.clopts.nworkers);
+                 for(i; options.clopts.nworkers) {
+                     // if( options.file_cache.fixed_fds ) {
+                     //     workers[i].pipe[0] = pipe_off + (i * 2) + 0;
+                     //     workers[i].pipe[1] = pipe_off + (i * 2) + 1;
+                     // }
+                     // else
+                     {
+                         aworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0];
+                         aworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1];
+                         aworkers[i].sockfd = server_fd;
+                         aworkers[i].addr = (struct sockaddr *)&address;
+                         aworkers[i].addrlen = (socklen_t*)&addrlen;
+                         aworkers[i].flags = 0;
+                         conns[i] = &aworkers[i].conn;
+                     }
+                     unpark( aworkers[i] );
+                 }
              }
              sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters";
… …
              }
              sout | nl;
-             if(!options.interactive) park();
              {
-                 char buffer[128];
-                 for() {
-                     int ret = cfa_read(0, buffer, 128, 0);
-                     if(ret == 0) break;
+                 if(options.interactive) {
+                     char buffer[128];
+                     for() {
+                         int ret = cfa_read(0, buffer, 128, 0);
+                         if(ret == 0) break;
+                         if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
+                         sout | "User wrote '" | "" | nonl;
+                         write(sout, buffer, ret - 1);
+                         sout | "'";
+                     }
+                 }
+                 else {
+                     char buffer[sizeof(eventfd_t)];
+                     int ret = cfa_read(closefd, buffer, sizeof(eventfd_t), 0);
                      if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
-                     sout | "User wrote '" | "" | nonl;
-                     write(sout, buffer, ret - 1);
-                     sout | "'";
                  }
  
… …
              }
  
-             sout | "Notifying connections..." | nonl; flush( sout );
-             for(i; options.clopts.nworkers) {
-                 workers[i].done = true;
-             }
-             sout | "done";
- 
-             sout | "Shutting down socket..." | nonl; flush( sout );
-             int ret = shutdown( server_fd, SHUT_RD );
-             if( ret < 0 ) {
-                 abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) );
-             }
-             sout | "done";
- 
              //===================
-             // Close Socket
-             sout | "Closing Socket..." | nonl; flush( sout );
-             ret = close( server_fd );
-             if(ret < 0) {
-                 abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
-             }
-             sout | "done";
- 
-             sout | "Stopping connection threads..." | nonl; flush( sout );
-             adelete(workers);
+             // Close Socket and join
+             if(options.socket.reuseport) {
+                 sout | "Notifying connections..." | nonl; flush( sout );
+                 for(i; nacceptors) {
+                     acceptors[i].done = true;
+                 }
+                 for(i; options.clopts.nworkers) {
+                     cworkers[i].done = true;
+                 }
+                 sout | "done";
+ 
+                 sout | "Shutting down Socket..." | nonl; flush( sout );
+                 for(i; nacceptors) {
+                     ret = shutdown( acceptors[i].sockfd, SHUT_RD );
+                     if( ret < 0 ) {
+                         abort( "shutdown1 error: (%d) %s\n", (int)errno, strerror(errno) );
+                     }
+                 }
+                 sout | "done";
+ 
+                 sout | "Closing Socket..." | nonl; flush( sout );
+                 for(i; nacceptors) {
+                     ret = close( acceptors[i].sockfd );
+                     if( ret < 0) {
+                         abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
+                     }
+                 }
+                 sout | "done";
+ 
+                 sout | "Stopping accept threads..." | nonl; flush( sout );
+                 for(i; nacceptors) {
+                     join(acceptors[i]);
+                 }
+                 sout | "done";
+ 
+                 sout | "Draining worker queues..." | nonl; flush( sout );
+                 for(i; nacceptors) {
+                     PendingRead * p = 0p;
+                     while(p = pop(queues[i].q)) {
+                         fulfil(p->f, -ECONNRESET);
+                     }
+                 }
+                 sout | "done";
+ 
+                 sout | "Stopping worker threads..." | nonl; flush( sout );
+                 for(i; options.clopts.nworkers) {
+                     for(j; 2) {
+                         ret = close(cworkers[i].conn.pipe[j]);
+                         if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
+                     }
+                     join(cworkers[i]);
+                 }
+             }
+             else {
+                 sout | "Notifying connections..." | nonl; flush( sout );
+                 for(i; options.clopts.nworkers) {
+                     aworkers[i].done = true;
+                 }
+                 sout | "done";
+ 
+                 sout | "Shutting down Socket..." | nonl; flush( sout );
+                 ret = shutdown( server_fd, SHUT_RD );
+                 if( ret < 0 ) {
+                     abort( "shutdown2 error: (%d) %s\n", (int)errno, strerror(errno) );
+                 }
+                 sout | "done";
+ 
+                 sout | "Closing Socket..." | nonl; flush( sout );
+                 ret = close( server_fd );
+                 if(ret < 0) {
+                     abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
+                 }
+                 sout | "done";
+ 
+                 sout | "Stopping connection threads..." | nonl; flush( sout );
+                 for(i; options.clopts.nworkers) {
+                     for(j; 2) {
+                         ret = close(aworkers[i].conn.pipe[j]);
+                         if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
+                     }
+                     join(aworkers[i]);
+                 }
+             }
          }
          sout | "done";
… …
          sout | "done";
  
+         sout | "Stopping printer threads..." | nonl; flush( sout );
+         if(stats_thrd) {
+             notify_one(stats_thrd->var);
+         }
+         delete(stats_thrd);
+         sout | "done";
+ 
+         // Now that the stats printer is stopped, we can reclaim this
+         adelete(aworkers);
+         adelete(cworkers);
+         adelete(acceptors);
+         adelete(queues);
+         free(conns);
+ 
          sout | "Stopping processors/clusters..." | nonl; flush( sout );
      }
      sout | "done";
  
-     sout | "Closing splice fds..." | nonl; flush( sout );
-     for(i; pipe_cnt) {
-         ret = close( fds[pipe_off + i] );
-         if(ret < 0) {
-             abort( "close pipe error: (%d) %s\n", (int)errno, strerror(errno) );
-         }
-     }
      free(fds);
-     sout | "done";
  
      sout | "Stopping processors..." | nonl; flush( sout );
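Aside: the non-interactive termination added above follows a standard Linux pattern: a SIGTERM handler writes to an eventfd, and the main thread blocks reading that fd instead of parking. A minimal stand-alone C sketch of the same pattern (cleanstop/closefd mirror the changeset's names; everything else is illustrative):

    #include <signal.h>
    #include <stdio.h>
    #include <unistd.h>
    #include <sys/eventfd.h>

    static int closefd;                    // written by the handler, read by main

    static void cleanstop(int sig) {
        (void)sig;
        eventfd_t one = 1;
        // write(2) is async-signal-safe; this wakes the blocked read in main
        if (write(closefd, &one, sizeof(one)) < 0) _exit(1);
    }

    int main(void) {
        closefd = eventfd(0, 0);
        if (closefd < 0) { perror("eventfd"); return 1; }
        if (signal(SIGTERM, cleanstop) == SIG_ERR) { perror("signal"); return 1; }

        // ... start server threads here ...

        eventfd_t buf;                     // blocks until the handler posts
        if (read(closefd, &buf, sizeof(buf)) < 0) { perror("read"); return 1; }
        puts("SIGTERM received, shutting down");
        // ... join threads, close sockets ...
        return 0;
    }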
benchmark/io/http/options.cfa (r1df492a → reb5962a)

  
      { // socket
-         8080, // port
-         10,   // backlog
-         1024  // buflen
+         8080,  // port
+         10,    // backlog
+         1024,  // buflen
+         false  // reuseport
      },
  
… …
  
  void parse_options( int argc, char * argv[] ) {
-     // bool fixedfd = false;
-     // bool sqkpoll = false;
-     // bool iokpoll = false;
      unsigned nentries = 0;
      bool isolate = false;
… …
          {'\0', "shell", "Disable interactive mode", options.interactive, parse_setfalse},
          {'\0', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog},
+         {'\0', "reuseport", "Use acceptor threads with reuse port SO_REUSEPORT", options.socket.reuseport, parse_settrue},
          {'\0', "request_len", "Maximum number of bytes in the http request, requests with more data will be answered with Http Code 414", options.socket.buflen},
          {'\0', "seed", "seed to use for hashing", options.file_cache.hash_seed },
benchmark/io/http/options.hfa (r1df492a → reb5962a)

          int backlog;
          int buflen;
+         bool reuseport;
      } socket;
  
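Aside: the new reuseport flag drives the acceptor-per-socket design in main.cfa, where each Acceptor gets its own fd from listener(address, addrlen). That CFA helper lives in the newly added socket.cfa (not shown in this diff); a hedged C sketch of what such a helper presumably does with SO_REUSEPORT, which lets several sockets bind the same addr:port so the kernel load-balances incoming connections across them:

    #include <netinet/in.h>
    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <sys/socket.h>

    // Create one listening socket; call once per acceptor thread.
    static int make_listener(uint16_t port, int backlog) {
        int fd = socket(AF_INET, SOCK_STREAM, 0);
        if (fd < 0) { perror("socket"); exit(1); }

        int one = 1;
        // Allow several sockets on the same addr:port; the kernel
        // distributes accepted connections between them.
        if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)) < 0) {
            perror("setsockopt(SO_REUSEPORT)"); exit(1);
        }

        struct sockaddr_in addr;
        memset(&addr, 0, sizeof(addr));
        addr.sin_family      = AF_INET;
        addr.sin_addr.s_addr = htonl(INADDR_ANY);
        addr.sin_port        = htons(port);

        if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { perror("bind"); exit(1); }
        if (listen(fd, backlog) < 0) { perror("listen"); exit(1); }
        return fd;
    }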
benchmark/io/http/protocol.cfa (r1df492a → reb5962a)

  #define PLAINTEXT_NOCOPY
  #define LINKED_IO
+ 
+ static inline __s32 wait_res( io_future_t & this ) {
+     wait( this );
+     if( this.result < 0 ) {{
+         errno = -this.result;
+         return -1;
+     }}
+     return this.result;
+ }
  
  struct https_msg_str {
… …
  
      if(is_error(splice_in.res)) {
+         if(splice_in.res.error == -EPIPE) return -ECONNRESET;
          mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error;
          close(fd);
… …
  }
  
- [HttpCode code, bool closed, * const char file, size_t len] http_read( int fd, []char buffer, size_t len) {
+ [HttpCode code, bool closed, * const char file, size_t len] http_read(volatile int & fd, []char buffer, size_t len, io_future_t * f) {
      char * it = buffer;
      size_t count = len - 1;
  
      READ:
      for() {
-         int ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY);
+         int ret;
+         if( f ) {
+             ret = wait_res(*f);
+             reset(*f);
+             f = 0p;
+         } else {
+             ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY);
+         }
          // int ret = read(fd, (void*)it, count);
          if(ret == 0 ) return [OK200, true, 0, 0];
… …
  
  void ?{}( DateFormater & this ) {
-     ((thread&)this){ "Server Date Thread", *options.clopts.instance[0] };
+     ((thread&)this){ "Server Date Thread" };
      this.idx = 0;
      memset( &this.buffers[0], 0, sizeof(this.buffers[0]) );
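Aside: wait_res above converts the io_uring-style negative-errno result carried by a future into the classic C convention (set errno, return -1). The same unwrapping in plain C, with 'res' standing in for io_future_t.result (illustrative only):

    #include <errno.h>

    // 'res' holds -errno on failure and a byte count on success.
    static inline int unwrap_result(int res) {
        if (res < 0) {
            errno = -res;   // recover the errno encoded in the result
            return -1;      // classic C error convention
        }
        return res;
    }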
benchmark/io/http/protocol.hfa (r1df492a → reb5962a)

  #pragma once
  
+ struct io_future_t;
  struct sendfile_stats_t;
  
… …
  int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t count, struct sendfile_stats_t & );
  
- [HttpCode code, bool closed, * const char file, size_t len] http_read( int fd, []char buffer, size_t len);
+ [HttpCode code, bool closed, * const char file, size_t len] http_read(volatile int & fd, []char buffer, size_t len, io_future_t * f);
benchmark/io/http/worker.cfa (r1df492a → reb5962a)

  #include <fstream.hfa>
  #include <iofwd.hfa>
+ #include <mutex_stmt.hfa>
  
  #include "options.hfa"
… …
  
  //=============================================================================================
- // Worker Thread
- //=============================================================================================
- void ?{}( Worker & this ) {
+ // Generic connection handling
+ //=============================================================================================
+ static void handle_connection( connection & this, volatile int & fd, char * buffer, size_t len, io_future_t * f, unsigned long long & last ) {
+     REQUEST:
+     for() {
+         bool closed;
+         HttpCode code;
+         const char * file;
+         size_t name_size;
+ 
+         // Read the http request
+         if( options.log ) mutex(sout) sout | "=== Reading request ===";
+         [code, closed, file, name_size] = http_read(fd, buffer, len, f);
+         f = 0p;
+ 
+         // if we are done, break out of the loop
+         if( closed ) break REQUEST;
+ 
+         // If this wasn't a request retrun 400
+         if( code != OK200 ) {
+             sout | "=== Invalid Request :" | code_val(code) | "===";
+             answer_error(fd, code);
+             continue REQUEST;
+         }
+ 
+         if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
+             if( options.log ) mutex(sout) sout | "=== Request for /plaintext ===";
+ 
+             int ret = answer_plaintext(fd);
+             if( ret == -ECONNRESET ) break REQUEST;
+ 
+             if( options.log ) mutex(sout) sout | "=== Answer sent ===";
+             continue REQUEST;
+         }
+ 
+         if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
+             if( options.log ) mutex(sout) sout | "=== Request for /ping ===";
+ 
+             // Send the header
+             int ret = answer_empty(fd);
+             if( ret == -ECONNRESET ) break REQUEST;
+ 
+             if( options.log ) mutex(sout) sout | "=== Answer sent ===";
+             continue REQUEST;
+         }
+ 
+         if( options.log ) {
+             sout | "=== Request for file " | nonl;
+             write(sout, file, name_size);
+             sout | " ===";
+         }
+ 
+         if( !options.file_cache.path ) {
+             if( options.log ) {
+                 sout | "=== File Not Found (" | nonl;
+                 write(sout, file, name_size);
+                 sout | ") ===";
+             }
+             answer_error(fd, E405);
+             continue REQUEST;
+         }
+ 
+         // Get the fd from the file cache
+         int ans_fd;
+         size_t count;
+         [ans_fd, count] = get_file( file, name_size );
+ 
+         // If we can't find the file, return 404
+         if( ans_fd < 0 ) {
+             if( options.log ) {
+                 sout | "=== File Not Found (" | nonl;
+                 write(sout, file, name_size);
+                 sout | ") ===";
+             }
+             answer_error(fd, E404);
+             continue REQUEST;
+         }
+ 
+         // Send the desired file
+         int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile );
+         if( ret == -ECONNRESET ) break REQUEST;
+ 
+         if( options.log ) mutex(sout) sout | "=== Answer sent ===";
+     }
+ 
+     if (stats_thrd) {
+         unsigned long long next = rdtscl();
+         if(next > (last + 500000000)) {
+             if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
+                 push(this.stats.sendfile, stats_thrd->stats.send);
+                 unlock(stats_thrd->stats.lock);
+                 last = next;
+             }
+         }
+     }
+ }
+ 
+ //=============================================================================================
+ // Self Accepting Worker Thread
+ //=============================================================================================
+ void ?{}( AcceptWorker & this ) {
      size_t cli = rand() % options.clopts.cltr_cnt;
      ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
      options.clopts.thrd_cnt[cli]++;
-     this.pipe[0] = -1;
-     this.pipe[1] = -1;
      this.done = false;
- 
-     this.stats.sendfile.calls = 0;
-     this.stats.sendfile.tries = 0;
-     this.stats.sendfile.header = 0;
-     this.stats.sendfile.splcin = 0;
-     this.stats.sendfile.splcot = 0;
-     for(i; zipf_cnts) {
-         this.stats.sendfile.avgrd[i].calls = 0;
-         this.stats.sendfile.avgrd[i].bytes = 0;
-     }
- }
- 
- extern "C" {
-     extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
- }
- 
- void main( Worker & this ) {
+ }
+ 
+ void main( AcceptWorker & this ) {
      park();
-     /* paranoid */ assert( this.pipe[0] != -1 );
-     /* paranoid */ assert( this.pipe[1] != -1 );
- 
-     CONNECTION:
-     for() {
-         if( options.log ) sout | "=== Accepting connection ===";
-         int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], CFA_IO_LAZY );
+     unsigned long long last = rdtscl();
+     /* paranoid */ assert( this.conn.pipe[0] != -1 );
+     /* paranoid */ assert( this.conn.pipe[1] != -1 );
+     for() {
+         if( options.log ) mutex(sout) sout | "=== Accepting connection ===";
+         int fd = cfa_accept4( this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY );
          if(fd < 0) {
              if( errno == ECONNABORTED ) break;
… …
          if(this.done) break;
  
-         if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
-         REQUEST:
-         for() {
-             bool closed;
-             HttpCode code;
-             const char * file;
-             size_t name_size;
- 
-             // Read the http request
-             size_t len = options.socket.buflen;
-             char buffer[len];
-             if( options.log ) sout | "=== Reading request ===";
-             [code, closed, file, name_size] = http_read(fd, buffer, len);
- 
-             // if we are done, break out of the loop
-             if( closed ) break REQUEST;
- 
-             // If this wasn't a request retrun 400
-             if( code != OK200 ) {
-                 sout | "=== Invalid Request :" | code_val(code) | "===";
-                 answer_error(fd, code);
-                 continue REQUEST;
-             }
- 
-             if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
-                 if( options.log ) sout | "=== Request for /plaintext ===";
- 
-                 int ret = answer_plaintext(fd);
-                 if( ret == -ECONNRESET ) break REQUEST;
- 
-                 if( options.log ) sout | "=== Answer sent ===";
-                 continue REQUEST;
-             }
- 
-             if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
-                 if( options.log ) sout | "=== Request for /ping ===";
- 
-                 // Send the header
-                 int ret = answer_empty(fd);
-                 if( ret == -ECONNRESET ) break REQUEST;
- 
-                 if( options.log ) sout | "=== Answer sent ===";
-                 continue REQUEST;
-             }
- 
-             if( options.log ) {
-                 sout | "=== Request for file " | nonl;
-                 write(sout, file, name_size);
-                 sout | " ===";
-             }
- 
-             if( !options.file_cache.path ) {
-                 if( options.log ) {
-                     sout | "=== File Not Found (" | nonl;
-                     write(sout, file, name_size);
-                     sout | ") ===";
-                 }
-                 answer_error(fd, E405);
-                 continue REQUEST;
-             }
- 
-             // Get the fd from the file cache
-             int ans_fd;
-             size_t count;
-             [ans_fd, count] = get_file( file, name_size );
- 
-             // If we can't find the file, return 404
-             if( ans_fd < 0 ) {
-                 if( options.log ) {
-                     sout | "=== File Not Found (" | nonl;
-                     write(sout, file, name_size);
-                     sout | ") ===";
-                 }
-                 answer_error(fd, E404);
-                 continue REQUEST;
-             }
- 
-             // Send the desired file
-             int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile );
-             if( ret == -ECONNRESET ) break REQUEST;
- 
-             if( options.log ) sout | "=== Answer sent ===";
-         }
- 
-         if( options.log ) sout | "=== Connection closed ===";
-         continue CONNECTION;
-     }
- }
+         if( options.log ) mutex(sout) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
+         size_t len = options.socket.buflen;
+         char buffer[len];
+         handle_connection( this.conn, fd, buffer, len, 0p, last );
+ 
+         if( options.log ) mutex(sout) sout | "=== Connection closed ===";
+     }
+ }
+ 
+ //=============================================================================================
+ // Channel Worker Thread
+ //=============================================================================================
+ void ?{}( ChannelWorker & this ) {
+     size_t cli = rand() % options.clopts.cltr_cnt;
+     ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
+     options.clopts.thrd_cnt[cli]++;
+     this.done = false;
+ }
+ 
+ void main( ChannelWorker & this ) {
+     park();
+     unsigned long long last = rdtscl();
+     /* paranoid */ assert( this.conn.pipe[0] != -1 );
+     /* paranoid */ assert( this.conn.pipe[1] != -1 );
+     for() {
+         size_t len = options.socket.buflen;
+         char buffer[len];
+         PendingRead p;
+         p.next = 0p;
+         p.in.buf = (void*)buffer;
+         p.in.len = len;
+         push(*this.queue, &p);
+ 
+         if( options.log ) mutex(sout) sout | "=== Waiting new connection ===";
+         handle_connection( this.conn, p.out.fd, buffer, len, &p.f, last );
+ 
+         if( options.log ) mutex(sout) sout | "=== Connection closed ===";
+         if(this.done) break;
+     }
+ }
+ 
+ extern "C" {
+     extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
+ }
+ 
+ void ?{}( Acceptor & this, int cli ) {
+     ((thread&)this){ "Server Acceptor Thread", *options.clopts.instance[cli], 64000 };
+     options.clopts.thrd_cnt[cli]++;
+     this.done = false;
+ }
+ 
+ static inline __s32 get_res( io_future_t & this ) {
+     if( this.result < 0 ) {{
+         errno = -this.result;
+         return -1;
+     }}
+     return this.result;
+ }
+ 
+ static inline void push_connection( Acceptor & this, int fd ) {
+     PendingRead * p = 0p;
+     for() {
+         if(this.done) return;
+         p = pop(*this.queue);
+         if(p) break;
+         yield();
+         this.stats.creates++;
+     };
+ 
+     p->out.fd = fd;
+     async_recv(p->f, p->out.fd, p->in.buf, p->in.len, 0, CFA_IO_LAZY);
+ }
+ 
+ // #define ACCEPT_SPIN
+ #define ACCEPT_MANY
+ 
+ void main( Acceptor & this ) {
+     park();
+     unsigned long long last = rdtscl();
+ 
+     #if defined(ACCEPT_SPIN)
+         if( options.log ) sout | "=== Accepting connection ===";
+         for() {
+             int fd = accept4(this.sockfd, this.[addr, addrlen, flags]);
+             if(fd < 0) {
+                 if( errno == EWOULDBLOCK) {
+                     this.stats.eagains++;
+                     yield();
+                     continue;
+                 }
+                 if( errno == ECONNABORTED ) break;
+                 if( this.done && (errno == EINVAL || errno == EBADF) ) break;
+                 abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
+             }
+             this.stats.accepts++;
+ 
+             if(this.done) return;
+ 
+             if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
+ 
+             if(fd) push_connection(this, fd);
+ 
+             if (stats_thrd) {
+                 unsigned long long next = rdtscl();
+                 if(next > (last + 500000000)) {
+                     if(try_lock(stats_thrd->stats.lock)) {
+                         push(this.stats, stats_thrd->stats.accpt);
+                         unlock(stats_thrd->stats.lock);
+                         last = next;
+                     }
+                 }
+             }
+ 
+             if( options.log ) sout | "=== Accepting connection ===";
+         }
+ 
+     #elif defined(ACCEPT_MANY)
+         const int nacc = 10;
+         io_future_t results[nacc];
+ 
+         for(i; nacc) {
+             io_future_t & res = results[i];
+             reset(res);
+             /* paranoid */ assert(!available(res));
+             if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ===";
+             async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY);
+         }
+ 
+         for() {
+             if (stats_thrd) {
+                 unsigned long long next = rdtscl();
+                 if(next > (last + 500000000)) {
+                     if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
+                         push(this.stats, stats_thrd->stats.accpt);
+                         unlock(stats_thrd->stats.lock);
+                         last = next;
+                     }
+                 }
+             }
+ 
+             for(i; nacc) {
+                 io_future_t & res = results[i];
+                 if(available(res)) {
+                     if( options.log ) mutex(sout) sout | "=== Accept no " | i | "completed with result" | res.result | "===";
+                     int fd = get_res(res);
+                     reset(res);
+                     this.stats.accepts++;
+                     if(fd < 0) {
+                         if( errno == ECONNABORTED ) continue;
+                         if( this.done && (errno == EINVAL || errno == EBADF) ) continue;
+                         abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
+                     }
+                     push_connection( this, fd );
+ 
+                     /* paranoid */ assert(!available(res));
+                     if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ===";
+                     async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY);
+                 }
+             }
+             if(this.done) return;
+ 
+             if( options.log ) mutex(sout) sout | "=== Waiting for any accept ===";
+             this.stats.eagains++;
+             wait_any(results, nacc);
+ 
+             if( options.log ) mutex(sout) {
+                 sout | "=== Acceptor wake-up ===";
+                 for(i; nacc) {
+                     io_future_t & res = results[i];
+                     sout | i | "available:" | available(res);
+                 }
+             }
+         }
+ 
+         for(i; nacc) {
+             wait(results[i]);
+         }
+     #else
+         #error no accept algorithm specified
+     #endif
+ }
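Aside: the ACCEPT_MANY strategy keeps nacc accept operations in flight at once, handles whichever completes, and immediately re-arms that slot. The same shape can be written directly against liburing; this is a hedged sketch, not the CFA runtime's own I/O layer (handoff stands in for push_connection):

    #include <liburing.h>
    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>

    #define NACC 10   // accepts kept in flight, mirroring nacc above

    static void arm_accept(struct io_uring *ring, int listen_fd, int slot) {
        struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
        io_uring_prep_accept(sqe, listen_fd, NULL, NULL, 0);
        io_uring_sqe_set_data(sqe, (void *)(intptr_t)slot);  // remember which slot this is
    }

    void accept_loop(int listen_fd, void (*handoff)(int fd)) {
        struct io_uring ring;
        if (io_uring_queue_init(2 * NACC, &ring, 0) < 0) { perror("io_uring_queue_init"); exit(1); }

        for (int i = 0; i < NACC; i++) arm_accept(&ring, listen_fd, i);
        io_uring_submit(&ring);

        for (;;) {
            struct io_uring_cqe *cqe;
            if (io_uring_wait_cqe(&ring, &cqe) < 0) break;   // wait for any accept
            int slot = (int)(intptr_t)io_uring_cqe_get_data(cqe);
            int fd = cqe->res;
            io_uring_cqe_seen(&ring, cqe);
            if (fd >= 0) handoff(fd);                        // push to a worker queue
            arm_accept(&ring, listen_fd, slot);              // re-arm this slot
            io_uring_submit(&ring);
        }
        io_uring_queue_exit(&ring);
    }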
benchmark/io/http/worker.hfa (r1df492a → reb5962a)

  #pragma once
  
+ #include <iofwd.hfa>
+ #include <queueLockFree.hfa>
  #include <thread.hfa>
  
… …
  }
  
+ #include "printer.hfa"
+ 
  //=============================================================================================
  // Worker Thread
  //=============================================================================================
  
- extern const size_t zipf_sizes[];
- enum { zipf_cnts = 36, };
- 
- struct sendfile_stats_t {
-     volatile uint64_t calls;
-     volatile uint64_t tries;
-     volatile uint64_t header;
-     volatile uint64_t splcin;
-     volatile uint64_t splcot;
-     struct {
-         volatile uint64_t calls;
-         volatile uint64_t bytes;
-     } avgrd[zipf_cnts];
+ struct connection {
+     int pipe[2];
+     struct {
+         sendfile_stats_t sendfile;
+     } stats;
  };
  
- thread Worker {
-     int pipe[2];
+ static inline void ?{}( connection & this ) {
+     this.pipe[0] = -1;
+     this.pipe[1] = -1;
+ }
+ 
+ thread AcceptWorker {
+     connection conn;
      int sockfd;
      struct sockaddr * addr;
… …
      int flags;
      volatile bool done;
+ };
+ void ?{}( AcceptWorker & this);
+ void main( AcceptWorker & );
+ 
+ 
+ struct PendingRead {
+     PendingRead * volatile next;
+     io_future_t f;
      struct {
-         sendfile_stats_t sendfile;
-     } stats;
+         void * buf;
+         size_t len;
+     } in;
+     struct {
+         volatile int fd;
+     } out;
  };
- void ?{}( Worker & this);
- void main( Worker & );
+ 
+ static inline PendingRead * volatile & ?`next ( PendingRead * node ) {
+     return node->next;
+ }
+ 
+ thread ChannelWorker {
+     connection conn;
+     volatile bool done;
+     mpsc_queue(PendingRead) * queue;
+ };
+ void ?{}( ChannelWorker & );
+ void main( ChannelWorker & );
+ 
+ thread Acceptor {
+     mpsc_queue(PendingRead) * queue;
+     int sockfd;
+     struct sockaddr * addr;
+     socklen_t * addrlen;
+     int flags;
+     volatile bool done;
+     acceptor_stats_t stats;
+ };
+ void ?{}( Acceptor &, int cli );
+ void main( Acceptor & );
libcfa/src/bits/locks.hfa (r1df492a → reb5962a)

  // Wrap in struct to prevent false sharing with debug info
      volatile bool lock;
-     #ifdef __CFA_DEBUG__
-         // previous function to acquire the lock
-         const char * prev_name;
-         // previous thread to acquire the lock
-         void* prev_thrd;
-         // keep track of number of times we had to spin, just in case the number is unexpectedly huge
-         size_t spin_count;
-     #endif
  };
  
… …
      extern void disable_interrupts() OPTIONAL_THREAD;
      extern void enable_interrupts( bool poll = true ) OPTIONAL_THREAD;
- 
-     #ifdef __CFA_DEBUG__
-         void __cfaabi_dbg_record_lock(__spinlock_t & this, const char prev_name[]);
-     #else
-         #define __cfaabi_dbg_record_lock(x, y)
-     #endif
+     #define __cfaabi_dbg_record_lock(x, y)
  }
  
  static inline void ?{}( __spinlock_t & this ) {
      this.lock = 0;
-     #ifdef __CFA_DEBUG__
-         this.spin_count = 0;
-     #endif
  }
  
… …
      for ( unsigned int i = 1;; i += 1 ) {
          if ( (this.lock == 0) && (__atomic_test_and_set( &this.lock, __ATOMIC_ACQUIRE ) == 0) ) break;
-         #ifdef __CFA_DEBUG__
-             this.spin_count++;
-         #endif
          #ifndef NOEXPBACK
          // exponential spin
libcfa/src/concurrency/invoke.h (r1df492a → reb5962a)

          struct __monitor_group_t monitors;
  
-         // used to put threads on user data structures
-         struct {
-             struct thread$ * next;
-             struct thread$ * back;
-         } seqable;
- 
          // used to put threads on dlist data structure
          __cfa_dlink(thread$);
… …
              struct thread$ * prev;
          } node;
+ 
+         // used to store state between clh lock/unlock
+         volatile bool * clh_prev;
+ 
+         // used to point to this thd's current clh node
+         volatile bool * clh_node;
  
          struct processor * last_proc;
… …
      }
  
-     static inline thread$ * volatile & ?`next ( thread$ * this ) __attribute__((const)) {
-         return this->seqable.next;
-     }
- 
-     static inline thread$ *& Back( thread$ * this ) __attribute__((const)) {
-         return this->seqable.back;
-     }
- 
-     static inline thread$ *& Next( thread$ * this ) __attribute__((const)) {
-         return this->seqable.next;
-     }
- 
-     static inline bool listed( thread$ * this ) {
-         return this->seqable.next != 0p;
-     }
- 
      static inline void ?{}(__monitor_group_t & this) {
          (this.data){0p};
libcfa/src/concurrency/io.cfa (r1df492a → reb5962a)

  
      const __u32 mask = *ctx->cq.mask;
+     const __u32 num = *ctx->cq.num;
      unsigned long long ts_prev = ctx->cq.ts;
- 
-     // re-read the head and tail in case it already changed.
-     const __u32 head = *ctx->cq.head;
-     const __u32 tail = *ctx->cq.tail;
-     const __u32 count = tail - head;
-     __STATS__( false, io.calls.drain++; io.calls.completed += count; )
- 
-     for(i; count) {
-         unsigned idx = (head + i) & mask;
-         volatile struct io_uring_cqe & cqe = ctx->cq.cqes[idx];
- 
-         /* paranoid */ verify(&cqe);
- 
-         struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
-         // __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
- 
-         __kernel_unpark( fulfil( *future, cqe.res, false ), UNPARK_LOCAL );
-     }
- 
-     unsigned long long ts_next = ctx->cq.ts = rdtscl();
- 
-     // Mark to the kernel that the cqe has been seen
-     // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
-     __atomic_store_n( ctx->cq.head, head + count, __ATOMIC_SEQ_CST );
-     ctx->proc->idle_wctx.drain_time = ts_next;
+     unsigned long long ts_next;
+ 
+     // We might need to do this multiple times if more events completed than can fit in the queue.
+     for() {
+         // re-read the head and tail in case it already changed.
+         const __u32 head = *ctx->cq.head;
+         const __u32 tail = *ctx->cq.tail;
+         const __u32 count = tail - head;
+         __STATS__( false, io.calls.drain++; io.calls.completed += count; )
+ 
+         for(i; count) {
+             unsigned idx = (head + i) & mask;
+             volatile struct io_uring_cqe & cqe = ctx->cq.cqes[idx];
+ 
+             /* paranoid */ verify(&cqe);
+ 
+             struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
+             // __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
+ 
+             __kernel_unpark( fulfil( *future, cqe.res, false ), UNPARK_LOCAL );
+         }
+ 
+         ts_next = ctx->cq.ts = rdtscl();
+ 
+         // Mark to the kernel that the cqe has been seen
+         // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
+         __atomic_store_n( ctx->cq.head, head + count, __ATOMIC_SEQ_CST );
+         ctx->proc->idle_wctx.drain_time = ts_next;
+ 
+         if(likely(count < num)) break;
+ 
+         ioring_syscsll( *ctx, 0, IORING_ENTER_GETEVENTS);
+     }
  
      __cfadbg_print_safe(io, "Kernel I/O : %u completed age %llu\n", count, ts_next);
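Aside: the new drain loop guards against the completion queue having been full. When count equals the ring capacity, more completions may still be buffered on the kernel side, so the drain re-enters instead of going idle. A self-contained C11 toy showing the same batched-drain shape (the real code calls io_uring_enter with IORING_ENTER_GETEVENTS at the marked point; everything here is illustrative):

    #include <stdatomic.h>
    #include <stdint.h>

    // Toy SPSC ring; capacity must be a power of two.
    #define NUM 8
    typedef struct {
        _Atomic uint32_t head, tail;
        int slots[NUM];
    } ring_t;

    // Drain in batches: after consuming a completely full ring
    // (count == NUM) the producer may have more pending, so poll again.
    void drain(ring_t *r, void (*complete)(int)) {
        for (;;) {
            uint32_t head  = atomic_load_explicit(&r->head, memory_order_relaxed);
            uint32_t tail  = atomic_load_explicit(&r->tail, memory_order_acquire);
            uint32_t count = tail - head;
            for (uint32_t i = 0; i < count; i++)
                complete(r->slots[(head + i) & (NUM - 1)]);
            // publish consumption only after the entries have been read
            atomic_store_explicit(&r->head, head + count, memory_order_release);
            if (count < NUM) break;   // ring was not full: caught up
            // ring was full: loop again (io.cfa re-enters the kernel here)
        }
    }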
libcfa/src/concurrency/io/setup.cfa (r1df492a → reb5962a)

      __u32 nentries = params_in.num_entries != 0 ? params_in.num_entries : 256;
      if( !is_pow2(nentries) ) {
-         abort("ERROR: I/O setup 'num_entries' must be a power of 2\n");
+         abort("ERROR: I/O setup 'num_entries' must be a power of 2, was %u\n", nentries);
      }
libcfa/src/concurrency/iofwd.hfa (r1df492a → reb5962a)

      void reset    ( io_future_t & this ) { return reset    (this.self); }
      bool available( io_future_t & this ) { return available(this.self); }
+     bool setup    ( io_future_t & this, oneshot & ctx ) { return setup  (this.self, ctx); }
+     bool retract  ( io_future_t & this, oneshot & ctx ) { return retract(this.self, ctx); }
  }
libcfa/src/concurrency/kernel.cfa (r1df492a → reb5962a)

  #endif
  
- 
- 
- //-----------------------------------------------------------------------------
- // Debug
- __cfaabi_dbg_debug_do(
-     extern "C" {
-         void __cfaabi_dbg_record_lock(__spinlock_t & this, const char prev_name[]) {
-             this.prev_name = prev_name;
-             this.prev_thrd = kernelTLS().this_thread;
-         }
-     }
- )
- 
  //-----------------------------------------------------------------------------
  // Debug
libcfa/src/concurrency/kernel/fwd.hfa (r1df492a → reb5962a)

              struct thread$ * expected = this.ptr;
              if(expected == 1p) return false;
-             /* paranoid */ verify( expected == 0p );
              if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
                  park();
… …
          thread$ * post(oneshot & this, bool do_unpark = true) {
              struct thread$ * got = __atomic_exchange_n( &this.ptr, 1p, __ATOMIC_SEQ_CST);
-             if( got == 0p ) return 0p;
+             if( got == 0p || got == 1p ) return 0p;
              if(do_unpark) unpark( got );
              return got;
… …
  
              // The future is not fulfilled, try to setup the wait context
-             /* paranoid */ verify( expected == 0p );
              if(__atomic_compare_exchange_n(&this.ptr, &expected, &wait_ctx, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
                  return true;
… …
          // should retract the wait ctx
          // intented to be use by wait, wait_any, waitfor, etc. rather than used directly
-         void retract( future_t & this, oneshot & wait_ctx ) {
-             // Remove the wait context
-             struct oneshot * got = __atomic_exchange_n( &this.ptr, 0p, __ATOMIC_SEQ_CST);
- 
-             // got == 0p: future was never actually setup, just return
-             if( got == 0p ) return;
- 
-             // got == wait_ctx: since fulfil does an atomic_swap,
-             // if we got back the original then no one else saw context
-             // It is safe to delete (which could happen after the return)
-             if( got == &wait_ctx ) return;
- 
-             // got == 1p: the future is ready and the context was fully consumed
-             // the server won't use the pointer again
-             // It is safe to delete (which could happen after the return)
-             if( got == 1p ) return;
- 
-             // got == 2p: the future is ready but the context hasn't fully been consumed
-             // spin until it is safe to move on
-             if( got == 2p ) {
-                 while( this.ptr != 1p ) Pause();
-                 return;
-             }
- 
-             // got == any thing else, something wen't wrong here, abort
-             abort("Future in unexpected state");
+         bool retract( future_t & this, oneshot & wait_ctx ) {
+             for() {
+                 struct oneshot * expected = this.ptr;
+ 
+                 // expected == 0p: future was never actually setup, just return
+                 if( expected == 0p ) return false;
+ 
+                 // expected == 1p: the future is ready and the context was fully consumed
+                 // the server won't use the pointer again
+                 // It is safe to delete (which could happen after the return)
+                 if( expected == 1p ) return true;
+ 
+                 // expected == 2p: the future is ready but the context hasn't fully been consumed
+                 // spin until it is safe to move on
+                 if( expected == 2p ) {
+                     while( this.ptr != 1p ) Pause();
+                     /* paranoid */ verify( this.ptr == 1p );
+                     return true;
+                 }
+ 
+                 // expected != wait_ctx: the future was setup with a different context ?!?!
+                 // something went wrong here, abort
+                 if( expected != &wait_ctx ) abort("Future in unexpected state");
+ 
+                 // we still have the original context, then no one else saw it
+                 // attempt to remove the context so it doesn't get consumed.
+                 if(__atomic_compare_exchange_n( &this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
+                     return false;
+                 }
+             }
          }
… …
              return ret;
          }
+ 
+         // Wait for any future to be fulfilled
+         forall(T& | sized(T) | { bool setup( T&, oneshot & ); bool retract( T&, oneshot & ); })
+         T & wait_any( T * futures, size_t num_futures ) {
+             oneshot temp;
+ 
+             // setup all futures
+             // if any are already satisfied return
+             for ( i; num_futures ) {
+                 if( !setup(futures[i], temp) ) return futures[i];
+             }
+ 
+             // Wait context is setup, just wait on it
+             wait( temp );
+ 
+             size_t ret;
+             // attempt to retract all futures
+             for ( i; num_futures ) {
+                 if ( retract( futures[i], temp ) ) ret = i;
+             }
+ 
+             return futures[ret];
+         }
      }
  
libcfa/src/concurrency/locks.cfa (r1df492a → reb5962a)

      // this casts the alarm node to our wrapped type since we used type erasure
      static void alarm_node_wrap_cast( alarm_node_t & a ) { timeout_handler( (alarm_node_wrap(L) &)a ); }
+ 
+     struct pthread_alarm_node_wrap {
+         alarm_node_t alarm_node;
+         pthread_cond_var(L) * cond;
+         info_thread(L) * info_thd;
+     };
+ 
+     void ?{}( pthread_alarm_node_wrap(L) & this, Duration alarm, Duration period, Alarm_Callback callback, pthread_cond_var(L) * c, info_thread(L) * i ) {
+         this.alarm_node{ callback, alarm, period };
+         this.cond = c;
+         this.info_thd = i;
+     }
+ 
+     void ^?{}( pthread_alarm_node_wrap(L) & this ) { }
+ 
+     static void timeout_handler ( pthread_alarm_node_wrap(L) & this ) with( this ) {
+         // This pthread_cond_var member is called from the kernel, and therefore, cannot block, but it can spin.
+         lock( cond->lock __cfaabi_dbg_ctx2 );
+ 
+         // this check is necessary to avoid a race condition since this timeout handler
+         // may still be called after a thread has been removed from the queue but
+         // before the alarm is unregistered
+         if ( (*info_thd)`isListed ) {    // is thread on queue
+             info_thd->signalled = false;
+             // remove this thread O(1)
+             remove( *info_thd );
+             on_notify(*info_thd->lock, info_thd->t);
+         }
+         unlock( cond->lock );
+     }
+ 
+     // this casts the alarm node to our wrapped type since we used type erasure
+     static void pthread_alarm_node_wrap_cast( alarm_node_t & a ) { timeout_handler( (pthread_alarm_node_wrap(L) &)a ); }
  }
  
… …
          on_wakeup(*i.lock, recursion_count);
      }
- }
  
+     //-----------------------------------------------------------------------------
+     // pthread_cond_var
+ 
+     void ?{}( pthread_cond_var(L) & this ) with(this) {
+         blocked_threads{};
+         lock{};
+     }
+ 
+     void ^?{}( pthread_cond_var(L) & this ) { }
+ 
+     bool notify_one( pthread_cond_var(L) & this ) with(this) {
+         lock( lock __cfaabi_dbg_ctx2 );
+         bool ret = ! blocked_threads`isEmpty;
+         if ( ret ) {
+             info_thread(L) & popped = try_pop_front( blocked_threads );
+             on_notify(*popped.lock, popped.t);
+         }
+         unlock( lock );
+         return ret;
+     }
+ 
+     bool notify_all( pthread_cond_var(L) & this ) with(this) {
+         lock( lock __cfaabi_dbg_ctx2 );
+         bool ret = ! blocked_threads`isEmpty;
+         while( ! blocked_threads`isEmpty ) {
+             info_thread(L) & popped = try_pop_front( blocked_threads );
+             on_notify(*popped.lock, popped.t);
+         }
+         unlock( lock );
+         return ret;
+     }
+ 
+     uintptr_t front( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty ? NULL : blocked_threads`first.info; }
+     bool empty ( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty; }
+ 
+     static size_t queue_and_get_recursion( pthread_cond_var(L) & this, info_thread(L) * i ) with(this) {
+         // add info_thread to waiting queue
+         insert_last( blocked_threads, *i );
+         size_t recursion_count = 0;
+         recursion_count = on_wait( *i->lock );
+         return recursion_count;
+     }
+ 
+     static void queue_info_thread_timeout( pthread_cond_var(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) {
+         lock( lock __cfaabi_dbg_ctx2 );
+         size_t recursion_count = queue_and_get_recursion(this, &info);
+         pthread_alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info };
+         register_self( &node_wrap.alarm_node );
+         unlock( lock );
+ 
+         // blocks here
+         park();
+ 
+         // unregisters alarm so it doesn't go off if this happens first
+         unregister_self( &node_wrap.alarm_node );
+ 
+         // resets recursion count here after waking
+         if (info.lock) on_wakeup(*info.lock, recursion_count);
+     }
+ 
+     void wait( pthread_cond_var(L) & this, L & l ) with(this) {
+         wait( this, l, 0 );
+     }
+ 
+     void wait( pthread_cond_var(L) & this, L & l, uintptr_t info ) with(this) {
+         lock( lock __cfaabi_dbg_ctx2 );
+         info_thread( L ) i = { active_thread(), info, &l };
+         size_t recursion_count = queue_and_get_recursion(this, &i);
+         unlock( lock );
+         park( );
+         on_wakeup(*i.lock, recursion_count);
+     }
+ 
+     #define PTHREAD_WAIT_TIME( u, l, t ) \
+         info_thread( L ) i = { active_thread(), u, l }; \
+         queue_info_thread_timeout(this, i, t, pthread_alarm_node_wrap_cast ); \
+         return i.signalled;
+ 
+     bool wait( pthread_cond_var(L) & this, L & l, timespec t ) {
+         Duration d = { t };
+         WAIT_TIME( 0, &l , d )
+     }
+ 
+     bool wait( pthread_cond_var(L) & this, L & l, uintptr_t info, timespec t ) {
+         Duration d = { t };
+         WAIT_TIME( info, &l , d )
+     }
+ }
  //-----------------------------------------------------------------------------
  // Semaphore
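Aside: the pthread_cond_var added above reproduces, inside the CFA runtime, the signalled-vs-timed-out contract of POSIX condition variables; i.signalled plays the role of distinguishing pthread_cond_timedwait's ETIMEDOUT result. For comparison, the plain-pthreads equivalent of the timed wait (illustrative; compile with -pthread):

    #include <errno.h>
    #include <pthread.h>
    #include <stdbool.h>
    #include <stdio.h>
    #include <time.h>

    static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  c = PTHREAD_COND_INITIALIZER;
    static bool ready = false;

    // Returns true if 'ready' became true before the absolute deadline,
    // false on timeout: the same signalled/timed-out distinction the CFA
    // wait(..., timespec) above reports through info.signalled.
    static bool wait_until(const struct timespec *deadline) {
        pthread_mutex_lock(&m);
        int rc = 0;
        while (!ready && rc != ETIMEDOUT)
            rc = pthread_cond_timedwait(&c, &m, deadline);
        bool signalled = ready;
        pthread_mutex_unlock(&m);
        return signalled;
    }

    int main(void) {
        struct timespec deadline;
        clock_gettime(CLOCK_REALTIME, &deadline);
        deadline.tv_sec += 1;            // wait at most one second
        printf("signalled: %d\n", wait_until(&deadline));
        return 0;
    }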
libcfa/src/concurrency/locks.hfa
r1df492a reb5962a 101 101 102 102 //----------------------------------------------------------------------------- 103 // MCS Spin Lock 104 // - No recursive acquisition 105 // - Needs to be released by owner 106 107 struct mcs_spin_node { 108 mcs_spin_node * volatile next; 109 volatile bool locked; 110 }; 111 112 struct mcs_spin_queue { 113 mcs_spin_node * volatile tail; 114 }; 115 116 static inline void ?{}(mcs_spin_node & this) { this.next = 0p; this.locked = true; } 117 118 static inline mcs_spin_node * volatile & ?`next ( mcs_spin_node * node ) { 119 return node->next; 120 } 121 122 struct mcs_spin_lock { 123 mcs_spin_queue queue; 124 }; 125 126 static inline void lock(mcs_spin_lock & l, mcs_spin_node & n) { 127 mcs_spin_node * prev = __atomic_exchange_n(&l.queue.tail, &n, __ATOMIC_SEQ_CST); 128 n.locked = true; 129 if(prev == 0p) return; 130 prev->next = &n; 131 while(__atomic_load_n(&n.locked, __ATOMIC_RELAXED)) Pause(); 132 } 133 134 static inline void unlock(mcs_spin_lock & l, mcs_spin_node & n) { 135 mcs_spin_node * n_ptr = &n; 136 if (__atomic_compare_exchange_n(&l.queue.tail, &n_ptr, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) return; 137 while (__atomic_load_n(&n.next, __ATOMIC_RELAXED) == 0p) {} 138 n.next->locked = false; 139 } 140 141 //----------------------------------------------------------------------------- 142 // CLH Spinlock 143 // - No recursive acquisition 144 // - Needs to be released by owner 145 146 struct clh_lock { 147 volatile bool * volatile tail; 148 }; 149 150 static inline void ?{}( clh_lock & this ) { this.tail = malloc(); *this.tail = true; } 151 static inline void ^?{}( clh_lock & this ) { free(this.tail); } 152 153 static inline void lock(clh_lock & l) { 154 thread$ * curr_thd = active_thread(); 155 *(curr_thd->clh_node) = false; 156 volatile bool * prev = __atomic_exchange_n((bool **)(&l.tail), (bool *)(curr_thd->clh_node), __ATOMIC_SEQ_CST); 157 while(!__atomic_load_n(prev, __ATOMIC_ACQUIRE)) Pause(); 158 curr_thd->clh_prev = prev; 159 } 160 161 static inline void unlock(clh_lock & l) { 162 thread$ * curr_thd = active_thread(); 163 __atomic_store_n(curr_thd->clh_node, true, __ATOMIC_RELEASE); 164 curr_thd->clh_node = curr_thd->clh_prev; 165 } 166 167 //----------------------------------------------------------------------------- 103 168 // Linear backoff Spinlock 104 169 struct linear_backoff_then_block_lock { … … 205 270 // Fast Block Lock 206 271 207 // High efficiencyminimal blocking lock272 // minimal blocking lock 208 273 // - No reacquire for cond var 209 274 // - No recursive acquisition 210 275 // - No ownership 211 276 struct fast_block_lock { 277 // List of blocked threads 278 dlist( thread$ ) blocked_threads; 279 212 280 // Spin lock used for mutual exclusion 213 281 __spinlock_t lock; 214 282 215 // List of blocked threads 216 dlist( thread$ ) blocked_threads; 217 283 // flag showing if lock is held 218 284 bool held:1; 285 286 #ifdef __CFA_DEBUG__ 287 // for deadlock detection 288 struct thread$ * owner; 289 #endif 219 290 }; 220 291 … … 231 302 static inline void lock(fast_block_lock & this) with(this) { 232 303 lock( lock __cfaabi_dbg_ctx2 ); 304 305 #ifdef __CFA_DEBUG__ 306 assert(!(held && owner == active_thread())); 307 #endif 233 308 if (held) { 234 309 insert_last( blocked_threads, *active_thread() ); … … 238 313 } 239 314 held = true; 315 #ifdef __CFA_DEBUG__ 316 owner = active_thread(); 317 #endif 240 318 unlock( lock ); 241 319 } … … 246 324 thread$ * t = &try_pop_front( blocked_threads ); 247 325 held = ( t ? 
true : false ); 326 #ifdef __CFA_DEBUG__ 327 owner = ( t ? t : 0p ); 328 #endif 248 329 unpark( t ); 249 330 unlock( lock ); … … 253 334 static inline size_t on_wait(fast_block_lock & this) { unlock(this); return 0; } 254 335 static inline void on_wakeup(fast_block_lock & this, size_t recursion ) { } 336 337 //----------------------------------------------------------------------------- 338 // simple_owner_lock 339 340 // pthread owner lock 341 // - reacquire for cond var 342 // - recursive acquisition 343 // - ownership 344 struct simple_owner_lock { 345 // List of blocked threads 346 dlist( thread$ ) blocked_threads; 347 348 // Spin lock used for mutual exclusion 349 __spinlock_t lock; 350 351 // owner showing if lock is held 352 struct thread$ * owner; 353 354 size_t recursion_count; 355 }; 356 357 static inline void ?{}( simple_owner_lock & this ) with(this) { 358 lock{}; 359 blocked_threads{}; 360 owner = 0p; 361 recursion_count = 0; 362 } 363 static inline void ^?{}( simple_owner_lock & this ) {} 364 static inline void ?{}( simple_owner_lock & this, simple_owner_lock this2 ) = void; 365 static inline void ?=?( simple_owner_lock & this, simple_owner_lock this2 ) = void; 366 367 static inline void lock(simple_owner_lock & this) with(this) { 368 if (owner == active_thread()) { 369 recursion_count++; 370 return; 371 } 372 lock( lock __cfaabi_dbg_ctx2 ); 373 374 if (owner != 0p) { 375 insert_last( blocked_threads, *active_thread() ); 376 unlock( lock ); 377 park( ); 378 return; 379 } 380 owner = active_thread(); 381 recursion_count = 1; 382 unlock( lock ); 383 } 384 385 // TODO: fix duplicate def issue and bring this back 386 // void pop_and_set_new_owner( simple_owner_lock & this ) with( this ) { 387 // thread$ * t = &try_pop_front( blocked_threads ); 388 // owner = t; 389 // recursion_count = ( t ? 1 : 0 ); 390 // unpark( t ); 391 // } 392 393 static inline void unlock(simple_owner_lock & this) with(this) { 394 lock( lock __cfaabi_dbg_ctx2 ); 395 /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this ); 396 /* paranoid */ verifyf( owner == active_thread(), "Thread %p other than the owner %p attempted to release owner lock %p", owner, active_thread(), &this ); 397 // if recursion count is zero release lock and set new owner if one is waiting 398 recursion_count--; 399 if ( recursion_count == 0 ) { 400 // pop_and_set_new_owner( this ); 401 thread$ * t = &try_pop_front( blocked_threads ); 402 owner = t; 403 recursion_count = ( t ? 1 : 0 ); 404 unpark( t ); 405 } 406 unlock( lock ); 407 } 408 409 static inline void on_notify(simple_owner_lock & this, struct thread$ * t ) with(this) { 410 lock( lock __cfaabi_dbg_ctx2 ); 411 // lock held 412 if ( owner != 0p ) { 413 insert_last( blocked_threads, *t ); 414 unlock( lock ); 415 } 416 // lock not held 417 else { 418 owner = t; 419 recursion_count = 1; 420 unpark( t ); 421 unlock( lock ); 422 } 423 } 424 425 static inline size_t on_wait(simple_owner_lock & this) with(this) { 426 lock( lock __cfaabi_dbg_ctx2 ); 427 /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this ); 428 /* paranoid */ verifyf( owner == active_thread(), "Thread %p other than the owner %p attempted to release owner lock %p", owner, active_thread(), &this ); 429 430 size_t ret = recursion_count; 431 432 // pop_and_set_new_owner( this ); 433 434 thread$ * t = &try_pop_front( blocked_threads ); 435 owner = t; 436 recursion_count = ( t ?
1 : 0 ); 437 unpark( t ); 438 439 unlock( lock ); 440 return ret; 441 } 442 443 static inline void on_wakeup(simple_owner_lock & this, size_t recursion ) with(this) { recursion_count = recursion; } 444 445 //----------------------------------------------------------------------------- 446 // Spin Queue Lock 447 448 // - No reacquire for cond var 449 // - No recursive acquisition 450 // - No ownership 451 // - spin lock with no locking/atomics in unlock 452 struct spin_queue_lock { 453 // Spin lock used for mutual exclusion 454 mcs_spin_lock lock; 455 456 // flag showing if lock is held 457 volatile bool held; 458 459 #ifdef __CFA_DEBUG__ 460 // for deadlock detection 461 struct thread$ * owner; 462 #endif 463 }; 464 465 static inline void ?{}( spin_queue_lock & this ) with(this) { 466 lock{}; 467 held = false; 468 } 469 static inline void ^?{}( spin_queue_lock & this ) {} 470 static inline void ?{}( spin_queue_lock & this, spin_queue_lock this2 ) = void; 471 static inline void ?=?( spin_queue_lock & this, spin_queue_lock this2 ) = void; 472 473 // if this is called recursively IT WILL DEADLOCK!!!!! 474 static inline void lock(spin_queue_lock & this) with(this) { 475 mcs_spin_node node; 476 #ifdef __CFA_DEBUG__ 477 assert(!(held && owner == active_thread())); 478 #endif 479 lock( lock, node ); 480 while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause(); 481 __atomic_store_n(&held, true, __ATOMIC_SEQ_CST); 482 unlock( lock, node ); 483 #ifdef __CFA_DEBUG__ 484 owner = active_thread(); 485 #endif 486 } 487 488 static inline void unlock(spin_queue_lock & this) with(this) { 489 #ifdef __CFA_DEBUG__ 490 owner = 0p; 491 #endif 492 __atomic_store_n(&held, false, __ATOMIC_RELEASE); 493 } 494 495 static inline void on_notify(spin_queue_lock & this, struct thread$ * t ) { unpark(t); } 496 static inline size_t on_wait(spin_queue_lock & this) { unlock(this); return 0; } 497 static inline void on_wakeup(spin_queue_lock & this, size_t recursion ) { } 498 499 500 //----------------------------------------------------------------------------- 501 // MCS Block Spin Lock 502 503 // - No reacquire for cond var 504 // - No recursive acquisition 505 // - No ownership 506 // - Blocks but first node spins (like spin queue but blocking for not first thd) 507 struct mcs_block_spin_lock { 508 // Spin lock used for mutual exclusion 509 mcs_lock lock; 510 511 // flag showing if lock is held 512 volatile bool held; 513 514 #ifdef __CFA_DEBUG__ 515 // for deadlock detection 516 struct thread$ * owner; 517 #endif 518 }; 519 520 static inline void ?{}( mcs_block_spin_lock & this ) with(this) { 521 lock{}; 522 held = false; 523 } 524 static inline void ^?{}( mcs_block_spin_lock & this ) {} 525 static inline void ?{}( mcs_block_spin_lock & this, mcs_block_spin_lock this2 ) = void; 526 static inline void ?=?( mcs_block_spin_lock & this, mcs_block_spin_lock this2 ) = void; 527 528 // if this is called recursively IT WILL DEADLOCK!!!!!
529 static inline void lock(mcs_block_spin_lock & this) with(this) { 530 mcs_node node; 531 #ifdef __CFA_DEBUG__ 532 assert(!(held && owner == active_thread())); 533 #endif 534 lock( lock, node ); 535 while(held) Pause(); 536 held = true; 537 unlock( lock, node ); 538 #ifdef __CFA_DEBUG__ 539 owner = active_thread(); 540 #endif 541 } 542 543 static inline void unlock(mcs_block_spin_lock & this) with(this) { 544 #ifdef __CFA_DEBUG__ 545 owner = 0p; 546 #endif 547 held = false; 548 } 549 550 static inline void on_notify(mcs_block_spin_lock & this, struct thread$ * t ) { unpark(t); } 551 static inline size_t on_wait(mcs_block_spin_lock & this) { unlock(this); return 0; } 552 static inline void on_wakeup(mcs_block_spin_lock & this, size_t recursion ) { } 553 554 //----------------------------------------------------------------------------- 555 // Block Spin Lock 556 557 // - No reacquire for cond var 558 // - No recursive acquisition 559 // - No ownership 560 // - Blocks but first node spins (like spin queue but blocking for not first thd) 561 struct block_spin_lock { 562 // Spin lock used for mutual exclusion 563 fast_block_lock lock; 564 565 // flag showing if lock is held 566 volatile bool held; 567 568 #ifdef __CFA_DEBUG__ 569 // for deadlock detection 570 struct thread$ * owner; 571 #endif 572 }; 573 574 static inline void ?{}( block_spin_lock & this ) with(this) { 575 lock{}; 576 held = false; 577 } 578 static inline void ^?{}( block_spin_lock & this ) {} 579 static inline void ?{}( block_spin_lock & this, block_spin_lock this2 ) = void; 580 static inline void ?=?( block_spin_lock & this, block_spin_lock this2 ) = void; 581 582 // if this is called recursively IT WILL DEADLOCK!!!!! 583 static inline void lock(block_spin_lock & this) with(this) { 584 #ifdef __CFA_DEBUG__ 585 assert(!(held && owner == active_thread())); 586 #endif 587 lock( lock ); 588 while(held) Pause(); 589 held = true; 590 unlock( lock ); 591 #ifdef __CFA_DEBUG__ 592 owner = active_thread(); 593 #endif 594 } 595 596 static inline void unlock(block_spin_lock & this) with(this) { 597 #ifdef __CFA_DEBUG__ 598 owner = 0p; 599 #endif 600 held = false; 601 } 602 603 static inline void on_notify(block_spin_lock & this, struct thread$ * t ) { unpark(t); } 604 static inline size_t on_wait(block_spin_lock & this) { unlock(this); return 0; } 605 static inline void on_wakeup(block_spin_lock & this, size_t recursion ) { } 255 606 256 607 //----------------------------------------------------------------------------- … … 332 683 // - signalling without holding branded lock is UNSAFE!
333 684 // - only allows usage of one lock, cond var is branded after usage 685 334 686 struct fast_cond_var { 335 687 // List of blocked threads 336 688 dlist( info_thread(L) ) blocked_threads; 337 338 689 #ifdef __CFA_DEBUG__ 339 690 L * lock_used; … … 341 692 }; 342 693 343 344 694 void ?{}( fast_cond_var(L) & this ); 345 695 void ^?{}( fast_cond_var(L) & this ); … … 349 699 350 700 uintptr_t front( fast_cond_var(L) & this ); 351 352 701 bool empty ( fast_cond_var(L) & this ); 353 702 354 703 void wait( fast_cond_var(L) & this, L & l ); 355 704 void wait( fast_cond_var(L) & this, L & l, uintptr_t info ); 356 } 705 706 707 //----------------------------------------------------------------------------- 708 // pthread_cond_var 709 // 710 // - cond var with minimal footprint 711 // - supports operations needed for pthread cond 712 713 struct pthread_cond_var { 714 dlist( info_thread(L) ) blocked_threads; 715 __spinlock_t lock; 716 }; 717 718 void ?{}( pthread_cond_var(L) & this ); 719 void ^?{}( pthread_cond_var(L) & this ); 720 721 bool notify_one( pthread_cond_var(L) & this ); 722 bool notify_all( pthread_cond_var(L) & this ); 723 724 uintptr_t front( pthread_cond_var(L) & this ); 725 bool empty ( pthread_cond_var(L) & this ); 726 727 void wait( pthread_cond_var(L) & this, L & l ); 728 void wait( pthread_cond_var(L) & this, L & l, uintptr_t info ); 729 bool wait( pthread_cond_var(L) & this, L & l, timespec t ); 730 bool wait( pthread_cond_var(L) & this, L & l, uintptr_t info, timespec t ); 731 }
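The locks above all provide the same interface: lock()/unlock() for mutual exclusion, plus the on_wait/on_wakeup/on_notify hooks that let a condition variable release the lock while its caller blocks and restore it on wakeup. A minimal usage sketch (illustrative only, not part of this changeset; it assumes locks.hfa exports exactly the declarations shown above):

#include <locks.hfa>

simple_owner_lock l;                       // recursive, owner-tracking lock
pthread_cond_var( simple_owner_lock ) cv;  // generic cond var instantiated with that lock
bool ready = false;

void produce() {
    lock( l );
    lock( l );         // recursive acquisition: the owner just bumps recursion_count
    ready = true;
    notify_one( cv );
    unlock( l );
    unlock( l );       // count reaches zero: the next blocked thread, if any, becomes owner
}

void consume() {
    lock( l );
    while ( ! ready ) wait( cv, l );  // on_wait saves the recursion count and releases l;
                                      // on_wakeup restores the count when the waiter resumes
    unlock( l );
}

By contrast, the spin-based locks (spin_queue_lock, mcs_block_spin_lock, block_spin_lock) have no ownership and no recursion, so a second lock() from the owning thread deadlocks, exactly as their comments warn.

-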
libcfa/src/concurrency/thread.cfa
r1df492a reb5962a 53 53 #endif 54 54 55 seqable.next = 0p;56 seqable.back = 0p;57 58 55 node.next = 0p; 59 56 node.prev = 0p; 57 58 clh_node = malloc( ); 59 *clh_node = false; 60 60 61 doregister(curr_cluster, this); 61 62 62 monitors{ &self_mon_p, 1, (fptr_t)0 }; 63 63 } … … 67 67 canary = 0xDEADDEADDEADDEADp; 68 68 #endif 69 free(clh_node); 69 70 unregister(curr_cluster, this); 70 71 ^self_cor{}; -
libcfa/src/containers/queueLockFree.hfa
r1df492a reb5962a 2 2 3 3 #include <assert.h> 4 5 #include <bits/defs.hfa> 4 6 5 7 forall( T &) { -
libcfa/src/startup.cfa
r1df492a reb5962a 63 63 64 64 struct __spinlock_t; 65 extern "C" {66 void __cfaabi_dbg_record_lock(struct __spinlock_t & this, const char prev_name[]) __attribute__(( weak )) libcfa_public {}67 }68 65 69 66 // Local Variables: // -
src/AST/Pass.impl.hpp
r1df492a reb5962a 182 182 183 183 // get the stmts/decls that will need to be spliced in 184 auto stmts_before = __pass::stmtsToAddBefore( core, 0 ); 185 auto stmts_after = __pass::stmtsToAddAfter ( core, 0 ); 186 auto decls_before = __pass::declsToAddBefore( core, 0 ); 187 auto decls_after = __pass::declsToAddAfter ( core, 0 ); 184 188 auto stmts_before = __pass::stmtsToAddBefore( core, 0 ); 185 189 auto stmts_after = __pass::stmtsToAddAfter ( core, 0 ); 186 190 auto decls_before = __pass::declsToAddBefore( core, 0 ); 187 191 auto decls_after = __pass::declsToAddAfter ( core, 0 ); 188 192 189 193 // These may be modified by subnode but must be restored once we exit this statement. … … 287 287 288 288 // get the stmts/decls that will need to be spliced in 289 auto stmts_before = __pass::stmtsToAddBefore( core, 0 ); 290 auto stmts_after = __pass::stmtsToAddAfter ( core, 0 ); 291 auto decls_before = __pass::declsToAddBefore( core, 0 ); 292 auto decls_after = __pass::declsToAddAfter ( core, 0 ); 289 293 auto stmts_before = __pass::stmtsToAddBefore( core, 0 ); 290 294 auto stmts_after = __pass::stmtsToAddAfter ( core, 0 ); 291 295 auto decls_before = __pass::declsToAddBefore( core, 0 ); 292 296 auto decls_after = __pass::declsToAddAfter ( core, 0 ); 293 297 294 298 // These may be modified by subnode but must be restored once we exit this statement. … … 317 317 assert(( empty( stmts_before ) && empty( stmts_after )) 318 318 || ( empty( decls_before ) && empty( decls_after )) ); 319 320 321 319 322 320 // Take all the statements which should have gone after, N/A for first iteration … … 2116 2114 if ( __visit_children() ) { 2117 2115 bool mutated = false; 2118 std::unordered_map< ast::TypeInstType::TypeEnvKey, ast::ptr< ast::Type > > new_map; 2119 for ( const auto & p : node->typeEnv ) { 2116 ast::TypeSubstitution::TypeMap new_map; 2117 for ( const auto & p : node->typeMap ) { 2120 2118 guard_symtab guard { *this }; 2121 2119 auto new_node = p.second->accept( *this ); … … 2125 2123 if (mutated) { 2126 2124 auto new_node = __pass::mutate<core_t>( node ); 2127 new_node->typeEnv.swap( new_map ); 2125 new_node->typeMap.swap( new_map ); 2128 2126 node = new_node; 2129 2127 } -
src/AST/TypeSubstitution.cpp
r1df492a reb5962a 38 38 39 39 void TypeSubstitution::initialize( const TypeSubstitution &src, TypeSubstitution &dest ) { 40 dest.typeEnv.clear(); 40 dest.typeMap.clear(); 41 41 dest.add( src ); 42 42 } 43 43 44 44 void TypeSubstitution::add( const TypeSubstitution &other ) { 45 for ( TypeEnvType::const_iterator i = other.typeEnv.begin(); i != other.typeEnv.end(); ++i ) { 46 typeEnv[ i->first ] = i->second; 45 for ( TypeMap::const_iterator i = other.typeMap.begin(); i != other.typeMap.end(); ++i ) { 46 typeMap[ i->first ] = i->second; 47 47 } // for 48 48 } 49 49 50 50 void TypeSubstitution::add( const TypeInstType * formalType, const Type *actualType ) { 51 typeEnv[ *formalType ] = actualType; 51 typeMap[ *formalType ] = actualType; 52 52 } 53 53 54 54 void TypeSubstitution::add( const TypeInstType::TypeEnvKey & key, const Type * actualType) { 55 typeEnv[ key ] = actualType; 55 typeMap[ key ] = actualType; 56 56 } 57 57 58 58 void TypeSubstitution::remove( const TypeInstType * formalType ) { 59 TypeEnvType::iterator i = typeEnv.find( *formalType ); 60 if ( i != typeEnv.end() ) { 61 typeEnv.erase( *formalType ); 62 } // if 63 } 64 65 const Type *TypeSubstitution::lookup( const TypeInstType * formalType ) const { 66 TypeEnvType::const_iterator i = typeEnv.find( *formalType ); 59 TypeMap::iterator i = typeMap.find( *formalType ); 60 if ( i != typeMap.end() ) { 61 typeMap.erase( *formalType ); 62 } // if 63 } 64 65 const Type *TypeSubstitution::lookup( 66 const TypeInstType::TypeEnvKey & formalType ) const { 67 TypeMap::const_iterator i = typeMap.find( formalType ); 67 68 68 69 // break on not in substitution set 69 if ( i == typeEnv.end() ) return 0; 70 if ( i == typeMap.end() ) return 0; 70 71 71 72 // attempt to transitively follow TypeInstType links. 72 73 while ( const TypeInstType *actualType = i->second.as<TypeInstType>()) { 73 74 // break cycles in the transitive follow 74 if ( *formalType == *actualType ) break; 75 if ( formalType == *actualType ) break; 75 76 76 77 // Look for the type this maps to, returning previous mapping if none-such 77 i = typeEnv.find( *actualType ); 78 if ( i == typeEnv.end() ) return actualType; 78 i = typeMap.find( *actualType ); 79 if ( i == typeMap.end() ) return actualType; 79 80 80 81 … … 83 84 } 84 85 86 const Type *TypeSubstitution::lookup( const TypeInstType * formalType ) const { 87 return lookup( ast::TypeInstType::TypeEnvKey( *formalType ) ); 88 } 89 85 90 bool TypeSubstitution::empty() const { 86 return typeEnv.empty(); 91 return typeMap.empty(); 87 92 } 88 93 … … 119 124 sub.core.subCount = 0; 120 125 sub.core.freeOnly = true; 121 for ( TypeEnvType::iterator i = typeEnv.begin(); i != typeEnv.end(); ++i ) { 126 for ( TypeMap::iterator i = typeMap.begin(); i != typeMap.end(); ++i ) { 122 127 i->second = i->second->accept( sub ); 123 128 } … … 129 134 if ( bound != boundVars.end() ) return inst; 130 135 131 TypeEnvType::const_iterator i = sub.typeEnv.find( *inst ); 132 136 TypeMap::const_iterator i = sub.typeMap.find( *inst );
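The rewritten lookup() above is the subtle part: a substitution may map T to U where U is itself in the map, so the code follows links transitively, returns the previous mapping once the chain leaves the map, and stops if the chain loops back to the original key. A toy model of that follow-with-cycle-break logic in plain C, with a small string table standing in for typeMap (illustrative only; the names keys, vals, find and the table contents are hypothetical, not the compiler's code):

#include <stdio.h>
#include <string.h>

#define N 3
static const char * keys[N] = { "T", "U", "V" };
static const char * vals[N] = { "U", "V", "int" };

static const char * find( const char * k ) {      // stands in for typeMap.find
    for ( int i = 0; i < N; i += 1 )
        if ( strcmp( keys[i], k ) == 0 ) return vals[i];
    return NULL;
}

static const char * lookup( const char * formal ) {
    const char * cur = find( formal );
    if ( cur == NULL ) return NULL;                // break on not in substitution set
    for ( ;; ) {
        if ( strcmp( cur, formal ) == 0 ) break;   // break cycles in the transitive follow
        const char * next = find( cur );
        if ( next == NULL ) break;                 // chain left the map: keep previous mapping
        cur = next;
    }
    return cur;
}

int main() {
    printf( "%s\n", lookup( "T" ) );               // T -> U -> V -> int, prints "int"
    return 0;
}

The typeEnv-to-typeMap rename seen here is the same one propagated through Pass.impl.hpp above and TypeSubstitution.hpp below.

-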
src/AST/TypeSubstitution.hpp
r1df492a reb5962a 75 75 void add( const TypeSubstitution &other ); 76 76 void remove( const TypeInstType * formalType ); 77 const Type *lookup( const TypeInstType::TypeEnvKey & formalType ) const; 77 78 const Type *lookup( const TypeInstType * formalType ) const; 78 79 bool empty() const; … … 104 105 friend class Pass; 105 106 106 typedef std::unordered_map< TypeInstType::TypeEnvKey, ptr<Type> > TypeEnvType; 107 TypeEnvType typeEnv; 107 typedef std::unordered_map< TypeInstType::TypeEnvKey, ptr<Type> > TypeMap; 108 TypeMap typeMap; 108 109 109 110 public: 110 // has to come after declaration of typeEnv 111 auto begin() -> decltype( typeEnv.begin() ) { return typeEnv.begin(); } 112 auto end() -> decltype( typeEnv. end() ) { return typeEnv. end(); } 113 auto begin() const -> decltype( typeEnv.begin() ) { return typeEnv.begin(); } 114 auto end() const -> decltype( typeEnv. end() ) { return typeEnv. end(); } 111 // has to come after declaration of typeMap 112 auto begin() -> decltype( typeMap.begin() ) { return typeMap.begin(); } 113 auto end() -> decltype( typeMap. end() ) { return typeMap. end(); } 114 auto begin() const -> decltype( typeMap.begin() ) { return typeMap.begin(); } 115 auto end() const -> decltype( typeMap. end() ) { return typeMap. end(); } 115 116 116 117 }; … … 144 145 if ( const TypeExpr *actual = actualIt->template as<TypeExpr>() ) { 145 146 if ( formal->name != "" ) { 146 typeEnv[ formal ] = actual->type; 147 typeMap[ formal ] = actual->type; 147 148 } // if 148 149 } else { -
src/Concurrency/Waitfor.cc
r1df492a reb5962a 56 56 | | 57 57 | | 58 58 | | 59 59 | | 60 60 | | -
src/Concurrency/Waitfor.h
r1df492a reb5962a 19 19 20 20 class Declaration; 21 namespace ast { 22 class TranslationUnit; 23 } 21 24 22 25 namespace Concurrency { 23 26 void generateWaitFor( std::list< Declaration * > & translationUnit ); 27 28 void generateWaitFor( ast::TranslationUnit & translationUnit ); 24 29 }; 25 30 -
src/Concurrency/module.mk
r1df492a reb5962a 19 19 Concurrency/Keywords.cc \ 20 20 Concurrency/Keywords.h \ 21 Concurrency/WaitforNew.cpp \ 21 22 Concurrency/Waitfor.cc \ 22 23 Concurrency/Waitfor.h -
src/GenPoly/Specialize.cc
r1df492a reb5962a 247 247 structureArg( (*actualBegin)->get_type(), argBegin, argEnd, back_inserter( appExpr->get_args() ) ); 248 248 } 249 assertf( argBegin == argEnd, "Did not structure all arguments." ); 249 250 250 251 appExpr->env = TypeSubstitution::newFromExpr( appExpr, env ); -
src/InitTweak/FixGlobalInit.cc
r1df492a reb5962a 162 162 } // if 163 163 if ( Statement * ctor = ctorInit->ctor ) { 164 addDataSectonAttribute( objDecl ); 164 addDataSectionAttribute( objDecl ); 165 165 initStatements.push_back( ctor ); 166 166 objDecl->init = nullptr; -
src/InitTweak/FixInit.cc
r1df492a reb5962a 806 806 // The attribute works, and is meant to apply, both for leaving the static local alone, 807 807 // and for hoisting it out as a static global. 808 addDataSectonAttribute( objDecl ); 808 addDataSectionAttribute( objDecl ); 809 809 810 810 // originally wanted to take advantage of gcc nested functions, but -
src/InitTweak/InitTweak.cc
r1df492a reb5962a 587 587 588 588 bool isConstructable( const ast::Type * type ) { 589 return ! dynamic_cast< const ast::VarArgsType * >( type ) && ! dynamic_cast< const ast::ReferenceType * >( type ) 589 return ! dynamic_cast< const ast::VarArgsType * >( type ) && ! dynamic_cast< const ast::ReferenceType * >( type ) 590 590 && ! dynamic_cast< const ast::FunctionType * >( type ) && ! Tuples::isTtype( type ); 591 591 } … … 1025 1025 if (!assign) { 1026 1026 auto td = new ast::TypeDecl({}, "T", {}, nullptr, ast::TypeDecl::Dtype, true); 1027 assign = new ast::FunctionDecl({}, "?=?", {}, 1027 assign = new ast::FunctionDecl({}, "?=?", {}, 1028 1028 { new ast::ObjectDecl({}, "_dst", new ast::ReferenceType(new ast::TypeInstType("T", td))), 1029 1029 new ast::ObjectDecl({}, "_src", new ast::TypeInstType("T", td))}, … … 1095 1095 1096 1096 // address of a variable or member expression is constexpr 1097 if ( ! dynamic_cast< const ast::NameExpr * >( arg ) 1098 && ! dynamic_cast< const ast::VariableExpr * >( arg ) 1099 && ! dynamic_cast< const ast::MemberExpr * >( arg ) 1097 if ( ! dynamic_cast< const ast::NameExpr * >( arg ) 1098 && ! dynamic_cast< const ast::VariableExpr * >( arg ) 1099 && ! dynamic_cast< const ast::MemberExpr * >( arg ) 1100 1100 && ! dynamic_cast< const ast::UntypedMemberExpr * >( arg ) ) result = false; 1101 1101 } … … 1241 1241 } 1242 1242 1243 void addDataSectonAttribute( ObjectDecl * objDecl ) { 1243 #if defined( __x86_64 ) || defined( __i386 ) // assembler comment to prevent assembler warning message 1244 #define ASM_COMMENT "#" 1245 #else // defined( __ARM_ARCH ) 1246 #define ASM_COMMENT "//" 1247 #endif 1248 static const char * const data_section = ".data" ASM_COMMENT; 1249 static const char * const tlsd_section = ".tdata" ASM_COMMENT; 1250 void addDataSectionAttribute( ObjectDecl * objDecl ) { 1251 const bool is_tls = objDecl->get_storageClasses().is_threadlocal; 1252 const char * section = is_tls ? tlsd_section : data_section; 1244 1253 objDecl->attributes.push_back(new Attribute("section", { 1245 new ConstantExpr( Constant::from_string(".data" 1246 #if defined( __x86_64 ) || defined( __i386 ) // assembler comment to prevent assembler warning message 1247 "#" 1248 #else // defined( __ARM_ARCH ) 1249 "//" 1250 #endif 1251 ))})); 1254 new ConstantExpr( Constant::from_string( section ) ) 1255 })); 1252 1256 } 1253 1257 1254 1258 void addDataSectionAttribute( ast::ObjectDecl * objDecl ) { 1259 const bool is_tls = objDecl->storage.is_threadlocal; 1260 const char * section = is_tls ? tlsd_section : data_section; 1255 1261 objDecl->attributes.push_back(new ast::Attribute("section", { 1256 ast::ConstantExpr::from_string(objDecl->location, ".data" 1257 #if defined( __x86_64 ) || defined( __i386 ) // assembler comment to prevent assembler warning message 1258 "#" 1259 #else // defined( __ARM_ARCH ) 1260 "//" 1261 #endif 1262 )})); 1262 ast::ConstantExpr::from_string(objDecl->location, section) 1263 })); 1263 1264 } 1264 1265 -
src/InitTweak/InitTweak.h
r1df492a reb5962a 127 127 /// .section .data#,"a" 128 128 /// to avoid assembler warning "ignoring changed section attributes for .data" 129 void addDataSectonAttribute( ObjectDecl * objDecl ); 129 void addDataSectionAttribute( ObjectDecl * objDecl ); 130 130 131 131 void addDataSectionAttribute( ast::ObjectDecl * objDecl );
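The helper declared here is the one reworked in InitTweak.cc above: thread-local objects now land in .tdata instead of .data, and the architecture-dependent suffix ("#" on x86/x86-64, "//" on ARM) turns the remainder of the emitted .section directive into an assembler comment, which is what silences the warning quoted in the doc comment. Roughly what the attribute looks like on the generated declarations (hand-written C approximation, assuming gcc on x86; the variables are hypothetical, not from the changeset):

// initialized globals land in .data; the '#' comments out the section flags gcc appends,
// so the assembler never sees conflicting attributes for .data
__attribute__(( section( ".data#" ) )) int counter = 42;

// thread-local initialized data is placed in .tdata instead
__thread __attribute__(( section( ".tdata#" ) )) int tls_counter = 7;

-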
src/main.cc
r1df492a reb5962a 10 10 // Created On : Fri May 15 23:12:02 2015 11 11 // Last Modified By : Andrew Beach 12 // Last Modified On : Fri Apr 29 9:52:00 2022 13 // Update Count : 673 12 // Last Modified On : Tue Jun 7 13:29:00 2022 13 // Update Count : 674 14 14 // 15 15 … … 447 447 PASS( "Expand Unique Expr", Tuples::expandUniqueExpr( transUnit ) ); // xxx - is this the right place for this? want to expand ASAP so that subsequent passes don't need to worry about double-visiting a unique expr - needs to go after InitTweak::fix so that copy constructed return declarations are reused 448 448 449 PASS( "Translate Tries" , ControlStruct::translateTries( transUnit ) ); 449 PASS( "Translate Tries", ControlStruct::translateTries( transUnit ) ); 450 PASS( "Gen Waitfor", Concurrency::generateWaitFor( transUnit ) ); 450 451 451 452 translationUnit = convert( move( transUnit ) ); … … 517 518 518 519 PASS( "Expand Unique Expr", Tuples::expandUniqueExpr( translationUnit ) ); // xxx - is this the right place for this? want to expand ASAP so that subsequent passes don't need to worry about double-visiting a unique expr - needs to go after InitTweak::fix so that copy constructed return declarations are reused 519 520 PASS( "Translate Tries" , ControlStruct::translateTries( translationUnit ) ); 520 PASS( "Translate Tries", ControlStruct::translateTries( translationUnit ) ); 521 PASS( "Gen Waitfor", Concurrency::generateWaitFor( translationUnit ) ); 521 522 } 523 PASS( "Gen Waitfor" , Concurrency::generateWaitFor( translationUnit ) ); 524 523 525 524 PASS( "Convert Specializations", GenPoly::convertSpecializations( translationUnit ) ); // needs to happen before tuple types are expanded -
tests/unified_locking/mutex_test.hfa
r1df492a reb5962a 22 22 } 23 23 24 uint32_t cs( ) { 24 uint32_t cs(uint32_t & entries) { 25 25 thread$ * me = active_thread(); 26 26 uint32_t value; 27 27 lock(mo.l); 28 28 { 29 entries++; 29 30 uint32_t tsum = mo.sum; 30 31 uint32_t cnt = mo.cnt; … … 42 43 thread LockCheck { 43 44 uint32_t sum; 45 uint32_t entries; 44 46 }; 45 47 46 48 void main(LockCheck & this) { 47 49 this.sum = 0; 50 this.entries = 0; 48 51 for(num_times) { 49 52 trash(); 50 this.sum += cs( ); 53 this.sum += cs( this.entries ); 51 54 trash(); 52 55 yield(random(10)); … … 58 61 mo.sum = -32; 59 62 mo.cnt = 0; 63 uint32_t real_entries = 0; 60 64 processor p[2]; 61 65 sout | "Starting"; … … 63 67 LockCheck checkers[13]; 64 68 for(i;13) { 65 sum += join(checkers[i]).sum; 69 LockCheck & curr = join(checkers[i]); 70 sum += curr.sum; 71 real_entries += curr.entries; 66 72 } 67 73 } 68 74 sout | "Done!"; 69 if(mo.cnt != (13 * num_times)) sout | "Invalid cs count!" | mo.cnt | "vs "| (13 * num_times) | "(13 *" | num_times | ')'; 75 if(real_entries != (13 * num_times)) sout | "Invalid real cs count!" | real_entries | "vs "| (13 * num_times) | "(13 *" | num_times | ')'; 76 if(mo.cnt != (13 * num_times)) sout | "Invalid concurrent cs count!" | mo.cnt | "vs "| (13 * num_times) | "(13 *" | num_times | ')'; 70 77 if(sum == mo.sum) sout | "Match!"; 71 78 else sout | "No Match!" | sum | "vs" | mo.sum; -
tools/cfa.nanorc
r1df492a reb5962a 10 10 color green "\<(forall|trait|(o|d|f|t)type|mutex|_Bool|volatile|virtual)\>" 11 11 color green "\<(float|double|bool|char|int|short|long|enum|void|auto)\>" 12 color green "\<(static|const|extern|(un)?signed|inline )\>" "\<(sizeof)\>"12 color green "\<(static|const|extern|(un)?signed|inline|sizeof|vtable)\>" 13 13 color green "\<((s?size)|one|zero|((u_?)?int(8|16|32|64|ptr)))_t\>" 14 14 15 15 # Declarations 16 16 color brightgreen "\<(struct|union|typedef|trait|coroutine|generator)\>" 17 color brightgreen "\<(monitor|thread|with )\>"17 color brightgreen "\<(monitor|thread|with|exception)\>" 18 18 19 19 # Control Flow Structures -
tools/jenkins/setup.sh.in
r1df492a reb5962a 29 29 function getrunpath() 30 30 { 31 local elfout=$(readelf -d $1 | grep "RUNPATH")31 local elfout=$(readelf -d $1 | grep -E "RPATH|RUNPATH") 32 32 regex='\[/([[:alpha:][:digit:]@/_.-]+)\]' 33 33 if [[ $elfout =~ $regex ]]; then … … 43 43 { 44 44 local deps=$(ldd $1) 45 retlcldeps=() 45 46 retsysdeps=() 46 retlcldeps=()47 47 while IFS= read -r line; do 48 regex1=' /([[:alpha:][:digit:]@/_.-]+)'49 regex2=' (libcfa[[:alpha:][:digit:].]+) => not found'48 regex1='(libcfa[[:alpha:][:digit:].]+)' 49 regex2='/([[:alpha:][:digit:]@/_.-]+)' 50 50 regex3='linux-vdso.so.1|linux-gate.so.1' 51 51 if [[ $line =~ $regex1 ]]; then 52 retlcldeps+=(${BASH_REMATCH[1]}) 53 elif [[ $line =~ $regex2 ]]; then 52 54 retsysdeps+=(${BASH_REMATCH[1]}) 53 elif [[ $line =~ $regex2 ]]; then54 retlcldeps+=(${BASH_REMATCH[1]})55 55 elif [[ $line =~ $regex3 ]]; then 56 56 # echo "ignoring '$line}': intrinsic"