Changes in / [9082d7e8:ff443e5]


Ignore:
Files:
1 added
33 deleted
6 edited

Legend:

Unmodified
Added
Removed
  • doc/theses/colby_parsons_MMAth/benchmarks/channels/cfa/contend.cfa

    r9082d7e8 rff443e5  
    88
    99// user defines this
    10 // #define BIG 1
     10#define BIG 1
    1111
    1212owner_lock o;
    1313
    14 size_t total_operations = 0;
     14unsigned long long total_operations = 0;
    1515
    1616struct bigObject {
     
    3636Channel * channels;
    3737
    38 bool cons_done = false, prod_done = false;
     38volatile bool cons_done = false, prod_done = false;
    3939volatile int cons_done_count = 0;
    4040size_t cons_check = 0, prod_check = 0;
     
    4848}
    4949void main(Consumer & this) {
    50     size_t runs = 0;
     50    unsigned long long runs = 0;
    5151    size_t my_check = 0;
    5252    for ( ;; ) {
     
    7878}
    7979void main(Producer & this) {
    80     size_t runs = 0;
     80    unsigned long long runs = 0;
    8181    size_t my_check = 0;
    82     size_t my_id = this.i;
    8382    for ( ;; ) {
    8483        if ( prod_done ) break;
    8584        #ifdef BIG
    8685        bigObject j{(size_t)runs};
    87         insert( channels[ my_id ], j );
     86        insert( channels[ this.i ], j );
    8887        my_check = my_check ^ (j.a + j.b + j.c + j.d + j.d + j.e + j.f + j.g + j.h);
    8988        #else
    90         insert( channels[ my_id ], (size_t)runs );
     89        insert( channels[ this.i ], (size_t)runs );
    9190        my_check = my_check ^ ((size_t)runs);
    9291        #endif
     
    10099}
    101100
    102 static inline int test( size_t Processors, size_t Channels, size_t Producers, size_t Consumers, size_t ChannelSize ) {
    103     size_t Clusters = 1;
     101int test( size_t Processors, size_t Channels, size_t Producers, size_t Consumers, size_t ChannelSize ) {
     102    size_t Clusters = Processors;
    104103    // create a cluster
    105104    cluster clus[Clusters];
    106105    processor * proc[Processors];
    107106    for ( i; Processors ) {
    108         (*(proc[i] = malloc())){clus[i % Clusters]};
     107        (*(proc[i] = alloc())){clus[i % Clusters]};
    109108    }
    110109
     
    121120    Producer * p[Producers * Channels];
    122121
    123     for ( j; Channels ) {
    124         for ( i; Producers ) {
    125             (*(p[i] = malloc())){ j, clus[j % Clusters] };
    126         }
    127 
    128         for ( i; Consumers ) {
    129             (*(c[i] = malloc())){ j, clus[j % Clusters] };
    130         }
     122    for ( i; Consumers * Channels ) {
     123        (*(c[i] = alloc())){ i % Channels, clus[i % Clusters] };
     124    }
     125
     126    for ( i; Producers * Channels ) {
     127        (*(p[i] = alloc())){ i % Channels, clus[i % Clusters] };
    131128    }
    132129
     
    151148            }
    152149        }
     150       
    153151    }
    154152
     
    187185
    188186int main( int argc, char * argv[] ) {
    189     size_t Processors = 1, Channels = 1, Producers = 1, Consumers = 1, ChannelSize = 128;
     187    size_t Processors = 10, Channels = 1, Producers = 10, Consumers = 10, ChannelSize = 128;
    190188    switch ( argc ) {
    191189          case 3:
     
    208206                exit( EXIT_FAILURE );
    209207        } // switch
    210     Producers = Processors / 2;
    211     Consumers = Processors / 2;
     208    Producers = Processors;
     209    Consumers = Processors;
    212210    test(Processors, Channels, Producers, Consumers, ChannelSize);
    213211}
  • doc/theses/colby_parsons_MMAth/benchmarks/channels/plotData.py

    r9082d7e8 rff443e5  
    3636    procs.append(int(val))
    3737
    38 # 3rd line has number of variants
     38# 3rd line has num locks args
     39line = readfile.readline()
     40locks = []
     41for val in line.split():
     42    locks.append(int(val))
     43
     44# 4th line has number of variants
    3945line = readfile.readline()
    4046names = line.split()
     
    4450lines = (line for line in lines if line) # Non-blank lines
    4551
    46 class Bench(Enum):
    47     Unset = 0
    48     Contend = 1
    49     Zero = 2
    50     Barrier = 3
    51     Churn = 4
    52     Daisy_Chain = 5
    53     Hot_Potato = 6
    54     Pub_Sub = 7
    55 
    5652nameSet = False
    57 currBench = Bench.Unset # default val
     53currLocks = -1 # default val
    5854count = 0
    5955procCount = 0
    6056currVariant = 0
    61 name = ""
     57name = "Aggregate Lock"
    6258var_name = ""
    6359sendData = [0.0 for j in range(numVariants)]
     
    6864    # print(line)
    6965   
    70     if currBench == Bench.Unset:
    71         if line == "contend:":
    72             name = "Contend"
    73             currBench = Bench.Contend
    74         elif line == "zero:":
    75             name = "Zero"
    76             currBench = Bench.Zero
    77         elif line == "barrier:":
    78             name = "Barrier"
    79             currBench = Bench.Barrier
    80         elif line == "churn:":
    81             name = "Churn"
    82             currBench = Bench.Churn
    83         elif line == "daisy_chain:":
    84             name = "Daisy_Chain"
    85             currBench = Bench.Daisy_Chain
    86         elif line == "hot_potato:":
    87             name = "Hot_Potato"
    88             currBench = Bench.Hot_Potato
    89         elif line == "pub_sub:":
    90             name = "Pub_Sub"
    91             currBench = Bench.Pub_Sub
    92         else:
    93             print("Expected benchmark name")
    94             print("Line: " + line)
    95             sys.exit()
     66    if currLocks == -1:
     67        lineArr = line.split()
     68        currLocks = lineArr[-1]
    9669        continue
    9770
     
    12295            if currVariant == numVariants:
    12396                fig, ax = plt.subplots()
    124                 plt.title(name + " Benchmark")
    125                 plt.ylabel("Throughput (channel operations)")
     97                plt.title(name + " Benchmark: " + str(currLocks) + " Locks")
     98                plt.ylabel("Throughput (entries)")
    12699                plt.xlabel("Cores")
    127100                for idx, arr in enumerate(data):
    128101                    plt.errorbar( procs, arr, [bars[idx][0], bars[idx][1]], capsize=2, marker='o' )
    129                
    130102                plt.yscale("log")
    131                 # plt.ylim(1, None)
    132                 # ax.get_yaxis().set_major_formatter(ticks.ScalarFormatter())
    133                 # else:
    134                 #     plt.ylim(0, None)
    135103                plt.xticks(procs)
    136104                ax.legend(names)
    137                 # fig.savefig("plots/" + machineName + name + ".png")
    138                 plt.savefig("plots/" + machineName + name + ".pgf")
     105                # fig.savefig("plots/" + machineName + "Aggregate_Lock_" + str(currLocks) + ".png")
     106                plt.savefig("plots/" + machineName + "Aggregate_Lock_" + str(currLocks) + ".pgf")
    139107                fig.clf()
    140108
    141109                # reset
    142                 currBench = Bench.Unset
     110                currLocks = -1
    143111                currVariant = 0
  • doc/theses/colby_parsons_MMAth/benchmarks/channels/run

    r9082d7e8 rff443e5  
    8585}
    8686
    87 numtimes=5
     87numtimes=11
     88
     89# locks=('-DLOCKS=L1' '-DLOCKS=L2' '-DLOCKS=L3' '-DLOCKS=L4' '-DLOCKS=L5' '-DLOCKS=L6' '-DLOCKS=L7' '-DLOCKS=L8')
     90# locks='1 2 3 4 5 6 7 8'
     91lock_flags=('-DLOCKS=L2' '-DLOCKS=L4' '-DLOCKS=L8')
     92locks=('2' '4' '8')
    8893
    8994num_threads='2 4 8 16 24 32'
    90 # num_threads='2'
     95# num_threads='2 4 8'
    9196
    9297# toggle benchmarks
    93 zero=${false}
    94 contend=${true}
    95 barrier=${false}
    96 churn=${false}
    97 daisy_chain=${false}
    98 hot_potato=${false}
    99 pub_sub=${false}
     98order=${true}
     99rand=${true}
     100baseline=${true}
    100101
    101102runCFA=${true}
    102 runGO=${true}
     103runCPP=${true}
    103104# runCFA=${false}
    104 # runGO=${false}
     105# runCPP=${false}
    105106
    106107cfa=~/cfa-cc/driver/cfa
     108cpp=g++
    107109
    108110# Helpers to minimize code duplication
     
    150152echo $num_threads
    151153
    152 if [ ${runCFA} -eq ${true} ]; then
    153     echo -n 'CFA '
    154 fi
    155 if [ ${runGO} -eq ${true} ]; then
    156     echo -n 'Go '
     154for i in ${!locks[@]}; do
     155        echo -n ${locks[$i]}' '
     156done
     157echo ""
     158
     159if [ ${runCFA} -eq ${true} ] && [ ${order} -eq ${true} ]; then
     160    echo -n 'CFA-order '
     161fi
     162if [ ${runCPP} -eq ${true} ] && [ ${order} -eq ${true} ]; then
     163    echo -n 'CPP-order '
     164fi
     165if [ ${runCFA} -eq ${true} ] && [ ${baseline} -eq ${true} ]; then
     166    echo -n 'CFA-baseline '
     167fi
     168if [ ${runCPP} -eq ${true} ] && [ ${baseline} -eq ${true} ]; then
     169    echo -n 'CPP-baseline '
     170fi
     171if [ ${runCFA} -eq ${true} ] && [ ${rand} -eq ${true} ]; then
     172    echo -n 'CFA-rand '
     173fi
     174if [ ${runCPP} -eq ${true} ] && [ ${rand} -eq ${true} ]; then
     175    echo -n 'CPP-rand '
    157176fi
    158177echo ""
     
    163182cfa_flags='-quiet -O3 -nodebug -DNDEBUG'
    164183
     184# cpp flagse
     185cpp_flags='-O3 -std=c++17 -lpthread -pthread -DNDEBUG'
     186
    165187# run the benchmarks
    166188
    167 run_contend() {
     189run_order() {
    168190    post_args=${1}
    169191
    170192    if [ ${runCFA} -eq ${true} ] ; then
    171193        cd cfa # CFA RUN
    172         print_header 'CFA'
    173         ${cfa} ${cfa_flags} ${2}.cfa -o a.${hostname} > /dev/null 2>&1
     194        print_header 'CFA-'${3}
     195        ${cfa} ${cfa_flags} ${2} ${3}.cfa -o a.${hostname} > /dev/null 2>&1
    174196        run_bench
    175197        rm a.${hostname}
     
    177199    fi # done CFA
    178200
    179     if [ ${runGO} -eq ${true} ] ; then
    180         cd go/${2} # Go RUN
    181         print_header 'Go'
    182         go build -o a.${hostname} > /dev/null 2>&1
     201    if [ ${runCPP} -eq ${true} ] ; then
     202        cd cpp # CPP RUN
     203        print_header 'CPP-'${3}
     204        ${cpp} ${cpp_flags} ${2} ${3}.cc -o a.${hostname} > /dev/null 2>&1
    183205        run_bench
    184206        rm a.${hostname}
    185207        cd - > /dev/null
    186     fi # done Go
     208    fi # done CPP
    187209}
    188210
    189211# /usr/bin/time -f "%Uu %Ss %Er %Mkb"
    190 if [ ${contend} -eq ${true} ] ; then
    191     echo "contend: "
    192     run_contend '128' 'contend'
    193 fi
    194 
    195 if [ ${zero} -eq ${true} ] ; then
    196     echo "zero: "
    197     run_contend '0' 'contend'
    198 fi
    199 
    200 if [ ${barrier} -eq ${true} ] ; then
    201     echo "barrier: "
    202     run_contend '' 'barrier'
    203 fi
    204 
    205 if [ ${churn} -eq ${true} ] ; then
    206     echo "churn: "
    207     run_contend '' 'churn'
    208 fi
    209 
    210 if [ ${daisy_chain} -eq ${true} ] ; then
    211     echo "daisy_chain: "
    212     run_contend '' 'daisy_chain'
    213 fi
    214 
    215 if [ ${hot_potato} -eq ${true} ] ; then
    216     echo "hot_potato: "
    217     run_contend '' 'hot_potato'
    218 fi
    219 
    220 if [ ${pub_sub} -eq ${true} ] ; then
    221     echo "pub_sub: "
    222     run_contend '' 'pub_sub'
    223 fi
     212
     213for i in ${!locks[@]}; do
     214    echo "locks: "${locks[$i]}
     215    if [ ${order} -eq ${true} ] ; then
     216        run_order ${locks[$i]} ${lock_flags[$i]} 'order'
     217    fi
     218    if [ ${baseline} -eq ${true} ] ; then
     219        run_order ${locks[$i]} ${lock_flags[$i]} 'baseline'
     220    fi
     221    if [ ${rand} -eq ${true} ] ; then
     222        run_order ${locks[$i]} '-DLOCKS=L8' 'rand'
     223    fi
     224done
     225
     226
  • libcfa/src/concurrency/actor.hfa

    r9082d7e8 rff443e5  
    3535
    3636// show stats
    37 // #define ACTOR_STATS
     37// #define STATS
    3838
    3939// forward decls
     
    122122    copy_queue * c_queue;           // current queue
    123123    volatile bool being_processed;  // flag to prevent concurrent processing
    124     #ifdef ACTOR_STATS
     124    #ifdef STATS
    125125    unsigned int id;
    126126    size_t missed;                  // transfers skipped due to being_processed flag being up
     
    132132    c_queue = owned_queue;
    133133    being_processed = false;
    134     #ifdef ACTOR_STATS
     134    #ifdef STATS
    135135    id = i;
    136136    missed = 0;
     
    153153    // check if queue is being processed elsewhere
    154154    if ( unlikely( being_processed ) ) {
    155         #ifdef ACTOR_STATS
     155        #ifdef STATS
    156156        missed++;
    157157        #endif
     
    175175struct worker_info {
    176176    volatile unsigned long long stamp;
    177     #ifdef ACTOR_STATS
     177    #ifdef STATS
    178178    size_t stolen_from, try_steal, stolen, failed_swaps, msgs_stolen;
    179179    unsigned long long processed;
     
    182182};
    183183static inline void ?{}( worker_info & this ) {
    184     #ifdef ACTOR_STATS
     184    #ifdef STATS
    185185    this.stolen_from = 0;
    186186    this.try_steal = 0;                             // attempts to steal
     
    194194}
    195195
    196 // #ifdef ACTOR_STATS
     196// #ifdef STATS
    197197// unsigned int * stolen_arr;
    198198// unsigned int * replaced_queue;
     
    206206};
    207207
    208 #ifdef ACTOR_STATS
     208#ifdef STATS
    209209// aggregate counters for statistics
    210210size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0,
     
    235235}; // executor
    236236
    237 // #ifdef ACTOR_STATS
     237// #ifdef STATS
    238238// __spinlock_t out_lock;
    239239// #endif
    240240static inline void ^?{}( worker & mutex this ) with(this) {
    241     #ifdef ACTOR_STATS
     241    #ifdef STATS
    242242    __atomic_add_fetch(&__all_gulps, executor_->w_infos[id].gulps,__ATOMIC_SEQ_CST);
    243243    __atomic_add_fetch(&__all_processed, executor_->w_infos[id].processed,__ATOMIC_SEQ_CST);
     
    276276        no_steal = true;
    277277   
    278     #ifdef ACTOR_STATS
     278    #ifdef STATS
    279279    // stolen_arr = aalloc( nrqueues );
    280280    // replaced_queue = aalloc( nrqueues );
     
    341341    } // for
    342342
    343     #ifdef ACTOR_STATS
     343    #ifdef STATS
    344344    size_t misses = 0;
    345345    for ( i; nrqueues ) {
     
    358358    if ( seperate_clus ) delete( cluster );
    359359
    360     #ifdef ACTOR_STATS // print formatted stats
     360    #ifdef STATS // print formatted stats
    361361    printf("    Actor System Stats:\n");
    362362    printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, __all_processed);
     
    404404    ticket = __get_next_ticket( *__actor_executor_ );
    405405    __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_RELAXED );
    406     #ifdef ACTOR_STATS
     406    #ifdef STATS
    407407    __atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST );
    408408    #endif
     
    513513            continue;
    514514
    515         #ifdef ACTOR_STATS
     515        #ifdef STATS
    516516        curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
    517517        if ( curr_steal_queue ) {
     
    526526        #else
    527527        curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
    528         #endif // ACTOR_STATS
     528        #endif // STATS
    529529
    530530        return;
     
    558558
    559559void main( worker & this ) with(this) {
    560     // #ifdef ACTOR_STATS
     560    // #ifdef STATS
    561561    // for ( i; executor_->nrqueues ) {
    562562    //     replaced_queue[i] = 0;
     
    587587        }
    588588        transfer( *curr_work_queue, &current_queue );
    589         #ifdef ACTOR_STATS
     589        #ifdef STATS
    590590        executor_->w_infos[id].gulps++;
    591         #endif // ACTOR_STATS
     591        #endif // STATS
    592592        #ifdef __STEAL
    593593        if ( isEmpty( *current_queue ) ) {
     
    599599            __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED );
    600600           
    601             #ifdef ACTOR_STATS
     601            #ifdef STATS
    602602            executor_->w_infos[id].try_steal++;
    603             #endif // ACTOR_STATS
     603            #endif // STATS
    604604           
    605605            steal_work( this, start + prng( range ) );
     
    608608        #endif // __STEAL
    609609        while ( ! isEmpty( *current_queue ) ) {
    610             #ifdef ACTOR_STATS
     610            #ifdef STATS
    611611            executor_->w_infos[id].processed++;
    612612            #endif
     
    636636
    637637static inline void __reset_stats() {
    638     #ifdef ACTOR_STATS
     638    #ifdef STATS
    639639    __total_tries = 0;
    640640    __total_stolen = 0;
  • libcfa/src/concurrency/channel.hfa

    r9082d7e8 rff443e5  
    33#include <locks.hfa>
    44#include <list.hfa>
    5 #include <mutex_stmt.hfa>
     5
     6#define __COOP_CHANNEL
     7#ifdef __PREVENTION_CHANNEL
     8forall( T ) {
     9struct channel {
     10    size_t size, count, front, back;
     11    T * buffer;
     12    thread$ * chair;
     13    T * chair_elem;
     14    exp_backoff_then_block_lock c_lock, p_lock;
     15    __spinlock_t mutex_lock;
     16    char __padding[64]; // avoid false sharing in arrays of channels
     17};
     18
     19static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
     20    size = _size;
     21    front = back = count = 0;
     22    buffer = aalloc( size );
     23    chair = 0p;
     24    mutex_lock{};
     25    c_lock{};
     26    p_lock{};
     27}
     28
     29static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
     30static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
     31static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
     32static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
     33static inline bool has_waiters( channel(T) & chan ) with(chan) { return chair != 0p; }
     34
     35static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
     36    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
     37    count += 1;
     38    back++;
     39    if ( back == size ) back = 0;
     40}
     41
     42static inline void insert( channel(T) & chan, T elem ) with( chan ) {
     43    lock( p_lock );
     44    lock( mutex_lock __cfaabi_dbg_ctx2 );
     45
     46    // have to check for the zero size channel case
     47    if ( size == 0 && chair != 0p ) {
     48        memcpy((void *)chair_elem, (void *)&elem, sizeof(T));
     49        unpark( chair );
     50        chair = 0p;
     51        unlock( mutex_lock );
     52        unlock( p_lock );
     53        unlock( c_lock );
     54        return;
     55    }
     56
     57    // wait if buffer is full, work will be completed by someone else
     58    if ( count == size ) {
     59        chair = active_thread();
     60        chair_elem = &elem;
     61        unlock( mutex_lock );
     62        park( );
     63        return;
     64    } // if
     65
     66    if ( chair != 0p ) {
     67        memcpy((void *)chair_elem, (void *)&elem, sizeof(T));
     68        unpark( chair );
     69        chair = 0p;
     70        unlock( mutex_lock );
     71        unlock( p_lock );
     72        unlock( c_lock );
     73        return;
     74    }
     75    insert_( chan, elem );
     76
     77    unlock( mutex_lock );
     78    unlock( p_lock );
     79}
     80
     81static inline T remove( channel(T) & chan ) with(chan) {
     82    lock( c_lock );
     83    lock( mutex_lock __cfaabi_dbg_ctx2 );
     84    T retval;
     85
     86    // have to check for the zero size channel case
     87    if ( size == 0 && chair != 0p ) {
     88        memcpy((void *)&retval, (void *)chair_elem, sizeof(T));
     89        unpark( chair );
     90        chair = 0p;
     91        unlock( mutex_lock );
     92        unlock( p_lock );
     93        unlock( c_lock );
     94        return retval;
     95    }
     96
     97    // wait if buffer is empty, work will be completed by someone else
     98    if ( count == 0 ) {
     99        chair = active_thread();
     100        chair_elem = &retval;
     101        unlock( mutex_lock );
     102        park( );
     103        return retval;
     104    }
     105
     106    // Remove from buffer
     107    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
     108    count -= 1;
     109    front++;
     110    if ( front == size ) front = 0;
     111
     112    if ( chair != 0p ) {
     113        insert_( chan, *chair_elem );  // do waiting producer work
     114        unpark( chair );
     115        chair = 0p;
     116        unlock( mutex_lock );
     117        unlock( p_lock );
     118        unlock( c_lock );
     119        return retval;
     120    }
     121
     122    unlock( mutex_lock );
     123    unlock( c_lock );
     124    return retval;
     125}
     126
     127} // forall( T )
     128#endif
     129
     130#ifdef __COOP_CHANNEL
    6131
    7132// link field used for threads waiting on channel
     
    23148}
    24149
    25 // wake one thread from the list
    26 static inline void wake_one( dlist( wait_link ) & queue ) {
    27     wait_link & popped = try_pop_front( queue );
    28     unpark( popped.t );
    29 }
    30 
    31 // returns true if woken due to shutdown
    32 // blocks thread on list and releases passed lock
    33 static inline bool block( dlist( wait_link ) & queue, void * elem_ptr, go_mutex & lock ) {
    34     wait_link w{ active_thread(), elem_ptr };
    35     insert_last( queue, w );
    36     unlock( lock );
    37     park();
    38     return w.elem == 0p;
    39 }
    40 
    41 // void * used for some fields since exceptions don't work with parametric polymorphism currently
    42 exception channel_closed {
    43     // on failed insert elem is a ptr to the element attempting to be inserted
    44     // on failed remove elem ptr is 0p
    45     // on resumption of a failed insert this elem will be inserted
    46     // so a user may modify it in the resumption handler
    47     void * elem;
    48 
    49     // pointer to chan that is closed
    50     void * closed_chan;
    51 };
    52 vtable(channel_closed) channel_closed_vt;
    53 
    54 // #define CHAN_STATS // define this to get channel stats printed in dtor
    55 
    56150forall( T ) {
    57151
    58 struct __attribute__((aligned(128))) channel {
    59     size_t size, front, back, count;
     152struct channel {
     153    size_t size;
     154    size_t front, back, count;
    60155    T * buffer;
    61     dlist( wait_link ) prods, cons; // lists of blocked threads
    62     go_mutex mutex_lock;            // MX lock
    63     bool closed;                    // indicates channel close/open
    64     #ifdef CHAN_STATS
    65     size_t blocks, operations;      // counts total ops and ops resulting in a blocked thd
    66     #endif
     156    dlist( wait_link ) prods, cons;
     157    exp_backoff_then_block_lock mutex_lock;
    67158};
    68159
     
    74165    cons{};
    75166    mutex_lock{};
    76     closed = false;
    77     #ifdef CHAN_STATS
    78     blocks = 0;
    79     operations = 0;
    80     #endif
    81167}
    82168
    83169static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
    84 static inline void ^?{}( channel(T) &c ) with(c) {
    85     #ifdef CHAN_STATS
    86     printf("Channel %p Blocks: %lu, Operations: %lu, %.2f%% of ops blocked\n", &c, blocks, operations, ((double)blocks)/operations * 100);
    87     #endif
    88     verifyf( cons`isEmpty && prods`isEmpty, "Attempted to delete channel with waiting threads (Deadlock).\n" );
    89     delete( buffer );
    90 }
     170static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
    91171static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
    92172static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
     
    95175static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !prods`isEmpty; }
    96176
    97 // closes the channel and notifies all blocked threads
    98 static inline void close( channel(T) & chan ) with(chan) {
    99     lock( mutex_lock );
    100     closed = true;
    101 
    102     // flush waiting consumers and producers
    103     while ( has_waiting_consumers( chan ) ) {
    104         cons`first.elem = 0p;
    105         wake_one( cons );
    106     }
    107     while ( has_waiting_producers( chan ) ) {
    108         prods`first.elem = 0p;
    109         wake_one( prods );
    110     }
    111     unlock(mutex_lock);
    112 }
    113 
    114 static inline void is_closed( channel(T) & chan ) with(chan) { return closed; }
    115 
    116 static inline void flush( channel(T) & chan, T elem ) with(chan) {
    117     lock( mutex_lock );
    118     while ( count == 0 && !cons`isEmpty ) {
    119         memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    120         wake_one( cons );
    121     }
    122     unlock( mutex_lock );
    123 }
    124 
    125 // handles buffer insert
    126 static inline void __buf_insert( channel(T) & chan, T & elem ) with(chan) {
     177static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
    127178    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
    128179    count += 1;
     
    131182}
    132183
    133 // does the buffer insert or hands elem directly to consumer if one is waiting
    134 static inline void __do_insert( channel(T) & chan, T & elem ) with(chan) {
    135     if ( count == 0 && !cons`isEmpty ) {
    136         memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    137         wake_one( cons );
    138     } else __buf_insert( chan, elem );
    139 }
    140 
    141 // needed to avoid an extra copy in closed case
    142 static inline bool __internal_try_insert( channel(T) & chan, T & elem ) with(chan) {
     184static inline void wake_one( dlist( wait_link ) & queue ) {
     185    wait_link & popped = try_pop_front( queue );
     186    unpark( popped.t );
     187}
     188
     189static inline void block( dlist( wait_link ) & queue, void * elem_ptr, exp_backoff_then_block_lock & lock ) {
     190    wait_link w{ active_thread(), elem_ptr };
     191    insert_last( queue, w );
     192    unlock( lock );
     193    park();
     194}
     195
     196static inline void insert( channel(T) & chan, T elem ) with(chan) {
    143197    lock( mutex_lock );
    144     #ifdef CHAN_STATS
    145     operations++;
    146     #endif
    147     if ( count == size ) { unlock( mutex_lock ); return false; }
    148     __do_insert( chan, elem );
    149     unlock( mutex_lock );
    150     return true;
    151 }
    152 
    153 // attempts a nonblocking insert
    154 // returns true if insert was successful, false otherwise
    155 static inline bool try_insert( channel(T) & chan, T elem ) { return __internal_try_insert( chan, elem ); }
    156 
    157 // handles closed case of insert routine
    158 static inline void __closed_insert( channel(T) & chan, T & elem ) with(chan) {
    159     channel_closed except{&channel_closed_vt, &elem, &chan };
    160     throwResume except; // throw closed resumption
    161     if ( !__internal_try_insert( chan, elem ) ) throw except; // if try to insert fails (would block), throw termination
    162 }
    163 
    164 static inline void insert( channel(T) & chan, T elem ) with(chan) {
    165     // check for close before acquire mx
    166     if ( unlikely(closed) ) {
    167         __closed_insert( chan, elem );
    168         return;
    169     }
    170 
    171     lock( mutex_lock );
    172 
    173     #ifdef CHAN_STATS
    174     if ( !closed ) operations++;
    175     #endif
    176 
    177     // if closed handle
    178     if ( unlikely(closed) ) {
    179         unlock( mutex_lock );
    180         __closed_insert( chan, elem );
    181         return;
    182     }
    183198
    184199    // have to check for the zero size channel case
     
    187202        wake_one( cons );
    188203        unlock( mutex_lock );
    189         return true;
     204        return;
    190205    }
    191206
    192207    // wait if buffer is full, work will be completed by someone else
    193208    if ( count == size ) {
    194         #ifdef CHAN_STATS
    195         blocks++;
    196         #endif
    197 
    198         // check for if woken due to close
    199         if ( unlikely( block( prods, &elem, mutex_lock ) ) )
    200             __closed_insert( chan, elem );
     209        block( prods, &elem, mutex_lock );
    201210        return;
    202211    } // if
     
    205214        memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
    206215        wake_one( cons );
    207     } else __buf_insert( chan, elem );
     216    } else insert_( chan, elem );
    208217   
    209218    unlock( mutex_lock );
    210     return;
    211 }
    212 
    213 // handles buffer remove
    214 static inline void __buf_remove( channel(T) & chan, T & retval ) with(chan) {
    215     memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
    216     count -= 1;
    217     front = (front + 1) % size;
    218 }
    219 
    220 // does the buffer remove and potentially does waiting producer work
    221 static inline void __do_remove( channel(T) & chan, T & retval ) with(chan) {
    222     __buf_remove( chan, retval );
    223     if (count == size - 1 && !prods`isEmpty ) {
    224         __buf_insert( chan, *(T *)prods`first.elem );  // do waiting producer work
    225         wake_one( prods );
    226     }
    227 }
    228 
    229 // needed to avoid an extra copy in closed case and single return val case
    230 static inline bool __internal_try_remove( channel(T) & chan, T & retval ) with(chan) {
     219}
     220
     221static inline T remove( channel(T) & chan ) with(chan) {
    231222    lock( mutex_lock );
    232     #ifdef CHAN_STATS
    233     operations++;
    234     #endif
    235     if ( count == 0 ) { unlock( mutex_lock ); return false; }
    236     __do_remove( chan, retval );
    237     unlock( mutex_lock );
    238     return true;
    239 }
    240 
    241 // attempts a nonblocking remove
    242 // returns [T, true] if insert was successful
    243 // returns [T, false] if insert was successful (T uninit)
    244 static inline [T, bool] try_remove( channel(T) & chan ) {
    245223    T retval;
    246     return [ retval, __internal_try_remove( chan, retval ) ];
    247 }
    248 
    249 static inline T try_remove( channel(T) & chan, T elem ) {
    250     T retval;
    251     __internal_try_remove( chan, retval );
    252     return retval;
    253 }
    254 
    255 // handles closed case of insert routine
    256 static inline void __closed_remove( channel(T) & chan, T & retval ) with(chan) {
    257     channel_closed except{&channel_closed_vt, 0p, &chan };
    258     throwResume except; // throw resumption
    259     if ( !__internal_try_remove( chan, retval ) ) throw except; // if try to remove fails (would block), throw termination
    260 }
    261 
    262 static inline T remove( channel(T) & chan ) with(chan) {
    263     T retval;
    264     if ( unlikely(closed) ) {
    265         __closed_remove( chan, retval );
    266         return retval;
    267     }
    268     lock( mutex_lock );
    269 
    270     #ifdef CHAN_STATS
    271     if ( !closed ) operations++;
    272     #endif
    273 
    274     if ( unlikely(closed) ) {
    275         unlock( mutex_lock );
    276         __closed_remove( chan, retval );
    277         return retval;
    278     }
    279224
    280225    // have to check for the zero size channel case
     
    288233    // wait if buffer is empty, work will be completed by someone else
    289234    if (count == 0) {
    290         #ifdef CHAN_STATS
    291         blocks++;
    292         #endif
    293         // check for if woken due to close
    294         if ( unlikely( block( cons, &retval, mutex_lock ) ) )
    295             __closed_remove( chan, retval );
     235        block( cons, &retval, mutex_lock );
    296236        return retval;
    297237    }
    298238
    299239    // Remove from buffer
    300     __do_remove( chan, retval );
     240    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
     241    count -= 1;
     242    front = (front + 1) % size;
     243
     244    if (count == size - 1 && !prods`isEmpty ) {
     245        insert_( chan, *(T *)prods`first.elem );  // do waiting producer work
     246        wake_one( prods );
     247    }
    301248
    302249    unlock( mutex_lock );
     
    304251}
    305252} // forall( T )
     253#endif
     254
     255#ifdef __BARGE_CHANNEL
     256forall( T ) {
     257struct channel {
     258    size_t size;
     259    size_t front, back, count;
     260    T * buffer;
     261    fast_cond_var( exp_backoff_then_block_lock ) prods, cons;
     262    exp_backoff_then_block_lock mutex_lock;
     263};
     264
     265static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
     266    size = _size;
     267    front = back = count = 0;
     268    buffer = aalloc( size );
     269    prods{};
     270    cons{};
     271    mutex_lock{};
     272}
     273
     274static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
     275static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
     276static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
     277static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
     278static inline bool has_waiters( channel(T) & chan ) with(chan) { return !empty( cons ) || !empty( prods ); }
     279static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !empty( cons ); }
     280static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !empty( prods ); }
     281
     282static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
     283    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
     284    count += 1;
     285    back++;
     286    if ( back == size ) back = 0;
     287}
     288
     289
     290static inline void insert( channel(T) & chan, T elem ) with(chan) {
     291    lock( mutex_lock );
     292
     293    while ( count == size ) {
     294        wait( prods, mutex_lock );
     295    } // if
     296
     297    insert_( chan, elem );
     298   
     299    if ( !notify_one( cons ) && count < size )
     300        notify_one( prods );
     301
     302    unlock( mutex_lock );
     303}
     304
     305static inline T remove( channel(T) & chan ) with(chan) {
     306    lock( mutex_lock );
     307    T retval;
     308
     309    while (count == 0) {
     310        wait( cons, mutex_lock );
     311    }
     312
     313    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
     314    count -= 1;
     315    front = (front + 1) % size;
     316
     317    if ( !notify_one( prods ) && count > 0 )
     318        notify_one( cons );
     319
     320    unlock( mutex_lock );
     321    return retval;
     322}
     323
     324} // forall( T )
     325#endif
     326
     327#ifdef __NO_WAIT_CHANNEL
     328forall( T ) {
     329struct channel {
     330    size_t size;
     331    size_t front, back, count;
     332    T * buffer;
     333    thread$ * chair;
     334    T * chair_elem;
     335    exp_backoff_then_block_lock c_lock, p_lock;
     336    __spinlock_t mutex_lock;
     337};
     338
     339static inline void ?{}( channel(T) &c, size_t _size ) with(c) {
     340    size = _size;
     341    front = back = count = 0;
     342    buffer = aalloc( size );
     343    chair = 0p;
     344    mutex_lock{};
     345    c_lock{};
     346    p_lock{};
     347    lock( c_lock );
     348}
     349
     350static inline void ?{}( channel(T) &c ){ ((channel(T) &)c){ 0 }; }
     351static inline void ^?{}( channel(T) &c ) with(c) { delete( buffer ); }
     352static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
     353static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
     354static inline bool has_waiters( channel(T) & chan ) with(chan) { return c_lock.lock_value != 0; }
     355
     356static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
     357    memcpy((void *)&buffer[back], (void *)&elem, sizeof(T));
     358    count += 1;
     359    back++;
     360    if ( back == size ) back = 0;
     361}
     362
     363static inline void insert( channel(T) & chan, T elem ) with( chan ) {
     364    lock( p_lock );
     365    lock( mutex_lock __cfaabi_dbg_ctx2 );
     366
     367    insert_( chan, elem );
     368
     369    if ( count != size )
     370        unlock( p_lock );
     371
     372    if ( count == 1 )
     373        unlock( c_lock );
     374       
     375    unlock( mutex_lock );
     376}
     377
     378static inline T remove( channel(T) & chan ) with(chan) {
     379    lock( c_lock );
     380    lock( mutex_lock __cfaabi_dbg_ctx2 );
     381    T retval;
     382
     383    // Remove from buffer
     384    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
     385    count -= 1;
     386    front = (front + 1) % size;
     387
     388    if ( count != 0 )
     389        unlock( c_lock );
     390
     391    if ( count == size - 1 )
     392        unlock( p_lock );
     393       
     394    unlock( mutex_lock );
     395    return retval;
     396}
     397
     398} // forall( T )
     399#endif
  • libcfa/src/concurrency/locks.hfa

    r9082d7e8 rff443e5  
    3232#include <fstream.hfa>
    3333
     34
    3435// futex headers
    3536#include <linux/futex.h>      /* Definition of FUTEX_* constants */
     
    154155// futex_mutex
    155156
     157// - No cond var support
    156158// - Kernel thd blocking alternative to the spinlock
    157159// - No ownership (will deadlock on reacq)
     
    183185        int state;
    184186
    185         for( int spin = 4; spin < 1024; spin += spin) {
    186                 state = 0;
    187                 // if unlocked, lock and return
    188                 if (internal_try_lock(this, state)) return;
    189                 if (2 == state) break;
    190                 for (int i = 0; i < spin; i++) Pause();
    191         }
    192 
    193         // // no contention try to acquire
    194         // if (internal_try_lock(this, state)) return;
     187       
     188        // // linear backoff omitted for now
     189        // for( int spin = 4; spin < 1024; spin += spin) {
     190        //      state = 0;
     191        //      // if unlocked, lock and return
     192        //      if (internal_try_lock(this, state)) return;
     193        //      if (2 == state) break;
     194        //      for (int i = 0; i < spin; i++) Pause();
     195        // }
     196
     197        // no contention try to acquire
     198        if (internal_try_lock(this, state)) return;
    195199       
    196200        // if not in contended state, set to be in contended state
     
    205209
    206210static inline void unlock(futex_mutex & this) with(this) {
    207         // if uncontended do atomic unlock and then return
    208     if (__atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1) return;
     211        // if uncontended do atomice unlock and then return
     212        if (__atomic_fetch_sub(&val, 1, __ATOMIC_RELEASE) == 1) return; // TODO: try acq/rel
    209213       
    210214        // otherwise threads are blocked so we must wake one
     215        __atomic_store_n((int *)&val, 0, __ATOMIC_RELEASE);
    211216        futex((int *)&val, FUTEX_WAKE, 1);
    212217}
     
    217222// to set recursion count after getting signalled;
    218223static inline void on_wakeup( futex_mutex & f, size_t recursion ) {}
    219 
    220 //-----------------------------------------------------------------------------
    221 // go_mutex
    222 
    223 // - Kernel thd blocking alternative to the spinlock
    224 // - No ownership (will deadlock on reacq)
    225 // - Golang's flavour of mutex
    226 // - Impl taken from Golang: src/runtime/lock_futex.go
    227 struct go_mutex {
    228         // lock state any state other than UNLOCKED is locked
    229         // enum LockState { UNLOCKED = 0, LOCKED = 1, SLEEPING = 2 };
    230        
    231         // stores a lock state
    232         int val;
    233 };
    234 
    235 static inline void  ?{}( go_mutex & this ) with(this) { val = 0; }
    236 
    237 static inline bool internal_try_lock(go_mutex & this, int & compare_val, int new_val ) with(this) {
    238         return __atomic_compare_exchange_n((int*)&val, (int*)&compare_val, new_val, false, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE);
    239 }
    240 
    241 static inline int internal_exchange(go_mutex & this, int swap ) with(this) {
    242         return __atomic_exchange_n((int*)&val, swap, __ATOMIC_ACQUIRE);
    243 }
    244 
    245 const int __go_mtx_spins = 4;
    246 const int __go_mtx_pauses = 30;
    247 // if this is called recursively IT WILL DEADLOCK!!!!!
    248 static inline void lock(go_mutex & this) with(this) {
    249         int state, init_state;
    250 
    251     // speculative grab
    252     state = internal_exchange(this, 1);
    253     if ( !state ) return; // state == 0
    254     init_state = state;
    255     for (;;) {
    256         for( int i = 0; i < __go_mtx_spins; i++ ) {
    257             while( !val ) { // lock unlocked
    258                 state = 0;
    259                 if (internal_try_lock(this, state, init_state)) return;
    260             }
    261             for (int i = 0; i < __go_mtx_pauses; i++) Pause();
    262         }
    263 
    264         while( !val ) { // lock unlocked
    265             state = 0;
    266             if (internal_try_lock(this, state, init_state)) return;
    267         }
    268         sched_yield();
    269        
    270         // if not in contended state, set to be in contended state
    271         state = internal_exchange(this, 2);
    272         if ( !state ) return; // state == 0
    273         init_state = 2;
    274         futex((int*)&val, FUTEX_WAIT, 2); // if val is not 2 this returns with EWOULDBLOCK
    275     }
    276 }
    277 
    278 static inline void unlock( go_mutex & this ) with(this) {
    279         // if uncontended do atomic unlock and then return
    280     if (__atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1) return;
    281        
    282         // otherwise threads are blocked so we must wake one
    283         futex((int *)&val, FUTEX_WAKE, 1);
    284 }
    285 
    286 static inline void on_notify( go_mutex & f, thread$ * t){ unpark(t); }
    287 static inline size_t on_wait( go_mutex & f ) {unlock(f); return 0;}
    288 static inline void on_wakeup( go_mutex & f, size_t recursion ) {}
    289224
    290225//-----------------------------------------------------------------------------
     
    336271        this.lock_value = 0;
    337272}
    338 
    339 static inline void  ^?{}( exp_backoff_then_block_lock & this ){}
    340273
    341274static inline bool internal_try_lock(exp_backoff_then_block_lock & this, size_t & compare_val) with(this) {
Note: See TracChangeset for help on using the changeset viewer.