source: libcfa/src/concurrency/ready_queue.cfa @ 3c4bf05

//
// Cforall Version 1.0.0 Copyright (C) 2019 University of Waterloo
//
// The contents of this file are covered under the licence agreement in the
// file "LICENCE" distributed with Cforall.
//
// ready_queue.cfa --
//
// Author           : Thierry Delisle
// Created On       : Mon Nov dd 16:29:18 2019
// Last Modified By :
// Last Modified On :
// Update Count     :
//

#define __cforall_thread__
#define _GNU_SOURCE

// #define __CFA_DEBUG_PRINT_READY_QUEUE__


#define USE_AWARE_STEALING

#include "bits/defs.hfa"
#include "device/cpu.hfa"
#include "kernel_private.hfa"

#include "stdlib.hfa"
#include "limits.hfa"
#include "math.hfa"

#include <errno.h>
#include <unistd.h>

extern "C" {
	#include <sys/syscall.h>  // __NR_xxx
}

#include "ready_subqueue.hfa"

static const size_t cache_line_size = 64;

#if !defined(__CFA_NO_STATISTICS__)
	#define __STATS(...) __VA_ARGS__
#else
	#define __STATS(...)
#endif

// No overridden function, no environment variable, no define
// fall back to a magic number
#ifndef __CFA_MAX_PROCESSORS__
	#define __CFA_MAX_PROCESSORS__ 1024
#endif

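// Sharding constants: each processor owns READYQ_SHARD_FACTOR lanes of the ready queue,
// and SEQUENTIAL_SHARD is the number of lanes used when a cluster has fewer than two
// processors (see ready_queue_grow/ready_queue_shrink below).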
#define READYQ_SHARD_FACTOR 2
#define SEQUENTIAL_SHARD 2

static inline struct thread$ * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats));
static inline struct thread$ * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats));
static inline struct thread$ * search(struct cluster * cltr);


// returns the maximum number of processors the RWLock supports
__attribute__((weak)) unsigned __max_processors() {
	const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
	if(!max_cores_s) {
		__cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
		return __CFA_MAX_PROCESSORS__;
	}

	char * endptr = 0p;
	long int max_cores_l = strtol(max_cores_s, &endptr, 10);
	if(max_cores_l < 1 || max_cores_l > 65535) {
		__cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", max_cores_l);
		return __CFA_MAX_PROCESSORS__;
	}
	if('\0' != *endptr) {
		__cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", max_cores_s);
		return __CFA_MAX_PROCESSORS__;
	}

	return max_cores_l;
}

#if   defined(CFA_HAVE_LINUX_LIBRSEQ)
	// No forward declaration needed
	#define __kernel_rseq_register rseq_register_current_thread
	#define __kernel_rseq_unregister rseq_unregister_current_thread
#elif defined(CFA_HAVE_LINUX_RSEQ_H)
	static void __kernel_raw_rseq_register  (void);
	static void __kernel_raw_rseq_unregister(void);

	#define __kernel_rseq_register __kernel_raw_rseq_register
	#define __kernel_rseq_unregister __kernel_raw_rseq_unregister
#else
	// No forward declaration needed
	// No initialization needed
	static inline void noop(void) {}

	#define __kernel_rseq_register noop
	#define __kernel_rseq_unregister noop
#endif

//=======================================================================
// Cluster wide reader-writer lock
//=======================================================================
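// This is a specialised reader-writer lock: each processor registers a pointer to its
// thread-local flag (kernelTLS().sched_lock) in the data array below, and the read side
// (implemented elsewhere in the kernel) only ever touches that one flag.  The writer,
// ready_mutate_lock(), takes the global write_lock and then acquires every registered
// per-processor flag, so readers never contend with each other, only with a writer.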
void  ?{}(__scheduler_RWLock_t & this) {
	this.max   = __max_processors();
	this.alloc = 0;
	this.ready = 0;
	this.data  = alloc(this.max);
	this.write_lock  = false;

	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));

}
void ^?{}(__scheduler_RWLock_t & this) {
	free(this.data);
}


//=======================================================================
// Lock-Free registering/unregistering of threads
unsigned register_proc_id( void ) with(*__scheduler_lock) {
	__kernel_rseq_register();

	bool * handle = (bool *)&kernelTLS().sched_lock;

	// Step - 1 : check if there is already space in the data
	uint_fast32_t s = ready;

	// Check among all the ready
	for(uint_fast32_t i = 0; i < s; i++) {
		bool * volatile * cell = (bool * volatile *)&data[i]; // Cforall is bugged and the double volatiles cause problems
		/* paranoid */ verify( handle != *cell );

		bool * null = 0p; // Re-write every loop since compare thrashes it
		if( __atomic_load_n(cell, (int)__ATOMIC_RELAXED) == null
			&& __atomic_compare_exchange_n( cell, &null, handle, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
			/* paranoid */ verify(i < ready);
			/* paranoid */ verify( (kernelTLS().sched_id = i, true) );
			return i;
		}
	}

	if(max <= alloc) abort("Trying to create more than %u processors", __scheduler_lock->max);

	// Step - 2 : F&A to get a new spot in the array.
	uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
	if(max <= n) abort("Trying to create more than %u processors", __scheduler_lock->max);

	// Step - 3 : Mark space as used and then publish it.
	data[n] = handle;
	while() {
		unsigned copy = n;
		if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
			&& __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
			break;
		Pause();
	}

	// Return new spot.
	/* paranoid */ verify(n < ready);
	/* paranoid */ verify( (kernelTLS().sched_id = n, true) );
	return n;
}

void unregister_proc_id( unsigned id ) with(*__scheduler_lock) {
	/* paranoid */ verify(id < ready);
	/* paranoid */ verify(id == kernelTLS().sched_id);
	/* paranoid */ verify(data[id] == &kernelTLS().sched_lock);

	bool * volatile * cell = (bool * volatile *)&data[id]; // Cforall is bugged and the double volatiles cause problems

	__atomic_store_n(cell, 0p, __ATOMIC_RELEASE);

	__kernel_rseq_unregister();
}

//-----------------------------------------------------------------------
// Writer side : acquire when changing the ready queue, e.g. adding more
//  queues or removing them.
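// Illustrative pairing (not from this file): callers that mutate the lane array
// bracket the change with these two calls, e.g.
//     uint_fast32_t last = ready_mutate_lock();
//     ... grow or shrink the ready queue ...
//     ready_mutate_unlock( last );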
uint_fast32_t ready_mutate_lock( void ) with(*__scheduler_lock) {
	/* paranoid */ verify( ! __preemption_enabled() );

	// Step 1 : lock global lock
	// It is needed to prevent processors that register mid critical-section
	//   from simply locking their own lock and entering.
	__atomic_acquire( &write_lock );

	// Make sure we won't deadlock ourself
	// Checking before acquiring the writer lock isn't safe
	// because someone else could have locked us.
	/* paranoid */ verify( ! kernelTLS().sched_lock );

	// Step 2 : lock per-proc lock
	// Processors that are currently being registered aren't counted
	//   but can't be in read_lock or in the critical section.
	// All other processors are counted
	uint_fast32_t s = ready;
	for(uint_fast32_t i = 0; i < s; i++) {
		volatile bool * llock = data[i];
		if(llock) __atomic_acquire( llock );
	}

	/* paranoid */ verify( ! __preemption_enabled() );
	return s;
}

void ready_mutate_unlock( uint_fast32_t last_s ) with(*__scheduler_lock) {
	/* paranoid */ verify( ! __preemption_enabled() );

	// Step 1 : release local locks
	// This must be done while the global lock is held to avoid
	//   threads that were created mid critical section
	//   racing to lock their local locks and having the writer
	//   immediately unlock them
	// Alternative solution : return s in write_lock and pass it to write_unlock
	for(uint_fast32_t i = 0; i < last_s; i++) {
		volatile bool * llock = data[i];
		if(llock) __atomic_store_n(llock, (bool)false, __ATOMIC_RELEASE);
	}

	// Step 2 : release global lock
	/*paranoid*/ assert(true == write_lock);
	__atomic_store_n(&write_lock, (bool)false, __ATOMIC_RELEASE);

	/* paranoid */ verify( ! __preemption_enabled() );
}

//=======================================================================
// Cforall Ready Queue used for scheduling
//=======================================================================
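// moving_average() below is an exponential moving average with fixed weights
// 4/16 for the new sample and 12/16 for the previous average, i.e.
//     new_avg = (4 * (currtsc - instsc) + 12 * old_avg) / 16
// For example, with old_avg = 1000 cycles and a new sample of 2000 cycles the
// result is (4*2000 + 12*1000) / 16 = 1250 cycles.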
unsigned long long moving_average(unsigned long long currtsc, unsigned long long instsc, unsigned long long old_avg) {
	/* paranoid */ verifyf( currtsc < 45000000000000000, "Suspiciously large current time: %'llu (%llx)\n", currtsc, currtsc );
	/* paranoid */ verifyf( instsc  < 45000000000000000, "Suspiciously large insert time: %'llu (%llx)\n", instsc, instsc );
	/* paranoid */ verifyf( old_avg < 15000000000000, "Suspiciously large previous average: %'llu (%llx)\n", old_avg, old_avg );

	const unsigned long long new_val = currtsc > instsc ? currtsc - instsc : 0;
	const unsigned long long total_weight = 16;
	const unsigned long long new_weight   = 4;
	const unsigned long long old_weight = total_weight - new_weight;
	const unsigned long long ret = ((new_weight * new_val) + (old_weight * old_avg)) / total_weight;
	return ret;
}

void ?{}(__ready_queue_t & this) with (this) {
	lanes.data   = 0p;
	lanes.tscs   = 0p;
	lanes.caches = 0p;
	lanes.help   = 0p;
	lanes.count  = 0;
}

void ^?{}(__ready_queue_t & this) with (this) {
	free(lanes.data);
	free(lanes.tscs);
	free(lanes.caches);
	free(lanes.help);
}

//-----------------------------------------------------------------------
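// Push strategy: a processor pushing onto its own cluster round-robins over the
// READYQ_SHARD_FACTOR lanes it owns (rdq.id .. rdq.id + READYQ_SHARD_FACTOR - 1).
// External or UNPARK_REMOTE pushes go to the thread's preferred shard when it is
// still valid, otherwise to a random lane.  In every case the push spins on
// try-acquire, retrying with a new lane choice until a lane lock is obtained.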
__attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
	processor * const proc = kernelTLS().this_processor;
	const bool external = (!proc) || (cltr != proc->cltr);
	const bool remote   = hint == UNPARK_REMOTE;

	unsigned i;
	if( external || remote ) {
		// Figure out where thread was last time and make sure it's valid
		/* paranoid */ verify(thrd->preferred >= 0);
		if(thrd->preferred * READYQ_SHARD_FACTOR < lanes.count) {
			/* paranoid */ verify(thrd->preferred * READYQ_SHARD_FACTOR < lanes.count);
			unsigned start = thrd->preferred * READYQ_SHARD_FACTOR;
			do {
				unsigned r = __tls_rand();
				i = start + (r % READYQ_SHARD_FACTOR);
				/* paranoid */ verify( i < lanes.count );
				// If we can't lock it retry
			} while( !__atomic_try_acquire( &lanes.data[i].lock ) );
		} else {
			do {
				i = __tls_rand() % lanes.count;
			} while( !__atomic_try_acquire( &lanes.data[i].lock ) );
		}
	} else {
		do {
			unsigned r = proc->rdq.its++;
			i = proc->rdq.id + (r % READYQ_SHARD_FACTOR);
			/* paranoid */ verify( i < lanes.count );
			// If we can't lock it retry
		} while( !__atomic_try_acquire( &lanes.data[i].lock ) );
	}

	// Actually push it
	push(lanes.data[i], thrd);

	// Unlock and return
	__atomic_unlock( &lanes.data[i].lock );

	#if !defined(__CFA_NO_STATISTICS__)
		if(unlikely(external || remote)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
		else __tls_stats()->ready.push.local.success++;
	#endif
}

static inline unsigned long long calc_cutoff(const unsigned long long ctsc, const processor * proc, __ready_queue_t & rdq) {
	unsigned start = proc->rdq.id;
	unsigned long long max = 0;
	for(i; READYQ_SHARD_FACTOR) {
		unsigned long long ptsc = ts(rdq.lanes.data[start + i]);
		if(ptsc != -1ull) {
			/* paranoid */ verify( start + i < rdq.lanes.count );
			unsigned long long tsc = moving_average(ctsc, ptsc, rdq.lanes.tscs[start + i].ma);
			if(tsc > max) max = tsc;
		}
	}
	return (max + 2 * max) / 2;
}

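// Pop strategy (aware stealing): pop_fast() first decides whether to "help"
// another lane.  When no help target is set it picks a random lane, keeping it
// as the target with probability 3/256, or always if that lane is marked as
// sharing this processor's LLC.  When a target is set, it is only helped if the
// moving-average age of its oldest element exceeds calc_cutoff(), i.e. roughly
// 1.5x the largest moving average over this processor's own lanes ((max + 2*max)/2).
// Otherwise the processor round-robins over its own READYQ_SHARD_FACTOR lanes.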
__attribute__((hot)) struct thread$ * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
	/* paranoid */ verify( lanes.count > 0 );
	/* paranoid */ verify( kernelTLS().this_processor );
	/* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );

	processor * const proc = kernelTLS().this_processor;
	unsigned this = proc->rdq.id;
	/* paranoid */ verify( this < lanes.count );
	__cfadbg_print_safe(ready_queue, "Kernel : pop from %u\n", this);

	// Figure out the current cpu and make sure it is valid
	const int cpu = __kernel_getcpu();
	/* paranoid */ verify(cpu >= 0);
	/* paranoid */ verify(cpu < cpu_info.hthrd_count);
	unsigned this_cache = cpu_info.llc_map[cpu].cache;

	// Super important: don't write the same value over and over again
	// We want to maximise our chances that this particular value stays in cache
	if(lanes.caches[this / READYQ_SHARD_FACTOR].id != this_cache)
		__atomic_store_n(&lanes.caches[this / READYQ_SHARD_FACTOR].id, this_cache, __ATOMIC_RELAXED);

	const unsigned long long ctsc = rdtscl();

	if(proc->rdq.target == MAX) {
		uint64_t chaos = __tls_rand();
		unsigned ext = chaos & 0xff;
		unsigned other  = (chaos >> 8) % (lanes.count);

		if(ext < 3 || __atomic_load_n(&lanes.caches[other / READYQ_SHARD_FACTOR].id, __ATOMIC_RELAXED) == this_cache) {
			proc->rdq.target = other;
		}
	}
	else {
		const unsigned target = proc->rdq.target;
		__cfadbg_print_safe(ready_queue, "Kernel : %u considering helping %u, tcsc %llu\n", this, target, lanes.tscs[target].tv);
		/* paranoid */ verify( lanes.tscs[target].tv != MAX );
		if(target < lanes.count) {
			const unsigned long long cutoff = calc_cutoff(ctsc, proc, cltr->ready_queue);
			const unsigned long long age = moving_average(ctsc, lanes.tscs[target].tv, lanes.tscs[target].ma);
			__cfadbg_print_safe(ready_queue, "Kernel : Help attempt on %u from %u, age %'llu vs cutoff %'llu, %s\n", target, this, age, cutoff, age > cutoff ? "yes" : "no");
			if(age > cutoff) {
				thread$ * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
				if(t) return t;
			}
		}
		proc->rdq.target = MAX;
	}

	for(READYQ_SHARD_FACTOR) {
		unsigned i = this + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
		if(thread$ * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
	}

	// All lanes were empty, return 0p
	return 0p;

}
__attribute__((hot)) struct thread$ * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
	unsigned i = __tls_rand() % lanes.count;
	return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
}
__attribute__((hot)) struct thread$ * pop_search(struct cluster * cltr) {
	return search(cltr);
}

//=======================================================================
// Various Ready Queue utilities
//=======================================================================
// these functions work the same or almost the same
// whether they are using work-stealing or relaxed fifo scheduling

//-----------------------------------------------------------------------
// try to pop from a lane given by index w
static inline struct thread$ * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
	/* paranoid */ verify( w < lanes.count );
	__STATS( stats.attempt++; )

	// Get relevant elements locally
	__intrusive_lane_t & lane = lanes.data[w];

	// If list looks empty retry
	if( is_empty(lane) ) {
		return 0p;
	}

	// If we can't get the lock retry
	if( !__atomic_try_acquire(&lane.lock) ) {
		return 0p;
	}

	// If list is empty, unlock and retry
	if( is_empty(lane) ) {
		__atomic_unlock(&lane.lock);
		return 0p;
	}

	// Actually pop the list
	struct thread$ * thrd;
	unsigned long long tsc_before = ts(lane);
	unsigned long long tsv;
	[thrd, tsv] = pop(lane);

	/* paranoid */ verify(thrd);
	/* paranoid */ verify(tsv);
	/* paranoid */ verify(lane.lock);

	// Unlock and return
	__atomic_unlock(&lane.lock);

	// Update statistics
	__STATS( stats.success++; )

	if (tsv != MAX) {
		unsigned long long now = rdtscl();
		unsigned long long pma = __atomic_load_n(&lanes.tscs[w].ma, __ATOMIC_RELAXED);
		__atomic_store_n(&lanes.tscs[w].tv, tsv, __ATOMIC_RELAXED);
		__atomic_store_n(&lanes.tscs[w].ma, moving_average(now, tsc_before, pma), __ATOMIC_RELAXED);
	}

	thrd->preferred = w / READYQ_SHARD_FACTOR;

	// return the popped thread
	return thrd;
}

//-----------------------------------------------------------------------
// try to pop from any lane, making sure you don't miss any threads pushed
// before the start of the function
static inline struct thread$ * search(struct cluster * cltr) with (cltr->ready_queue) {
	/* paranoid */ verify( lanes.count > 0 );
	unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
	unsigned offset = __tls_rand();
	for(i; count) {
		unsigned idx = (offset + i) % count;
		struct thread$ * thrd = try_pop(cltr, idx __STATS(, __tls_stats()->ready.pop.search));
		if(thrd) {
			return thrd;
		}
	}

	// All lanes were empty, return 0p
	return 0p;
}

//-----------------------------------------------------------------------
// get the preferred ready-queue shard for a new thread
unsigned ready_queue_new_preferred() {
	unsigned pref = MAX;
	if(struct thread$ * thrd = publicTLS_get( this_thread )) {
		pref = thrd->preferred;
	}

	return pref;
}

//-----------------------------------------------------------------------
// Check that all the intrusive queues in the data structure are still consistent
static void check( __ready_queue_t & q ) with (q) {
	#if defined(__CFA_WITH_VERIFY__)
		{
			for( idx ; lanes.count ) {
				__intrusive_lane_t & sl = lanes.data[idx];
				assert(!lanes.data[idx].lock);

				if(is_empty(sl)) {
					assert( sl.anchor.next == 0p );
					assert( sl.anchor.ts   == -1llu );
					assert( mock_head(sl)  == sl.prev );
				} else {
					assert( sl.anchor.next != 0p );
					assert( sl.anchor.ts   != -1llu );
					assert( mock_head(sl)  != sl.prev );
				}
			}
		}
	#endif
}

//-----------------------------------------------------------------------
// Given 2 indexes, pick the list with the oldest push and try to pop from it
static inline struct thread$ * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
	// Pick the best list
	int w = i;
	if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
		w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
	}

	return try_pop(cltr, w __STATS(, stats));
}

// Call this function if the intrusive list was moved using memcpy;
// it fixes the list so that the pointers back to anchors aren't left dangling
static inline void fix(__intrusive_lane_t & ll) {
	if(is_empty(ll)) {
		verify(ll.anchor.next == 0p);
		ll.prev = mock_head(ll);
	}
}

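// Lane-id assignment: assign_list() walks a list of processors and gives each one a
// starting lane id spaced READYQ_SHARD_FACTOR apart; reassign_cltr_id() numbers the
// active processors first and the idle ones after, so lane ownership stays contiguous
// after the cluster grows or shrinks.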
static void assign_list(unsigned & value, dlist(processor) & list, unsigned count) {
	processor * it = &list`first;
	for(unsigned i = 0; i < count; i++) {
		/* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count);
		it->rdq.id = value;
		it->rdq.target = MAX;
		value += READYQ_SHARD_FACTOR;
		it = &(*it)`next;
	}
}

static void reassign_cltr_id(struct cluster * cltr) {
	unsigned preferred = 0;
	assign_list(preferred, cltr->procs.actives, cltr->procs.total - cltr->procs.idle);
	assign_list(preferred, cltr->procs.idles  , cltr->procs.idle );
}

static void fix_times( struct cluster * cltr ) with( cltr->ready_queue ) {
	lanes.tscs = alloc(lanes.count, lanes.tscs`realloc);
	for(i; lanes.count) {
		lanes.tscs[i].tv = rdtscl();
		lanes.tscs[i].ma = 0;
	}
}

// Grow the ready queue
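// Growing reallocates the lane array to target * READYQ_SHARD_FACTOR lanes
// (SEQUENTIAL_SHARD when the cluster has fewer than two processors), repairs the
// intrusive pointers of the lanes that were moved by realloc, constructs the new
// lanes, then resets the timestamps and reassigns processor lane ids.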
void ready_queue_grow(struct cluster * cltr) {
	size_t ncount;
	int target = cltr->procs.total;

	/* paranoid */ verify( ready_mutate_islocked() );
	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");

	// Make sure that everything is consistent
	/* paranoid */ check( cltr->ready_queue );

	// grow the ready queue
	with( cltr->ready_queue ) {
		// Find new count
		// Make sure we always have at least 1 list
		if(target >= 2) {
			ncount = target * READYQ_SHARD_FACTOR;
		} else {
			ncount = SEQUENTIAL_SHARD;
		}

		// Allocate new array (uses realloc and memcpies the data)
		lanes.data = alloc( ncount, lanes.data`realloc );

		// Fix the moved data
		for( idx; (size_t)lanes.count ) {
			fix(lanes.data[idx]);
		}

		// Construct new data
		for( idx; (size_t)lanes.count ~ ncount) {
			(lanes.data[idx]){};
		}

		// Update original
		lanes.count = ncount;

		lanes.caches = alloc( target, lanes.caches`realloc );
	}

	fix_times(cltr);

	reassign_cltr_id(cltr);

	// Make sure that everything is consistent
	/* paranoid */ check( cltr->ready_queue );

	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");

	/* paranoid */ verify( ready_mutate_islocked() );
}

// Shrink the ready queue
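// Shrinking first drops lanes.count to the new size, then drains every lane being
// removed: each displaced thread is popped and re-pushed through push(cltr, ...), so
// no thread is lost, before the lane is destructed and the array is reallocated down.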
void ready_queue_shrink(struct cluster * cltr) {
	/* paranoid */ verify( ready_mutate_islocked() );
	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");

	// Make sure that everything is consistent
	/* paranoid */ check( cltr->ready_queue );

	int target = cltr->procs.total;

	with( cltr->ready_queue ) {
		// Remember old count
		size_t ocount = lanes.count;

		// Find new count
		// Make sure we always have at least 1 list
		lanes.count = target >= 2 ? target * READYQ_SHARD_FACTOR: SEQUENTIAL_SHARD;
		/* paranoid */ verify( ocount >= lanes.count );
		/* paranoid */ verify( lanes.count == target * READYQ_SHARD_FACTOR || target < 2 );

		// for debug printing, count the number of displaced threads
		#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
			__attribute__((unused)) size_t displaced = 0;
		#endif

		// redistribute old data
		for( idx; (size_t)lanes.count ~ ocount) {
			// Lock is not strictly needed but makes checking invariants much easier
			__attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
			verify(locked);

			// Pop the threads from this lane and push them somewhere else in the queue
			while(!is_empty(lanes.data[idx])) {
				struct thread$ * thrd;
				unsigned long long _;
				[thrd, _] = pop(lanes.data[idx]);

				push(cltr, thrd, true);

				// for debug printing, count the number of displaced threads
				#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
					displaced++;
				#endif
			}

			// Unlock the lane
			__atomic_unlock(&lanes.data[idx].lock);

			// TODO print the queue statistics here

			^(lanes.data[idx]){};
		}

		__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);

		// Allocate new array (uses realloc and memcpies the data)
		lanes.data = alloc( lanes.count, lanes.data`realloc );

		// Fix the moved data
		for( idx; (size_t)lanes.count ) {
			fix(lanes.data[idx]);
		}

		lanes.caches = alloc( target, lanes.caches`realloc );
	}

	fix_times(cltr);

	reassign_cltr_id(cltr);

	// Make sure that everything is consistent
	/* paranoid */ check( cltr->ready_queue );

	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
	/* paranoid */ verify( ready_mutate_islocked() );
}

#if !defined(__CFA_NO_STATISTICS__)
	unsigned cnt(const __ready_queue_t & this, unsigned idx) {
		/* paranoid */ verify(this.lanes.count > idx);
		return this.lanes.data[idx].cnt;
	}
#endif


#if   defined(CFA_HAVE_LINUX_LIBRSEQ)
	// No definition needed
#elif defined(CFA_HAVE_LINUX_RSEQ_H)

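	// RSEQ_SIG is the per-architecture signature value passed to the rseq syscall at
	// registration and unregistration; the EPERM cases below correspond to a mismatch
	// between the signature given on those two calls.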
	#if defined( __x86_64 ) || defined( __i386 )
		#define RSEQ_SIG        0x53053053
	#elif defined( __ARM_ARCH )
		#ifdef __ARMEB__
		#define RSEQ_SIG    0xf3def5e7      /* udf    #24035    ; 0x5de3 (ARMv6+) */
		#else
		#define RSEQ_SIG    0xe7f5def3      /* udf    #24035    ; 0x5de3 */
		#endif
	#endif

	extern void __disable_interrupts_hard();
	extern void __enable_interrupts_hard();

	static void __kernel_raw_rseq_register  (void) {
		/* paranoid */ verify( __cfaabi_rseq.cpu_id == RSEQ_CPU_ID_UNINITIALIZED );

		// int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), 0, (sigset_t *)0p, _NSIG / 8);
		int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), 0, RSEQ_SIG);
		if(ret != 0) {
			int e = errno;
			switch(e) {
			case EINVAL: abort("KERNEL ERROR: rseq register invalid argument");
			case ENOSYS: abort("KERNEL ERROR: rseq register not supported");
			case EFAULT: abort("KERNEL ERROR: rseq register with invalid argument");
			case EBUSY : abort("KERNEL ERROR: rseq register already registered");
			case EPERM : abort("KERNEL ERROR: rseq register sig argument on unregistration does not match the signature received on registration");
			default: abort("KERNEL ERROR: rseq register unexpected return %d", e);
			}
		}
	}

	static void __kernel_raw_rseq_unregister(void) {
		/* paranoid */ verify( __cfaabi_rseq.cpu_id >= 0 );

		// int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), RSEQ_FLAG_UNREGISTER, (sigset_t *)0p, _NSIG / 8);
		int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), RSEQ_FLAG_UNREGISTER, RSEQ_SIG);
		if(ret != 0) {
			int e = errno;
			switch(e) {
			case EINVAL: abort("KERNEL ERROR: rseq unregister invalid argument");
			case ENOSYS: abort("KERNEL ERROR: rseq unregister not supported");
			case EFAULT: abort("KERNEL ERROR: rseq unregister with invalid argument");
			case EBUSY : abort("KERNEL ERROR: rseq unregister already registered");
			case EPERM : abort("KERNEL ERROR: rseq unregister sig argument on unregistration does not match the signature received on registration");
			default: abort("KERNEL ERROR: rseq unregister unexpected return %d", e);
			}
		}
	}
#else
	// No definition needed
#endif