source: doc/theses/thierry_delisle_PhD/code/relaxed_list.cpp @ 9421f3d8

ADTarm-ehast-experimentalenumforall-pointer-decayjacob/cs343-translationjenkins-sandboxnew-astnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since 9421f3d8 was 9421f3d8, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Adding some of the implemented code. Current state: relaxed list is achieves at least 6M ops/sec total

  • Property mode set to 100644
File size: 14.9 KB
Line 
1#include "relaxed_list.hpp"
2
3#include <array>
4#include <iomanip>
5#include <iostream>
6#include <locale>
7#include <string>
8#include <thread>
9#include <vector>
10
11#include <getopt.h>
12#include <unistd.h>
13#include <sys/sysinfo.h>
14
15#include "utils.hpp"
16
17struct __attribute__((aligned(64))) Node {
18        static std::atomic_size_t creates;
19        static std::atomic_size_t destroys;
20
21        _LinksFields_t<Node> _links;
22
23        int value;
24
25        Node() { creates++; }
26        Node(int value): value(value) { creates++; }
27        ~Node() { destroys++; }
28};
29
30std::atomic_size_t Node::creates  = { 0 };
31std::atomic_size_t Node::destroys = { 0 };
32
33bool enable_stats = false;
34
35template<>
36thread_local relaxed_list<Node>::TLS relaxed_list<Node>::tls = {};
37
38template<>
39relaxed_list<Node>::intrusive_queue_t::stat::Dif relaxed_list<Node>::intrusive_queue_t::stat::dif = {};
40
41// ================================================================================================
42//                        UTILS
43// ================================================================================================
44
45struct local_stat_t {
46        size_t in  = 0;
47        size_t out = 0;
48        size_t empty = 0;
49        size_t crc_in  = 0;
50        size_t crc_out = 0;
51};
52
53struct global_stat_t {
54        std::atomic_size_t in  = { 0 };
55        std::atomic_size_t out = { 0 };
56        std::atomic_size_t empty = { 0 };
57        std::atomic_size_t crc_in  = { 0 };
58        std::atomic_size_t crc_out = { 0 };
59        struct {
60                struct {
61                        std::atomic_size_t attempt = { 0 };
62                        std::atomic_size_t success = { 0 };
63                } push;
64                struct {
65                        std::atomic_size_t attempt = { 0 };
66                        std::atomic_size_t success = { 0 };
67                        std::atomic_size_t mask_attempt = { 0 };
68                } pop;
69        } pick;
70        struct {
71                struct {
72                        std::atomic_size_t value = { 0 };
73                        std::atomic_size_t count = { 0 };
74                } push;
75                struct {
76                        std::atomic_size_t value = { 0 };
77                        std::atomic_size_t count = { 0 };
78                } pop;
79        } qstat;
80};
81
82void tally_stats(global_stat_t & global, local_stat_t & local) {
83        global.in    += local.in;
84        global.out   += local.out;
85        global.empty += local.empty;
86
87        global.crc_in  += local.crc_in;
88        global.crc_out += local.crc_out;
89
90        global.pick.push.attempt += relaxed_list<Node>::tls.pick.push.attempt;
91        global.pick.push.success += relaxed_list<Node>::tls.pick.push.success;
92        global.pick.pop .attempt += relaxed_list<Node>::tls.pick.pop.attempt;
93        global.pick.pop .success += relaxed_list<Node>::tls.pick.pop.success;
94        global.pick.pop .mask_attempt += relaxed_list<Node>::tls.pick.pop.mask_attempt;
95
96        global.qstat.push.value += relaxed_list<Node>::tls.empty.push.value;
97        global.qstat.push.count += relaxed_list<Node>::tls.empty.push.count;
98        global.qstat.pop .value += relaxed_list<Node>::tls.empty.pop .value;
99        global.qstat.pop .count += relaxed_list<Node>::tls.empty.pop .count;
100}
101
102void waitfor(double & duration, barrier_t & barrier, std::atomic_bool & done) {
103        std::cout << "Starting" << std::endl;
104        auto before = Clock::now();
105        barrier.wait(0);
106
107        while(true) {
108                usleep(100000);
109                auto now = Clock::now();
110                duration_t durr = now - before;
111                if( durr.count() > duration ) {
112                        done = true;
113                        break;
114                }
115                std::cout << "\r" << std::setprecision(4) << durr.count();
116                std::cout.flush();
117        }
118
119        barrier.wait(0);
120        auto after = Clock::now();
121        duration_t durr = after - before;
122        duration = durr.count();
123        std::cout << "\rClosing down" << std::endl;
124}
125
126void print_stats(double duration, unsigned nthread, global_stat_t & global) {
127        assert(Node::creates == Node::destroys);
128        assert(global.crc_in == global.crc_out);
129
130        std::cout << "Done" << std::endl;
131
132        size_t ops = global.in + global.out;
133        size_t ops_sec = size_t(double(ops) / duration);
134        size_t ops_thread = ops_sec / nthread;
135        auto dur_nano = duration_cast<std::nano>(1.0);
136
137        std::cout << "Duration      : " << duration << "s\n";
138        std::cout << "ns/Op         : " << ( dur_nano / ops_thread )<< "\n";
139        std::cout << "Ops/sec/thread: " << ops_thread << "\n";
140        std::cout << "Ops/sec       : " << ops_sec << "\n";
141        std::cout << "Total ops     : " << ops << "(" << global.in << "i, " << global.out << "o, " << global.empty << "e)\n";
142        #ifndef NO_STATS
143                double push_sur = (100.0 * double(global.pick.push.success) / global.pick.push.attempt);
144                double pop_sur  = (100.0 * double(global.pick.pop .success) / global.pick.pop .attempt);
145
146                std::cout << "Push Pick %   : " << push_sur << "(" << global.pick.push.success << " / " << global.pick.push.attempt << ")\n";
147                std::cout << "Pop  Pick %   : " << pop_sur  << "(" << global.pick.pop .success << " / " << global.pick.pop .attempt << ")\n";
148                std::cout << "Pop mask trys : " << global.pick.pop.mask_attempt << std::endl;
149
150                double avgQ_push = double(global.qstat.push.value) / global.qstat.push.count;
151                double avgQ_pop  = double(global.qstat.pop .value) / global.qstat.pop .count;
152                double avgQ      = double(global.qstat.push.value + global.qstat.pop .value) / (global.qstat.push.count + global.qstat.pop .count);
153                std::cout << "Push   Avg Qs : " << avgQ_push << " (" << global.qstat.push.count << "ops)\n";
154                std::cout << "Pop    Avg Qs : " << avgQ_pop  << " (" << global.qstat.pop .count << "ops)\n";
155                std::cout << "Global Avg Qs : " << avgQ      << " (" << (global.qstat.push.count + global.qstat.pop .count) << "ops)\n";
156        #endif
157}
158
159// ================================================================================================
160//                        EXPERIMENTS
161// ================================================================================================
162
163// ================================================================================================
164__attribute__((noinline)) void runChurn_body(
165        std::atomic<bool>& done,
166        Random & rand,
167        Node * my_nodes[],
168        unsigned nslots,
169        local_stat_t & local,
170        relaxed_list<Node> & list
171) {
172        while(__builtin_expect(!done.load(std::memory_order_relaxed), true)) {
173                int idx = rand.next() % nslots;
174                if (auto node = my_nodes[idx]) {
175                        local.crc_in += node->value;
176                        list.push(node);
177                        my_nodes[idx] = nullptr;
178                        local.in++;
179                }
180                else if(auto node = list.pop()) {
181                        local.crc_out += node->value;
182                        my_nodes[idx] = node;
183                        local.out++;
184                }
185                else {
186                        local.empty++;
187                }
188        }
189}
190
191void runChurn(unsigned nthread, unsigned nqueues, double duration, unsigned nnodes, const unsigned nslots) {
192        std::cout << "Churn Benchmark" << std::endl;
193        assert(nnodes <= nslots);
194        // List being tested
195        relaxed_list<Node> list = { nthread * nqueues };
196
197        // Barrier for synchronization
198        barrier_t barrier(nthread + 1);
199
200        // Data to check everything is OK
201        global_stat_t global;
202
203        // Flag to signal termination
204        std::atomic_bool done  = { false };
205
206        // Prep nodes
207        std::cout << "Initializing ";
208        size_t npushed = 0;
209
210        Node** all_nodes[nthread];
211        for(auto & nodes : all_nodes) {
212                nodes = new __attribute__((aligned(64))) Node*[nslots + 8];
213                Random rand(rdtscl());
214                for(unsigned i = 0; i < nnodes; i++) {
215                        nodes[i] = new Node(rand.next() % 100);
216                }
217
218                for(unsigned i = nnodes; i < nslots; i++) {
219                        nodes[i] = nullptr;
220                }
221
222                for(int i = 0; i < 10 && i < (int)nslots; i++) {
223                        int idx = rand.next() % nslots;
224                        if (auto node = nodes[idx]) {
225                                global.crc_in += node->value;
226                                list.push(node);
227                                npushed++;
228                                nodes[idx] = nullptr;
229                        }
230                }
231        }
232
233        std::cout << nnodes << " nodes (" << nslots << " slots)" << std::endl;
234
235        enable_stats = true;
236
237        std::thread * threads[nthread];
238        unsigned i = 1;
239        for(auto & t : threads) {
240                auto & my_nodes = all_nodes[i - 1];
241                t = new std::thread([&done, &list, &barrier, &global, &my_nodes, nslots](unsigned tid) {
242                        Random rand(tid + rdtscl());
243
244                        local_stat_t local;
245
246                        // affinity(tid);
247
248                        barrier.wait(tid);
249
250                        // EXPERIMENT START
251
252                        runChurn_body(done, rand, my_nodes, nslots, local, list);
253
254                        // EXPERIMENT END
255
256                        barrier.wait(tid);
257
258                        tally_stats(global, local);
259
260                        for(unsigned i = 0; i < nslots; i++) {
261                                delete my_nodes[i];
262                        }
263                }, i++);
264        }
265
266        waitfor(duration, barrier, done);
267
268        for(auto t : threads) {
269                t->join();
270                delete t;
271        }
272
273        enable_stats = false;
274
275        while(auto node = list.pop()) {
276                global.crc_out += node->value;
277                delete node;
278        }
279
280        for(auto nodes : all_nodes) {
281                delete[] nodes;
282        }
283
284        print_stats(duration, nthread, global);
285}
286
287// ================================================================================================
288__attribute__((noinline)) void runPingPong_body(
289        std::atomic<bool>& done,
290        Node initial_nodes[],
291        unsigned nnodes,
292        local_stat_t & local,
293        relaxed_list<Node> & list
294) {
295        Node * nodes[nnodes];
296        {
297                unsigned i = 0;
298                for(auto & n : nodes) {
299                        n = &initial_nodes[i++];
300                }
301        }
302
303        while(__builtin_expect(!done.load(std::memory_order_relaxed), true)) {
304
305                for(Node * & node : nodes) {
306                        local.crc_in += node->value;
307                        list.push(node);
308                        local.in++;
309                }
310
311                // -----
312
313                for(Node * & node : nodes) {
314                        node = list.pop();
315                        assert(node);
316                        local.crc_out += node->value;
317                        local.out++;
318                }
319        }
320}
321
322void runPingPong(unsigned nthread, unsigned nqueues, double duration, unsigned nnodes) {
323        std::cout << "PingPong Benchmark" << std::endl;
324
325        // List being tested
326        relaxed_list<Node> list = { nthread * nqueues };
327
328        // Barrier for synchronization
329        barrier_t barrier(nthread + 1);
330
331        // Data to check everything is OK
332        global_stat_t global;
333
334        // Flag to signal termination
335        std::atomic_bool done  = { false };
336
337        std::cout << "Initializing ";
338        enable_stats = true;
339
340        std::thread * threads[nthread];
341        unsigned i = 1;
342        for(auto & t : threads) {
343                t = new std::thread([&done, &list, &barrier, &global, nnodes](unsigned tid) {
344                        Random rand(tid + rdtscl());
345
346                        Node nodes[nnodes];
347                        for(auto & n : nodes) {
348                                n.value = (int)rand.next() % 100;
349                        }
350
351                        local_stat_t local;
352
353                        // affinity(tid);
354
355                        barrier.wait(tid);
356
357                        // EXPERIMENT START
358
359                        runPingPong_body(done, nodes, nnodes, local, list);
360
361                        // EXPERIMENT END
362
363                        barrier.wait(tid);
364
365                        tally_stats(global, local);
366                }, i++);
367        }
368
369        waitfor(duration, barrier, done);
370
371        for(auto t : threads) {
372                t->join();
373                delete t;
374        }
375
376        enable_stats = false;
377
378        print_stats(duration, nthread, global);
379}
380
381bool iequals(const std::string& a, const std::string& b)
382{
383    return std::equal(a.begin(), a.end(),
384                      b.begin(), b.end(),
385                      [](char a, char b) {
386                          return std::tolower(a) == std::tolower(b);
387                      });
388}
389
390int main(int argc, char * argv[]) {
391
392        double duration   = 5.0;
393        unsigned nthreads = 2;
394        unsigned nqueues  = 4;
395        unsigned nnodes   = 100;
396        unsigned nslots   = 100;
397
398        enum {
399                Churn,
400                PingPong,
401                NONE
402        } benchmark = NONE;
403
404        std::cout.imbue(std::locale(""));
405
406        for(;;) {
407                static struct option options[] = {
408                        {"duration",  required_argument, 0, 'd'},
409                        {"nthreads",  required_argument, 0, 't'},
410                        {"nqueues",   required_argument, 0, 'q'},
411                        {"benchmark", required_argument, 0, 'b'},
412                        {0, 0, 0, 0}
413                };
414
415                int idx = 0;
416                int opt = getopt_long(argc, argv, "d:t:q:b:", options, &idx);
417
418                std::string arg = optarg ? optarg : "";
419                size_t len = 0;
420                switch(opt) {
421                        // Exit Case
422                        case -1:
423                                /* paranoid */ assert(optind <= argc);
424                                switch(benchmark) {
425                                case NONE:
426                                        std::cerr << "Must specify a benchmark" << std::endl;
427                                        goto usage;
428                                case PingPong:
429                                        nnodes = 1;
430                                        nslots = 1;
431                                        switch(argc - optind) {
432                                        case 0: break;
433                                        case 1:
434                                                try {
435                                                        arg = optarg = argv[optind];
436                                                        nnodes = stoul(optarg, &len);
437                                                        if(len != arg.size()) { throw std::invalid_argument(""); }
438                                                } catch(std::invalid_argument &) {
439                                                        std::cerr << "Number of nodes must be a positive integer, was " << arg << std::endl;
440                                                        goto usage;
441                                                }
442                                                break;
443                                        default:
444                                                std::cerr << "'PingPong' benchmark doesn't accept more than 2 extra arguments" << std::endl;
445                                                goto usage;
446                                        }
447                                        break;
448                                case Churn:
449                                        nnodes = 100;
450                                        nslots = 100;
451                                        switch(argc - optind) {
452                                        case 0: break;
453                                        case 1:
454                                                try {
455                                                        arg = optarg = argv[optind];
456                                                        nnodes = stoul(optarg, &len);
457                                                        if(len != arg.size()) { throw std::invalid_argument(""); }
458                                                        nslots = nnodes;
459                                                } catch(std::invalid_argument &) {
460                                                        std::cerr << "Number of nodes must be a positive integer, was " << arg << std::endl;
461                                                        goto usage;
462                                                }
463                                                break;
464                                        case 2:
465                                                try {
466                                                        arg = optarg = argv[optind];
467                                                        nnodes = stoul(optarg, &len);
468                                                        if(len != arg.size()) { throw std::invalid_argument(""); }
469                                                } catch(std::invalid_argument &) {
470                                                        std::cerr << "Number of nodes must be a positive integer, was " << arg << std::endl;
471                                                        goto usage;
472                                                }
473                                                try {
474                                                        arg = optarg = argv[optind + 1];
475                                                        nslots = stoul(optarg, &len);
476                                                        if(len != arg.size()) { throw std::invalid_argument(""); }
477                                                } catch(std::invalid_argument &) {
478                                                        std::cerr << "Number of slots must be a positive integer, was " << arg << std::endl;
479                                                        goto usage;
480                                                }
481                                                break;
482                                        default:
483                                                std::cerr << "'Churn' benchmark doesn't accept more than 2 extra arguments" << std::endl;
484                                                goto usage;
485                                        }
486                                        break;
487                                }
488                                goto run;
489                        // Benchmarks
490                        case 'b':
491                                if(benchmark != NONE) {
492                                        std::cerr << "Only when benchmark can be run" << std::endl;
493                                        goto usage;
494                                }
495                                if(iequals(arg, "churn")) {
496                                        benchmark = Churn;
497                                        break;
498                                }
499                                if(iequals(arg, "pingpong")) {
500                                        benchmark = PingPong;
501                                        break;
502                                }
503                                std::cerr << "Unkown benchmark " << arg << std::endl;
504                                goto usage;
505                        // Numeric Arguments
506                        case 'd':
507                                try {
508                                        duration = stod(optarg, &len);
509                                        if(len != arg.size()) { throw std::invalid_argument(""); }
510                                } catch(std::invalid_argument &) {
511                                        std::cerr << "Duration must be a valid double, was " << arg << std::endl;
512                                        goto usage;
513                                }
514                                break;
515                        case 't':
516                                try {
517                                        nthreads = stoul(optarg, &len);
518                                        if(len != arg.size()) { throw std::invalid_argument(""); }
519                                } catch(std::invalid_argument &) {
520                                        std::cerr << "Number of threads must be a positive integer, was " << arg << std::endl;
521                                        goto usage;
522                                }
523                                break;
524                        case 'q':
525                                try {
526                                        nqueues = stoul(optarg, &len);
527                                        if(len != arg.size()) { throw std::invalid_argument(""); }
528                                } catch(std::invalid_argument &) {
529                                        std::cerr << "Number of queues must be a positive integer, was " << arg << std::endl;
530                                        goto usage;
531                                }
532                                break;
533                        // Other cases
534                        default: /* ? */
535                                std::cerr << opt << std::endl;
536                        usage:
537                                std::cerr << "Usage: " << argv[0] << ": [options] -b churn [NNODES] [NSLOTS = NNODES]" << std::endl;
538                                std::cerr << "  or:  " << argv[0] << ": [options] -b pingpong [NNODES]" << std::endl;
539                                std::cerr << std::endl;
540                                std::cerr << "  -d, --duration=DURATION  Duration of the experiment, in seconds" << std::endl;
541                                std::cerr << "  -t, --nthreads=NTHREADS  Number of kernel threads" << std::endl;
542                                std::cerr << "  -q, --nqueues=NQUEUES    Number of queues per threads" << std::endl;
543                                std::exit(1);
544                }
545        }
546        run:
547
548        check_cache_line_size();
549
550        std::cout << "Running " << nthreads << " threads (" << (nthreads * nqueues) << " queues) for " << duration << " seconds" << std::endl;
551        switch(benchmark) {
552                case Churn:
553                        runChurn(nthreads, nqueues, duration, nnodes, nslots);
554                        break;
555                case PingPong:
556                        runPingPong(nthreads, nqueues, duration, nnodes);
557                        break;
558                default:
559                        abort();
560        }
561        return 0;
562}
563
564const char * __my_progname = "Relaxed List";
Note: See TracBrowser for help on using the repository browser.