% ======================================================================
% ======================================================================
\chapter{Channels}\label{s:channels}
% ======================================================================
% ======================================================================

Most modern concurrent programming languages do not subscribe to just one style of communication among threads; instead, they provide features that support multiple approaches.
Channels are a concurrent-language feature used to perform \Newterm{message-passing concurrency}: a model of concurrency where threads communicate by sending data as messages (mostly non\-blocking) and synchronizing by receiving sent messages (blocking).
This model is an alternative to shared-memory concurrency, where threads communicate directly by changing shared state.

Channels were first introduced by Kahn~\cite{Kahn74} and extended by Hoare~\cite{CSP} (CSP).
Both papers present a pseudo (unimplemented) concurrent language where processes communicate using input/output channels to send data.
Both languages are highly restrictive.
Kahn's language restricts a reading process to waiting for data on only a single channel at a time, and different writing processes cannot send data on the same channel.
Hoare's language restricts both the sender and receiver to explicitly name the process that is the destination of a channel send or the source of a channel receive.
These channel semantics remove the ability to have an anonymous sender or receiver.
Additionally, all channel operations in CSP are synchronous (no buffering).
Channels as an advanced programming-language feature have been popularized in recent years by the language Go~\cite{Go}, which encourages the use of channels as its fundamental concurrent feature.
It was the popularity of Go channels that led to their implementation in \CFA.
Neither Go nor \CFA channels have the restrictions of the early channel-based concurrent systems.
Other popular languages and libraries that provide channels include C++ Boost~\cite{boost:channel}, Rust~\cite{rust:channel}, Haskell~\cite{haskell:channel}, and OCaml~\cite{ocaml:channel}.
Boost channels only support asynchronous (non-blocking) operations, and Rust channels are limited to one consumer per channel.
Haskell channels are unbounded in size, and OCaml channels are zero-sized.
These restrictions in Haskell and OCaml are likely due to their functional approach, which results in both using a list as the underlying data structure for their channel.
These languages and libraries are not discussed further, as their channel implementations are not comparable to the bounded-buffer-style channels present in Go and \CFA.

\section{Producer-Consumer Problem}
A channel is an abstraction for a shared-memory buffer, which turns the implementation of a channel into the producer-consumer problem.
The producer-consumer problem, also known as the bounded-buffer problem, was introduced by Dijkstra~\cite[\S~4.1]{Dijkstra65}.
In the problem, threads interact with a buffer in two ways: producing threads insert values into the buffer and consuming threads remove values from the buffer.
In general, a buffer needs protection to ensure a producer only inserts into a non-full buffer and a consumer only removes from a non-empty buffer (synchronization).
As well, a buffer needs protection from concurrent access by multiple producers or consumers attempting to insert or remove simultaneously (mutual exclusion, MX).
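To make the producer-consumer pattern concrete, the following minimal sketch uses the \CFA channel interface appearing in the figures later in this chapter (@insert@ and @remove@); the channel internally provides both the synchronization and MX described above.
The element type, buffer capacity, and iteration counts are arbitrary choices for illustration.
\begin{cfa}
channel( int ) chan{ 8 };               // shared bounded buffer of 8 integers
thread Producer {};
void main( Producer & this ) {
	for ( i; 100 ) insert( chan, i );   // blocks only when the buffer is full
}
thread Consumer {};
void main( Consumer & this ) {
	for ( i; 100 ) remove( chan );      // blocks only when the buffer is empty
}
int main() {
	Consumer c;  Producer p;            // start threads
}                                       // implicit join at scope end
\end{cfa}
Because the producer and consumer transfer the same number of elements, both threads terminate and the program ends at the implicit join.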
\section{Channel Size}\label{s:ChannelSize}
Channels come in three flavours of buffers:
\begin{enumerate}
\item
Zero-sized implies the communication is synchronous, \ie the producer must wait for the consumer to arrive or vice versa for a value to be communicated.
\item
Fixed-sized (bounded) implies the communication is asynchronous, \ie the producer can proceed until the buffer is full, and correspondingly, the consumer can proceed until the buffer is empty.
\item
Infinite-sized (unbounded) implies the communication is asynchronous, \ie the producer never waits but the consumer waits when the buffer is empty.
Since memory is finite, all unbounded buffers are ultimately bounded; this restriction must be part of the implementation.
\end{enumerate}
In general, the order in which values are processed by the consumer does not affect the correctness of the producer-consumer problem.
For example, the buffer can be \gls{lifo}, \gls{fifo}, or prioritized with respect to insertion and removal.
However, like MX, a buffer should ensure every value is eventually removed after some reasonable bounded time (no long-term starvation).
The simplest way to prevent starvation is to implement the buffer as a queue, either with a cyclic array or linked nodes.
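The first two flavours can be illustrated with the \CFA channel interface used throughout this chapter.
The following sketch assumes the buffer capacity is selected by the constructor argument, as in the later figures, and that a zero capacity is accepted.
\begin{cfa}
channel( int ) sync{ 0 };       // zero-sized: insert/remove is a rendezvous
channel( int ) bounded{ 10 };   // fixed-sized: asynchronous up to 10 elements
thread Helper {};
void main( Helper & this ) {
	remove( sync );             // partner for the rendezvous below
}
int main() {
	insert( bounded, 1 );       // proceeds immediately: buffer not full
	remove( bounded );          // proceeds immediately: buffer not empty
	Helper h;                   // start consumer thread
	insert( sync, 2 );          // blocks until Helper arrives at its remove
}
\end{cfa}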
\section{First-Come First-Served}
As pointed out, a bounded buffer requires MX among multiple producers or consumers.
This MX should be fair among threads, independent of the \gls{fifo} buffer being fair among values.
Fairness among threads is called \gls{fcfs} and was defined by Lamport~\cite[p.~454]{Lamport74}.
\gls{fcfs} is defined in relation to a doorway~\cite[p.~330]{Lamport86II}, which is the point at which an ordering among threads can be established.
Given this doorway, a CS is said to be \gls{fcfs} if threads access the shared resource in the order they proceed through the doorway.
A consequence of \gls{fcfs} execution is the elimination of \Newterm{barging}, where a thread arrives at a CS with waiting threads, and the MX protecting the CS allows the arriving thread to enter the CS ahead of one or more of the waiting threads.
\gls{fcfs} is a fairness property that prevents unequal access to the shared resource and prevents starvation; however, it comes at a cost.
Implementing an algorithm with \gls{fcfs} can lead to \Newterm{double blocking}, where an arriving thread may block twice: outside the doorway, waiting for a thread in the lock entry-protocol, and inside the doorway, waiting for a thread in the CS.
An analogue is boarding an airplane: first you wait to get through security to the departure gates (short term), and then wait again at the departure gate for the airplane (long term).
As such, algorithms that are not \gls{fcfs} (barging) can be more performant by skipping the wait for the CS and entering directly; however, this performance gain comes at the cost of unfairness and possible starvation for waiting threads.

\section{Channel Implementation}\label{s:chan_impl}
Currently, only the Go programming language provides user-level threading where the primary communication mechanism is channels.
Experiments were conducted that varied the producer-consumer algorithm and lock type used inside the channel.
With the exception of non-\gls{fcfs} or non-\gls{fifo} algorithms, no algorithm or lock usage in the channel implementation was found to be consistently more performant than Go's choice of algorithm and lock implementation.
Performance of channels can be improved by sharding the underlying buffer~\cite{Dice11}.
However, the \gls{fifo} property is lost, which is undesirable for user-facing channels.
Therefore, the low-level channel implementation in \CFA is largely copied from the Go implementation, but adapted to the \CFA type and runtime systems.
As such, the research contributions added by \CFA's channel implementation lie in the realm of safety and productivity features.

The Go channel implementation utilizes cooperation among threads to achieve good performance~\cite{go:chan}.
This cooperation only occurs when producers or consumers need to block due to the buffer being full or empty.
In these cases, a blocking thread stores its relevant data in a shared location and the signalling thread completes the blocking thread's operation before waking it; \ie the blocking thread has no work to perform after it unblocks because the signalling thread has done this work.
This approach is similar to wait morphing for locks~\cite[p.~82]{Butenhof97} and improves performance in a few ways.
First, each thread interacting with the channel only acquires and releases the internal channel lock once.
As a result, contention on the internal lock is decreased; only entering threads compete for the lock since unblocking threads do not reacquire the lock.
The other advantage of Go's wait-morphing approach is that it eliminates the bottleneck of waiting for signalled threads to run.

Note, the property of acquiring/releasing the lock only once can also be achieved with a different form of cooperation, called \Newterm{baton passing}.
Baton passing occurs when one thread acquires a lock but does not release it, and instead signals a thread inside the critical section, conceptually ``passing'' the mutual exclusion from the signalling thread to the signalled thread.
The baton-passing approach has threads cooperate to pass mutual exclusion without additional lock acquires or releases; the wait-morphing approach has threads cooperate by completing the signalled thread's operation, thus removing a signalled thread's need for mutual exclusion after unblocking.
While baton passing is useful in some algorithms, it results in worse channel performance than the Go approach.
In the baton-passing approach, all threads need to wait for the signalled thread to reach the front of the ready queue, context switch, and run before other operations on the channel can proceed, since the signalled thread holds mutual exclusion; in the wait-morphing approach, since the operation is completed before the signal, other threads can continue to operate on the channel without waiting for the signalled thread to run.
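The wait-morphing hand-off can be sketched with a \CFA monitor.
The following is a deliberately simplified zero-sized channel for a single producer and a single consumer; it is illustrative only, not the actual \CFA or Go channel code, and all names are hypothetical.
The key point is that the signalling thread completes the blocked thread's operation while holding MX, so the woken thread performs no further channel work.
\begin{cfa}
monitor WMChan {                        // hypothetical wait-morphing channel
	condition prods, cons;
	int * cslot;                        // slot registered by a blocked consumer
	int pval;                           // value left behind by a blocked producer
	bool pwaiting, cwaiting;
};
void ?{}( WMChan & this ) { this.pwaiting = this.cwaiting = false; }
void insert( WMChan & mutex ch, int elem ) with( ch ) {
	if ( cwaiting ) {                   // consumer blocked ?
		cwaiting = false;
		*cslot = elem;                  // complete the consumer's remove for it
		signal( cons );                 // woken consumer has nothing left to do
	} else {
		pval = elem;  pwaiting = true;
		wait( prods );                  // block: a consumer takes pval directly
	}
}
int remove( WMChan & mutex ch ) with( ch ) {
	int elem;
	if ( pwaiting ) {                   // producer blocked ?
		pwaiting = false;
		elem = pval;                    // complete the producer's insert for it
		signal( prods );                // woken producer has nothing left to do
	} else {
		cslot = &elem;  cwaiting = true;
		wait( cons );                   // block: a producer fills cslot directly
	}
	return elem;
}
\end{cfa}
Note, in a monitor the woken thread still implicitly reacquires and releases the monitor lock to return from @wait@; the actual channel implementations go further, so the woken thread does not touch the channel lock at all.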
In this work, all channel sizes \see{Section~\ref{s:ChannelSize}} are implemented with bounded buffers.
However, only non-zero-sized buffers are analysed because of their complexity and higher usage.

\section{Safety and Productivity}
Channels in \CFA come with safety and productivity features to aid users.
The features include the following.
\begin{itemize}
\item
Toggle-able statistic collection on channel behaviour that counts channel and blocking operations.
Tracking blocking operations helps illustrate usage for tuning the channel size, where the aim is to reduce blocking.
\item
Deadlock detection on channel deallocation.
If threads are blocked inside a channel when it is deallocated, this case is detected and the user is informed, as it can lead to deadlock.
\item
A @flush@ routine that delivers copies of an element to all waiting consumers, flushing the buffer.
Programmers use this mechanism to broadcast a sentinel value to multiple consumers.
Additionally, the @flush@ routine is more performant than looping around the @insert@ operation since it can deliver the elements without having to reacquire mutual exclusion for each element sent.
\item
Go-style channel shutdown via a @close@ routine, made safe with exceptions: an @insert@ on a closed channel, or a @remove@ on a closed and empty channel, raises the exception @channel_closed@, as discussed below.
\end{itemize}

An important safety consideration is terminating a system of producer and consumer threads communicating through a channel.
Figure~\ref{f:ChannelTermination} shows two termination examples, in Go and \CFA style, where producer and consumer threads communicate through a bounded channel until the main thread initiates shutdown after a fixed interval.
In example 1, leftover elements are left in the buffer at shutdown; in example 2 (highlighted code), the leftover elements are drained.

\begin{figure}
\centering
\begin{lrbox}{\myboxA}
\begin{Go}[aboveskip=0pt,belowskip=0pt]
var chnl = make( chan int, 128 )
var cons_done, prod_done bool
var prods sync.WaitGroup
func producer() {
	for {
		if prod_done { break }
		chnl <- 0
	}
	prods.Done()
}
func consumer() {
	for {
		if cons_done { break }
		<- chnl
	}
}
func main() {
	prods.Add( 4 )
	for i := 0; i < 4; i++ {
		go consumer()
		go producer()
	}
	time.Sleep( 10 * time.Second )
	prod_done = true
	prods.Wait() // wait for producers
	cons_done = true
	close( chnl )
	@for range chnl {}@ // drain remaining values
}
\end{Go}
\end{lrbox}

\begin{lrbox}{\myboxB}
\begin{cfa}[aboveskip=0pt,belowskip=0pt]
channel( int ) chan{ 128 };
thread Producer {};
void main( Producer & this ) {
	try {
		for () insert( chan, 0 );
	} catch( channel_closed * ) {} // closed: stop producing
}
thread Consumer {};
void main( Consumer & this ) {
	try {
		for () remove( chan ); // consume from chan
	} catch( channel_closed * ) { // empty or unhandled resume
	} @catchResume( channel_closed * ) {}@ // continue removing
}
int main() {
	Consumer c[4];
	Producer p[4];
	sleep( 10`s );
	close( chan );
}
\end{cfa}
\end{lrbox}

\subfloat[Go style]{\label{l:go_chan_term}\usebox\myboxA}
\hspace*{3pt}
\vrule
\hspace*{3pt}
\subfloat[\CFA style]{\label{l:cfa_chan_term}\usebox\myboxB}
\caption{Channel Termination Examples 1 and 2. Code specific to example 2 is highlighted.}
\label{f:ChannelTermination}
\end{figure}

Figure~\ref{l:go_chan_term} shows the Go solution.
Since some of the elements being passed through the channel are zero-valued, closing the channel in Go does not aid in communicating shutdown.
Instead, a different mechanism to communicate with the consumers and producers needs to be used.
Flag variables are common in Go channel-shutdown code to avoid panics on a channel, meaning the channel shutdown has to be communicated to threads before it occurs.
Hence, the two flags @prod_done@ and @cons_done@ are used to communicate with the producers and consumers, respectively.
Furthermore, producers and consumers need to shut down separately to ensure that producers terminate before the channel is closed to avoid panicking, and to avoid the case where all the consumers terminate first, which can result in a deadlock for producers if the channel is full.
The producer flag is set first; then after all producers terminate, the consumer flag is set and the channel is closed, leaving elements in the buffer.
To purge the buffer, a loop is added (red) that iterates over the closed channel to process any remaining values.

Figure~\ref{l:cfa_chan_term} shows the \CFA solution.
Here, shutdown is communicated directly to both producers and consumers via the @close@ call.
A @Producer@ thread knows to stop producing when the @insert@ call on a closed channel raises exception @channel_closed@.
If a @Consumer@ thread ignores the first resumption exception from the @close@, the exception is reraised as a termination exception and elements are left in the buffer.
If a @Consumer@ thread handles the resumption exceptions (red), control returns to complete the remove.
A @Consumer@ thread knows to stop consuming after all elements of a closed channel are removed and the consumer would block, which causes a termination raise of @channel_closed@.
The \CFA semantics allow users to communicate channel shutdown directly through the channel, without having to share extra state between threads.
Additionally, when the channel needs to be drained, \CFA provides users with easy options for processing the leftover channel values in the main thread or in the consumer threads.
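For example, draining in the main thread takes only a few lines.
The following sketch assumes the channel declaration from Figure~\ref{f:ChannelTermination}; removes succeed while leftover values remain, and the termination exception ends the drain once the buffer is empty.
\begin{cfa}
close( chan );                      // initiate shutdown
try {
	for () {
		int v = remove( chan );     // succeeds while leftover values remain
		// ... process v
	}
} catch( channel_closed * ) {}      // buffer empty: draining complete
\end{cfa}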
Figure~\ref{f:ChannelBarrierTermination} shows a final shutdown example using channels to implement a barrier.
A Go and a \CFA style solution are presented, but both are implemented using \CFA syntax so they can be easily compared.
Implementing a barrier is interesting because threads are both producers and consumers on the barrier-internal channels, @entryWait@ and @barWait@.
The outline for the barrier implementation starts by initially filling the @entryWait@ channel with $N$ tickets in the barrier constructor, allowing $N$ arriving threads to remove these values and enter the barrier.
After @entryWait@ empties, subsequently arriving threads block when removing.
However, the arriving threads that entered the barrier cannot leave the barrier until $N$ threads have arrived.
Hence, the entering threads block on the empty @barWait@ channel until the $N$th arriving thread inserts $N-1$ elements into @barWait@ to unblock the $N-1$ threads calling @remove@.
The race between these arriving threads blocking on @barWait@ and the $N$th thread inserting values into @barWait@ does not affect correctness; \ie an arriving thread may or may not block on channel @barWait@ to get its value.
Finally, the last thread to remove from @barWait@, which holds ticket $N-2$, refills channel @entryWait@ with $N$ values to start the next group into the barrier.

Now, the two channels make termination synchronization between producers and consumers difficult.
Interestingly, the shutdown details for this problem are also applicable to other problems with threads producing and consuming from the same channel.
The Go-style solution cannot use the Go @close@ call since all threads are both potentially producers and consumers, causing panics on close to be unavoidable without complex synchronization.
As such, in Figure~\ref{l:go_chan_bar}, a flush routine is needed to insert a sentinel value, @-1@, to inform threads waiting in the buffer that they need to leave the barrier.
This sentinel value has to be checked at two points along the fast path, and sentinel values must be daisy-chained into the buffers.
Furthermore, an additional flag @done@ is needed to inform threads, once they have left the barrier, that they are done.
Also note, in the Go version (Figure~\ref{l:go_chan_bar}), the size of the barrier channels has to be larger than in the \CFA version to ensure that the main thread does not block when attempting to clear the barrier.
For the \CFA solution (Figure~\ref{l:cfa_chan_bar}), the barrier shutdown raises an exception at threads operating on the barrier, informing waiting threads they must leave.
This avoids the need for a separate communication method other than the barrier, and avoids extra conditional checks on the fast path of the barrier implementation.

\begin{figure}
\centering
\begin{lrbox}{\myboxA}
\begin{cfa}[aboveskip=0pt,belowskip=0pt]
struct barrier {
	channel( int ) barWait, entryWait;
	int size;
};
void ?{}( barrier & this, int size ) with(this) {
	barWait{size + 1};  entryWait{size + 1};
	this.size = size;
	for ( i; size ) insert( entryWait, i );
}
void wait( barrier & this ) with(this) {
	int ticket = remove( entryWait );
	@if ( ticket == -1 ) { insert( entryWait, -1 ); return; }@
	if ( ticket == size - 1 ) {
		for ( i; size - 1 ) insert( barWait, i );
		return;
	}
	ticket = remove( barWait );
	@if ( ticket == -1 ) { insert( barWait, -1 ); return; }@
	if ( size == 1 || ticket == size - 2 ) { // last ?
		for ( i; size ) insert( entryWait, i );
	}
}
void flush( barrier & this ) with(this) {
	@insert( entryWait, -1 );  insert( barWait, -1 );@
}
enum { Threads = 4 };
barrier b{Threads};
@bool done = false;@
thread Thread {};
void main( Thread & this ) {
	for () {
		@if ( done ) break;@
		wait( b );
	}
}
int main() {
	Thread t[Threads];
	sleep( 10`s );
	done = true;
	flush( b );
} // wait for threads to terminate
\end{cfa}
\end{lrbox}

\begin{lrbox}{\myboxB}
\begin{cfa}[aboveskip=0pt,belowskip=0pt]
struct barrier {
	channel( int ) barWait, entryWait;
	int size;
};
void ?{}( barrier & this, int size ) with(this) {
	barWait{size};  entryWait{size};
	this.size = size;
	for ( i; size ) insert( entryWait, i );
}
void wait( barrier & this ) with(this) {
	int ticket = remove( entryWait );
	if ( ticket == size - 1 ) {
		for ( i; size - 1 ) insert( barWait, i );
		return;
	}
	ticket = remove( barWait );
	if ( size == 1 || ticket == size - 2 ) { // last ?
		for ( i; size ) insert( entryWait, i );
	}
}
void flush( barrier & this ) with(this) {
	@close( barWait );  close( entryWait );@
}
enum { Threads = 4 };
barrier b{Threads};
thread Thread {};
void main( Thread & this ) {
	@try {@
		for () wait( b );
	@} catch ( channel_closed * ) {}@
}
int main() {
	Thread t[Threads];
	sleep( 10`s );
	flush( b );
} // wait for threads to terminate
\end{cfa}
\end{lrbox}

\subfloat[Go style]{\label{l:go_chan_bar}\usebox\myboxA}
\hspace*{3pt}
\vrule
\hspace*{3pt}
\subfloat[\CFA style]{\label{l:cfa_chan_bar}\usebox\myboxB}
\caption{Channel Barrier Termination}
\label{f:ChannelBarrierTermination}
\end{figure}

\section{Performance}
Given that the base implementation of the \CFA channels is very similar to the Go implementation, this section aims to show that the performance of the two implementations is comparable.
The microbenchmark for the channel comparison is similar to Figure~\ref{f:ChannelTermination}, where the number of threads and processors is set from the command line.
The processors are divided equally between producers and consumers, with one producer or consumer owning each core.
The number of cores is varied to measure how throughput scales.

The results of the benchmark are shown in Figure~\ref{f:chanPerf}.
The performance of Go and \CFA channels on this microbenchmark is comparable.
Note, performance is expected to decline as the number of cores increases, because the channel operations occur in a critical section; adding cores results in higher contention with no increase in parallelism.

\begin{figure}
\centering
\subfloat[AMD \CFA Channel Benchmark]{
	\resizebox{0.5\textwidth}{!}{\input{figures/nasus_Channel_Contention.pgf}}
	\label{f:chanAMD}
}
\subfloat[Intel \CFA Channel Benchmark]{
	\resizebox{0.5\textwidth}{!}{\input{figures/pyke_Channel_Contention.pgf}}
	\label{f:chanIntel}
}
\caption{The channel contention benchmark comparing \CFA and Go channel throughput (higher is better).}
\label{f:chanPerf}
\end{figure}
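For reference, the following sketches the shape of this microbenchmark in \CFA.
The harness details are assumptions for illustration (fixed thread counts rather than command-line arguments, and a simple shared counter), not the evaluated code: producers and consumers hammer a single shared channel for a fixed interval, after which the channel is closed and throughput is reported.
\begin{cfa}
enum { N = 4 };                         // producers and consumers per side
channel( int ) chan{ 128 };
size_t ops = 0;                         // completed removes
thread BProd {};
void main( BProd & this ) {
	try {
		for () insert( chan, 42 );
	} catch( channel_closed * ) {}      // shutdown
}
thread BCons {};
void main( BCons & this ) {
	try {
		for () {
			remove( chan );
			__atomic_add_fetch( &ops, 1, __ATOMIC_RELAXED );
		}
	} catch( channel_closed * ) {}      // shutdown
}
int main() {
	processor procs[2 * N];             // kernel threads, one per producer/consumer
	{
		BProd p[N];  BCons c[N];        // start threads
		sleep( 10`s );                  // measurement interval
		close( chan );
	}                                   // implicit join
	sout | "throughput (ops/s):" | ops / 10;
}
\end{cfa}

% Local Variables: %
% tab-width: 4 %
% End: %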