Ignore:
Timestamp:
Apr 28, 2021, 4:56:50 PM (5 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum, stuck-waitfor-destruct
Children:
8d66610
Parents:
feacef9 (diff), b7fd2db6 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

File:
1 edited

Legend:

Unmodified
Added
Removed
  • doc/theses/thierry_delisle_PhD/code/readyQ_proto/work_stealing.hpp

    rfeacef9 r5407cdc  
    66#include <memory>
    77#include <mutex>
     8#include <thread>
    89#include <type_traits>
    910
     
    1112#include "utils.hpp"
    1213#include "links.hpp"
     14#include "links2.hpp"
    1315#include "snzi.hpp"
    1416
     17// #include <x86intrin.h>
     18
    1519using namespace std;
     20
     21static const long long lim = 2000;
     22static const unsigned nqueues = 2;
     23
     24struct __attribute__((aligned(128))) timestamp_t {
     25        volatile unsigned long long val = 0;
     26};
     27
     28template<typename node_t>
     29struct __attribute__((aligned(128))) localQ_t {
     30        #ifdef NO_MPSC
     31                intrusive_queue_t<node_t> list;
     32
     33                inline auto ts() { return list.ts(); }
     34                inline auto lock() { return list.lock.lock(); }
     35                inline auto try_lock() { return list.lock.try_lock(); }
     36                inline auto unlock() { return list.lock.unlock(); }
     37
     38                inline auto push( node_t * node ) { return list.push( node ); }
     39                inline auto pop() { return list.pop(); }
     40        #else
     41                mpsc_queue<node_t> queue = {};
     42                spinlock_t _lock = {};
     43
     44                inline auto ts() { auto h = queue.head(); return h ? h->_links.ts : 0ull; }
     45                inline auto lock() { return _lock.lock(); }
     46                inline auto try_lock() { return _lock.try_lock(); }
     47                inline auto unlock() { return _lock.unlock(); }
     48
     49                inline auto push( node_t * node ) { return queue.push( node ); }
     50                inline auto pop() { return queue.pop(); }
     51        #endif
     52
     53
     54};
    1655
    1756template<typename node_t>
     
    2564
    2665        work_stealing(unsigned _numThreads, unsigned)
    27                 : numThreads(_numThreads)
    28                 , lists(new intrusive_queue_t<node_t>[numThreads])
    29                 , snzi( std::log2( numThreads / 2 ), 2 )
     66                : numThreads(_numThreads * nqueues)
     67                , lists(new localQ_t<node_t>[numThreads])
     68                // , lists(new intrusive_queue_t<node_t>[numThreads])
     69                , times(new timestamp_t[numThreads])
     70                // , snzi( std::log2( numThreads / 2 ), 2 )
    3071
    3172        {
     
    4081        __attribute__((noinline, hot)) void push(node_t * node) {
    4182                node->_links.ts = rdtscl();
    42                 if( node->_links.hint > numThreads ) {
    43                         node->_links.hint = tls.rng.next() % numThreads;
    44                         tls.stat.push.nhint++;
     83                // node->_links.ts = 1;
     84
     85                auto & list = *({
     86                        unsigned i;
     87                        #ifdef NO_MPSC
     88                                do {
     89                        #endif
     90                                tls.stats.push.attempt++;
     91                                // unsigned r = tls.rng1.next();
     92                                unsigned r = tls.it++;
     93                                if(tls.my_queue == outside) {
     94                                        i = r % numThreads;
     95                                } else {
     96                                        i = tls.my_queue + (r % nqueues);
     97                                }
     98                        #ifdef NO_MPSC
     99                                } while(!lists[i].try_lock());
     100                        #endif
     101                        &lists[i];
     102                });
     103
     104                list.push( node );
     105                #ifdef NO_MPSC
     106                        list.unlock();
     107                #endif
     108                // tls.rng2.set_raw_state( tls.rng1.get_raw_state());
     109                // count++;
     110                tls.stats.push.success++;
     111        }
     112
     113        __attribute__((noinline, hot)) node_t * pop() {
     114                if(tls.my_queue != outside) {
     115                        // if( tls.myfriend == outside ) {
     116                        //      auto r  = tls.rng1.next();
     117                        //      tls.myfriend = r % numThreads;
     118                        //      // assert(lists[(tls.it % nqueues) + tls.my_queue].ts() >= lists[((tls.it + 1) % nqueues) + tls.my_queue].ts());
     119                        //      tls.mytime = std::min(lists[(tls.it % nqueues) + tls.my_queue].ts(), lists[((tls.it + 1) % nqueues) + tls.my_queue].ts());
     120                        //      // times[tls.myfriend].val = 0;
     121                        //      // lists[tls.myfriend].val = 0;
     122                        // }
     123                        // // else if(times[tls.myfriend].val == 0) {
     124                        // // else if(lists[tls.myfriend].val == 0) {
     125                        // else if(times[tls.myfriend].val < tls.mytime) {
     126                        // // else if(times[tls.myfriend].val < lists[(tls.it % nqueues) + tls.my_queue].ts()) {
     127                        //      node_t * n = try_pop(tls.myfriend, tls.stats.pop.help);
     128                        //      tls.stats.help++;
     129                        //      tls.myfriend = outside;
     130                        //      if(n) return n;
     131                        // }
     132                        // if( tls.myfriend == outside ) {
     133                        //      auto r  = tls.rng1.next();
     134                        //      tls.myfriend = r % numThreads;
     135                        //      tls.mytime = lists[((tls.it + 1) % nqueues) + tls.my_queue].ts();
     136                        // }
     137                        // else {
     138                        //      if(times[tls.myfriend].val + 1000 < tls.mytime) {
     139                        //              node_t * n = try_pop(tls.myfriend, tls.stats.pop.help);
     140                        //              tls.stats.help++;
     141                        //              if(n) return n;
     142                        //      }
     143                        //      tls.myfriend = outside;
     144                        // }
     145
     146                        node_t * n = local();
     147                        if(n) return n;
    45148                }
    46149
    47                 unsigned i = node->_links.hint;
    48                 auto & list = lists[i];
    49                 list.lock.lock();
    50 
    51                 if(list.push( node )) {
    52                         snzi.arrive(i);
     150                // try steal
     151                for(int i = 0; i < 25; i++) {
     152                        node_t * n = steal();
     153                        if(n) return n;
    53154                }
    54155
    55                 list.lock.unlock();
    56         }
    57 
    58         __attribute__((noinline, hot)) node_t * pop() {
    59                 node_t * node;
    60                 while(true) {
    61                         if(!snzi.query()) {
    62                                 return nullptr;
    63                         }
    64 
    65                         {
    66                                 unsigned i = tls.my_queue;
    67                                 auto & list = lists[i];
    68                                 if( list.ts() != 0 ) {
    69                                         list.lock.lock();
    70                                         if((node = try_pop(i))) {
    71                                                 tls.stat.pop.local.success++;
    72                                                 break;
    73                                         }
    74                                         else {
    75                                                 tls.stat.pop.local.elock++;
    76                                         }
    77                                 }
    78                                 else {
    79                                         tls.stat.pop.local.espec++;
    80                                 }
    81                         }
    82 
    83                         tls.stat.pop.steal.tried++;
    84 
    85                         int i = tls.rng.next() % numThreads;
    86                         auto & list = lists[i];
    87                         if( list.ts() == 0 ) {
    88                                 tls.stat.pop.steal.empty++;
    89                                 continue;
    90                         }
    91 
    92                         if( !list.lock.try_lock() ) {
    93                                 tls.stat.pop.steal.locked++;
    94                                 continue;
    95                         }
    96 
    97                         if((node = try_pop(i))) {
    98                                 tls.stat.pop.steal.success++;
    99                                 break;
     156                return search();
     157        }
     158
     159private:
     160        inline node_t * local() {
     161                unsigned i = (--tls.it % nqueues) + tls.my_queue;
     162                node_t * n = try_pop(i, tls.stats.pop.local);
     163                if(n) return n;
     164                i = (--tls.it % nqueues) + tls.my_queue;
     165                return try_pop(i, tls.stats.pop.local);
     166        }
     167
     168        inline node_t * steal() {
     169                unsigned i = tls.rng2.prev() % numThreads;
     170                return try_pop(i, tls.stats.pop.steal);
     171        }
     172
     173        inline node_t * search() {
     174                unsigned offset = tls.rng2.prev();
     175                for(unsigned i = 0; i < numThreads; i++) {
     176                        unsigned idx = (offset + i) % numThreads;
     177                        node_t * thrd = try_pop(idx, tls.stats.pop.search);
     178                        if(thrd) {
     179                                return thrd;
    100180                        }
    101181                }
    102182
    103                 #if defined(READ)
    104                         const unsigned f = READ;
    105                         if(0 == (tls.it % f)) {
    106                                 unsigned i = tls.it / f;
    107                                 lists[i % numThreads].ts();
    108                         }
    109                         // lists[tls.it].ts();
    110                         tls.it++;
    111                 #endif
    112 
    113 
    114                 return node;
    115         }
    116 
    117 private:
    118         node_t * try_pop(unsigned i) {
     183                return nullptr;
     184        }
     185
     186private:
     187        struct attempt_stat_t {
     188                std::size_t attempt = { 0 };
     189                std::size_t elock   = { 0 };
     190                std::size_t eempty  = { 0 };
     191                std::size_t espec   = { 0 };
     192                std::size_t success = { 0 };
     193        };
     194
     195        node_t * try_pop(unsigned i, attempt_stat_t & stat) {
     196                assert(i < numThreads);
    119197                auto & list = lists[i];
     198                stat.attempt++;
     199
     200                // If the list is empty, don't try
     201                if(list.ts() == 0) { stat.espec++; return nullptr; }
     202
     203                // If we can't get the lock, move on
     204                if( !list.try_lock() ) { stat.elock++; return nullptr; }
    120205
    121206                // If list is empty, unlock and retry
    122207                if( list.ts() == 0 ) {
    123                         list.lock.unlock();
     208                        list.unlock();
     209                        stat.eempty++;
    124210                        return nullptr;
    125211                }
    126212
    127                         // Actually pop the list
    128                 node_t * node;
    129                 bool emptied;
    130                 std::tie(node, emptied) = list.pop();
    131                 assert(node);
    132 
    133                 if(emptied) {
    134                         snzi.depart(i);
    135                 }
    136 
    137                 // Unlock and return
    138                 list.lock.unlock();
    139                 return node;
     213                auto node = list.pop();
     214                list.unlock();
     215                stat.success++;
     216                #ifdef NO_MPSC
     217                        // times[i].val = 1;
     218                        times[i].val = node.first->_links.ts;
     219                        // lists[i].val = node.first->_links.ts;
     220                        return node.first;
     221                #else
     222                        times[i].val = node->_links.ts;
     223                        return node;
     224                #endif
    140225        }
    141226
     
    144229
    145230        static std::atomic_uint32_t ticket;
     231        static const unsigned outside = 0xFFFFFFFF;
     232
     233        static inline unsigned calc_preferred() {
     234                unsigned t = ticket++;
     235                if(t == 0) return outside;
     236                unsigned i = (t - 1) * nqueues;
     237                return i;
     238        }
     239
    146240        static __attribute__((aligned(128))) thread_local struct TLS {
    147                 Random     rng = { int(rdtscl()) };
    148                 unsigned   my_queue = ticket++;
     241                Random     rng1 = { unsigned(std::hash<std::thread::id>{}(std::this_thread::get_id()) ^ rdtscl()) };
     242                Random     rng2 = { unsigned(std::hash<std::thread::id>{}(std::this_thread::get_id()) ^ rdtscl()) };
     243                unsigned   it   = 0;
     244                unsigned   my_queue = calc_preferred();
     245                unsigned   myfriend = outside;
     246                unsigned long long int mytime = 0;
    149247                #if defined(READ)
    150248                        unsigned it = 0;
     
    152250                struct {
    153251                        struct {
    154                                 std::size_t nhint = { 0 };
     252                                std::size_t attempt = { 0 };
     253                                std::size_t success = { 0 };
    155254                        } push;
    156255                        struct {
    157                                 struct {
    158                                         std::size_t success = { 0 };
    159                                         std::size_t espec = { 0 };
    160                                         std::size_t elock = { 0 };
    161                                 } local;
    162                                 struct {
    163                                         std::size_t tried   = { 0 };
    164                                         std::size_t locked  = { 0 };
    165                                         std::size_t empty   = { 0 };
    166                                         std::size_t success = { 0 };
    167                                 } steal;
     256                                attempt_stat_t help;
     257                                attempt_stat_t local;
     258                                attempt_stat_t steal;
     259                                attempt_stat_t search;
    168260                        } pop;
    169                 } stat;
     261                        std::size_t help = { 0 };
     262                } stats;
    170263        } tls;
    171264
    172265private:
    173266        const unsigned numThreads;
    174         std::unique_ptr<intrusive_queue_t<node_t> []> lists;
    175         __attribute__((aligned(64))) snzi_t snzi;
     267        std::unique_ptr<localQ_t<node_t> []> lists;
     268        // std::unique_ptr<intrusive_queue_t<node_t> []> lists;
     269        std::unique_ptr<timestamp_t []> times;
     270        __attribute__((aligned(128))) std::atomic_size_t count;
    176271
    177272#ifndef NO_STATS
     
    179274        static struct GlobalStats {
    180275                struct {
    181                         std::atomic_size_t nhint = { 0 };
     276                        std::atomic_size_t attempt = { 0 };
     277                        std::atomic_size_t success = { 0 };
    182278                } push;
    183279                struct {
    184280                        struct {
     281                                std::atomic_size_t attempt = { 0 };
     282                                std::atomic_size_t elock   = { 0 };
     283                                std::atomic_size_t eempty  = { 0 };
     284                                std::atomic_size_t espec   = { 0 };
    185285                                std::atomic_size_t success = { 0 };
    186                                 std::atomic_size_t espec = { 0 };
    187                                 std::atomic_size_t elock = { 0 };
     286                        } help;
     287                        struct {
     288                                std::atomic_size_t attempt = { 0 };
     289                                std::atomic_size_t elock   = { 0 };
     290                                std::atomic_size_t eempty  = { 0 };
     291                                std::atomic_size_t espec   = { 0 };
     292                                std::atomic_size_t success = { 0 };
    188293                        } local;
    189294                        struct {
    190                                 std::atomic_size_t tried   = { 0 };
    191                                 std::atomic_size_t locked  = { 0 };
    192                                 std::atomic_size_t empty   = { 0 };
     295                                std::atomic_size_t attempt = { 0 };
     296                                std::atomic_size_t elock   = { 0 };
     297                                std::atomic_size_t eempty  = { 0 };
     298                                std::atomic_size_t espec   = { 0 };
    193299                                std::atomic_size_t success = { 0 };
    194300                        } steal;
     301                        struct {
     302                                std::atomic_size_t attempt = { 0 };
     303                                std::atomic_size_t elock   = { 0 };
     304                                std::atomic_size_t eempty  = { 0 };
     305                                std::atomic_size_t espec   = { 0 };
     306                                std::atomic_size_t success = { 0 };
     307                        } search;
    195308                } pop;
     309                std::atomic_size_t help = { 0 };
    196310        } global_stats;
    197311
    198312public:
    199313        static void stats_tls_tally() {
    200                 global_stats.push.nhint += tls.stat.push.nhint;
    201                 global_stats.pop.local.success += tls.stat.pop.local.success;
    202                 global_stats.pop.local.espec   += tls.stat.pop.local.espec  ;
    203                 global_stats.pop.local.elock   += tls.stat.pop.local.elock  ;
    204                 global_stats.pop.steal.tried   += tls.stat.pop.steal.tried  ;
    205                 global_stats.pop.steal.locked  += tls.stat.pop.steal.locked ;
    206                 global_stats.pop.steal.empty   += tls.stat.pop.steal.empty  ;
    207                 global_stats.pop.steal.success += tls.stat.pop.steal.success;
    208         }
    209 
    210         static void stats_print(std::ostream & os ) {
     314                global_stats.push.attempt += tls.stats.push.attempt;
     315                global_stats.push.success += tls.stats.push.success;
     316                global_stats.pop.help  .attempt += tls.stats.pop.help  .attempt;
     317                global_stats.pop.help  .elock   += tls.stats.pop.help  .elock  ;
     318                global_stats.pop.help  .eempty  += tls.stats.pop.help  .eempty ;
     319                global_stats.pop.help  .espec   += tls.stats.pop.help  .espec  ;
     320                global_stats.pop.help  .success += tls.stats.pop.help  .success;
     321                global_stats.pop.local .attempt += tls.stats.pop.local .attempt;
     322                global_stats.pop.local .elock   += tls.stats.pop.local .elock  ;
     323                global_stats.pop.local .eempty  += tls.stats.pop.local .eempty ;
     324                global_stats.pop.local .espec   += tls.stats.pop.local .espec  ;
     325                global_stats.pop.local .success += tls.stats.pop.local .success;
     326                global_stats.pop.steal .attempt += tls.stats.pop.steal .attempt;
     327                global_stats.pop.steal .elock   += tls.stats.pop.steal .elock  ;
     328                global_stats.pop.steal .eempty  += tls.stats.pop.steal .eempty ;
     329                global_stats.pop.steal .espec   += tls.stats.pop.steal .espec  ;
     330                global_stats.pop.steal .success += tls.stats.pop.steal .success;
     331                global_stats.pop.search.attempt += tls.stats.pop.search.attempt;
     332                global_stats.pop.search.elock   += tls.stats.pop.search.elock  ;
     333                global_stats.pop.search.eempty  += tls.stats.pop.search.eempty ;
     334                global_stats.pop.search.espec   += tls.stats.pop.search.espec  ;
     335                global_stats.pop.search.success += tls.stats.pop.search.success;
     336                global_stats.help += tls.stats.help;
     337        }
     338
     339        static void stats_print(std::ostream & os, double duration ) {
    211340                std::cout << "----- Work Stealing Stats -----" << std::endl;
    212341
    213                 double stealSucc = double(global_stats.pop.steal.success) / global_stats.pop.steal.tried;
    214                 os << "Push to new Q : " << std::setw(15) << global_stats.push.nhint << "\n";
    215                 os << "Local Pop     : " << std::setw(15) << global_stats.pop.local.success << "\n";
    216                 os << "Steal Pop     : " << std::setw(15) << global_stats.pop.steal.success << "(" << global_stats.pop.local.espec << "s, " << global_stats.pop.local.elock << "l)\n";
    217                 os << "Steal Success : " << std::setw(15) << stealSucc << "(" << global_stats.pop.steal.tried << " tries)\n";
    218                 os << "Steal Fails   : " << std::setw(15) << global_stats.pop.steal.empty << "e, " << global_stats.pop.steal.locked << "l\n";
     342                double push_suc = (100.0 * double(global_stats.push.success) / global_stats.push.attempt);
     343                double push_len = double(global_stats.push.attempt     ) / global_stats.push.success;
     344                os << "Push   Pick : " << push_suc << " %, len " << push_len << " (" << global_stats.push.attempt      << " / " << global_stats.push.success << ")\n";
     345
     346                double hlp_suc = (100.0 * double(global_stats.pop.help.success) / global_stats.pop.help.attempt);
     347                double hlp_len = double(global_stats.pop.help.attempt     ) / global_stats.pop.help.success;
     348                os << "Help        : " << hlp_suc << " %, len " << hlp_len << " (" << global_stats.pop.help.attempt      << " / " << global_stats.pop.help.success << ")\n";
     349                os << "Help Fail   : " << global_stats.pop.help.espec << "s, " << global_stats.pop.help.eempty << "e, " << global_stats.pop.help.elock << "l\n";
     350
     351                double pop_suc = (100.0 * double(global_stats.pop.local.success) / global_stats.pop.local.attempt);
     352                double pop_len = double(global_stats.pop.local.attempt     ) / global_stats.pop.local.success;
     353                os << "Local       : " << pop_suc << " %, len " << pop_len << " (" << global_stats.pop.local.attempt      << " / " << global_stats.pop.local.success << ")\n";
     354                os << "Local Fail  : " << global_stats.pop.local.espec << "s, " << global_stats.pop.local.eempty << "e, " << global_stats.pop.local.elock << "l\n";
     355
     356                double stl_suc = (100.0 * double(global_stats.pop.steal.success) / global_stats.pop.steal.attempt);
     357                double stl_len = double(global_stats.pop.steal.attempt     ) / global_stats.pop.steal.success;
     358                os << "Steal       : " << stl_suc << " %, len " << stl_len << " (" << global_stats.pop.steal.attempt      << " / " << global_stats.pop.steal.success << ")\n";
     359                os << "Steal Fail  : " << global_stats.pop.steal.espec << "s, " << global_stats.pop.steal.eempty << "e, " << global_stats.pop.steal.elock << "l\n";
     360
     361                double srh_suc = (100.0 * double(global_stats.pop.search.success) / global_stats.pop.search.attempt);
     362                double srh_len = double(global_stats.pop.search.attempt     ) / global_stats.pop.search.success;
     363                os << "Search      : " << srh_suc << " %, len " << srh_len << " (" << global_stats.pop.search.attempt      << " / " << global_stats.pop.search.success << ")\n";
     364                os << "Search Fail : " << global_stats.pop.search.espec << "s, " << global_stats.pop.search.eempty << "e, " << global_stats.pop.search.elock << "l\n";
     365                os << "Helps       : " << std::setw(15) << std::scientific << global_stats.help / duration << "/sec (" << global_stats.help  << ")\n";
    219366        }
    220367private:
Note: See TracChangeset for help on using the changeset viewer.