Changes in / [ff443e5:9082d7e8]


Ignore:
Files:
33 added
1 deleted
6 edited

Legend:

Unmodified
Added
Removed
  • doc/theses/colby_parsons_MMAth/benchmarks/channels/cfa/contend.cfa

    rff443e5 r9082d7e8  
    88
    99// user defines this
    10 #define BIG 1
     10// #define BIG 1
    1111
    1212owner_lock o;
    1313
    14 unsigned long long total_operations = 0;
     14size_t total_operations = 0;
    1515
    1616struct bigObject {
     
    3636Channel * channels;
    3737
    38 volatile bool cons_done = false, prod_done = false;
     38bool cons_done = false, prod_done = false;
    3939volatile int cons_done_count = 0;
    4040size_t cons_check = 0, prod_check = 0;
     
    4848}
    4949void main(Consumer & this) {
    50     unsigned long long runs = 0;
     50    size_t runs = 0;
    5151    size_t my_check = 0;
    5252    for ( ;; ) {
     
    7878}
    7979void main(Producer & this) {
    80     unsigned long long runs = 0;
     80    size_t runs = 0;
    8181    size_t my_check = 0;
     82    size_t my_id = this.i;
    8283    for ( ;; ) {
    8384        if ( prod_done ) break;
    8485        #ifdef BIG
    8586        bigObject j{(size_t)runs};
    86         insert( channels[ this.i ], j );
     87        insert( channels[ my_id ], j );
    8788        my_check = my_check ^ (j.a + j.b + j.c + j.d + j.d + j.e + j.f + j.g + j.h);
    8889        #else
    89         insert( channels[ this.i ], (size_t)runs );
     90        insert( channels[ my_id ], (size_t)runs );
    9091        my_check = my_check ^ ((size_t)runs);
    9192        #endif
     
    99100}
    100101
    101 int test( size_t Processors, size_t Channels, size_t Producers, size_t Consumers, size_t ChannelSize ) {
    102     size_t Clusters = Processors;
     102static inline int test( size_t Processors, size_t Channels, size_t Producers, size_t Consumers, size_t ChannelSize ) {
     103    size_t Clusters = 1;
    103104    // create a cluster
    104105    cluster clus[Clusters];
    105106    processor * proc[Processors];
    106107    for ( i; Processors ) {
    107         (*(proc[i] = alloc())){clus[i % Clusters]};
     108        (*(proc[i] = malloc())){clus[i % Clusters]};
    108109    }
    109110
     
    120121    Producer * p[Producers * Channels];
    121122
    122     for ( i; Consumers * Channels ) {
    123         (*(c[i] = alloc())){ i % Channels, clus[i % Clusters] };
    124     }
    125 
    126     for ( i; Producers * Channels ) {
    127         (*(p[i] = alloc())){ i % Channels, clus[i % Clusters] };
     123    for ( j; Channels ) {
     124        for ( i; Producers ) {
     125            (*(p[i] = malloc())){ j, clus[j % Clusters] };
     126        }
     127
     128        for ( i; Consumers ) {
     129            (*(c[i] = malloc())){ j, clus[j % Clusters] };
     130        }
    128131    }
    129132
     
    148151            }
    149152        }
    150        
    151153    }
    152154
     
    185187
    186188int main( int argc, char * argv[] ) {
    187     size_t Processors = 10, Channels = 1, Producers = 10, Consumers = 10, ChannelSize = 128;
     189    size_t Processors = 1, Channels = 1, Producers = 1, Consumers = 1, ChannelSize = 128;
    188190    switch ( argc ) {
    189191          case 3:
     
    206208                exit( EXIT_FAILURE );
    207209        } // switch
    208     Producers = Processors;
    209     Consumers = Processors;
     210    Producers = Processors / 2;
     211    Consumers = Processors / 2;
    210212    test(Processors, Channels, Producers, Consumers, ChannelSize);
    211213}
  • doc/theses/colby_parsons_MMAth/benchmarks/channels/plotData.py

    rff443e5 r9082d7e8  
    3636    procs.append(int(val))
    3737
    38 # 3rd line has num locks args
    39 line = readfile.readline()
    40 locks = []
    41 for val in line.split():
    42     locks.append(int(val))
    43 
    44 # 4th line has number of variants
     38# 3rd line has number of variants
    4539line = readfile.readline()
    4640names = line.split()
     
    5044lines = (line for line in lines if line) # Non-blank lines
    5145
     46class Bench(Enum):
     47    Unset = 0
     48    Contend = 1
     49    Zero = 2
     50    Barrier = 3
     51    Churn = 4
     52    Daisy_Chain = 5
     53    Hot_Potato = 6
     54    Pub_Sub = 7
     55
    5256nameSet = False
    53 currLocks = -1 # default val
     57currBench = Bench.Unset # default val
    5458count = 0
    5559procCount = 0
    5660currVariant = 0
    57 name = "Aggregate Lock"
     61name = ""
    5862var_name = ""
    5963sendData = [0.0 for j in range(numVariants)]
     
    6468    # print(line)
    6569   
    66     if currLocks == -1:
    67         lineArr = line.split()
    68         currLocks = lineArr[-1]
     70    if currBench == Bench.Unset:
     71        if line == "contend:":
     72            name = "Contend"
     73            currBench = Bench.Contend
     74        elif line == "zero:":
     75            name = "Zero"
     76            currBench = Bench.Zero
     77        elif line == "barrier:":
     78            name = "Barrier"
     79            currBench = Bench.Barrier
     80        elif line == "churn:":
     81            name = "Churn"
     82            currBench = Bench.Churn
     83        elif line == "daisy_chain:":
     84            name = "Daisy_Chain"
     85            currBench = Bench.Daisy_Chain
     86        elif line == "hot_potato:":
     87            name = "Hot_Potato"
     88            currBench = Bench.Hot_Potato
     89        elif line == "pub_sub:":
     90            name = "Pub_Sub"
     91            currBench = Bench.Pub_Sub
     92        else:
     93            print("Expected benchmark name")
     94            print("Line: " + line)
     95            sys.exit()
    6996        continue
    7097
     
    95122            if currVariant == numVariants:
    96123                fig, ax = plt.subplots()
    97                 plt.title(name + " Benchmark: " + str(currLocks) + " Locks")
    98                 plt.ylabel("Throughput (entries)")
     124                plt.title(name + " Benchmark")
     125                plt.ylabel("Throughput (channel operations)")
    99126                plt.xlabel("Cores")
    100127                for idx, arr in enumerate(data):
    101128                    plt.errorbar( procs, arr, [bars[idx][0], bars[idx][1]], capsize=2, marker='o' )
     129               
    102130                plt.yscale("log")
     131                # plt.ylim(1, None)
     132                # ax.get_yaxis().set_major_formatter(ticks.ScalarFormatter())
     133                # else:
     134                #     plt.ylim(0, None)
    103135                plt.xticks(procs)
    104136                ax.legend(names)
    105                 # fig.savefig("plots/" + machineName + "Aggregate_Lock_" + str(currLocks) + ".png")
    106                 plt.savefig("plots/" + machineName + "Aggregate_Lock_" + str(currLocks) + ".pgf")
     137                # fig.savefig("plots/" + machineName + name + ".png")
     138                plt.savefig("plots/" + machineName + name + ".pgf")
    107139                fig.clf()
    108140
    109141                # reset
    110                 currLocks = -1
     142                currBench = Bench.Unset
    111143                currVariant = 0
  • doc/theses/colby_parsons_MMAth/benchmarks/channels/run

    rff443e5 r9082d7e8  
    8585}
    8686
    87 numtimes=11
    88 
    89 # locks=('-DLOCKS=L1' '-DLOCKS=L2' '-DLOCKS=L3' '-DLOCKS=L4' '-DLOCKS=L5' '-DLOCKS=L6' '-DLOCKS=L7' '-DLOCKS=L8')
    90 # locks='1 2 3 4 5 6 7 8'
    91 lock_flags=('-DLOCKS=L2' '-DLOCKS=L4' '-DLOCKS=L8')
    92 locks=('2' '4' '8')
     87numtimes=5
    9388
    9489num_threads='2 4 8 16 24 32'
    95 # num_threads='2 4 8'
     90# num_threads='2'
    9691
    9792# toggle benchmarks
    98 order=${true}
    99 rand=${true}
    100 baseline=${true}
     93zero=${false}
     94contend=${true}
     95barrier=${false}
     96churn=${false}
     97daisy_chain=${false}
     98hot_potato=${false}
     99pub_sub=${false}
    101100
    102101runCFA=${true}
    103 runCPP=${true}
     102runGO=${true}
    104103# runCFA=${false}
    105 # runCPP=${false}
     104# runGO=${false}
    106105
    107106cfa=~/cfa-cc/driver/cfa
    108 cpp=g++
    109107
    110108# Helpers to minimize code duplication
     
    152150echo $num_threads
    153151
    154 for i in ${!locks[@]}; do
    155         echo -n ${locks[$i]}' '
    156 done
    157 echo ""
    158 
    159 if [ ${runCFA} -eq ${true} ] && [ ${order} -eq ${true} ]; then
    160     echo -n 'CFA-order '
    161 fi
    162 if [ ${runCPP} -eq ${true} ] && [ ${order} -eq ${true} ]; then
    163     echo -n 'CPP-order '
    164 fi
    165 if [ ${runCFA} -eq ${true} ] && [ ${baseline} -eq ${true} ]; then
    166     echo -n 'CFA-baseline '
    167 fi
    168 if [ ${runCPP} -eq ${true} ] && [ ${baseline} -eq ${true} ]; then
    169     echo -n 'CPP-baseline '
    170 fi
    171 if [ ${runCFA} -eq ${true} ] && [ ${rand} -eq ${true} ]; then
    172     echo -n 'CFA-rand '
    173 fi
    174 if [ ${runCPP} -eq ${true} ] && [ ${rand} -eq ${true} ]; then
    175     echo -n 'CPP-rand '
     152if [ ${runCFA} -eq ${true} ]; then
     153    echo -n 'CFA '
     154fi
     155if [ ${runGO} -eq ${true} ]; then
     156    echo -n 'Go '
    176157fi
    177158echo ""
     
    182163cfa_flags='-quiet -O3 -nodebug -DNDEBUG'
    183164
    184 # cpp flagse
    185 cpp_flags='-O3 -std=c++17 -lpthread -pthread -DNDEBUG'
    186 
    187165# run the benchmarks
    188166
    189 run_order() {
     167run_contend() {
    190168    post_args=${1}
    191169
    192170    if [ ${runCFA} -eq ${true} ] ; then
    193171        cd cfa # CFA RUN
    194         print_header 'CFA-'${3}
    195         ${cfa} ${cfa_flags} ${2} ${3}.cfa -o a.${hostname} > /dev/null 2>&1
     172        print_header 'CFA'
     173        ${cfa} ${cfa_flags} ${2}.cfa -o a.${hostname} > /dev/null 2>&1
    196174        run_bench
    197175        rm a.${hostname}
     
    199177    fi # done CFA
    200178
    201     if [ ${runCPP} -eq ${true} ] ; then
    202         cd cpp # CPP RUN
    203         print_header 'CPP-'${3}
    204         ${cpp} ${cpp_flags} ${2} ${3}.cc -o a.${hostname} > /dev/null 2>&1
     179    if [ ${runGO} -eq ${true} ] ; then
     180        cd go/${2} # Go RUN
     181        print_header 'Go'
     182        go build -o a.${hostname} > /dev/null 2>&1
    205183        run_bench
    206184        rm a.${hostname}
    207185        cd - > /dev/null
    208     fi # done CPP
     186    fi # done Go
    209187}
    210188
    211189# /usr/bin/time -f "%Uu %Ss %Er %Mkb"
    212 
    213 for i in ${!locks[@]}; do
    214     echo "locks: "${locks[$i]}
    215     if [ ${order} -eq ${true} ] ; then
    216         run_order ${locks[$i]} ${lock_flags[$i]} 'order'
    217     fi
    218     if [ ${baseline} -eq ${true} ] ; then
    219         run_order ${locks[$i]} ${lock_flags[$i]} 'baseline'
    220     fi
    221     if [ ${rand} -eq ${true} ] ; then
    222         run_order ${locks[$i]} '-DLOCKS=L8' 'rand'
    223     fi
    224 done
    225 
    226 
     190if [ ${contend} -eq ${true} ] ; then
     191    echo "contend: "
     192    run_contend '128' 'contend'
     193fi
     194
     195if [ ${zero} -eq ${true} ] ; then
     196    echo "zero: "
     197    run_contend '0' 'contend'
     198fi
     199
     200if [ ${barrier} -eq ${true} ] ; then
     201    echo "barrier: "
     202    run_contend '' 'barrier'
     203fi
     204
     205if [ ${churn} -eq ${true} ] ; then
     206    echo "churn: "
     207    run_contend '' 'churn'
     208fi
     209
     210if [ ${daisy_chain} -eq ${true} ] ; then
     211    echo "daisy_chain: "
     212    run_contend '' 'daisy_chain'
     213fi
     214
     215if [ ${hot_potato} -eq ${true} ] ; then
     216    echo "hot_potato: "
     217    run_contend '' 'hot_potato'
     218fi
     219
     220if [ ${pub_sub} -eq ${true} ] ; then
     221    echo "pub_sub: "
     222    run_contend '' 'pub_sub'
     223fi
  • libcfa/src/concurrency/actor.hfa

    rff443e5 r9082d7e8  
    3535
    3636// show stats
    37 // #define STATS
     37// #define ACTOR_STATS
    3838
    3939// forward decls
     
    122122    copy_queue * c_queue;           // current queue
    123123    volatile bool being_processed;  // flag to prevent concurrent processing
    124     #ifdef STATS
     124    #ifdef ACTOR_STATS
    125125    unsigned int id;
    126126    size_t missed;                  // transfers skipped due to being_processed flag being up
     
    132132    c_queue = owned_queue;
    133133    being_processed = false;
    134     #ifdef STATS
     134    #ifdef ACTOR_STATS
    135135    id = i;
    136136    missed = 0;
     
    153153    // check if queue is being processed elsewhere
    154154    if ( unlikely( being_processed ) ) {
    155         #ifdef STATS
     155        #ifdef ACTOR_STATS
    156156        missed++;
    157157        #endif
     
    175175struct worker_info {
    176176    volatile unsigned long long stamp;
    177     #ifdef STATS
     177    #ifdef ACTOR_STATS
    178178    size_t stolen_from, try_steal, stolen, failed_swaps, msgs_stolen;
    179179    unsigned long long processed;
     
    182182};
    183183static inline void ?{}( worker_info & this ) {
    184     #ifdef STATS
     184    #ifdef ACTOR_STATS
    185185    this.stolen_from = 0;
    186186    this.try_steal = 0;                             // attempts to steal
     
    194194}
    195195
    196 // #ifdef STATS
     196// #ifdef ACTOR_STATS
    197197// unsigned int * stolen_arr;
    198198// unsigned int * replaced_queue;
     
    206206};
    207207
    208 #ifdef STATS
     208#ifdef ACTOR_STATS
    209209// aggregate counters for statistics
    210210size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0,
     
    235235}; // executor
    236236
    237 // #ifdef STATS
     237// #ifdef ACTOR_STATS
    238238// __spinlock_t out_lock;
    239239// #endif
    240240static inline void ^?{}( worker & mutex this ) with(this) {
    241     #ifdef STATS
     241    #ifdef ACTOR_STATS
    242242    __atomic_add_fetch(&__all_gulps, executor_->w_infos[id].gulps,__ATOMIC_SEQ_CST);
    243243    __atomic_add_fetch(&__all_processed, executor_->w_infos[id].processed,__ATOMIC_SEQ_CST);
     
    276276        no_steal = true;
    277277   
    278     #ifdef STATS
     278    #ifdef ACTOR_STATS
    279279    // stolen_arr = aalloc( nrqueues );
    280280    // replaced_queue = aalloc( nrqueues );
     
    341341    } // for
    342342
    343     #ifdef STATS
     343    #ifdef ACTOR_STATS
    344344    size_t misses = 0;
    345345    for ( i; nrqueues ) {
     
    358358    if ( seperate_clus ) delete( cluster );
    359359
    360     #ifdef STATS // print formatted stats
     360    #ifdef ACTOR_STATS // print formatted stats
    361361    printf("    Actor System Stats:\n");
    362362    printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, __all_processed);
     
    404404    ticket = __get_next_ticket( *__actor_executor_ );
    405405    __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_RELAXED );
    406     #ifdef STATS
     406    #ifdef ACTOR_STATS
    407407    __atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST );
    408408    #endif
     
    513513            continue;
    514514
    515         #ifdef STATS
     515        #ifdef ACTOR_STATS
    516516        curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
    517517        if ( curr_steal_queue ) {
     
    526526        #else
    527527        curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
    528         #endif // STATS
     528        #endif // ACTOR_STATS
    529529
    530530        return;
     
    558558
    559559void main( worker & this ) with(this) {
    560     // #ifdef STATS
     560    // #ifdef ACTOR_STATS
    561561    // for ( i; executor_->nrqueues ) {
    562562    //     replaced_queue[i] = 0;
     
    587587        }
    588588        transfer( *curr_work_queue, &current_queue );
    589         #ifdef STATS
     589        #ifdef ACTOR_STATS
    590590        executor_->w_infos[id].gulps++;
    591         #endif // STATS
     591        #endif // ACTOR_STATS
    592592        #ifdef __STEAL
    593593        if ( isEmpty( *current_queue ) ) {
     
    599599            __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED );
    600600           
    601             #ifdef STATS
     601            #ifdef ACTOR_STATS
    602602            executor_->w_infos[id].try_steal++;
    603             #endif // STATS
     603            #endif // ACTOR_STATS
    604604           
    605605            steal_work( this, start + prng( range ) );
     
    608608        #endif // __STEAL
    609609        while ( ! isEmpty( *current_queue ) ) {
    610             #ifdef STATS
     610            #ifdef ACTOR_STATS
    611611            executor_->w_infos[id].processed++;
    612612            #endif
     
    636636
    637637static inline void __reset_stats() {
    638     #ifdef STATS
     638    #ifdef ACTOR_STATS
    639639    __total_tries = 0;
    640640    __total_stolen = 0;
  • libcfa/src/concurrency/channel.hfa

    rff443e5 r9082d7e8  
    33#include <locks.hfa>
    44#include <list.hfa>
    5 
    6 #define __COOP_CHANNEL
    7 #ifdef __PREVENTION_CHANNEL
    8 forall( T ) {
    9 struct channel {
    10     size_t size, count, front, back;
    11     T * buffer;
    12     thread$ * chair;
    13     T * chair_elem;
    14     exp_backoff_then_block_lock c_lock, p_lock;
    15     __spinlock_t mutex_lock;
    16     char __padding[64]; // avoid false sharing in arrays of channels
    17 };
    18 
    19 static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
    20     size = _size;
    21     front = back = count = 0;
    22     buffer = aalloc( size );
    23     chair = 0p;
    24     mutex_lock{};
    25     c_lock{};
    26     p_lock{};
    27 }
    28 
    29 static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
    30 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
    31 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
    32 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
    33 static inline bool has_waiters( channel(T) & chan ) with(chan) { return chair != 0p; }
    34 
    35 static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
    36     memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
    37     count += 1;
    38     back++;
    39     if ( back == size ) back = 0;
    40 }
    41 
    42 static inline void insert( channel(T) & chan, T elem ) with( chan ) {
    43     lock( p_lock );
    44     lock( mutex_lock __cfaabi_dbg_ctx2 );
    45 
    46     // have to check for the zero size channel case
    47     if ( size == 0 && chair != 0p ) {
    48         memcpy((void *)chair_elem, (void *)&elem, sizeof(T));
    49         unpark( chair );
    50         chair = 0p;
    51         unlock( mutex_lock );
    52         unlock( p_lock );
    53         unlock( c_lock );
    54         return;
    55     }
    56 
    57     // wait if buffer is full, work will be completed by someone else
    58     if ( count == size ) {
    59         chair = active_thread();
    60         chair_elem = &elem;
    61         unlock( mutex_lock );
    62         park( );
    63         return;
    64     } // if
    65 
    66     if ( chair != 0p ) {
    67         memcpy((void *)chair_elem, (void *)&elem, sizeof(T));
    68         unpark( chair );
    69         chair = 0p;
    70         unlock( mutex_lock );
    71         unlock( p_lock );
    72         unlock( c_lock );
    73         return;
    74     }
    75     insert_( chan, elem );
    76 
    77     unlock( mutex_lock );
    78     unlock( p_lock );
    79 }
    80 
    81 static inline T remove( channel(T) & chan ) with(chan) {
    82     lock( c_lock );
    83     lock( mutex_lock __cfaabi_dbg_ctx2 );
    84     T retval;
    85 
    86     // have to check for the zero size channel case
    87     if ( size == 0 && chair != 0p ) {
    88         memcpy((void *)&retval, (void *)chair_elem, sizeof(T));
    89         unpark( chair );
    90         chair = 0p;
    91         unlock( mutex_lock );
    92         unlock( p_lock );
    93         unlock( c_lock );
    94         return retval;
    95     }
    96 
    97     // wait if buffer is empty, work will be completed by someone else
    98     if ( count == 0 ) {
    99         chair = active_thread();
    100         chair_elem = &retval;
    101         unlock( mutex_lock );
    102         park( );
    103         return retval;
    104     }
    105 
    106     // Remove from buffer
    107     memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
    108     count -= 1;
    109     front++;
    110     if ( front == size ) front = 0;
    111 
    112     if ( chair != 0p ) {
    113         insert_( chan, *chair_elem );  // do waiting producer work
    114         unpark( chair );
    115         chair = 0p;
    116         unlock( mutex_lock );
    117         unlock( p_lock );
    118         unlock( c_lock );
    119         return retval;
    120     }
    121 
    122     unlock( mutex_lock );
    123     unlock( c_lock );
    124     return retval;
    125 }
    126 
    127 } // forall( T )
    128 #endif
    129 
    130 #ifdef __COOP_CHANNEL
     5#include <mutex_stmt.hfa>
    1316
    1327// link field used for threads waiting on channel
     
    14823}
    14924
     25// wake one thread from the list
     26static inline void wake_one( dlist( wait_link ) & queue ) {
     27    wait_link & popped = try_pop_front( queue );
     28    unpark( popped.t );
     29}
     30
     31// returns true if woken due to shutdown
     32// blocks thread on list and releases passed lock
     33static inline bool block( dlist( wait_link ) & queue, void * elem_ptr, go_mutex & lock ) {
     34    wait_link w{ active_thread(), elem_ptr };
     35    insert_last( queue, w );
     36    unlock( lock );
     37    park();
     38    return w.elem == 0p;
     39}
     40
     41// void * used for some fields since exceptions don't work with parametric polymorphism currently
     42exception channel_closed {
     43    // on failed insert elem is a ptr to the element attempting to be inserted
     44    // on failed remove elem ptr is 0p
     45    // on resumption of a failed insert this elem will be inserted
     46    // so a user may modify it in the resumption handler
     47    void * elem;
     48
     49    // pointer to chan that is closed
     50    void * closed_chan;
     51};
     52vtable(channel_closed) channel_closed_vt;
     53
     54// #define CHAN_STATS // define this to get channel stats printed in dtor
     55
    15056forall( T ) {
    15157
    152 struct channel {
    153     size_t size;
    154     size_t front, back, count;
     58struct __attribute__((aligned(128))) channel {
     59    size_t size, front, back, count;
    15560    T * buffer;
    156     dlist( wait_link ) prods, cons;
    157     exp_backoff_then_block_lock mutex_lock;
     61    dlist( wait_link ) prods, cons; // lists of blocked threads
     62    go_mutex mutex_lock;            // MX lock
     63    bool closed;                    // indicates channel close/open
     64    #ifdef CHAN_STATS
     65    size_t blocks, operations;      // counts total ops and ops resulting in a blocked thd
     66    #endif
    15867};
    15968
     
    16574    cons{};
    16675    mutex_lock{};
     76    closed = false;
     77    #ifdef CHAN_STATS
     78    blocks = 0;
     79    operations = 0;
     80    #endif
    16781}
    16882
    16983static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
    170 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
     84static inline void ^?{}( channel(T) &c ) with(c) {
     85    #ifdef CHAN_STATS
     86    printf("Channel %p Blocks: %lu, Operations: %lu, %.2f%% of ops blocked\n", &c, blocks, operations, ((double)blocks)/operations * 100);
     87    #endif
     88    verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" );
     89    delete( buffer );
     90}
    17191static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
    17292static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
     
    17595static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !prods`isEmpty; }
    17696
    177 static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
     97// closes the channel and notifies all blocked threads
     98static inline void close( channel(T) & chan ) with(chan) {
     99    lock( mutex_lock );
     100    closed = true;
     101
     102    // flush waiting consumers and producers
     103    while ( has_waiting_consumers( chan ) ) {
     104        cons`first.elem = 0p;
     105        wake_one( cons );
     106    }
     107    while ( has_waiting_producers( chan ) ) {
     108        prods`first.elem = 0p;
     109        wake_one( prods );
     110    }
     111    unlock(mutex_lock);
     112}
     113
     114static inline void is_closed( channel(T) & chan ) with(chan) { return closed; }
     115
     116static inline void flush( channel(T) & chan, T elem ) with(chan) {
     117    lock( mutex_lock );
     118    while ( count == 0 && !cons`isEmpty ) {
     119        memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
     120        wake_one( cons );
     121    }
     122    unlock( mutex_lock );
     123}
     124
     125// handles buffer insert
     126static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
    178127    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
    179128    count += 1;
     
    182131}
    183132
    184 static inline void wake_one( dlist( wait_link ) & queue ) {
    185     wait_link & popped = try_pop_front( queue );
    186     unpark( popped.t );
    187 }
    188 
    189 static inline void block( dlist( wait_link ) & queue, void * elem_ptr, exp_backoff_then_block_lock & lock ) {
    190     wait_link w{ active_thread(), elem_ptr };
    191     insert_last( queue, w );
    192     unlock( lock );
    193     park();
     133// does the buffer insert or hands elem directly to consumer if one is waiting
     134static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) {
     135    if ( count == 0 && !cons`isEmpty ) {
     136        memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
     137        wake_one( cons );
     138    } else __buf_insert( chan, elem );
     139}
     140
     141// needed to avoid an extra copy in closed case
     142static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
     143    lock( mutex_lock );
     144    #ifdef CHAN_STATS
     145    operations++;
     146    #endif
     147    if ( count == size ) { unlock( mutex_lock ); return false; }
     148    __do_insert( chan, elem );
     149    unlock( mutex_lock );
     150    return true;
     151}
     152
     153// attempts a nonblocking insert
     154// returns true if insert was successful, false otherwise
     155static inline bool try_insert( channel(T) & chan, T elem ) { return __internal_try_insert( chan, elem ); }
     156
     157// handles closed case of insert routine
     158static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
     159    channel_closed except{&channel_closed_vt, &elem, &chan };
     160    throwResume except; // throw closed resumption
     161    if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination
    194162}
    195163
    196164static inline void insert( channel(T) & chan, T elem ) with(chan) {
    197     lock( mutex_lock );
     165    // check for close before acquire mx
     166    if ( unlikely(closed) ) {
     167        __closed_insert( chan, elem );
     168        return;
     169    }
     170
     171    lock( mutex_lock );
     172
     173    #ifdef CHAN_STATS
     174    if ( !closed ) operations++;
     175    #endif
     176
     177    // if closed handle
     178    if ( unlikely(closed) ) {
     179        unlock( mutex_lock );
     180        __closed_insert( chan, elem );
     181        return;
     182    }
    198183
    199184    // have to check for the zero size channel case
     
    202187        wake_one( cons );
    203188        unlock( mutex_lock );
    204         return;
     189        return true;
    205190    }
    206191
    207192    // wait if buffer is full, work will be completed by someone else
    208193    if ( count == size ) {
    209         block( prods, &elem, mutex_lock );
     194        #ifdef CHAN_STATS
     195        blocks++;
     196        #endif
     197
     198        // check for if woken due to close
     199        if ( unlikely( block( prods, &elem, mutex_lock ) ) )
     200            __closed_insert( chan, elem );
    210201        return;
    211202    } // if
     
    214205        memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    215206        wake_one( cons );
    216     } else insert_( chan, elem );
     207    } else __buf_insert( chan, elem );
    217208   
    218209    unlock( mutex_lock );
     210    return;
     211}
     212
     213// handles buffer remove
     214static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) {
     215    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
     216    count -= 1;
     217    front = (front + 1) % size;
     218}
     219
     220// does the buffer remove and potentially does waiting producer work
     221static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
     222    __buf_remove( chan, retval );
     223    if (count == size - 1 && !prods`isEmpty ) {
     224        __buf_insert( chan, *(T *)prods`first.elem );  // do waiting producer work
     225        wake_one( prods );
     226    }
     227}
     228
     229// needed to avoid an extra copy in closed case and single return val case
     230static inline bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) {
     231    lock( mutex_lock );
     232    #ifdef CHAN_STATS
     233    operations++;
     234    #endif
     235    if ( count == 0 ) { unlock( mutex_lock ); return false; }
     236    __do_remove( chan, retval );
     237    unlock( mutex_lock );
     238    return true;
     239}
     240
     241// attempts a nonblocking remove
     242// returns [T, true] if insert was successful
     243// returns [T, false] if insert was successful (T uninit)
     244static inline [T, bool] try_remove( channel(T) & chan ) {
     245    T retval;
     246    return [ retval, __internal_try_remove( chan, retval ) ];
     247}
     248
     249static inline T try_remove( channel(T) & chan, T elem ) {
     250    T retval;
     251    __internal_try_remove( chan, retval );
     252    return retval;
     253}
     254
     255// handles closed case of insert routine
     256static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
     257    channel_closed except{&channel_closed_vt, 0p, &chan };
     258    throwResume except; // throw resumption
     259    if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination
    219260}
    220261
    221262static inline T remove( channel(T) & chan ) with(chan) {
    222     lock( mutex_lock );
    223263    T retval;
     264    if ( unlikely(closed) ) {
     265        __closed_remove( chan, retval );
     266        return retval;
     267    }
     268    lock( mutex_lock );
     269
     270    #ifdef CHAN_STATS
     271    if ( !closed ) operations++;
     272    #endif
     273
     274    if ( unlikely(closed) ) {
     275        unlock( mutex_lock );
     276        __closed_remove( chan, retval );
     277        return retval;
     278    }
    224279
    225280    // have to check for the zero size channel case
     
    233288    // wait if buffer is empty, work will be completed by someone else
    234289    if (count == 0) {
    235         block( cons, &retval, mutex_lock );
     290        #ifdef CHAN_STATS
     291        blocks++;
     292        #endif
     293        // check for if woken due to close
     294        if ( unlikely( block( cons, &retval, mutex_lock ) ) )
     295            __closed_remove( chan, retval );
    236296        return retval;
    237297    }
    238298
    239299    // Remove from buffer
    240     memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
    241     count -= 1;
    242     front = (front + 1) % size;
    243 
    244     if (count == size - 1 && !prods`isEmpty ) {
    245         insert_( chan, *(T *)prods`first.elem );  // do waiting producer work
    246         wake_one( prods );
    247     }
     300    __do_remove( chan, retval );
    248301
    249302    unlock( mutex_lock );
     
    251304}
    252305} // forall( T )
    253 #endif
    254 
    255 #ifdef __BARGE_CHANNEL
    256 forall( T ) {
    257 struct channel {
    258     size_t size;
    259     size_t front, back, count;
    260     T * buffer;
    261     fast_cond_var( exp_backoff_then_block_lock ) prods, cons;
    262     exp_backoff_then_block_lock mutex_lock;
    263 };
    264 
    265 static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
    266     size = _size;
    267     front = back = count = 0;
    268     buffer = aalloc( size );
    269     prods{};
    270     cons{};
    271     mutex_lock{};
    272 }
    273 
    274 static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
    275 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
    276 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
    277 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
    278 static inline bool has_waiters( channel(T) & chan ) with(chan) { return !empty( cons ) || !empty( prods ); }
    279 static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !empty( cons ); }
    280 static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !empty( prods ); }
    281 
    282 static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
    283     memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
    284     count += 1;
    285     back++;
    286     if ( back == size ) back = 0;
    287 }
    288 
    289 
    290 static inline void insert( channel(T) & chan, T elem ) with(chan) {
    291     lock( mutex_lock );
    292 
    293     while ( count == size ) {
    294         wait( prods, mutex_lock );
    295     } // if
    296 
    297     insert_( chan, elem );
    298    
    299     if ( !notify_one( cons ) && count < size )
    300         notify_one( prods );
    301 
    302     unlock( mutex_lock );
    303 }
    304 
    305 static inline T remove( channel(T) & chan ) with(chan) {
    306     lock( mutex_lock );
    307     T retval;
    308 
    309     while (count == 0) {
    310         wait( cons, mutex_lock );
    311     }
    312 
    313     memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
    314     count -= 1;
    315     front = (front + 1) % size;
    316 
    317     if ( !notify_one( prods ) && count > 0 )
    318         notify_one( cons );
    319 
    320     unlock( mutex_lock );
    321     return retval;
    322 }
    323 
    324 } // forall( T )
    325 #endif
    326 
    327 #ifdef __NO_WAIT_CHANNEL
    328 forall( T ) {
    329 struct channel {
    330     size_t size;
    331     size_t front, back, count;
    332     T * buffer;
    333     thread$ * chair;
    334     T * chair_elem;
    335     exp_backoff_then_block_lock c_lock, p_lock;
    336     __spinlock_t mutex_lock;
    337 };
    338 
    339 static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
    340     size = _size;
    341     front = back = count = 0;
    342     buffer = aalloc( size );
    343     chair = 0p;
    344     mutex_lock{};
    345     c_lock{};
    346     p_lock{};
    347     lock( c_lock );
    348 }
    349 
    350 static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
    351 static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
    352 static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
    353 static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
    354 static inline bool has_waiters( channel(T) & chan ) with(chan) { return c_lock.lock_value != 0; }
    355 
    356 static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
    357     memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
    358     count += 1;
    359     back++;
    360     if ( back == size ) back = 0;
    361 }
    362 
    363 static inline void insert( channel(T) & chan, T elem ) with( chan ) {
    364     lock( p_lock );
    365     lock( mutex_lock __cfaabi_dbg_ctx2 );
    366 
    367     insert_( chan, elem );
    368 
    369     if ( count != size )
    370         unlock( p_lock );
    371 
    372     if ( count == 1 )
    373         unlock( c_lock );
    374        
    375     unlock( mutex_lock );
    376 }
    377 
    378 static inline T remove( channel(T) & chan ) with(chan) {
    379     lock( c_lock );
    380     lock( mutex_lock __cfaabi_dbg_ctx2 );
    381     T retval;
    382 
    383     // Remove from buffer
    384     memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
    385     count -= 1;
    386     front = (front + 1) % size;
    387 
    388     if ( count != 0 )
    389         unlock( c_lock );
    390 
    391     if ( count == size - 1 )
    392         unlock( p_lock );
    393        
    394     unlock( mutex_lock );
    395     return retval;
    396 }
    397 
    398 } // forall( T )
    399 #endif
  • libcfa/src/concurrency/locks.hfa

    rff443e5 r9082d7e8  
    3232#include <fstream.hfa>
    3333
    34 
    3534// futex headers
    3635#include <linux/futex.h>      /* Definition of FUTEX_* constants */
     
    155154// futex_mutex
    156155
    157 // - No cond var support
    158156// - Kernel thd blocking alternative to the spinlock
    159157// - No ownership (will deadlock on reacq)
     
    185183        int state;
    186184
    187        
    188         // // linear backoff omitted for now
    189         // for( int spin = 4; spin < 1024; spin += spin) {
    190         //      state = 0;
    191         //      // if unlocked, lock and return
    192         //      if (internal_try_lock(this, state)) return;
    193         //      if (2 == state) break;
    194         //      for (int i = 0; i < spin; i++) Pause();
    195         // }
    196 
    197         // no contention try to acquire
    198         if (internal_try_lock(this, state)) return;
     185        for( int spin = 4; spin < 1024; spin += spin) {
     186                state = 0;
     187                // if unlocked, lock and return
     188                if (internal_try_lock(this, state)) return;
     189                if (2 == state) break;
     190                for (int i = 0; i < spin; i++) Pause();
     191        }
     192
     193        // // no contention try to acquire
     194        // if (internal_try_lock(this, state)) return;
    199195       
    200196        // if not in contended state, set to be in contended state
     
    209205
    210206static inline void unlock(futex_mutex & this) with(this) {
    211         // if uncontended do atomice unlock and then return
    212         if (__atomic_fetch_sub(&val, 1, __ATOMIC_RELEASE) == 1) return; // TODO: try acq/rel
     207        // if uncontended do atomic unlock and then return
     208    if (__atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1) return;
    213209       
    214210        // otherwise threads are blocked so we must wake one
    215         __atomic_store_n((int *)&val, 0, __ATOMIC_RELEASE);
    216211        futex((int *)&val, FUTEX_WAKE, 1);
    217212}
     
    222217// to set recursion count after getting signalled;
    223218static inline void on_wakeup( futex_mutex & f, size_t recursion ) {}
     219
     220//-----------------------------------------------------------------------------
     221// go_mutex
     222
     223// - Kernel thd blocking alternative to the spinlock
     224// - No ownership (will deadlock on reacq)
     225// - Golang's flavour of mutex
     226// - Impl taken from Golang: src/runtime/lock_futex.go
     227struct go_mutex {
     228        // lock state any state other than UNLOCKED is locked
     229        // enum LockState { UNLOCKED = 0, LOCKED = 1, SLEEPING = 2 };
     230       
     231        // stores a lock state
     232        int val;
     233};
     234
     235static inline void  ?{}( go_mutex & this ) with(this) { val = 0; }
     236
     237static inline bool internal_try_lock(go_mutex & this, int & compare_val, int new_val ) with(this) {
     238        return __atomic_compare_exchange_n((int*)&val, (int*)&compare_val, new_val, false, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE);
     239}
     240
     241static inline int internal_exchange(go_mutex & this, int swap ) with(this) {
     242        return __atomic_exchange_n((int*)&val, swap, __ATOMIC_ACQUIRE);
     243}
     244
     245const int __go_mtx_spins = 4;
     246const int __go_mtx_pauses = 30;
     247// if this is called recursively IT WILL DEADLOCK!!!!!
     248static inline void lock(go_mutex & this) with(this) {
     249        int state, init_state;
     250
     251    // speculative grab
     252    state = internal_exchange(this, 1);
     253    if ( !state ) return; // state == 0
     254    init_state = state;
     255    for (;;) {
     256        for( int i = 0; i < __go_mtx_spins; i++ ) {
     257            while( !val ) { // lock unlocked
     258                state = 0;
     259                if (internal_try_lock(this, state, init_state)) return;
     260            }
     261            for (int i = 0; i < __go_mtx_pauses; i++) Pause();
     262        }
     263
     264        while( !val ) { // lock unlocked
     265            state = 0;
     266            if (internal_try_lock(this, state, init_state)) return;
     267        }
     268        sched_yield();
     269       
     270        // if not in contended state, set to be in contended state
     271        state = internal_exchange(this, 2);
     272        if ( !state ) return; // state == 0
     273        init_state = 2;
     274        futex((int*)&val, FUTEX_WAIT, 2); // if val is not 2 this returns with EWOULDBLOCK
     275    }
     276}
     277
     278static inline void unlock( go_mutex & this ) with(this) {
     279        // if uncontended do atomic unlock and then return
     280    if (__atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1) return;
     281       
     282        // otherwise threads are blocked so we must wake one
     283        futex((int *)&val, FUTEX_WAKE, 1);
     284}
     285
     286static inline void on_notify( go_mutex & f, thread$ * t){ unpark(t); }
     287static inline size_t on_wait( go_mutex & f ) {unlock(f); return 0;}
     288static inline void on_wakeup( go_mutex & f, size_t recursion ) {}
    224289
    225290//-----------------------------------------------------------------------------
     
    271336        this.lock_value = 0;
    272337}
     338
     339static inline void  ^?{}( exp_backoff_then_block_lock & this ){}
    273340
    274341static inline bool internal_try_lock(exp_backoff_then_block_lock & this, size_t & compare_val) with(this) {
Note: See TracChangeset for help on using the changeset viewer.