source: libcfa/src/concurrency/ready_queue.cfa @ df7597e0

ADTast-experimentalenumforall-pointer-decayjacob/cs343-translationnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since df7597e0 was df7597e0, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Fix the cpu-based ready to actually use cpu_info
rather then being a stupider work stealing approach.

  • Property mode set to 100644
File size: 27.4 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2019 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// ready_queue.cfa --
8//
9// Author           : Thierry Delisle
10// Created On       : Mon Nov dd 16:29:18 2019
11// Last Modified By :
12// Last Modified On :
13// Update Count     :
14//
15
16#define __cforall_thread__
17#define _GNU_SOURCE
18
19// #define __CFA_DEBUG_PRINT_READY_QUEUE__
20
21
22// #define USE_RELAXED_FIFO
23// #define USE_WORK_STEALING
24
25#include "bits/defs.hfa"
26#include "device/cpu.hfa"
27#include "kernel_private.hfa"
28
29#include "stdlib.hfa"
30#include "math.hfa"
31
32#include <unistd.h>
33
34#include "ready_subqueue.hfa"
35
36static const size_t cache_line_size = 64;
37
38#if !defined(__CFA_NO_STATISTICS__)
39        #define __STATS(...) __VA_ARGS__
40#else
41        #define __STATS(...)
42#endif
43
44// No overriden function, no environment variable, no define
45// fall back to a magic number
46#ifndef __CFA_MAX_PROCESSORS__
47        #define __CFA_MAX_PROCESSORS__ 1024
48#endif
49
50#if   defined(USE_CPU_WORK_STEALING)
51        #define READYQ_SHARD_FACTOR 2
52#elif defined(USE_RELAXED_FIFO)
53        #define BIAS 4
54        #define READYQ_SHARD_FACTOR 4
55        #define SEQUENTIAL_SHARD 1
56#elif defined(USE_WORK_STEALING)
57        #define READYQ_SHARD_FACTOR 2
58        #define SEQUENTIAL_SHARD 2
59#else
60        #error no scheduling strategy selected
61#endif
62
63static inline struct $thread * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats));
64static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats));
65static inline struct $thread * search(struct cluster * cltr);
66static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred);
67
68
69// returns the maximum number of processors the RWLock support
70__attribute__((weak)) unsigned __max_processors() {
71        const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
72        if(!max_cores_s) {
73                __cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
74                return __CFA_MAX_PROCESSORS__;
75        }
76
77        char * endptr = 0p;
78        long int max_cores_l = strtol(max_cores_s, &endptr, 10);
79        if(max_cores_l < 1 || max_cores_l > 65535) {
80                __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", max_cores_l);
81                return __CFA_MAX_PROCESSORS__;
82        }
83        if('\0' != *endptr) {
84                __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", max_cores_s);
85                return __CFA_MAX_PROCESSORS__;
86        }
87
88        return max_cores_l;
89}
90
91//=======================================================================
92// Cluster wide reader-writer lock
93//=======================================================================
94void  ?{}(__scheduler_RWLock_t & this) {
95        this.max   = __max_processors();
96        this.alloc = 0;
97        this.ready = 0;
98        this.data  = alloc(this.max);
99        this.write_lock  = false;
100
101        /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
102        /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));
103
104}
105void ^?{}(__scheduler_RWLock_t & this) {
106        free(this.data);
107}
108
109
110//=======================================================================
111// Lock-Free registering/unregistering of threads
112unsigned register_proc_id( void ) with(*__scheduler_lock) {
113        __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p for RW-Lock\n", proc);
114        bool * handle = (bool *)&kernelTLS().sched_lock;
115
116        // Step - 1 : check if there is already space in the data
117        uint_fast32_t s = ready;
118
119        // Check among all the ready
120        for(uint_fast32_t i = 0; i < s; i++) {
121                bool * volatile * cell = (bool * volatile *)&data[i]; // Cforall is bugged and the double volatiles causes problems
122                /* paranoid */ verify( handle != *cell );
123
124                bool * null = 0p; // Re-write every loop since compare thrashes it
125                if( __atomic_load_n(cell, (int)__ATOMIC_RELAXED) == null
126                        && __atomic_compare_exchange_n( cell, &null, handle, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
127                        /* paranoid */ verify(i < ready);
128                        /* paranoid */ verify( (kernelTLS().sched_id = i, true) );
129                        return i;
130                }
131        }
132
133        if(max <= alloc) abort("Trying to create more than %ud processors", __scheduler_lock->max);
134
135        // Step - 2 : F&A to get a new spot in the array.
136        uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
137        if(max <= n) abort("Trying to create more than %ud processors", __scheduler_lock->max);
138
139        // Step - 3 : Mark space as used and then publish it.
140        data[n] = handle;
141        while() {
142                unsigned copy = n;
143                if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
144                        && __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
145                        break;
146                Pause();
147        }
148
149        __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p done, id %lu\n", proc, n);
150
151        // Return new spot.
152        /* paranoid */ verify(n < ready);
153        /* paranoid */ verify( (kernelTLS().sched_id = n, true) );
154        return n;
155}
156
157void unregister_proc_id( unsigned id ) with(*__scheduler_lock) {
158        /* paranoid */ verify(id < ready);
159        /* paranoid */ verify(id == kernelTLS().sched_id);
160        /* paranoid */ verify(data[id] == &kernelTLS().sched_lock);
161
162        bool * volatile * cell = (bool * volatile *)&data[id]; // Cforall is bugged and the double volatiles causes problems
163
164        __atomic_store_n(cell, 0p, __ATOMIC_RELEASE);
165
166        __cfadbg_print_safe(ready_queue, "Kernel : Unregister proc %p\n", proc);
167}
168
169//-----------------------------------------------------------------------
170// Writer side : acquire when changing the ready queue, e.g. adding more
171//  queues or removing them.
172uint_fast32_t ready_mutate_lock( void ) with(*__scheduler_lock) {
173        /* paranoid */ verify( ! __preemption_enabled() );
174        /* paranoid */ verify( ! kernelTLS().sched_lock );
175
176        // Step 1 : lock global lock
177        // It is needed to avoid processors that register mid Critical-Section
178        //   to simply lock their own lock and enter.
179        __atomic_acquire( &write_lock );
180
181        // Step 2 : lock per-proc lock
182        // Processors that are currently being registered aren't counted
183        //   but can't be in read_lock or in the critical section.
184        // All other processors are counted
185        uint_fast32_t s = ready;
186        for(uint_fast32_t i = 0; i < s; i++) {
187                volatile bool * llock = data[i];
188                if(llock) __atomic_acquire( llock );
189        }
190
191        /* paranoid */ verify( ! __preemption_enabled() );
192        return s;
193}
194
195void ready_mutate_unlock( uint_fast32_t last_s ) with(*__scheduler_lock) {
196        /* paranoid */ verify( ! __preemption_enabled() );
197
198        // Step 1 : release local locks
199        // This must be done while the global lock is held to avoid
200        //   threads that where created mid critical section
201        //   to race to lock their local locks and have the writer
202        //   immidiately unlock them
203        // Alternative solution : return s in write_lock and pass it to write_unlock
204        for(uint_fast32_t i = 0; i < last_s; i++) {
205                volatile bool * llock = data[i];
206                if(llock) __atomic_store_n(llock, (bool)false, __ATOMIC_RELEASE);
207        }
208
209        // Step 2 : release global lock
210        /*paranoid*/ assert(true == write_lock);
211        __atomic_store_n(&write_lock, (bool)false, __ATOMIC_RELEASE);
212
213        /* paranoid */ verify( ! __preemption_enabled() );
214}
215
216//=======================================================================
217// Cforall Ready Queue used for scheduling
218//=======================================================================
219void ?{}(__ready_queue_t & this) with (this) {
220        #if defined(USE_CPU_WORK_STEALING)
221                lanes.count = cpu_info.hthrd_count * READYQ_SHARD_FACTOR;
222                lanes.data = alloc( lanes.count );
223                lanes.tscs = alloc( lanes.count );
224
225                for( idx; (size_t)lanes.count ) {
226                        (lanes.data[idx]){};
227                        lanes.tscs[idx].tv = rdtscl();
228                }
229        #else
230                lanes.data  = 0p;
231                lanes.tscs  = 0p;
232                lanes.count = 0;
233        #endif
234}
235
236void ^?{}(__ready_queue_t & this) with (this) {
237        #if !defined(USE_CPU_WORK_STEALING)
238                verify( SEQUENTIAL_SHARD == lanes.count );
239        #endif
240
241        free(lanes.data);
242        free(lanes.tscs);
243}
244
245//-----------------------------------------------------------------------
246#if defined(USE_CPU_WORK_STEALING)
247        __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {
248                __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
249
250                processor * const proc = kernelTLS().this_processor;
251                const bool external = !push_local || (!proc) || (cltr != proc->cltr);
252
253                const int cpu = __kernel_getcpu();
254                /* paranoid */ verify(cpu >= 0);
255                /* paranoid */ verify(cpu < cpu_info.hthrd_count);
256                /* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count);
257
258                const cpu_map_entry_t & map = cpu_info.llc_map[cpu];
259                /* paranoid */ verify(map.start * READYQ_SHARD_FACTOR < lanes.count);
260                /* paranoid */ verify(map.self * READYQ_SHARD_FACTOR < lanes.count);
261                /* paranoid */ verifyf((map.start + map.count) * READYQ_SHARD_FACTOR <= lanes.count, "have %u lanes but map can go up to %u", lanes.count, (map.start + map.count) * READYQ_SHARD_FACTOR);
262
263                const int start = map.self * READYQ_SHARD_FACTOR;
264                unsigned i;
265                do {
266                        unsigned r;
267                        if(unlikely(external)) { r = __tls_rand(); }
268                        else { r = proc->rdq.its++; }
269                        i = start + (r % READYQ_SHARD_FACTOR);
270                        // If we can't lock it retry
271                } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
272
273                // Actually push it
274                push(lanes.data[i], thrd);
275
276                // Unlock and return
277                __atomic_unlock( &lanes.data[i].lock );
278
279                #if !defined(__CFA_NO_STATISTICS__)
280                        if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
281                        else __tls_stats()->ready.push.local.success++;
282                #endif
283
284                __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
285
286        }
287
288        // Pop from the ready queue from a given cluster
289        __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
290                /* paranoid */ verify( lanes.count > 0 );
291                /* paranoid */ verify( kernelTLS().this_processor );
292
293                const int cpu = __kernel_getcpu();
294                /* paranoid */ verify(cpu >= 0);
295                /* paranoid */ verify(cpu < cpu_info.hthrd_count);
296                /* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count);
297
298                const cpu_map_entry_t & map = cpu_info.llc_map[cpu];
299                /* paranoid */ verify(map.start * READYQ_SHARD_FACTOR < lanes.count);
300                /* paranoid */ verify(map.self * READYQ_SHARD_FACTOR < lanes.count);
301                /* paranoid */ verifyf((map.start + map.count) * READYQ_SHARD_FACTOR <= lanes.count, "have %u lanes but map can go up to %u", lanes.count, (map.start + map.count) * READYQ_SHARD_FACTOR);
302
303                processor * const proc = kernelTLS().this_processor;
304                const int start = map.self * READYQ_SHARD_FACTOR;
305
306                // Did we already have a help target
307                if(proc->rdq.target == -1u) {
308                        // if We don't have a
309                        unsigned long long min = ts(lanes.data[start]);
310                        for(i; READYQ_SHARD_FACTOR) {
311                                unsigned long long tsc = ts(lanes.data[start + i]);
312                                if(tsc < min) min = tsc;
313                        }
314                        proc->rdq.cutoff = min;
315                        proc->rdq.target = (map.start * READYQ_SHARD_FACTOR) + (__tls_rand() % (map.count* READYQ_SHARD_FACTOR));
316                }
317                else {
318                        const unsigned long long bias = 0; //2_500_000_000;
319                        const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff;
320                        {
321                                unsigned target = proc->rdq.target;
322                                proc->rdq.target = -1u;
323                                if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) {
324                                        $thread * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
325                                        proc->rdq.last = target;
326                                        if(t) return t;
327                                }
328                        }
329
330                        unsigned last = proc->rdq.last;
331                        if(last != -1u && lanes.tscs[last].tv < cutoff && ts(lanes.data[last]) < cutoff) {
332                                $thread * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.help));
333                                if(t) return t;
334                        }
335                        else {
336                                proc->rdq.last = -1u;
337                        }
338                }
339
340                for(READYQ_SHARD_FACTOR) {
341                        unsigned i = start + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
342                        if($thread * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
343                }
344
345                // All lanes where empty return 0p
346                return 0p;
347        }
348
349        __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
350                processor * const proc = kernelTLS().this_processor;
351                unsigned last = proc->rdq.last;
352
353                unsigned i = __tls_rand() % lanes.count;
354                return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
355        }
356        __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) {
357                return search(cltr);
358        }
359#endif
360#if defined(USE_RELAXED_FIFO)
361        //-----------------------------------------------------------------------
362        // get index from random number with or without bias towards queues
363        static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred) {
364                unsigned i;
365                bool local;
366                unsigned rlow  = r % BIAS;
367                unsigned rhigh = r / BIAS;
368                if((0 != rlow) && preferred >= 0) {
369                        // (BIAS - 1) out of BIAS chances
370                        // Use perferred queues
371                        i = preferred + (rhigh % READYQ_SHARD_FACTOR);
372                        local = true;
373                }
374                else {
375                        // 1 out of BIAS chances
376                        // Use all queues
377                        i = rhigh;
378                        local = false;
379                }
380                return [i, local];
381        }
382
383        __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {
384                __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
385
386                const bool external = !push_local || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
387                /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
388
389                bool local;
390                int preferred = external ? -1 : kernelTLS().this_processor->rdq.id;
391
392                // Try to pick a lane and lock it
393                unsigned i;
394                do {
395                        // Pick the index of a lane
396                        unsigned r = __tls_rand_fwd();
397                        [i, local] = idx_from_r(r, preferred);
398
399                        i %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
400
401                        #if !defined(__CFA_NO_STATISTICS__)
402                                if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.attempt, 1, __ATOMIC_RELAXED);
403                                else if(local) __tls_stats()->ready.push.local.attempt++;
404                                else __tls_stats()->ready.push.share.attempt++;
405                        #endif
406
407                        // If we can't lock it retry
408                } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
409
410                // Actually push it
411                push(lanes.data[i], thrd);
412
413                // Unlock and return
414                __atomic_unlock( &lanes.data[i].lock );
415
416                // Mark the current index in the tls rng instance as having an item
417                __tls_rand_advance_bck();
418
419                __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
420
421                // Update statistics
422                #if !defined(__CFA_NO_STATISTICS__)
423                        if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
424                        else if(local) __tls_stats()->ready.push.local.success++;
425                        else __tls_stats()->ready.push.share.success++;
426                #endif
427        }
428
429        // Pop from the ready queue from a given cluster
430        __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
431                /* paranoid */ verify( lanes.count > 0 );
432                /* paranoid */ verify( kernelTLS().this_processor );
433                /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );
434
435                unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
436                int preferred = kernelTLS().this_processor->rdq.id;
437
438
439                // As long as the list is not empty, try finding a lane that isn't empty and pop from it
440                for(25) {
441                        // Pick two lists at random
442                        unsigned ri = __tls_rand_bck();
443                        unsigned rj = __tls_rand_bck();
444
445                        unsigned i, j;
446                        __attribute__((unused)) bool locali, localj;
447                        [i, locali] = idx_from_r(ri, preferred);
448                        [j, localj] = idx_from_r(rj, preferred);
449
450                        i %= count;
451                        j %= count;
452
453                        // try popping from the 2 picked lists
454                        struct $thread * thrd = try_pop(cltr, i, j __STATS(, *(locali || localj ? &__tls_stats()->ready.pop.local : &__tls_stats()->ready.pop.help)));
455                        if(thrd) {
456                                return thrd;
457                        }
458                }
459
460                // All lanes where empty return 0p
461                return 0p;
462        }
463
464        __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) { return pop_fast(cltr); }
465        __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) {
466                return search(cltr);
467        }
468#endif
469#if defined(USE_WORK_STEALING)
470        __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {
471                __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
472
473                // #define USE_PREFERRED
474                #if !defined(USE_PREFERRED)
475                const bool external = !push_local || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
476                /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
477                #else
478                        unsigned preferred = thrd->preferred;
479                        const bool external = push_local || (!kernelTLS().this_processor) || preferred == -1u || thrd->curr_cluster != cltr;
480                        /* paranoid */ verifyf(external || preferred < lanes.count, "Invalid preferred queue %u for %u lanes", preferred, lanes.count );
481
482                        unsigned r = preferred % READYQ_SHARD_FACTOR;
483                        const unsigned start = preferred - r;
484                #endif
485
486                // Try to pick a lane and lock it
487                unsigned i;
488                do {
489                        #if !defined(__CFA_NO_STATISTICS__)
490                                if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.attempt, 1, __ATOMIC_RELAXED);
491                                else __tls_stats()->ready.push.local.attempt++;
492                        #endif
493
494                        if(unlikely(external)) {
495                                i = __tls_rand() % lanes.count;
496                        }
497                        else {
498                                #if !defined(USE_PREFERRED)
499                                        processor * proc = kernelTLS().this_processor;
500                                        unsigned r = proc->rdq.its++;
501                                        i =  proc->rdq.id + (r % READYQ_SHARD_FACTOR);
502                                #else
503                                        i = start + (r++ % READYQ_SHARD_FACTOR);
504                                #endif
505                        }
506                        // If we can't lock it retry
507                } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
508
509                // Actually push it
510                push(lanes.data[i], thrd);
511
512                // Unlock and return
513                __atomic_unlock( &lanes.data[i].lock );
514
515                #if !defined(__CFA_NO_STATISTICS__)
516                        if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
517                        else __tls_stats()->ready.push.local.success++;
518                #endif
519
520                __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
521        }
522
523        // Pop from the ready queue from a given cluster
524        __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
525                /* paranoid */ verify( lanes.count > 0 );
526                /* paranoid */ verify( kernelTLS().this_processor );
527                /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );
528
529                processor * proc = kernelTLS().this_processor;
530
531                if(proc->rdq.target == -1u) {
532                        unsigned long long min = ts(lanes.data[proc->rdq.id]);
533                        for(int i = 0; i < READYQ_SHARD_FACTOR; i++) {
534                                unsigned long long tsc = ts(lanes.data[proc->rdq.id + i]);
535                                if(tsc < min) min = tsc;
536                        }
537                        proc->rdq.cutoff = min;
538                        proc->rdq.target = __tls_rand() % lanes.count;
539                }
540                else {
541                        unsigned target = proc->rdq.target;
542                        proc->rdq.target = -1u;
543                        const unsigned long long bias = 0; //2_500_000_000;
544                        const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff;
545                        if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) {
546                                $thread * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
547                                if(t) return t;
548                        }
549                }
550
551                for(READYQ_SHARD_FACTOR) {
552                        unsigned i = proc->rdq.id + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
553                        if($thread * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
554                }
555                return 0p;
556        }
557
558        __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
559                unsigned i = __tls_rand() % lanes.count;
560                return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
561        }
562
563        __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) with (cltr->ready_queue) {
564                return search(cltr);
565        }
566#endif
567
568//=======================================================================
569// Various Ready Queue utilities
570//=======================================================================
571// these function work the same or almost the same
572// whether they are using work-stealing or relaxed fifo scheduling
573
574//-----------------------------------------------------------------------
575// try to pop from a lane given by index w
576static inline struct $thread * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
577        __STATS( stats.attempt++; )
578
579        // Get relevant elements locally
580        __intrusive_lane_t & lane = lanes.data[w];
581
582        // If list looks empty retry
583        if( is_empty(lane) ) {
584                return 0p;
585        }
586
587        // If we can't get the lock retry
588        if( !__atomic_try_acquire(&lane.lock) ) {
589                return 0p;
590        }
591
592        // If list is empty, unlock and retry
593        if( is_empty(lane) ) {
594                __atomic_unlock(&lane.lock);
595                return 0p;
596        }
597
598        // Actually pop the list
599        struct $thread * thrd;
600        unsigned long long tsv;
601        [thrd, tsv] = pop(lane);
602
603        /* paranoid */ verify(thrd);
604        /* paranoid */ verify(tsv);
605        /* paranoid */ verify(lane.lock);
606
607        // Unlock and return
608        __atomic_unlock(&lane.lock);
609
610        // Update statistics
611        __STATS( stats.success++; )
612
613        #if defined(USE_WORK_STEALING)
614                lanes.tscs[w].tv = tsv;
615        #endif
616
617        thrd->preferred = w;
618
619        // return the popped thread
620        return thrd;
621}
622
623//-----------------------------------------------------------------------
624// try to pop from any lanes making sure you don't miss any threads push
625// before the start of the function
626static inline struct $thread * search(struct cluster * cltr) with (cltr->ready_queue) {
627        /* paranoid */ verify( lanes.count > 0 );
628        unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
629        unsigned offset = __tls_rand();
630        for(i; count) {
631                unsigned idx = (offset + i) % count;
632                struct $thread * thrd = try_pop(cltr, idx __STATS(, __tls_stats()->ready.pop.search));
633                if(thrd) {
634                        return thrd;
635                }
636        }
637
638        // All lanes where empty return 0p
639        return 0p;
640}
641
642//-----------------------------------------------------------------------
643// Check that all the intrusive queues in the data structure are still consistent
644static void check( __ready_queue_t & q ) with (q) {
645        #if defined(__CFA_WITH_VERIFY__)
646                {
647                        for( idx ; lanes.count ) {
648                                __intrusive_lane_t & sl = lanes.data[idx];
649                                assert(!lanes.data[idx].lock);
650
651                                        if(is_empty(sl)) {
652                                                assert( sl.anchor.next == 0p );
653                                                assert( sl.anchor.ts   == 0  );
654                                                assert( mock_head(sl)  == sl.prev );
655                                        } else {
656                                                assert( sl.anchor.next != 0p );
657                                                assert( sl.anchor.ts   != 0  );
658                                                assert( mock_head(sl)  != sl.prev );
659                                        }
660                        }
661                }
662        #endif
663}
664
665//-----------------------------------------------------------------------
666// Given 2 indexes, pick the list with the oldest push an try to pop from it
667static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
668        // Pick the bet list
669        int w = i;
670        if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
671                w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
672        }
673
674        return try_pop(cltr, w __STATS(, stats));
675}
676
677// Call this function of the intrusive list was moved using memcpy
678// fixes the list so that the pointers back to anchors aren't left dangling
679static inline void fix(__intrusive_lane_t & ll) {
680                        if(is_empty(ll)) {
681                                verify(ll.anchor.next == 0p);
682                                ll.prev = mock_head(ll);
683                        }
684}
685
686static void assign_list(unsigned & value, dlist(processor) & list, unsigned count) {
687        processor * it = &list`first;
688        for(unsigned i = 0; i < count; i++) {
689                /* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count);
690                it->rdq.id = value;
691                it->rdq.target = -1u;
692                value += READYQ_SHARD_FACTOR;
693                it = &(*it)`next;
694        }
695}
696
697static void reassign_cltr_id(struct cluster * cltr) {
698        unsigned preferred = 0;
699        assign_list(preferred, cltr->procs.actives, cltr->procs.total - cltr->procs.idle);
700        assign_list(preferred, cltr->procs.idles  , cltr->procs.idle );
701}
702
703static void fix_times( struct cluster * cltr ) with( cltr->ready_queue ) {
704        #if defined(USE_WORK_STEALING)
705                lanes.tscs = alloc(lanes.count, lanes.tscs`realloc);
706                for(i; lanes.count) {
707                        unsigned long long tsc = ts(lanes.data[i]);
708                        lanes.tscs[i].tv = tsc != 0 ? tsc : rdtscl();
709                }
710        #endif
711}
712
713#if defined(USE_CPU_WORK_STEALING)
714        // ready_queue size is fixed in this case
715        void ready_queue_grow(struct cluster * cltr) {}
716        void ready_queue_shrink(struct cluster * cltr) {}
717#else
718        // Grow the ready queue
719        void ready_queue_grow(struct cluster * cltr) {
720                size_t ncount;
721                int target = cltr->procs.total;
722
723                /* paranoid */ verify( ready_mutate_islocked() );
724                __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");
725
726                // Make sure that everything is consistent
727                /* paranoid */ check( cltr->ready_queue );
728
729                // grow the ready queue
730                with( cltr->ready_queue ) {
731                        // Find new count
732                        // Make sure we always have atleast 1 list
733                        if(target >= 2) {
734                                ncount = target * READYQ_SHARD_FACTOR;
735                        } else {
736                                ncount = SEQUENTIAL_SHARD;
737                        }
738
739                        // Allocate new array (uses realloc and memcpies the data)
740                        lanes.data = alloc( ncount, lanes.data`realloc );
741
742                        // Fix the moved data
743                        for( idx; (size_t)lanes.count ) {
744                                fix(lanes.data[idx]);
745                        }
746
747                        // Construct new data
748                        for( idx; (size_t)lanes.count ~ ncount) {
749                                (lanes.data[idx]){};
750                        }
751
752                        // Update original
753                        lanes.count = ncount;
754                }
755
756                fix_times(cltr);
757
758                reassign_cltr_id(cltr);
759
760                // Make sure that everything is consistent
761                /* paranoid */ check( cltr->ready_queue );
762
763                __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");
764
765                /* paranoid */ verify( ready_mutate_islocked() );
766        }
767
768        // Shrink the ready queue
769        void ready_queue_shrink(struct cluster * cltr) {
770                /* paranoid */ verify( ready_mutate_islocked() );
771                __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");
772
773                // Make sure that everything is consistent
774                /* paranoid */ check( cltr->ready_queue );
775
776                int target = cltr->procs.total;
777
778                with( cltr->ready_queue ) {
779                        // Remember old count
780                        size_t ocount = lanes.count;
781
782                        // Find new count
783                        // Make sure we always have atleast 1 list
784                        lanes.count = target >= 2 ? target * READYQ_SHARD_FACTOR: SEQUENTIAL_SHARD;
785                        /* paranoid */ verify( ocount >= lanes.count );
786                        /* paranoid */ verify( lanes.count == target * READYQ_SHARD_FACTOR || target < 2 );
787
788                        // for printing count the number of displaced threads
789                        #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
790                                __attribute__((unused)) size_t displaced = 0;
791                        #endif
792
793                        // redistribute old data
794                        for( idx; (size_t)lanes.count ~ ocount) {
795                                // Lock is not strictly needed but makes checking invariants much easier
796                                __attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
797                                verify(locked);
798
799                                // As long as we can pop from this lane to push the threads somewhere else in the queue
800                                while(!is_empty(lanes.data[idx])) {
801                                        struct $thread * thrd;
802                                        unsigned long long _;
803                                        [thrd, _] = pop(lanes.data[idx]);
804
805                                        push(cltr, thrd, true);
806
807                                        // for printing count the number of displaced threads
808                                        #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
809                                                displaced++;
810                                        #endif
811                                }
812
813                                // Unlock the lane
814                                __atomic_unlock(&lanes.data[idx].lock);
815
816                                // TODO print the queue statistics here
817
818                                ^(lanes.data[idx]){};
819                        }
820
821                        __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);
822
823                        // Allocate new array (uses realloc and memcpies the data)
824                        lanes.data = alloc( lanes.count, lanes.data`realloc );
825
826                        // Fix the moved data
827                        for( idx; (size_t)lanes.count ) {
828                                fix(lanes.data[idx]);
829                        }
830                }
831
832                fix_times(cltr);
833
834                reassign_cltr_id(cltr);
835
836                // Make sure that everything is consistent
837                /* paranoid */ check( cltr->ready_queue );
838
839                __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
840                /* paranoid */ verify( ready_mutate_islocked() );
841        }
842#endif
843
844#if !defined(__CFA_NO_STATISTICS__)
845        unsigned cnt(const __ready_queue_t & this, unsigned idx) {
846                /* paranoid */ verify(this.lanes.count > idx);
847                return this.lanes.data[idx].cnt;
848        }
849#endif
Note: See TracBrowser for help on using the repository browser.