Changes in / [eb5962a:1df492a]
- Files:
-
- 22 deleted
- 35 edited
-
benchmark/io/http/Makefile.am (modified) (2 diffs)
-
benchmark/io/http/main.cfa (modified) (16 diffs)
-
benchmark/io/http/options.cfa (modified) (3 diffs)
-
benchmark/io/http/options.hfa (modified) (1 diff)
-
benchmark/io/http/printer.cfa (deleted)
-
benchmark/io/http/printer.hfa (deleted)
-
benchmark/io/http/protocol.cfa (modified) (5 diffs)
-
benchmark/io/http/protocol.hfa (modified) (2 diffs)
-
benchmark/io/http/socket.cfa (deleted)
-
benchmark/io/http/socket.hfa (deleted)
-
benchmark/io/http/worker.cfa (modified) (3 diffs)
-
benchmark/io/http/worker.hfa (modified) (3 diffs)
-
benchmark/io/sendfile/producer.cfa (deleted)
-
libcfa/src/bits/locks.hfa (modified) (3 diffs)
-
libcfa/src/concurrency/invoke.h (modified) (3 diffs)
-
libcfa/src/concurrency/io.cfa (modified) (1 diff)
-
libcfa/src/concurrency/io/setup.cfa (modified) (1 diff)
-
libcfa/src/concurrency/iofwd.hfa (modified) (1 diff)
-
libcfa/src/concurrency/kernel.cfa (modified) (1 diff)
-
libcfa/src/concurrency/kernel/fwd.hfa (modified) (5 diffs)
-
libcfa/src/concurrency/locks.cfa (modified) (2 diffs)
-
libcfa/src/concurrency/locks.hfa (modified) (9 diffs)
-
libcfa/src/concurrency/thread.cfa (modified) (2 diffs)
-
libcfa/src/containers/queueLockFree.hfa (modified) (1 diff)
-
libcfa/src/startup.cfa (modified) (1 diff)
-
src/AST/Pass.impl.hpp (modified) (5 diffs)
-
src/AST/TypeSubstitution.cpp (modified) (4 diffs)
-
src/AST/TypeSubstitution.hpp (modified) (3 diffs)
-
src/Concurrency/Waitfor.cc (modified) (1 diff)
-
src/Concurrency/Waitfor.h (modified) (1 diff)
-
src/Concurrency/WaitforNew.cpp (deleted)
-
src/Concurrency/module.mk (modified) (1 diff)
-
src/GenPoly/Specialize.cc (modified) (1 diff)
-
src/InitTweak/FixGlobalInit.cc (modified) (1 diff)
-
src/InitTweak/FixInit.cc (modified) (1 diff)
-
src/InitTweak/InitTweak.cc (modified) (4 diffs)
-
src/InitTweak/InitTweak.h (modified) (1 diff)
-
src/main.cc (modified) (3 diffs)
-
tests/concurrent/futures/.expect/wait_any.txt (deleted)
-
tests/concurrent/futures/wait_any.cfa (deleted)
-
tests/unified_locking/.expect/block_spin_lock.txt (deleted)
-
tests/unified_locking/.expect/clh.txt (deleted)
-
tests/unified_locking/.expect/mcs_block_spin_lock.txt (deleted)
-
tests/unified_locking/.expect/mcs_spin.txt (deleted)
-
tests/unified_locking/.expect/pthread_locks.txt (deleted)
-
tests/unified_locking/.expect/simple_owner_lock.txt (deleted)
-
tests/unified_locking/.expect/spin_queue_lock.txt (deleted)
-
tests/unified_locking/block_spin_lock.cfa (deleted)
-
tests/unified_locking/clh.cfa (deleted)
-
tests/unified_locking/mcs_block_spin_lock.cfa (deleted)
-
tests/unified_locking/mcs_spin.cfa (deleted)
-
tests/unified_locking/mutex_test.hfa (modified) (4 diffs)
-
tests/unified_locking/pthread_locks.cfa (deleted)
-
tests/unified_locking/simple_owner_lock.cfa (deleted)
-
tests/unified_locking/spin_queue_lock.cfa (deleted)
-
tools/cfa.nanorc (modified) (1 diff)
-
tools/jenkins/setup.sh.in (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
-
benchmark/io/http/Makefile.am
reb5962a r1df492a 21 21 include $(top_srcdir)/tools/build/cfa.make 22 22 23 AM_CFLAGS = -O3 -Wall -Wextra -I$(srcdir) -lrt -pthread -g# -Werror23 AM_CFLAGS = -O3 -Wall -Wextra -I$(srcdir) -lrt -pthread # -Werror 24 24 AM_CFAFLAGS = -quiet -nodebug 25 25 AM_LDFLAGS = -quiet -nodebug … … 37 37 options.cfa \ 38 38 options.hfa \ 39 printer.cfa \40 printer.hfa \41 39 protocol.cfa \ 42 40 protocol.hfa \ 43 socket.cfa \44 socket.hfa \45 41 worker.cfa \ 46 42 worker.hfa -
benchmark/io/http/main.cfa
reb5962a r1df492a 2 2 3 3 #include <errno.h> 4 #include <signal.h>5 4 #include <stdio.h> 6 5 #include <string.h> … … 9 8 #include <sched.h> 10 9 #include <signal.h> 11 #include <sys/eventfd.h>12 10 #include <sys/socket.h> 13 11 #include <netinet/in.h> … … 16 14 #include <fstream.hfa> 17 15 #include <kernel.hfa> 18 #include <locks.hfa>19 16 #include <iofwd.hfa> 20 17 #include <stats.hfa> … … 24 21 #include "filecache.hfa" 25 22 #include "options.hfa" 26 #include "socket.hfa"27 #include "printer.hfa"28 23 #include "worker.hfa" 29 24 … … 32 27 Duration default_preemption() { 33 28 return 0; 29 } 30 31 //============================================================================================= 32 // Stats Printer 33 //=============================================================================================' 34 35 thread StatsPrinter { 36 Worker * workers; 37 int worker_cnt; 38 }; 39 40 void ?{}( StatsPrinter & this, cluster & cl ) { 41 ((thread&)this){ "Stats Printer Thread", cl }; 42 this.worker_cnt = 0; 43 } 44 45 void ^?{}( StatsPrinter & mutex this ) {} 46 47 #define eng3(X) (ws(3, 3, unit(eng( X )))) 48 49 void main(StatsPrinter & this) { 50 LOOP: for() { 51 waitfor( ^?{} : this) { 52 break LOOP; 53 } 54 or else {} 55 56 sleep(10`s); 57 58 print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO ); 59 if(this.worker_cnt != 0) { 60 uint64_t tries = 0; 61 uint64_t calls = 0; 62 uint64_t header = 0; 63 uint64_t splcin = 0; 64 uint64_t splcot = 0; 65 struct { 66 volatile uint64_t calls; 67 volatile uint64_t bytes; 68 } avgrd[zipf_cnts]; 69 memset(avgrd, 0, sizeof(avgrd)); 70 71 for(i; this.worker_cnt) { 72 tries += this.workers[i].stats.sendfile.tries; 73 calls += this.workers[i].stats.sendfile.calls; 74 header += this.workers[i].stats.sendfile.header; 75 splcin += this.workers[i].stats.sendfile.splcin; 76 splcot += this.workers[i].stats.sendfile.splcot; 77 for(j; zipf_cnts) { 78 avgrd[j].calls += this.workers[i].stats.sendfile.avgrd[j].calls; 79 avgrd[j].bytes += this.workers[i].stats.sendfile.avgrd[j].bytes; 80 } 81 } 82 83 double ratio = ((double)tries) / calls; 84 85 sout | "----- Worker Stats -----"; 86 sout | "sendfile : " | calls | "calls," | tries | "tries (" | ratio | " try/call)"; 87 sout | " " | header | "header," | splcin | "splice in," | splcot | "splice out"; 88 sout | " - zipf sizes:"; 89 for(i; zipf_cnts) { 90 double written = avgrd[i].calls > 0 ? ((double)avgrd[i].bytes) / avgrd[i].calls : 0; 91 sout | " " | zipf_sizes[i] | "bytes," | avgrd[i].calls | "shorts," | written | "written"; 92 } 93 } 94 else { 95 sout | "No Workers!"; 96 } 97 } 34 98 } 35 99 … … 37 101 // Globals 38 102 //============================================================================================= 103 struct ServerCluster { 104 cluster self; 105 processor * procs; 106 // io_context * ctxs; 107 StatsPrinter * prnt; 108 109 }; 110 39 111 void ?{}( ServerCluster & this ) { 40 112 (this.self){ "Server Cluster", options.clopts.params }; … … 50 122 (this.procs[i]){ "Benchmark Processor", this.self }; 51 123 52 //int c = 0;53 //int n = 1 + (i % cnt);54 //for(int j = 0; j < CPU_SETSIZE; j++) {55 //if(CPU_ISSET(j, &fullset)) n--;56 //if(n == 0) {57 //c = j;58 //break;59 //}60 //}61 //cpu_set_t localset;62 //CPU_ZERO(&localset);63 //CPU_SET(c, &localset);64 //ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset);65 //if( ret != 0 ) abort | "sched_getaffinity failed with" | ret | strerror( ret );124 int c = 0; 125 int n = 1 + (i % cnt); 126 for(int j = 0; j < CPU_SETSIZE; j++) { 127 if(CPU_ISSET(j, &fullset)) n--; 128 if(n == 0) { 129 c = j; 130 break; 131 } 132 } 133 cpu_set_t localset; 134 CPU_ZERO(&localset); 135 CPU_SET(c, &localset); 136 ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset); 137 if( ret != 0 ) abort | "sched_getaffinity failed with" | ret | strerror( ret ); 66 138 67 139 #if !defined(__CFA_NO_STATISTICS__) … … 75 147 } 76 148 149 if(options.stats) { 150 this.prnt = alloc(); 151 (*this.prnt){ this.self }; 152 } else { 153 this.prnt = 0p; 154 } 155 77 156 #if !defined(__CFA_NO_STATISTICS__) 78 157 print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO ); … … 84 163 85 164 void ^?{}( ServerCluster & this ) { 165 delete(this.prnt); 166 86 167 for(i; options.clopts.nprocs) { 87 168 ^(this.procs[i]){}; … … 94 175 extern void init_protocol(void); 95 176 extern void deinit_protocol(void); 96 97 //=============================================================================================98 // REUSEPORT99 //=============================================================================================100 101 size_t sockarr_size;102 struct __attribute__((aligned(128))) Q {103 mpsc_queue(PendingRead) q;104 };105 106 //=============================================================================================107 // Termination108 //=============================================================================================109 110 int closefd;111 void cleanstop(int) {112 eventfd_t buffer = 1;113 char * buffer_s = (char*)&buffer;114 int ret = write(closefd, buffer_s, sizeof(buffer));115 if(ret < 0) abort( "eventfd write error: (%d) %s\n", (int)errno, strerror(errno) );116 return;117 }118 177 119 178 //============================================================================================= … … 121 180 //=============================================================================================' 122 181 int main( int argc, char * argv[] ) { 123 int ret;124 182 __sighandler_t s = 1p; 125 183 signal(SIGPIPE, s); … … 128 186 // Parse args 129 187 parse_options(argc, argv); 130 131 //===================132 // Setup non-interactive termination133 if(!options.interactive) {134 closefd = eventfd(0, 0);135 if(closefd < 0) abort( "eventfd error: (%d) %s\n", (int)errno, strerror(errno) );136 137 sighandler_t prev = signal(SIGTERM, cleanstop);138 intptr_t prev_workaround = (intptr_t) prev;139 // can't use SIG_ERR it crashes the compiler140 if(prev_workaround == -1) abort( "signal setup error: (%d) %s\n", (int)errno, strerror(errno) );141 142 sout | "Signal termination ready";143 }144 188 145 189 //=================== … … 153 197 // Open Socket 154 198 sout | getpid() | ": Listening on port" | options.socket.port; 155 199 int server_fd = socket(AF_INET, SOCK_STREAM, 0); 200 if(server_fd < 0) { 201 abort( "socket error: (%d) %s\n", (int)errno, strerror(errno) ); 202 } 203 204 int ret = 0; 156 205 struct sockaddr_in address; 157 int addrlen = prepaddr(address); 158 159 int server_fd; 206 int addrlen = sizeof(address); 207 memset( (char *)&address, '\0' ); 208 address.sin_family = AF_INET; 209 address.sin_addr.s_addr = htonl(INADDR_ANY); 210 address.sin_port = htons( options.socket.port ); 211 212 int waited = 0; 213 for() { 214 int sockfd = server_fd; 215 __CONST_SOCKADDR_ARG addr; 216 addr.__sockaddr__ = (struct sockaddr *)&address; 217 socklen_t addrlen = sizeof(address); 218 ret = bind( sockfd, addr, addrlen ); 219 if(ret < 0) { 220 if(errno == EADDRINUSE) { 221 if(waited == 0) { 222 if(!options.interactive) abort | "Port already in use in non-interactive mode. Aborting"; 223 sout | "Waiting for port"; 224 } else { 225 sout | "\r" | waited | nonl; 226 flush( sout ); 227 } 228 waited ++; 229 sleep( 1`s ); 230 continue; 231 } 232 abort( "bind error: (%d) %s\n", (int)errno, strerror(errno) ); 233 } 234 break; 235 } 236 237 ret = listen( server_fd, options.socket.backlog ); 238 if(ret < 0) { 239 abort( "listen error: (%d) %s\n", (int)errno, strerror(errno) ); 240 } 160 241 161 242 //=================== … … 176 257 177 258 { 178 // Stats printer makes a copy so this needs to persist longer than normal179 connection ** conns;180 AcceptWorker * aworkers = 0p;181 ChannelWorker * cworkers = 0p;182 Acceptor * acceptors = 0p;183 Q * queues = 0p;184 259 ServerCluster cl[options.clopts.nclusters]; 185 186 if(options.stats) {187 stats_thrd = alloc();188 (*stats_thrd){ cl };189 } else {190 stats_thrd = 0p;191 }192 260 193 261 init_protocol(); 194 262 { 195 int nacceptors = options.clopts.nprocs * options.clopts.nclusters; 196 conns = alloc(options.clopts.nworkers); 197 if(options.socket.reuseport) { 198 queues = alloc(nacceptors); 199 acceptors = alloc(nacceptors); 200 sout | "Creating" | nacceptors | "Acceptors"; 201 for(i; nacceptors) { 202 (acceptors[i]){ i % options.clopts.nclusters }; 263 Worker * workers = anew(options.clopts.nworkers); 264 cl[0].prnt->workers = workers; 265 cl[0].prnt->worker_cnt = options.clopts.nworkers; 266 for(i; options.clopts.nworkers) { 267 // if( options.file_cache.fixed_fds ) { 268 // workers[i].pipe[0] = pipe_off + (i * 2) + 0; 269 // workers[i].pipe[1] = pipe_off + (i * 2) + 1; 270 // } 271 // else 272 { 273 workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0]; 274 workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1]; 275 workers[i].sockfd = server_fd; 276 workers[i].addr = (struct sockaddr *)&address; 277 workers[i].addrlen = (socklen_t*)&addrlen; 278 workers[i].flags = 0; 203 279 } 204 for(i; nacceptors) { 205 (queues[i]){}; 206 { 207 acceptors[i].sockfd = listener(address, addrlen); 208 acceptors[i].addr = (struct sockaddr *)&address; 209 acceptors[i].addrlen = (socklen_t*)&addrlen; 210 acceptors[i].flags = 0; 211 acceptors[i].queue = &queues[i].q; 212 } 213 unpark( acceptors[i] ); 214 } 215 216 cworkers = anew(options.clopts.nworkers); 217 for(i; options.clopts.nworkers) { 218 { 219 cworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0]; 220 cworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1]; 221 cworkers[i].queue = &queues[i % nacceptors].q; 222 conns[i] = &cworkers[i].conn; 223 } 224 unpark( cworkers[i] ); 225 } 226 } 227 else { 228 server_fd = listener(address, addrlen); 229 aworkers = anew(options.clopts.nworkers); 230 for(i; options.clopts.nworkers) { 231 // if( options.file_cache.fixed_fds ) { 232 // workers[i].pipe[0] = pipe_off + (i * 2) + 0; 233 // workers[i].pipe[1] = pipe_off + (i * 2) + 1; 234 // } 235 // else 236 { 237 aworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0]; 238 aworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1]; 239 aworkers[i].sockfd = server_fd; 240 aworkers[i].addr = (struct sockaddr *)&address; 241 aworkers[i].addrlen = (socklen_t*)&addrlen; 242 aworkers[i].flags = 0; 243 conns[i] = &aworkers[i].conn; 244 } 245 unpark( aworkers[i] ); 246 } 280 unpark( workers[i] ); 247 281 } 248 282 sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters"; … … 251 285 } 252 286 sout | nl; 287 if(!options.interactive) park(); 253 288 { 254 if(options.interactive) { 255 char buffer[128]; 256 for() { 257 int ret = cfa_read(0, buffer, 128, 0); 258 if(ret == 0) break; 259 if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) ); 260 sout | "User wrote '" | "" | nonl; 261 write(sout, buffer, ret - 1); 262 sout | "'"; 263 } 289 char buffer[128]; 290 for() { 291 int ret = cfa_read(0, buffer, 128, 0); 292 if(ret == 0) break; 293 if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) ); 294 sout | "User wrote '" | "" | nonl; 295 write(sout, buffer, ret - 1); 296 sout | "'"; 264 297 } 265 else {266 char buffer[sizeof(eventfd_t)];267 int ret = cfa_read(closefd, buffer, sizeof(eventfd_t), 0);268 if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );269 }270 298 271 299 sout | "Shutdown received"; 272 300 } 273 301 302 sout | "Notifying connections..." | nonl; flush( sout ); 303 for(i; options.clopts.nworkers) { 304 workers[i].done = true; 305 } 306 sout | "done"; 307 308 sout | "Shutting down socket..." | nonl; flush( sout ); 309 int ret = shutdown( server_fd, SHUT_RD ); 310 if( ret < 0 ) { 311 abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) ); 312 } 313 sout | "done"; 314 274 315 //=================== 275 // Close Socket and join 276 if(options.socket.reuseport) { 277 sout | "Notifying connections..." | nonl; flush( sout ); 278 for(i; nacceptors) { 279 acceptors[i].done = true; 280 } 281 for(i; options.clopts.nworkers) { 282 cworkers[i].done = true; 283 } 284 sout | "done"; 285 286 sout | "Shutting down Socket..." | nonl; flush( sout ); 287 for(i; nacceptors) { 288 ret = shutdown( acceptors[i].sockfd, SHUT_RD ); 289 if( ret < 0 ) { 290 abort( "shutdown1 error: (%d) %s\n", (int)errno, strerror(errno) ); 291 } 292 } 293 sout | "done"; 294 295 sout | "Closing Socket..." | nonl; flush( sout ); 296 for(i; nacceptors) { 297 ret = close( acceptors[i].sockfd ); 298 if( ret < 0) { 299 abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) ); 300 } 301 } 302 sout | "done"; 303 304 sout | "Stopping accept threads..." | nonl; flush( sout ); 305 for(i; nacceptors) { 306 join(acceptors[i]); 307 } 308 sout | "done"; 309 310 sout | "Draining worker queues..." | nonl; flush( sout ); 311 for(i; nacceptors) { 312 PendingRead * p = 0p; 313 while(p = pop(queues[i].q)) { 314 fulfil(p->f, -ECONNRESET); 315 } 316 } 317 sout | "done"; 318 319 sout | "Stopping worker threads..." | nonl; flush( sout ); 320 for(i; options.clopts.nworkers) { 321 for(j; 2) { 322 ret = close(cworkers[i].conn.pipe[j]); 323 if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) ); 324 } 325 join(cworkers[i]); 326 } 327 } 328 else { 329 sout | "Notifying connections..." | nonl; flush( sout ); 330 for(i; options.clopts.nworkers) { 331 aworkers[i].done = true; 332 } 333 sout | "done"; 334 335 sout | "Shutting down Socket..." | nonl; flush( sout ); 336 ret = shutdown( server_fd, SHUT_RD ); 337 if( ret < 0 ) { 338 abort( "shutdown2 error: (%d) %s\n", (int)errno, strerror(errno) ); 339 } 340 sout | "done"; 341 342 sout | "Closing Socket..." | nonl; flush( sout ); 343 ret = close( server_fd ); 344 if(ret < 0) { 345 abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) ); 346 } 347 sout | "done"; 348 349 sout | "Stopping connection threads..." | nonl; flush( sout ); 350 for(i; options.clopts.nworkers) { 351 for(j; 2) { 352 ret = close(aworkers[i].conn.pipe[j]); 353 if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) ); 354 } 355 join(aworkers[i]); 356 } 357 } 316 // Close Socket 317 sout | "Closing Socket..." | nonl; flush( sout ); 318 ret = close( server_fd ); 319 if(ret < 0) { 320 abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) ); 321 } 322 sout | "done"; 323 324 sout | "Stopping connection threads..." | nonl; flush( sout ); 325 adelete(workers); 358 326 } 359 327 sout | "done"; … … 363 331 sout | "done"; 364 332 365 sout | "Stopping printer threads..." | nonl; flush( sout );366 if(stats_thrd) {367 notify_one(stats_thrd->var);368 }369 delete(stats_thrd);370 sout | "done";371 372 // Now that the stats printer is stopped, we can reclaim this373 adelete(aworkers);374 adelete(cworkers);375 adelete(acceptors);376 adelete(queues);377 free(conns);378 379 333 sout | "Stopping processors/clusters..." | nonl; flush( sout ); 380 334 } 381 335 sout | "done"; 382 336 337 sout | "Closing splice fds..." | nonl; flush( sout ); 338 for(i; pipe_cnt) { 339 ret = close( fds[pipe_off + i] ); 340 if(ret < 0) { 341 abort( "close pipe error: (%d) %s\n", (int)errno, strerror(errno) ); 342 } 343 } 383 344 free(fds); 345 sout | "done"; 384 346 385 347 sout | "Stopping processors..." | nonl; flush( sout ); -
benchmark/io/http/options.cfa
reb5962a r1df492a 35 35 36 36 { // socket 37 8080, // port 38 10, // backlog 39 1024, // buflen 40 false // reuseport 37 8080, // port 38 10, // backlog 39 1024 // buflen 41 40 }, 42 41 … … 53 52 54 53 void parse_options( int argc, char * argv[] ) { 54 // bool fixedfd = false; 55 // bool sqkpoll = false; 56 // bool iokpoll = false; 55 57 unsigned nentries = 0; 56 58 bool isolate = false; … … 68 70 {'\0', "shell", "Disable interactive mode", options.interactive, parse_setfalse}, 69 71 {'\0', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog}, 70 {'\0', "reuseport", "Use acceptor threads with reuse port SO_REUSEPORT", options.socket.reuseport, parse_settrue},71 72 {'\0', "request_len", "Maximum number of bytes in the http request, requests with more data will be answered with Http Code 414", options.socket.buflen}, 72 73 {'\0', "seed", "seed to use for hashing", options.file_cache.hash_seed }, -
benchmark/io/http/options.hfa
reb5962a r1df492a 27 27 int backlog; 28 28 int buflen; 29 bool reuseport;30 29 } socket; 31 30 -
benchmark/io/http/protocol.cfa
reb5962a r1df492a 30 30 #define PLAINTEXT_NOCOPY 31 31 #define LINKED_IO 32 33 static inline __s32 wait_res( io_future_t & this ) {34 wait( this );35 if( this.result < 0 ) {{36 errno = -this.result;37 return -1;38 }}39 return this.result;40 }41 32 42 33 struct https_msg_str { … … 479 470 480 471 if(is_error(splice_in.res)) { 481 if(splice_in.res.error == -EPIPE) return -ECONNRESET;482 472 mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error; 483 473 close(fd); … … 513 503 } 514 504 515 [HttpCode code, bool closed, * const char file, size_t len] http_read( volatile int & fd, []char buffer, size_t len, io_future_t * f) {505 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) { 516 506 char * it = buffer; 517 507 size_t count = len - 1; … … 519 509 READ: 520 510 for() { 521 int ret; 522 if( f ) { 523 ret = wait_res(*f); 524 reset(*f); 525 f = 0p; 526 } else { 527 ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY); 528 } 511 int ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY); 529 512 // int ret = read(fd, (void*)it, count); 530 513 if(ret == 0 ) return [OK200, true, 0, 0]; … … 587 570 588 571 void ?{}( DateFormater & this ) { 589 ((thread&)this){ "Server Date Thread" };572 ((thread&)this){ "Server Date Thread", *options.clopts.instance[0] }; 590 573 this.idx = 0; 591 574 memset( &this.buffers[0], 0, sizeof(this.buffers[0]) ); -
benchmark/io/http/protocol.hfa
reb5962a r1df492a 1 1 #pragma once 2 2 3 struct io_future_t;4 3 struct sendfile_stats_t; 5 4 … … 23 22 int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t count, struct sendfile_stats_t & ); 24 23 25 [HttpCode code, bool closed, * const char file, size_t len] http_read( volatile int & fd, []char buffer, size_t len, io_future_t * f);24 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len); -
benchmark/io/http/worker.cfa
reb5962a r1df492a 8 8 #include <fstream.hfa> 9 9 #include <iofwd.hfa> 10 #include <mutex_stmt.hfa>11 10 12 11 #include "options.hfa" … … 15 14 16 15 //============================================================================================= 17 // Generic connection handling16 // Worker Thread 18 17 //============================================================================================= 19 static void handle_connection( connection & this, volatile int & fd, char * buffer, size_t len, io_future_t * f, unsigned long long & last) {20 REQUEST:21 for() {22 bool closed;23 HttpCode code;24 const char * file;25 size_t name_size;18 void ?{}( Worker & this ) { 19 size_t cli = rand() % options.clopts.cltr_cnt; 20 ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 }; 21 options.clopts.thrd_cnt[cli]++; 22 this.pipe[0] = -1; 23 this.pipe[1] = -1; 24 this.done = false; 26 25 27 // Read the http request 28 if( options.log ) mutex(sout) sout | "=== Reading request ==="; 29 [code, closed, file, name_size] = http_read(fd, buffer, len, f); 30 f = 0p; 31 32 // if we are done, break out of the loop 33 if( closed ) break REQUEST; 34 35 // If this wasn't a request retrun 400 36 if( code != OK200 ) { 37 sout | "=== Invalid Request :" | code_val(code) | "==="; 38 answer_error(fd, code); 39 continue REQUEST; 40 } 41 42 if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) { 43 if( options.log ) mutex(sout) sout | "=== Request for /plaintext ==="; 44 45 int ret = answer_plaintext(fd); 46 if( ret == -ECONNRESET ) break REQUEST; 47 48 if( options.log ) mutex(sout) sout | "=== Answer sent ==="; 49 continue REQUEST; 50 } 51 52 if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) { 53 if( options.log ) mutex(sout) sout | "=== Request for /ping ==="; 54 55 // Send the header 56 int ret = answer_empty(fd); 57 if( ret == -ECONNRESET ) break REQUEST; 58 59 if( options.log ) mutex(sout) sout | "=== Answer sent ==="; 60 continue REQUEST; 61 } 62 63 if( options.log ) { 64 sout | "=== Request for file " | nonl; 65 write(sout, file, name_size); 66 sout | " ==="; 67 } 68 69 if( !options.file_cache.path ) { 70 if( options.log ) { 71 sout | "=== File Not Found (" | nonl; 72 write(sout, file, name_size); 73 sout | ") ==="; 74 } 75 answer_error(fd, E405); 76 continue REQUEST; 77 } 78 79 // Get the fd from the file cache 80 int ans_fd; 81 size_t count; 82 [ans_fd, count] = get_file( file, name_size ); 83 84 // If we can't find the file, return 404 85 if( ans_fd < 0 ) { 86 if( options.log ) { 87 sout | "=== File Not Found (" | nonl; 88 write(sout, file, name_size); 89 sout | ") ==="; 90 } 91 answer_error(fd, E404); 92 continue REQUEST; 93 } 94 95 // Send the desired file 96 int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile ); 97 if( ret == -ECONNRESET ) break REQUEST; 98 99 if( options.log ) mutex(sout) sout | "=== Answer sent ==="; 100 } 101 102 if (stats_thrd) { 103 unsigned long long next = rdtscl(); 104 if(next > (last + 500000000)) { 105 if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) { 106 push(this.stats.sendfile, stats_thrd->stats.send); 107 unlock(stats_thrd->stats.lock); 108 last = next; 109 } 110 } 26 this.stats.sendfile.calls = 0; 27 this.stats.sendfile.tries = 0; 28 this.stats.sendfile.header = 0; 29 this.stats.sendfile.splcin = 0; 30 this.stats.sendfile.splcot = 0; 31 for(i; zipf_cnts) { 32 this.stats.sendfile.avgrd[i].calls = 0; 33 this.stats.sendfile.avgrd[i].bytes = 0; 111 34 } 112 35 } 113 36 114 //============================================================================================= 115 // Self Accepting Worker Thread 116 //============================================================================================= 117 void ?{}( AcceptWorker & this ) { 118 size_t cli = rand() % options.clopts.cltr_cnt; 119 ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 }; 120 options.clopts.thrd_cnt[cli]++; 121 this.done = false; 37 extern "C" { 38 extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags); 122 39 } 123 40 124 void main( AcceptWorker & this ) {41 void main( Worker & this ) { 125 42 park(); 126 unsigned long long last = rdtscl(); 127 /* paranoid */ assert( this.conn.pipe[0] != -1 ); 128 /* paranoid */ assert( this.conn.pipe[1] != -1 ); 43 /* paranoid */ assert( this.pipe[0] != -1 ); 44 /* paranoid */ assert( this.pipe[1] != -1 ); 45 46 CONNECTION: 129 47 for() { 130 if( options.log ) mutex(sout)sout | "=== Accepting connection ===";131 int fd = cfa_accept4( this. sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY );48 if( options.log ) sout | "=== Accepting connection ==="; 49 int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], CFA_IO_LAZY ); 132 50 if(fd < 0) { 133 51 if( errno == ECONNABORTED ) break; … … 137 55 if(this.done) break; 138 56 139 if( options.log ) mutex(sout) sout | "=== New connection" | fd | "" | ", waiting for requests ==="; 140 size_t len = options.socket.buflen; 141 char buffer[len]; 142 handle_connection( this.conn, fd, buffer, len, 0p, last ); 57 if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ==="; 58 REQUEST: 59 for() { 60 bool closed; 61 HttpCode code; 62 const char * file; 63 size_t name_size; 143 64 144 if( options.log ) mutex(sout) sout | "=== Connection closed ==="; 65 // Read the http request 66 size_t len = options.socket.buflen; 67 char buffer[len]; 68 if( options.log ) sout | "=== Reading request ==="; 69 [code, closed, file, name_size] = http_read(fd, buffer, len); 70 71 // if we are done, break out of the loop 72 if( closed ) break REQUEST; 73 74 // If this wasn't a request retrun 400 75 if( code != OK200 ) { 76 sout | "=== Invalid Request :" | code_val(code) | "==="; 77 answer_error(fd, code); 78 continue REQUEST; 79 } 80 81 if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) { 82 if( options.log ) sout | "=== Request for /plaintext ==="; 83 84 int ret = answer_plaintext(fd); 85 if( ret == -ECONNRESET ) break REQUEST; 86 87 if( options.log ) sout | "=== Answer sent ==="; 88 continue REQUEST; 89 } 90 91 if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) { 92 if( options.log ) sout | "=== Request for /ping ==="; 93 94 // Send the header 95 int ret = answer_empty(fd); 96 if( ret == -ECONNRESET ) break REQUEST; 97 98 if( options.log ) sout | "=== Answer sent ==="; 99 continue REQUEST; 100 } 101 102 if( options.log ) { 103 sout | "=== Request for file " | nonl; 104 write(sout, file, name_size); 105 sout | " ==="; 106 } 107 108 if( !options.file_cache.path ) { 109 if( options.log ) { 110 sout | "=== File Not Found (" | nonl; 111 write(sout, file, name_size); 112 sout | ") ==="; 113 } 114 answer_error(fd, E405); 115 continue REQUEST; 116 } 117 118 // Get the fd from the file cache 119 int ans_fd; 120 size_t count; 121 [ans_fd, count] = get_file( file, name_size ); 122 123 // If we can't find the file, return 404 124 if( ans_fd < 0 ) { 125 if( options.log ) { 126 sout | "=== File Not Found (" | nonl; 127 write(sout, file, name_size); 128 sout | ") ==="; 129 } 130 answer_error(fd, E404); 131 continue REQUEST; 132 } 133 134 // Send the desired file 135 int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile ); 136 if( ret == -ECONNRESET ) break REQUEST; 137 138 if( options.log ) sout | "=== Answer sent ==="; 139 } 140 141 if( options.log ) sout | "=== Connection closed ==="; 142 continue CONNECTION; 145 143 } 146 144 } 147 148 149 //=============================================================================================150 // Channel Worker Thread151 //=============================================================================================152 void ?{}( ChannelWorker & this ) {153 size_t cli = rand() % options.clopts.cltr_cnt;154 ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };155 options.clopts.thrd_cnt[cli]++;156 this.done = false;157 }158 159 void main( ChannelWorker & this ) {160 park();161 unsigned long long last = rdtscl();162 /* paranoid */ assert( this.conn.pipe[0] != -1 );163 /* paranoid */ assert( this.conn.pipe[1] != -1 );164 for() {165 size_t len = options.socket.buflen;166 char buffer[len];167 PendingRead p;168 p.next = 0p;169 p.in.buf = (void*)buffer;170 p.in.len = len;171 push(*this.queue, &p);172 173 if( options.log ) mutex(sout) sout | "=== Waiting new connection ===";174 handle_connection( this.conn, p.out.fd, buffer, len, &p.f, last );175 176 if( options.log ) mutex(sout) sout | "=== Connection closed ===";177 if(this.done) break;178 }179 }180 181 extern "C" {182 extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);183 }184 185 void ?{}( Acceptor & this, int cli ) {186 ((thread&)this){ "Server Acceptor Thread", *options.clopts.instance[cli], 64000 };187 options.clopts.thrd_cnt[cli]++;188 this.done = false;189 }190 191 static inline __s32 get_res( io_future_t & this ) {192 if( this.result < 0 ) {{193 errno = -this.result;194 return -1;195 }}196 return this.result;197 }198 199 static inline void push_connection( Acceptor & this, int fd ) {200 PendingRead * p = 0p;201 for() {202 if(this.done) return;203 p = pop(*this.queue);204 if(p) break;205 yield();206 this.stats.creates++;207 };208 209 p->out.fd = fd;210 async_recv(p->f, p->out.fd, p->in.buf, p->in.len, 0, CFA_IO_LAZY);211 }212 213 // #define ACCEPT_SPIN214 #define ACCEPT_MANY215 216 void main( Acceptor & this ) {217 park();218 unsigned long long last = rdtscl();219 220 #if defined(ACCEPT_SPIN)221 if( options.log ) sout | "=== Accepting connection ===";222 for() {223 int fd = accept4(this.sockfd, this.[addr, addrlen, flags]);224 if(fd < 0) {225 if( errno == EWOULDBLOCK) {226 this.stats.eagains++;227 yield();228 continue;229 }230 if( errno == ECONNABORTED ) break;231 if( this.done && (errno == EINVAL || errno == EBADF) ) break;232 abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );233 }234 this.stats.accepts++;235 236 if(this.done) return;237 238 if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";239 240 if(fd) push_connection(this, fd);241 242 if (stats_thrd) {243 unsigned long long next = rdtscl();244 if(next > (last + 500000000)) {245 if(try_lock(stats_thrd->stats.lock)) {246 push(this.stats, stats_thrd->stats.accpt);247 unlock(stats_thrd->stats.lock);248 last = next;249 }250 }251 }252 253 if( options.log ) sout | "=== Accepting connection ===";254 }255 256 #elif defined(ACCEPT_MANY)257 const int nacc = 10;258 io_future_t results[nacc];259 260 for(i; nacc) {261 io_future_t & res = results[i];262 reset(res);263 /* paranoid */ assert(!available(res));264 if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ===";265 async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY);266 }267 268 for() {269 if (stats_thrd) {270 unsigned long long next = rdtscl();271 if(next > (last + 500000000)) {272 if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {273 push(this.stats, stats_thrd->stats.accpt);274 unlock(stats_thrd->stats.lock);275 last = next;276 }277 }278 }279 280 for(i; nacc) {281 io_future_t & res = results[i];282 if(available(res)) {283 if( options.log ) mutex(sout) sout | "=== Accept no " | i | "completed with result" | res.result | "===";284 int fd = get_res(res);285 reset(res);286 this.stats.accepts++;287 if(fd < 0) {288 if( errno == ECONNABORTED ) continue;289 if( this.done && (errno == EINVAL || errno == EBADF) ) continue;290 abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );291 }292 push_connection( this, fd );293 294 /* paranoid */ assert(!available(res));295 if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ===";296 async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY);297 }298 }299 if(this.done) return;300 301 if( options.log ) mutex(sout) sout | "=== Waiting for any accept ===";302 this.stats.eagains++;303 wait_any(results, nacc);304 305 if( options.log ) mutex(sout) {306 sout | "=== Acceptor wake-up ===";307 for(i; nacc) {308 io_future_t & res = results[i];309 sout | i | "available:" | available(res);310 }311 }312 313 }314 315 for(i; nacc) {316 wait(results[i]);317 }318 #else319 #error no accept algorithm specified320 #endif321 } -
benchmark/io/http/worker.hfa
reb5962a r1df492a 1 1 #pragma once 2 2 3 #include <iofwd.hfa>4 #include <queueLockFree.hfa>5 3 #include <thread.hfa> 6 4 … … 9 7 } 10 8 11 #include "printer.hfa"12 13 9 //============================================================================================= 14 10 // Worker Thread 15 11 //============================================================================================= 16 12 17 struct connection { 18 int pipe[2]; 13 extern const size_t zipf_sizes[]; 14 enum { zipf_cnts = 36, }; 15 16 struct sendfile_stats_t { 17 volatile uint64_t calls; 18 volatile uint64_t tries; 19 volatile uint64_t header; 20 volatile uint64_t splcin; 21 volatile uint64_t splcot; 19 22 struct { 20 sendfile_stats_t sendfile; 21 } stats; 23 volatile uint64_t calls; 24 volatile uint64_t bytes; 25 } avgrd[zipf_cnts]; 22 26 }; 23 27 24 static inline void ?{}( connection & this ) { 25 this.pipe[0] = -1; 26 this.pipe[1] = -1; 27 } 28 29 thread AcceptWorker { 30 connection conn; 28 thread Worker { 29 int pipe[2]; 31 30 int sockfd; 32 31 struct sockaddr * addr; … … 34 33 int flags; 35 34 volatile bool done; 35 struct { 36 sendfile_stats_t sendfile; 37 } stats; 36 38 }; 37 void ?{}( AcceptWorker & this); 38 void main( AcceptWorker & ); 39 40 41 struct PendingRead { 42 PendingRead * volatile next; 43 io_future_t f; 44 struct { 45 void * buf; 46 size_t len; 47 } in; 48 struct { 49 volatile int fd; 50 } out; 51 }; 52 53 static inline PendingRead * volatile & ?`next ( PendingRead * node ) { 54 return node->next; 55 } 56 57 thread ChannelWorker { 58 connection conn; 59 volatile bool done; 60 mpsc_queue(PendingRead) * queue; 61 }; 62 void ?{}( ChannelWorker & ); 63 void main( ChannelWorker & ); 64 65 thread Acceptor { 66 mpsc_queue(PendingRead) * queue; 67 int sockfd; 68 struct sockaddr * addr; 69 socklen_t * addrlen; 70 int flags; 71 volatile bool done; 72 acceptor_stats_t stats; 73 }; 74 void ?{}( Acceptor &, int cli ); 75 void main( Acceptor & ); 39 void ?{}( Worker & this); 40 void main( Worker & ); -
libcfa/src/bits/locks.hfa
reb5962a r1df492a 26 26 // Wrap in struct to prevent false sharing with debug info 27 27 volatile bool lock; 28 #ifdef __CFA_DEBUG__ 29 // previous function to acquire the lock 30 const char * prev_name; 31 // previous thread to acquire the lock 32 void* prev_thrd; 33 // keep track of number of times we had to spin, just in case the number is unexpectedly huge 34 size_t spin_count; 35 #endif 28 36 }; 29 37 … … 32 40 extern void disable_interrupts() OPTIONAL_THREAD; 33 41 extern void enable_interrupts( bool poll = true ) OPTIONAL_THREAD; 34 #define __cfaabi_dbg_record_lock(x, y) 42 43 #ifdef __CFA_DEBUG__ 44 void __cfaabi_dbg_record_lock(__spinlock_t & this, const char prev_name[]); 45 #else 46 #define __cfaabi_dbg_record_lock(x, y) 47 #endif 35 48 } 36 49 37 50 static inline void ?{}( __spinlock_t & this ) { 38 51 this.lock = 0; 52 #ifdef __CFA_DEBUG__ 53 this.spin_count = 0; 54 #endif 39 55 } 40 56 … … 61 77 for ( unsigned int i = 1;; i += 1 ) { 62 78 if ( (this.lock == 0) && (__atomic_test_and_set( &this.lock, __ATOMIC_ACQUIRE ) == 0) ) break; 79 #ifdef __CFA_DEBUG__ 80 this.spin_count++; 81 #endif 63 82 #ifndef NOEXPBACK 64 83 // exponential spin -
libcfa/src/concurrency/invoke.h
reb5962a r1df492a 195 195 struct __monitor_group_t monitors; 196 196 197 // used to put threads on user data structures 198 struct { 199 struct thread$ * next; 200 struct thread$ * back; 201 } seqable; 202 197 203 // used to put threads on dlist data structure 198 204 __cfa_dlink(thread$); … … 202 208 struct thread$ * prev; 203 209 } node; 204 205 // used to store state between clh lock/unlock206 volatile bool * clh_prev;207 208 // used to point to this thd's current clh node209 volatile bool * clh_node;210 210 211 211 struct processor * last_proc; … … 240 240 } 241 241 242 static inline thread$ * volatile & ?`next ( thread$ * this ) __attribute__((const)) { 243 return this->seqable.next; 244 } 245 246 static inline thread$ *& Back( thread$ * this ) __attribute__((const)) { 247 return this->seqable.back; 248 } 249 250 static inline thread$ *& Next( thread$ * this ) __attribute__((const)) { 251 return this->seqable.next; 252 } 253 254 static inline bool listed( thread$ * this ) { 255 return this->seqable.next != 0p; 256 } 257 242 258 static inline void ?{}(__monitor_group_t & this) { 243 259 (this.data){0p}; -
libcfa/src/concurrency/io.cfa
reb5962a r1df492a 159 159 160 160 const __u32 mask = *ctx->cq.mask; 161 const __u32 num = *ctx->cq.num;162 161 unsigned long long ts_prev = ctx->cq.ts; 163 unsigned long long ts_next; 164 165 // We might need to do this multiple times if more events completed than can fit in the queue. 166 for() { 167 // re-read the head and tail in case it already changed. 168 const __u32 head = *ctx->cq.head; 169 const __u32 tail = *ctx->cq.tail; 170 const __u32 count = tail - head; 171 __STATS__( false, io.calls.drain++; io.calls.completed += count; ) 172 173 for(i; count) { 174 unsigned idx = (head + i) & mask; 175 volatile struct io_uring_cqe & cqe = ctx->cq.cqes[idx]; 176 177 /* paranoid */ verify(&cqe); 178 179 struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data; 180 // __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future ); 181 182 __kernel_unpark( fulfil( *future, cqe.res, false ), UNPARK_LOCAL ); 183 } 184 185 ts_next = ctx->cq.ts = rdtscl(); 186 187 // Mark to the kernel that the cqe has been seen 188 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read. 189 __atomic_store_n( ctx->cq.head, head + count, __ATOMIC_SEQ_CST ); 190 ctx->proc->idle_wctx.drain_time = ts_next; 191 192 if(likely(count < num)) break; 193 194 ioring_syscsll( *ctx, 0, IORING_ENTER_GETEVENTS); 195 } 162 163 // re-read the head and tail in case it already changed. 164 const __u32 head = *ctx->cq.head; 165 const __u32 tail = *ctx->cq.tail; 166 const __u32 count = tail - head; 167 __STATS__( false, io.calls.drain++; io.calls.completed += count; ) 168 169 for(i; count) { 170 unsigned idx = (head + i) & mask; 171 volatile struct io_uring_cqe & cqe = ctx->cq.cqes[idx]; 172 173 /* paranoid */ verify(&cqe); 174 175 struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data; 176 // __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future ); 177 178 __kernel_unpark( fulfil( *future, cqe.res, false ), UNPARK_LOCAL ); 179 } 180 181 unsigned long long ts_next = ctx->cq.ts = rdtscl(); 182 183 // Mark to the kernel that the cqe has been seen 184 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read. 185 __atomic_store_n( ctx->cq.head, head + count, __ATOMIC_SEQ_CST ); 186 ctx->proc->idle_wctx.drain_time = ts_next; 196 187 197 188 __cfadbg_print_safe(io, "Kernel I/O : %u completed age %llu\n", count, ts_next); -
libcfa/src/concurrency/io/setup.cfa
reb5962a r1df492a 138 138 __u32 nentries = params_in.num_entries != 0 ? params_in.num_entries : 256; 139 139 if( !is_pow2(nentries) ) { 140 abort("ERROR: I/O setup 'num_entries' must be a power of 2 , was %u\n", nentries);140 abort("ERROR: I/O setup 'num_entries' must be a power of 2\n"); 141 141 } 142 142 -
libcfa/src/concurrency/iofwd.hfa
reb5962a r1df492a 76 76 void reset ( io_future_t & this ) { return reset (this.self); } 77 77 bool available( io_future_t & this ) { return available(this.self); } 78 bool setup ( io_future_t & this, oneshot & ctx ) { return setup (this.self, ctx); }79 bool retract ( io_future_t & this, oneshot & ctx ) { return retract(this.self, ctx); }80 78 } 81 79 -
libcfa/src/concurrency/kernel.cfa
reb5962a r1df492a 834 834 #endif 835 835 836 837 838 //----------------------------------------------------------------------------- 839 // Debug 840 __cfaabi_dbg_debug_do( 841 extern "C" { 842 void __cfaabi_dbg_record_lock(__spinlock_t & this, const char prev_name[]) { 843 this.prev_name = prev_name; 844 this.prev_thrd = kernelTLS().this_thread; 845 } 846 } 847 ) 848 836 849 //----------------------------------------------------------------------------- 837 850 // Debug -
libcfa/src/concurrency/kernel/fwd.hfa
reb5962a r1df492a 200 200 struct thread$ * expected = this.ptr; 201 201 if(expected == 1p) return false; 202 /* paranoid */ verify( expected == 0p ); 202 203 if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 203 204 park(); … … 212 213 thread$ * post(oneshot & this, bool do_unpark = true) { 213 214 struct thread$ * got = __atomic_exchange_n( &this.ptr, 1p, __ATOMIC_SEQ_CST); 214 if( got == 0p || got == 1p) return 0p;215 if( got == 0p ) return 0p; 215 216 if(do_unpark) unpark( got ); 216 217 return got; … … 262 263 263 264 // The future is not fulfilled, try to setup the wait context 265 /* paranoid */ verify( expected == 0p ); 264 266 if(__atomic_compare_exchange_n(&this.ptr, &expected, &wait_ctx, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 265 267 return true; … … 273 275 // should retract the wait ctx 274 276 // intented to be use by wait, wait_any, waitfor, etc. rather than used directly 275 bool retract( future_t & this, oneshot & wait_ctx ) { 276 for() { 277 struct oneshot * expected = this.ptr; 278 279 // expected == 0p: future was never actually setup, just return 280 if( expected == 0p ) return false; 281 282 // expected == 1p: the future is ready and the context was fully consumed 283 // the server won't use the pointer again 284 // It is safe to delete (which could happen after the return) 285 if( expected == 1p ) return true; 286 287 // expected == 2p: the future is ready but the context hasn't fully been consumed 288 // spin until it is safe to move on 289 if( expected == 2p ) { 290 while( this.ptr != 1p ) Pause(); 291 /* paranoid */ verify( this.ptr == 1p ); 292 return true; 293 } 294 295 // expected != wait_ctx: the future was setup with a different context ?!?! 296 // something went wrong here, abort 297 if( expected != &wait_ctx ) abort("Future in unexpected state"); 298 299 // we still have the original context, then no one else saw it 300 // attempt to remove the context so it doesn't get consumed. 301 if(__atomic_compare_exchange_n( &this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 302 return false; 303 } 304 } 277 void retract( future_t & this, oneshot & wait_ctx ) { 278 // Remove the wait context 279 struct oneshot * got = __atomic_exchange_n( &this.ptr, 0p, __ATOMIC_SEQ_CST); 280 281 // got == 0p: future was never actually setup, just return 282 if( got == 0p ) return; 283 284 // got == wait_ctx: since fulfil does an atomic_swap, 285 // if we got back the original then no one else saw context 286 // It is safe to delete (which could happen after the return) 287 if( got == &wait_ctx ) return; 288 289 // got == 1p: the future is ready and the context was fully consumed 290 // the server won't use the pointer again 291 // It is safe to delete (which could happen after the return) 292 if( got == 1p ) return; 293 294 // got == 2p: the future is ready but the context hasn't fully been consumed 295 // spin until it is safe to move on 296 if( got == 2p ) { 297 while( this.ptr != 1p ) Pause(); 298 return; 299 } 300 301 // got == any thing else, something wen't wrong here, abort 302 abort("Future in unexpected state"); 305 303 } 306 304 … … 381 379 return ret; 382 380 } 383 384 // Wait for any future to be fulfilled385 forall(T& | sized(T) | { bool setup( T&, oneshot & ); bool retract( T&, oneshot & ); })386 T & wait_any( T * futures, size_t num_futures ) {387 oneshot temp;388 389 // setup all futures390 // if any are already satisfied return391 for ( i; num_futures ) {392 if( !setup(futures[i], temp) ) return futures[i];393 }394 395 // Wait context is setup, just wait on it396 wait( temp );397 398 size_t ret;399 // attempt to retract all futures400 for ( i; num_futures ) {401 if ( retract( futures[i], temp ) ) ret = i;402 }403 404 return futures[ret];405 }406 381 } 407 382 -
libcfa/src/concurrency/locks.cfa
reb5962a r1df492a 219 219 // this casts the alarm node to our wrapped type since we used type erasure 220 220 static void alarm_node_wrap_cast( alarm_node_t & a ) { timeout_handler( (alarm_node_wrap(L) &)a ); } 221 222 struct pthread_alarm_node_wrap {223 alarm_node_t alarm_node;224 pthread_cond_var(L) * cond;225 info_thread(L) * info_thd;226 };227 228 void ?{}( pthread_alarm_node_wrap(L) & this, Duration alarm, Duration period, Alarm_Callback callback, pthread_cond_var(L) * c, info_thread(L) * i ) {229 this.alarm_node{ callback, alarm, period };230 this.cond = c;231 this.info_thd = i;232 }233 234 void ^?{}( pthread_alarm_node_wrap(L) & this ) { }235 236 static void timeout_handler ( pthread_alarm_node_wrap(L) & this ) with( this ) {237 // This pthread_cond_var member is called from the kernel, and therefore, cannot block, but it can spin.238 lock( cond->lock __cfaabi_dbg_ctx2 );239 240 // this check is necessary to avoid a race condition since this timeout handler241 // may still be called after a thread has been removed from the queue but242 // before the alarm is unregistered243 if ( (*info_thd)`isListed ) { // is thread on queue244 info_thd->signalled = false;245 // remove this thread O(1)246 remove( *info_thd );247 on_notify(*info_thd->lock, info_thd->t);248 }249 unlock( cond->lock );250 }251 252 // this casts the alarm node to our wrapped type since we used type erasure253 static void pthread_alarm_node_wrap_cast( alarm_node_t & a ) { timeout_handler( (pthread_alarm_node_wrap(L) &)a ); }254 221 } 255 222 … … 421 388 on_wakeup(*i.lock, recursion_count); 422 389 } 423 424 //----------------------------------------------------------------------------- 425 // pthread_cond_var 426 427 void ?{}( pthread_cond_var(L) & this ) with(this) { 428 blocked_threads{}; 429 lock{}; 430 } 431 432 void ^?{}( pthread_cond_var(L) & this ) { } 433 434 bool notify_one( pthread_cond_var(L) & this ) with(this) { 435 lock( lock __cfaabi_dbg_ctx2 ); 436 bool ret = ! blocked_threads`isEmpty; 437 if ( ret ) { 438 info_thread(L) & popped = try_pop_front( blocked_threads ); 439 on_notify(*popped.lock, popped.t); 440 } 441 unlock( lock ); 442 return ret; 443 } 444 445 bool notify_all( pthread_cond_var(L) & this ) with(this) { 446 lock( lock __cfaabi_dbg_ctx2 ); 447 bool ret = ! blocked_threads`isEmpty; 448 while( ! blocked_threads`isEmpty ) { 449 info_thread(L) & popped = try_pop_front( blocked_threads ); 450 on_notify(*popped.lock, popped.t); 451 } 452 unlock( lock ); 453 return ret; 454 } 455 456 uintptr_t front( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty ? NULL : blocked_threads`first.info; } 457 bool empty ( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty; } 458 459 static size_t queue_and_get_recursion( pthread_cond_var(L) & this, info_thread(L) * i ) with(this) { 460 // add info_thread to waiting queue 461 insert_last( blocked_threads, *i ); 462 size_t recursion_count = 0; 463 recursion_count = on_wait( *i->lock ); 464 return recursion_count; 465 } 466 467 static void queue_info_thread_timeout( pthread_cond_var(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) { 468 lock( lock __cfaabi_dbg_ctx2 ); 469 size_t recursion_count = queue_and_get_recursion(this, &info); 470 pthread_alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info }; 471 register_self( &node_wrap.alarm_node ); 472 unlock( lock ); 473 474 // blocks here 475 park(); 476 477 // unregisters alarm so it doesn't go off if this happens first 478 unregister_self( &node_wrap.alarm_node ); 479 480 // resets recursion count here after waking 481 if (info.lock) on_wakeup(*info.lock, recursion_count); 482 } 483 484 void wait( pthread_cond_var(L) & this, L & l ) with(this) { 485 wait( this, l, 0 ); 486 } 487 488 void wait( pthread_cond_var(L) & this, L & l, uintptr_t info ) with(this) { 489 lock( lock __cfaabi_dbg_ctx2 ); 490 info_thread( L ) i = { active_thread(), info, &l }; 491 size_t recursion_count = queue_and_get_recursion(this, &i); 492 unlock( lock ); 493 park( ); 494 on_wakeup(*i.lock, recursion_count); 495 } 496 497 #define PTHREAD_WAIT_TIME( u, l, t ) \ 498 info_thread( L ) i = { active_thread(), u, l }; \ 499 queue_info_thread_timeout(this, i, t, pthread_alarm_node_wrap_cast ); \ 500 return i.signalled; 501 502 bool wait( pthread_cond_var(L) & this, L & l, timespec t ) { 503 Duration d = { t }; 504 WAIT_TIME( 0, &l , d ) 505 } 506 507 bool wait( pthread_cond_var(L) & this, L & l, uintptr_t info, timespec t ) { 508 Duration d = { t }; 509 WAIT_TIME( info, &l , d ) 510 } 511 } 390 } 391 512 392 //----------------------------------------------------------------------------- 513 393 // Semaphore -
libcfa/src/concurrency/locks.hfa
reb5962a r1df492a 98 98 mcs_node * next = advance(l.queue, &n); 99 99 if(next) post(next->sem); 100 }101 102 //-----------------------------------------------------------------------------103 // MCS Spin Lock104 // - No recursive acquisition105 // - Needs to be released by owner106 107 struct mcs_spin_node {108 mcs_spin_node * volatile next;109 volatile bool locked;110 };111 112 struct mcs_spin_queue {113 mcs_spin_node * volatile tail;114 };115 116 static inline void ?{}(mcs_spin_node & this) { this.next = 0p; this.locked = true; }117 118 static inline mcs_spin_node * volatile & ?`next ( mcs_spin_node * node ) {119 return node->next;120 }121 122 struct mcs_spin_lock {123 mcs_spin_queue queue;124 };125 126 static inline void lock(mcs_spin_lock & l, mcs_spin_node & n) {127 mcs_spin_node * prev = __atomic_exchange_n(&l.queue.tail, &n, __ATOMIC_SEQ_CST);128 n.locked = true;129 if(prev == 0p) return;130 prev->next = &n;131 while(__atomic_load_n(&n.locked, __ATOMIC_RELAXED)) Pause();132 }133 134 static inline void unlock(mcs_spin_lock & l, mcs_spin_node & n) {135 mcs_spin_node * n_ptr = &n;136 if (__atomic_compare_exchange_n(&l.queue.tail, &n_ptr, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) return;137 while (__atomic_load_n(&n.next, __ATOMIC_RELAXED) == 0p) {}138 n.next->locked = false;139 }140 141 //-----------------------------------------------------------------------------142 // CLH Spinlock143 // - No recursive acquisition144 // - Needs to be released by owner145 146 struct clh_lock {147 volatile bool * volatile tail;148 };149 150 static inline void ?{}( clh_lock & this ) { this.tail = malloc(); *this.tail = true; }151 static inline void ^?{}( clh_lock & this ) { free(this.tail); }152 153 static inline void lock(clh_lock & l) {154 thread$ * curr_thd = active_thread();155 *(curr_thd->clh_node) = false;156 volatile bool * prev = __atomic_exchange_n((bool **)(&l.tail), (bool *)(curr_thd->clh_node), __ATOMIC_SEQ_CST);157 while(!__atomic_load_n(prev, __ATOMIC_ACQUIRE)) Pause();158 curr_thd->clh_prev = prev;159 }160 161 static inline void unlock(clh_lock & l) {162 thread$ * curr_thd = active_thread();163 __atomic_store_n(curr_thd->clh_node, true, __ATOMIC_RELEASE);164 curr_thd->clh_node = curr_thd->clh_prev;165 100 } 166 101 … … 270 205 // Fast Block Lock 271 206 272 // minimal blocking lock207 // High efficiency minimal blocking lock 273 208 // - No reacquire for cond var 274 209 // - No recursive acquisition 275 210 // - No ownership 276 211 struct fast_block_lock { 212 // Spin lock used for mutual exclusion 213 __spinlock_t lock; 214 277 215 // List of blocked threads 278 216 dlist( thread$ ) blocked_threads; 279 217 280 // Spin lock used for mutual exclusion281 __spinlock_t lock;282 283 // flag showing if lock is held284 218 bool held:1; 285 286 #ifdef __CFA_DEBUG__287 // for deadlock detection288 struct thread$ * owner;289 #endif290 219 }; 291 220 … … 302 231 static inline void lock(fast_block_lock & this) with(this) { 303 232 lock( lock __cfaabi_dbg_ctx2 ); 304 305 #ifdef __CFA_DEBUG__306 assert(!(held && owner == active_thread()));307 #endif308 233 if (held) { 309 234 insert_last( blocked_threads, *active_thread() ); … … 313 238 } 314 239 held = true; 315 #ifdef __CFA_DEBUG__316 owner = active_thread();317 #endif318 240 unlock( lock ); 319 241 } … … 324 246 thread$ * t = &try_pop_front( blocked_threads ); 325 247 held = ( t ? true : false ); 326 #ifdef __CFA_DEBUG__327 owner = ( t ? t : 0p );328 #endif329 248 unpark( t ); 330 249 unlock( lock ); … … 334 253 static inline size_t on_wait(fast_block_lock & this) { unlock(this); return 0; } 335 254 static inline void on_wakeup(fast_block_lock & this, size_t recursion ) { } 336 337 //-----------------------------------------------------------------------------338 // simple_owner_lock339 340 // pthread owner lock341 // - reacquire for cond var342 // - recursive acquisition343 // - ownership344 struct simple_owner_lock {345 // List of blocked threads346 dlist( thread$ ) blocked_threads;347 348 // Spin lock used for mutual exclusion349 __spinlock_t lock;350 351 // owner showing if lock is held352 struct thread$ * owner;353 354 size_t recursion_count;355 };356 357 static inline void ?{}( simple_owner_lock & this ) with(this) {358 lock{};359 blocked_threads{};360 owner = 0p;361 recursion_count = 0;362 }363 static inline void ^?{}( simple_owner_lock & this ) {}364 static inline void ?{}( simple_owner_lock & this, simple_owner_lock this2 ) = void;365 static inline void ?=?( simple_owner_lock & this, simple_owner_lock this2 ) = void;366 367 static inline void lock(simple_owner_lock & this) with(this) {368 if (owner == active_thread()) {369 recursion_count++;370 return;371 }372 lock( lock __cfaabi_dbg_ctx2 );373 374 if (owner != 0p) {375 insert_last( blocked_threads, *active_thread() );376 unlock( lock );377 park( );378 return;379 }380 owner = active_thread();381 recursion_count = 1;382 unlock( lock );383 }384 385 // TODO: fix duplicate def issue and bring this back386 // void pop_and_set_new_owner( simple_owner_lock & this ) with( this ) {387 // thread$ * t = &try_pop_front( blocked_threads );388 // owner = t;389 // recursion_count = ( t ? 1 : 0 );390 // unpark( t );391 // }392 393 static inline void unlock(simple_owner_lock & this) with(this) {394 lock( lock __cfaabi_dbg_ctx2 );395 /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );396 /* paranoid */ verifyf( owner == active_thread(), "Thread %p other than the owner %p attempted to release owner lock %p", owner, active_thread(), &this );397 // if recursion count is zero release lock and set new owner if one is waiting398 recursion_count--;399 if ( recursion_count == 0 ) {400 // pop_and_set_new_owner( this );401 thread$ * t = &try_pop_front( blocked_threads );402 owner = t;403 recursion_count = ( t ? 1 : 0 );404 unpark( t );405 }406 unlock( lock );407 }408 409 static inline void on_notify(simple_owner_lock & this, struct thread$ * t ) with(this) {410 lock( lock __cfaabi_dbg_ctx2 );411 // lock held412 if ( owner != 0p ) {413 insert_last( blocked_threads, *t );414 unlock( lock );415 }416 // lock not held417 else {418 owner = t;419 recursion_count = 1;420 unpark( t );421 unlock( lock );422 }423 }424 425 static inline size_t on_wait(simple_owner_lock & this) with(this) {426 lock( lock __cfaabi_dbg_ctx2 );427 /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );428 /* paranoid */ verifyf( owner == active_thread(), "Thread %p other than the owner %p attempted to release owner lock %p", owner, active_thread(), &this );429 430 size_t ret = recursion_count;431 432 // pop_and_set_new_owner( this );433 434 thread$ * t = &try_pop_front( blocked_threads );435 owner = t;436 recursion_count = ( t ? 1 : 0 );437 unpark( t );438 439 unlock( lock );440 return ret;441 }442 443 static inline void on_wakeup(simple_owner_lock & this, size_t recursion ) with(this) { recursion_count = recursion; }444 445 //-----------------------------------------------------------------------------446 // Spin Queue Lock447 448 // - No reacquire for cond var449 // - No recursive acquisition450 // - No ownership451 // - spin lock with no locking/atomics in unlock452 struct spin_queue_lock {453 // Spin lock used for mutual exclusion454 mcs_spin_lock lock;455 456 // flag showing if lock is held457 volatile bool held;458 459 #ifdef __CFA_DEBUG__460 // for deadlock detection461 struct thread$ * owner;462 #endif463 };464 465 static inline void ?{}( spin_queue_lock & this ) with(this) {466 lock{};467 held = false;468 }469 static inline void ^?{}( spin_queue_lock & this ) {}470 static inline void ?{}( spin_queue_lock & this, spin_queue_lock this2 ) = void;471 static inline void ?=?( spin_queue_lock & this, spin_queue_lock this2 ) = void;472 473 // if this is called recursively IT WILL DEADLOCK!!!!!474 static inline void lock(spin_queue_lock & this) with(this) {475 mcs_spin_node node;476 #ifdef __CFA_DEBUG__477 assert(!(held && owner == active_thread()));478 #endif479 lock( lock, node );480 while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();481 __atomic_store_n(&held, true, __ATOMIC_SEQ_CST);482 unlock( lock, node );483 #ifdef __CFA_DEBUG__484 owner = active_thread();485 #endif486 }487 488 static inline void unlock(spin_queue_lock & this) with(this) {489 #ifdef __CFA_DEBUG__490 owner = 0p;491 #endif492 __atomic_store_n(&held, false, __ATOMIC_RELEASE);493 }494 495 static inline void on_notify(spin_queue_lock & this, struct thread$ * t ) { unpark(t); }496 static inline size_t on_wait(spin_queue_lock & this) { unlock(this); return 0; }497 static inline void on_wakeup(spin_queue_lock & this, size_t recursion ) { }498 499 500 //-----------------------------------------------------------------------------501 // MCS Block Spin Lock502 503 // - No reacquire for cond var504 // - No recursive acquisition505 // - No ownership506 // - Blocks but first node spins (like spin queue but blocking for not first thd)507 struct mcs_block_spin_lock {508 // Spin lock used for mutual exclusion509 mcs_lock lock;510 511 // flag showing if lock is held512 volatile bool held;513 514 #ifdef __CFA_DEBUG__515 // for deadlock detection516 struct thread$ * owner;517 #endif518 };519 520 static inline void ?{}( mcs_block_spin_lock & this ) with(this) {521 lock{};522 held = false;523 }524 static inline void ^?{}( mcs_block_spin_lock & this ) {}525 static inline void ?{}( mcs_block_spin_lock & this, mcs_block_spin_lock this2 ) = void;526 static inline void ?=?( mcs_block_spin_lock & this, mcs_block_spin_lock this2 ) = void;527 528 // if this is called recursively IT WILL DEADLOCK!!!!!529 static inline void lock(mcs_block_spin_lock & this) with(this) {530 mcs_node node;531 #ifdef __CFA_DEBUG__532 assert(!(held && owner == active_thread()));533 #endif534 lock( lock, node );535 while(held) Pause();536 held = true;537 unlock( lock, node );538 #ifdef __CFA_DEBUG__539 owner = active_thread();540 #endif541 }542 543 static inline void unlock(mcs_block_spin_lock & this) with(this) {544 #ifdef __CFA_DEBUG__545 owner = 0p;546 #endif547 held = false;548 }549 550 static inline void on_notify(mcs_block_spin_lock & this, struct thread$ * t ) { unpark(t); }551 static inline size_t on_wait(mcs_block_spin_lock & this) { unlock(this); return 0; }552 static inline void on_wakeup(mcs_block_spin_lock & this, size_t recursion ) { }553 554 //-----------------------------------------------------------------------------555 // Block Spin Lock556 557 // - No reacquire for cond var558 // - No recursive acquisition559 // - No ownership560 // - Blocks but first node spins (like spin queue but blocking for not first thd)561 struct block_spin_lock {562 // Spin lock used for mutual exclusion563 fast_block_lock lock;564 565 // flag showing if lock is held566 volatile bool held;567 568 #ifdef __CFA_DEBUG__569 // for deadlock detection570 struct thread$ * owner;571 #endif572 };573 574 static inline void ?{}( block_spin_lock & this ) with(this) {575 lock{};576 held = false;577 }578 static inline void ^?{}( block_spin_lock & this ) {}579 static inline void ?{}( block_spin_lock & this, block_spin_lock this2 ) = void;580 static inline void ?=?( block_spin_lock & this, block_spin_lock this2 ) = void;581 582 // if this is called recursively IT WILL DEADLOCK!!!!!583 static inline void lock(block_spin_lock & this) with(this) {584 #ifdef __CFA_DEBUG__585 assert(!(held && owner == active_thread()));586 #endif587 lock( lock );588 while(held) Pause();589 held = true;590 unlock( lock );591 #ifdef __CFA_DEBUG__592 owner = active_thread();593 #endif594 }595 596 static inline void unlock(block_spin_lock & this) with(this) {597 #ifdef __CFA_DEBUG__598 owner = 0p;599 #endif600 held = false;601 }602 603 static inline void on_notify(block_spin_lock & this, struct thread$ * t ) { unpark(t); }604 static inline size_t on_wait(block_spin_lock & this) { unlock(this); return 0; }605 static inline void on_wakeup(block_spin_lock & this, size_t recursion ) { }606 255 607 256 //----------------------------------------------------------------------------- … … 683 332 // - signalling without holding branded lock is UNSAFE! 684 333 // - only allows usage of one lock, cond var is branded after usage 685 686 334 struct fast_cond_var { 687 335 // List of blocked threads 688 336 dlist( info_thread(L) ) blocked_threads; 337 689 338 #ifdef __CFA_DEBUG__ 690 339 L * lock_used; … … 692 341 }; 693 342 343 694 344 void ?{}( fast_cond_var(L) & this ); 695 345 void ^?{}( fast_cond_var(L) & this ); … … 699 349 700 350 uintptr_t front( fast_cond_var(L) & this ); 351 701 352 bool empty ( fast_cond_var(L) & this ); 702 353 703 354 void wait( fast_cond_var(L) & this, L & l ); 704 355 void wait( fast_cond_var(L) & this, L & l, uintptr_t info ); 705 706 707 //----------------------------------------------------------------------------- 708 // pthread_cond_var 709 // 710 // - cond var with minimal footprint 711 // - supports operations needed for phthread cond 712 713 struct pthread_cond_var { 714 dlist( info_thread(L) ) blocked_threads; 715 __spinlock_t lock; 716 }; 717 718 void ?{}( pthread_cond_var(L) & this ); 719 void ^?{}( pthread_cond_var(L) & this ); 720 721 bool notify_one( pthread_cond_var(L) & this ); 722 bool notify_all( pthread_cond_var(L) & this ); 723 724 uintptr_t front( pthread_cond_var(L) & this ); 725 bool empty ( pthread_cond_var(L) & this ); 726 727 void wait( pthread_cond_var(L) & this, L & l ); 728 void wait( pthread_cond_var(L) & this, L & l, uintptr_t info ); 729 bool wait( pthread_cond_var(L) & this, L & l, timespec t ); 730 bool wait( pthread_cond_var(L) & this, L & l, uintptr_t info, timespec t ); 731 } 356 } -
libcfa/src/concurrency/thread.cfa
reb5962a r1df492a 53 53 #endif 54 54 55 seqable.next = 0p; 56 seqable.back = 0p; 57 55 58 node.next = 0p; 56 59 node.prev = 0p; 60 doregister(curr_cluster, this); 57 61 58 clh_node = malloc( );59 *clh_node = false;60 61 doregister(curr_cluster, this);62 62 monitors{ &self_mon_p, 1, (fptr_t)0 }; 63 63 } … … 67 67 canary = 0xDEADDEADDEADDEADp; 68 68 #endif 69 free(clh_node);70 69 unregister(curr_cluster, this); 71 70 ^self_cor{}; -
libcfa/src/containers/queueLockFree.hfa
reb5962a r1df492a 2 2 3 3 #include <assert.h> 4 5 #include <bits/defs.hfa>6 4 7 5 forall( T &) { -
libcfa/src/startup.cfa
reb5962a r1df492a 63 63 64 64 struct __spinlock_t; 65 extern "C" { 66 void __cfaabi_dbg_record_lock(struct __spinlock_t & this, const char prev_name[]) __attribute__(( weak )) libcfa_public {} 67 } 65 68 66 69 // Local Variables: // -
src/AST/Pass.impl.hpp
reb5962a r1df492a 182 182 183 183 // get the stmts/decls that will need to be spliced in 184 auto stmts_before = __pass::stmtsToAddBefore( core, 0 );185 auto stmts_after = __pass::stmtsToAddAfter ( core, 0 );186 auto decls_before = __pass::declsToAddBefore( core, 0 );187 auto decls_after = __pass::declsToAddAfter ( core, 0 );184 auto stmts_before = __pass::stmtsToAddBefore( core, 0); 185 auto stmts_after = __pass::stmtsToAddAfter ( core, 0); 186 auto decls_before = __pass::declsToAddBefore( core, 0); 187 auto decls_after = __pass::declsToAddAfter ( core, 0); 188 188 189 189 // These may be modified by subnode but most be restored once we exit this statemnet. … … 287 287 288 288 // get the stmts/decls that will need to be spliced in 289 auto stmts_before = __pass::stmtsToAddBefore( core, 0 );290 auto stmts_after = __pass::stmtsToAddAfter ( core, 0 );291 auto decls_before = __pass::declsToAddBefore( core, 0 );292 auto decls_after = __pass::declsToAddAfter ( core, 0 );289 auto stmts_before = __pass::stmtsToAddBefore( core, 0); 290 auto stmts_after = __pass::stmtsToAddAfter ( core, 0); 291 auto decls_before = __pass::declsToAddBefore( core, 0); 292 auto decls_after = __pass::declsToAddAfter ( core, 0); 293 293 294 294 // These may be modified by subnode but most be restored once we exit this statemnet. … … 317 317 assert(( empty( stmts_before ) && empty( stmts_after )) 318 318 || ( empty( decls_before ) && empty( decls_after )) ); 319 320 319 321 320 322 // Take all the statements which should have gone after, N/A for first iteration … … 2114 2116 if ( __visit_children() ) { 2115 2117 bool mutated = false; 2116 ast::TypeSubstitution::TypeMapnew_map;2117 for ( const auto & p : node->type Map) {2118 std::unordered_map< ast::TypeInstType::TypeEnvKey, ast::ptr< ast::Type > > new_map; 2119 for ( const auto & p : node->typeEnv ) { 2118 2120 guard_symtab guard { *this }; 2119 2121 auto new_node = p.second->accept( *this ); … … 2123 2125 if (mutated) { 2124 2126 auto new_node = __pass::mutate<core_t>( node ); 2125 new_node->type Map.swap( new_map );2127 new_node->typeEnv.swap( new_map ); 2126 2128 node = new_node; 2127 2129 } -
src/AST/TypeSubstitution.cpp
reb5962a r1df492a 38 38 39 39 void TypeSubstitution::initialize( const TypeSubstitution &src, TypeSubstitution &dest ) { 40 dest.type Map.clear();40 dest.typeEnv.clear(); 41 41 dest.add( src ); 42 42 } 43 43 44 44 void TypeSubstitution::add( const TypeSubstitution &other ) { 45 for ( Type Map::const_iterator i = other.typeMap.begin(); i != other.typeMap.end(); ++i ) {46 type Map[ i->first ] = i->second;45 for ( TypeEnvType::const_iterator i = other.typeEnv.begin(); i != other.typeEnv.end(); ++i ) { 46 typeEnv[ i->first ] = i->second; 47 47 } // for 48 48 } 49 49 50 50 void TypeSubstitution::add( const TypeInstType * formalType, const Type *actualType ) { 51 type Map[ *formalType ] = actualType;51 typeEnv[ *formalType ] = actualType; 52 52 } 53 53 54 54 void TypeSubstitution::add( const TypeInstType::TypeEnvKey & key, const Type * actualType) { 55 type Map[ key ] = actualType;55 typeEnv[ key ] = actualType; 56 56 } 57 57 58 58 void TypeSubstitution::remove( const TypeInstType * formalType ) { 59 Type Map::iterator i = typeMap.find( *formalType );60 if ( i != type Map.end() ) {61 type Map.erase( *formalType );59 TypeEnvType::iterator i = typeEnv.find( *formalType ); 60 if ( i != typeEnv.end() ) { 61 typeEnv.erase( *formalType ); 62 62 } // if 63 63 } 64 64 65 const Type *TypeSubstitution::lookup( 66 const TypeInstType::TypeEnvKey & formalType ) const { 67 TypeMap::const_iterator i = typeMap.find( formalType ); 65 const Type *TypeSubstitution::lookup( const TypeInstType * formalType ) const { 66 TypeEnvType::const_iterator i = typeEnv.find( *formalType ); 68 67 69 68 // break on not in substitution set 70 if ( i == type Map.end() ) return 0;69 if ( i == typeEnv.end() ) return 0; 71 70 72 71 // attempt to transitively follow TypeInstType links. 73 72 while ( const TypeInstType *actualType = i->second.as<TypeInstType>()) { 74 73 // break cycles in the transitive follow 75 if ( formalType == *actualType ) break;74 if ( *formalType == *actualType ) break; 76 75 77 76 // Look for the type this maps to, returning previous mapping if none-such 78 i = type Map.find( *actualType );79 if ( i == type Map.end() ) return actualType;77 i = typeEnv.find( *actualType ); 78 if ( i == typeEnv.end() ) return actualType; 80 79 } 81 80 … … 84 83 } 85 84 86 const Type *TypeSubstitution::lookup( const TypeInstType * formalType ) const {87 return lookup( ast::TypeInstType::TypeEnvKey( *formalType ) );88 }89 90 85 bool TypeSubstitution::empty() const { 91 return type Map.empty();86 return typeEnv.empty(); 92 87 } 93 88 … … 124 119 sub.core.subCount = 0; 125 120 sub.core.freeOnly = true; 126 for ( Type Map::iterator i = typeMap.begin(); i != typeMap.end(); ++i ) {121 for ( TypeEnvType::iterator i = typeEnv.begin(); i != typeEnv.end(); ++i ) { 127 122 i->second = i->second->accept( sub ); 128 123 } … … 134 129 if ( bound != boundVars.end() ) return inst; 135 130 136 Type Map::const_iterator i = sub.typeMap.find( *inst );137 if ( i == sub.type Map.end() ) {131 TypeEnvType::const_iterator i = sub.typeEnv.find( *inst ); 132 if ( i == sub.typeEnv.end() ) { 138 133 return inst; 139 134 } else { -
src/AST/TypeSubstitution.hpp
reb5962a r1df492a 75 75 void add( const TypeSubstitution &other ); 76 76 void remove( const TypeInstType * formalType ); 77 const Type *lookup( const TypeInstType::TypeEnvKey & formalType ) const;78 77 const Type *lookup( const TypeInstType * formalType ) const; 79 78 bool empty() const; … … 105 104 friend class Pass; 106 105 107 typedef std::unordered_map< TypeInstType::TypeEnvKey, ptr<Type> > Type Map;108 Type Map typeMap;106 typedef std::unordered_map< TypeInstType::TypeEnvKey, ptr<Type> > TypeEnvType; 107 TypeEnvType typeEnv; 109 108 110 109 public: 111 // has to come after declaration of type Map112 auto begin() -> decltype( type Map.begin() ) { return typeMap.begin(); }113 auto end() -> decltype( type Map. end() ) { return typeMap. end(); }114 auto begin() const -> decltype( type Map.begin() ) { return typeMap.begin(); }115 auto end() const -> decltype( type Map. end() ) { return typeMap. end(); }110 // has to come after declaration of typeEnv 111 auto begin() -> decltype( typeEnv.begin() ) { return typeEnv.begin(); } 112 auto end() -> decltype( typeEnv. end() ) { return typeEnv. end(); } 113 auto begin() const -> decltype( typeEnv.begin() ) { return typeEnv.begin(); } 114 auto end() const -> decltype( typeEnv. end() ) { return typeEnv. end(); } 116 115 117 116 }; … … 145 144 if ( const TypeExpr *actual = actualIt->template as<TypeExpr>() ) { 146 145 if ( formal->name != "" ) { 147 type Map[ formal ] = actual->type;146 typeEnv[ formal ] = actual->type; 148 147 } // if 149 148 } else { -
src/Concurrency/Waitfor.cc
reb5962a r1df492a 56 56 | | 57 57 | | 58 | |58 | | 59 59 | | 60 60 | | -
src/Concurrency/Waitfor.h
reb5962a r1df492a 19 19 20 20 class Declaration; 21 namespace ast {22 class TranslationUnit;23 }24 21 25 22 namespace Concurrency { 26 23 void generateWaitFor( std::list< Declaration * > & translationUnit ); 27 28 void generateWaitFor( ast::TranslationUnit & translationUnit );29 24 }; 30 25 -
src/Concurrency/module.mk
reb5962a r1df492a 19 19 Concurrency/Keywords.cc \ 20 20 Concurrency/Keywords.h \ 21 Concurrency/WaitforNew.cpp \22 21 Concurrency/Waitfor.cc \ 23 22 Concurrency/Waitfor.h -
src/GenPoly/Specialize.cc
reb5962a r1df492a 247 247 structureArg( (*actualBegin)->get_type(), argBegin, argEnd, back_inserter( appExpr->get_args() ) ); 248 248 } 249 assertf( argBegin == argEnd, "Did not structure all arguments." );250 249 251 250 appExpr->env = TypeSubstitution::newFromExpr( appExpr, env ); -
src/InitTweak/FixGlobalInit.cc
reb5962a r1df492a 162 162 } // if 163 163 if ( Statement * ctor = ctorInit->ctor ) { 164 addDataSect ionAttribute( objDecl );164 addDataSectonAttribute( objDecl ); 165 165 initStatements.push_back( ctor ); 166 166 objDecl->init = nullptr; -
src/InitTweak/FixInit.cc
reb5962a r1df492a 806 806 // The attribute works, and is meant to apply, both for leaving the static local alone, 807 807 // and for hoisting it out as a static global. 808 addDataSect ionAttribute( objDecl );808 addDataSectonAttribute( objDecl ); 809 809 810 810 // originally wanted to take advantage of gcc nested functions, but -
src/InitTweak/InitTweak.cc
reb5962a r1df492a 587 587 588 588 bool isConstructable( const ast::Type * type ) { 589 return ! dynamic_cast< const ast::VarArgsType * >( type ) && ! dynamic_cast< const ast::ReferenceType * >( type ) 589 return ! dynamic_cast< const ast::VarArgsType * >( type ) && ! dynamic_cast< const ast::ReferenceType * >( type ) 590 590 && ! dynamic_cast< const ast::FunctionType * >( type ) && ! Tuples::isTtype( type ); 591 591 } … … 1025 1025 if (!assign) { 1026 1026 auto td = new ast::TypeDecl({}, "T", {}, nullptr, ast::TypeDecl::Dtype, true); 1027 assign = new ast::FunctionDecl({}, "?=?", {}, 1027 assign = new ast::FunctionDecl({}, "?=?", {}, 1028 1028 { new ast::ObjectDecl({}, "_dst", new ast::ReferenceType(new ast::TypeInstType("T", td))), 1029 1029 new ast::ObjectDecl({}, "_src", new ast::TypeInstType("T", td))}, … … 1095 1095 1096 1096 // address of a variable or member expression is constexpr 1097 if ( ! dynamic_cast< const ast::NameExpr * >( arg ) 1098 && ! dynamic_cast< const ast::VariableExpr * >( arg ) 1099 && ! dynamic_cast< const ast::MemberExpr * >( arg ) 1097 if ( ! dynamic_cast< const ast::NameExpr * >( arg ) 1098 && ! dynamic_cast< const ast::VariableExpr * >( arg ) 1099 && ! dynamic_cast< const ast::MemberExpr * >( arg ) 1100 1100 && ! dynamic_cast< const ast::UntypedMemberExpr * >( arg ) ) result = false; 1101 1101 } … … 1241 1241 } 1242 1242 1243 #if defined( __x86_64 ) || defined( __i386 ) // assembler comment to prevent assembler warning message 1244 #define ASM_COMMENT "#" 1245 #else // defined( __ARM_ARCH ) 1246 #define ASM_COMMENT "//" 1247 #endif 1248 static const char * const data_section = ".data" ASM_COMMENT; 1249 static const char * const tlsd_section = ".tdata" ASM_COMMENT; 1250 void addDataSectionAttribute( ObjectDecl * objDecl ) { 1251 const bool is_tls = objDecl->get_storageClasses().is_threadlocal; 1252 const char * section = is_tls ? tlsd_section : data_section; 1243 void addDataSectonAttribute( ObjectDecl * objDecl ) { 1253 1244 objDecl->attributes.push_back(new Attribute("section", { 1254 new ConstantExpr( Constant::from_string( section ) ) 1255 })); 1245 new ConstantExpr( Constant::from_string(".data" 1246 #if defined( __x86_64 ) || defined( __i386 ) // assembler comment to prevent assembler warning message 1247 "#" 1248 #else // defined( __ARM_ARCH ) 1249 "//" 1250 #endif 1251 ))})); 1256 1252 } 1257 1253 1258 1254 void addDataSectionAttribute( ast::ObjectDecl * objDecl ) { 1259 const bool is_tls = objDecl->storage.is_threadlocal;1260 const char * section = is_tls ? tlsd_section : data_section;1261 1255 objDecl->attributes.push_back(new ast::Attribute("section", { 1262 ast::ConstantExpr::from_string(objDecl->location, section) 1263 })); 1256 ast::ConstantExpr::from_string(objDecl->location, ".data" 1257 #if defined( __x86_64 ) || defined( __i386 ) // assembler comment to prevent assembler warning message 1258 "#" 1259 #else // defined( __ARM_ARCH ) 1260 "//" 1261 #endif 1262 )})); 1264 1263 } 1265 1264 -
src/InitTweak/InitTweak.h
reb5962a r1df492a 127 127 /// .section .data#,"a" 128 128 /// to avoid assembler warning "ignoring changed section attributes for .data" 129 void addDataSect ionAttribute( ObjectDecl * objDecl );129 void addDataSectonAttribute( ObjectDecl * objDecl ); 130 130 131 131 void addDataSectionAttribute( ast::ObjectDecl * objDecl ); -
src/main.cc
reb5962a r1df492a 10 10 // Created On : Fri May 15 23:12:02 2015 11 11 // Last Modified By : Andrew Beach 12 // Last Modified On : Tue Jun 7 13:29:00 202213 // Update Count : 67 412 // Last Modified On : Fri Apr 29 9:52:00 2022 13 // Update Count : 673 14 14 // 15 15 … … 447 447 PASS( "Expand Unique Expr", Tuples::expandUniqueExpr( transUnit ) ); // xxx - is this the right place for this? want to expand ASAP so tha, sequent passes don't need to worry about double-visiting a unique expr - needs to go after InitTweak::fix so that copy constructed return declarations are reused 448 448 449 PASS( "Translate Tries", ControlStruct::translateTries( transUnit ) ); 450 PASS( "Gen Waitfor", Concurrency::generateWaitFor( transUnit ) ); 449 PASS( "Translate Tries" , ControlStruct::translateTries( transUnit ) ); 451 450 452 451 translationUnit = convert( move( transUnit ) ); … … 518 517 519 518 PASS( "Expand Unique Expr", Tuples::expandUniqueExpr( translationUnit ) ); // xxx - is this the right place for this? want to expand ASAP so tha, sequent passes don't need to worry about double-visiting a unique expr - needs to go after InitTweak::fix so that copy constructed return declarations are reused 520 PASS( "Translate Tries", ControlStruct::translateTries( translationUnit ) ); 521 PASS( " Gen Waitfor", Concurrency::generateWaitFor( translationUnit ) );519 520 PASS( "Translate Tries" , ControlStruct::translateTries( translationUnit ) ); 522 521 } 522 523 PASS( "Gen Waitfor" , Concurrency::generateWaitFor( translationUnit ) ); 523 524 524 525 PASS( "Convert Specializations", GenPoly::convertSpecializations( translationUnit ) ); // needs to happen before tuple types are expanded -
tests/unified_locking/mutex_test.hfa
reb5962a r1df492a 22 22 } 23 23 24 uint32_t cs( uint32_t & entries) {24 uint32_t cs() { 25 25 thread$ * me = active_thread(); 26 26 uint32_t value; 27 27 lock(mo.l); 28 28 { 29 entries++;30 29 uint32_t tsum = mo.sum; 31 30 uint32_t cnt = mo.cnt; … … 43 42 thread LockCheck { 44 43 uint32_t sum; 45 uint32_t entries;46 44 }; 47 45 48 46 void main(LockCheck & this) { 49 47 this.sum = 0; 50 this.entries = 0;51 48 for(num_times) { 52 49 trash(); 53 this.sum += cs( this.entries);50 this.sum += cs(); 54 51 trash(); 55 52 yield(random(10)); … … 61 58 mo.sum = -32; 62 59 mo.cnt = 0; 63 uint32_t real_entries = 0;64 60 processor p[2]; 65 61 sout | "Starting"; … … 67 63 LockCheck checkers[13]; 68 64 for(i;13) { 69 LockCheck & curr = join(checkers[i]); 70 sum += curr.sum; 71 real_entries += curr.entries; 65 sum += join(checkers[i]).sum; 72 66 } 73 67 } 74 68 sout | "Done!"; 75 if(real_entries != (13 * num_times)) sout | "Invalid real cs count!" | mo.cnt | "vs "| (13 * num_times) | "(13 *" | num_times | ')'; 76 if(mo.cnt != (13 * num_times)) sout | "Invalid concurrent cs count!" | mo.cnt | "vs "| (13 * num_times) | "(13 *" | num_times | ')'; 69 if(mo.cnt != (13 * num_times)) sout | "Invalid cs count!" | mo.cnt | "vs "| (13 * num_times) | "(13 *" | num_times | ')'; 77 70 if(sum == mo.sum) sout | "Match!"; 78 71 else sout | "No Match!" | sum | "vs" | mo.sum; -
tools/cfa.nanorc
reb5962a r1df492a 10 10 color green "\<(forall|trait|(o|d|f|t)type|mutex|_Bool|volatile|virtual)\>" 11 11 color green "\<(float|double|bool|char|int|short|long|enum|void|auto)\>" 12 color green "\<(static|const|extern|(un)?signed|inline |sizeof|vtable)\>"12 color green "\<(static|const|extern|(un)?signed|inline)\>" "\<(sizeof)\>" 13 13 color green "\<((s?size)|one|zero|((u_?)?int(8|16|32|64|ptr)))_t\>" 14 14 15 15 # Declarations 16 16 color brightgreen "\<(struct|union|typedef|trait|coroutine|generator)\>" 17 color brightgreen "\<(monitor|thread|with |exception)\>"17 color brightgreen "\<(monitor|thread|with)\>" 18 18 19 19 # Control Flow Structures -
tools/jenkins/setup.sh.in
reb5962a r1df492a 29 29 function getrunpath() 30 30 { 31 local elfout=$(readelf -d $1 | grep -E "RPATH|RUNPATH")31 local elfout=$(readelf -d $1 | grep "RUNPATH") 32 32 regex='\[/([[:alpha:][:digit:]@/_.-]+)\]' 33 33 if [[ $elfout =~ $regex ]]; then … … 43 43 { 44 44 local deps=$(ldd $1) 45 retsysdeps=() 45 46 retlcldeps=() 46 retsysdeps=()47 47 while IFS= read -r line; do 48 regex1=' (libcfa[[:alpha:][:digit:].]+)'49 regex2=' /([[:alpha:][:digit:]@/_.-]+)'48 regex1='/([[:alpha:][:digit:]@/_.-]+)' 49 regex2='(libcfa[[:alpha:][:digit:].]+) => not found' 50 50 regex3='linux-vdso.so.1|linux-gate.so.1' 51 51 if [[ $line =~ $regex1 ]]; then 52 retsysdeps+=(${BASH_REMATCH[1]}) 53 elif [[ $line =~ $regex2 ]]; then 52 54 retlcldeps+=(${BASH_REMATCH[1]}) 53 elif [[ $line =~ $regex2 ]]; then54 retsysdeps+=(${BASH_REMATCH[1]})55 55 elif [[ $line =~ $regex3 ]]; then 56 56 # echo "ignoring '$line}': intrinsic"
Note:
See TracChangeset
for help on using the changeset viewer.