Changeset eb5962a


Ignore:
Timestamp:
Jun 21, 2022, 1:39:24 PM (6 months ago)
Author:
JiadaL <j82liang@…>
Branches:
master, pthread-emulation, qualifiedEnum
Children:
b62d1d6
Parents:
1df492a (diff), 1dbbef6 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

Files:
22 added
35 edited

Legend:

Unmodified
Added
Removed
  • benchmark/io/http/Makefile.am

    r1df492a reb5962a  
    2121include $(top_srcdir)/tools/build/cfa.make
    2222
    23 AM_CFLAGS = -O3 -Wall -Wextra -I$(srcdir) -lrt -pthread # -Werror
     23AM_CFLAGS = -O3 -Wall -Wextra -I$(srcdir) -lrt -pthread -g # -Werror
    2424AM_CFAFLAGS = -quiet -nodebug
    2525AM_LDFLAGS = -quiet -nodebug
     
    3737        options.cfa \
    3838        options.hfa \
     39        printer.cfa \
     40        printer.hfa \
    3941        protocol.cfa \
    4042        protocol.hfa \
     43        socket.cfa \
     44        socket.hfa \
    4145        worker.cfa \
    4246        worker.hfa
  • benchmark/io/http/main.cfa

    r1df492a reb5962a  
    22
    33#include <errno.h>
     4#include <signal.h>
    45#include <stdio.h>
    56#include <string.h>
     
    89        #include <sched.h>
    910        #include <signal.h>
     11        #include <sys/eventfd.h>
    1012        #include <sys/socket.h>
    1113        #include <netinet/in.h>
     
    1416#include <fstream.hfa>
    1517#include <kernel.hfa>
     18#include <locks.hfa>
    1619#include <iofwd.hfa>
    1720#include <stats.hfa>
     
    2124#include "filecache.hfa"
    2225#include "options.hfa"
     26#include "socket.hfa"
     27#include "printer.hfa"
    2328#include "worker.hfa"
    2429
     
    3035
    3136//=============================================================================================
    32 // Stats Printer
    33 //============================================================================================='
    34 
    35 thread StatsPrinter {
    36         Worker * workers;
    37         int worker_cnt;
    38 };
    39 
    40 void ?{}( StatsPrinter & this, cluster & cl ) {
    41         ((thread&)this){ "Stats Printer Thread", cl };
    42         this.worker_cnt = 0;
    43 }
    44 
    45 void ^?{}( StatsPrinter & mutex this ) {}
    46 
    47 #define eng3(X) (ws(3, 3, unit(eng( X ))))
    48 
    49 void main(StatsPrinter & this) {
    50         LOOP: for() {
    51                 waitfor( ^?{} : this) {
    52                         break LOOP;
    53                 }
    54                 or else {}
    55 
    56                 sleep(10`s);
    57 
    58                 print_stats_now( *active_cluster(), CFA_STATS_READY_Q | CFA_STATS_IO );
    59                 if(this.worker_cnt != 0) {
    60                         uint64_t tries = 0;
    61                         uint64_t calls = 0;
    62                         uint64_t header = 0;
    63                         uint64_t splcin = 0;
    64                         uint64_t splcot = 0;
    65                         struct {
    66                                 volatile uint64_t calls;
    67                                 volatile uint64_t bytes;
    68                         } avgrd[zipf_cnts];
    69                         memset(avgrd, 0, sizeof(avgrd));
    70 
    71                         for(i; this.worker_cnt) {
    72                                 tries += this.workers[i].stats.sendfile.tries;
    73                                 calls += this.workers[i].stats.sendfile.calls;
    74                                 header += this.workers[i].stats.sendfile.header;
    75                                 splcin += this.workers[i].stats.sendfile.splcin;
    76                                 splcot += this.workers[i].stats.sendfile.splcot;
    77                                 for(j; zipf_cnts) {
    78                                         avgrd[j].calls += this.workers[i].stats.sendfile.avgrd[j].calls;
    79                                         avgrd[j].bytes += this.workers[i].stats.sendfile.avgrd[j].bytes;
    80                                 }
    81                         }
    82 
    83                         double ratio = ((double)tries) / calls;
    84 
    85                         sout | "----- Worker Stats -----";
    86                         sout | "sendfile  : " | calls | "calls," | tries | "tries (" | ratio | " try/call)";
    87                         sout | "            " | header | "header," | splcin | "splice in," | splcot | "splice out";
    88                         sout | " - zipf sizes:";
    89                         for(i; zipf_cnts) {
    90                                 double written = avgrd[i].calls > 0 ? ((double)avgrd[i].bytes) / avgrd[i].calls : 0;
    91                                 sout | "        " | zipf_sizes[i] | "bytes," | avgrd[i].calls | "shorts," | written | "written";
    92                         }
    93                 }
    94                 else {
    95                         sout | "No Workers!";
    96                 }
    97         }
    98 }
    99 
    100 //=============================================================================================
    10137// Globals
    10238//=============================================================================================
    103 struct ServerCluster {
    104         cluster self;
    105         processor    * procs;
    106         // io_context   * ctxs;
    107         StatsPrinter * prnt;
    108 
    109 };
    110 
    11139void ?{}( ServerCluster & this ) {
    11240        (this.self){ "Server Cluster", options.clopts.params };
     
    12250                (this.procs[i]){ "Benchmark Processor", this.self };
    12351
    124                 int c = 0;
    125                 int n = 1 + (i % cnt);
    126                 for(int j = 0; j < CPU_SETSIZE; j++) {
    127                         if(CPU_ISSET(j, &fullset)) n--;
    128                         if(n == 0) {
    129                                 c = j;
    130                                 break;
    131                         }
    132                 }
    133                 cpu_set_t localset;
    134                 CPU_ZERO(&localset);
    135                 CPU_SET(c, &localset);
    136                 ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset);
    137                 if( ret != 0 ) abort | "sched_getaffinity failed with" | ret | strerror( ret );
     52                // int c = 0;
     53                // int n = 1 + (i % cnt);
     54                // for(int j = 0; j < CPU_SETSIZE; j++) {
     55                //      if(CPU_ISSET(j, &fullset)) n--;
     56                //      if(n == 0) {
     57                //              c = j;
     58                //              break;
     59                //      }
     60                // }
     61                // cpu_set_t localset;
     62                // CPU_ZERO(&localset);
     63                // CPU_SET(c, &localset);
     64                // ret = pthread_setaffinity_np(this.procs[i].kernel_thread, sizeof(localset), &localset);
     65                // if( ret != 0 ) abort | "sched_getaffinity failed with" | ret | strerror( ret );
    13866
    13967                #if !defined(__CFA_NO_STATISTICS__)
     
    14775        }
    14876
    149         if(options.stats) {
    150                 this.prnt = alloc();
    151                 (*this.prnt){ this.self };
    152         } else {
    153                 this.prnt = 0p;
    154         }
    155 
    15677        #if !defined(__CFA_NO_STATISTICS__)
    15778                print_stats_at_exit( this.self, CFA_STATS_READY_Q | CFA_STATS_IO );
     
    16384
    16485void ^?{}( ServerCluster & this ) {
    165         delete(this.prnt);
    166 
    16786        for(i; options.clopts.nprocs) {
    16887                ^(this.procs[i]){};
     
    17594extern void init_protocol(void);
    17695extern void deinit_protocol(void);
     96
     97//=============================================================================================
     98// REUSEPORT
     99//=============================================================================================
     100
     101size_t sockarr_size;
     102struct __attribute__((aligned(128))) Q {
     103        mpsc_queue(PendingRead) q;
     104};
     105
     106//=============================================================================================
     107// Termination
     108//=============================================================================================
     109
     110int closefd;
     111void cleanstop(int) {
     112        eventfd_t buffer = 1;
     113        char * buffer_s = (char*)&buffer;
     114        int ret = write(closefd, buffer_s, sizeof(buffer));
     115        if(ret < 0) abort( "eventfd write error: (%d) %s\n", (int)errno, strerror(errno) );
     116        return;
     117}
    177118
    178119//=============================================================================================
     
    180121//============================================================================================='
    181122int main( int argc, char * argv[] ) {
     123        int ret;
    182124        __sighandler_t s = 1p;
    183125        signal(SIGPIPE, s);
     
    186128        // Parse args
    187129        parse_options(argc, argv);
     130
     131        //===================
     132        // Setup non-interactive termination
     133        if(!options.interactive) {
     134                closefd = eventfd(0, 0);
     135                if(closefd < 0) abort( "eventfd error: (%d) %s\n", (int)errno, strerror(errno) );
     136
     137                sighandler_t prev = signal(SIGTERM, cleanstop);
     138                intptr_t prev_workaround = (intptr_t) prev;
     139                // can't use SIG_ERR it crashes the compiler
     140                if(prev_workaround == -1) abort( "signal setup error: (%d) %s\n", (int)errno, strerror(errno) );
     141
     142                sout | "Signal termination ready";
     143        }
    188144
    189145        //===================
     
    197153        // Open Socket
    198154        sout | getpid() | ": Listening on port" | options.socket.port;
    199         int server_fd = socket(AF_INET, SOCK_STREAM, 0);
    200         if(server_fd < 0) {
    201                 abort( "socket error: (%d) %s\n", (int)errno, strerror(errno) );
    202         }
    203 
    204         int ret = 0;
     155
    205156        struct sockaddr_in address;
    206         int addrlen = sizeof(address);
    207         memset( (char *)&address, '\0' );
    208         address.sin_family = AF_INET;
    209         address.sin_addr.s_addr = htonl(INADDR_ANY);
    210         address.sin_port = htons( options.socket.port );
    211 
    212         int waited = 0;
    213         for() {
    214                 int sockfd = server_fd;
    215                 __CONST_SOCKADDR_ARG addr;
    216                 addr.__sockaddr__ = (struct sockaddr *)&address;
    217                 socklen_t addrlen = sizeof(address);
    218                 ret = bind( sockfd, addr, addrlen );
    219                 if(ret < 0) {
    220                         if(errno == EADDRINUSE) {
    221                                 if(waited == 0) {
    222                                         if(!options.interactive) abort | "Port already in use in non-interactive mode. Aborting";
    223                                         sout | "Waiting for port";
    224                                 } else {
    225                                         sout | "\r" | waited | nonl;
    226                                         flush( sout );
    227                                 }
    228                                 waited ++;
    229                                 sleep( 1`s );
    230                                 continue;
    231                         }
    232                         abort( "bind error: (%d) %s\n", (int)errno, strerror(errno) );
    233                 }
    234                 break;
    235         }
    236 
    237         ret = listen( server_fd, options.socket.backlog );
    238         if(ret < 0) {
    239                 abort( "listen error: (%d) %s\n", (int)errno, strerror(errno) );
    240         }
     157        int addrlen = prepaddr(address);
     158
     159        int server_fd;
    241160
    242161        //===================
     
    257176
    258177                {
     178                        // Stats printer makes a copy so this needs to persist longer than normal
     179                        connection ** conns;
     180                        AcceptWorker  * aworkers = 0p;
     181                        ChannelWorker * cworkers = 0p;
     182                        Acceptor * acceptors = 0p;
     183                        Q * queues = 0p;
    259184                        ServerCluster cl[options.clopts.nclusters];
     185
     186                        if(options.stats) {
     187                                stats_thrd = alloc();
     188                                (*stats_thrd){ cl };
     189                        } else {
     190                                stats_thrd = 0p;
     191                        }
    260192
    261193                        init_protocol();
    262194                        {
    263                                 Worker * workers = anew(options.clopts.nworkers);
    264                                 cl[0].prnt->workers = workers;
    265                                 cl[0].prnt->worker_cnt = options.clopts.nworkers;
    266                                 for(i; options.clopts.nworkers) {
    267                                         // if( options.file_cache.fixed_fds ) {
    268                                         //      workers[i].pipe[0] = pipe_off + (i * 2) + 0;
    269                                         //      workers[i].pipe[1] = pipe_off + (i * 2) + 1;
    270                                         // }
    271                                         // else
    272                                         {
    273                                                 workers[i].pipe[0] = fds[pipe_off + (i * 2) + 0];
    274                                                 workers[i].pipe[1] = fds[pipe_off + (i * 2) + 1];
    275                                                 workers[i].sockfd  = server_fd;
    276                                                 workers[i].addr    = (struct sockaddr *)&address;
    277                                                 workers[i].addrlen = (socklen_t*)&addrlen;
    278                                                 workers[i].flags   = 0;
    279                                         }
    280                                         unpark( workers[i] );
     195                                int nacceptors = options.clopts.nprocs * options.clopts.nclusters;
     196                                conns = alloc(options.clopts.nworkers);
     197                                if(options.socket.reuseport) {
     198                                        queues = alloc(nacceptors);
     199                                        acceptors = alloc(nacceptors);
     200                                        sout | "Creating" | nacceptors | "Acceptors";
     201                                        for(i; nacceptors) {
     202                                                (acceptors[i]){ i % options.clopts.nclusters };
     203                                        }
     204                                        for(i; nacceptors) {
     205                                                (queues[i]){};
     206                                                {
     207                                                        acceptors[i].sockfd  = listener(address, addrlen);
     208                                                        acceptors[i].addr    = (struct sockaddr *)&address;
     209                                                        acceptors[i].addrlen = (socklen_t*)&addrlen;
     210                                                        acceptors[i].flags   = 0;
     211                                                        acceptors[i].queue   = &queues[i].q;
     212                                                }
     213                                                unpark( acceptors[i] );
     214                                        }
     215
     216                                        cworkers = anew(options.clopts.nworkers);
     217                                        for(i; options.clopts.nworkers) {
     218                                                {
     219                                                        cworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0];
     220                                                        cworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1];
     221                                                        cworkers[i].queue = &queues[i % nacceptors].q;
     222                                                        conns[i] = &cworkers[i].conn;
     223                                                }
     224                                                unpark( cworkers[i] );
     225                                        }
     226                                }
     227                                else {
     228                                        server_fd = listener(address, addrlen);
     229                                        aworkers = anew(options.clopts.nworkers);
     230                                        for(i; options.clopts.nworkers) {
     231                                                // if( options.file_cache.fixed_fds ) {
     232                                                //      workers[i].pipe[0] = pipe_off + (i * 2) + 0;
     233                                                //      workers[i].pipe[1] = pipe_off + (i * 2) + 1;
     234                                                // }
     235                                                // else
     236                                                {
     237                                                        aworkers[i].conn.pipe[0] = fds[pipe_off + (i * 2) + 0];
     238                                                        aworkers[i].conn.pipe[1] = fds[pipe_off + (i * 2) + 1];
     239                                                        aworkers[i].sockfd = server_fd;
     240                                                        aworkers[i].addr    = (struct sockaddr *)&address;
     241                                                        aworkers[i].addrlen = (socklen_t*)&addrlen;
     242                                                        aworkers[i].flags   = 0;
     243                                                        conns[i] = &aworkers[i].conn;
     244                                                }
     245                                                unpark( aworkers[i] );
     246                                        }
    281247                                }
    282248                                sout | options.clopts.nworkers | "workers started on" | options.clopts.nprocs | "processors /" | options.clopts.nclusters | "clusters";
     
    285251                                }
    286252                                sout | nl;
    287                                 if(!options.interactive) park();
    288253                                {
    289                                         char buffer[128];
    290                                         for() {
    291                                                 int ret = cfa_read(0, buffer, 128, 0);
    292                                                 if(ret == 0) break;
     254                                        if(options.interactive) {
     255                                                char buffer[128];
     256                                                for() {
     257                                                        int ret = cfa_read(0, buffer, 128, 0);
     258                                                        if(ret == 0) break;
     259                                                        if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
     260                                                        sout | "User wrote '" | "" | nonl;
     261                                                        write(sout, buffer, ret - 1);
     262                                                        sout | "'";
     263                                                }
     264                                        }
     265                                        else {
     266                                                char buffer[sizeof(eventfd_t)];
     267                                                int ret = cfa_read(closefd, buffer, sizeof(eventfd_t), 0);
    293268                                                if(ret < 0) abort( "main read error: (%d) %s\n", (int)errno, strerror(errno) );
    294                                                 sout | "User wrote '" | "" | nonl;
    295                                                 write(sout, buffer, ret - 1);
    296                                                 sout | "'";
    297269                                        }
    298270
     
    300272                                }
    301273
    302                                 sout | "Notifying connections..." | nonl; flush( sout );
    303                                 for(i; options.clopts.nworkers) {
    304                                         workers[i].done = true;
    305                                 }
    306                                 sout | "done";
    307 
    308                                 sout | "Shutting down socket..." | nonl; flush( sout );
    309                                 int ret = shutdown( server_fd, SHUT_RD );
    310                                 if( ret < 0 ) {
    311                                         abort( "shutdown error: (%d) %s\n", (int)errno, strerror(errno) );
    312                                 }
    313                                 sout | "done";
    314 
    315274                                //===================
    316                                 // Close Socket
    317                                 sout | "Closing Socket..." | nonl; flush( sout );
    318                                 ret = close( server_fd );
    319                                 if(ret < 0) {
    320                                         abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
    321                                 }
    322                                 sout | "done";
    323 
    324                                 sout | "Stopping connection threads..." | nonl; flush( sout );
    325                                 adelete(workers);
     275                                // Close Socket and join
     276                                if(options.socket.reuseport) {
     277                                        sout | "Notifying connections..." | nonl; flush( sout );
     278                                        for(i; nacceptors) {
     279                                                acceptors[i].done = true;
     280                                        }
     281                                        for(i; options.clopts.nworkers) {
     282                                                cworkers[i].done = true;
     283                                        }
     284                                        sout | "done";
     285
     286                                        sout | "Shutting down Socket..." | nonl; flush( sout );
     287                                        for(i; nacceptors) {
     288                                                ret = shutdown( acceptors[i].sockfd, SHUT_RD );
     289                                                if( ret < 0 ) {
     290                                                        abort( "shutdown1 error: (%d) %s\n", (int)errno, strerror(errno) );
     291                                                }
     292                                        }
     293                                        sout | "done";
     294
     295                                        sout | "Closing Socket..." | nonl; flush( sout );
     296                                        for(i; nacceptors) {
     297                                                ret = close( acceptors[i].sockfd );
     298                                                if( ret < 0) {
     299                                                        abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
     300                                                }
     301                                        }
     302                                        sout | "done";
     303
     304                                        sout | "Stopping accept threads..." | nonl; flush( sout );
     305                                        for(i; nacceptors) {
     306                                                join(acceptors[i]);
     307                                        }
     308                                        sout | "done";
     309
     310                                        sout | "Draining worker queues..." | nonl; flush( sout );
     311                                        for(i; nacceptors) {
     312                                                PendingRead * p = 0p;
     313                                                while(p = pop(queues[i].q)) {
     314                                                        fulfil(p->f, -ECONNRESET);
     315                                                }
     316                                        }
     317                                        sout | "done";
     318
     319                                        sout | "Stopping worker threads..." | nonl; flush( sout );
     320                                        for(i; options.clopts.nworkers) {
     321                                                for(j; 2) {
     322                                                        ret = close(cworkers[i].conn.pipe[j]);
     323                                                        if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
     324                                                }
     325                                                join(cworkers[i]);
     326                                        }
     327                                }
     328                                else {
     329                                        sout | "Notifying connections..." | nonl; flush( sout );
     330                                        for(i; options.clopts.nworkers) {
     331                                                aworkers[i].done = true;
     332                                        }
     333                                        sout | "done";
     334
     335                                        sout | "Shutting down Socket..." | nonl; flush( sout );
     336                                        ret = shutdown( server_fd, SHUT_RD );
     337                                        if( ret < 0 ) {
     338                                                abort( "shutdown2 error: (%d) %s\n", (int)errno, strerror(errno) );
     339                                        }
     340                                        sout | "done";
     341
     342                                        sout | "Closing Socket..." | nonl; flush( sout );
     343                                        ret = close( server_fd );
     344                                        if(ret < 0) {
     345                                                abort( "close socket error: (%d) %s\n", (int)errno, strerror(errno) );
     346                                        }
     347                                        sout | "done";
     348
     349                                        sout | "Stopping connection threads..." | nonl; flush( sout );
     350                                        for(i; options.clopts.nworkers) {
     351                                                for(j; 2) {
     352                                                        ret = close(aworkers[i].conn.pipe[j]);
     353                                                        if(ret < 0) abort( "close pipe %d error: (%d) %s\n", j, (int)errno, strerror(errno) );
     354                                                }
     355                                                join(aworkers[i]);
     356                                        }
     357                                }
    326358                        }
    327359                        sout | "done";
     
    331363                        sout | "done";
    332364
     365                        sout | "Stopping printer threads..." | nonl; flush( sout );
     366                        if(stats_thrd) {
     367                                notify_one(stats_thrd->var);
     368                        }
     369                        delete(stats_thrd);
     370                        sout | "done";
     371
     372                        // Now that the stats printer is stopped, we can reclaim this
     373                        adelete(aworkers);
     374                        adelete(cworkers);
     375                        adelete(acceptors);
     376                        adelete(queues);
     377                        free(conns);
     378
    333379                        sout | "Stopping processors/clusters..." | nonl; flush( sout );
    334380                }
    335381                sout | "done";
    336382
    337                 sout | "Closing splice fds..." | nonl; flush( sout );
    338                 for(i; pipe_cnt) {
    339                         ret = close( fds[pipe_off + i] );
    340                         if(ret < 0) {
    341                                 abort( "close pipe error: (%d) %s\n", (int)errno, strerror(errno) );
    342                         }
    343                 }
    344383                free(fds);
    345                 sout | "done";
    346384
    347385                sout | "Stopping processors..." | nonl; flush( sout );
  • benchmark/io/http/options.cfa

    r1df492a reb5962a  
    3535
    3636        { // socket
    37                 8080, // port
    38                 10,   // backlog
    39                 1024  // buflen
     37                8080,  // port
     38                10,    // backlog
     39                1024,  // buflen
     40                false  // reuseport
    4041        },
    4142
     
    5253
    5354void parse_options( int argc, char * argv[] ) {
    54         // bool fixedfd = false;
    55         // bool sqkpoll = false;
    56         // bool iokpoll = false;
    5755        unsigned nentries = 0;
    5856        bool isolate = false;
     
    7068                {'\0', "shell",          "Disable interactive mode", options.interactive, parse_setfalse},
    7169                {'\0', "accept-backlog", "Maximum number of pending accepts", options.socket.backlog},
     70                {'\0', "reuseport",      "Use acceptor threads with reuse port SO_REUSEPORT", options.socket.reuseport, parse_settrue},
    7271                {'\0', "request_len",    "Maximum number of bytes in the http request, requests with more data will be answered with Http Code 414", options.socket.buflen},
    7372                {'\0', "seed",           "seed to use for hashing", options.file_cache.hash_seed },
  • benchmark/io/http/options.hfa

    r1df492a reb5962a  
    2727                int backlog;
    2828                int buflen;
     29                bool reuseport;
    2930        } socket;
    3031
  • benchmark/io/http/protocol.cfa

    r1df492a reb5962a  
    3030#define PLAINTEXT_NOCOPY
    3131#define LINKED_IO
     32
     33static inline __s32 wait_res( io_future_t & this ) {
     34        wait( this );
     35        if( this.result < 0 ) {{
     36                errno = -this.result;
     37                return -1;
     38        }}
     39        return this.result;
     40}
    3241
    3342struct https_msg_str {
     
    470479
    471480                        if(is_error(splice_in.res)) {
     481                                if(splice_in.res.error == -EPIPE) return -ECONNRESET;
    472482                                mutex(serr) serr | "SPLICE IN failed with" | splice_in.res.error;
    473483                                close(fd);
     
    503513}
    504514
    505 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len) {
     515[HttpCode code, bool closed, * const char file, size_t len] http_read(volatile int & fd, []char buffer, size_t len, io_future_t * f) {
    506516        char * it = buffer;
    507517        size_t count = len - 1;
     
    509519        READ:
    510520        for() {
    511                 int ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY);
     521                int ret;
     522                if( f ) {
     523                        ret = wait_res(*f);
     524                        reset(*f);
     525                        f = 0p;
     526                } else {
     527                        ret = cfa_recv(fd, (void*)it, count, 0, CFA_IO_LAZY);
     528                }
    512529                // int ret = read(fd, (void*)it, count);
    513530                if(ret == 0 ) return [OK200, true, 0, 0];
     
    570587
    571588void ?{}( DateFormater & this ) {
    572         ((thread&)this){ "Server Date Thread", *options.clopts.instance[0] };
     589        ((thread&)this){ "Server Date Thread" };
    573590        this.idx = 0;
    574591        memset( &this.buffers[0], 0, sizeof(this.buffers[0]) );
  • benchmark/io/http/protocol.hfa

    r1df492a reb5962a  
    11#pragma once
    22
     3struct io_future_t;
    34struct sendfile_stats_t;
    45
     
    2223int answer_sendfile( int pipe[2], int fd, int ans_fd, size_t count, struct sendfile_stats_t & );
    2324
    24 [HttpCode code, bool closed, * const char file, size_t len] http_read(int fd, []char buffer, size_t len);
     25[HttpCode code, bool closed, * const char file, size_t len] http_read(volatile int & fd, []char buffer, size_t len, io_future_t * f);
  • benchmark/io/http/worker.cfa

    r1df492a reb5962a  
    88#include <fstream.hfa>
    99#include <iofwd.hfa>
     10#include <mutex_stmt.hfa>
    1011
    1112#include "options.hfa"
     
    1415
    1516//=============================================================================================
    16 // Worker Thread
    17 //=============================================================================================
    18 void ?{}( Worker & this ) {
     17// Generic connection handling
     18//=============================================================================================
     19static void handle_connection( connection & this, volatile int & fd, char * buffer, size_t len, io_future_t * f, unsigned long long & last ) {
     20        REQUEST:
     21        for() {
     22                bool closed;
     23                HttpCode code;
     24                const char * file;
     25                size_t name_size;
     26
     27                // Read the http request
     28                if( options.log ) mutex(sout) sout | "=== Reading request ===";
     29                [code, closed, file, name_size] = http_read(fd, buffer, len, f);
     30                f = 0p;
     31
     32                // if we are done, break out of the loop
     33                if( closed ) break REQUEST;
     34
     35                // If this wasn't a request retrun 400
     36                if( code != OK200 ) {
     37                        sout | "=== Invalid Request :" | code_val(code) | "===";
     38                        answer_error(fd, code);
     39                        continue REQUEST;
     40                }
     41
     42                if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
     43                        if( options.log ) mutex(sout) sout | "=== Request for /plaintext ===";
     44
     45                        int ret = answer_plaintext(fd);
     46                        if( ret == -ECONNRESET ) break REQUEST;
     47
     48                        if( options.log ) mutex(sout) sout | "=== Answer sent ===";
     49                        continue REQUEST;
     50                }
     51
     52                if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
     53                        if( options.log ) mutex(sout) sout | "=== Request for /ping ===";
     54
     55                        // Send the header
     56                        int ret = answer_empty(fd);
     57                        if( ret == -ECONNRESET ) break REQUEST;
     58
     59                        if( options.log ) mutex(sout) sout | "=== Answer sent ===";
     60                        continue REQUEST;
     61                }
     62
     63                if( options.log ) {
     64                        sout | "=== Request for file " | nonl;
     65                        write(sout, file, name_size);
     66                        sout | " ===";
     67                }
     68
     69                if( !options.file_cache.path ) {
     70                        if( options.log ) {
     71                                sout | "=== File Not Found (" | nonl;
     72                                write(sout, file, name_size);
     73                                sout | ") ===";
     74                        }
     75                        answer_error(fd, E405);
     76                        continue REQUEST;
     77                }
     78
     79                // Get the fd from the file cache
     80                int ans_fd;
     81                size_t count;
     82                [ans_fd, count] = get_file( file, name_size );
     83
     84                // If we can't find the file, return 404
     85                if( ans_fd < 0 ) {
     86                        if( options.log ) {
     87                                sout | "=== File Not Found (" | nonl;
     88                                write(sout, file, name_size);
     89                                sout | ") ===";
     90                        }
     91                        answer_error(fd, E404);
     92                        continue REQUEST;
     93                }
     94
     95                // Send the desired file
     96                int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile );
     97                if( ret == -ECONNRESET ) break REQUEST;
     98
     99                if( options.log ) mutex(sout) sout | "=== Answer sent ===";
     100        }
     101
     102        if (stats_thrd) {
     103                unsigned long long next = rdtscl();
     104                if(next > (last + 500000000)) {
     105                        if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
     106                                push(this.stats.sendfile, stats_thrd->stats.send);
     107                                unlock(stats_thrd->stats.lock);
     108                                last = next;
     109                        }
     110                }
     111        }
     112}
     113
     114//=============================================================================================
     115// Self Accepting Worker Thread
     116//=============================================================================================
     117void ?{}( AcceptWorker & this ) {
    19118        size_t cli = rand() % options.clopts.cltr_cnt;
    20119        ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
    21120        options.clopts.thrd_cnt[cli]++;
    22         this.pipe[0] = -1;
    23         this.pipe[1] = -1;
    24121        this.done = false;
    25 
    26         this.stats.sendfile.calls = 0;
    27         this.stats.sendfile.tries = 0;
    28         this.stats.sendfile.header = 0;
    29         this.stats.sendfile.splcin = 0;
    30         this.stats.sendfile.splcot = 0;
    31         for(i; zipf_cnts) {
    32                 this.stats.sendfile.avgrd[i].calls = 0;
    33                 this.stats.sendfile.avgrd[i].bytes = 0;
    34         }
    35 }
    36 
    37 extern "C" {
    38 extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
    39 }
    40 
    41 void main( Worker & this ) {
     122}
     123
     124void main( AcceptWorker & this ) {
    42125        park();
    43         /* paranoid */ assert( this.pipe[0] != -1 );
    44         /* paranoid */ assert( this.pipe[1] != -1 );
    45 
    46         CONNECTION:
    47         for() {
    48                 if( options.log ) sout | "=== Accepting connection ===";
    49                 int fd = cfa_accept4( this.[sockfd, addr, addrlen, flags], CFA_IO_LAZY );
     126        unsigned long long last = rdtscl();
     127        /* paranoid */ assert( this.conn.pipe[0] != -1 );
     128        /* paranoid */ assert( this.conn.pipe[1] != -1 );
     129        for() {
     130                if( options.log ) mutex(sout) sout | "=== Accepting connection ===";
     131                int fd = cfa_accept4( this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY );
    50132                if(fd < 0) {
    51133                        if( errno == ECONNABORTED ) break;
     
    55137                if(this.done) break;
    56138
     139                if( options.log ) mutex(sout) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
     140                size_t len = options.socket.buflen;
     141                char buffer[len];
     142                handle_connection( this.conn, fd, buffer, len, 0p, last );
     143
     144                if( options.log ) mutex(sout) sout | "=== Connection closed ===";
     145        }
     146}
     147
     148
     149//=============================================================================================
     150// Channel Worker Thread
     151//=============================================================================================
     152void ?{}( ChannelWorker & this ) {
     153        size_t cli = rand() % options.clopts.cltr_cnt;
     154        ((thread&)this){ "Server Worker Thread", *options.clopts.instance[cli], 64000 };
     155        options.clopts.thrd_cnt[cli]++;
     156        this.done = false;
     157}
     158
     159void main( ChannelWorker & this ) {
     160        park();
     161        unsigned long long last = rdtscl();
     162        /* paranoid */ assert( this.conn.pipe[0] != -1 );
     163        /* paranoid */ assert( this.conn.pipe[1] != -1 );
     164        for() {
     165                size_t len = options.socket.buflen;
     166                char buffer[len];
     167                PendingRead p;
     168                p.next = 0p;
     169                p.in.buf = (void*)buffer;
     170                p.in.len = len;
     171                push(*this.queue, &p);
     172
     173                if( options.log ) mutex(sout) sout | "=== Waiting new connection ===";
     174                handle_connection( this.conn, p.out.fd, buffer, len, &p.f, last );
     175
     176                if( options.log ) mutex(sout) sout | "=== Connection closed ===";
     177                if(this.done) break;
     178        }
     179}
     180
     181extern "C" {
     182extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
     183}
     184
     185void ?{}( Acceptor & this, int cli ) {
     186        ((thread&)this){ "Server Acceptor Thread", *options.clopts.instance[cli], 64000 };
     187        options.clopts.thrd_cnt[cli]++;
     188        this.done = false;
     189}
     190
     191static inline __s32 get_res( io_future_t & this ) {
     192        if( this.result < 0 ) {{
     193                errno = -this.result;
     194                return -1;
     195        }}
     196        return this.result;
     197}
     198
     199static inline void push_connection( Acceptor & this, int fd ) {
     200        PendingRead * p = 0p;
     201        for() {
     202                if(this.done) return;
     203                p = pop(*this.queue);
     204                if(p) break;
     205                yield();
     206                this.stats.creates++;
     207        };
     208
     209        p->out.fd = fd;
     210        async_recv(p->f, p->out.fd, p->in.buf, p->in.len, 0, CFA_IO_LAZY);
     211}
     212
     213// #define ACCEPT_SPIN
     214#define ACCEPT_MANY
     215
     216void main( Acceptor & this ) {
     217        park();
     218        unsigned long long last = rdtscl();
     219
     220#if defined(ACCEPT_SPIN)
     221        if( options.log ) sout | "=== Accepting connection ===";
     222        for() {
     223                int fd = accept4(this.sockfd, this.[addr, addrlen, flags]);
     224                if(fd < 0) {
     225                        if( errno == EWOULDBLOCK) {
     226                                this.stats.eagains++;
     227                                yield();
     228                                continue;
     229                        }
     230                        if( errno == ECONNABORTED ) break;
     231                        if( this.done && (errno == EINVAL || errno == EBADF) ) break;
     232                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
     233                }
     234                this.stats.accepts++;
     235
     236                if(this.done) return;
     237
    57238                if( options.log ) sout | "=== New connection" | fd | "" | ", waiting for requests ===";
    58                 REQUEST:
    59                 for() {
    60                         bool closed;
    61                         HttpCode code;
    62                         const char * file;
    63                         size_t name_size;
    64 
    65                         // Read the http request
    66                         size_t len = options.socket.buflen;
    67                         char buffer[len];
    68                         if( options.log ) sout | "=== Reading request ===";
    69                         [code, closed, file, name_size] = http_read(fd, buffer, len);
    70 
    71                         // if we are done, break out of the loop
    72                         if( closed ) break REQUEST;
    73 
    74                         // If this wasn't a request retrun 400
    75                         if( code != OK200 ) {
    76                                 sout | "=== Invalid Request :" | code_val(code) | "===";
    77                                 answer_error(fd, code);
    78                                 continue REQUEST;
    79                         }
    80 
    81                         if(0 == strncmp(file, "plaintext", min(name_size, sizeof("plaintext") ))) {
    82                                 if( options.log ) sout | "=== Request for /plaintext ===";
    83 
    84                                 int ret = answer_plaintext(fd);
    85                                 if( ret == -ECONNRESET ) break REQUEST;
    86 
    87                                 if( options.log ) sout | "=== Answer sent ===";
    88                                 continue REQUEST;
    89                         }
    90 
    91                         if(0 == strncmp(file, "ping", min(name_size, sizeof("ping") ))) {
    92                                 if( options.log ) sout | "=== Request for /ping ===";
    93 
    94                                 // Send the header
    95                                 int ret = answer_empty(fd);
    96                                 if( ret == -ECONNRESET ) break REQUEST;
    97 
    98                                 if( options.log ) sout | "=== Answer sent ===";
    99                                 continue REQUEST;
    100                         }
    101 
    102                         if( options.log ) {
    103                                 sout | "=== Request for file " | nonl;
    104                                 write(sout, file, name_size);
    105                                 sout | " ===";
    106                         }
    107 
    108                         if( !options.file_cache.path ) {
    109                                 if( options.log ) {
    110                                         sout | "=== File Not Found (" | nonl;
    111                                         write(sout, file, name_size);
    112                                         sout | ") ===";
     239
     240                if(fd) push_connection(this, fd);
     241
     242                if (stats_thrd) {
     243                        unsigned long long next = rdtscl();
     244                        if(next > (last + 500000000)) {
     245                                if(try_lock(stats_thrd->stats.lock)) {
     246                                        push(this.stats, stats_thrd->stats.accpt);
     247                                        unlock(stats_thrd->stats.lock);
     248                                        last = next;
    113249                                }
    114                                 answer_error(fd, E405);
    115                                 continue REQUEST;
    116                         }
    117 
    118                         // Get the fd from the file cache
    119                         int ans_fd;
    120                         size_t count;
    121                         [ans_fd, count] = get_file( file, name_size );
    122 
    123                         // If we can't find the file, return 404
    124                         if( ans_fd < 0 ) {
    125                                 if( options.log ) {
    126                                         sout | "=== File Not Found (" | nonl;
    127                                         write(sout, file, name_size);
    128                                         sout | ") ===";
     250                        }
     251                }
     252
     253                if( options.log ) sout | "=== Accepting connection ===";
     254        }
     255
     256#elif defined(ACCEPT_MANY)
     257        const int nacc = 10;
     258        io_future_t results[nacc];
     259
     260        for(i; nacc) {
     261                io_future_t & res = results[i];
     262                reset(res);
     263                /* paranoid */ assert(!available(res));
     264                if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ===";
     265                async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY);
     266        }
     267
     268        for() {
     269                if (stats_thrd) {
     270                        unsigned long long next = rdtscl();
     271                        if(next > (last + 500000000)) {
     272                                if(try_lock(stats_thrd->stats.lock __cfaabi_dbg_ctx2)) {
     273                                        push(this.stats, stats_thrd->stats.accpt);
     274                                        unlock(stats_thrd->stats.lock);
     275                                        last = next;
    129276                                }
    130                                 answer_error(fd, E404);
    131                                 continue REQUEST;
    132                         }
    133 
    134                         // Send the desired file
    135                         int ret = answer_sendfile( this.pipe, fd, ans_fd, count, this.stats.sendfile );
    136                         if( ret == -ECONNRESET ) break REQUEST;
    137 
    138                         if( options.log ) sout | "=== Answer sent ===";
    139                 }
    140 
    141                 if( options.log ) sout | "=== Connection closed ===";
    142                 continue CONNECTION;
    143         }
    144 }
     277                        }
     278                }
     279
     280                for(i; nacc) {
     281                        io_future_t & res = results[i];
     282                        if(available(res)) {
     283                                if( options.log ) mutex(sout) sout | "=== Accept no " | i | "completed with result" | res.result | "===";
     284                                int fd = get_res(res);
     285                                reset(res);
     286                                this.stats.accepts++;
     287                                if(fd < 0) {
     288                                        if( errno == ECONNABORTED ) continue;
     289                                        if( this.done && (errno == EINVAL || errno == EBADF) ) continue;
     290                                        abort( "accept error: (%d) %s\n", (int)errno, strerror(errno) );
     291                                }
     292                                push_connection( this, fd );
     293
     294                                /* paranoid */ assert(!available(res));
     295                                if( options.log ) mutex(sout) sout | "=== Re-arming accept no" | i | " ===";
     296                                async_accept4(res, this.sockfd, this.[addr, addrlen, flags], CFA_IO_LAZY);
     297                        }
     298                }
     299                if(this.done) return;
     300
     301                if( options.log ) mutex(sout) sout | "=== Waiting for any accept ===";
     302                this.stats.eagains++;
     303                wait_any(results, nacc);
     304
     305                if( options.log ) mutex(sout) {
     306                        sout | "=== Acceptor wake-up ===";
     307                        for(i; nacc) {
     308                                io_future_t & res = results[i];
     309                                sout | i | "available:" | available(res);
     310                        }
     311                }
     312
     313        }
     314
     315        for(i; nacc) {
     316                wait(results[i]);
     317        }
     318#else
     319#error no accept algorithm specified
     320#endif
     321}
  • benchmark/io/http/worker.hfa

    r1df492a reb5962a  
    11#pragma once
    22
     3#include <iofwd.hfa>
     4#include <queueLockFree.hfa>
    35#include <thread.hfa>
    46
     
    79}
    810
     11#include "printer.hfa"
     12
    913//=============================================================================================
    1014// Worker Thread
    1115//=============================================================================================
    1216
    13 extern const size_t zipf_sizes[];
    14 enum { zipf_cnts = 36, };
    15 
    16 struct sendfile_stats_t {
    17         volatile uint64_t calls;
    18         volatile uint64_t tries;
    19         volatile uint64_t header;
    20         volatile uint64_t splcin;
    21         volatile uint64_t splcot;
     17struct connection {
     18        int pipe[2];
    2219        struct {
    23                 volatile uint64_t calls;
    24                 volatile uint64_t bytes;
    25         } avgrd[zipf_cnts];
     20                sendfile_stats_t sendfile;
     21        } stats;
    2622};
    2723
    28 thread Worker {
    29         int pipe[2];
     24static inline void ?{}( connection & this ) {
     25        this.pipe[0] = -1;
     26        this.pipe[1] = -1;
     27}
     28
     29thread AcceptWorker {
     30        connection conn;
    3031        int sockfd;
    3132        struct sockaddr * addr;
     
    3334        int flags;
    3435        volatile bool done;
     36};
     37void ?{}( AcceptWorker & this);
     38void main( AcceptWorker & );
     39
     40
     41struct PendingRead {
     42        PendingRead * volatile next;
     43        io_future_t f;
    3544        struct {
    36                 sendfile_stats_t sendfile;
    37         } stats;
     45                void * buf;
     46                size_t len;
     47        } in;
     48        struct {
     49                volatile int fd;
     50        } out;
    3851};
    39 void ?{}( Worker & this);
    40 void main( Worker & );
     52
     53static inline PendingRead * volatile & ?`next ( PendingRead * node ) {
     54        return node->next;
     55}
     56
     57thread ChannelWorker {
     58        connection conn;
     59        volatile bool done;
     60        mpsc_queue(PendingRead) * queue;
     61};
     62void ?{}( ChannelWorker & );
     63void main( ChannelWorker & );
     64
     65thread Acceptor {
     66        mpsc_queue(PendingRead) * queue;
     67        int sockfd;
     68        struct sockaddr * addr;
     69        socklen_t * addrlen;
     70        int flags;
     71        volatile bool done;
     72        acceptor_stats_t stats;
     73};
     74void ?{}( Acceptor &, int cli );
     75void main( Acceptor & );
  • libcfa/src/bits/locks.hfa

    r1df492a reb5962a  
    2626        // Wrap in struct to prevent false sharing with debug info
    2727        volatile bool lock;
    28         #ifdef __CFA_DEBUG__
    29                 // previous function to acquire the lock
    30                 const char * prev_name;
    31                 // previous thread to acquire the lock
    32                 void* prev_thrd;
    33                 // keep track of number of times we had to spin, just in case the number is unexpectedly huge
    34                 size_t spin_count;
    35         #endif
    3628};
    3729
     
    4032                extern void disable_interrupts() OPTIONAL_THREAD;
    4133                extern void enable_interrupts( bool poll = true ) OPTIONAL_THREAD;
    42 
    43                 #ifdef __CFA_DEBUG__
    44                         void __cfaabi_dbg_record_lock(__spinlock_t & this, const char prev_name[]);
    45                 #else
    46                         #define __cfaabi_dbg_record_lock(x, y)
    47                 #endif
     34                #define __cfaabi_dbg_record_lock(x, y)
    4835        }
    4936
    5037        static inline void ?{}( __spinlock_t & this ) {
    5138                this.lock = 0;
    52                 #ifdef __CFA_DEBUG__
    53                         this.spin_count = 0;
    54                 #endif
    5539        }
    5640
     
    7761                for ( unsigned int i = 1;; i += 1 ) {
    7862                        if ( (this.lock == 0) && (__atomic_test_and_set( &this.lock, __ATOMIC_ACQUIRE ) == 0) ) break;
    79                         #ifdef __CFA_DEBUG__
    80                                 this.spin_count++;
    81                         #endif
    8263                        #ifndef NOEXPBACK
    8364                                // exponential spin
  • libcfa/src/concurrency/invoke.h

    r1df492a reb5962a  
    195195                struct __monitor_group_t monitors;
    196196
    197                 // used to put threads on user data structures
    198                 struct {
    199                         struct thread$ * next;
    200                         struct thread$ * back;
    201                 } seqable;
    202 
    203197                // used to put threads on dlist data structure
    204198                __cfa_dlink(thread$);
     
    208202                        struct thread$ * prev;
    209203                } node;
     204
     205                // used to store state between clh lock/unlock
     206                volatile bool * clh_prev;
     207
     208                // used to point to this thd's current clh node
     209                volatile bool * clh_node;
    210210
    211211                struct processor * last_proc;
     
    240240                }
    241241
    242                 static inline thread$ * volatile & ?`next ( thread$ * this )  __attribute__((const)) {
    243                         return this->seqable.next;
    244                 }
    245 
    246                 static inline thread$ *& Back( thread$ * this ) __attribute__((const)) {
    247                         return this->seqable.back;
    248                 }
    249 
    250                 static inline thread$ *& Next( thread$ * this ) __attribute__((const)) {
    251                                 return this->seqable.next;
    252                 }
    253 
    254                 static inline bool listed( thread$ * this ) {
    255                         return this->seqable.next != 0p;
    256                 }
    257 
    258242                static inline void ?{}(__monitor_group_t & this) {
    259243                        (this.data){0p};
  • libcfa/src/concurrency/io.cfa

    r1df492a reb5962a  
    159159
    160160                const __u32 mask = *ctx->cq.mask;
     161                const __u32 num  = *ctx->cq.num;
    161162                unsigned long long ts_prev = ctx->cq.ts;
    162 
    163                 // re-read the head and tail in case it already changed.
    164                 const __u32 head = *ctx->cq.head;
    165                 const __u32 tail = *ctx->cq.tail;
    166                 const __u32 count = tail - head;
    167                 __STATS__( false, io.calls.drain++; io.calls.completed += count; )
    168 
    169                 for(i; count) {
    170                         unsigned idx = (head + i) & mask;
    171                         volatile struct io_uring_cqe & cqe = ctx->cq.cqes[idx];
    172 
    173                         /* paranoid */ verify(&cqe);
    174 
    175                         struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
    176                         // __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
    177 
    178                         __kernel_unpark( fulfil( *future, cqe.res, false ), UNPARK_LOCAL );
    179                 }
    180 
    181                 unsigned long long ts_next = ctx->cq.ts = rdtscl();
    182 
    183                 // Mark to the kernel that the cqe has been seen
    184                 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
    185                 __atomic_store_n( ctx->cq.head, head + count, __ATOMIC_SEQ_CST );
    186                 ctx->proc->idle_wctx.drain_time = ts_next;
     163                unsigned long long ts_next;
     164
     165                // We might need to do this multiple times if more events completed than can fit in the queue.
     166                for() {
     167                        // re-read the head and tail in case it already changed.
     168                        const __u32 head = *ctx->cq.head;
     169                        const __u32 tail = *ctx->cq.tail;
     170                        const __u32 count = tail - head;
     171                        __STATS__( false, io.calls.drain++; io.calls.completed += count; )
     172
     173                        for(i; count) {
     174                                unsigned idx = (head + i) & mask;
     175                                volatile struct io_uring_cqe & cqe = ctx->cq.cqes[idx];
     176
     177                                /* paranoid */ verify(&cqe);
     178
     179                                struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
     180                                // __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
     181
     182                                __kernel_unpark( fulfil( *future, cqe.res, false ), UNPARK_LOCAL );
     183                        }
     184
     185                        ts_next = ctx->cq.ts = rdtscl();
     186
     187                        // Mark to the kernel that the cqe has been seen
     188                        // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
     189                        __atomic_store_n( ctx->cq.head, head + count, __ATOMIC_SEQ_CST );
     190                        ctx->proc->idle_wctx.drain_time = ts_next;
     191
     192                        if(likely(count < num)) break;
     193
     194                        ioring_syscsll( *ctx, 0, IORING_ENTER_GETEVENTS);
     195                }
    187196
    188197                __cfadbg_print_safe(io, "Kernel I/O : %u completed age %llu\n", count, ts_next);
  • libcfa/src/concurrency/io/setup.cfa

    r1df492a reb5962a  
    138138                __u32 nentries = params_in.num_entries != 0 ? params_in.num_entries : 256;
    139139                if( !is_pow2(nentries) ) {
    140                         abort("ERROR: I/O setup 'num_entries' must be a power of 2\n");
     140                        abort("ERROR: I/O setup 'num_entries' must be a power of 2, was %u\n", nentries);
    141141                }
    142142
  • libcfa/src/concurrency/iofwd.hfa

    r1df492a reb5962a  
    7676        void reset    ( io_future_t & this ) { return reset    (this.self); }
    7777        bool available( io_future_t & this ) { return available(this.self); }
     78        bool setup    ( io_future_t & this, oneshot & ctx ) { return setup  (this.self, ctx); }
     79        bool retract  ( io_future_t & this, oneshot & ctx ) { return retract(this.self, ctx); }
    7880}
    7981
  • libcfa/src/concurrency/kernel.cfa

    r1df492a reb5962a  
    834834#endif
    835835
    836 
    837 
    838 //-----------------------------------------------------------------------------
    839 // Debug
    840 __cfaabi_dbg_debug_do(
    841         extern "C" {
    842                 void __cfaabi_dbg_record_lock(__spinlock_t & this, const char prev_name[]) {
    843                         this.prev_name = prev_name;
    844                         this.prev_thrd = kernelTLS().this_thread;
    845                 }
    846         }
    847 )
    848 
    849836//-----------------------------------------------------------------------------
    850837// Debug
  • libcfa/src/concurrency/kernel/fwd.hfa

    r1df492a reb5962a  
    200200                                        struct thread$ * expected = this.ptr;
    201201                                        if(expected == 1p) return false;
    202                                         /* paranoid */ verify( expected == 0p );
    203202                                        if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
    204203                                                park();
     
    213212                        thread$ * post(oneshot & this, bool do_unpark = true) {
    214213                                struct thread$ * got = __atomic_exchange_n( &this.ptr, 1p, __ATOMIC_SEQ_CST);
    215                                 if( got == 0p ) return 0p;
     214                                if( got == 0p || got == 1p ) return 0p;
    216215                                if(do_unpark) unpark( got );
    217216                                return got;
     
    263262
    264263                                        // The future is not fulfilled, try to setup the wait context
    265                                         /* paranoid */ verify( expected == 0p );
    266264                                        if(__atomic_compare_exchange_n(&this.ptr, &expected, &wait_ctx, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
    267265                                                return true;
     
    275273                        // should retract the wait ctx
    276274                        // intented to be use by wait, wait_any, waitfor, etc. rather than used directly
    277                         void retract( future_t & this, oneshot & wait_ctx ) {
    278                                 // Remove the wait context
    279                                 struct oneshot * got = __atomic_exchange_n( &this.ptr, 0p, __ATOMIC_SEQ_CST);
    280 
    281                                 // got == 0p: future was never actually setup, just return
    282                                 if( got == 0p ) return;
    283 
    284                                 // got == wait_ctx: since fulfil does an atomic_swap,
    285                                 // if we got back the original then no one else saw context
    286                                 // It is safe to delete (which could happen after the return)
    287                                 if( got == &wait_ctx ) return;
    288 
    289                                 // got == 1p: the future is ready and the context was fully consumed
    290                                 // the server won't use the pointer again
    291                                 // It is safe to delete (which could happen after the return)
    292                                 if( got == 1p ) return;
    293 
    294                                 // got == 2p: the future is ready but the context hasn't fully been consumed
    295                                 // spin until it is safe to move on
    296                                 if( got == 2p ) {
    297                                         while( this.ptr != 1p ) Pause();
    298                                         return;
    299                                 }
    300 
    301                                 // got == any thing else, something wen't wrong here, abort
    302                                 abort("Future in unexpected state");
     275                        bool retract( future_t & this, oneshot & wait_ctx ) {
     276                                for() {
     277                                        struct oneshot * expected = this.ptr;
     278
     279                                        // expected == 0p: future was never actually setup, just return
     280                                        if( expected == 0p ) return false;
     281
     282                                        // expected == 1p: the future is ready and the context was fully consumed
     283                                        // the server won't use the pointer again
     284                                        // It is safe to delete (which could happen after the return)
     285                                        if( expected == 1p ) return true;
     286
     287                                        // expected == 2p: the future is ready but the context hasn't fully been consumed
     288                                        // spin until it is safe to move on
     289                                        if( expected == 2p ) {
     290                                                while( this.ptr != 1p ) Pause();
     291                                                /* paranoid */ verify( this.ptr == 1p );
     292                                                return true;
     293                                        }
     294
     295                                        // expected != wait_ctx: the future was setup with a different context ?!?!
     296                                        // something went wrong here, abort
     297                                        if( expected != &wait_ctx ) abort("Future in unexpected state");
     298
     299                                        // we still have the original context, then no one else saw it
     300                                        // attempt to remove the context so it doesn't get consumed.
     301                                        if(__atomic_compare_exchange_n( &this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     302                                                return false;
     303                                        }
     304                                }
    303305                        }
    304306
     
    379381                                return ret;
    380382                        }
     383
     384                        // Wait for any future to be fulfilled
     385                        forall(T& | sized(T) | { bool setup( T&, oneshot & ); bool retract( T&, oneshot & ); })
     386                        T & wait_any( T * futures, size_t num_futures ) {
     387                                oneshot temp;
     388
     389                                // setup all futures
     390                                // if any are already satisfied return
     391                                for ( i; num_futures ) {
     392                                        if( !setup(futures[i], temp) ) return futures[i];
     393                                }
     394
     395                                // Wait context is setup, just wait on it
     396                                wait( temp );
     397
     398                                size_t ret;
     399                                // attempt to retract all futures
     400                                for ( i; num_futures ) {
     401                                        if ( retract( futures[i], temp ) ) ret = i;
     402                                }
     403
     404                                return futures[ret];
     405                        }
    381406                }
    382407
  • libcfa/src/concurrency/locks.cfa

    r1df492a reb5962a  
    219219        // this casts the alarm node to our wrapped type since we used type erasure
    220220        static void alarm_node_wrap_cast( alarm_node_t & a ) { timeout_handler( (alarm_node_wrap(L) &)a ); }
     221
     222        struct pthread_alarm_node_wrap {
     223                alarm_node_t alarm_node;
     224                pthread_cond_var(L) * cond;
     225                info_thread(L) * info_thd;
     226        };
     227
     228        void ?{}( pthread_alarm_node_wrap(L) & this, Duration alarm, Duration period, Alarm_Callback callback, pthread_cond_var(L) * c, info_thread(L) * i ) {
     229                this.alarm_node{ callback, alarm, period };
     230                this.cond = c;
     231                this.info_thd = i;
     232        }
     233
     234        void ^?{}( pthread_alarm_node_wrap(L) & this ) { }
     235
     236        static void timeout_handler ( pthread_alarm_node_wrap(L) & this ) with( this ) {
     237                // This pthread_cond_var member is called from the kernel, and therefore, cannot block, but it can spin.
     238                lock( cond->lock __cfaabi_dbg_ctx2 );
     239
     240                // this check is necessary to avoid a race condition since this timeout handler
     241                //      may still be called after a thread has been removed from the queue but
     242                //      before the alarm is unregistered
     243                if ( (*info_thd)`isListed ) {   // is thread on queue
     244                        info_thd->signalled = false;
     245                        // remove this thread O(1)
     246                        remove( *info_thd );
     247                        on_notify(*info_thd->lock, info_thd->t);
     248                }
     249                unlock( cond->lock );
     250        }
     251
     252        // this casts the alarm node to our wrapped type since we used type erasure
     253        static void pthread_alarm_node_wrap_cast( alarm_node_t & a ) { timeout_handler( (pthread_alarm_node_wrap(L) &)a ); }
    221254}
    222255
     
    388421                on_wakeup(*i.lock, recursion_count);
    389422        }
    390 }
    391 
     423
     424        //-----------------------------------------------------------------------------
     425        // pthread_cond_var
     426
     427        void  ?{}( pthread_cond_var(L) & this ) with(this) {
     428                blocked_threads{};
     429                lock{};
     430        }
     431
     432        void ^?{}( pthread_cond_var(L) & this ) { }
     433
     434        bool notify_one( pthread_cond_var(L) & this ) with(this) {
     435                lock( lock __cfaabi_dbg_ctx2 );
     436                bool ret = ! blocked_threads`isEmpty;
     437                if ( ret ) {
     438                        info_thread(L) & popped = try_pop_front( blocked_threads );
     439                        on_notify(*popped.lock, popped.t);
     440                }
     441                unlock( lock );
     442                return ret;
     443        }
     444
     445        bool notify_all( pthread_cond_var(L) & this ) with(this) {
     446                lock( lock __cfaabi_dbg_ctx2 );
     447                bool ret = ! blocked_threads`isEmpty;
     448                while( ! blocked_threads`isEmpty ) {
     449                        info_thread(L) & popped = try_pop_front( blocked_threads );
     450                        on_notify(*popped.lock, popped.t);
     451                }
     452                unlock( lock );
     453                return ret;
     454        }
     455
     456        uintptr_t front( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty ? NULL : blocked_threads`first.info; }
     457        bool empty ( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty; }
     458
     459        static size_t queue_and_get_recursion( pthread_cond_var(L) & this, info_thread(L) * i ) with(this) {
     460                // add info_thread to waiting queue
     461                insert_last( blocked_threads, *i );
     462                size_t recursion_count = 0;
     463                recursion_count = on_wait( *i->lock );
     464                return recursion_count;
     465        }
     466       
     467        static void queue_info_thread_timeout( pthread_cond_var(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) {
     468                lock( lock __cfaabi_dbg_ctx2 );
     469                size_t recursion_count = queue_and_get_recursion(this, &info);
     470                pthread_alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info };
     471                register_self( &node_wrap.alarm_node );
     472                unlock( lock );
     473
     474                // blocks here
     475                park();
     476
     477                // unregisters alarm so it doesn't go off if this happens first
     478                unregister_self( &node_wrap.alarm_node );
     479
     480                // resets recursion count here after waking
     481                if (info.lock) on_wakeup(*info.lock, recursion_count);
     482        }
     483
     484        void wait( pthread_cond_var(L) & this, L & l ) with(this) {
     485                wait( this, l, 0 );
     486        }
     487
     488        void wait( pthread_cond_var(L) & this, L & l, uintptr_t info ) with(this) {
     489                lock( lock __cfaabi_dbg_ctx2 );
     490                info_thread( L ) i = { active_thread(), info, &l };
     491                size_t recursion_count = queue_and_get_recursion(this, &i);
     492                unlock( lock );
     493                park( );
     494                on_wakeup(*i.lock, recursion_count);
     495        }
     496
     497        #define PTHREAD_WAIT_TIME( u, l, t ) \
     498                info_thread( L ) i = { active_thread(), u, l }; \
     499                queue_info_thread_timeout(this, i, t, pthread_alarm_node_wrap_cast ); \
     500                return i.signalled;
     501
     502        bool wait( pthread_cond_var(L) & this, L & l, timespec t ) {
     503                Duration d = { t };
     504                WAIT_TIME( 0, &l , d )
     505        }
     506       
     507        bool wait( pthread_cond_var(L) & this, L & l, uintptr_t info, timespec t  ) {
     508                Duration d = { t };
     509                WAIT_TIME( info, &l , d )
     510        }
     511}
    392512//-----------------------------------------------------------------------------
    393513// Semaphore
  • libcfa/src/concurrency/locks.hfa

    r1df492a reb5962a  
    101101
    102102//-----------------------------------------------------------------------------
     103// MCS Spin Lock
     104// - No recursive acquisition
     105// - Needs to be released by owner
     106
     107struct mcs_spin_node {
     108        mcs_spin_node * volatile next;
     109        volatile bool locked;
     110};
     111
     112struct mcs_spin_queue {
     113        mcs_spin_node * volatile tail;
     114};
     115
     116static inline void ?{}(mcs_spin_node & this) { this.next = 0p; this.locked = true; }
     117
     118static inline mcs_spin_node * volatile & ?`next ( mcs_spin_node * node ) {
     119        return node->next;
     120}
     121
     122struct mcs_spin_lock {
     123        mcs_spin_queue queue;
     124};
     125
     126static inline void lock(mcs_spin_lock & l, mcs_spin_node & n) {
     127        mcs_spin_node * prev = __atomic_exchange_n(&l.queue.tail, &n, __ATOMIC_SEQ_CST);
     128        n.locked = true;
     129        if(prev == 0p) return;
     130        prev->next = &n;
     131        while(__atomic_load_n(&n.locked, __ATOMIC_RELAXED)) Pause();
     132}
     133
     134static inline void unlock(mcs_spin_lock & l, mcs_spin_node & n) {
     135        mcs_spin_node * n_ptr = &n;
     136        if (__atomic_compare_exchange_n(&l.queue.tail, &n_ptr, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) return;
     137        while (__atomic_load_n(&n.next, __ATOMIC_RELAXED) == 0p) {}
     138        n.next->locked = false;
     139}
     140
     141//-----------------------------------------------------------------------------
     142// CLH Spinlock
     143// - No recursive acquisition
     144// - Needs to be released by owner
     145
     146struct clh_lock {
     147        volatile bool * volatile tail;
     148};
     149
     150static inline void  ?{}( clh_lock & this ) { this.tail = malloc(); *this.tail = true; }
     151static inline void ^?{}( clh_lock & this ) { free(this.tail); }
     152
     153static inline void lock(clh_lock & l) {
     154        thread$ * curr_thd = active_thread();
     155        *(curr_thd->clh_node) = false;
     156        volatile bool * prev = __atomic_exchange_n((bool **)(&l.tail), (bool *)(curr_thd->clh_node), __ATOMIC_SEQ_CST);
     157        while(!__atomic_load_n(prev, __ATOMIC_ACQUIRE)) Pause();
     158        curr_thd->clh_prev = prev;
     159}
     160
     161static inline void unlock(clh_lock & l) {
     162        thread$ * curr_thd = active_thread();
     163        __atomic_store_n(curr_thd->clh_node, true, __ATOMIC_RELEASE);
     164        curr_thd->clh_node = curr_thd->clh_prev;
     165}
     166
     167//-----------------------------------------------------------------------------
    103168// Linear backoff Spinlock
    104169struct linear_backoff_then_block_lock {
     
    205270// Fast Block Lock
    206271
    207 // High efficiency minimal blocking lock
     272// minimal blocking lock
    208273// - No reacquire for cond var
    209274// - No recursive acquisition
    210275// - No ownership
    211276struct fast_block_lock {
     277        // List of blocked threads
     278        dlist( thread$ ) blocked_threads;
     279
    212280        // Spin lock used for mutual exclusion
    213281        __spinlock_t lock;
    214282
    215         // List of blocked threads
    216         dlist( thread$ ) blocked_threads;
    217 
     283        // flag showing if lock is held
    218284        bool held:1;
     285
     286        #ifdef __CFA_DEBUG__
     287        // for deadlock detection
     288        struct thread$ * owner;
     289        #endif
    219290};
    220291
     
    231302static inline void lock(fast_block_lock & this) with(this) {
    232303        lock( lock __cfaabi_dbg_ctx2 );
     304
     305        #ifdef __CFA_DEBUG__
     306        assert(!(held && owner == active_thread()));
     307        #endif
    233308        if (held) {
    234309                insert_last( blocked_threads, *active_thread() );
     
    238313        }
    239314        held = true;
     315        #ifdef __CFA_DEBUG__
     316        owner = active_thread();
     317        #endif
    240318        unlock( lock );
    241319}
     
    246324        thread$ * t = &try_pop_front( blocked_threads );
    247325        held = ( t ? true : false );
     326        #ifdef __CFA_DEBUG__
     327        owner = ( t ? t : 0p );
     328        #endif
    248329        unpark( t );
    249330        unlock( lock );
     
    253334static inline size_t on_wait(fast_block_lock & this) { unlock(this); return 0; }
    254335static inline void on_wakeup(fast_block_lock & this, size_t recursion ) { }
     336
     337//-----------------------------------------------------------------------------
     338// simple_owner_lock
     339
     340// pthread owner lock
     341// - reacquire for cond var
     342// - recursive acquisition
     343// - ownership
     344struct simple_owner_lock {
     345        // List of blocked threads
     346        dlist( thread$ ) blocked_threads;
     347
     348        // Spin lock used for mutual exclusion
     349        __spinlock_t lock;
     350
     351        // owner showing if lock is held
     352        struct thread$ * owner;
     353
     354        size_t recursion_count;
     355};
     356
     357static inline void  ?{}( simple_owner_lock & this ) with(this) {
     358        lock{};
     359        blocked_threads{};
     360        owner = 0p;
     361        recursion_count = 0;
     362}
     363static inline void ^?{}( simple_owner_lock & this ) {}
     364static inline void ?{}( simple_owner_lock & this, simple_owner_lock this2 ) = void;
     365static inline void ?=?( simple_owner_lock & this, simple_owner_lock this2 ) = void;
     366
     367static inline void lock(simple_owner_lock & this) with(this) {
     368        if (owner == active_thread()) {
     369                recursion_count++;
     370                return;
     371        }
     372        lock( lock __cfaabi_dbg_ctx2 );
     373
     374        if (owner != 0p) {
     375                insert_last( blocked_threads, *active_thread() );
     376                unlock( lock );
     377                park( );
     378                return;
     379        }
     380        owner = active_thread();
     381        recursion_count = 1;
     382        unlock( lock );
     383}
     384
     385// TODO: fix duplicate def issue and bring this back
     386// void pop_and_set_new_owner( simple_owner_lock & this ) with( this ) {
     387        // thread$ * t = &try_pop_front( blocked_threads );
     388        // owner = t;
     389        // recursion_count = ( t ? 1 : 0 );
     390        // unpark( t );
     391// }
     392
     393static inline void unlock(simple_owner_lock & this) with(this) {
     394        lock( lock __cfaabi_dbg_ctx2 );
     395        /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
     396        /* paranoid */ verifyf( owner == active_thread(), "Thread %p other than the owner %p attempted to release owner lock %p", owner, active_thread(), &this );
     397        // if recursion count is zero release lock and set new owner if one is waiting
     398        recursion_count--;
     399        if ( recursion_count == 0 ) {
     400                // pop_and_set_new_owner( this );
     401                thread$ * t = &try_pop_front( blocked_threads );
     402                owner = t;
     403                recursion_count = ( t ? 1 : 0 );
     404                unpark( t );
     405        }
     406        unlock( lock );
     407}
     408
     409static inline void on_notify(simple_owner_lock & this, struct thread$ * t ) with(this) {
     410        lock( lock __cfaabi_dbg_ctx2 );
     411        // lock held
     412        if ( owner != 0p ) {
     413                insert_last( blocked_threads, *t );
     414                unlock( lock );
     415        }
     416        // lock not held
     417        else {
     418                owner = t;
     419                recursion_count = 1;
     420                unpark( t );
     421                unlock( lock );
     422        }
     423}
     424
     425static inline size_t on_wait(simple_owner_lock & this) with(this) {
     426        lock( lock __cfaabi_dbg_ctx2 );
     427        /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
     428        /* paranoid */ verifyf( owner == active_thread(), "Thread %p other than the owner %p attempted to release owner lock %p", owner, active_thread(), &this );
     429
     430        size_t ret = recursion_count;
     431
     432        // pop_and_set_new_owner( this );
     433
     434        thread$ * t = &try_pop_front( blocked_threads );
     435        owner = t;
     436        recursion_count = ( t ? 1 : 0 );
     437        unpark( t );
     438
     439        unlock( lock );
     440        return ret;
     441}
     442
     443static inline void on_wakeup(simple_owner_lock & this, size_t recursion ) with(this) { recursion_count = recursion; }
     444
     445//-----------------------------------------------------------------------------
     446// Spin Queue Lock
     447
     448// - No reacquire for cond var
     449// - No recursive acquisition
     450// - No ownership
     451// - spin lock with no locking/atomics in unlock
     452struct spin_queue_lock {
     453        // Spin lock used for mutual exclusion
     454        mcs_spin_lock lock;
     455
     456        // flag showing if lock is held
     457        volatile bool held;
     458
     459        #ifdef __CFA_DEBUG__
     460        // for deadlock detection
     461        struct thread$ * owner;
     462        #endif
     463};
     464
     465static inline void  ?{}( spin_queue_lock & this ) with(this) {
     466        lock{};
     467        held = false;
     468}
     469static inline void ^?{}( spin_queue_lock & this ) {}
     470static inline void ?{}( spin_queue_lock & this, spin_queue_lock this2 ) = void;
     471static inline void ?=?( spin_queue_lock & this, spin_queue_lock this2 ) = void;
     472
     473// if this is called recursively IT WILL DEADLOCK!!!!!
     474static inline void lock(spin_queue_lock & this) with(this) {
     475        mcs_spin_node node;
     476        #ifdef __CFA_DEBUG__
     477        assert(!(held && owner == active_thread()));
     478        #endif
     479        lock( lock, node );
     480        while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
     481        __atomic_store_n(&held, true, __ATOMIC_SEQ_CST);
     482        unlock( lock, node );
     483        #ifdef __CFA_DEBUG__
     484        owner = active_thread();
     485        #endif
     486}
     487
     488static inline void unlock(spin_queue_lock & this) with(this) {
     489        #ifdef __CFA_DEBUG__
     490        owner = 0p;
     491        #endif
     492        __atomic_store_n(&held, false, __ATOMIC_RELEASE);
     493}
     494
     495static inline void on_notify(spin_queue_lock & this, struct thread$ * t ) { unpark(t); }
     496static inline size_t on_wait(spin_queue_lock & this) { unlock(this); return 0; }
     497static inline void on_wakeup(spin_queue_lock & this, size_t recursion ) { }
     498
     499
     500//-----------------------------------------------------------------------------
     501// MCS Block Spin Lock
     502
     503// - No reacquire for cond var
     504// - No recursive acquisition
     505// - No ownership
     506// - Blocks but first node spins (like spin queue but blocking for not first thd)
     507struct mcs_block_spin_lock {
     508        // Spin lock used for mutual exclusion
     509        mcs_lock lock;
     510
     511        // flag showing if lock is held
     512        volatile bool held;
     513
     514        #ifdef __CFA_DEBUG__
     515        // for deadlock detection
     516        struct thread$ * owner;
     517        #endif
     518};
     519
     520static inline void  ?{}( mcs_block_spin_lock & this ) with(this) {
     521        lock{};
     522        held = false;
     523}
     524static inline void ^?{}( mcs_block_spin_lock & this ) {}
     525static inline void ?{}( mcs_block_spin_lock & this, mcs_block_spin_lock this2 ) = void;
     526static inline void ?=?( mcs_block_spin_lock & this, mcs_block_spin_lock this2 ) = void;
     527
     528// if this is called recursively IT WILL DEADLOCK!!!!!
     529static inline void lock(mcs_block_spin_lock & this) with(this) {
     530        mcs_node node;
     531        #ifdef __CFA_DEBUG__
     532        assert(!(held && owner == active_thread()));
     533        #endif
     534        lock( lock, node );
     535        while(held) Pause();
     536        held = true;
     537        unlock( lock, node );
     538        #ifdef __CFA_DEBUG__
     539        owner = active_thread();
     540        #endif
     541}
     542
     543static inline void unlock(mcs_block_spin_lock & this) with(this) {
     544        #ifdef __CFA_DEBUG__
     545        owner = 0p;
     546        #endif
     547        held = false;
     548}
     549
     550static inline void on_notify(mcs_block_spin_lock & this, struct thread$ * t ) { unpark(t); }
     551static inline size_t on_wait(mcs_block_spin_lock & this) { unlock(this); return 0; }
     552static inline void on_wakeup(mcs_block_spin_lock & this, size_t recursion ) { }
     553
     554//-----------------------------------------------------------------------------
     555// Block Spin Lock
     556
     557// - No reacquire for cond var
     558// - No recursive acquisition
     559// - No ownership
     560// - Blocks but first node spins (like spin queue but blocking for not first thd)
     561struct block_spin_lock {
     562        // Spin lock used for mutual exclusion
     563        fast_block_lock lock;
     564
     565        // flag showing if lock is held
     566        volatile bool held;
     567
     568        #ifdef __CFA_DEBUG__
     569        // for deadlock detection
     570        struct thread$ * owner;
     571        #endif
     572};
     573
     574static inline void  ?{}( block_spin_lock & this ) with(this) {
     575        lock{};
     576        held = false;
     577}
     578static inline void ^?{}( block_spin_lock & this ) {}
     579static inline void ?{}( block_spin_lock & this, block_spin_lock this2 ) = void;
     580static inline void ?=?( block_spin_lock & this, block_spin_lock this2 ) = void;
     581
     582// if this is called recursively IT WILL DEADLOCK!!!!!
     583static inline void lock(block_spin_lock & this) with(this) {
     584        #ifdef __CFA_DEBUG__
     585        assert(!(held && owner == active_thread()));
     586        #endif
     587        lock( lock );
     588        while(held) Pause();
     589        held = true;
     590        unlock( lock );
     591        #ifdef __CFA_DEBUG__
     592        owner = active_thread();
     593        #endif
     594}
     595
     596static inline void unlock(block_spin_lock & this) with(this) {
     597        #ifdef __CFA_DEBUG__
     598        owner = 0p;
     599        #endif
     600        held = false;
     601}
     602
     603static inline void on_notify(block_spin_lock & this, struct thread$ * t ) { unpark(t); }
     604static inline size_t on_wait(block_spin_lock & this) { unlock(this); return 0; }
     605static inline void on_wakeup(block_spin_lock & this, size_t recursion ) { }
    255606
    256607//-----------------------------------------------------------------------------
     
    332683        // - signalling without holding branded lock is UNSAFE!
    333684        // - only allows usage of one lock, cond var is branded after usage
     685
    334686        struct fast_cond_var {
    335687                // List of blocked threads
    336688                dlist( info_thread(L) ) blocked_threads;
    337 
    338689                #ifdef __CFA_DEBUG__
    339690                L * lock_used;
     
    341692        };
    342693
    343 
    344694        void  ?{}( fast_cond_var(L) & this );
    345695        void ^?{}( fast_cond_var(L) & this );
     
    349699
    350700        uintptr_t front( fast_cond_var(L) & this );
    351 
    352701        bool empty  ( fast_cond_var(L) & this );
    353702
    354703        void wait( fast_cond_var(L) & this, L & l );
    355704        void wait( fast_cond_var(L) & this, L & l, uintptr_t info );
    356 }
     705
     706
     707        //-----------------------------------------------------------------------------
     708        // pthread_cond_var
     709        //
     710        // - cond var with minimal footprint
     711        // - supports operations needed for phthread cond
     712
     713        struct pthread_cond_var {
     714                dlist( info_thread(L) ) blocked_threads;
     715                __spinlock_t lock;
     716        };
     717
     718        void  ?{}( pthread_cond_var(L) & this );
     719        void ^?{}( pthread_cond_var(L) & this );
     720
     721        bool notify_one( pthread_cond_var(L) & this );
     722        bool notify_all( pthread_cond_var(L) & this );
     723
     724        uintptr_t front( pthread_cond_var(L) & this );
     725        bool empty ( pthread_cond_var(L) & this );
     726
     727        void wait( pthread_cond_var(L) & this, L & l );
     728        void wait( pthread_cond_var(L) & this, L & l, uintptr_t info );
     729        bool wait( pthread_cond_var(L) & this, L & l, timespec t );
     730        bool wait( pthread_cond_var(L) & this, L & l, uintptr_t info, timespec t );
     731}
  • libcfa/src/concurrency/thread.cfa

    r1df492a reb5962a  
    5353        #endif
    5454
    55         seqable.next = 0p;
    56         seqable.back = 0p;
    57 
    5855        node.next = 0p;
    5956        node.prev = 0p;
     57
     58        clh_node = malloc( );
     59        *clh_node = false;
     60
    6061        doregister(curr_cluster, this);
    61 
    6262        monitors{ &self_mon_p, 1, (fptr_t)0 };
    6363}
     
    6767                canary = 0xDEADDEADDEADDEADp;
    6868        #endif
     69        free(clh_node);
    6970        unregister(curr_cluster, this);
    7071        ^self_cor{};
  • libcfa/src/containers/queueLockFree.hfa

    r1df492a reb5962a  
    22
    33#include <assert.h>
     4
     5#include <bits/defs.hfa>
    46
    57forall( T &) {
  • libcfa/src/startup.cfa

    r1df492a reb5962a  
    6363
    6464struct __spinlock_t;
    65 extern "C" {
    66         void __cfaabi_dbg_record_lock(struct __spinlock_t & this, const char prev_name[]) __attribute__(( weak )) libcfa_public {}
    67 }
    6865
    6966// Local Variables: //
  • src/AST/Pass.impl.hpp

    r1df492a reb5962a  
    182182
    183183                // get the stmts/decls that will need to be spliced in
    184                 auto stmts_before = __pass::stmtsToAddBefore( core, 0);
    185                 auto stmts_after  = __pass::stmtsToAddAfter ( core, 0);
    186                 auto decls_before = __pass::declsToAddBefore( core, 0);
    187                 auto decls_after  = __pass::declsToAddAfter ( core, 0);
     184                auto stmts_before = __pass::stmtsToAddBefore( core, 0 );
     185                auto stmts_after  = __pass::stmtsToAddAfter ( core, 0 );
     186                auto decls_before = __pass::declsToAddBefore( core, 0 );
     187                auto decls_after  = __pass::declsToAddAfter ( core, 0 );
    188188
    189189                // These may be modified by subnode but most be restored once we exit this statemnet.
     
    287287
    288288                // get the stmts/decls that will need to be spliced in
    289                 auto stmts_before = __pass::stmtsToAddBefore( core, 0);
    290                 auto stmts_after  = __pass::stmtsToAddAfter ( core, 0);
    291                 auto decls_before = __pass::declsToAddBefore( core, 0);
    292                 auto decls_after  = __pass::declsToAddAfter ( core, 0);
     289                auto stmts_before = __pass::stmtsToAddBefore( core, 0 );
     290                auto stmts_after  = __pass::stmtsToAddAfter ( core, 0 );
     291                auto decls_before = __pass::declsToAddBefore( core, 0 );
     292                auto decls_after  = __pass::declsToAddAfter ( core, 0 );
    293293
    294294                // These may be modified by subnode but most be restored once we exit this statemnet.
     
    317317                                assert(( empty( stmts_before ) && empty( stmts_after ))
    318318                                    || ( empty( decls_before ) && empty( decls_after )) );
    319 
    320 
    321319
    322320                                // Take all the statements which should have gone after, N/A for first iteration
     
    21162114        if ( __visit_children() ) {
    21172115                bool mutated = false;
    2118                 std::unordered_map< ast::TypeInstType::TypeEnvKey, ast::ptr< ast::Type > > new_map;
    2119                 for ( const auto & p : node->typeEnv ) {
     2116                ast::TypeSubstitution::TypeMap new_map;
     2117                for ( const auto & p : node->typeMap ) {
    21202118                        guard_symtab guard { *this };
    21212119                        auto new_node = p.second->accept( *this );
     
    21252123                if (mutated) {
    21262124                        auto new_node = __pass::mutate<core_t>( node );
    2127                         new_node->typeEnv.swap( new_map );
     2125                        new_node->typeMap.swap( new_map );
    21282126                        node = new_node;
    21292127                }
  • src/AST/TypeSubstitution.cpp

    r1df492a reb5962a  
    3838
    3939void TypeSubstitution::initialize( const TypeSubstitution &src, TypeSubstitution &dest ) {
    40         dest.typeEnv.clear();
     40        dest.typeMap.clear();
    4141        dest.add( src );
    4242}
    4343
    4444void TypeSubstitution::add( const TypeSubstitution &other ) {
    45         for ( TypeEnvType::const_iterator i = other.typeEnv.begin(); i != other.typeEnv.end(); ++i ) {
    46                 typeEnv[ i->first ] = i->second;
     45        for ( TypeMap::const_iterator i = other.typeMap.begin(); i != other.typeMap.end(); ++i ) {
     46                typeMap[ i->first ] = i->second;
    4747        } // for
    4848}
    4949
    5050void TypeSubstitution::add( const TypeInstType * formalType, const Type *actualType ) {
    51         typeEnv[ *formalType ] = actualType;
     51        typeMap[ *formalType ] = actualType;
    5252}
    5353
    5454void TypeSubstitution::add( const TypeInstType::TypeEnvKey & key, const Type * actualType) {
    55         typeEnv[ key ] = actualType;
     55        typeMap[ key ] = actualType;
    5656}
    5757
    5858void TypeSubstitution::remove( const TypeInstType * formalType ) {
    59         TypeEnvType::iterator i = typeEnv.find( *formalType );
    60         if ( i != typeEnv.end() ) {
    61                 typeEnv.erase( *formalType );
    62         } // if
    63 }
    64 
    65 const Type *TypeSubstitution::lookup( const TypeInstType * formalType ) const {
    66         TypeEnvType::const_iterator i = typeEnv.find( *formalType );
     59        TypeMap::iterator i = typeMap.find( *formalType );
     60        if ( i != typeMap.end() ) {
     61                typeMap.erase( *formalType );
     62        } // if
     63}
     64
     65const Type *TypeSubstitution::lookup(
     66                const TypeInstType::TypeEnvKey & formalType ) const {
     67        TypeMap::const_iterator i = typeMap.find( formalType );
    6768
    6869        // break on not in substitution set
    69         if ( i == typeEnv.end() ) return 0;
     70        if ( i == typeMap.end() ) return 0;
    7071
    7172        // attempt to transitively follow TypeInstType links.
    7273        while ( const TypeInstType *actualType = i->second.as<TypeInstType>()) {
    7374                // break cycles in the transitive follow
    74                 if ( *formalType == *actualType ) break;
     75                if ( formalType == *actualType ) break;
    7576
    7677                // Look for the type this maps to, returning previous mapping if none-such
    77                 i = typeEnv.find( *actualType );
    78                 if ( i == typeEnv.end() ) return actualType;
     78                i = typeMap.find( *actualType );
     79                if ( i == typeMap.end() ) return actualType;
    7980        }
    8081
     
    8384}
    8485
     86const Type *TypeSubstitution::lookup( const TypeInstType * formalType ) const {
     87        return lookup( ast::TypeInstType::TypeEnvKey( *formalType ) );
     88}
     89
    8590bool TypeSubstitution::empty() const {
    86         return typeEnv.empty();
     91        return typeMap.empty();
    8792}
    8893
     
    119124                sub.core.subCount = 0;
    120125                sub.core.freeOnly = true;
    121                 for ( TypeEnvType::iterator i = typeEnv.begin(); i != typeEnv.end(); ++i ) {
     126                for ( TypeMap::iterator i = typeMap.begin(); i != typeMap.end(); ++i ) {
    122127                        i->second = i->second->accept( sub );
    123128                }
     
    129134        if ( bound != boundVars.end() ) return inst;
    130135
    131         TypeEnvType::const_iterator i = sub.typeEnv.find( *inst );
    132         if ( i == sub.typeEnv.end() ) {
     136        TypeMap::const_iterator i = sub.typeMap.find( *inst );
     137        if ( i == sub.typeMap.end() ) {
    133138                return inst;
    134139        } else {
  • src/AST/TypeSubstitution.hpp

    r1df492a reb5962a  
    7575        void add( const TypeSubstitution &other );
    7676        void remove( const TypeInstType * formalType );
     77        const Type *lookup( const TypeInstType::TypeEnvKey & formalType ) const;
    7778        const Type *lookup( const TypeInstType * formalType ) const;
    7879        bool empty() const;
     
    104105        friend class Pass;
    105106
    106         typedef std::unordered_map< TypeInstType::TypeEnvKey, ptr<Type> > TypeEnvType;
    107         TypeEnvType typeEnv;
     107        typedef std::unordered_map< TypeInstType::TypeEnvKey, ptr<Type> > TypeMap;
     108        TypeMap typeMap;
    108109
    109110  public:
    110         // has to come after declaration of typeEnv
    111         auto begin()       -> decltype( typeEnv.begin() ) { return typeEnv.begin(); }
    112         auto   end()       -> decltype( typeEnv.  end() ) { return typeEnv.  end(); }
    113         auto begin() const -> decltype( typeEnv.begin() ) { return typeEnv.begin(); }
    114         auto   end() const -> decltype( typeEnv.  end() ) { return typeEnv.  end(); }
     111        // has to come after declaration of typeMap
     112        auto begin()       -> decltype( typeMap.begin() ) { return typeMap.begin(); }
     113        auto   end()       -> decltype( typeMap.  end() ) { return typeMap.  end(); }
     114        auto begin() const -> decltype( typeMap.begin() ) { return typeMap.begin(); }
     115        auto   end() const -> decltype( typeMap.  end() ) { return typeMap.  end(); }
    115116
    116117};
     
    144145                        if ( const TypeExpr *actual = actualIt->template as<TypeExpr>() ) {
    145146                                if ( formal->name != "" ) {
    146                                         typeEnv[ formal ] = actual->type;
     147                                        typeMap[ formal ] = actual->type;
    147148                                } // if
    148149                        } else {
  • src/Concurrency/Waitfor.cc

    r1df492a reb5962a  
    5656                      |  |
    5757                      |  |
    58                             |  |
     58                      |  |
    5959                      |  |
    6060                      |  |
  • src/Concurrency/Waitfor.h

    r1df492a reb5962a  
    1919
    2020class Declaration;
     21namespace ast {
     22        class TranslationUnit;
     23}
    2124
    2225namespace Concurrency {
    2326        void generateWaitFor( std::list< Declaration * > & translationUnit );
     27
     28void generateWaitFor( ast::TranslationUnit & translationUnit );
    2429};
    2530
  • src/Concurrency/module.mk

    r1df492a reb5962a  
    1919        Concurrency/Keywords.cc \
    2020        Concurrency/Keywords.h \
     21        Concurrency/WaitforNew.cpp \
    2122        Concurrency/Waitfor.cc \
    2223        Concurrency/Waitfor.h
  • src/GenPoly/Specialize.cc

    r1df492a reb5962a  
    247247                        structureArg( (*actualBegin)->get_type(), argBegin, argEnd, back_inserter( appExpr->get_args() ) );
    248248                }
     249                assertf( argBegin == argEnd, "Did not structure all arguments." );
    249250
    250251                appExpr->env = TypeSubstitution::newFromExpr( appExpr, env );
  • src/InitTweak/FixGlobalInit.cc

    r1df492a reb5962a  
    162162                        } // if
    163163                        if ( Statement * ctor = ctorInit->ctor ) {
    164                                 addDataSectonAttribute( objDecl );
     164                                addDataSectionAttribute( objDecl );
    165165                                initStatements.push_back( ctor );
    166166                                objDecl->init = nullptr;
  • src/InitTweak/FixInit.cc

    r1df492a reb5962a  
    806806                                                // The attribute works, and is meant to apply, both for leaving the static local alone,
    807807                                                // and for hoisting it out as a static global.
    808                                                 addDataSectonAttribute( objDecl );
     808                                                addDataSectionAttribute( objDecl );
    809809
    810810                                                // originally wanted to take advantage of gcc nested functions, but
  • src/InitTweak/InitTweak.cc

    r1df492a reb5962a  
    587587
    588588        bool isConstructable( const ast::Type * type ) {
    589                 return ! dynamic_cast< const ast::VarArgsType * >( type ) && ! dynamic_cast< const ast::ReferenceType * >( type ) 
     589                return ! dynamic_cast< const ast::VarArgsType * >( type ) && ! dynamic_cast< const ast::ReferenceType * >( type )
    590590                && ! dynamic_cast< const ast::FunctionType * >( type ) && ! Tuples::isTtype( type );
    591591        }
     
    10251025                if (!assign) {
    10261026                        auto td = new ast::TypeDecl({}, "T", {}, nullptr, ast::TypeDecl::Dtype, true);
    1027                         assign = new ast::FunctionDecl({}, "?=?", {}, 
     1027                        assign = new ast::FunctionDecl({}, "?=?", {},
    10281028                        { new ast::ObjectDecl({}, "_dst", new ast::ReferenceType(new ast::TypeInstType("T", td))),
    10291029                          new ast::ObjectDecl({}, "_src", new ast::TypeInstType("T", td))},
     
    10951095
    10961096                        // address of a variable or member expression is constexpr
    1097                         if ( ! dynamic_cast< const ast::NameExpr * >( arg ) 
    1098                         && ! dynamic_cast< const ast::VariableExpr * >( arg ) 
    1099                         && ! dynamic_cast< const ast::MemberExpr * >( arg ) 
     1097                        if ( ! dynamic_cast< const ast::NameExpr * >( arg )
     1098                        && ! dynamic_cast< const ast::VariableExpr * >( arg )
     1099                        && ! dynamic_cast< const ast::MemberExpr * >( arg )
    11001100                        && ! dynamic_cast< const ast::UntypedMemberExpr * >( arg ) ) result = false;
    11011101                }
     
    12411241        }
    12421242
    1243         void addDataSectonAttribute( ObjectDecl * objDecl ) {
     1243        #if defined( __x86_64 ) || defined( __i386 ) // assembler comment to prevent assembler warning message
     1244                #define ASM_COMMENT "#"
     1245        #else // defined( __ARM_ARCH )
     1246                #define ASM_COMMENT "//"
     1247        #endif
     1248        static const char * const data_section =  ".data" ASM_COMMENT;
     1249        static const char * const tlsd_section = ".tdata" ASM_COMMENT;
     1250        void addDataSectionAttribute( ObjectDecl * objDecl ) {
     1251                const bool is_tls = objDecl->get_storageClasses().is_threadlocal;
     1252                const char * section = is_tls ? tlsd_section : data_section;
    12441253                objDecl->attributes.push_back(new Attribute("section", {
    1245                         new ConstantExpr( Constant::from_string(".data"
    1246 #if defined( __x86_64 ) || defined( __i386 ) // assembler comment to prevent assembler warning message
    1247                                         "#"
    1248 #else // defined( __ARM_ARCH )
    1249                                         "//"
    1250 #endif
    1251                                 ))}));
     1254                        new ConstantExpr( Constant::from_string( section ) )
     1255                }));
    12521256        }
    12531257
    12541258        void addDataSectionAttribute( ast::ObjectDecl * objDecl ) {
     1259                const bool is_tls = objDecl->storage.is_threadlocal;
     1260                const char * section = is_tls ? tlsd_section : data_section;
    12551261                objDecl->attributes.push_back(new ast::Attribute("section", {
    1256                         ast::ConstantExpr::from_string(objDecl->location, ".data"
    1257 #if defined( __x86_64 ) || defined( __i386 ) // assembler comment to prevent assembler warning message
    1258                                         "#"
    1259 #else // defined( __ARM_ARCH )
    1260                                         "//"
    1261 #endif
    1262                                 )}));
     1262                        ast::ConstantExpr::from_string(objDecl->location, section)
     1263                }));
    12631264        }
    12641265
  • src/InitTweak/InitTweak.h

    r1df492a reb5962a  
    127127        ///    .section .data#,"a"
    128128        /// to avoid assembler warning "ignoring changed section attributes for .data"
    129         void addDataSectonAttribute( ObjectDecl * objDecl );
     129        void addDataSectionAttribute( ObjectDecl * objDecl );
    130130
    131131        void addDataSectionAttribute( ast::ObjectDecl * objDecl );
  • src/main.cc

    r1df492a reb5962a  
    1010// Created On       : Fri May 15 23:12:02 2015
    1111// Last Modified By : Andrew Beach
    12 // Last Modified On : Fri Apr 29  9:52:00 2022
    13 // Update Count     : 673
     12// Last Modified On : Tue Jun  7 13:29:00 2022
     13// Update Count     : 674
    1414//
    1515
     
    447447                        PASS( "Expand Unique Expr", Tuples::expandUniqueExpr( transUnit ) ); // xxx - is this the right place for this? want to expand ASAP so tha, sequent passes don't need to worry about double-visiting a unique expr - needs to go after InitTweak::fix so that copy constructed return declarations are reused
    448448
    449                         PASS( "Translate Tries" , ControlStruct::translateTries( transUnit ) );
     449                        PASS( "Translate Tries", ControlStruct::translateTries( transUnit ) );
     450                        PASS( "Gen Waitfor", Concurrency::generateWaitFor( transUnit ) );
    450451
    451452                        translationUnit = convert( move( transUnit ) );
     
    517518
    518519                        PASS( "Expand Unique Expr", Tuples::expandUniqueExpr( translationUnit ) ); // xxx - is this the right place for this? want to expand ASAP so tha, sequent passes don't need to worry about double-visiting a unique expr - needs to go after InitTweak::fix so that copy constructed return declarations are reused
    519 
    520                         PASS( "Translate Tries" , ControlStruct::translateTries( translationUnit ) );
     520                        PASS( "Translate Tries", ControlStruct::translateTries( translationUnit ) );
     521                        PASS( "Gen Waitfor", Concurrency::generateWaitFor( translationUnit ) );
    521522                }
    522 
    523                 PASS( "Gen Waitfor" , Concurrency::generateWaitFor( translationUnit ) );
    524523
    525524                PASS( "Convert Specializations",  GenPoly::convertSpecializations( translationUnit ) ); // needs to happen before tuple types are expanded
  • tests/unified_locking/mutex_test.hfa

    r1df492a reb5962a  
    2222}
    2323
    24 uint32_t cs() {
     24uint32_t cs(uint32_t & entries) {
    2525        thread$ * me = active_thread();
    2626        uint32_t value;
    2727        lock(mo.l);
    2828        {
     29                entries++;
    2930                uint32_t tsum = mo.sum;
    3031                uint32_t cnt = mo.cnt;
     
    4243thread LockCheck {
    4344        uint32_t sum;
     45        uint32_t entries;
    4446};
    4547
    4648void main(LockCheck & this) {
    4749        this.sum = 0;
     50        this.entries = 0;
    4851        for(num_times) {
    4952                trash();
    50                 this.sum += cs();
     53                this.sum += cs( this.entries );
    5154                trash();
    5255                yield(random(10));
     
    5861        mo.sum = -32;
    5962        mo.cnt = 0;
     63        uint32_t real_entries = 0;
    6064        processor p[2];
    6165        sout | "Starting";
     
    6367                LockCheck checkers[13];
    6468                for(i;13) {
    65                         sum += join(checkers[i]).sum;
     69                        LockCheck & curr = join(checkers[i]);
     70                        sum += curr.sum;
     71                        real_entries += curr.entries;
    6672                }
    6773        }
    6874        sout | "Done!";
    69         if(mo.cnt != (13 * num_times)) sout | "Invalid cs count!" | mo.cnt | "vs "| (13 * num_times) | "(13 *" | num_times | ')';
     75        if(real_entries != (13 * num_times)) sout | "Invalid real cs count!" | mo.cnt | "vs "| (13 * num_times) | "(13 *" | num_times | ')';
     76        if(mo.cnt != (13 * num_times)) sout | "Invalid concurrent cs count!" | mo.cnt | "vs "| (13 * num_times) | "(13 *" | num_times | ')';
    7077        if(sum == mo.sum) sout | "Match!";
    7178        else sout | "No Match!" | sum | "vs" | mo.sum;
  • tools/cfa.nanorc

    r1df492a reb5962a  
    1010color green "\<(forall|trait|(o|d|f|t)type|mutex|_Bool|volatile|virtual)\>"
    1111color green "\<(float|double|bool|char|int|short|long|enum|void|auto)\>"
    12 color green "\<(static|const|extern|(un)?signed|inline)\>" "\<(sizeof)\>"
     12color green "\<(static|const|extern|(un)?signed|inline|sizeof|vtable)\>"
    1313color green "\<((s?size)|one|zero|((u_?)?int(8|16|32|64|ptr)))_t\>"
    1414
    1515# Declarations
    1616color brightgreen "\<(struct|union|typedef|trait|coroutine|generator)\>"
    17 color brightgreen "\<(monitor|thread|with)\>"
     17color brightgreen "\<(monitor|thread|with|exception)\>"
    1818
    1919# Control Flow Structures
  • tools/jenkins/setup.sh.in

    r1df492a reb5962a  
    2929function getrunpath()
    3030{
    31         local elfout=$(readelf -d $1 | grep "RUNPATH")
     31        local elfout=$(readelf -d $1 | grep -E "RPATH|RUNPATH")
    3232        regex='\[/([[:alpha:][:digit:]@/_.-]+)\]'
    3333        if [[ $elfout =~ $regex ]]; then
     
    4343{
    4444        local deps=$(ldd $1)
     45        retlcldeps=()
    4546        retsysdeps=()
    46         retlcldeps=()
    4747        while IFS= read -r line; do
    48                 regex1='/([[:alpha:][:digit:]@/_.-]+)'
    49                 regex2='(libcfa[[:alpha:][:digit:].]+) => not found'
     48                regex1='(libcfa[[:alpha:][:digit:].]+)'
     49                regex2='/([[:alpha:][:digit:]@/_.-]+)'
    5050                regex3='linux-vdso.so.1|linux-gate.so.1'
    5151                if [[ $line =~ $regex1 ]]; then
     52                        retlcldeps+=(${BASH_REMATCH[1]})
     53                elif [[ $line =~ $regex2 ]]; then
    5254                        retsysdeps+=(${BASH_REMATCH[1]})
    53                 elif [[ $line =~ $regex2 ]]; then
    54                         retlcldeps+=(${BASH_REMATCH[1]})
    5555                elif [[ $line =~ $regex3 ]]; then
    5656                        # echo "ignoring '$line}': intrinsic"
Note: See TracChangeset for help on using the changeset viewer.