Changeset eb47a80
- Timestamp: Mar 30, 2023, 9:48:06 PM
- Branches: ADT, master
- Children: 0b66ef9
- Parents: 70056ed (diff), 6e83384 (diff)
Note: this is a merge changeset; the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
- Files: 41 added, 13 edited, 1 moved
doc/theses/colby_parsons_MMAth/Makefile
  r70056ed  reb47a80
  46   46       figures/nasus_Aggregate_Lock_4 \
  47   47       figures/nasus_Aggregate_Lock_8 \
       48       figures/nasus_Channel_Contention \
       49       figures/pyke_Channel_Contention \
  48   50   }
  49   51
doc/theses/colby_parsons_MMAth/benchmarks/channels/cfa/contend.cfa
r70056ed reb47a80 8 8 9 9 // user defines this 10 #define BIG 110 // #define BIG 1 11 11 12 12 owner_lock o; 13 13 14 unsigned long longtotal_operations = 0;14 size_t total_operations = 0; 15 15 16 16 struct bigObject { … … 36 36 Channel * channels; 37 37 38 volatilebool cons_done = false, prod_done = false;38 bool cons_done = false, prod_done = false; 39 39 volatile int cons_done_count = 0; 40 40 size_t cons_check = 0, prod_check = 0; … … 48 48 } 49 49 void main(Consumer & this) { 50 unsigned long longruns = 0;50 size_t runs = 0; 51 51 size_t my_check = 0; 52 52 for ( ;; ) { … … 78 78 } 79 79 void main(Producer & this) { 80 unsigned long longruns = 0;80 size_t runs = 0; 81 81 size_t my_check = 0; 82 size_t my_id = this.i; 82 83 for ( ;; ) { 83 84 if ( prod_done ) break; 84 85 #ifdef BIG 85 86 bigObject j{(size_t)runs}; 86 insert( channels[ this.i], j );87 insert( channels[ my_id ], j ); 87 88 my_check = my_check ^ (j.a + j.b + j.c + j.d + j.d + j.e + j.f + j.g + j.h); 88 89 #else 89 insert( channels[ this.i], (size_t)runs );90 insert( channels[ my_id ], (size_t)runs ); 90 91 my_check = my_check ^ ((size_t)runs); 91 92 #endif … … 99 100 } 100 101 101 int test( size_t Processors, size_t Channels, size_t Producers, size_t Consumers, size_t ChannelSize ) {102 size_t Clusters = Processors;102 static inline int test( size_t Processors, size_t Channels, size_t Producers, size_t Consumers, size_t ChannelSize ) { 103 size_t Clusters = 1; 103 104 // create a cluster 104 105 cluster clus[Clusters]; 105 106 processor * proc[Processors]; 106 107 for ( i; Processors ) { 107 (*(proc[i] = alloc())){clus[i % Clusters]};108 (*(proc[i] = malloc())){clus[i % Clusters]}; 108 109 } 109 110 … … 120 121 Producer * p[Producers * Channels]; 121 122 122 for ( i; Consumers * Channels ) { 123 (*(c[i] = alloc())){ i % Channels, clus[i % Clusters] }; 124 } 125 126 for ( i; Producers * Channels ) { 127 (*(p[i] = alloc())){ i % Channels, clus[i % Clusters] }; 123 for ( j; Channels ) { 124 for ( i; Producers ) { 125 (*(p[i] = malloc())){ j, clus[j % Clusters] }; 126 } 127 128 for ( i; Consumers ) { 129 (*(c[i] = malloc())){ j, clus[j % Clusters] }; 130 } 128 131 } 129 132 … … 148 151 } 149 152 } 150 151 153 } 152 154 … … 185 187 186 188 int main( int argc, char * argv[] ) { 187 size_t Processors = 1 0, Channels = 1, Producers = 10, Consumers = 10, ChannelSize = 128;189 size_t Processors = 1, Channels = 1, Producers = 1, Consumers = 1, ChannelSize = 128; 188 190 switch ( argc ) { 189 191 case 3: … … 206 208 exit( EXIT_FAILURE ); 207 209 } // switch 208 Producers = Processors ;209 Consumers = Processors ;210 Producers = Processors / 2; 211 Consumers = Processors / 2; 210 212 test(Processors, Channels, Producers, Consumers, ChannelSize); 211 213 } -
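Both the \CFA and Go versions of this benchmark verify correctness the same way: every producer XOR-folds the values it sends into prod_check and every consumer XOR-folds the values it receives into cons_check, and the two folds must agree after shutdown. The following is a minimal, self-contained Go sketch of that checksum idea only (single producer, single consumer; all names are illustrative rather than taken from the benchmark):

package main

import (
	"fmt"
	"sync"
)

func main() {
	const N = 1 << 16
	ch := make(chan int, 128)
	var prodCheck, consCheck int
	var wg sync.WaitGroup

	wg.Add(1)
	go func() { // consumer: fold everything received
		defer wg.Done()
		for v := range ch {
			consCheck ^= v
		}
	}()

	for i := 0; i < N; i++ { // producer: fold everything sent
		prodCheck ^= i
		ch <- i
	}
	close(ch)
	wg.Wait()

	fmt.Println("checksums match:", prodCheck == consCheck) // expect true
}

XOR folding is order-insensitive, so the check is independent of how the concurrent sends and receives interleave.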
doc/theses/colby_parsons_MMAth/benchmarks/channels/go/contend/contend.go
  r70056ed  reb47a80
  64   64       case 3:
  65   65           if os.Args[2] != "d" {                     // default ?
  66                    ChannelSize, _ = strconv.Atoi( os.Args[ 1] )
       66                ChannelSize, _ = strconv.Atoi( os.Args[2] )
  67   67               if ChannelSize < 0 { usage(); }
  68   68           } // if
  …    …
  78   78       } // switch
  79   79       runtime.GOMAXPROCS( Processors );
  80            ProdsPerChan = Processors
  81            ConsPerChan = Processors
       80       ProdsPerChan = Processors /2
       81       ConsPerChan = Processors / 2
  82   82
  83   83       // fmt.Println("Processors: ",Processors," Channels: ",Channels," ProdsPerChan: ",ProdsPerChan," ConsPerChan: ",ConsPerChan," Channel Size: ",ChannelSize)
  …    …
  108  108      cons_done = true
  109  109      for i := range chans {
  110               J: for j := 0; j < ConsPerChan; j++{
       110          L: for {
  111  111              select {
  112                       case chans[i] <- 0:
  113
       112                  case k := <-chans[i]:
       113                      cons_check = cons_check ^ k
  114  114                  default:
  115                           break J
       115                      break L
  116  116              }
  117  117          }
  118  118      }
       119      for i := range chans {
       120          close(chans[i])
       121      }
       122
  119  123      for j := 0; j < ConsPerChan * Channels; j++{
  120  124          <-consJoin
  121  125      }
  122  126
  123           // for i := range chans {
  124           //     L: for {
  125           //         select {
  126           //             case k := <-chans[i]:
  127           //                 cons_check = cons_check ^ k
  128           //             default:
  129           //                 break L
  130           //         }
  131           //     }
  132           // }
       127
  133  128      if cons_check != prod_check {
  134  129          fmt.Println("\nChecksum mismatch: Cons: %d, Prods: %d", cons_check, prod_check)
  135  130      }
  136           for i := range chans {
  137               close(chans[i])
  138           }
  139  131      fmt.Println(total_operations)
  140  132  }
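The cleanup change above replaces the old wake-by-sending loop with a non-blocking drain: a labelled select with a default case empties each channel, folding the drained values into the consumer checksum, and only then are the channels closed. A small self-contained Go sketch of that drain-then-close pattern (function and variable names here are hypothetical):

package main

import "fmt"

// drainAndClose empties each channel without blocking, XOR-folding any
// leftover values into a checksum, then closes the channels so that any
// remaining receivers observe termination.
func drainAndClose(chans []chan int) int {
	check := 0
	for i := range chans {
	L:
		for {
			select {
			case k := <-chans[i]:
				check ^= k
			default: // channel momentarily empty; stop draining it
				break L
			}
		}
	}
	for i := range chans {
		close(chans[i])
	}
	return check
}

func main() {
	cs := []chan int{make(chan int, 4), make(chan int, 4)}
	cs[0] <- 1
	cs[1] <- 2
	fmt.Println(drainAndClose(cs)) // 3
}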
doc/theses/colby_parsons_MMAth/benchmarks/channels/plotData.py
r70056ed reb47a80 36 36 procs.append(int(val)) 37 37 38 # 3rd line has num locks args 39 line = readfile.readline() 40 locks = [] 41 for val in line.split(): 42 locks.append(int(val)) 43 44 # 4th line has number of variants 38 # 3rd line has number of variants 45 39 line = readfile.readline() 46 40 names = line.split() … … 50 44 lines = (line for line in lines if line) # Non-blank lines 51 45 46 class Bench(Enum): 47 Unset = 0 48 Contend = 1 49 Zero = 2 50 Barrier = 3 51 Churn = 4 52 Daisy_Chain = 5 53 Hot_Potato = 6 54 Pub_Sub = 7 55 52 56 nameSet = False 53 curr Locks = -1# default val57 currBench = Bench.Unset # default val 54 58 count = 0 55 59 procCount = 0 56 60 currVariant = 0 57 name = " Aggregate Lock"61 name = "" 58 62 var_name = "" 59 63 sendData = [0.0 for j in range(numVariants)] … … 64 68 # print(line) 65 69 66 if currLocks == -1: 67 lineArr = line.split() 68 currLocks = lineArr[-1] 70 if currBench == Bench.Unset: 71 if line == "contend:": 72 name = "Contend" 73 currBench = Bench.Contend 74 elif line == "zero:": 75 name = "Zero" 76 currBench = Bench.Zero 77 elif line == "barrier:": 78 name = "Barrier" 79 currBench = Bench.Barrier 80 elif line == "churn:": 81 name = "Churn" 82 currBench = Bench.Churn 83 elif line == "daisy_chain:": 84 name = "Daisy_Chain" 85 currBench = Bench.Daisy_Chain 86 elif line == "hot_potato:": 87 name = "Hot_Potato" 88 currBench = Bench.Hot_Potato 89 elif line == "pub_sub:": 90 name = "Pub_Sub" 91 currBench = Bench.Pub_Sub 92 else: 93 print("Expected benchmark name") 94 print("Line: " + line) 95 sys.exit() 69 96 continue 70 97 … … 95 122 if currVariant == numVariants: 96 123 fig, ax = plt.subplots() 97 plt.title(name + " Benchmark : " + str(currLocks) + " Locks")98 plt.ylabel("Throughput ( entries)")124 plt.title(name + " Benchmark") 125 plt.ylabel("Throughput (channel operations)") 99 126 plt.xlabel("Cores") 100 127 for idx, arr in enumerate(data): 101 128 plt.errorbar( procs, arr, [bars[idx][0], bars[idx][1]], capsize=2, marker='o' ) 129 102 130 plt.yscale("log") 131 # plt.ylim(1, None) 132 # ax.get_yaxis().set_major_formatter(ticks.ScalarFormatter()) 133 # else: 134 # plt.ylim(0, None) 103 135 plt.xticks(procs) 104 136 ax.legend(names) 105 # fig.savefig("plots/" + machineName + "Aggregate_Lock_" + str(currLocks)+ ".png")106 plt.savefig("plots/" + machineName + "Aggregate_Lock_" + str(currLocks)+ ".pgf")137 # fig.savefig("plots/" + machineName + name + ".png") 138 plt.savefig("plots/" + machineName + name + ".pgf") 107 139 fig.clf() 108 140 109 141 # reset 110 curr Locks = -1142 currBench = Bench.Unset 111 143 currVariant = 0 -
doc/theses/colby_parsons_MMAth/benchmarks/channels/run
r70056ed reb47a80 85 85 } 86 86 87 numtimes=11 88 89 # locks=('-DLOCKS=L1' '-DLOCKS=L2' '-DLOCKS=L3' '-DLOCKS=L4' '-DLOCKS=L5' '-DLOCKS=L6' '-DLOCKS=L7' '-DLOCKS=L8') 90 # locks='1 2 3 4 5 6 7 8' 91 lock_flags=('-DLOCKS=L2' '-DLOCKS=L4' '-DLOCKS=L8') 92 locks=('2' '4' '8') 87 numtimes=5 93 88 94 89 num_threads='2 4 8 16 24 32' 95 # num_threads='2 4 8'90 # num_threads='2' 96 91 97 92 # toggle benchmarks 98 order=${true} 99 rand=${true} 100 baseline=${true} 93 zero=${false} 94 contend=${true} 95 barrier=${false} 96 churn=${false} 97 daisy_chain=${false} 98 hot_potato=${false} 99 pub_sub=${false} 101 100 102 101 runCFA=${true} 103 run CPP=${true}102 runGO=${true} 104 103 # runCFA=${false} 105 # run CPP=${false}104 # runGO=${false} 106 105 107 106 cfa=~/cfa-cc/driver/cfa 108 cpp=g++109 107 110 108 # Helpers to minimize code duplication … … 152 150 echo $num_threads 153 151 154 for i in ${!locks[@]}; do 155 echo -n ${locks[$i]}' ' 156 done 157 echo "" 158 159 if [ ${runCFA} -eq ${true} ] && [ ${order} -eq ${true} ]; then 160 echo -n 'CFA-order ' 161 fi 162 if [ ${runCPP} -eq ${true} ] && [ ${order} -eq ${true} ]; then 163 echo -n 'CPP-order ' 164 fi 165 if [ ${runCFA} -eq ${true} ] && [ ${baseline} -eq ${true} ]; then 166 echo -n 'CFA-baseline ' 167 fi 168 if [ ${runCPP} -eq ${true} ] && [ ${baseline} -eq ${true} ]; then 169 echo -n 'CPP-baseline ' 170 fi 171 if [ ${runCFA} -eq ${true} ] && [ ${rand} -eq ${true} ]; then 172 echo -n 'CFA-rand ' 173 fi 174 if [ ${runCPP} -eq ${true} ] && [ ${rand} -eq ${true} ]; then 175 echo -n 'CPP-rand ' 152 if [ ${runCFA} -eq ${true} ]; then 153 echo -n 'CFA ' 154 fi 155 if [ ${runGO} -eq ${true} ]; then 156 echo -n 'Go ' 176 157 fi 177 158 echo "" … … 182 163 cfa_flags='-quiet -O3 -nodebug -DNDEBUG' 183 164 184 # cpp flagse185 cpp_flags='-O3 -std=c++17 -lpthread -pthread -DNDEBUG'186 187 165 # run the benchmarks 188 166 189 run_ order() {167 run_contend() { 190 168 post_args=${1} 191 169 192 170 if [ ${runCFA} -eq ${true} ] ; then 193 171 cd cfa # CFA RUN 194 print_header 'CFA -'${3}195 ${cfa} ${cfa_flags} ${2} ${3}.cfa -o a.${hostname} > /dev/null 2>&1172 print_header 'CFA' 173 ${cfa} ${cfa_flags} ${2}.cfa -o a.${hostname} > /dev/null 2>&1 196 174 run_bench 197 175 rm a.${hostname} … … 199 177 fi # done CFA 200 178 201 if [ ${run CPP} -eq ${true} ] ; then202 cd cpp # CPPRUN203 print_header ' CPP-'${3}204 ${cpp} ${cpp_flags} ${2} ${3}.cc-o a.${hostname} > /dev/null 2>&1179 if [ ${runGO} -eq ${true} ] ; then 180 cd go/${2} # Go RUN 181 print_header 'Go' 182 go build -o a.${hostname} > /dev/null 2>&1 205 183 run_bench 206 184 rm a.${hostname} 207 185 cd - > /dev/null 208 fi # done CPP186 fi # done Go 209 187 } 210 188 211 189 # /usr/bin/time -f "%Uu %Ss %Er %Mkb" 212 213 for i in ${!locks[@]}; do 214 echo "locks: "${locks[$i]} 215 if [ ${order} -eq ${true} ] ; then 216 run_order ${locks[$i]} ${lock_flags[$i]} 'order' 217 fi 218 if [ ${baseline} -eq ${true} ] ; then 219 run_order ${locks[$i]} ${lock_flags[$i]} 'baseline' 220 fi 221 if [ ${rand} -eq ${true} ] ; then 222 run_order ${locks[$i]} '-DLOCKS=L8' 'rand' 223 fi 224 done 225 226 190 if [ ${contend} -eq ${true} ] ; then 191 echo "contend: " 192 run_contend '128' 'contend' 193 fi 194 195 if [ ${zero} -eq ${true} ] ; then 196 echo "zero: " 197 run_contend '0' 'contend' 198 fi 199 200 if [ ${barrier} -eq ${true} ] ; then 201 echo "barrier: " 202 run_contend '' 'barrier' 203 fi 204 205 if [ ${churn} -eq ${true} ] ; then 206 echo "churn: " 207 run_contend '' 'churn' 208 fi 209 210 if [ ${daisy_chain} -eq 
${true} ] ; then 211 echo "daisy_chain: " 212 run_contend '' 'daisy_chain' 213 fi 214 215 if [ ${hot_potato} -eq ${true} ] ; then 216 echo "hot_potato: " 217 run_contend '' 'hot_potato' 218 fi 219 220 if [ ${pub_sub} -eq ${true} ] ; then 221 echo "pub_sub: " 222 run_contend '' 'pub_sub' 223 fi -
doc/theses/colby_parsons_MMAth/glossary.tex
  r70056ed  reb47a80
  9    9    % \textit{Synonyms : User threads, Lightweight threads, Green threads, Virtual threads, Tasks.}
  10   10   % }
  11   11   % C_TODO: replace usages of these acronyms with \acrshort{name}
  12   12   \newacronym{tls}{TLS}{Thread Local Storage}
  13   13   \newacronym{api}{API}{Application Program Interface}
  …    …
  16   16   \newacronym{rtti}{RTTI}{Run-Time Type Information}
  17   17   \newacronym{fcfs}{FCFS}{First Come First Served}
       18   \newacronym{toctou}{TOCTOU}{time-of-check to time-of-use}
doc/theses/colby_parsons_MMAth/local.bib
  r70056ed  reb47a80
  47   47   publisher={ACM New York, NY, USA}
  48   48   }
       49
       50   @mastersthesis{Beach21,
       51     author={{Beach, Andrew James}},
       52     title={Exception Handling in C∀},
       53     year={2021},
       54     publisher="UWSpace",
       55     url={http://hdl.handle.net/10012/17617}
       56   }
doc/theses/colby_parsons_MMAth/text/actors.tex
  r70056ed  reb47a80
  334  334
  335  335  \section{Safety and Productivity}
  336       \CFA's actor system comes with a suite of safety and productivity features. Most of these features are present in \CFA's debug mode, but are removed when code is compiled in nodebug mode. Some of the features include:
       336  \CFA's actor system comes with a suite of safety and productivity features. Most of these features are present in \CFA's debug mode, but are removed when code is compiled in nodebug mode. The suite of features includes the following.
  337  337
  338  338  \begin{itemize}
doc/theses/colby_parsons_MMAth/text/channels.tex
  r70056ed  reb47a80
  5    5    % ======================================================================
  6    6
  7         Channels were first introduced by Hoare in his paper Communicating Sequentual Processes~\cite{Hoare78}, where he proposes a concurrent language that communicates across processes using input/output channels to send data inbetween processes. Channels are used to perform message passing concurrency, a model of concurrency where threads communicate by sending data to each other, and synchronizing via the passing mechanism. This is an alternative to shared memory concurrency, where threads can communicate directly by changing shared memory state. Most modern concurrent programming languages do not subscribe to just one style of communication between threads, and provide features that support both. Channels as a programming language feature has been popularized in recent years due to the language Go, which encourages the use of channels as its fundamental concurrent feature.
       7    Channels were first introduced by Hoare in his paper Communicating Sequential Processes~\cite{Hoare78}, where he proposes a concurrent language that communicates across processes using input/output channels to send data. Channels are a concurrent language feature used to perform message passing concurrency, a model of concurrency where threads communicate by sending data as messages, and synchronizing via the message passing mechanism. This is an alternative to shared memory concurrency, where threads can communicate directly by changing shared memory state. Most modern concurrent programming languages do not subscribe to just one style of communication between threads, and provide features that support both. Channels as a programming language feature have been popularized in recent years due to the language Go, which encourages the use of channels as its fundamental concurrent feature.
  8    8
  9    9    \section{Producer-Consumer Problem}
  …    …
  14   14
  15   15   \section{Channel Implementation}
  16        % C_TODO: rewrite to reflect on current impl
       16   The channel implementation in \CFA is a near carbon copy of the Go implementation. Experimentation was conducted that varied the producer-consumer algorithm and the lock type used inside the channel. With the exception of non-FCFS algorithms, no algorithm or lock usage in the channel implementation was found to be consistently more performant than Go's choice of algorithm and lock implementation. As such, the research contributions added by \CFA's channel implementation lie in the realm of safety and productivity features.
       17
       18   \section{Safety and Productivity}
       19   Channels in \CFA come with safety and productivity features to aid users. The features include the following.
       20
       21   \begin{itemize}
       22   \item Toggle-able statistic collection on channel behaviour that counts channel operations, and the number of operations that block. Tracking blocking operations helps users tune their channel size or channel usage when the channel is used for buffering, where the aim is to have as few blocking operations as possible.
       23   \item Deadlock detection on deallocation of the channel. If any threads are blocked inside the channel when it is deallocated, this is detected and reported to the user, as it would otherwise cause a deadlock.
       24   \item A \code{flush} routine that delivers copies of an element to all waiting consumers, flushing the buffer. Programmers can use this to easily broadcast data to multiple consumers.
Additionally, the \code{flush} routine is more performant than looping around the \code{insert} operation since it can deliver the elements without having to reacquire mutual exclusion for each element sent.
       25   \end{itemize}
       26
       27   The other safety and productivity feature of \CFA channels deals with concurrent termination. Terminating concurrent programs is often one of the most difficult parts of writing concurrent code, particularly if graceful termination is needed. The difficulty of graceful termination often arises from the usage of synchronization primitives, which need to be handled carefully during shutdown. It is easy to deadlock during termination if threads are left behind on synchronization primitives. Additionally, most synchronization primitives are prone to time-of-check to time-of-use (TOCTOU) issues, where there is a race between one thread checking the state of a concurrent object and another thread changing the state. TOCTOU issues with synchronization primitives often involve a race between one thread checking the primitive for blocked threads and another thread blocking on it. Channels are a particularly hard synchronization primitive to terminate since both sending to and receiving from a channel can block. Thus, improperly handled TOCTOU issues with channels often result in deadlocks, as threads trying to perform the termination may end up unexpectedly blocking in their attempt to help other threads exit the system.
       28
       29   % C_TODO: add reference to select chapter, add citation to go channels info
       30   Go channels provide a set of tools to help with concurrent shutdown. Channels in Go have a \code{close} operation and a \code{select} statement that can both be used to help threads terminate. The \code{select} statement will be discussed in \ref{}, where \CFA's \code{waituntil} statement will be compared with the Go \code{select} statement. The \code{close} operation on a channel in Go changes the state of the channel. When a channel is closed, sends to the channel will panic and additional calls to \code{close} will panic. Receives are handled differently: receivers never block on a closed channel and continue to remove elements from the channel. Once a channel is empty, receivers can continue to remove elements, but will receive the zero-value version of the element type. To aid in avoiding unwanted zero-value elements, Go provides the ability to iterate over a closed channel to remove the remaining elements. These design choices for Go channels enforce a specific interaction style with channels during termination, where careful thought is needed to ensure that additional \code{close} calls do not occur and that no sends occur after a channel is closed. These design choices fit Go's paradigm of error management, where users are expected to explicitly check for errors, rather than letting errors occur and catching them. If errors need to be propagated in Go, return codes are used to pass error information to where it is needed. Note that panics in Go can be caught, but this is not considered an idiomatic way to write Go programs.
       31
       32   While Go's channel closing semantics are powerful enough to perform any concurrent termination needed by a program, their lack of ease of use leaves much to be desired. Since both closing and sending panic once a channel is closed, a user often has to synchronize the senders to a channel before the channel can be closed to avoid panics.
However, in doing so it renders the \code{close} operation nearly useless, as the only utilities it provides are the ability to ensure that receivers no longer block on the channel and that they receive zero-valued elements. This can be useful if the zero-typed element is recognized as a sentinel value, but if another sentinel value is preferred, then \code{close} only provides its non-blocking feature. To avoid TOCTOU issues during shutdown, a busy wait with a \code{select} statement is often used to add or remove elements from a channel. Due to Go's asymmetric approach to channel shutdown, separate synchronization between producers and consumers of a channel has to occur during shutdown.
       33
       34   In \CFA, exception handling is an encouraged paradigm and has full language support \cite{}.
       35   % \cite{Beach21}. TODO: this citation breaks when compiled. Need to fix and insert above
       36   As such, \CFA uses an exception-based approach to channel shutdown that is symmetric for both producers and consumers, and supports graceful shutdown. Exceptions in \CFA support both termination and resumption. Termination exceptions operate in the same way as exceptions seen in many popular programming languages such as \CC, Python and Java.
       37   Resumption exceptions are a style of exception that, when caught, run the corresponding catch block in the same way that termination exceptions do.
       38   The difference between the exception handling mechanisms arises after the exception is handled. In termination handling, the control flow continues into the code following the catch after the exception is handled. In resumption handling, the control flow returns to the site of the \code{throw}, allowing the control to continue where it left off. Note that in resumption, since control can return to the point of error propagation, the stack is not unwound during resumption propagation. In \CFA, if a resumption is not handled, it is reraised as a termination. This mechanism can be used to create a flexible and robust termination system for channels.
       39
       40   When a channel in \CFA is closed, all subsequent calls to the channel will throw a resumption exception at the caller. If the resumption is handled, then the caller will proceed to attempt to complete their operation. If the resumption is not handled, it is then rethrown as a termination exception. Alternatively, if the resumption is handled but the subsequent attempt at an operation would block, a termination exception is thrown. These termination exceptions allow for non-local transfer that can be used to great effect to eagerly and gracefully shut down a thread. When a channel is closed, if there are any blocked producers or consumers inside the channel, they are woken up and also have a resumption thrown at them. The resumption exception, \code{channel_closed}, has a couple of fields to aid in handling the exception. The exception contains a pointer to the channel it was thrown from, and a pointer to an element. In exceptions thrown from remove, the element pointer is null. In the case of insert, the element pointer points to the element that the thread attempted to insert. This element pointer allows the handler to know which operation failed, and also allows the element to not be lost on a failed insert since it can be moved elsewhere in the handler. Furthermore, due to \CFA's powerful exception system, this data can be used to choose handlers based on which channel and operation failed.
Exception handlers in \CFA have an optional predicate after the exception type, which can be used to optionally trigger or skip handlers based on the content of an exception. It is worth mentioning that the approach of using exceptions for termination may incur a larger performance cost during termination than the approach used in Go. This should not be an issue, since termination is rarely on the fast path of an application, and ensuring that termination can be implemented correctly with ease is the aim of the exception approach.
       41
       42   To highlight the differences between \CFA's and Go's close semantics, an example program is presented. The program is a barrier implemented using two channels, shown in Listings~\ref{l:cfa_chan_bar} and \ref{l:go_chan_bar}. Both of these examples are implemented using \CFA syntax so that they can be easily compared. Listing~\ref{l:go_chan_bar} uses Go-style channel close semantics and Listing~\ref{l:cfa_chan_bar} uses \CFA close semantics. In this problem it is infeasible to use the Go \code{close} call, since all tasks are both potentially producers and consumers, making panics on close unavoidable. As such, to implement a flush routine for the buffer in Listing~\ref{l:go_chan_bar}, a sentinel value of $-1$ has to be used to indicate to threads that they need to leave the barrier. This sentinel value has to be checked at two points. Furthermore, an additional flag \code{done} is needed to communicate to threads once they have left the barrier that they are done. This use of an additional flag or communication method is common in Go channel shutdown code, since to avoid panics on a channel, the shutdown of a channel often has to be communicated to threads before it occurs. In the \CFA version~\ref{l:cfa_chan_bar}, the barrier shutdown results in an exception being thrown at the threads operating on it, which informs the threads that they must terminate. This avoids the need to use a separate communication method other than the barrier, and avoids extra conditional checks on the fast path of the barrier implementation. Also note that in the Go version~\ref{l:go_chan_bar}, the size of the barrier channels has to be larger than in the \CFA version to ensure that the main thread does not block when attempting to clear the barrier.
43 44 \begin{cfacode}[tabsize=3,caption={\CFA channel barrier termination},label={l:cfa_chan_bar}] 45 struct barrier { 46 channel( int ) barWait; 47 channel( int ) entryWait; 48 int size; 49 } 50 void ?{}(barrier & this, int size) with(this) { 51 barWait{size}; 52 entryWait{size}; 53 this.size = size; 54 for ( j; size ) 55 insert( *entryWait, j ); 56 } 57 58 void flush(barrier & this) with(this) { 59 close(barWait); 60 close(entryWait); 61 } 62 void wait(barrier & this) with(this) { 63 int ticket = remove( *entryWait ); 64 if ( ticket == size - 1 ) { 65 for ( j; size - 1 ) 66 insert( *barWait, j ); 67 return; 68 } 69 ticket = remove( *barWait ); 70 71 // last one out 72 if ( size == 1 || ticket == size - 2 ) { 73 for ( j; size ) 74 insert( *entryWait, j ); 75 } 76 } 77 barrier b{Tasks}; 78 79 // thread main 80 void main(Task & this) { 81 try { 82 for ( ;; ) { 83 wait( b ); 84 } 85 } catch ( channel_closed * e ) {} 86 } 87 88 int main() { 89 { 90 Task t[Tasks]; 91 92 sleep(10`s); 93 flush( b ); 94 } // wait for tasks to terminate 95 return 0; 96 } 97 \end{cfacode} 98 99 \begin{cfacode}[tabsize=3,caption={Go channel barrier termination},label={l:go_chan_bar}] 100 101 struct barrier { 102 channel( int ) barWait; 103 channel( int ) entryWait; 104 int size; 105 } 106 void ?{}(barrier & this, int size) with(this) { 107 barWait{size + 1}; 108 entryWait{size + 1}; 109 this.size = size; 110 for ( j; size ) 111 insert( *entryWait, j ); 112 } 113 114 void flush(barrier & this) with(this) { 115 insert( *entryWait, -1 ); 116 insert( *barWait, -1 ); 117 } 118 void wait(barrier & this) with(this) { 119 int ticket = remove( *entryWait ); 120 if ( ticket == -1 ) { 121 insert( *entryWait, -1 ); 122 return; 123 } 124 if ( ticket == size - 1 ) { 125 for ( j; size - 1 ) 126 insert( *barWait, j ); 127 return; 128 } 129 ticket = remove( *barWait ); 130 if ( ticket == -1 ) { 131 insert( *barWait, -1 ); 132 return; 133 } 134 135 // last one out 136 if ( size == 1 || ticket == size - 2 ) { 137 for ( j; size ) 138 insert( *entryWait, j ); 139 } 140 } 141 barrier b; 142 143 bool done = false; 144 // thread main 145 void main(Task & this) { 146 for ( ;; ) { 147 if ( done ) break; 148 wait( b ); 149 } 150 } 151 152 int main() { 153 { 154 Task t[Tasks]; 155 156 sleep(10`s); 157 done = true; 158 159 flush( b ); 160 } // wait for tasks to terminate 161 return 0; 162 } 163 \end{cfacode} 164 165 In Listing~\ref{l:cfa_resume} an example of channel closing with resumption is used. This program uses resumption in the \code{Consumer} thread main to ensure that all elements in the channel are removed before the consumer thread terminates. The producer only has a \code{catch} so the moment it receives an exception it terminates, whereas the consumer will continue to remove from the closed channel via handling resumptions until the buffer is empty, which then throws a termination exception. If the same program was implemented in Go it would require explicit synchronization with both producers and consumers by some mechanism outside the channel to ensure that all elements were removed before task termination. 
166 167 \begin{cfacode}[tabsize=3,caption={\CFA channel resumption usage},label={l:cfa_resume}] 168 channel( int ) chan{ 128 }; 169 170 // Consumer thread main 171 void main(Consumer & this) { 172 size_t runs = 0; 173 try { 174 for ( ;; ) { 175 remove( chan ); 176 } 177 } catchResume ( channel_closed * e ) {} 178 catch ( channel_closed * e ) {} 179 } 180 181 // Producer thread main 182 void main(Producer & this) { 183 int j = 0; 184 try { 185 for ( ;;j++ ) { 186 insert( chan, j ); 187 } 188 } catch ( channel_closed * e ) {} 189 } 190 191 int main( int argc, char * argv[] ) { 192 { 193 Consumers c[4]; 194 Producer p[4]; 195 196 sleep(10`s); 197 198 for ( i; Channels ) 199 close( channels[i] ); 200 } 201 return 0; 202 } 203 \end{cfacode} 204 205 \section{Performance} 206 207 Given that the base implementation of the \CFA channels is very similar to the Go implementation, this section aims to show that the performance of the two implementations are comparable. One microbenchmark is conducted to compare Go and \CFA. The benchmark is a ten second experiment where producers and consumers operate on a channel in parallel and throughput is measured. The number of cores is varied to measure how throughtput scales. The cores are divided equally between producers and consumers, with one producer or consumer owning each core. The results of the benchmark are shown in Figure~\ref{f:chanPerf}. The performance of Go and \CFA channels on this microbenchmark is comparable. Note, it is expected for the performance to decline as the number of cores increases as the channel operations all occur in a critical section so an increase in cores results in higher contention with no increase in parallelism. 208 17 209 18 210 \begin{figure} 19 \begin{lrbox}{\myboxA} 20 \begin{cfacode}[aboveskip=0pt,belowskip=0pt,basicstyle=\footnotesize] 21 int size; 22 int front, back, count; 23 TYPE * buffer; 24 cond_var prods, cons; 25 lock mx; 26 27 void insert( TYPE elem ){ 28 29 lock(mx); 30 31 // wait if buffer is full 32 // insert finished by consumer 33 if (count == size){ 34 wait(prods, mx, &elem); 35 // no reacquire 36 return; 37 } 38 39 40 41 42 if (!empty(cons)){ 43 // do consumer work 44 front(cons) = &elem; 45 notify_one(cons); 46 } 47 48 49 50 else 51 insert_(chan, elem); 52 53 54 unlock(mx); 55 } 56 \end{cfacode} 57 \end{lrbox} 58 \begin{lrbox}{\myboxB} 59 \begin{cfacode}[aboveskip=0pt,belowskip=0pt,basicstyle=\footnotesize] 60 int size; 61 int front, back, count; 62 TYPE * buffer; 63 thread * chair; 64 TYPE * chair_elem; 65 lock c_lock, p_lock, mx; 66 void insert( TYPE elem ){ 67 lock(p_lock); 68 lock(mx); 69 70 // wait if buffer is full 71 // insert finished by consumer 72 if (count == size){ 73 chair = this_thread(); 74 chair_elem = &elem; 75 unlock(mx); 76 park(); 77 unlock(p_lock); 78 return; 79 } 80 81 if (chair != 0p){ 82 // do consumer work 83 chair_elem = &elem; 84 unpark(chair); 85 chair = 0p; 86 unlock(mx); 87 unlock(p_lock); 88 return; 89 } else 90 insert_(chan, elem); 91 92 unlock(mx); 93 unlock(p_lock); 94 } 95 \end{cfacode} 96 \end{lrbox} 97 \subfloat[Go implementation]{\label{f:GoChanImpl}\usebox\myboxA} 98 \hspace{5pt} 99 \vrule 100 \hspace{5pt} 101 \subfloat[\CFA implementation]{\label{f:cfaChanImpl}\usebox\myboxB} 102 \caption{Comparison of channel implementations} 103 \label{f:ChanComp} 211 \centering 212 \begin{subfigure}{0.5\textwidth} 213 \centering 214 \scalebox{0.5}{\input{figures/nasus_Channel_Contention.pgf}} 215 \subcaption{AMD \CFA Channel Benchmark}\label{f:chanAMD} 216 
\end{subfigure}\hfill 217 \begin{subfigure}{0.5\textwidth} 218 \centering 219 \scalebox{0.5}{\input{figures/pyke_Channel_Contention.pgf}} 220 \subcaption{Intel \CFA Channel Benchmark}\label{f:chanIntel} 221 \end{subfigure} 222 \caption{The channel contention benchmark comparing \CFA and Go channel throughput (higher is better).} 223 \label{f:chanPerf} 104 224 \end{figure} 105 106 Go and \CFA have similar channel implementation when it comes to how they solve the producer-consumer problem. Both implementations attempt to minimize double blocking by requiring cooperation from signalling threads. If a consumer or producer is blocked, whichever thread signals it to proceed completes the blocked thread's operation for them so that the blocked thread does not need to acquire any locks. Channels in \CFA go a step further in preventing double blocking. In Figure~\ref{f:ChanComp}, the producer-consumer solutions used by Go and \CFA are presented. Some liberties are taken to simplify the code, such as removing special casing for zero-size buffers, and abstracting the non-concurrent insert into a helper, \code{insert_}. Only the insert routine is presented, as the remove routine is symmetric.107 In the Go implementation \ref{f:GoChanImpl}, first mutual exclusion is acquired. Then if the buffer is full the producer waits on a condition variable and releases the mx lock. Note it will not reacquire the lock upon waking. The 3rd argument to \code{wait} is a pointer that is stored per thread on the condition variable. This pointer can be accessed when the waiting thread is at the from of the condition variable's queue by calling \code{front}. This allows arbitrary data to be stored with waiting tasks in the queue, eliminating the need for a second queue just for data. This producer that waits stores a pointer to the element it wanted to insert, so that the consumer that signals them can insert the element for the producer before signalling. If the buffer is not full a producer will proceed to check if any consumers are waiting. If so then the producer puts its value directly into the consumers hands, bypassing the usage of the buffer. If there are no waiting consumers, the producer inserts the value into the buffer and leaves.108 The \CFA implementation \ref{f:cfaChanImpl} it follows a similar pattern to the Go implementation, but instead uses three locks and no condition variables. The main idea is that the \CFA implementation forgoes the use of a condition variable by making all producers wait on the outer lock \code{p_lock} once a single producer has to wait inside the critical section. This also happens with consumers. This further reduces double blocking by ensuring that the only threads that can enter the critical section after a producer is blocked are consumers and vice versa. Additionally, entering consumers to not have to contend for the \code{mx} lock once producers are waiting and vice versa. Since only at most one thread will be waiting in the critical section, condition variables are not needed and a barebones thread \code{park} and \code{unpark} will suffice. The operation \code{park} blocks a thread and \code{unpark} is passed a pointer to a thread to wake up. This algorithm can be written using a single condition variable instead of park/unpark, but using park/unpark eliminates the need for any queueing operations. Now with the understanding of park/unpark it is clear to see the similarity between the two algorithms. The main difference being the \code{p_lock} acquisitions and releases. 
Note that \code{p_lock} is held until after waking from \code{park}, which provides the guarantee than no other producers will enter until the first producer to enter makes progress.109 110 \section{Safety and Productivity}111 112 113 \section{Performance} -
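The channels.tex changes above contrast \CFA's exception-based close with Go's close semantics. As a reference point only, the following standard Go snippet demonstrates the behaviours the text describes: receives on a closed channel never block, the remaining buffered elements can still be drained (for example with range), an empty closed channel yields zero values, and a send on a closed channel panics.

package main

import "fmt"

func main() {
	ch := make(chan int, 2)
	ch <- 7
	ch <- 8
	close(ch)

	// Receives on a closed channel never block; the second result reports
	// whether the value came from a real send.
	v, ok := <-ch
	fmt.Println(v, ok) // 7 true

	// Ranging over a closed channel drains the buffered elements, then exits.
	for v := range ch {
		fmt.Println(v) // 8
	}

	// Once empty, receives return the zero value immediately.
	v, ok = <-ch
	fmt.Println(v, ok) // 0 false

	// Sends on a closed channel panic, which is why producers must be
	// synchronized with close(); recovered here only for illustration.
	defer func() { fmt.Println("recovered:", recover()) }()
	ch <- 9
}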
doc/theses/colby_parsons_MMAth/text/mutex_stmt.tex
r70056ed reb47a80 18 18 19 19 \section{Other Languages} 20 There are similar concepts to the mutex statement that exist in other languages. Java has a feature called a synchronized statement, which looks identical to \CFA's mutex statement, but it has some differences. The synchronized statement only accepts one item in its clause. Any object can be passed to the synchronized statement in Java since all objects in Java are monitors, and the synchronized statement acquires that object's monitor. In \CC there is a feature in the\code{<mutex>} header called scoped\_lock, which is also similar to the mutex statement. The scoped\_lock is a class that takes in any number of locks in its constructor, and acquires them in a deadlock-free manner. It then releases them when the scoped\_lock object is deallocated, thus using RAII. An example of \CC scoped\_lock usage is shown in Listing~\ref{l:cc_scoped_lock}.20 There are similar concepts to the mutex statement that exist in other languages. Java has a feature called a synchronized statement, which looks identical to \CFA's mutex statement, but it has some differences. The synchronized statement only accepts a single object in its clause. Any object can be passed to the synchronized statement in Java since all objects in Java are monitors, and the synchronized statement acquires that object's monitor. In \CC there is a feature in the standard library \code{<mutex>} header called scoped\_lock, which is also similar to the mutex statement. The scoped\_lock is a class that takes in any number of locks in its constructor, and acquires them in a deadlock-free manner. It then releases them when the scoped\_lock object is deallocated, thus using RAII. An example of \CC scoped\_lock usage is shown in Listing~\ref{l:cc_scoped_lock}. 21 21 22 22 \begin{cppcode}[tabsize=3,caption={\CC scoped\_lock usage},label={l:cc_scoped_lock}] … … 29 29 30 30 \section{\CFA implementation} 31 The \CFA mutex statement can be seen as a combination of the similar featurs in Java and \CC. It can acquire more that one lock in a deadlock-free manner, and releases them via RAII like \CC, however the syntax is identical to the Java synchronized statement. This syntactic choice was made so that the body of the mutex statement is its own scope. Compared to the scoped\_lock, which relies on its enclosing scope, the mutex statement's introduced scope can provide visual clarity as to what code is being protected by the mutex statement, and where the mutual exclusion ends. \CFA's mutex statement and \CC's scoped\_lock both use parametric polymorphism to allow user defined types to work with the feature. \CFA's implementation requires types to support the routines \code{lock()} and \code{unlock()}, whereas \CC requires those routines, plus \code{try_lock()}. The scoped\_lock requires an additional routine since it differs from the mutex statement in how it implements deadlock avoidance.31 The \CFA mutex statement takes some ideas from both the Java and \CC features. The mutex statement can acquire more that one lock in a deadlock-free manner, and releases them via RAII like \CC, however the syntax is identical to the Java synchronized statement. This syntactic choice was made so that the body of the mutex statement is its own scope. Compared to the scoped\_lock, which relies on its enclosing scope, the mutex statement's introduced scope can provide visual clarity as to what code is being protected by the mutex statement, and where the mutual exclusion ends. 
\CFA's mutex statement and \CC's scoped\_lock both use parametric polymorphism to allow user defined types to work with the feature. \CFA's implementation requires types to support the routines \code{lock()} and \code{unlock()}, whereas \CC requires those routines, plus \code{try_lock()}. The scoped\_lock requires an additional routine since it differs from the mutex statement in how it implements deadlock avoidance. 32 32 33 The parametric polymorphism allows for locking to be defined for types that may want convenient mutual exclusion. An example is \CFA's \code{sout}. \code{sout} is \CFA's output stream, similar to \CC's \code{cout}. \code{sout} has routines that match the mutex statement trait, so the mutex statement can be used to lock the output stream while producing output. In this case, the mutex statement allows the programmer to acquire mutual exclusion over an object without having to know the internals of the object or what locks it needs to acquire. The ability to do so provides both improves safety and programmer productivity since it abstracts away the concurrent details and provides an interface for optional thread-safety. This is a commonly used feature when producing output from a concurrent context, since producing output is not thread safe by default. This use case is shown in Listing~\ref{l:sout}.33 The parametric polymorphism allows for locking to be defined for types that may want convenient mutual exclusion. An example of one such use case in \CFA is \code{sout}. The output stream in \CFA is called \code{sout}, and functions similarly to \CC's \code{cout}. \code{sout} has routines that satisfy the mutex statement trait, so the mutex statement can be used to lock the output stream while producing output. In this case, the mutex statement allows the programmer to acquire mutual exclusion over an object without having to know the internals of the object or what locks need to be acquired. The ability to do so provides both improves safety and programmer productivity since it abstracts away the concurrent details and provides an interface for optional thread-safety. This is a commonly used feature when producing output from a concurrent context, since producing output is not thread safe by default. This use case is shown in Listing~\ref{l:sout}. 34 34 35 35 \begin{cfacode}[tabsize=3,caption={\CFA sout with mutex statement},label={l:sout}] 36 37 36 mutex( sout ) 37 sout | "This output is protected by mutual exclusion!"; 38 38 \end{cfacode} 39 39 40 40 \section{Deadlock Avoidance} 41 The mutex statement uses the deadlock prevention technique of lock ordering, where the circular-wait condition of a deadlock cannot occur if all locks are acquired in the same order. The scoped\_lock uses a deadlock avoidance algorithm where all locks after the first are acquired using \code{try_lock} and if any of the attempts to lock fails, all locks so far are released. This repeats until all locks are acquired successfully. The deadlock avoidance algorithm used by scoped\_lock is shown in Listing~\ref{l:cc_deadlock_avoid}. The algorithm presented is taken straight from the \code{<mutex>} header source, with some renaming and comments for clarity.41 The mutex statement uses the deadlock prevention technique of lock ordering, where the circular-wait condition of a deadlock cannot occur if all locks are acquired in the same order. 
The scoped\_lock uses a deadlock avoidance algorithm where all locks after the first are acquired using \code{try_lock} and if any of the attempts to lock fails, all locks so far are released. This repeats until all locks are acquired successfully. The deadlock avoidance algorithm used by scoped\_lock is shown in Listing~\ref{l:cc_deadlock_avoid}. The algorithm presented is taken directly from the source code of the \code{<mutex>} header, with some renaming and comments for clarity. 42 42 43 43 \begin{cppcode}[tabsize=3,caption={\CC scoped\_lock deadlock avoidance algorithm},label={l:cc_deadlock_avoid}] … … 59 59 \end{cppcode} 60 60 61 The algorithm in \ref{l:cc_deadlock_avoid} successfully avoids deadlock, however there is a potential livelock scenario. Given two threads $A$ and $B$, who create a scoped\_lock with two locks $L1$ and $L2$, a livelock can form as follows. Thread $A$ creates a scoped\_lock with $L1, L2$ in that order, $B$ creates a scoped lock with the order $L2, L1$. Both threads acquire the first lock in their order and then fail the try\_lock since the other lock is held. They then reset their start lock to be their 2nd lock and try again. This time $A$ has order $L2, L1$, and $B$ has order $L1, L2$. This is identical to the starting setup, but with the ordering swapped among threads. As such if they each acquire their first lock before the other acquires their second, they can livelock indefinitely. 61 The algorithm in \ref{l:cc_deadlock_avoid} successfully avoids deadlock, however there is a potential livelock scenario. Given two threads $A$ and $B$, who create a scoped\_lock with two locks $L1$ and $L2$, a livelock can form as follows. Thread $A$ creates a scoped\_lock with $L1$, $L2$, and $B$ creates a scoped lock with the order $L2$, $L1$. Both threads acquire the first lock in their order and then fail the try\_lock since the other lock is held. They then reset their start lock to be their 2nd lock and try again. This time $A$ has order $L2$, $L1$, and $B$ has order $L1$, $L2$. This is identical to the starting setup, but with the ordering swapped among threads. As such, if they each acquire their first lock before the other acquires their second, they can livelock indefinitely. 62 62 63 The lock ordering algorithm used in the mutex statement in \CFA is both deadlock and livelock free. It sorts the locks based on memory address and then acquires them. For locks fewer than 7, it sorts using hard coded sorting methods that perform the minimum number of swaps for a given number of locks. For 7 or more locks insertion sort is used. These sorting algorithms were chosen since it is rare to have to hold more than a handful of locks at a time. It is worth mentioning that the downside to the sorting approach is that it is not fully compatible with usages of the same locks outside the mutex statement. If more than one lock is held by a mutex statement, if more than one lock is to be held elsewhere, it must be acquired via the mutex statement, or else the required ordering will not occur. Comparitively, if the scoped\_lock is used and the same locks are acquired elsewhere, there is no concern of the scoped\_lock deadlocking, due to its avoidance scheme, but it may livelock. 63 64 -
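The mutex statement's deadlock freedom comes from imposing a single global acquisition order (by memory address) on the requested locks. The thesis implementation is in \CFA; purely as an illustration of the same total-order idea, here is a Go sketch that sorts mutexes by address before locking them (all names are hypothetical and not part of the thesis code):

package main

import (
	"fmt"
	"sort"
	"sync"
	"unsafe"
)

// lockOrdered acquires all mutexes in ascending address order and returns a
// function that releases them in reverse order.
func lockOrdered(locks ...*sync.Mutex) (unlock func()) {
	sorted := append([]*sync.Mutex(nil), locks...)
	sort.Slice(sorted, func(i, j int) bool {
		return uintptr(unsafe.Pointer(sorted[i])) < uintptr(unsafe.Pointer(sorted[j]))
	})
	for _, m := range sorted {
		m.Lock()
	}
	return func() {
		for i := len(sorted) - 1; i >= 0; i-- {
			sorted[i].Unlock()
		}
	}
}

func main() {
	var a, b sync.Mutex
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			// The two goroutines name the locks in opposite orders, yet no
			// deadlock is possible because the acquisition order is canonical.
			if i == 0 {
				defer lockOrdered(&a, &b)()
			} else {
				defer lockOrdered(&b, &a)()
			}
			fmt.Println("in critical section", i)
		}()
	}
	wg.Wait()
}

Because every caller acquires the locks in the same canonical order, the circular-wait condition cannot arise regardless of the order in which the locks are named at the call site; the trade-off, as the text notes, is that acquisitions made outside such a helper do not participate in the ordering.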
doc/theses/colby_parsons_MMAth/thesis.tex
  r70056ed  reb47a80
  113  113  %----------------------------------------------------------------------
  114  114
  115       %\input{intro}
       115  \input{intro}
  116  116
  117       %\input{CFA_intro}
       117  \input{CFA_intro}
  118  118
  119       %\input{CFA_concurrency}
       119  \input{CFA_concurrency}
  120  120
  121       %\input{mutex_stmt}
       121  \input{mutex_stmt}
  122  122
  123  123  \input{channels}
  124  124
  125       %\input{actors}
       125  \input{actors}
  126  126
  127  127  \clearpage
libcfa/src/concurrency/actor.hfa
r70056ed reb47a80 35 35 36 36 // show stats 37 // #define STATS37 // #define ACTOR_STATS 38 38 39 39 // forward decls … … 122 122 copy_queue * c_queue; // current queue 123 123 volatile bool being_processed; // flag to prevent concurrent processing 124 #ifdef STATS124 #ifdef ACTOR_STATS 125 125 unsigned int id; 126 126 size_t missed; // transfers skipped due to being_processed flag being up … … 132 132 c_queue = owned_queue; 133 133 being_processed = false; 134 #ifdef STATS134 #ifdef ACTOR_STATS 135 135 id = i; 136 136 missed = 0; … … 153 153 // check if queue is being processed elsewhere 154 154 if ( unlikely( being_processed ) ) { 155 #ifdef STATS155 #ifdef ACTOR_STATS 156 156 missed++; 157 157 #endif … … 175 175 struct worker_info { 176 176 volatile unsigned long long stamp; 177 #ifdef STATS177 #ifdef ACTOR_STATS 178 178 size_t stolen_from, try_steal, stolen, failed_swaps, msgs_stolen; 179 179 unsigned long long processed; … … 182 182 }; 183 183 static inline void ?{}( worker_info & this ) { 184 #ifdef STATS184 #ifdef ACTOR_STATS 185 185 this.stolen_from = 0; 186 186 this.try_steal = 0; // attempts to steal … … 194 194 } 195 195 196 // #ifdef STATS196 // #ifdef ACTOR_STATS 197 197 // unsigned int * stolen_arr; 198 198 // unsigned int * replaced_queue; … … 206 206 }; 207 207 208 #ifdef STATS208 #ifdef ACTOR_STATS 209 209 // aggregate counters for statistics 210 210 size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0, … … 235 235 }; // executor 236 236 237 // #ifdef STATS237 // #ifdef ACTOR_STATS 238 238 // __spinlock_t out_lock; 239 239 // #endif 240 240 static inline void ^?{}( worker & mutex this ) with(this) { 241 #ifdef STATS241 #ifdef ACTOR_STATS 242 242 __atomic_add_fetch(&__all_gulps, executor_->w_infos[id].gulps,__ATOMIC_SEQ_CST); 243 243 __atomic_add_fetch(&__all_processed, executor_->w_infos[id].processed,__ATOMIC_SEQ_CST); … … 276 276 no_steal = true; 277 277 278 #ifdef STATS278 #ifdef ACTOR_STATS 279 279 // stolen_arr = aalloc( nrqueues ); 280 280 // replaced_queue = aalloc( nrqueues ); … … 341 341 } // for 342 342 343 #ifdef STATS343 #ifdef ACTOR_STATS 344 344 size_t misses = 0; 345 345 for ( i; nrqueues ) { … … 358 358 if ( seperate_clus ) delete( cluster ); 359 359 360 #ifdef STATS // print formatted stats360 #ifdef ACTOR_STATS // print formatted stats 361 361 printf(" Actor System Stats:\n"); 362 362 printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, __all_processed); … … 404 404 ticket = __get_next_ticket( *__actor_executor_ ); 405 405 __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_RELAXED ); 406 #ifdef STATS406 #ifdef ACTOR_STATS 407 407 __atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST ); 408 408 #endif … … 513 513 continue; 514 514 515 #ifdef STATS515 #ifdef ACTOR_STATS 516 516 curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx ); 517 517 if ( curr_steal_queue ) { … … 526 526 #else 527 527 curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx ); 528 #endif // STATS528 #endif // ACTOR_STATS 529 529 530 530 return; … … 558 558 559 559 void main( worker & this ) with(this) { 560 // #ifdef STATS560 // #ifdef ACTOR_STATS 561 561 // for ( i; executor_->nrqueues ) { 562 562 // replaced_queue[i] = 0; … … 587 587 } 588 588 transfer( *curr_work_queue, ¤t_queue ); 589 #ifdef STATS589 #ifdef ACTOR_STATS 590 590 executor_->w_infos[id].gulps++; 591 #endif // STATS591 #endif // ACTOR_STATS 592 592 #ifdef __STEAL 593 593 if ( isEmpty( *current_queue ) ) { … … 599 599 
__atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED ); 600 600 601 #ifdef STATS601 #ifdef ACTOR_STATS 602 602 executor_->w_infos[id].try_steal++; 603 #endif // STATS603 #endif // ACTOR_STATS 604 604 605 605 steal_work( this, start + prng( range ) ); … … 608 608 #endif // __STEAL 609 609 while ( ! isEmpty( *current_queue ) ) { 610 #ifdef STATS610 #ifdef ACTOR_STATS 611 611 executor_->w_infos[id].processed++; 612 612 #endif … … 636 636 637 637 static inline void __reset_stats() { 638 #ifdef STATS638 #ifdef ACTOR_STATS 639 639 __total_tries = 0; 640 640 __total_stolen = 0; -
libcfa/src/concurrency/channel.hfa
r70056ed reb47a80 3 3 #include <locks.hfa> 4 4 #include <list.hfa> 5 6 #define __COOP_CHANNEL 7 #ifdef __PREVENTION_CHANNEL 8 forall( T ) { 9 struct channel { 10 size_t size, count, front, back; 11 T * buffer; 12 thread$ * chair; 13 T * chair_elem; 14 exp_backoff_then_block_lock c_lock, p_lock; 15 __spinlock_t mutex_lock; 16 char __padding[64]; // avoid false sharing in arrays of channels 17 }; 18 19 static inline void ?{}( channel(T) &c, size_t _size ) with(c) { 20 size = _size; 21 front = back = count = 0; 22 buffer = aalloc( size ); 23 chair = 0p; 24 mutex_lock{}; 25 c_lock{}; 26 p_lock{}; 27 } 28 29 static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; } 30 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); } 31 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; } 32 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; } 33 static inline bool has_waiters( channel(T) & chan ) with(chan) { return chair != 0p; } 34 35 static inline void insert_( channel(T) & chan, T & elem ) with(chan) { 36 memcpy((void *)&buffer[back], (void *)&elem, sizeof(T)); 37 count += 1; 38 back++; 39 if ( back == size ) back = 0; 40 } 41 42 static inline void insert( channel(T) & chan, T elem ) with( chan ) { 43 lock( p_lock ); 44 lock( mutex_lock __cfaabi_dbg_ctx2 ); 45 46 // have to check for the zero size channel case 47 if ( size == 0 && chair != 0p ) { 48 memcpy((void *)chair_elem, (void *)&elem, sizeof(T)); 49 unpark( chair ); 50 chair = 0p; 51 unlock( mutex_lock ); 52 unlock( p_lock ); 53 unlock( c_lock ); 54 return; 55 } 56 57 // wait if buffer is full, work will be completed by someone else 58 if ( count == size ) { 59 chair = active_thread(); 60 chair_elem = &elem; 61 unlock( mutex_lock ); 62 park( ); 63 return; 64 } // if 65 66 if ( chair != 0p ) { 67 memcpy((void *)chair_elem, (void *)&elem, sizeof(T)); 68 unpark( chair ); 69 chair = 0p; 70 unlock( mutex_lock ); 71 unlock( p_lock ); 72 unlock( c_lock ); 73 return; 74 } 75 insert_( chan, elem ); 76 77 unlock( mutex_lock ); 78 unlock( p_lock ); 79 } 80 81 static inline T remove( channel(T) & chan ) with(chan) { 82 lock( c_lock ); 83 lock( mutex_lock __cfaabi_dbg_ctx2 ); 84 T retval; 85 86 // have to check for the zero size channel case 87 if ( size == 0 && chair != 0p ) { 88 memcpy((void *)&retval, (void *)chair_elem, sizeof(T)); 89 unpark( chair ); 90 chair = 0p; 91 unlock( mutex_lock ); 92 unlock( p_lock ); 93 unlock( c_lock ); 94 return retval; 95 } 96 97 // wait if buffer is empty, work will be completed by someone else 98 if ( count == 0 ) { 99 chair = active_thread(); 100 chair_elem = &retval; 101 unlock( mutex_lock ); 102 park( ); 103 return retval; 104 } 105 106 // Remove from buffer 107 memcpy((void *)&retval, (void *)&buffer[front], sizeof(T)); 108 count -= 1; 109 front++; 110 if ( front == size ) front = 0; 111 112 if ( chair != 0p ) { 113 insert_( chan, *chair_elem ); // do waiting producer work 114 unpark( chair ); 115 chair = 0p; 116 unlock( mutex_lock ); 117 unlock( p_lock ); 118 unlock( c_lock ); 119 return retval; 120 } 121 122 unlock( mutex_lock ); 123 unlock( c_lock ); 124 return retval; 125 } 126 127 } // forall( T ) 128 #endif 129 130 #ifdef __COOP_CHANNEL 5 #include <mutex_stmt.hfa> 131 6 132 7 // link field used for threads waiting on channel … … 148 23 } 149 24 25 // wake one thread from the list 26 static inline void wake_one( dlist( wait_link ) & queue ) { 27 wait_link & popped = try_pop_front( queue ); 28 unpark( popped.t ); 29 } 30 31 // returns 
true if woken due to shutdown 32 // blocks thread on list and releases passed lock 33 static inline bool block( dlist( wait_link ) & queue, void * elem_ptr, go_mutex & lock ) { 34 wait_link w{ active_thread(), elem_ptr }; 35 insert_last( queue, w ); 36 unlock( lock ); 37 park(); 38 return w.elem == 0p; 39 } 40 41 // void * used for some fields since exceptions don't work with parametric polymorphism currently 42 exception channel_closed { 43 // on failed insert elem is a ptr to the element attempting to be inserted 44 // on failed remove elem ptr is 0p 45 // on resumption of a failed insert this elem will be inserted 46 // so a user may modify it in the resumption handler 47 void * elem; 48 49 // pointer to chan that is closed 50 void * closed_chan; 51 }; 52 vtable(channel_closed) channel_closed_vt; 53 54 // #define CHAN_STATS // define this to get channel stats printed in dtor 55 150 56 forall( T ) { 151 57 152 struct channel { 153 size_t size; 154 size_t front, back, count; 58 struct __attribute__((aligned(128))) channel { 59 size_t size, front, back, count; 155 60 T * buffer; 156 dlist( wait_link ) prods, cons; 157 exp_backoff_then_block_lock mutex_lock; 61 dlist( wait_link ) prods, cons; // lists of blocked threads 62 go_mutex mutex_lock; // MX lock 63 bool closed; // indicates channel close/open 64 #ifdef CHAN_STATS 65 size_t blocks, operations; // counts total ops and ops resulting in a blocked thd 66 #endif 158 67 }; 159 68 … … 165 74 cons{}; 166 75 mutex_lock{}; 76 closed = false; 77 #ifdef CHAN_STATS 78 blocks = 0; 79 operations = 0; 80 #endif 167 81 } 168 82 169 83 static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; } 170 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); } 84 static inline void ^?{}( channel(T) &c ) with(c) { 85 #ifdef CHAN_STATS 86 printf("Channel %p Blocks: %lu, Operations: %lu, %.2f%% of ops blocked\n", &c, blocks, operations, ((double)blocks)/operations * 100); 87 #endif 88 verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" ); 89 delete( buffer ); 90 } 171 91 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; } 172 92 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; } … … 175 95 static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !prods`isEmpty; } 176 96 177 static inline void insert_( channel(T) & chan, T & elem ) with(chan) { 97 // closes the channel and notifies all blocked threads 98 static inline void close( channel(T) & chan ) with(chan) { 99 lock( mutex_lock ); 100 closed = true; 101 102 // flush waiting consumers and producers 103 while ( has_waiting_consumers( chan ) ) { 104 cons`first.elem = 0p; 105 wake_one( cons ); 106 } 107 while ( has_waiting_producers( chan ) ) { 108 prods`first.elem = 0p; 109 wake_one( prods ); 110 } 111 unlock(mutex_lock); 112 } 113 114 static inline void is_closed( channel(T) & chan ) with(chan) { return closed; } 115 116 static inline void flush( channel(T) & chan, T elem ) with(chan) { 117 lock( mutex_lock ); 118 while ( count == 0 && !cons`isEmpty ) { 119 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work 120 wake_one( cons ); 121 } 122 unlock( mutex_lock ); 123 } 124 125 // handles buffer insert 126 static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) { 178 127 memcpy((void *)&buffer[back], (void *)&elem, sizeof(T)); 179 128 count += 1; … … 182 131 } 183 132 184 static inline void wake_one( dlist( 
wait_link ) & queue ) { 185 wait_link & popped = try_pop_front( queue ); 186 unpark( popped.t ); 187 } 188 189 static inline void block( dlist( wait_link ) & queue, void * elem_ptr, exp_backoff_then_block_lock & lock ) { 190 wait_link w{ active_thread(), elem_ptr }; 191 insert_last( queue, w ); 192 unlock( lock ); 193 park(); 133 // does the buffer insert or hands elem directly to consumer if one is waiting 134 static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) { 135 if ( count == 0 && !cons`isEmpty ) { 136 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work 137 wake_one( cons ); 138 } else __buf_insert( chan, elem ); 139 } 140 141 // needed to avoid an extra copy in closed case 142 static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) { 143 lock( mutex_lock ); 144 #ifdef CHAN_STATS 145 operations++; 146 #endif 147 if ( count == size ) { unlock( mutex_lock ); return false; } 148 __do_insert( chan, elem ); 149 unlock( mutex_lock ); 150 return true; 151 } 152 153 // attempts a nonblocking insert 154 // returns true if insert was successful, false otherwise 155 static inline bool try_insert( channel(T) & chan, T elem ) { return __internal_try_insert( chan, elem ); } 156 157 // handles closed case of insert routine 158 static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) { 159 channel_closed except{&channel_closed_vt, &elem, &chan }; 160 throwResume except; // throw closed resumption 161 if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination 194 162 } 195 163 196 164 static inline void insert( channel(T) & chan, T elem ) with(chan) { 197 lock( mutex_lock ); 165 // check for close before acquire mx 166 if ( unlikely(closed) ) { 167 __closed_insert( chan, elem ); 168 return; 169 } 170 171 lock( mutex_lock ); 172 173 #ifdef CHAN_STATS 174 if ( !closed ) operations++; 175 #endif 176 177 // if closed handle 178 if ( unlikely(closed) ) { 179 unlock( mutex_lock ); 180 __closed_insert( chan, elem ); 181 return; 182 } 198 183 199 184 // have to check for the zero size channel case … … 202 187 wake_one( cons ); 203 188 unlock( mutex_lock ); 204 return ;189 return true; 205 190 } 206 191 207 192 // wait if buffer is full, work will be completed by someone else 208 193 if ( count == size ) { 209 block( prods, &elem, mutex_lock ); 194 #ifdef CHAN_STATS 195 blocks++; 196 #endif 197 198 // check for if woken due to close 199 if ( unlikely( block( prods, &elem, mutex_lock ) ) ) 200 __closed_insert( chan, elem ); 210 201 return; 211 202 } // if … … 214 205 memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work 215 206 wake_one( cons ); 216 } else insert_( chan, elem );207 } else __buf_insert( chan, elem ); 217 208 218 209 unlock( mutex_lock ); 210 return; 211 } 212 213 // handles buffer remove 214 static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) { 215 memcpy((void *)&retval, (void *)&buffer[front], sizeof(T)); 216 count -= 1; 217 front = (front + 1) % size; 218 } 219 220 // does the buffer remove and potentially does waiting producer work 221 static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) { 222 __buf_remove( chan, retval ); 223 if (count == size - 1 && !prods`isEmpty ) { 224 __buf_insert( chan, *(T *)prods`first.elem ); // do waiting producer work 225 wake_one( prods ); 226 } 227 } 228 229 // needed to avoid an extra copy in closed case and single return val case 230 
static inline bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) { 231 lock( mutex_lock ); 232 #ifdef CHAN_STATS 233 operations++; 234 #endif 235 if ( count == 0 ) { unlock( mutex_lock ); return false; } 236 __do_remove( chan, retval ); 237 unlock( mutex_lock ); 238 return true; 239 } 240 241 // attempts a nonblocking remove 242 // returns [T, true] if insert was successful 243 // returns [T, false] if insert was successful (T uninit) 244 static inline [T, bool] try_remove( channel(T) & chan ) { 245 T retval; 246 return [ retval, __internal_try_remove( chan, retval ) ]; 247 } 248 249 static inline T try_remove( channel(T) & chan, T elem ) { 250 T retval; 251 __internal_try_remove( chan, retval ); 252 return retval; 253 } 254 255 // handles closed case of insert routine 256 static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) { 257 channel_closed except{&channel_closed_vt, 0p, &chan }; 258 throwResume except; // throw resumption 259 if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination 219 260 } 220 261 221 262 static inline T remove( channel(T) & chan ) with(chan) { 222 lock( mutex_lock );223 263 T retval; 264 if ( unlikely(closed) ) { 265 __closed_remove( chan, retval ); 266 return retval; 267 } 268 lock( mutex_lock ); 269 270 #ifdef CHAN_STATS 271 if ( !closed ) operations++; 272 #endif 273 274 if ( unlikely(closed) ) { 275 unlock( mutex_lock ); 276 __closed_remove( chan, retval ); 277 return retval; 278 } 224 279 225 280 // have to check for the zero size channel case … … 233 288 // wait if buffer is empty, work will be completed by someone else 234 289 if (count == 0) { 235 block( cons, &retval, mutex_lock ); 290 #ifdef CHAN_STATS 291 blocks++; 292 #endif 293 // check for if woken due to close 294 if ( unlikely( block( cons, &retval, mutex_lock ) ) ) 295 __closed_remove( chan, retval ); 236 296 return retval; 237 297 } 238 298 239 299 // Remove from buffer 240 memcpy((void *)&retval, (void *)&buffer[front], sizeof(T)); 241 count -= 1; 242 front = (front + 1) % size; 243 244 if (count == size - 1 && !prods`isEmpty ) { 245 insert_( chan, *(T *)prods`first.elem ); // do waiting producer work 246 wake_one( prods ); 247 } 300 __do_remove( chan, retval ); 248 301 249 302 unlock( mutex_lock ); … … 251 304 } 252 305 } // forall( T ) 253 #endif254 255 #ifdef __BARGE_CHANNEL256 forall( T ) {257 struct channel {258 size_t size;259 size_t front, back, count;260 T * buffer;261 fast_cond_var( exp_backoff_then_block_lock ) prods, cons;262 exp_backoff_then_block_lock mutex_lock;263 };264 265 static inline void ?{}( channel(T) &c, size_t _size ) with(c) {266 size = _size;267 front = back = count = 0;268 buffer = aalloc( size );269 prods{};270 cons{};271 mutex_lock{};272 }273 274 static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }275 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }276 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }277 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }278 static inline bool has_waiters( channel(T) & chan ) with(chan) { return !empty( cons ) || !empty( prods ); }279 static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !empty( cons ); }280 static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !empty( prods ); }281 282 static inline void insert_( channel(T) & chan, T & elem ) with(chan) {283 memcpy((void 
*)&buffer[back], (void *)&elem, sizeof(T));284 count += 1;285 back++;286 if ( back == size ) back = 0;287 }288 289 290 static inline void insert( channel(T) & chan, T elem ) with(chan) {291 lock( mutex_lock );292 293 while ( count == size ) {294 wait( prods, mutex_lock );295 } // if296 297 insert_( chan, elem );298 299 if ( !notify_one( cons ) && count < size )300 notify_one( prods );301 302 unlock( mutex_lock );303 }304 305 static inline T remove( channel(T) & chan ) with(chan) {306 lock( mutex_lock );307 T retval;308 309 while (count == 0) {310 wait( cons, mutex_lock );311 }312 313 memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));314 count -= 1;315 front = (front + 1) % size;316 317 if ( !notify_one( prods ) && count > 0 )318 notify_one( cons );319 320 unlock( mutex_lock );321 return retval;322 }323 324 } // forall( T )325 #endif326 327 #ifdef __NO_WAIT_CHANNEL328 forall( T ) {329 struct channel {330 size_t size;331 size_t front, back, count;332 T * buffer;333 thread$ * chair;334 T * chair_elem;335 exp_backoff_then_block_lock c_lock, p_lock;336 __spinlock_t mutex_lock;337 };338 339 static inline void ?{}( channel(T) &c, size_t _size ) with(c) {340 size = _size;341 front = back = count = 0;342 buffer = aalloc( size );343 chair = 0p;344 mutex_lock{};345 c_lock{};346 p_lock{};347 lock( c_lock );348 }349 350 static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }351 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }352 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }353 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }354 static inline bool has_waiters( channel(T) & chan ) with(chan) { return c_lock.lock_value != 0; }355 356 static inline void insert_( channel(T) & chan, T & elem ) with(chan) {357 memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));358 count += 1;359 back++;360 if ( back == size ) back = 0;361 }362 363 static inline void insert( channel(T) & chan, T elem ) with( chan ) {364 lock( p_lock );365 lock( mutex_lock __cfaabi_dbg_ctx2 );366 367 insert_( chan, elem );368 369 if ( count != size )370 unlock( p_lock );371 372 if ( count == 1 )373 unlock( c_lock );374 375 unlock( mutex_lock );376 }377 378 static inline T remove( channel(T) & chan ) with(chan) {379 lock( c_lock );380 lock( mutex_lock __cfaabi_dbg_ctx2 );381 T retval;382 383 // Remove from buffer384 memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));385 count -= 1;386 front = (front + 1) % size;387 388 if ( count != 0 )389 unlock( c_lock );390 391 if ( count == size - 1 )392 unlock( p_lock );393 394 unlock( mutex_lock );395 return retval;396 }397 398 } // forall( T )399 #endif -
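The channel changes above add a shutdown protocol on top of the blocking buffer: close() flushes any blocked producers and consumers, and insert/remove on a closed channel first raise a channel_closed resumption, then a channel_closed termination if the operation still cannot complete (see __closed_insert and __closed_remove). The following is only a minimal usage sketch against the API visible in this diff; the header name channel.hfa, the thread scaffolding, and the element counts are illustrative assumptions rather than part of the changeset.

    #include <channel.hfa>                          // assumed header for the channel(T) type in this diff
    #include <thread.hfa>
    #include <fstream.hfa>

    channel( size_t ) chan{ 64 };                   // bounded channel, capacity 64
    size_t total = 0;                               // written only by the single consumer

    thread Producer {};
    void main( Producer & this ) {
        for ( i; 1000 ) insert( chan, (size_t)i );  // blocks while the buffer is full
    }

    thread Consumer {};
    void main( Consumer & this ) {
        try {
            for ( ;; ) total += remove( chan );     // blocks while the buffer is empty
        } catchResume ( channel_closed * e ) {
            // channel closed: returning resumes remove, which drains any buffered elements
        } catch ( channel_closed * e ) {
            // channel closed and buffer empty: stop consuming
        }
    }

    int main() {
        {
            Consumer c;                             // threads start at declaration
            {
                Producer p;
            }                                       // producer joins here
            close( chan );                          // wakes the consumer so it can drain and exit
        }                                           // consumer joins here
        sout | total;                               // expected: 499500
    }

Returning from the catchResume handler lets remove() keep draining whatever is still buffered; the termination handler only fires once the channel is both closed and empty, matching the fallback in __closed_remove.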
libcfa/src/concurrency/locks.hfa
r70056ed reb47a80 32 32 #include <fstream.hfa> 33 33 34 35 34 // futex headers 36 35 #include <linux/futex.h> /* Definition of FUTEX_* constants */ … … 155 154 // futex_mutex 156 155 157 // - No cond var support158 156 // - Kernel thd blocking alternative to the spinlock 159 157 // - No ownership (will deadlock on reacq) … … 185 183 int state; 186 184 187 188 // // linear backoff omitted for now 189 // for( int spin = 4; spin < 1024; spin += spin) { 190 // state = 0; 191 // // if unlocked, lock and return 192 // if (internal_try_lock(this, state)) return; 193 // if (2 == state) break; 194 // for (int i = 0; i < spin; i++) Pause(); 195 // } 196 197 // no contention try to acquire 198 if (internal_try_lock(this, state)) return; 185 for( int spin = 4; spin < 1024; spin += spin) { 186 state = 0; 187 // if unlocked, lock and return 188 if (internal_try_lock(this, state)) return; 189 if (2 == state) break; 190 for (int i = 0; i < spin; i++) Pause(); 191 } 192 193 // // no contention try to acquire 194 // if (internal_try_lock(this, state)) return; 199 195 200 196 // if not in contended state, set to be in contended state … … 209 205 210 206 static inline void unlock(futex_mutex & this) with(this) { 211 // if uncontended do atomic eunlock and then return212 if (__atomic_fetch_sub(&val, 1, __ATOMIC_RELEASE) == 1) return; // TODO: try acq/rel 207 // if uncontended do atomic unlock and then return 208 if (__atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1) return; 213 209 214 210 // otherwise threads are blocked so we must wake one 215 __atomic_store_n((int *)&val, 0, __ATOMIC_RELEASE);216 211 futex((int *)&val, FUTEX_WAKE, 1); 217 212 } … … 222 217 // to set recursion count after getting signalled; 223 218 static inline void on_wakeup( futex_mutex & f, size_t recursion ) {} 219 220 //----------------------------------------------------------------------------- 221 // go_mutex 222 223 // - Kernel thd blocking alternative to the spinlock 224 // - No ownership (will deadlock on reacq) 225 // - Golang's flavour of mutex 226 // - Impl taken from Golang: src/runtime/lock_futex.go 227 struct go_mutex { 228 // lock state any state other than UNLOCKED is locked 229 // enum LockState { UNLOCKED = 0, LOCKED = 1, SLEEPING = 2 }; 230 231 // stores a lock state 232 int val; 233 }; 234 235 static inline void ?{}( go_mutex & this ) with(this) { val = 0; } 236 237 static inline bool internal_try_lock(go_mutex & this, int & compare_val, int new_val ) with(this) { 238 return __atomic_compare_exchange_n((int*)&val, (int*)&compare_val, new_val, false, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE); 239 } 240 241 static inline int internal_exchange(go_mutex & this, int swap ) with(this) { 242 return __atomic_exchange_n((int*)&val, swap, __ATOMIC_ACQUIRE); 243 } 244 245 // if this is called recursively IT WILL DEADLOCK!!!!! 
246 static inline void lock(go_mutex & this) with(this) { 247 int state, init_state; 248 249 // speculative grab 250 state = internal_exchange(this, 1); 251 if ( !state ) return; // state == 0 252 init_state = state; 253 for (;;) { 254 for( int i = 0; i < 4; i++ ) { 255 while( !val ) { // lock unlocked 256 state = 0; 257 if (internal_try_lock(this, state, init_state)) return; 258 } 259 for (int i = 0; i < 30; i++) Pause(); 260 } 261 262 while( !val ) { // lock unlocked 263 state = 0; 264 if (internal_try_lock(this, state, init_state)) return; 265 } 266 sched_yield(); 267 268 // if not in contended state, set to be in contended state 269 state = internal_exchange(this, 2); 270 if ( !state ) return; // state == 0 271 init_state = 2; 272 futex((int*)&val, FUTEX_WAIT, 2); // if val is not 2 this returns with EWOULDBLOCK 273 } 274 } 275 276 static inline void unlock( go_mutex & this ) with(this) { 277 // if uncontended do atomic unlock and then return 278 if (__atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1) return; 279 280 // otherwise threads are blocked so we must wake one 281 futex((int *)&val, FUTEX_WAKE, 1); 282 } 283 284 static inline void on_notify( go_mutex & f, thread$ * t){ unpark(t); } 285 static inline size_t on_wait( go_mutex & f ) {unlock(f); return 0;} 286 static inline void on_wakeup( go_mutex & f, size_t recursion ) {} 224 287 225 288 //----------------------------------------------------------------------------- … … 271 334 this.lock_value = 0; 272 335 } 336 337 static inline void ^?{}( exp_backoff_then_block_lock & this ){} 273 338 274 339 static inline bool internal_try_lock(exp_backoff_then_block_lock & this, size_t & compare_val) with(this) {
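The locks.hfa changes add go_mutex, a non-owning futex-based lock adapted from Go's src/runtime/lock_futex.go: val cycles through 0 (unlocked), 1 (locked), and 2 (locked with possible waiters), with a short spin-and-yield phase before blocking on the futex. Below is a minimal usage sketch; the worker and iteration counts plus the thread scaffolding are illustrative assumptions. As the comment above warns, the lock records no owner, so re-acquiring it from the holding thread deadlocks.

    #include <locks.hfa>
    #include <thread.hfa>
    #include <fstream.hfa>

    go_mutex m;                     // starts unlocked (val == 0)
    size_t counter = 0;             // shared state protected by m

    thread Worker {};
    void main( Worker & this ) {
        for ( i; 100000 ) {
            lock( m );              // brief spin/yield, then futex wait; not recursive
            counter += 1;
            unlock( m );            // releases the lock and wakes one waiter if contended
        }
    }

    int main() {
        {
            Worker workers[4];      // four workers start here and join at the end of the block
        }
        sout | counter;             // expected: 400000
    }

The on_notify/on_wait/on_wakeup hooks defined alongside go_mutex suggest it is meant to plug into the library's generic lock and condition-variable interface in the same way as the existing futex_mutex.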