Changeset 50aeb6f


Ignore:
Timestamp:
Sep 26, 2019, 4:25:04 PM (5 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, jenkins-sandbox, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
1e24d13
Parents:
b2a37b0
Message:

Small tweaks to the memory layout

Location:
doc/theses/thierry_delisle_PhD/code
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • doc/theses/thierry_delisle_PhD/code/assert.hpp

    rb2a37b0 r50aeb6f  
    11#pragma once
    22
     3#ifndef NDEBUG
    34#include <cassert>
    45#include <cstdlib>
     
    1718        }                                   \
    1819})
     20#else
     21#define assertf(cond, ...)
     22#endif
  • doc/theses/thierry_delisle_PhD/code/processor_list.hpp

    rb2a37b0 r50aeb6f  
    188188        }
    189189
     190        //-----------------------------------------------------------------------
     191        // Checking support
     192        uint_fast32_t epoch_check() {
     193                // Step 1 : lock global lock
     194                // It is needed to avoid processors that register mid Critical-Section
     195                //   to simply lock their own lock and enter.
     196                while(lock.load(std::memory_order_relaxed))
     197                        asm volatile("pause");
     198
     199                // Step 2 : lock per-proc lock
     200                // Processors that are currently being registered aren't counted
     201                //   but can't be in read_lock or in the critical section.
     202                // All other processors are counted
     203                uint_fast32_t s = ready;
     204                for(uint_fast32_t i = 0; i < s; i++) {
     205                        while(data[i].lock.load(std::memory_order_relaxed))
     206                                asm volatile("pause");
     207                }
     208
     209                return s;
     210        }
     211
    190212public:
    191213};
  • doc/theses/thierry_delisle_PhD/code/processor_list_fast.cpp

    rb2a37b0 r50aeb6f  
    1919        unsigned id;
    2020};
    21 void run(unsigned nthread, double duration, unsigned writes) {
     21void run(unsigned nthread, double duration, unsigned writes, unsigned epochs) {
    2222        assert(writes < 100);
    2323
     
    3030        // Data to check everything is OK
    3131        size_t write_committed = 0ul;
    32         std::atomic_size_t lock_cnt_write = { 0ul };
    33         std::atomic_size_t lock_cnt_read  = { 0ul };
     32        struct {
     33                std::atomic_size_t write = { 0ul };
     34                std::atomic_size_t read  = { 0ul };
     35                std::atomic_size_t epoch = { 0ul };
     36        } lock_cnt;
    3437
    3538        // Flag to signal termination
     
    3942        unsigned i = 1;
    4043        for(auto & t : threads) {
    41                 t = new std::thread([&done, &list, &barrier, &write_committed, &lock_cnt_write, &lock_cnt_read, writes](unsigned tid) {
     44                t = new std::thread([&done, &list, &barrier, &write_committed, &lock_cnt, writes, epochs](unsigned tid) {
    4245                        Random rand(tid + rdtscl());
    4346                        processor proc;
     
    4548                        size_t writes_cnt = 0;
    4649                        size_t reads_cnt = 0;
     50                        size_t epoch_cnt = 0;
    4751
    4852                        affinity(tid);
     
    5155
    5256                        while(__builtin_expect(!done, true)) {
    53                                 if ((rand.next() % 100) < writes) {
     57                                auto r = rand.next() % 100;
     58                                if (r < writes) {
    5459                                        auto n = list.write_lock();
    5560                                        write_committed++;
     
    5762                                        assert(writes_cnt < -2ul);
    5863                                        list.write_unlock(n);
     64                                }
     65                                else if(r < epochs) {
     66                                        list.epoch_check();
     67                                        epoch_cnt++;
    5968                                }
    6069                                else {
     
    7079                        auto p = list.unregister(proc.id);
    7180                        assert(&proc == p);
    72                         lock_cnt_write += writes_cnt;
    73                         lock_cnt_read  += reads_cnt;
     81                        lock_cnt.write += writes_cnt;
     82                        lock_cnt.read  += reads_cnt;
     83                        lock_cnt.epoch += epoch_cnt;
    7484                }, i++);
    7585        }
     
    98108        }
    99109
    100         assert(write_committed == lock_cnt_write);
     110        assert(write_committed == lock_cnt.write);
    101111
    102         size_t ops_sec = size_t(double(lock_cnt_read + lock_cnt_write) / duration);
     112        size_t totalop = lock_cnt.read + lock_cnt.write + lock_cnt.epoch;
     113        size_t ops_sec = size_t(double(totalop) / duration);
    103114        size_t ops_thread = ops_sec / nthread;
    104115        double dur_nano = duration_cast<std::nano>(1.0);
    105116
    106117        std::cout << "Duration      : " << duration << "s\n";
    107         std::cout << "Total ops     : " << (lock_cnt_read + lock_cnt_write) << "(" << lock_cnt_read << "r, " << lock_cnt_write << "w)\n";
     118        std::cout << "Total ops     : " << totalop << "(" << lock_cnt.read << "r, " << lock_cnt.write << "w, " << lock_cnt.epoch << "e)\n";
    108119        std::cout << "Ops/sec       : " << ops_sec << "\n";
    109120        std::cout << "Ops/sec/thread: " << ops_thread << "\n";
     
    121132        unsigned nthreads = 2;
    122133        unsigned writes   = 0;
     134        unsigned epochs   = 0;
    123135
    124136        std::cout.imbue(std::locale(""));
     
    126138        switch (argc)
    127139        {
     140        case 5:
     141                epochs = std::stoul(argv[4]);
     142                [[fallthrough]];
    128143        case 4:
    129144                writes = std::stoul(argv[3]);
    130                 if( writes >= 100 ) {
    131                         std::cerr << "Writes must be valid percentage, was " << argv[3] << "(" << writes << ")" << std::endl;
     145                if( (writes + epochs) > 100 ) {
     146                        std::cerr << "Writes + Epochs must be valid percentage, was " << argv[3] << " + " << argv[4] << "(" << writes << " + " << epochs << ")" << std::endl;
    132147                        usage(argv);
    133148                }
     
    152167        check_cache_line_size();
    153168
    154         std::cout << "Running " << nthreads << " threads for " << duration << " seconds with " << writes << "% writes" << std::endl;
    155         run(nthreads, duration, writes);
     169        std::cout << "Running " << nthreads << " threads for " << duration << " seconds with " << writes << "% writes and " << epochs << "% epochs" << std::endl;
     170        run(nthreads, duration, writes, epochs + writes);
    156171
    157172        return 0;
  • doc/theses/thierry_delisle_PhD/code/relaxed_list.cpp

    rb2a37b0 r50aeb6f  
    1313#include "utils.hpp"
    1414
    15 struct Node {
     15struct __attribute__((aligned(64))) Node {
    1616        static std::atomic_size_t creates;
    1717        static std::atomic_size_t destroys;
     
    3333
    3434static const constexpr int nodes_per_threads = 128;
     35struct NodeArray {
     36        __attribute__((aligned(64))) Node * array[nodes_per_threads];
     37        __attribute__((aligned(64))) char pad;
     38};
    3539
    3640bool enable_stats = false;
    3741
    38 __attribute__((aligned(64))) thread_local pick_stat local_pick;
    39 
    40 void run(unsigned nthread, double duration) {
     42struct local_stat_t {
     43        size_t in  = 0;
     44        size_t out = 0;
     45        size_t empty = 0;
     46        size_t crc_in  = 0;
     47        size_t crc_out = 0;
     48};
     49
     50__attribute__((noinline)) void run_body(
     51        std::atomic<bool>& done,
     52        Random & rand,
     53        Node * (&my_nodes)[128],
     54        local_stat_t & local,
     55        relaxed_list<Node> & list
     56) {
     57        while(__builtin_expect(!done.load(std::memory_order_relaxed), true)) {
     58                int idx = rand.next() % nodes_per_threads;
     59                if (auto node = my_nodes[idx]) {
     60                        local.crc_in += node->value;
     61                        list.push(node);
     62                        my_nodes[idx] = nullptr;
     63                        local.in++;
     64                }
     65                else if(auto node = list.pop()) {
     66                        local.crc_out += node->value;
     67                        my_nodes[idx] = node;
     68                        local.out++;
     69                }
     70                else {
     71                        local.empty++;
     72                }
     73        }
     74}
     75
     76void run(unsigned nthread, unsigned nqueues, unsigned fill, double duration) {
    4177        // List being tested
    42         relaxed_list<Node> list = { nthread * 2 };
     78        relaxed_list<Node> list = { nthread * nqueues };
    4379
    4480        // Barrier for synchronization
     
    5288                std::atomic_size_t crc_in  = { 0 };
    5389                std::atomic_size_t crc_out = { 0 };
    54                 std::atomic_size_t pick_at = { 0 };
    55                 std::atomic_size_t pick_su = { 0 };
     90                struct {
     91                        struct {
     92                                std::atomic_size_t attempt = { 0 };
     93                                std::atomic_size_t success = { 0 };
     94                        } push;
     95                        struct {
     96                                std::atomic_size_t attempt = { 0 };
     97                                std::atomic_size_t success = { 0 };
     98                        } pop;
     99                } pick;
    56100        } global;
    57101
     
    61105        // Prep nodes
    62106        std::cout << "Initializing" << std::endl;
    63         std::vector<Node *> all_nodes[nthread];
     107        NodeArray all_nodes[nthread];
    64108        for(auto & nodes : all_nodes) {
    65109                Random rand(rdtscl());
    66                 nodes.resize(nodes_per_threads);
    67                 for(auto & node : nodes) {
    68                         node = new Node(rand.next() % 100);
     110                for(auto & node : nodes.array) {
     111                        auto r = rand.next() % 100;
     112                        if(r < fill)
     113                                node = new Node(rand.next() % 100);
    69114                }
    70115
    71116                for(int i = 0; i < 10; i++) {
    72117                        int idx = rand.next() % nodes_per_threads;
    73                         if (auto node = nodes[idx]) {
     118                        if (auto node = nodes.array[idx]) {
    74119                                global.crc_in += node->value;
    75120                                list.push(node);
    76                                 nodes[idx] = nullptr;
     121                                nodes.array[idx] = nullptr;
    77122                        }
    78123                }
     
    84129        unsigned i = 1;
    85130        for(auto & t : threads) {
    86                 auto & my_nodes = all_nodes[i - 1];
     131                auto & my_nodes = all_nodes[i - 1].array;
    87132                t = new std::thread([&done, &list, &barrier, &global, &my_nodes](unsigned tid) {
    88133                        Random rand(tid + rdtscl());
    89134
    90                         size_t local_in  = 0;
    91                         size_t local_out = 0;
    92                         size_t local_empty = 0;
    93                         size_t local_crc_in  = 0;
    94                         size_t local_crc_out = 0;
    95 
    96                         affinity(tid);
     135                        local_stat_t local;
     136
     137                        // affinity(tid);
    97138
    98139                        barrier.wait(tid);
     
    100141                        // EXPERIMENT START
    101142
    102                         while(__builtin_expect(!done, true)) {
    103                                 int idx = rand.next() % nodes_per_threads;
    104                                 if (auto node = my_nodes[idx]) {
    105                                         local_crc_in += node->value;
    106                                         list.push(node);
    107                                         my_nodes[idx] = nullptr;
    108                                         local_in++;
    109                                 }
    110                                 else if(auto node = list.pop2()) {
    111                                         local_crc_out += node->value;
    112                                         my_nodes[idx] = node;
    113                                         local_out++;
    114                                 }
    115                                 else {
    116                                         local_empty++;
    117                                 }
    118                         }
     143                        run_body(done, rand, my_nodes, local, list);
    119144
    120145                        // EXPERIMENT END
     
    122147                        barrier.wait(tid);
    123148
    124                         global.in    += local_in;
    125                         global.out   += local_out;
    126                         global.empty += local_empty;
     149                        global.in    += local.in;
     150                        global.out   += local.out;
     151                        global.empty += local.empty;
    127152
    128153                        for(auto node : my_nodes) {
     
    130155                        }
    131156
    132                         global.crc_in  += local_crc_in;
    133                         global.crc_out += local_crc_out;
    134 
    135                         global.pick_at += local_pick.attempt;
    136                         global.pick_su += local_pick.success;
     157                        global.crc_in  += local.crc_in;
     158                        global.crc_out += local.crc_out;
     159
     160                        global.pick.push.attempt += relaxed_list<Node>::tls.pick.push.attempt;
     161                        global.pick.push.success += relaxed_list<Node>::tls.pick.push.success;
     162                        global.pick.pop .attempt += relaxed_list<Node>::tls.pick.pop.attempt;
     163                        global.pick.pop .success += relaxed_list<Node>::tls.pick.pop.success;
    137164                }, i++);
    138165        }
     
    143170
    144171        while(true) {
    145                 usleep(1000);
     172                usleep(100000);
    146173                auto now = Clock::now();
    147174                duration_t durr = now - before;
     
    150177                        break;
    151178                }
     179                std::cout << "\r" << durr.count();
     180                std::cout.flush();
    152181        }
    153182
     
    156185        duration_t durr = after - before;
    157186        duration = durr.count();
    158         std::cout << "Closing down" << std::endl;
     187        std::cout << "\nClosing down" << std::endl;
    159188
    160189        for(auto t : threads) {
     
    181210
    182211        std::cout << "Duration      : " << duration << "s\n";
     212        std::cout << "ns/Op         : " << ( dur_nano / ops_thread )<< "\n";
     213        std::cout << "Ops/sec/thread: " << ops_thread << "\n";
     214        std::cout << "Ops/sec       : " << ops_sec << "\n";
    183215        std::cout << "Total ops     : " << ops << "(" << global.in << "i, " << global.out << "o, " << global.empty << "e)\n";
    184         std::cout << "Ops/sec       : " << ops_sec << "\n";
    185         std::cout << "Ops/sec/thread: " << ops_thread << "\n";
    186         std::cout << "ns/Op         : " << ( dur_nano / ops_thread )<< "\n";
    187         std::cout << "Pick %        : " << (100.0 * double(global.pick_su) / global.pick_at) << "(" << global.pick_su << " / " << global.pick_at << ")\n";
     216        #ifndef NO_STATS
     217                double push_sur = (100.0 * double(global.pick.push.success) / global.pick.push.attempt);
     218                double pop_sur  = (100.0 * double(global.pick.pop .success) / global.pick.pop .attempt);
     219                std::cout << "Push Pick %   : " << push_sur << "(" << global.pick.push.success << " / " << global.pick.push.attempt << ")\n";
     220                std::cout << "Pop  Pick %   : " << pop_sur  << "(" << global.pick.pop .success << " / " << global.pick.pop .attempt << ")\n";
     221        #endif
    188222}
    189223
    190224void usage(char * argv[]) {
    191         std::cerr << argv[0] << ": [DURATION (FLOAT:SEC)] [NTHREADS]" << std::endl;;
     225        std::cerr << argv[0] << ": [DURATION (FLOAT:SEC)] [NTHREADS] [NQUEUES] [FILL]" << std::endl;;
    192226        std::exit(1);
    193227}
     
    197231        double duration   = 5.0;
    198232        unsigned nthreads = 2;
     233        unsigned nqueues  = 2;
     234        unsigned fill     = 100;
    199235
    200236        std::cout.imbue(std::locale(""));
     
    202238        switch (argc)
    203239        {
     240        case 5:
     241                nqueues = std::stoul(argv[4]);
     242                [[fallthrough]];
     243        case 4:
     244                nqueues = std::stoul(argv[3]);
     245                [[fallthrough]];
    204246        case 3:
    205247                nthreads = std::stoul(argv[2]);
     
    221263        check_cache_line_size();
    222264
    223         std::cout << "Running " << nthreads << " threads for " << duration << " seconds" << std::endl;
    224         run(nthreads, duration);
     265        std::cout << "Running " << nthreads << " threads (" << (nthreads * nqueues) << " queues) for " << duration << " seconds" << std::endl;
     266        run(nthreads, nqueues, duration, fill);
    225267
    226268        return 0;
     
    228270
    229271template<>
    230 thread_local Random relaxed_list<Node>::rng_g = { int(rdtscl()) };
     272thread_local relaxed_list<Node>::TLS relaxed_list<Node>::tls = {};
     273
     274template<>
     275relaxed_list<Node>::intrusive_queue_t::stat::Dif relaxed_list<Node>::intrusive_queue_t::stat::dif = {};
     276
     277const char * __my_progname = "Relaxed List";
  • doc/theses/thierry_delisle_PhD/code/relaxed_list.hpp

    rb2a37b0 r50aeb6f  
    11#pragma once
    22
     3#ifndef NO_STATS
    34#include <iostream>
     5#endif
     6
    47#include <memory>
    58#include <mutex>
     
    1114using namespace std;
    1215
     16struct spinlock_t {
     17        std::atomic_bool ll = { false };
     18
     19        inline void lock() {
     20                while( __builtin_expect(ll.exchange(true),false) ) {
     21                        while(ll.load(std::memory_order_relaxed))
     22                                asm volatile("pause");
     23                }
     24        }
     25
     26        inline bool try_lock() {
     27                return false == ll.exchange(true);
     28        }
     29
     30        inline void unlock() {
     31                ll.store(false, std::memory_order_release);
     32        }
     33
     34        inline explicit operator bool() {
     35                return ll.load(std::memory_order_relaxed);
     36        }
     37};
     38
     39
    1340extern bool enable_stats;
    1441
    15 
    1642struct pick_stat {
    17         size_t attempt = 0;
    18         size_t success = 0;
    19 };
    20 
    21 extern __attribute__((aligned(64))) thread_local pick_stat local_pick;
     43        struct {
     44                size_t attempt = 0;
     45                size_t success = 0;
     46        } push;
     47        struct {
     48                size_t attempt = 0;
     49                size_t success = 0;
     50        } pop;
     51};
    2252
    2353template<typename node_t>
     
    2858};
    2959
    30 struct spinlock_t {
    31         std::atomic_bool ll = { false };
    32 
    33         inline void lock() {
    34                 while( __builtin_expect(ll.exchange(true),false) ) {
    35                         while(ll.load(std::memory_order_relaxed))
    36                                 asm volatile("pause");
    37                 }
    38         }
    39 
    40         inline void unlock() {
    41                 ll.store(false, std::memory_order_release);
    42         }
    43 };
    44 
    4560template<typename node_t>
    46 class relaxed_list {
     61class __attribute__((aligned(128))) relaxed_list {
    4762        static_assert(std::is_same<decltype(node_t::_links), _LinksFields_t<node_t>>::value, "Node must have a links field");
    4863
     
    5065public:
    5166        relaxed_list(unsigned numLists)
    52                 : numLists(numLists)
     67                : numNonEmpty{0}
    5368                , lists(new intrusive_queue_t[numLists])
    54                 , numNonEmpty(0)
     69                , numLists(numLists)
    5570        {}
    5671
    57         void push(node_t * node) {
    58                 int i = rng_g.next() % numLists;
    59                 lists[i].push(node, numNonEmpty);
     72        ~relaxed_list() {
     73                lists.reset();
     74                #ifndef NO_STATS
     75                        std::cout << "Difference   : "
     76                                << size_t(double(intrusive_queue_t::stat::dif.value) / intrusive_queue_t::stat::dif.num  ) << " avg\t"
     77                                << intrusive_queue_t::stat::dif.max << "max" << std::endl;
     78                #endif
     79        }
     80
     81        __attribute__((noinline, hot)) void push(node_t * node) {
     82                node->_links.ts = rdtscl();
     83
     84                while(true) {
     85                        // Pick a random list
     86                        int i = tls.rng.next() % numLists;
     87
     88                        #ifndef NO_STATS
     89                                tls.pick.push.attempt++;
     90                        #endif
     91
     92                        // If we can't lock it retry
     93                        if( !lists[i].lock.try_lock() ) continue;
     94
     95                        // Actually push it
     96                        lists[i].push(node, numNonEmpty);
     97                        assert(numNonEmpty <= (int)numLists);
     98
     99                        // Unlock and return
     100                        lists[i].lock.unlock();
     101
     102                        #ifndef NO_STATS
     103                                tls.pick.push.success++;
     104                        #endif
     105                        return;
     106                }
    60107        }
    61108
    62         node_t * pop() {
    63                 int i = pickRandomly(-1);
    64                 int j = pickRandomly(i);
    65 
    66                 if(i == -1) {
    67                         return nullptr;
    68                 }
    69 
    70                 auto guard = lock(i, j);
    71                 auto & list = best(i, j);
    72                 return list.pop(numNonEmpty);
     109        __attribute__((noinline, hot)) node_t * pop() {
     110                while(numNonEmpty != 0) {
     111                        // Pick two lists at random
     112                        int i = tls.rng.next() % numLists;
     113                        int j = tls.rng.next() % numLists;
     114
     115                        #ifndef NO_STATS
     116                                tls.pick.pop.attempt++;
     117                        #endif
     118
     119                        // Pick the bet list
     120                        int w = i;
     121                        if( __builtin_expect(lists[j].ts() != 0, true) ) {
     122                                w = (lists[i].ts() < lists[j].ts()) ? i : j;
     123                        }
     124
     125                        auto & list = lists[w];
     126                        // If list looks empty retry
     127                        if( list.ts() == 0 ) continue;
     128
     129                        // If we can't get the lock retry
     130                        if( !list.lock.try_lock() ) continue;
     131
     132                        // If list is empty, unlock and retry
     133                        if( list.ts() == 0 ) {
     134                                list.lock.unlock();
     135                                continue;
     136                        }
     137
     138                        // Actually pop the list
     139                        auto node = list.pop(numNonEmpty);
     140                        assert(node);
     141
     142                        // Unlock and return
     143                        list.lock.unlock();
     144                        assert(numNonEmpty >= 0);
     145                        #ifndef NO_STATS
     146                                tls.pick.pop.success++;
     147                        #endif
     148                        return node;
     149                }
     150
     151                return nullptr;
    73152        }
    74153
    75         node_t * pop2() {
    76                 int i = pickRandomly(-1);
    77                 int j = pickRandomly(i);
    78 
    79                 if(i == -1) {
    80                         return nullptr;
    81                 }
    82 
    83                 auto & list = best2(i, j);
    84                 return list.pop2(numNonEmpty);
    85         }
    86 
    87154private:
    88155
    89         class intrusive_queue_t {
     156        class __attribute__((aligned(128))) intrusive_queue_t {
    90157        public:
    91158                typedef spinlock_t lock_t;
    92159
    93160                friend class relaxed_list<node_t>;
     161
     162                struct stat {
     163                        ssize_t diff = 0;
     164
     165                        static struct Dif {
     166                                ssize_t value = 0;
     167                                size_t  num   = 0;
     168                                ssize_t max   = 0;
     169                        } dif;
     170                };
    94171
    95172        private:
     
    98175                };
    99176
    100                 struct stat {
    101                         size_t push = 0;
    102                         size_t pop  = 0;
    103                 };
    104 
    105                 __attribute__((aligned(64))) lock_t lock;
    106                 __attribute__((aligned(64))) bool empty;
    107                 stat s;
     177                lock_t lock;
    108178                sentinel_t before;
    109179                sentinel_t after;
     180                stat s;
    110181
    111182                static constexpr auto fields_offset = offsetof( node_t, _links );
    112183        public:
    113184                intrusive_queue_t()
    114                         : empty(true)
    115                         , before{{ nullptr, tail() }}
     185                        : before{{ nullptr, tail() }}
    116186                        , after {{ head(), nullptr }}
    117187                {
     
    122192                        assert(tail()->_links.next == nullptr);
    123193                        assert(tail()->_links.prev == head() );
     194                        assert(sizeof(*this) == 128);
     195                        assert((intptr_t(this) % 128) == 0);
    124196                }
    125197
    126198                ~intrusive_queue_t() {
    127                         std::cout << " Push: " << s.push << "\tPop: " << s.pop << "\t(this: " << this << ")" << std::endl;
    128                 }
    129 
    130                 node_t * head() const {
    131                         return reinterpret_cast<node_t *>(
     199                        #ifndef NO_STATS
     200                                stat::dif.value+= s.diff;
     201                                stat::dif.num  ++;
     202                                stat::dif.max  = std::max(stat::dif.max, s.diff);
     203                        #endif
     204                }
     205
     206                inline node_t * head() const {
     207                        node_t * rhead = reinterpret_cast<node_t *>(
    132208                                reinterpret_cast<uintptr_t>( &before ) - fields_offset
    133209                        );
    134                 }
    135 
    136                 node_t * tail() const {
    137                         return reinterpret_cast<node_t *>(
     210                        assert(rhead);
     211                        return rhead;
     212                }
     213
     214                inline node_t * tail() const {
     215                        node_t * rtail = reinterpret_cast<node_t *>(
    138216                                reinterpret_cast<uintptr_t>( &after ) - fields_offset
    139217                        );
    140                 }
    141 
    142                 void push(node_t * node, volatile int & nonEmpty) {
     218                        assert(rtail);
     219                        return rtail;
     220                }
     221
     222                inline void push(node_t * node, std::atomic_int & nonEmpty) {
     223                        assert(lock);
     224                        assert(node->_links.ts != 0);
    143225                        node_t * tail = this->tail();
    144                         std::lock_guard<lock_t> guard(lock);
    145                         node->_links.ts = rdtscl();
    146226
    147227                        node_t * prev = tail->_links.prev;
    148228                        // assertf(node->_links.ts >= prev->_links.ts,
    149                         // "New node has smaller timestamp: %llu < %llu", node->_links.ts, prev->_links.ts);
     229                        //      "New node has smaller timestamp: %llu < %llu", node->_links.ts, prev->_links.ts);
    150230                        node->_links.next = tail;
    151231                        node->_links.prev = prev;
    152232                        prev->_links.next = node;
    153233                        tail->_links.prev = node;
    154                         if(empty) {
    155                                 __atomic_fetch_add(&nonEmpty, 1, __ATOMIC_SEQ_CST);
    156                                 empty = false;
    157                         }
    158                         if(enable_stats) s.push++;
    159                 }
    160 
    161                 node_t * pop(volatile int & nonEmpty) {
     234                        if(before._links.ts == 0l) {
     235                                nonEmpty += 1;
     236                                before._links.ts = node->_links.ts;
     237                        }
     238                        #ifndef NO_STATS
     239                                if(enable_stats) s.diff++;
     240                        #endif
     241                }
     242
     243                inline node_t * pop(std::atomic_int & nonEmpty) {
     244                        assert(lock);
    162245                        node_t * head = this->head();
    163246                        node_t * tail = this->tail();
     
    171254
    172255                        if(next == tail) {
    173                                 empty = true;
    174                                 __atomic_fetch_sub(&nonEmpty, 1, __ATOMIC_SEQ_CST);
    175                         }
    176                         if(enable_stats) s.pop++;
     256                                before._links.ts = 0l;
     257                                nonEmpty -= 1;
     258                        }
     259                        else {
     260                                assert(next->_links.ts != 0);
     261                                before._links.ts = next->_links.ts;
     262                                assert(before._links.ts != 0);
     263                        }
     264                        #ifndef NO_STATS
     265                                if(enable_stats) s.diff--;
     266                        #endif
    177267                        return node;
    178268                }
    179269
    180                 node_t * pop2(volatile int & nonEmpty) {
    181                         node_t * head = this->head();
    182                         node_t * tail = this->tail();
    183 
    184                         std::lock_guard<lock_t> guard(lock);
    185                         node_t * node = head->_links.next;
    186                         node_t * next = node->_links.next;
    187                         if(node == tail) return nullptr;
    188 
    189                         head->_links.next = next;
    190                         next->_links.prev = head;
    191 
    192                         if(next == tail) {
    193                                 empty = true;
    194                                 __atomic_fetch_sub(&nonEmpty, 1, __ATOMIC_SEQ_CST);
    195                         }
    196                         if(enable_stats) s.pop++;
    197                         return node;
    198                 }
    199 
    200                 static intrusive_queue_t & best(intrusive_queue_t & lhs, intrusive_queue_t & rhs) {
    201                         bool lhs_empty = lhs.empty;
    202                         bool rhs_empty = rhs.empty;
    203 
    204                         if(lhs_empty && rhs_empty) return lhs;
    205                         if(!lhs_empty && rhs_empty) return lhs;
    206                         if(lhs_empty && !rhs_empty) return rhs;
    207                         node_t * lhs_head = lhs.head()->_links.next;
    208                         node_t * rhs_head = rhs.head()->_links.next;
    209 
    210                         assert(lhs_head != lhs.tail());
    211                         assert(rhs_head != rhs.tail());
    212 
    213                         if(lhs_head->_links.ts < lhs_head->_links.ts) {
    214                                 return lhs;
    215                         } else {
    216                                 return rhs;
    217                         }
    218                 }
    219 
    220                 static intrusive_queue_t & best2(intrusive_queue_t & lhs, intrusive_queue_t & rhs) {
    221                         node_t * lhs_head = lhs.head()->_links.next;
    222                         node_t * rhs_head = rhs.head()->_links.next;
    223 
    224                         bool lhs_empty = lhs_head != lhs.tail();
    225                         bool rhs_empty = rhs_head != rhs.tail();
    226                         if(lhs_empty && rhs_empty) return lhs;
    227                         if(!lhs_empty && rhs_empty) return lhs;
    228                         if(lhs_empty && !rhs_empty) return rhs;
    229 
    230                         if(lhs_head->_links.ts < lhs_head->_links.ts) {
    231                                 return lhs;
    232                         } else {
    233                                 return rhs;
    234                         }
     270                long long ts() const {
     271                        return before._links.ts;
    235272                }
    236273        };
    237274
    238275
     276public:
     277
     278        static __attribute__((aligned(128))) thread_local struct TLS {
     279                Random    rng = { int(rdtscl()) };
     280                pick_stat pick;
     281        } tls;
     282
    239283private:
    240 
    241         static thread_local Random rng_g;
    242         __attribute__((aligned(64))) const unsigned numLists;
    243         std::unique_ptr<intrusive_queue_t []> lists;
    244         __attribute__((aligned(64))) volatile int numNonEmpty; // number of non-empty lists
    245 
    246 
    247 private:
    248 
    249 
    250 
    251 private:
    252         int pickRandomly(const int avoid) {
    253                 int j;
    254                 do {
    255                         local_pick.attempt++;
    256                         j = rng_g.next() % numLists;
    257                         if (numNonEmpty < 1 + (avoid != -1)) return -1;
    258                 } while (j == avoid || lists[j].empty);
    259                 local_pick.success++;
    260                 return j;
    261         }
    262 
    263 private:
    264 
    265         struct queue_guard {
    266                 intrusive_queue_t * lists;
    267                 int i, j;
    268 
    269                 queue_guard(intrusive_queue_t * lists, int i, int j)
    270                         : lists(lists), i(i), j(j)
    271                 {
    272                         if(i >= 0) lists[i].lock.lock();
    273                         if(j >= 0) lists[j].lock.lock();
    274                 }
    275 
    276                 queue_guard(const queue_guard &) = delete;
    277                 queue_guard(queue_guard &&) = default;
    278 
    279                 ~queue_guard() {
    280                         if(i >= 0) lists[i].lock.unlock();
    281                         if(j >= 0) lists[j].lock.unlock();
    282                 }
    283         };
    284 
    285         auto lock(int i, int j) {
    286                 assert(i >= 0);
    287                 assert(i != j);
    288                 if(j < i) return queue_guard(lists.get(), j, i);
    289                 return queue_guard(lists.get(), i, j);
    290         }
    291 
    292         intrusive_queue_t & best(int i, int j) {
    293                 assert(i != -1);
    294                 if(j == -1) return lists[i];
    295                 return intrusive_queue_t::best(lists[i], lists[j]);
    296         }
    297 
    298         intrusive_queue_t & best2(int i, int j) {
    299                 assert(i != -1);
    300                 if(j == -1) return lists[i];
    301                 return intrusive_queue_t::best2(lists[i], lists[j]);
    302         }
    303 };
     284        std::atomic_int numNonEmpty; // number of non-empty lists
     285        __attribute__((aligned(64))) std::unique_ptr<intrusive_queue_t []> lists;
     286        const unsigned numLists;
     287
     288public:
     289        static const constexpr size_t sizeof_queue = sizeof(intrusive_queue_t);
     290};
Note: See TracChangeset for help on using the changeset viewer.