Changeset eb47a80


Ignore:
Timestamp:
Mar 30, 2023, 9:48:06 PM (2 months ago)
Author:
Peter A. Buhr <pabuhr@…>
Branches:
ADT, master
Children:
0b66ef9
Parents:
70056ed (diff), 6e83384 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

Files:
41 added
13 edited
1 moved

Legend:

Unmodified
Added
Removed
  • doc/theses/colby_parsons_MMAth/Makefile

    r70056ed reb47a80  
    4646        figures/nasus_Aggregate_Lock_4  \
    4747        figures/nasus_Aggregate_Lock_8  \
     48        figures/nasus_Channel_Contention \
     49        figures/pyke_Channel_Contention \
    4850}
    4951
  • doc/theses/colby_parsons_MMAth/benchmarks/channels/cfa/contend.cfa

    r70056ed reb47a80  
    88
    99// user defines this
    10 #define BIG 1
     10// #define BIG 1
    1111
    1212owner_lock o;
    1313
    14 unsigned long long total_operations = 0;
     14size_t total_operations = 0;
    1515
    1616struct bigObject {
     
    3636Channel * channels;
    3737
    38 volatile bool cons_done = false, prod_done = false;
     38bool cons_done = false, prod_done = false;
    3939volatile int cons_done_count = 0;
    4040size_t cons_check = 0, prod_check = 0;
     
    4848}
    4949void main(Consumer & this) {
    50     unsigned long long runs = 0;
     50    size_t runs = 0;
    5151    size_t my_check = 0;
    5252    for ( ;; ) {
     
    7878}
    7979void main(Producer & this) {
    80     unsigned long long runs = 0;
     80    size_t runs = 0;
    8181    size_t my_check = 0;
     82    size_t my_id = this.i;
    8283    for ( ;; ) {
    8384        if ( prod_done ) break;
    8485        #ifdef BIG
    8586        bigObject j{(size_t)runs};
    86         insert( channels[ this.i ], j );
     87        insert( channels[ my_id ], j );
    8788        my_check = my_check ^ (j.a + j.b + j.c + j.d + j.d + j.e + j.f + j.g + j.h);
    8889        #else
    89         insert( channels[ this.i ], (size_t)runs );
     90        insert( channels[ my_id ], (size_t)runs );
    9091        my_check = my_check ^ ((size_t)runs);
    9192        #endif
     
    99100}
    100101
    101 int test( size_t Processors, size_t Channels, size_t Producers, size_t Consumers, size_t ChannelSize ) {
    102     size_t Clusters = Processors;
     102static inline int test( size_t Processors, size_t Channels, size_t Producers, size_t Consumers, size_t ChannelSize ) {
     103    size_t Clusters = 1;
    103104    // create a cluster
    104105    cluster clus[Clusters];
    105106    processor * proc[Processors];
    106107    for ( i; Processors ) {
    107         (*(proc[i] = alloc())){clus[i % Clusters]};
     108        (*(proc[i] = malloc())){clus[i % Clusters]};
    108109    }
    109110
     
    120121    Producer * p[Producers * Channels];
    121122
    122     for ( i; Consumers * Channels ) {
    123         (*(c[i] = alloc())){ i % Channels, clus[i % Clusters] };
    124     }
    125 
    126     for ( i; Producers * Channels ) {
    127         (*(p[i] = alloc())){ i % Channels, clus[i % Clusters] };
     123    for ( j; Channels ) {
     124        for ( i; Producers ) {
     125            (*(p[i] = malloc())){ j, clus[j % Clusters] };
     126        }
     127
     128        for ( i; Consumers ) {
     129            (*(c[i] = malloc())){ j, clus[j % Clusters] };
     130        }
    128131    }
    129132
     
    148151            }
    149152        }
    150        
    151153    }
    152154
     
    185187
    186188int main( int argc, char * argv[] ) {
    187     size_t Processors = 10, Channels = 1, Producers = 10, Consumers = 10, ChannelSize = 128;
     189    size_t Processors = 1, Channels = 1, Producers = 1, Consumers = 1, ChannelSize = 128;
    188190    switch ( argc ) {
    189191          case 3:
     
    206208                exit( EXIT_FAILURE );
    207209        } // switch
    208     Producers = Processors;
    209     Consumers = Processors;
     210    Producers = Processors / 2;
     211    Consumers = Processors / 2;
    210212    test(Processors, Channels, Producers, Consumers, ChannelSize);
    211213}
  • doc/theses/colby_parsons_MMAth/benchmarks/channels/go/contend/contend.go

    r70056ed reb47a80  
    6464                case 3:
    6565                        if os.Args[2] != "d" {                                                  // default ?
    66                                 ChannelSize, _ = strconv.Atoi( os.Args[1] )
     66                                ChannelSize, _ = strconv.Atoi( os.Args[2] )
    6767                                        if ChannelSize < 0 { usage(); }
    6868                        } // if
     
    7878        } // switch
    7979        runtime.GOMAXPROCS( Processors );
    80         ProdsPerChan = Processors
    81         ConsPerChan = Processors
     80        ProdsPerChan = Processors /2
     81        ConsPerChan = Processors / 2
    8282
    8383        // fmt.Println("Processors: ",Processors," Channels: ",Channels," ProdsPerChan: ",ProdsPerChan," ConsPerChan: ",ConsPerChan," Channel Size: ",ChannelSize)
     
    108108        cons_done = true
    109109        for i := range chans {
    110                 J: for j := 0; j < ConsPerChan; j++ {
     110                L: for {
    111111                        select {
    112                                 case chans[i] <- 0:
    113                                        
     112                                case k := <-chans[i]:
     113                                        cons_check = cons_check ^ k
    114114                                default:
    115                                         break J
     115                                        break L
    116116                        }
    117117                }
    118118        }
     119        for i := range chans {
     120                close(chans[i])
     121        }
     122
    119123        for j := 0; j < ConsPerChan * Channels; j++{
    120124                <-consJoin
    121125        }
    122126       
    123         // for i := range chans {
    124         //      L: for {
    125         //              select {
    126         //                      case k := <-chans[i]:
    127         //                              cons_check = cons_check ^ k
    128         //                      default:
    129         //                              break L
    130         //              }
    131         //      }
    132         // }
     127       
    133128        if cons_check != prod_check {
    134129                fmt.Println("\nChecksum mismatch: Cons: %d, Prods: %d", cons_check, prod_check)
    135130        }
    136         for i := range chans {
    137                 close(chans[i])
    138         }
    139131    fmt.Println(total_operations)
    140132}
  • doc/theses/colby_parsons_MMAth/benchmarks/channels/plotData.py

    r70056ed reb47a80  
    3636    procs.append(int(val))
    3737
    38 # 3rd line has num locks args
    39 line = readfile.readline()
    40 locks = []
    41 for val in line.split():
    42     locks.append(int(val))
    43 
    44 # 4th line has number of variants
     38# 3rd line has number of variants
    4539line = readfile.readline()
    4640names = line.split()
     
    5044lines = (line for line in lines if line) # Non-blank lines
    5145
     46class Bench(Enum):
     47    Unset = 0
     48    Contend = 1
     49    Zero = 2
     50    Barrier = 3
     51    Churn = 4
     52    Daisy_Chain = 5
     53    Hot_Potato = 6
     54    Pub_Sub = 7
     55
    5256nameSet = False
    53 currLocks = -1 # default val
     57currBench = Bench.Unset # default val
    5458count = 0
    5559procCount = 0
    5660currVariant = 0
    57 name = "Aggregate Lock"
     61name = ""
    5862var_name = ""
    5963sendData = [0.0 for j in range(numVariants)]
     
    6468    # print(line)
    6569   
    66     if currLocks == -1:
    67         lineArr = line.split()
    68         currLocks = lineArr[-1]
     70    if currBench == Bench.Unset:
     71        if line == "contend:":
     72            name = "Contend"
     73            currBench = Bench.Contend
     74        elif line == "zero:":
     75            name = "Zero"
     76            currBench = Bench.Zero
     77        elif line == "barrier:":
     78            name = "Barrier"
     79            currBench = Bench.Barrier
     80        elif line == "churn:":
     81            name = "Churn"
     82            currBench = Bench.Churn
     83        elif line == "daisy_chain:":
     84            name = "Daisy_Chain"
     85            currBench = Bench.Daisy_Chain
     86        elif line == "hot_potato:":
     87            name = "Hot_Potato"
     88            currBench = Bench.Hot_Potato
     89        elif line == "pub_sub:":
     90            name = "Pub_Sub"
     91            currBench = Bench.Pub_Sub
     92        else:
     93            print("Expected benchmark name")
     94            print("Line: " + line)
     95            sys.exit()
    6996        continue
    7097
     
    95122            if currVariant == numVariants:
    96123                fig, ax = plt.subplots()
    97                 plt.title(name + " Benchmark: " + str(currLocks) + " Locks")
    98                 plt.ylabel("Throughput (entries)")
     124                plt.title(name + " Benchmark")
     125                plt.ylabel("Throughput (channel operations)")
    99126                plt.xlabel("Cores")
    100127                for idx, arr in enumerate(data):
    101128                    plt.errorbar( procs, arr, [bars[idx][0], bars[idx][1]], capsize=2, marker='o' )
     129               
    102130                plt.yscale("log")
     131                # plt.ylim(1, None)
     132                # ax.get_yaxis().set_major_formatter(ticks.ScalarFormatter())
     133                # else:
     134                #     plt.ylim(0, None)
    103135                plt.xticks(procs)
    104136                ax.legend(names)
    105                 # fig.savefig("plots/" + machineName + "Aggregate_Lock_" + str(currLocks) + ".png")
    106                 plt.savefig("plots/" + machineName + "Aggregate_Lock_" + str(currLocks) + ".pgf")
     137                # fig.savefig("plots/" + machineName + name + ".png")
     138                plt.savefig("plots/" + machineName + name + ".pgf")
    107139                fig.clf()
    108140
    109141                # reset
    110                 currLocks = -1
     142                currBench = Bench.Unset
    111143                currVariant = 0
  • doc/theses/colby_parsons_MMAth/benchmarks/channels/run

    r70056ed reb47a80  
    8585}
    8686
    87 numtimes=11
    88 
    89 # locks=('-DLOCKS=L1' '-DLOCKS=L2' '-DLOCKS=L3' '-DLOCKS=L4' '-DLOCKS=L5' '-DLOCKS=L6' '-DLOCKS=L7' '-DLOCKS=L8')
    90 # locks='1 2 3 4 5 6 7 8'
    91 lock_flags=('-DLOCKS=L2' '-DLOCKS=L4' '-DLOCKS=L8')
    92 locks=('2' '4' '8')
     87numtimes=5
    9388
    9489num_threads='2 4 8 16 24 32'
    95 # num_threads='2 4 8'
     90# num_threads='2'
    9691
    9792# toggle benchmarks
    98 order=${true}
    99 rand=${true}
    100 baseline=${true}
     93zero=${false}
     94contend=${true}
     95barrier=${false}
     96churn=${false}
     97daisy_chain=${false}
     98hot_potato=${false}
     99pub_sub=${false}
    101100
    102101runCFA=${true}
    103 runCPP=${true}
     102runGO=${true}
    104103# runCFA=${false}
    105 # runCPP=${false}
     104# runGO=${false}
    106105
    107106cfa=~/cfa-cc/driver/cfa
    108 cpp=g++
    109107
    110108# Helpers to minimize code duplication
     
    152150echo $num_threads
    153151
    154 for i in ${!locks[@]}; do
    155         echo -n ${locks[$i]}' '
    156 done
    157 echo ""
    158 
    159 if [ ${runCFA} -eq ${true} ] && [ ${order} -eq ${true} ]; then
    160     echo -n 'CFA-order '
    161 fi
    162 if [ ${runCPP} -eq ${true} ] && [ ${order} -eq ${true} ]; then
    163     echo -n 'CPP-order '
    164 fi
    165 if [ ${runCFA} -eq ${true} ] && [ ${baseline} -eq ${true} ]; then
    166     echo -n 'CFA-baseline '
    167 fi
    168 if [ ${runCPP} -eq ${true} ] && [ ${baseline} -eq ${true} ]; then
    169     echo -n 'CPP-baseline '
    170 fi
    171 if [ ${runCFA} -eq ${true} ] && [ ${rand} -eq ${true} ]; then
    172     echo -n 'CFA-rand '
    173 fi
    174 if [ ${runCPP} -eq ${true} ] && [ ${rand} -eq ${true} ]; then
    175     echo -n 'CPP-rand '
     152if [ ${runCFA} -eq ${true} ]; then
     153    echo -n 'CFA '
     154fi
     155if [ ${runGO} -eq ${true} ]; then
     156    echo -n 'Go '
    176157fi
    177158echo ""
     
    182163cfa_flags='-quiet -O3 -nodebug -DNDEBUG'
    183164
    184 # cpp flagse
    185 cpp_flags='-O3 -std=c++17 -lpthread -pthread -DNDEBUG'
    186 
    187165# run the benchmarks
    188166
    189 run_order() {
     167run_contend() {
    190168    post_args=${1}
    191169
    192170    if [ ${runCFA} -eq ${true} ] ; then
    193171        cd cfa # CFA RUN
    194         print_header 'CFA-'${3}
    195         ${cfa} ${cfa_flags} ${2} ${3}.cfa -o a.${hostname} > /dev/null 2>&1
     172        print_header 'CFA'
     173        ${cfa} ${cfa_flags} ${2}.cfa -o a.${hostname} > /dev/null 2>&1
    196174        run_bench
    197175        rm a.${hostname}
     
    199177    fi # done CFA
    200178
    201     if [ ${runCPP} -eq ${true} ] ; then
    202         cd cpp # CPP RUN
    203         print_header 'CPP-'${3}
    204         ${cpp} ${cpp_flags} ${2} ${3}.cc -o a.${hostname} > /dev/null 2>&1
     179    if [ ${runGO} -eq ${true} ] ; then
     180        cd go/${2} # Go RUN
     181        print_header 'Go'
     182        go build -o a.${hostname} > /dev/null 2>&1
    205183        run_bench
    206184        rm a.${hostname}
    207185        cd - > /dev/null
    208     fi # done CPP
     186    fi # done Go
    209187}
    210188
    211189# /usr/bin/time -f "%Uu %Ss %Er %Mkb"
    212 
    213 for i in ${!locks[@]}; do
    214     echo "locks: "${locks[$i]}
    215     if [ ${order} -eq ${true} ] ; then
    216         run_order ${locks[$i]} ${lock_flags[$i]} 'order'
    217     fi
    218     if [ ${baseline} -eq ${true} ] ; then
    219         run_order ${locks[$i]} ${lock_flags[$i]} 'baseline'
    220     fi
    221     if [ ${rand} -eq ${true} ] ; then
    222         run_order ${locks[$i]} '-DLOCKS=L8' 'rand'
    223     fi
    224 done
    225 
    226 
     190if [ ${contend} -eq ${true} ] ; then
     191    echo "contend: "
     192    run_contend '128' 'contend'
     193fi
     194
     195if [ ${zero} -eq ${true} ] ; then
     196    echo "zero: "
     197    run_contend '0' 'contend'
     198fi
     199
     200if [ ${barrier} -eq ${true} ] ; then
     201    echo "barrier: "
     202    run_contend '' 'barrier'
     203fi
     204
     205if [ ${churn} -eq ${true} ] ; then
     206    echo "churn: "
     207    run_contend '' 'churn'
     208fi
     209
     210if [ ${daisy_chain} -eq ${true} ] ; then
     211    echo "daisy_chain: "
     212    run_contend '' 'daisy_chain'
     213fi
     214
     215if [ ${hot_potato} -eq ${true} ] ; then
     216    echo "hot_potato: "
     217    run_contend '' 'hot_potato'
     218fi
     219
     220if [ ${pub_sub} -eq ${true} ] ; then
     221    echo "pub_sub: "
     222    run_contend '' 'pub_sub'
     223fi
  • doc/theses/colby_parsons_MMAth/glossary.tex

    r70056ed reb47a80  
    99% \textit{Synonyms : User threads, Lightweight threads, Green threads, Virtual threads, Tasks.}
    1010% }
    11 
     11% C_TODO: replace usages of these acronyms with \acrshort{name}
    1212\newacronym{tls}{TLS}{Thread Local Storage}
    1313\newacronym{api}{API}{Application Program Interface}
     
    1616\newacronym{rtti}{RTTI}{Run-Time Type Information}
    1717\newacronym{fcfs}{FCFS}{First Come First Served}
     18\newacronym{toctou}{TOCTOU}{time-of-check to time-of-use}
  • doc/theses/colby_parsons_MMAth/local.bib

    r70056ed reb47a80  
    4747  publisher={ACM New York, NY, USA}
    4848}
     49
     50@mastersthesis{Beach21,
     51author={{Beach, Andrew James}},
     52title={Exception Handling in C∀},
     53year={2021},
     54publisher="UWSpace",
     55url={http://hdl.handle.net/10012/17617}
     56}
  • doc/theses/colby_parsons_MMAth/text/actors.tex

    r70056ed reb47a80  
    334334
    335335\section{Safety and Productivity}
    336 \CFA's actor system comes with a suite of safety and productivity features. Most of these features are present in \CFA's debug mode, but are removed when code is compiled in nodebug mode. Some of the features include:
      336\CFA's actor system comes with a suite of safety and productivity features. Most of these features are present in \CFA's debug mode, but are removed when code is compiled in nodebug mode. The suite of features includes the following.
    337337
    338338\begin{itemize}
  • doc/theses/colby_parsons_MMAth/text/channels.tex

    r70056ed reb47a80  
    55% ======================================================================
    66
    7 Channels were first introduced by Hoare in his paper Communicating Sequentual Processes~\cite{Hoare78}, where he proposes a concurrent language that communicates across processes using input/output channels to send data inbetween processes. Channels are used to perform message passing concurrency, a model of concurrency where threads communicate by sending data to each other, and synchronizing via the passing mechanism. This is an alternative to shared memory concurrency, where threads can communicate directly by changing shared memory state. Most modern concurrent programming languages do not subscribe to just one style of communication between threads, and provide features that support both. Channels as a programming language feature has been popularized in recent years due to the language Go, which encourages the use of channels as its fundamental concurrent feature.
      7Channels were first introduced by Hoare in his paper Communicating Sequential Processes~\cite{Hoare78}, where he proposes a concurrent language that communicates across processes using input/output channels to send data. Channels are a concurrent language feature used to perform message passing concurrency, a model of concurrency where threads communicate by sending data as messages, and synchronizing via the message passing mechanism. This is an alternative to shared memory concurrency, where threads can communicate directly by changing shared memory state. Most modern concurrent programming languages do not subscribe to just one style of communication between threads, and provide features that support both. Channels as a programming language feature have been popularized in recent years due to the language Go, which encourages the use of channels as its fundamental concurrent feature.
    88
    99\section{Producer-Consumer Problem}
     
    1414
    1515\section{Channel Implementation}
    16 % C_TODO: rewrite to reflect on current impl
      16The channel implementation in \CFA is a near carbon copy of the Go implementation. Experimentation was conducted that varied the producer-consumer problem algorithm and lock type used inside the channel. With the exception of non-FCFS algorithms, no algorithm or lock usage in the channel implementation was found to be consistently more performant than Go's choice of algorithm and lock implementation. As such the research contributions added by \CFA's channel implementation lie in the realm of safety and productivity features.
     17
     18\section{Safety and Productivity}
     19Channels in \CFA come with safety and productivity features to aid users. The features include the following.
     20
     21\begin{itemize}
      22\item Toggle-able statistic collection on channel behaviour that counts channel operations, and the number of operations that block. Tracking blocking operations helps users tune their channel size or channel usage when the channel is used for buffering, where the aim is to have as few blocking operations as possible.
     23\item Deadlock detection on deallocation of the channel. If any threads are blocked inside the channel when it terminates it is detected and informs the user, as this would cause a deadlock.
      24\item A \code{flush} routine that delivers copies of an element to all waiting consumers, flushing the buffer. Programmers can use this to easily broadcast data to multiple consumers. Additionally, the \code{flush} routine is more performant than looping around the \code{insert} operation since it can deliver the elements without having to reacquire mutual exclusion for each element sent.
     25\end{itemize}
     26
      27The other safety and productivity feature of \CFA channels deals with concurrent termination. Terminating concurrent programs is often one of the most difficult parts of writing concurrent code, particularly if graceful termination is needed. The difficulty of graceful termination often arises from the usage of synchronization primitives which need to be handled carefully during shutdown. It is easy to deadlock during termination if threads are left behind on synchronization primitives. Additionally, most synchronization primitives are prone to time-of-check to time-of-use (TOCTOU) issues where there is a race between one thread checking the state of a concurrent object and another thread changing the state. TOCTOU issues with synchronization primitives often involve a race between one thread checking the primitive for blocked threads and another thread blocking on it. Channels are a particularly hard synchronization primitive to terminate since both sending and receiving off a channel can block. Thus, improperly handled TOCTOU issues with channels often result in deadlocks as threads trying to perform the termination may end up unexpectedly blocking in their attempt to help other threads exit the system.
     28
     29% C_TODO: add reference to select chapter, add citation to go channels info
     30Go channels provide a set of tools to help with concurrent shutdown. Channels in Go have a \code{close} operation and a \code{select} statement that both can be used to help threads terminate. The \code{select} statement will be discussed in \ref{}, where \CFA's \code{waituntil} statement will be compared with the Go \code{select} statement. The \code{close} operation on a channel in Go changes the state of the channel. When a channel is closed, sends to the channel will panic and additional calls to \code{close} will panic. Receives are handled differently where receivers will never block on a closed channel and will continue to remove elements from the channel. Once a channel is empty, receivers can continue to remove elements, but will receive the zero-value version of the element type. To aid in avoiding unwanted zero-value elements, Go provides the ability to iterate over a closed channel to remove the remaining elements. These design choices for Go channels enforce a specific interaction style with channels during termination, where careful thought is needed to ensure that additional \code{close} calls don't occur and that no sends occur after channels are closed. These design choices fit Go's paradigm of error management, where users are expected to explicitly check for errors, rather than letting errors occur and catching them. If errors need to occur in Go, return codes are used to pass error information where they are needed. Note that panics in Go can be caught, but it is not considered an idiomatic way to write Go programs.
     31
     32While Go's channel closing semantics are powerful enough to perform any concurrent termination needed by a program, their lack of ease of use leaves much to be desired. Since both closing and sending panic, once a channel is closed, a user often has to synchronize the senders to a channel before the channel can be closed to avoid panics. However, in doing so it renders the \code{close} operation nearly useless, as the only utilities it provides are the ability to ensure that receivers no longer block on the channel, and will receive zero-valued elements. This can be useful if the zero-typed element is recognized as a sentinel value, but if another sentinel value is preferred, then \code{close} only provides its non-blocking feature. To avoid TOCTOU issues during shutdown, a busy wait with a \code{select} statement is often used to add or remove elements from a channel. Due to Go's asymmetric approach to channel shutdown, separate synchronization between producers and consumers of a channel has to occur during shutdown.
     33
     34In \CFA, exception handling is an encouraged paradigm and has full language support \cite{}.
     35% \cite{Beach21}. TODO: this citation breaks when compiled. Need to fix and insert above
      36As such \CFA uses an exception-based approach to channel shutdown that is symmetric for both producers and consumers, and supports graceful shutdown. Exceptions in \CFA support both termination and resumption. Termination exceptions operate in the same way as exceptions seen in many popular programming languages such as \CC, Python and Java.
     37Resumption exceptions are a style of exception that when caught run the corresponding catch block in the same way that termination exceptions do.
     38The difference between the exception handling mechanisms arises after the exception is handled. In termination handling, the control flow continues into the code following the catch after the exception is handled. In resumption handling, the control flow returns to the site of the \code{throw}, allowing the control to continue where it left off. Note that in resumption, since control can return to the point of error propagation, the stack is not unwound during resumption propagation. In \CFA if a resumption is not handled, it is reraised as a termination. This mechanism can be used to create a flexible and robust termination system for channels.
     39
      40When a channel in \CFA is closed, all subsequent calls to the channel will throw a resumption exception at the caller. If the resumption is handled, then the caller will proceed to attempt to complete their operation. If the resumption is not handled it is then rethrown as a termination exception. Or, if the resumption is handled, but the subsequent attempt at an operation would block, a termination exception is thrown. These termination exceptions allow for non-local transfer that can be used to great effect to eagerly and gracefully shut down a thread. When a channel is closed, if there are any blocked producers or consumers inside the channel, they are woken up and also have a resumption thrown at them. The resumption exception, \code{channel_closed}, has a couple of fields to aid in handling the exception. The exception contains a pointer to the channel it was thrown from, and a pointer to an element. In exceptions thrown from remove, the element pointer will be null. In the case of insert, the element pointer points to the element that the thread attempted to insert. This element pointer allows the handler to know which operation failed and also allows the element to not be lost on a failed insert since it can be moved elsewhere in the handler. Furthermore, due to \CFA's powerful exception system, this data can be used to choose handlers based on which channel and operation failed. Exception handlers in \CFA have an optional predicate after the exception type which can be used to optionally trigger or skip handlers based on the content of an exception. It is worth mentioning that the approach of exceptions for termination may incur a larger performance cost during termination than the approach used in Go. This should not be an issue, since termination is rarely a fast path of an application and ensuring that termination can be implemented correctly with ease is the aim of the exception approach.
     41
      42To highlight the differences between \CFA's and Go's close semantics, an example program is presented. The program is a barrier implemented using two channels shown in Listings~\ref{l:cfa_chan_bar} and \ref{l:go_chan_bar}. Both of these examples are implemented using \CFA syntax so that they can be easily compared. Listing~\ref{l:go_chan_bar} uses Go-style channel close semantics and Listing~\ref{l:cfa_chan_bar} uses \CFA close semantics. In this problem it is infeasible to use the Go \code{close} call since all tasks are both potentially producers and consumers, causing panics on close to be unavoidable. As such, in Listing~\ref{l:go_chan_bar}, to implement a flush routine for the buffer, a sentinel value of $-1$ has to be used to indicate to threads that they need to leave the barrier. This sentinel value has to be checked at two points. Furthermore, an additional flag \code{done} is needed to communicate to threads once they have left the barrier that they are done. This use of an additional flag or communication method is common in Go channel shutdown code, since to avoid panics on a channel, the shutdown of a channel often has to be communicated with threads before it occurs. In the \CFA version~\ref{l:cfa_chan_bar}, the barrier shutdown results in an exception being thrown at threads operating on it, which informs the threads that they must terminate. This avoids the need to use a separate communication method other than the barrier, and avoids extra conditional checks on the fast path of the barrier implementation. Also note that in the Go version~\ref{l:go_chan_bar}, the size of the barrier channels has to be larger than in the \CFA version to ensure that the main thread does not block when attempting to clear the barrier.
     43
     44\begin{cfacode}[tabsize=3,caption={\CFA channel barrier termination},label={l:cfa_chan_bar}]
     45struct barrier {
     46    channel( int ) barWait;
     47    channel( int ) entryWait;
     48    int size;
     49}
     50void ?{}(barrier & this, int size) with(this) {
     51    barWait{size};
     52    entryWait{size};
     53    this.size = size;
     54    for ( j; size )
     55        insert( *entryWait, j );
     56}
     57
     58void flush(barrier & this) with(this) {
     59    close(barWait);
     60    close(entryWait);
     61}
     62void wait(barrier & this) with(this) {
     63    int ticket = remove( *entryWait );
     64    if ( ticket == size - 1 ) {
     65        for ( j; size - 1 )
     66            insert( *barWait, j );
     67        return;
     68    }
     69    ticket = remove( *barWait );
     70
     71    // last one out
     72    if ( size == 1 || ticket == size - 2 ) {
     73        for ( j; size )
     74            insert( *entryWait, j );
     75    }
     76}
     77barrier b{Tasks};
     78
     79// thread main
     80void main(Task & this) {
     81    try {
     82        for ( ;; ) {
     83            wait( b );
     84        }
     85    } catch ( channel_closed * e ) {}
     86}
     87
     88int main() {
     89    {
     90        Task t[Tasks];
     91
     92        sleep(10`s);
     93        flush( b );
     94    } // wait for tasks to terminate
     95    return 0;
     96}
     97\end{cfacode}
     98
     99\begin{cfacode}[tabsize=3,caption={Go channel barrier termination},label={l:go_chan_bar}]
     100
     101struct barrier {
     102    channel( int ) barWait;
     103    channel( int ) entryWait;
     104    int size;
     105}
     106void ?{}(barrier & this, int size) with(this) {
     107    barWait{size + 1};
     108    entryWait{size + 1};
     109    this.size = size;
     110    for ( j; size )
     111        insert( *entryWait, j );
     112}
     113
     114void flush(barrier & this) with(this) {
     115    insert( *entryWait, -1 );
     116    insert( *barWait, -1 );
     117}
     118void wait(barrier & this) with(this) {
     119    int ticket = remove( *entryWait );
     120    if ( ticket == -1 ) {
     121        insert( *entryWait, -1 );
     122        return;
     123    }
     124    if ( ticket == size - 1 ) {
     125        for ( j; size - 1 )
     126            insert( *barWait, j );
     127        return;
     128    }
     129    ticket = remove( *barWait );
     130    if ( ticket == -1 ) {
     131        insert( *barWait, -1 );
     132        return;
     133    }
     134
     135    // last one out
     136    if ( size == 1 || ticket == size - 2 ) {
     137        for ( j; size )
     138            insert( *entryWait, j );
     139    }
     140}
     141barrier b;
     142
     143bool done = false;
     144// thread main
     145void main(Task & this) {
     146    for ( ;; ) {
     147        if ( done ) break;
     148        wait( b );
     149    }
     150}
     151
     152int main() {
     153    {
     154        Task t[Tasks];
     155
     156        sleep(10`s);
     157        done = true;
     158
     159        flush( b );
     160    } // wait for tasks to terminate
     161    return 0;
     162}
     163\end{cfacode}
     164
      165Listing~\ref{l:cfa_resume} presents an example of channel closing with resumption. This program uses resumption in the \code{Consumer} thread main to ensure that all elements in the channel are removed before the consumer thread terminates. The producer only has a \code{catch}, so the moment it receives an exception it terminates, whereas the consumer continues to remove from the closed channel via handling resumptions until the buffer is empty, which then throws a termination exception. If the same program were implemented in Go, it would require explicit synchronization with both producers and consumers by some mechanism outside the channel to ensure that all elements were removed before task termination.
     166
     167\begin{cfacode}[tabsize=3,caption={\CFA channel resumption usage},label={l:cfa_resume}]
     168channel( int ) chan{ 128 };
     169
     170// Consumer thread main
     171void main(Consumer & this) {
     172    size_t runs = 0;
     173    try {
     174        for ( ;; ) {
     175            remove( chan );
     176        }
     177    } catchResume ( channel_closed * e ) {}
     178    catch ( channel_closed * e ) {}
     179}
     180
     181// Producer thread main
     182void main(Producer & this) {
     183    int j = 0;
     184    try {
     185        for ( ;;j++ ) {
     186            insert( chan, j );
     187        }
     188    } catch ( channel_closed * e ) {}
     189}
     190
     191int main( int argc, char * argv[] ) {
     192    {
      193        Consumer c[4];
     194        Producer p[4];
     195
     196        sleep(10`s);
     197
      198        close( chan );
     200    }
     201    return 0;
     202}
     203\end{cfacode}
     204
     205\section{Performance}
     206
      207Given that the base implementation of the \CFA channels is very similar to the Go implementation, this section aims to show that the performance of the two implementations is comparable. One microbenchmark is conducted to compare Go and \CFA. The benchmark is a ten-second experiment where producers and consumers operate on a channel in parallel and throughput is measured. The number of cores is varied to measure how throughput scales. The cores are divided equally between producers and consumers, with one producer or consumer owning each core. The results of the benchmark are shown in Figure~\ref{f:chanPerf}. The performance of Go and \CFA channels on this microbenchmark is comparable. Note, it is expected for the performance to decline as the number of cores increases, as the channel operations all occur in a critical section, so an increase in cores results in higher contention with no increase in parallelism.
     208
    17209
    18210\begin{figure}
    19 \begin{lrbox}{\myboxA}
    20 \begin{cfacode}[aboveskip=0pt,belowskip=0pt,basicstyle=\footnotesize]
    21 int size;
    22 int front, back, count;
    23 TYPE * buffer;
    24 cond_var prods, cons;
    25 lock mx;
    26 
    27 void insert( TYPE elem ){
    28 
    29     lock(mx);
    30 
    31     // wait if buffer is full
    32     // insert finished by consumer
    33     if (count == size){
    34         wait(prods, mx, &elem);
    35         // no reacquire
    36         return;
    37     }
    38 
    39 
    40 
    41 
    42     if (!empty(cons)){
    43         // do consumer work
    44         front(cons) = &elem;
    45         notify_one(cons);
    46     }
    47 
    48 
    49    
    50     else
    51         insert_(chan, elem);
    52 
    53 
    54     unlock(mx);
    55 }
    56 \end{cfacode}
    57 \end{lrbox}
    58 \begin{lrbox}{\myboxB}
    59 \begin{cfacode}[aboveskip=0pt,belowskip=0pt,basicstyle=\footnotesize]
    60 int size;
    61 int front, back, count;
    62 TYPE * buffer;
    63 thread * chair;
    64 TYPE * chair_elem;
    65 lock c_lock, p_lock, mx;
    66 void insert( TYPE elem ){
    67     lock(p_lock);
    68     lock(mx);
    69 
    70     // wait if buffer is full
    71     // insert finished by consumer
    72     if (count == size){
    73         chair = this_thread();
    74         chair_elem = &elem;
    75         unlock(mx);
    76         park();
    77         unlock(p_lock);
    78         return;
    79     }
    80 
    81     if (chair != 0p){
    82         // do consumer work
    83         chair_elem = &elem;
    84         unpark(chair);
    85         chair = 0p;
    86         unlock(mx);
    87         unlock(p_lock);
    88         return;
    89     } else
    90         insert_(chan, elem);
    91 
    92     unlock(mx);
    93     unlock(p_lock);
    94 }
    95 \end{cfacode}
    96 \end{lrbox}
    97 \subfloat[Go implementation]{\label{f:GoChanImpl}\usebox\myboxA}
    98 \hspace{5pt}
    99 \vrule
    100 \hspace{5pt}
    101 \subfloat[\CFA implementation]{\label{f:cfaChanImpl}\usebox\myboxB}
    102 \caption{Comparison of channel implementations}
    103 \label{f:ChanComp}
     211    \centering
     212    \begin{subfigure}{0.5\textwidth}
     213        \centering
     214        \scalebox{0.5}{\input{figures/nasus_Channel_Contention.pgf}}
     215        \subcaption{AMD \CFA Channel Benchmark}\label{f:chanAMD}
     216    \end{subfigure}\hfill
     217    \begin{subfigure}{0.5\textwidth}
     218        \centering
     219        \scalebox{0.5}{\input{figures/pyke_Channel_Contention.pgf}}
     220        \subcaption{Intel \CFA Channel Benchmark}\label{f:chanIntel}
     221    \end{subfigure}
     222    \caption{The channel contention benchmark comparing \CFA and Go channel throughput (higher is better).}
     223    \label{f:chanPerf}
    104224\end{figure}
    105 
    106 Go and \CFA have similar channel implementation when it comes to how they solve the producer-consumer problem. Both implementations attempt to minimize double blocking by requiring cooperation from signalling threads. If a consumer or producer is blocked, whichever thread signals it to proceed completes the blocked thread's operation for them so that the blocked thread does not need to acquire any locks. Channels in \CFA go a step further in preventing double blocking. In Figure~\ref{f:ChanComp}, the producer-consumer solutions used by Go and \CFA are presented. Some liberties are taken to simplify the code, such as removing special casing for zero-size buffers, and abstracting the non-concurrent insert into a helper, \code{insert_}. Only the insert routine is presented, as the remove routine is symmetric.
    107 In the Go implementation \ref{f:GoChanImpl}, first mutual exclusion is acquired. Then if the buffer is full the producer waits on a condition variable and releases the mx lock. Note it will not reacquire the lock upon waking. The 3rd argument to \code{wait} is a pointer that is stored per thread on the condition variable. This pointer can be accessed when the waiting thread is at the from of the condition variable's queue by calling \code{front}. This allows arbitrary data to be stored with waiting tasks in the queue, eliminating the need for a second queue just for data. This producer that waits stores a pointer to the element it wanted to insert, so that the consumer that signals them can insert the element for the producer before signalling. If the buffer is not full a producer will proceed to check if any consumers are waiting. If so then the producer puts its value directly into the consumers hands, bypassing the usage of the buffer. If there are no waiting consumers, the producer inserts the value into the buffer and leaves.
    108 The \CFA implementation \ref{f:cfaChanImpl} it follows a similar pattern to the Go implementation, but instead uses three locks and no condition variables. The main idea is that the \CFA implementation forgoes the use of a condition variable by making all producers wait on the outer lock \code{p_lock} once a single producer has to wait inside the critical section. This also happens with consumers. This further reduces double blocking by ensuring that the only threads that can enter the critical section after a producer is blocked are consumers and vice versa. Additionally, entering consumers to not have to contend for the \code{mx} lock once producers are waiting and vice versa. Since only at most one thread will be waiting in the critical section, condition variables are not needed and a barebones thread \code{park} and \code{unpark} will suffice. The operation \code{park} blocks a thread and \code{unpark} is passed a pointer to a thread to wake up. This algorithm can be written using a single condition variable instead of park/unpark, but using park/unpark eliminates the need for any queueing operations. Now with the understanding of park/unpark it is clear to see the similarity between the two algorithms. The main difference being the \code{p_lock} acquisitions and releases. Note that \code{p_lock} is held until after waking from \code{park}, which provides the guarantee than no other producers will enter until the first producer to enter makes progress.
    109 
    110 \section{Safety and Productivity}
    111 
    112 
    113 \section{Performance}
  • doc/theses/colby_parsons_MMAth/text/mutex_stmt.tex

    r70056ed reb47a80  
    1818
    1919\section{Other Languages}
    20 There are similar concepts to the mutex statement that exist in other languages. Java has a feature called a synchronized statement, which looks identical to \CFA's mutex statement, but it has some differences. The synchronized statement only accepts one item in its clause. Any object can be passed to the synchronized statement in Java since all objects in Java are monitors, and the synchronized statement acquires that object's monitor. In \CC there is a feature in the \code{<mutex>} header called scoped\_lock, which is also similar to the mutex statement. The scoped\_lock is a class that takes in any number of locks in its constructor, and acquires them in a deadlock-free manner. It then releases them when the scoped\_lock object is deallocated, thus using RAII. An example of \CC scoped\_lock usage is shown in Listing~\ref{l:cc_scoped_lock}.
     20There are similar concepts to the mutex statement that exist in other languages. Java has a feature called a synchronized statement, which looks identical to \CFA's mutex statement, but it has some differences. The synchronized statement only accepts a single object in its clause. Any object can be passed to the synchronized statement in Java since all objects in Java are monitors, and the synchronized statement acquires that object's monitor. In \CC there is a feature in the standard library \code{<mutex>} header called scoped\_lock, which is also similar to the mutex statement. The scoped\_lock is a class that takes in any number of locks in its constructor, and acquires them in a deadlock-free manner. It then releases them when the scoped\_lock object is deallocated, thus using RAII. An example of \CC scoped\_lock usage is shown in Listing~\ref{l:cc_scoped_lock}.
    2121
    2222\begin{cppcode}[tabsize=3,caption={\CC scoped\_lock usage},label={l:cc_scoped_lock}]
     
    2929
    3030\section{\CFA implementation}
    31 The \CFA mutex statement can be seen as a combination of the similar featurs in Java and \CC. It can acquire more that one lock in a deadlock-free manner, and releases them via RAII like \CC, however the syntax is identical to the Java synchronized statement. This syntactic choice was made so that the body of the mutex statement is its own scope. Compared to the scoped\_lock, which relies on its enclosing scope, the mutex statement's introduced scope can provide visual clarity as to what code is being protected by the mutex statement, and where the mutual exclusion ends. \CFA's mutex statement and \CC's scoped\_lock both use parametric polymorphism to allow user defined types to work with the feature. \CFA's implementation requires types to support the routines \code{lock()} and \code{unlock()}, whereas \CC requires those routines, plus \code{try_lock()}. The scoped\_lock requires an additional routine since it differs from the mutex statement in how it implements deadlock avoidance.
      31The \CFA mutex statement takes some ideas from both the Java and \CC features. The mutex statement can acquire more than one lock in a deadlock-free manner, and releases them via RAII like \CC, however the syntax is identical to the Java synchronized statement. This syntactic choice was made so that the body of the mutex statement is its own scope. Compared to the scoped\_lock, which relies on its enclosing scope, the mutex statement's introduced scope can provide visual clarity as to what code is being protected by the mutex statement, and where the mutual exclusion ends. \CFA's mutex statement and \CC's scoped\_lock both use parametric polymorphism to allow user defined types to work with the feature. \CFA's implementation requires types to support the routines \code{lock()} and \code{unlock()}, whereas \CC requires those routines, plus \code{try_lock()}. The scoped\_lock requires an additional routine since it differs from the mutex statement in how it implements deadlock avoidance.
    3232
    33 The parametric polymorphism allows for locking to be defined for types that may want convenient mutual exclusion. An example is \CFA's \code{sout}. \code{sout} is \CFA's output stream, similar to \CC's \code{cout}. \code{sout} has routines that match the mutex statement trait, so the mutex statement can be used to lock the output stream while producing output. In this case, the mutex statement allows the programmer to acquire mutual exclusion over an object without having to know the internals of the object or what locks it needs to acquire. The ability to do so provides both improves safety and programmer productivity since it abstracts away the concurrent details and provides an interface for optional thread-safety. This is a commonly used feature when producing output from a concurrent context, since producing output is not thread safe by default. This use case is shown in Listing~\ref{l:sout}.
      33The parametric polymorphism allows for locking to be defined for types that may want convenient mutual exclusion. An example of one such use case in \CFA is \code{sout}. The output stream in \CFA is called \code{sout}, and functions similarly to \CC's \code{cout}. \code{sout} has routines that satisfy the mutex statement trait, so the mutex statement can be used to lock the output stream while producing output. In this case, the mutex statement allows the programmer to acquire mutual exclusion over an object without having to know the internals of the object or what locks need to be acquired. The ability to do so improves both safety and programmer productivity, since it abstracts away the concurrent details and provides an interface for optional thread-safety. This is a commonly used feature when producing output from a concurrent context, since producing output is not thread safe by default. This use case is shown in Listing~\ref{l:sout}.
    3434
    3535\begin{cfacode}[tabsize=3,caption={\CFA sout with mutex statement},label={l:sout}]
    36     mutex( sout )
    37         sout | "This output is protected by mutual exclusion!";
     36mutex( sout )
     37    sout | "This output is protected by mutual exclusion!";
    3838\end{cfacode}
    3939
    4040\section{Deadlock Avoidance}
    41 The mutex statement uses the deadlock prevention technique of lock ordering, where the circular-wait condition of a deadlock cannot occur if all locks are acquired in the same order. The scoped\_lock uses a deadlock avoidance algorithm where all locks after the first are acquired using \code{try_lock} and if any of the attempts to lock fails, all locks so far are released. This repeats until all locks are acquired successfully. The deadlock avoidance algorithm used by scoped\_lock is shown in Listing~\ref{l:cc_deadlock_avoid}. The algorithm presented is taken straight from the \code{<mutex>} header source, with some renaming and comments for clarity.
     41The mutex statement uses the deadlock prevention technique of lock ordering, where the circular-wait condition of a deadlock cannot occur if all locks are acquired in the same order. The scoped\_lock uses a deadlock avoidance algorithm where all locks after the first are acquired using \code{try_lock} and if any of the attempts to lock fails, all locks so far are released. This repeats until all locks are acquired successfully. The deadlock avoidance algorithm used by scoped\_lock is shown in Listing~\ref{l:cc_deadlock_avoid}. The algorithm presented is taken directly from the source code of the \code{<mutex>} header, with some renaming and comments for clarity.
    4242
    4343\begin{cppcode}[tabsize=3,caption={\CC scoped\_lock deadlock avoidance algorithm},label={l:cc_deadlock_avoid}]
     
    5959\end{cppcode}
    6060
    61 The algorithm in \ref{l:cc_deadlock_avoid} successfully avoids deadlock, however there is a potential livelock scenario. Given two threads $A$ and $B$, who create a scoped\_lock with two locks $L1$ and $L2$, a livelock can form as follows. Thread $A$ creates a scoped\_lock with $L1, L2$ in that order, $B$ creates a scoped lock with the order $L2, L1$. Both threads acquire the first lock in their order and then fail the try\_lock since the other lock is held. They then reset their start lock to be their 2nd lock and try again. This time $A$ has order $L2, L1$, and $B$ has order $L1, L2$. This is identical to the starting setup, but with the ordering swapped among threads. As such if they each acquire their first lock before the other acquires their second, they can livelock indefinitely.
     61The algorithm in \ref{l:cc_deadlock_avoid} successfully avoids deadlock, however there is a potential livelock scenario. Given two threads $A$ and $B$, who create a scoped\_lock with two locks $L1$ and $L2$, a livelock can form as follows. Thread $A$ creates a scoped\_lock with $L1$, $L2$, and $B$ creates a scoped lock with the order $L2$, $L1$. Both threads acquire the first lock in their order and then fail the try\_lock since the other lock is held. They then reset their start lock to be their 2nd lock and try again. This time $A$ has order $L2$, $L1$, and $B$ has order $L1$, $L2$. This is identical to the starting setup, but with the ordering swapped among threads. As such, if they each acquire their first lock before the other acquires their second, they can livelock indefinitely.
     62
     6263The lock ordering algorithm used in the mutex statement in \CFA is both deadlock and livelock free. It sorts the locks based on memory address and then acquires them. For fewer than 7 locks, it sorts using hard-coded sorting methods that perform the minimum number of swaps for a given number of locks. For 7 or more locks insertion sort is used. These sorting algorithms were chosen since it is rare to have to hold more than a handful of locks at a time. It is worth mentioning that the downside to the sorting approach is that it is not fully compatible with usages of the same locks outside the mutex statement. If multiple of the locks held by a mutex statement are also to be held together elsewhere, they must likewise be acquired via the mutex statement, or else the required ordering will not occur. Comparatively, if the scoped\_lock is used and the same locks are acquired elsewhere, there is no concern of the scoped\_lock deadlocking, due to its avoidance scheme, but it may livelock.
    6364
  • doc/theses/colby_parsons_MMAth/thesis.tex

    r70056ed reb47a80  
    113113%----------------------------------------------------------------------
    114114
    115 % \input{intro}
     115\input{intro}
    116116
    117 % \input{CFA_intro}
     117\input{CFA_intro}
    118118
    119 % \input{CFA_concurrency}
     119\input{CFA_concurrency}
    120120
    121 % \input{mutex_stmt}
     121\input{mutex_stmt}
    122122
    123123\input{channels}
    124124
    125 % \input{actors}
     125\input{actors}
    126126
    127127\clearpage
  • libcfa/src/concurrency/actor.hfa

    r70056ed reb47a80  
    3535
    3636// show stats
    37 // #define STATS
     37// #define ACTOR_STATS
    3838
    3939// forward decls
     
    122122    copy_queue * c_queue;           // current queue
    123123    volatile bool being_processed;  // flag to prevent concurrent processing
    124     #ifdef STATS
     124    #ifdef ACTOR_STATS
    125125    unsigned int id;
    126126    size_t missed;                  // transfers skipped due to being_processed flag being up
     
    132132    c_queue = owned_queue;
    133133    being_processed = false;
    134     #ifdef STATS
     134    #ifdef ACTOR_STATS
    135135    id = i;
    136136    missed = 0;
     
    153153    // check if queue is being processed elsewhere
    154154    if ( unlikely( being_processed ) ) {
    155         #ifdef STATS
     155        #ifdef ACTOR_STATS
    156156        missed++;
    157157        #endif
     
    175175struct worker_info {
    176176    volatile unsigned long long stamp;
    177     #ifdef STATS
     177    #ifdef ACTOR_STATS
    178178    size_t stolen_from, try_steal, stolen, failed_swaps, msgs_stolen;
    179179    unsigned long long processed;
     
    182182};
    183183static inline void ?{}( worker_info & this ) {
    184     #ifdef STATS
     184    #ifdef ACTOR_STATS
    185185    this.stolen_from = 0;
    186186    this.try_steal = 0;                             // attempts to steal
     
    194194}
    195195
    196 // #ifdef STATS
     196// #ifdef ACTOR_STATS
    197197// unsigned int * stolen_arr;
    198198// unsigned int * replaced_queue;
     
    206206};
    207207
    208 #ifdef STATS
     208#ifdef ACTOR_STATS
    209209// aggregate counters for statistics
    210210size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0,
     
    235235}; // executor
    236236
    237 // #ifdef STATS
     237// #ifdef ACTOR_STATS
    238238// __spinlock_t out_lock;
    239239// #endif
    240240static inline void ^?{}( worker & mutex this ) with(this) {
    241     #ifdef STATS
     241    #ifdef ACTOR_STATS
    242242    __atomic_add_fetch(&__all_gulps, executor_->w_infos[id].gulps,__ATOMIC_SEQ_CST);
    243243    __atomic_add_fetch(&__all_processed, executor_->w_infos[id].processed,__ATOMIC_SEQ_CST);
     
    276276        no_steal = true;
    277277   
    278     #ifdef STATS
     278    #ifdef ACTOR_STATS
    279279    // stolen_arr = aalloc( nrqueues );
    280280    // replaced_queue = aalloc( nrqueues );
     
    341341    } // for
    342342
    343     #ifdef STATS
     343    #ifdef ACTOR_STATS
    344344    size_t misses = 0;
    345345    for ( i; nrqueues ) {
     
    358358    if ( seperate_clus ) delete( cluster );
    359359
    360     #ifdef STATS // print formatted stats
     360    #ifdef ACTOR_STATS // print formatted stats
    361361    printf("    Actor System Stats:\n");
    362362    printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, __all_processed);
     
    404404    ticket = __get_next_ticket( *__actor_executor_ );
    405405    __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_RELAXED );
    406     #ifdef STATS
     406    #ifdef ACTOR_STATS
    407407    __atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST );
    408408    #endif
     
    513513            continue;
    514514
    515         #ifdef STATS
     515        #ifdef ACTOR_STATS
    516516        curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
    517517        if ( curr_steal_queue ) {
     
    526526        #else
    527527        curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
    528         #endif // STATS
     528        #endif // ACTOR_STATS
    529529
    530530        return;
     
    558558
    559559void main( worker & this ) with(this) {
    560     // #ifdef STATS
     560    // #ifdef ACTOR_STATS
    561561    // for ( i; executor_->nrqueues ) {
    562562    //     replaced_queue[i] = 0;
     
    587587        }
    588588        transfer( *curr_work_queue, &current_queue );
    589         #ifdef STATS
     589        #ifdef ACTOR_STATS
    590590        executor_->w_infos[id].gulps++;
    591         #endif // STATS
     591        #endif // ACTOR_STATS
    592592        #ifdef __STEAL
    593593        if ( isEmpty( *current_queue ) ) {
     
    599599            __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED );
    600600           
    601             #ifdef STATS
     601            #ifdef ACTOR_STATS
    602602            executor_->w_infos[id].try_steal++;
    603             #endif // STATS
     603            #endif // ACTOR_STATS
    604604           
    605605            steal_work( this, start + prng( range ) );
     
    608608        #endif // __STEAL
    609609        while ( ! isEmpty( *current_queue ) ) {
    610             #ifdef STATS
     610            #ifdef ACTOR_STATS
    611611            executor_->w_infos[id].processed++;
    612612            #endif
     
    636636
    637637static inline void __reset_stats() {
    638     #ifdef STATS
     638    #ifdef ACTOR_STATS
    639639    __total_tries = 0;
    640640    __total_stolen = 0;
  • libcfa/src/concurrency/channel.hfa

    r70056ed reb47a80  
    33#include <locks.hfa>
    44#include <list.hfa>
    5 
    6 #define __COOP_CHANNEL
    7 #ifdef __PREVENTION_CHANNEL
    8 forall( T ) {
    9 struct channel {
    10     size_t size, count, front, back;
    11     T * buffer;
    12     thread$ * chair;
    13     T * chair_elem;
    14     exp_backoff_then_block_lock c_lock, p_lock;
    15     __spinlock_t mutex_lock;
    16     char __padding[64]; // avoid false sharing in arrays of channels
    17 };
    18 
    19 static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
    20     size = _size;
    21     front = back = count = 0;
    22     buffer = aalloc( size );
    23     chair = 0p;
    24     mutex_lock{};
    25     c_lock{};
    26     p_lock{};
    27 }
    28 
    29 static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
    30 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
    31 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
    32 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
    33 static inline bool has_waiters( channel(T) & chan ) with(chan) { return chair != 0p; }
    34 
    35 static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
    36     memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
    37     count += 1;
    38     back++;
    39     if ( back == size ) back = 0;
    40 }
    41 
    42 static inline void insert( channel(T) & chan, T elem ) with( chan ) {
    43     lock( p_lock );
    44     lock( mutex_lock __cfaabi_dbg_ctx2 );
    45 
    46     // have to check for the zero size channel case
    47     if ( size == 0 && chair != 0p ) {
    48         memcpy((void *)chair_elem, (void *)&elem, sizeof(T));
    49         unpark( chair );
    50         chair = 0p;
    51         unlock( mutex_lock );
    52         unlock( p_lock );
    53         unlock( c_lock );
    54         return;
    55     }
    56 
    57     // wait if buffer is full, work will be completed by someone else
    58     if ( count == size ) {
    59         chair = active_thread();
    60         chair_elem = &elem;
    61         unlock( mutex_lock );
    62         park( );
    63         return;
    64     } // if
    65 
    66     if ( chair != 0p ) {
    67         memcpy((void *)chair_elem, (void *)&elem, sizeof(T));
    68         unpark( chair );
    69         chair = 0p;
    70         unlock( mutex_lock );
    71         unlock( p_lock );
    72         unlock( c_lock );
    73         return;
    74     }
    75     insert_( chan, elem );
    76 
    77     unlock( mutex_lock );
    78     unlock( p_lock );
    79 }
    80 
    81 static inline T remove( channel(T) & chan ) with(chan) {
    82     lock( c_lock );
    83     lock( mutex_lock __cfaabi_dbg_ctx2 );
    84     T retval;
    85 
    86     // have to check for the zero size channel case
    87     if ( size == 0 && chair != 0p ) {
    88         memcpy((void *)&retval, (void *)chair_elem, sizeof(T));
    89         unpark( chair );
    90         chair = 0p;
    91         unlock( mutex_lock );
    92         unlock( p_lock );
    93         unlock( c_lock );
    94         return retval;
    95     }
    96 
    97     // wait if buffer is empty, work will be completed by someone else
    98     if ( count == 0 ) {
    99         chair = active_thread();
    100         chair_elem = &retval;
    101         unlock( mutex_lock );
    102         park( );
    103         return retval;
    104     }
    105 
    106     // Remove from buffer
    107     memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
    108     count -= 1;
    109     front++;
    110     if ( front == size ) front = 0;
    111 
    112     if ( chair != 0p ) {
    113         insert_( chan, *chair_elem );  // do waiting producer work
    114         unpark( chair );
    115         chair = 0p;
    116         unlock( mutex_lock );
    117         unlock( p_lock );
    118         unlock( c_lock );
    119         return retval;
    120     }
    121 
    122     unlock( mutex_lock );
    123     unlock( c_lock );
    124     return retval;
    125 }
    126 
    127 } // forall( T )
    128 #endif
    129 
    130 #ifdef __COOP_CHANNEL
     5#include <mutex_stmt.hfa>
    1316
    1327// link field used for threads waiting on channel
     
    14823}
    14924
     25// wake one thread from the list
     26static inline void wake_one( dlist( wait_link ) & queue ) {
     27    wait_link & popped = try_pop_front( queue );
     28    unpark( popped.t );
     29}
     30
     31// returns true if woken due to shutdown
     32// blocks thread on list and releases passed lock
     33static inline bool block( dlist( wait_link ) & queue, void * elem_ptr, go_mutex & lock ) {
     34    wait_link w{ active_thread(), elem_ptr };
     35    insert_last( queue, w );
     36    unlock( lock );
     37    park();
     38    return w.elem == 0p;
     39}
     40
     41// void * used for some fields since exceptions don't work with parametric polymorphism currently
     42exception channel_closed {
     43    // on failed insert elem is a ptr to the element attempting to be inserted
     44    // on failed remove elem ptr is 0p
     45    // on resumption of a failed insert this elem will be inserted
     46    // so a user may modify it in the resumption handler
     47    void * elem;
     48
     49    // pointer to chan that is closed
     50    void * closed_chan;
     51};
     52vtable(channel_closed) channel_closed_vt;
     53
     54// #define CHAN_STATS // define this to get channel stats printed in dtor
     55
    15056forall( T ) {
    15157
    152 struct channel {
    153     size_t size;
    154     size_t front, back, count;
     58struct __attribute__((aligned(128))) channel {
     59    size_t size, front, back, count;
    15560    T * buffer;
    156     dlist( wait_link ) prods, cons;
    157     exp_backoff_then_block_lock mutex_lock;
     61    dlist( wait_link ) prods, cons; // lists of blocked threads
     62    go_mutex mutex_lock;            // MX lock
     63    bool closed;                    // indicates channel close/open
     64    #ifdef CHAN_STATS
     65    size_t blocks, operations;      // counts total ops and ops resulting in a blocked thd
     66    #endif
    15867};
    15968
     
    16574    cons{};
    16675    mutex_lock{};
     76    closed = false;
     77    #ifdef CHAN_STATS
     78    blocks = 0;
     79    operations = 0;
     80    #endif
    16781}
    16882
    16983static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
    170 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
     84static inline void ^?{}( channel(T) &c ) with(c) {
     85    #ifdef CHAN_STATS
     86    printf("Channel %p Blocks: %lu, Operations: %lu, %.2f%% of ops blocked\n", &c, blocks, operations, ((double)blocks)/operations * 100);
     87    #endif
     88    verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" );
     89    delete( buffer );
     90}
    17191static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
    17292static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
     
    17595static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !prods`isEmpty; }
    17696
    177 static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
     97// closes the channel and notifies all blocked threads
     98static inline void close( channel(T) & chan ) with(chan) {
     99    lock( mutex_lock );
     100    closed = true;
     101
     102    // flush waiting consumers and producers
     103    while ( has_waiting_consumers( chan ) ) {
     104        cons`first.elem = 0p;
     105        wake_one( cons );
     106    }
     107    while ( has_waiting_producers( chan ) ) {
     108        prods`first.elem = 0p;
     109        wake_one( prods );
     110    }
     111    unlock(mutex_lock);
     112}
     113
     114static inline void is_closed( channel(T) & chan ) with(chan) { return closed; }
     115
     116static inline void flush( channel(T) & chan, T elem ) with(chan) {
     117    lock( mutex_lock );
     118    while ( count == 0 && !cons`isEmpty ) {
     119        memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
     120        wake_one( cons );
     121    }
     122    unlock( mutex_lock );
     123}
     124
     125// handles buffer insert
     126static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
    178127    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
    179128    count += 1;
     
    182131}
    183132
    184 static inline void wake_one( dlist( wait_link ) & queue ) {
    185     wait_link & popped = try_pop_front( queue );
    186     unpark( popped.t );
    187 }
    188 
    189 static inline void block( dlist( wait_link ) & queue, void * elem_ptr, exp_backoff_then_block_lock & lock ) {
    190     wait_link w{ active_thread(), elem_ptr };
    191     insert_last( queue, w );
    192     unlock( lock );
    193     park();
     133// does the buffer insert or hands elem directly to consumer if one is waiting
     134static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) {
     135    if ( count == 0 && !cons`isEmpty ) {
     136        memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
     137        wake_one( cons );
     138    } else __buf_insert( chan, elem );
     139}
     140
     141// needed to avoid an extra copy in closed case
     142static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
     143    lock( mutex_lock );
     144    #ifdef CHAN_STATS
     145    operations++;
     146    #endif
     147    if ( count == size ) { unlock( mutex_lock ); return false; }
     148    __do_insert( chan, elem );
     149    unlock( mutex_lock );
     150    return true;
     151}
     152
     153// attempts a nonblocking insert
     154// returns true if insert was successful, false otherwise
     155static inline bool try_insert( channel(T) & chan, T elem ) { return __internal_try_insert( chan, elem ); }
     156
     157// handles closed case of insert routine
     158static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
     159    channel_closed except{&channel_closed_vt, &elem, &chan };
     160    throwResume except; // throw closed resumption
     161    if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination
    194162}
    195163
    196164static inline void insert( channel(T) & chan, T elem ) with(chan) {
    197     lock( mutex_lock );
     165    // check for close before acquire mx
     166    if ( unlikely(closed) ) {
     167        __closed_insert( chan, elem );
     168        return;
     169    }
     170
     171    lock( mutex_lock );
     172
     173    #ifdef CHAN_STATS
     174    if ( !closed ) operations++;
     175    #endif
     176
     177    // if closed handle
     178    if ( unlikely(closed) ) {
     179        unlock( mutex_lock );
     180        __closed_insert( chan, elem );
     181        return;
     182    }
    198183
    199184    // have to check for the zero size channel case
     
    202187        wake_one( cons );
    203188        unlock( mutex_lock );
    204         return;
     189        return true;
    205190    }
    206191
    207192    // wait if buffer is full, work will be completed by someone else
    208193    if ( count == size ) {
    209         block( prods, &elem, mutex_lock );
     194        #ifdef CHAN_STATS
     195        blocks++;
     196        #endif
     197
     198        // check for if woken due to close
     199        if ( unlikely( block( prods, &elem, mutex_lock ) ) )
     200            __closed_insert( chan, elem );
    210201        return;
    211202    } // if
     
    214205        memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    215206        wake_one( cons );
    216     } else insert_( chan, elem );
     207    } else __buf_insert( chan, elem );
    217208   
    218209    unlock( mutex_lock );
     210    return;
     211}
     212
     213// handles buffer remove
     214static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) {
     215    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
     216    count -= 1;
     217    front = (front + 1) % size;
     218}
     219
     220// does the buffer remove and potentially does waiting producer work
     221static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
     222    __buf_remove( chan, retval );
     223    if (count == size - 1 && !prods`isEmpty ) {
     224        __buf_insert( chan, *(T *)prods`first.elem );  // do waiting producer work
     225        wake_one( prods );
     226    }
     227}
     228
     229// needed to avoid an extra copy in closed case and single return val case
     230static inline bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) {
     231    lock( mutex_lock );
     232    #ifdef CHAN_STATS
     233    operations++;
     234    #endif
     235    if ( count == 0 ) { unlock( mutex_lock ); return false; }
     236    __do_remove( chan, retval );
     237    unlock( mutex_lock );
     238    return true;
     239}
     240
     241// attempts a nonblocking remove
     242// returns [T, true] if remove was successful
     243// returns [T, false] if remove was unsuccessful (T uninit)
     244static inline [T, bool] try_remove( channel(T) & chan ) {
     245    T retval;
     246    return [ retval, __internal_try_remove( chan, retval ) ];
     247}
     248
     249static inline T try_remove( channel(T) & chan, T elem ) {
     250    T retval;
     251    __internal_try_remove( chan, retval );
     252    return retval;
     253}
     254
     255// handles closed case of remove routine
     256static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
     257    channel_closed except{&channel_closed_vt, 0p, &chan };
     258    throwResume except; // throw resumption
     259    if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination
    219260}
    220261
    221262static inline T remove( channel(T) & chan ) with(chan) {
    222     lock( mutex_lock );
    223263    T retval;
     264    if ( unlikely(closed) ) {
     265        __closed_remove( chan, retval );
     266        return retval;
     267    }
     268    lock( mutex_lock );
     269
     270    #ifdef CHAN_STATS
     271    if ( !closed ) operations++;
     272    #endif
     273
     274    if ( unlikely(closed) ) {
     275        unlock( mutex_lock );
     276        __closed_remove( chan, retval );
     277        return retval;
     278    }
    224279
    225280    // have to check for the zero size channel case
     
    233288    // wait if buffer is empty, work will be completed by someone else
    234289    if (count == 0) {
    235         block( cons, &retval, mutex_lock );
     290        #ifdef CHAN_STATS
     291        blocks++;
     292        #endif
     293        // check for if woken due to close
     294        if ( unlikely( block( cons, &retval, mutex_lock ) ) )
     295            __closed_remove( chan, retval );
    236296        return retval;
    237297    }
    238298
    239299    // Remove from buffer
    240     memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
    241     count -= 1;
    242     front = (front + 1) % size;
    243 
    244     if (count == size - 1 && !prods`isEmpty ) {
    245         insert_( chan, *(T *)prods`first.elem );  // do waiting producer work
    246         wake_one( prods );
    247     }
     300    __do_remove( chan, retval );
    248301
    249302    unlock( mutex_lock );
     
    251304}
    252305} // forall( T )
    253 #endif
    254 
    255 #ifdef __BARGE_CHANNEL
    256 forall( T ) {
    257 struct channel {
    258     size_t size;
    259     size_t front, back, count;
    260     T * buffer;
    261     fast_cond_var( exp_backoff_then_block_lock ) prods, cons;
    262     exp_backoff_then_block_lock mutex_lock;
    263 };
    264 
    265 static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
    266     size = _size;
    267     front = back = count = 0;
    268     buffer = aalloc( size );
    269     prods{};
    270     cons{};
    271     mutex_lock{};
    272 }
    273 
    274 static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
    275 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
    276 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
    277 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
    278 static inline bool has_waiters( channel(T) & chan ) with(chan) { return !empty( cons ) || !empty( prods ); }
    279 static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !empty( cons ); }
    280 static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !empty( prods ); }
    281 
    282 static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
    283     memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
    284     count += 1;
    285     back++;
    286     if ( back == size ) back = 0;
    287 }
    288 
    289 
    290 static inline void insert( channel(T) & chan, T elem ) with(chan) {
    291     lock( mutex_lock );
    292 
    293     while ( count == size ) {
    294         wait( prods, mutex_lock );
    295     } // if
    296 
    297     insert_( chan, elem );
    298    
    299     if ( !notify_one( cons ) && count < size )
    300         notify_one( prods );
    301 
    302     unlock( mutex_lock );
    303 }
    304 
    305 static inline T remove( channel(T) & chan ) with(chan) {
    306     lock( mutex_lock );
    307     T retval;
    308 
    309     while (count == 0) {
    310         wait( cons, mutex_lock );
    311     }
    312 
    313     memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
    314     count -= 1;
    315     front = (front + 1) % size;
    316 
    317     if ( !notify_one( prods ) && count > 0 )
    318         notify_one( cons );
    319 
    320     unlock( mutex_lock );
    321     return retval;
    322 }
    323 
    324 } // forall( T )
    325 #endif
    326 
    327 #ifdef __NO_WAIT_CHANNEL
    328 forall( T ) {
    329 struct channel {
    330     size_t size;
    331     size_t front, back, count;
    332     T * buffer;
    333     thread$ * chair;
    334     T * chair_elem;
    335     exp_backoff_then_block_lock c_lock, p_lock;
    336     __spinlock_t mutex_lock;
    337 };
    338 
    339 static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
    340     size = _size;
    341     front = back = count = 0;
    342     buffer = aalloc( size );
    343     chair = 0p;
    344     mutex_lock{};
    345     c_lock{};
    346     p_lock{};
    347     lock( c_lock );
    348 }
    349 
    350 static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
    351 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
    352 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
    353 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
    354 static inline bool has_waiters( channel(T) & chan ) with(chan) { return c_lock.lock_value != 0; }
    355 
    356 static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
    357     memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
    358     count += 1;
    359     back++;
    360     if ( back == size ) back = 0;
    361 }
    362 
    363 static inline void insert( channel(T) & chan, T elem ) with( chan ) {
    364     lock( p_lock );
    365     lock( mutex_lock __cfaabi_dbg_ctx2 );
    366 
    367     insert_( chan, elem );
    368 
    369     if ( count != size )
    370         unlock( p_lock );
    371 
    372     if ( count == 1 )
    373         unlock( c_lock );
    374        
    375     unlock( mutex_lock );
    376 }
    377 
    378 static inline T remove( channel(T) & chan ) with(chan) {
    379     lock( c_lock );
    380     lock( mutex_lock __cfaabi_dbg_ctx2 );
    381     T retval;
    382 
    383     // Remove from buffer
    384     memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
    385     count -= 1;
    386     front = (front + 1) % size;
    387 
    388     if ( count != 0 )
    389         unlock( c_lock );
    390 
    391     if ( count == size - 1 )
    392         unlock( p_lock );
    393        
    394     unlock( mutex_lock );
    395     return retval;
    396 }
    397 
    398 } // forall( T )
    399 #endif
  • libcfa/src/concurrency/locks.hfa

    r70056ed reb47a80  
    3232#include <fstream.hfa>
    3333
    34 
    3534// futex headers
    3635#include <linux/futex.h>      /* Definition of FUTEX_* constants */
     
    155154// futex_mutex
    156155
    157 // - No cond var support
    158156// - Kernel thd blocking alternative to the spinlock
    159157// - No ownership (will deadlock on reacq)
     
    185183        int state;
    186184
    187        
    188         // // linear backoff omitted for now
    189         // for( int spin = 4; spin < 1024; spin += spin) {
    190         //      state = 0;
    191         //      // if unlocked, lock and return
    192         //      if (internal_try_lock(this, state)) return;
    193         //      if (2 == state) break;
    194         //      for (int i = 0; i < spin; i++) Pause();
    195         // }
    196 
    197         // no contention try to acquire
    198         if (internal_try_lock(this, state)) return;
     185        for( int spin = 4; spin < 1024; spin += spin) {
     186                state = 0;
     187                // if unlocked, lock and return
     188                if (internal_try_lock(this, state)) return;
     189                if (2 == state) break;
     190                for (int i = 0; i < spin; i++) Pause();
     191        }
     192
     193        // // no contention try to acquire
     194        // if (internal_try_lock(this, state)) return;
    199195       
    200196        // if not in contended state, set to be in contended state
     
    209205
    210206static inline void unlock(futex_mutex & this) with(this) {
    211         // if uncontended do atomice unlock and then return
    212         if (__atomic_fetch_sub(&val, 1, __ATOMIC_RELEASE) == 1) return; // TODO: try acq/rel
     207        // if uncontended do atomic unlock and then return
     208    if (__atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1) return;
    213209       
    214210        // otherwise threads are blocked so we must wake one
    215         __atomic_store_n((int *)&val, 0, __ATOMIC_RELEASE);
    216211        futex((int *)&val, FUTEX_WAKE, 1);
    217212}
     
    222217// to set recursion count after getting signalled;
    223218static inline void on_wakeup( futex_mutex & f, size_t recursion ) {}
     219
     220//-----------------------------------------------------------------------------
     221// go_mutex
     222
     223// - Kernel thd blocking alternative to the spinlock
     224// - No ownership (will deadlock on reacq)
     225// - Golang's flavour of mutex
     226// - Impl taken from Golang: src/runtime/lock_futex.go
     227struct go_mutex {
     228        // lock state any state other than UNLOCKED is locked
     229        // enum LockState { UNLOCKED = 0, LOCKED = 1, SLEEPING = 2 };
     230       
     231        // stores a lock state
     232        int val;
     233};
     234
     235static inline void  ?{}( go_mutex & this ) with(this) { val = 0; }
     236
     237static inline bool internal_try_lock(go_mutex & this, int & compare_val, int new_val ) with(this) {
     238        return __atomic_compare_exchange_n((int*)&val, (int*)&compare_val, new_val, false, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE);
     239}
     240
     241static inline int internal_exchange(go_mutex & this, int swap ) with(this) {
     242        return __atomic_exchange_n((int*)&val, swap, __ATOMIC_ACQUIRE);
     243}
     244
     245// if this is called recursively IT WILL DEADLOCK!!!!!
     246static inline void lock(go_mutex & this) with(this) {
     247        int state, init_state;
     248
     249    // speculative grab
     250    state = internal_exchange(this, 1);
     251    if ( !state ) return; // state == 0
     252    init_state = state;
     253    for (;;) {
     254        for( int i = 0; i < 4; i++ ) {
     255            while( !val ) { // lock unlocked
     256                state = 0;
     257                if (internal_try_lock(this, state, init_state)) return;
     258            }
     259            for (int i = 0; i < 30; i++) Pause();
     260        }
     261
     262        while( !val ) { // lock unlocked
     263            state = 0;
     264            if (internal_try_lock(this, state, init_state)) return;
     265        }
     266        sched_yield();
     267       
     268        // if not in contended state, set to be in contended state
     269        state = internal_exchange(this, 2);
     270        if ( !state ) return; // state == 0
     271        init_state = 2;
     272        futex((int*)&val, FUTEX_WAIT, 2); // if val is not 2 this returns with EWOULDBLOCK
     273    }
     274}
     275
     276static inline void unlock( go_mutex & this ) with(this) {
     277        // if uncontended do atomic unlock and then return
     278    if (__atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1) return;
     279       
     280        // otherwise threads are blocked so we must wake one
     281        futex((int *)&val, FUTEX_WAKE, 1);
     282}
     283
     284static inline void on_notify( go_mutex & f, thread$ * t){ unpark(t); }
     285static inline size_t on_wait( go_mutex & f ) {unlock(f); return 0;}
     286static inline void on_wakeup( go_mutex & f, size_t recursion ) {}
    224287
    225288//-----------------------------------------------------------------------------
     
    271334        this.lock_value = 0;
    272335}
     336
     337static inline void  ^?{}( exp_backoff_then_block_lock & this ){}
    273338
    274339static inline bool internal_try_lock(exp_backoff_then_block_lock & this, size_t & compare_val) with(this) {
Note: See TracChangeset for help on using the changeset viewer.