Ignore:
Timestamp:
May 29, 2023, 11:44:29 AM (2 years ago)
Author:
JiadaL <j82liang@…>
Branches:
ADT
Children:
fa2c005
Parents:
3a513d89 (diff), 2b78949 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' into ADT

File:
1 edited

Legend:

Unmodified
Added
Removed
  • doc/theses/colby_parsons_MMAth/text/channels.tex

    r3a513d89 r044ae62  
    55% ======================================================================
    66
    7 Channels are a concurrent-language feature used to perform \Newterm{message-passing concurrency}: a model of concurrency where threads communicate by sending (mostly nonblocking) data as messages and synchronizing by receiving (blocking) sent data.
    8 This model is an alternative to shared-memory concurrency, where threads can communicate directly by changing shared state.
    97Most modern concurrent programming languages do not subscribe to just one style of communication among threads and provide features that support multiple approaches.
     8Channels are a concurrent-language feature used to perform \Newterm{message-passing concurrency}: a model of concurrency where threads communicate by sending data as messages (mostly non\-blocking) and synchronizing by receiving sent messages (blocking).
     9This model is an alternative to shared-memory concurrency, where threads communicate directly by changing shared state.
    1010
    1111Channels were first introduced by Kahn~\cite{Kahn74} and extended by Hoare~\cite{Hoare78} (CSP).
     
    1313Both languages are highly restrictive.
    1414Kahn's language restricts a reading process to only wait for data on a single channel at a time and different writing processes cannot send data on the same channel.
    15 Hoare's language restricts ...
    16 Channels as a programming language feature has been popularized in recent years due to the language Go, which encourages the use of channels as its fundamental concurrent feature.
    17 Go's restrictions are ...
    18 \CFA channels do not have these restrictions.
     15Hoare's language restricts both the sender and receiver to explicitly name the process that is the destination of a channel send or the source of a channel receive.
     16These channel semantics remove the ability to have an anonymous sender or receiver.
     17Additionally all channel operations in CSP are synchronous (no buffering).
     18Advanced channels as a programming language feature has been popularized in recent years by the language Go~\cite{Go}, which encourages the use of channels as its fundamental concurrent feature.
     19It was the popularity of Go channels that lead to their implemention in \CFA.
     20Neither Go nor \CFA channels have the restrictions of the early channel-based concurrent systems.
    1921
    2022\section{Producer-Consumer Problem}
    21 Most channels in modern programming languages are built on top of a shared memory buffer.
    22 While it is possible to create a channel that contains an unbounded buffer, most implementations opt to only support a fixed size channel, where the size is given at the time of channel creation.
    23 This turns the implementation of a channel into the producer-consumer problem.
    24 The producer-consumer problem, also known as the bounded buffer problem, was introduced by Dijkstra in his book Cooperating Sequential Processes\cite{Dijkstra65}.
    25 In the problem threads interact with the buffer in two ways, either consuming values by removing them from the buffer, or producing values and inserting them in the buffer.
    26 The buffer needs to be protected from concurrent access since each item in the buffer should only be produced and consumed once.
    27 Additionally, a consumer can only remove from a non-empty buffer and a producer can only insert into a non-full buffer.
     23A channel is an abstraction for a shared-memory buffer, which turns the implementation of a channel into the producer-consumer problem.
     24The producer-consumer problem, also known as the bounded-buffer problem, was introduced by Dijkstra~\cite[\S~4.1]{Dijkstra65}.
     25In the problem, threads interact with a buffer in two ways: producing threads insert values into the buffer and consuming threads remove values from the buffer.
     26In general, a buffer needs protection to ensure a producer only inserts into a non-full buffer and a consumer only removes from a non-empty buffer (synchronization).
     27As well, a buffer needs protection from concurrent access by multiple producers or consumers attempting to insert or remove simultaneously (MX).
     28
     29\section{Channel Size}\label{s:ChannelSize}
     30Channels come in three flavours of buffers:
     31\begin{enumerate}
     32\item
     33Zero sized implies the communication is synchronous, \ie the producer must wait for the consumer to arrive or vice versa for a value to be communicated.
     34\item
     35Fixed sized (bounded) implies the communication is asynchronous, \ie the producer can proceed up to the buffer size and vice versa for the consumer with respect to removal.
     36\item
     37Infinite sized (unbounded) implies the communication is asynchronous, \ie the producer never waits but the consumer waits when the buffer is empty.
     38Since memory is finite, all unbounded buffers are ultimately bounded;
     39this restriction must be part of its implementation.
     40\end{enumerate}
     41
     42In general, the order values are processed by the consumer does not affect the correctness of the producer-consumer problem.
     43For example, the buffer can be LIFO, FIFO, or prioritized with respect to insertion and removal.
     44However, like MX, a buffer should ensure every value is eventually removed after some reasonable bounded time (no long-term starvation).
     45The simplest way to prevent starvation is to implement the buffer as a queue, either with a cyclic array or linked nodes.
    2846
    2947\section{First-Come First-Served}
    30 The channel implementations that will be discussed are \gls{fcfs}.
    31 This term was defined by Lamport~\cite{Lamport74}.
     48As pointed out, a bounded buffer requires MX among multiple producers or consumers.
     49This MX should be fair among threads, independent of the FIFO buffer being fair among values.
     50Fairness among threads is called \gls{fcfs} and was defined by Lamport~\cite[p.~454]{Lamport74}.
    3251\gls{fcfs} is defined in relation to a doorway~\cite[p.~330]{Lamport86II}, which is the point at which an ordering among threads can be established.
    33 Given this doorway, a critical section is said to be \gls{fcfs}, if threads access the shared resource in the order they proceed through the doorway.
    34 \gls{fcfs} is a fairness property which prevents unequal access to the shared resource and prevents starvation, however it can come at a cost.
    35 Implementing an algorithm with \gls{fcfs} can lead to double blocking, where entering threads may need to block to allow other threads to proceed first, resulting in blocking both inside and outside the doorway.
    36 As such algorithms that are not \gls{fcfs} may be more performant but that performance comes with the downside of likely introducing starvation and unfairness.
     52Given this doorway, a CS is said to be \gls{fcfs}, if threads access the shared resource in the order they proceed through the doorway.
     53A consequence of \gls{fcfs} execution is the elimination of \Newterm{barging}, where barging means a thread arrives at a CS with waiting threads, and the MX protecting the CS allows the arriving thread to enter the CS ahead of one or more of the waiting threads.
     54
     55\gls{fcfs} is a fairness property that prevents unequal access to the shared resource and prevents starvation, however it comes at a cost.
     56Implementing an algorithm with \gls{fcfs} can lead to \Newterm{double blocking}, where arriving threads block outside the doorway waiting for a thread in the lock entry-protocol and inside the doorway waiting for a thread in the CS.
     57An analogue is boarding an airplane: first you wait to get through security to the departure gates (short term), and then wait again at the departure gate for the airplane (long term).
     58As such, algorithms that are not \gls{fcfs} (barging) can be more performant by skipping the wait for the CS and entering directly;
     59however, this performance gain comes by introducing unfairness with possible starvation for waiting threads.
    3760
    3861\section{Channel Implementation}
    39 The channel implementation in \CFA is a near carbon copy of the Go implementation.
    40 Experimentation was conducted that varied the producer-consumer problem algorithm and lock type used inside the channel.
    41 With the exception of non-\gls{fcfs} algorithms, no algorithm or lock usage in the channel implementation was found to be consistently more performant that Go's choice of algorithm and lock implementation.
     62Currently, only the Go programming language provides user-level threading where the primary communication mechanism is channels.
     63Experiments were conducted that varied the producer-consumer problem algorithm and lock type used inside the channel.
     64With the exception of non-\gls{fcfs} or non-FIFO algorithms, no algorithm or lock usage in the channel implementation was found to be consistently more performant that Go's choice of algorithm and lock implementation.
     65Performance of channels can be improved by sharding the underlying buffer \cite{Dice11}.
     66In doing so the FIFO property is lost, which is undesireable for user-facing channels.
     67Therefore, the low-level channel implementation in \CFA is largely copied from the Go implementation, but adapted to the \CFA type and runtime systems.
    4268As such the research contributions added by \CFA's channel implementation lie in the realm of safety and productivity features.
     69
     70The Go channel implementation utilitizes cooperation between threads to achieve good performance~\cite{go:chan}.
     71The cooperation between threads only occurs when producers or consumers need to block due to the buffer being full or empty.
     72In these cases the blocking thread stores their relevant data in a shared location and the signalling thread will complete their operation before waking them.
     73This helps improve performance in a few ways.
     74First, each thread interacting with the channel with only acquire and release the internal channel lock exactly once.
     75This decreases contention on the internal lock, as only entering threads will compete for the lock since signalled threads never reacquire the lock.
     76The other advantage of the cooperation approach is that it eliminates the potential bottleneck of waiting for signalled threads.
     77The property of acquiring/releasing the lock only once can be achieved without cooperation by \Newterm{baton passing} the lock.
     78Baton passing is when one thread acquires a lock but does not release it, and instead signals a thread inside the critical section conceptually "passing" the mutual exclusion to the signalled thread.
     79While baton passing is useful in some algorithms, it results in worse performance than the cooperation approach in channel implementations since all entering threads then need to wait for the blocked thread to reach the front of the ready queue and run before other operations on the channel can proceed.
     80
     81In this work, all channel sizes \see{Sections~\ref{s:ChannelSize}} are implemented with bounded buffers.
     82However, only non-zero-sized buffers are analysed because of their complexity and higher usage.
    4383
    4484\section{Safety and Productivity}
     
    4787
    4888\begin{itemize}
    49 \item Toggle-able statistic collection on channel behaviour that counts channel operations, and the number of the operations that block.
    50 Tracking blocking operations helps users tune their channel size or channel usage when the channel is used for buffering, where the aim is to have as few blocking operations as possible.
    51 \item Deadlock detection on deallocation of the channel.
    52 If any threads are blocked inside the channel when it terminates it is detected and informs the user, as this would cause a deadlock.
    53 \item A \code{flush} routine that delivers copies of an element to all waiting consumers, flushing the buffer.
    54 Programmers can use this to easily to broadcast data to multiple consumers.
    55 Additionally, the \code{flush} routine is more performant then looping around the \code{insert} operation since it can deliver the elements without having to reacquire mutual exclusion for each element sent.
     89\item Toggle-able statistic collection on channel behaviour that count channel and blocking operations.
     90Tracking blocking operations helps illustrate usage for tuning the channel size, where the aim is to reduce blocking.
     91
     92\item Deadlock detection on channel deallocation.
     93If threads are blocked inside a channel when it terminates, this case is detected and the user is informed, as this can cause a deadlock.
     94
     95\item A @flush@ routine that delivers copies of an element to all waiting consumers, flushing the buffer.
     96Programmers use this mechanism to broadcast a sentinel value to multiple consumers.
     97Additionally, the @flush@ routine is more performant then looping around the @insert@ operation since it can deliver the elements without having to reacquire mutual exclusion for each element sent.
    5698\end{itemize}
    5799
    58 The other safety and productivity feature of \CFA channels deals with concurrent termination.
     100\subsection{Toggle-able Statistics}
     101As discussed, a channel is a concurrent layer over a bounded buffer.
     102To achieve efficient buffering users should aim for as few blocking operations on a channel as possible.
     103Often to achieve this users may change the buffer size, shard a channel into multiple channels, or tweak the number of producer and consumer threads.
     104Fo users to be able to make informed decisions when tuning channel usage, toggle-able channel statistics are provided.
     105The statistics are toggled at compile time via the @CHAN_STATS@ macro to ensure that they are entirely elided when not used.
     106When statistics are turned on, four counters are maintained per channel, two for producers and two for consumers.
     107The two counters per type of operation track the number of blocking operations and total operations.
     108In the channel destructor the counters are printed out aggregated and also per type of operation.
     109An example use case of the counters follows.
     110A user is buffering information between producer and consumer threads and wants to analyze channel performance.
     111Via the statistics they see that producers block for a large percentage of their operations while consumers do not block often.
     112They then can use this information to adjust their number of producers/consumers or channel size to achieve a larger percentage of non-blocking producer operations, thus increasing their channel throughput.
     113
     114\subsection{Deadlock Detection}
     115The deadlock detection in the \CFA channels is fairly basic.
     116It only detects the case where threads are blocked on the channel during deallocation.
     117This case is guaranteed to deadlock since the list holding the blocked thread is internal to the channel and will be deallocated.
     118If a user maintained a separate reference to a thread and unparked it outside the channel they could avoid the deadlock, but would run into other runtime errors since the thread would access channel data after waking that is now deallocated.
     119More robust deadlock detection surrounding channel usage would have to be implemented separate from the channel implementation since it would require knowledge about the threading system and other channel/thread state.
     120
     121\subsection{Program Shutdown}
    59122Terminating concurrent programs is often one of the most difficult parts of writing concurrent code, particularly if graceful termination is needed.
    60 The difficulty of graceful termination often arises from the usage of synchronization primitives which need to be handled carefully during shutdown.
     123The difficulty of graceful termination often arises from the usage of synchronization primitives that need to be handled carefully during shutdown.
    61124It is easy to deadlock during termination if threads are left behind on synchronization primitives.
    62125Additionally, most synchronization primitives are prone to \gls{toctou} issues where there is race between one thread checking the state of a concurrent object and another thread changing the state.
    63126\gls{toctou} issues with synchronization primitives often involve a race between one thread checking the primitive for blocked threads and another thread blocking on it.
    64 Channels are a particularly hard synchronization primitive to terminate since both sending and receiving off a channel can block.
     127Channels are a particularly hard synchronization primitive to terminate since both sending and receiving to/from a channel can block.
    65128Thus, improperly handled \gls{toctou} issues with channels often result in deadlocks as threads trying to perform the termination may end up unexpectedly blocking in their attempt to help other threads exit the system.
    66129
    67 % C_TODO: add reference to select chapter, add citation to go channels info
    68 Go channels provide a set of tools to help with concurrent shutdown.
    69 Channels in Go have a \code{close} operation and a \code{select} statement that both can be used to help threads terminate.
    70 The \code{select} statement will be discussed in \ref{}, where \CFA's \code{waituntil} statement will be compared with the Go \code{select} statement.
    71 The \code{close} operation on a channel in Go changes the state of the channel.
    72 When a channel is closed, sends to the channel will panic and additional calls to \code{close} will panic.
    73 Receives are handled differently where receivers will never block on a closed channel and will continue to remove elements from the channel.
    74 Once a channel is empty, receivers can continue to remove elements, but will receive the zero-value version of the element type.
    75 To aid in avoiding unwanted zero-value elements, Go provides the ability to iterate over a closed channel to remove the remaining elements.
    76 These design choices for Go channels enforce a specific interaction style with channels during termination, where careful thought is needed to ensure that additional \code{close} calls don't occur and that no sends occur after channels are closed.
     130\paragraph{Go channels} provide a set of tools to help with concurrent shutdown~\cite{go:chan}.
     131Channels in Go have a @close@ operation and a \Go{select} statement that both can be used to help threads terminate.
     132The \Go{select} statement is discussed in \ref{s:waituntil}, where \CFA's @waituntil@ statement is compared with the Go \Go{select} statement.
     133
     134The @close@ operation on a channel in Go changes the state of the channel.
     135When a channel is closed, sends to the channel panic along with additional calls to @close@.
     136Receives are handled differently.
     137Receivers (consumers) never block on a closed channel and continue to remove elements from the channel.
     138Once a channel is empty, receivers can continue to remove elements, but receive the zero-value version of the element type.
     139To avoid unwanted zero-value elements, Go provides the ability to iterate over a closed channel to remove the remaining elements.
     140These Go design choices enforce a specific interaction style with channels during termination: careful thought is needed to ensure additional @close@ calls do not occur and no sends occur after a channel is closed.
    77141These design choices fit Go's paradigm of error management, where users are expected to explicitly check for errors, rather than letting errors occur and catching them.
    78 If errors need to occur in Go, return codes are used to pass error information where they are needed.
    79 Note that panics in Go can be caught, but it is not considered an idiomatic way to write Go programs.
     142If errors need to occur in Go, return codes are used to pass error information up call levels.
     143Note, panics in Go can be caught, but it is not the idiomatic way to write Go programs.
    80144
    81145While Go's channel closing semantics are powerful enough to perform any concurrent termination needed by a program, their lack of ease of use leaves much to be desired.
    82 Since both closing and sending panic, once a channel is closed, a user often has to synchronize the senders to a channel before the channel can be closed to avoid panics.
    83 However, in doing so it renders the \code{close} operation nearly useless, as the only utilities it provides are the ability to ensure that receivers no longer block on the channel, and will receive zero-valued elements.
    84 This can be useful if the zero-typed element is recognized as a sentinel value, but if another sentinel value is preferred, then \code{close} only provides its non-blocking feature.
    85 To avoid \gls{toctou} issues during shutdown, a busy wait with a \code{select} statement is often used to add or remove elements from a channel.
     146Since both closing and sending panic once a channel is closed, a user often has to synchronize the senders (producers) before the channel can be closed to avoid panics.
     147However, in doing so it renders the @close@ operation nearly useless, as the only utilities it provides are the ability to ensure receivers no longer block on the channel and receive zero-valued elements.
     148This functionality is only useful if the zero-typed element is recognized as a sentinel value, but if another sentinel value is necessary, then @close@ only provides the non-blocking feature.
     149To avoid \gls{toctou} issues during shutdown, a busy wait with a \Go{select} statement is often used to add or remove elements from a channel.
    86150Due to Go's asymmetric approach to channel shutdown, separate synchronization between producers and consumers of a channel has to occur during shutdown.
    87151
    88 In \CFA, exception handling is an encouraged paradigm and has full language support \cite{Beach21}.
    89 As such \CFA uses an exception based approach to channel shutdown that is symmetric for both producers and consumers, and supports graceful shutdown.Exceptions in \CFA support both termination and resumption.Termination exceptions operate in the same way as exceptions seen in many popular programming languages such as \CC, Python and Java.
    90 Resumption exceptions are a style of exception that when caught run the corresponding catch block in the same way that termination exceptions do.
    91 The difference between the exception handling mechanisms arises after the exception is handled.
    92 In termination handling, the control flow continues into the code following the catch after the exception is handled.
    93 In resumption handling, the control flow returns to the site of the \code{throw}, allowing the control to continue where it left off.
    94 Note that in resumption, since control can return to the point of error propagation, the stack is not unwound during resumption propagation.
    95 In \CFA if a resumption is not handled, it is reraised as a termination.
    96 This mechanism can be used to create a flexible and robust termination system for channels.
    97 
    98 When a channel in \CFA is closed, all subsequent calls to the channel will throw a resumption exception at the caller.
    99 If the resumption is handled, then the caller will proceed to attempt to complete their operation.
    100 If the resumption is not handled it is then rethrown as a termination exception.
    101 Or, if the resumption is handled, but the subsequent attempt at an operation would block, a termination exception is thrown.
    102 These termination exceptions allow for non-local transfer that can be used to great effect to eagerly and gracefully shut down a thread.
     152\paragraph{\CFA channels} have access to an extensive exception handling mechanism~\cite{Beach21}.
     153As such \CFA uses an exception-based approach to channel shutdown that is symmetric for both producers and consumers, and supports graceful shutdown.
     154
     155Exceptions in \CFA support both termination and resumption.
     156\Newterm{Termination exception}s perform a dynamic call that unwinds the stack preventing the exception handler from returning to the raise point, such as in \CC, Python and Java.
     157\Newterm{Resumption exception}s perform a dynamic call that does not unwind the stack allowing the exception handler to return to the raise point.
     158In \CFA, if a resumption exception is not handled, it is reraised as a termination exception.
     159This mechanism is used to create a flexible and robust termination system for channels.
     160
     161When a channel in \CFA is closed, all subsequent calls to the channel raise a resumption exception at the caller.
     162If the resumption is handled, the caller attempts to complete the channel operation.
     163However, if channel operation would block, a termination exception is thrown.
     164If the resumption is not handled, the exception is rethrown as a termination.
     165These termination exceptions allow for non-local transfer that is used to great effect to eagerly and gracefully shut down a thread.
    103166When a channel is closed, if there are any blocked producers or consumers inside the channel, they are woken up and also have a resumption thrown at them.
    104 The resumption exception, \code{channel_closed}, has a couple fields to aid in handling the exception.
     167The resumption exception, @channel_closed@, has a couple fields to aid in handling the exception.
    105168The exception contains a pointer to the channel it was thrown from, and a pointer to an element.
    106169In exceptions thrown from remove the element pointer will be null.
     
    112175This should not be an issue, since termination is rarely an fast-path of an application and ensuring that termination can be implemented correctly with ease is the aim of the exception approach.
    113176
    114 To highlight the differences between \CFA's and Go's close semantics, an example program is presented.
    115 The program is a barrier implemented using two channels shown in Listings~\ref{l:cfa_chan_bar} and \ref{l:go_chan_bar}.
     177\section{\CFA / Go channel Examples}
     178To highlight the differences between \CFA's and Go's close semantics, three examples will be presented.
     179The first example is a simple shutdown case, where there are producer threads and consumer threads operating on a channel for a fixed duration.
     180Once the duration ends, producers and consumers terminate without worrying about any leftover values in the channel.
     181The second example extends the first example by requiring the channel to be empty upon shutdown.
     182Both the first and second example are shown in Figure~\ref{f:ChannelTermination}.
     183
     184
     185First the Go solutions to these examples shown in Figure~\ref{l:go_chan_term} are discussed.
     186Since some of the elements being passed through the channel are zero-valued, closing the channel in Go does not aid in communicating shutdown.
     187Instead, a different mechanism to communicate with the consumers and producers needs to be used.
     188This use of an additional flag or communication method is common in Go channel shutdown code, since to avoid panics on a channel, the shutdown of a channel often has to be communicated with threads before it occurs.
     189In this example, a flag is used to communicate with producers and another flag is used for consumers.
     190Producers and consumers need separate avenues of communication both so that producers terminate before the channel is closed to avoid panicking, and to avoid the case where all the consumers terminate first, which can result in a deadlock for producers if the channel is full.
     191The producer flag is set first, then after producers terminate the consumer flag is set and the channel is closed.
     192In the second example where all values need to be consumed, the main thread iterates over the closed channel to process any remaining values.
     193
     194
     195In the \CFA solutions in Figure~\ref{l:cfa_chan_term}, shutdown is communicated directly to both producers and consumers via the @close@ call.
     196In the first example where all values do not need to be consumed, both producers and consumers do not handle the resumption and finish once they receive the termination exception.
     197The second \CFA example where all values must be consumed highlights how resumption is used with channel shutdown.
     198The @Producer@ thread-main knows to stop producing when the @insert@ call on a closed channel raises exception @channel_closed@.
     199The @Consumer@ thread-main knows to stop consuming after all elements of a closed channel are removed and the call to @remove@ would block.
     200Hence, the consumer knows the moment the channel closes because a resumption exception is raised, caught, and ignored, and then control returns to @remove@ to return another item from the buffer.
     201Only when the buffer is drained and the call to @remove@ would block, a termination exception is raised to stop consuming.
     202The \CFA semantics allow users to communicate channel shutdown directly through the channel, without having to share extra state between threads.
     203Additionally, when the channel needs to be drained, \CFA provides users with easy options for processing the leftover channel values in the main thread or in the consumer threads.
     204If one wishes to consume the leftover values in the consumer threads in Go, extra synchronization between the main thread and the consumer threads is needed.
     205
     206\begin{figure}
     207\centering
     208
     209\begin{lrbox}{\myboxA}
     210\begin{cfa}[aboveskip=0pt,belowskip=0pt]
     211channel( size_t ) Channel{ ChannelSize };
     212
     213thread Consumer {};
     214void main( Consumer & this ) {
     215    try {
     216        for ( ;; )
     217            remove( Channel );
     218    @} catchResume( channel_closed * ) { @
     219    // handled resume => consume from chan
     220    } catch( channel_closed * ) {
     221        // empty or unhandled resume
     222    }
     223}
     224
     225thread Producer {};
     226void main( Producer & this ) {
     227    size_t count = 0;
     228    try {
     229        for ( ;; )
     230            insert( Channel, count++ );
     231    } catch ( channel_closed * ) {
     232        // unhandled resume or full
     233    }
     234}
     235
     236int main( int argc, char * argv[] ) {
     237    Consumer c[Consumers];
     238    Producer p[Producers];
     239    sleep(Duration`s);
     240    close( Channel );
     241    return 0;
     242}
     243\end{cfa}
     244\end{lrbox}
     245
     246\begin{lrbox}{\myboxB}
     247\begin{cfa}[aboveskip=0pt,belowskip=0pt]
     248var cons_done, prod_done bool = false, false;
     249var prodJoin chan int = make(chan int, Producers)
     250var consJoin chan int = make(chan int, Consumers)
     251
     252func consumer( channel chan uint64 ) {
     253    for {
     254        if cons_done { break }
     255        <-channel
     256    }
     257    consJoin <- 0 // synch with main thd
     258}
     259
     260func producer( channel chan uint64 ) {
     261    var count uint64 = 0
     262    for {
     263        if prod_done { break }
     264        channel <- count++
     265    }
     266    prodJoin <- 0 // synch with main thd
     267}
     268
     269func main() {
     270    channel = make(chan uint64, ChannelSize)
     271    for j := 0; j < Consumers; j++ {
     272        go consumer( channel )
     273    }
     274    for j := 0; j < Producers; j++ {
     275        go producer( channel )
     276    }
     277    time.Sleep(time.Second * Duration)
     278    prod_done = true
     279    for j := 0; j < Producers ; j++ {
     280        <-prodJoin // wait for prods
     281    }
     282    cons_done = true
     283    close(channel) // ensure no cons deadlock
     284    @for elem := range channel { @
     285        // process leftover values
     286    @}@
     287    for j := 0; j < Consumers; j++{
     288        <-consJoin // wait for cons
     289    }
     290}
     291\end{cfa}
     292\end{lrbox}
     293
     294\subfloat[\CFA style]{\label{l:cfa_chan_term}\usebox\myboxA}
     295\hspace*{3pt}
     296\vrule
     297\hspace*{3pt}
     298\subfloat[Go style]{\label{l:go_chan_term}\usebox\myboxB}
     299\caption{Channel Termination Examples 1 and 2. Code specific to example 2 is highlighted.}
     300\label{f:ChannelTermination}
     301\end{figure}
     302
     303The final shutdown example uses channels to implement a barrier.
     304It is shown in Figure~\ref{f:ChannelBarrierTermination}.
     305The problem of implementing a barrier is chosen since threads are both producers and consumers on the barrier-internal channels, which removes the ability to easily synchronize producers before consumers during shutdown.
     306As such, while the shutdown details will be discussed with this problem in mind, they are also applicable to other problems taht have individual threads both producing and consuming from channels.
    116307Both of these examples are implemented using \CFA syntax so that they can be easily compared.
    117 Listing~\ref{l:go_chan_bar} uses go-style channel close semantics and Listing~\ref{l:cfa_chan_bar} uses \CFA close semantics.
    118 In this problem it is infeasible to use the Go \code{close} call since all tasks are both potentially producers and consumers, causing panics on close to be unavoidable.
    119 As such in Listing~\ref{l:go_chan_bar} to implement a flush routine for the buffer, a sentinel value of $-1$ has to be used to indicate to threads that they need to leave the barrier.
     308Figure~\ref{l:cfa_chan_bar} uses \CFA-style channel close semantics and Figure~\ref{l:go_chan_bar} uses Go-style close semantics.
     309In this example it is infeasible to use the Go @close@ call since all threads are both potentially producers and consumers, causing panics on close to be unavoidable without complex synchronization.
     310As such in Figure~\ref{l:go_chan_bar} to implement a flush routine for the buffer, a sentinel value of @-1@ has to be used to indicate to threads that they need to leave the barrier.
    120311This sentinel value has to be checked at two points.
    121 Furthermore, an additional flag \code{done} is needed to communicate to threads once they have left the barrier that they are done.
    122 This use of an additional flag or communication method is common in Go channel shutdown code, since to avoid panics on a channel, the shutdown of a channel often has to be communicated with threads before it occurs.
     312Furthermore, an additional flag @done@ is needed to communicate to threads once they have left the barrier that they are done.
     313
    123314In the \CFA version~\ref{l:cfa_chan_bar}, the barrier shutdown results in an exception being thrown at threads operating on it, which informs the threads that they must terminate.
    124315This avoids the need to use a separate communication method other than the barrier, and avoids extra conditional checks on the fast path of the barrier implementation.
    125316Also note that in the Go version~\ref{l:go_chan_bar}, the size of the barrier channels has to be larger than in the \CFA version to ensure that the main thread does not block when attempting to clear the barrier.
    126317
    127 \begin{cfa}[caption={\CFA channel barrier termination},label={l:cfa_chan_bar}]
     318\begin{figure}
     319\centering
     320
     321\begin{lrbox}{\myboxA}
     322\begin{cfa}[aboveskip=0pt,belowskip=0pt]
    128323struct barrier {
    129         channel( int ) barWait;
    130         channel( int ) entryWait;
     324        channel( int ) barWait, entryWait;
    131325        int size;
    132 }
    133 void ?{}(barrier & this, int size) with(this) {
    134         barWait{size};
    135         entryWait{size};
     326};
     327void ?{}( barrier & this, int size ) with(this) {
     328        barWait{size};   entryWait{size};
    136329        this.size = size;
    137         for ( j; size )
    138                 insert( *entryWait, j );
    139 }
    140 
     330        for ( i; size )
     331                insert( entryWait, i );
     332}
     333void wait( barrier & this ) with(this) {
     334        int ticket = remove( entryWait );
     335
     336        if ( ticket == size - 1 ) {
     337                for ( i; size - 1 )
     338                        insert( barWait, i );
     339                return;
     340        }
     341        ticket = remove( barWait );
     342
     343        if ( size == 1 || ticket == size - 2 ) { // last ?
     344                for ( i; size )
     345                        insert( entryWait, i );
     346        }
     347}
    141348void flush(barrier & this) with(this) {
    142         close(barWait);
    143         close(entryWait);
    144 }
    145 void wait(barrier & this) with(this) {
    146         int ticket = remove( *entryWait );
     349        @close( barWait );   close( entryWait );@
     350}
     351enum { Threads = 4 };
     352barrier b{Threads};
     353
     354thread Thread {};
     355void main( Thread & this ) {
     356        @try {@
     357                for ()
     358                        wait( b );
     359        @} catch ( channel_closed * ) {}@
     360}
     361int main() {
     362        Thread t[Threads];
     363        sleep(10`s);
     364
     365        flush( b );
     366} // wait for threads to terminate
     367\end{cfa}
     368\end{lrbox}
     369
     370\begin{lrbox}{\myboxB}
     371\begin{cfa}[aboveskip=0pt,belowskip=0pt]
     372struct barrier {
     373        channel( int ) barWait, entryWait;
     374        int size;
     375};
     376void ?{}( barrier & this, int size ) with(this) {
     377        barWait{size + 1};   entryWait{size + 1};
     378        this.size = size;
     379        for ( i; size )
     380                insert( entryWait, i );
     381}
     382void wait( barrier & this ) with(this) {
     383        int ticket = remove( entryWait );
     384        @if ( ticket == -1 ) { insert( entryWait, -1 ); return; }@
    147385        if ( ticket == size - 1 ) {
    148                 for ( j; size - 1 )
    149                         insert( *barWait, j );
     386                for ( i; size - 1 )
     387                        insert( barWait, i );
    150388                return;
    151389        }
    152         ticket = remove( *barWait );
    153 
    154         // last one out
    155         if ( size == 1 || ticket == size - 2 ) {
    156                 for ( j; size )
    157                         insert( *entryWait, j );
    158         }
    159 }
    160 barrier b{Tasks};
    161 
    162 // thread main
    163 void main(Task & this) {
    164         try {
    165                 for ( ;; ) {
    166                         wait( b );
    167                 }
    168         } catch ( channel_closed * e ) {}
    169 }
    170 
     390        ticket = remove( barWait );
     391        @if ( ticket == -1 ) { insert( barWait, -1 ); return; }@
     392        if ( size == 1 || ticket == size - 2 ) { // last ?
     393                for ( i; size )
     394                        insert( entryWait, i );
     395        }
     396}
     397void flush(barrier & this) with(this) {
     398        @insert( entryWait, -1 );   insert( barWait, -1 );@
     399}
     400enum { Threads = 4 };
     401barrier b{Threads};
     402@bool done = false;@
     403thread Thread {};
     404void main( Thread & this ) {
     405        for () {
     406          @if ( done ) break;@
     407                wait( b );
     408        }
     409}
    171410int main() {
    172         {
    173                 Task t[Tasks];
    174 
    175                 sleep(10`s);
    176                 flush( b );
    177         } // wait for tasks to terminate
    178         return 0;
    179 }
     411        Thread t[Threads];
     412        sleep(10`s);
     413        done = true;
     414        flush( b );
     415} // wait for threads to terminate
    180416\end{cfa}
    181 
    182 \begin{cfa}[caption={Go channel barrier termination},label={l:go_chan_bar}]
    183 
    184 struct barrier {
    185         channel( int ) barWait;
    186         channel( int ) entryWait;
    187         int size;
    188 }
    189 void ?{}(barrier & this, int size) with(this) {
    190         barWait{size + 1};
    191         entryWait{size + 1};
    192         this.size = size;
    193         for ( j; size )
    194                 insert( *entryWait, j );
    195 }
    196 
    197 void flush(barrier & this) with(this) {
    198         insert( *entryWait, -1 );
    199         insert( *barWait, -1 );
    200 }
    201 void wait(barrier & this) with(this) {
    202         int ticket = remove( *entryWait );
    203         if ( ticket == -1 ) {
    204                 insert( *entryWait, -1 );
    205                 return;
    206         }
    207         if ( ticket == size - 1 ) {
    208                 for ( j; size - 1 )
    209                         insert( *barWait, j );
    210                 return;
    211         }
    212         ticket = remove( *barWait );
    213         if ( ticket == -1 ) {
    214                 insert( *barWait, -1 );
    215                 return;
    216         }
    217 
    218         // last one out
    219         if ( size == 1 || ticket == size - 2 ) {
    220                 for ( j; size )
    221                         insert( *entryWait, j );
    222         }
    223 }
    224 barrier b;
    225 
    226 bool done = false;
    227 // thread main
    228 void main(Task & this) {
    229         for ( ;; ) {
    230                 if ( done ) break;
    231                 wait( b );
    232         }
    233 }
    234 
    235 int main() {
    236         {
    237                 Task t[Tasks];
    238 
    239                 sleep(10`s);
    240                 done = true;
    241 
    242                 flush( b );
    243         } // wait for tasks to terminate
    244         return 0;
    245 }
    246 \end{cfa}
    247 
    248 In Listing~\ref{l:cfa_resume} an example of channel closing with resumption is used.
    249 This program uses resumption in the \code{Consumer} thread main to ensure that all elements in the channel are removed before the consumer thread terminates.
    250 The producer only has a \code{catch} so the moment it receives an exception it terminates, whereas the consumer will continue to remove from the closed channel via handling resumptions until the buffer is empty, which then throws a termination exception.
    251 If the same program was implemented in Go it would require explicit synchronization with both producers and consumers by some mechanism outside the channel to ensure that all elements were removed before task termination.
    252 
    253 \begin{cfa}[caption={\CFA channel resumption usage},label={l:cfa_resume}]
    254 channel( int ) chan{ 128 };
    255 
    256 // Consumer thread main
    257 void main(Consumer & this) {
    258         size_t runs = 0;
    259         try {
    260                 for ( ;; ) {
    261                         remove( chan );
    262                 }
    263         } catchResume ( channel_closed * e ) {}
    264         catch ( channel_closed * e ) {}
    265 }
    266 
    267 // Producer thread main
    268 void main(Producer & this) {
    269         int j = 0;
    270         try {
    271                 for ( ;;j++ ) {
    272                         insert( chan, j );
    273                 }
    274         } catch ( channel_closed * e ) {}
    275 }
    276 
    277 int main( int argc, char * argv[] ) {
    278         {
    279                 Consumers c[4];
    280                 Producer p[4];
    281 
    282                 sleep(10`s);
    283 
    284                 for ( i; Channels )
    285                         close( channels[i] );
    286         }
    287         return 0;
    288 }
    289 \end{cfa}
     417\end{lrbox}
     418
     419\subfloat[\CFA style]{\label{l:cfa_chan_bar}\usebox\myboxA}
     420\hspace*{3pt}
     421\vrule
     422\hspace*{3pt}
     423\subfloat[Go style]{\label{l:go_chan_bar}\usebox\myboxB}
     424\caption{Channel Barrier Termination}
     425\label{f:ChannelBarrierTermination}
     426\end{figure}
    290427
    291428\section{Performance}
    292429
    293 Given that the base implementation of the \CFA channels is very similar to the Go implementation, this section aims to show that the performance of the two implementations are comparable.
    294 One microbenchmark is conducted to compare Go and \CFA.
    295 The benchmark is a ten second experiment where producers and consumers operate on a channel in parallel and throughput is measured.
     430Given that the base implementation of the \CFA channels is very similar to the Go implementation, this section aims to show the performance of the two implementations are comparable.
     431The microbenchmark for the channel comparison is similar to Figure~\ref{f:ChannelTermination}, where the number of threads and processors is set from the command line.
     432The processors are divided equally between producers and consumers, with one producer or consumer owning each core.
    296433The number of cores is varied to measure how throughput scales.
    297 The cores are divided equally between producers and consumers, with one producer or consumer owning each core.
     434
    298435The results of the benchmark are shown in Figure~\ref{f:chanPerf}.
    299436The performance of Go and \CFA channels on this microbenchmark is comparable.
    300 Note, it is expected for the performance to decline as the number of cores increases as the channel operations all occur in a critical section so an increase in cores results in higher contention with no increase in parallelism.
    301 
     437Note, the performance should decline as the number of cores increases as the channel operations occur in a critical section, so increasing cores results in higher contention with no increase in parallelism.
    302438
    303439\begin{figure}
Note: See TracChangeset for help on using the changeset viewer.