source: libcfa/src/concurrency/ready_queue.cfa @ 6abcb4d

Last change on this file since 6abcb4d was 9cc3a18, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Major clean-up before attempting to add new scheduler

1//
2// Cforall Version 1.0.0 Copyright (C) 2019 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// ready_queue.cfa --
8//
9// Author           : Thierry Delisle
10// Created On       : Mon Nov dd 16:29:18 2019
11// Last Modified By :
12// Last Modified On :
13// Update Count     :
14//
15
16#define __cforall_thread__
17// #define __CFA_DEBUG_PRINT_READY_QUEUE__
18
19// #define USE_MPSC
20
21#define USE_RELAXED_FIFO
22// #define USE_WORK_STEALING
23
24#include "bits/defs.hfa"
25#include "kernel_private.hfa"
26
27#define _GNU_SOURCE
28#include "stdlib.hfa"
29#include "math.hfa"
30
31#include <unistd.h>
32
33#include "ready_subqueue.hfa"
34
35static const size_t cache_line_size = 64;
36
37// No overridden function, no environment variable, no define
38// fall back to a magic number
39#ifndef __CFA_MAX_PROCESSORS__
40        #define __CFA_MAX_PROCESSORS__ 1024
41#endif
42
43#if   defined(USE_RELAXED_FIFO)
44        #define BIAS 4
45        #define READYQ_SHARD_FACTOR 4
46#elif defined(USE_WORK_STEALING)
47        #define READYQ_SHARD_FACTOR 2
48#else
49        #error no scheduling strategy selected
50#endif
51
52static inline [unsigned, bool] idx_from_r(unsigned r, int preferred);
53static inline struct $thread * try_pop(struct cluster * cltr, unsigned w);
54static struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j);
55
56
57// returns the maximum number of processors the RWLock supports
58__attribute__((weak)) unsigned __max_processors() {
59        const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
60        if(!max_cores_s) {
61                __cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
62                return __CFA_MAX_PROCESSORS__;
63        }
64
65        char * endptr = 0p;
66        long int max_cores_l = strtol(max_cores_s, &endptr, 10);
67        if(max_cores_l < 1 || max_cores_l > 65535) {
68                __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", max_cores_l);
69                return __CFA_MAX_PROCESSORS__;
70        }
71        if('\0' != *endptr) {
72                __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", max_cores_s);
73                return __CFA_MAX_PROCESSORS__;
74        }
75
76        return max_cores_l;
77}
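
// --- Illustrative sketch (not part of the original file) ---------------
// The same getenv/strtol validate-or-fall-back pattern as __max_processors()
// above, in plain ISO C.  The variable name MY_LIMIT and the helper name
// limit_from_env are hypothetical stand-ins.
#include <stdlib.h>

static unsigned limit_from_env(unsigned fallback) {
        const char * s = getenv("MY_LIMIT");      // hypothetical variable name
        if(!s) return fallback;                   // not set: use the compile-time default

        char * endptr = 0;
        long v = strtol(s, &endptr, 10);
        if(v < 1 || v > 65535) return fallback;   // out of the accepted range
        if('\0' != *endptr)    return fallback;   // trailing garbage, e.g. "12abc"
        return (unsigned)v;
}
// -----------------------------------------------------------------------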
78
79//=======================================================================
80// Cluster wide reader-writer lock
81//=======================================================================
82void  ?{}(__scheduler_RWLock_t & this) {
83        this.max   = __max_processors();
84        this.alloc = 0;
85        this.ready = 0;
86        this.lock  = false;
87        this.data  = alloc(this.max);
88
89        /*paranoid*/ verify( 0 == (((uintptr_t)(this.data    )) % 64) );
90        /*paranoid*/ verify( 0 == (((uintptr_t)(this.data + 1)) % 64) );
91        /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
92        /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));
93
94}
95void ^?{}(__scheduler_RWLock_t & this) {
96        free(this.data);
97}
98
99void ?{}( __scheduler_lock_id_t & this, __processor_id_t * proc ) {
100        this.handle = proc;
101        this.lock   = false;
102        #ifdef __CFA_WITH_VERIFY__
103                this.owned  = false;
104        #endif
105}
106
107//=======================================================================
108// Lock-Free registering/unregistering of threads
109void register_proc_id( struct __processor_id_t * proc ) with(*__scheduler_lock) {
110        __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p for RW-Lock\n", proc);
111
112        // Step - 1 : check if there is already space in the data
113        uint_fast32_t s = ready;
114
115        // Check among all the ready slots for a free one to reuse
116        for(uint_fast32_t i = 0; i < s; i++) {
117                __processor_id_t * null = 0p; // Re-set every iteration since a failed compare-exchange overwrites it
118                if( __atomic_load_n(&data[i].handle, (int)__ATOMIC_RELAXED) == null
119                        && __atomic_compare_exchange_n( &data[i].handle, &null, proc, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
120                        /*paranoid*/ verify(i < ready);
121                        /*paranoid*/ verify(0 == (__alignof__(data[i]) % cache_line_size));
122                        /*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
123                        proc->id = i; return; // reuse the freed slot and stop searching
124                }
125        }
126
127        if(max <= alloc) abort("Trying to create more than %u processors", __scheduler_lock->max);
128
129        // Step - 2 : F&A to get a new spot in the array.
130        uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
131        if(max <= n) abort("Trying to create more than %u processors", __scheduler_lock->max);
132
133        // Step - 3 : Mark space as used and then publish it.
134        __scheduler_lock_id_t * storage = (__scheduler_lock_id_t *)&data[n];
135        (*storage){ proc };
136        while() {
137                unsigned copy = n;
138                if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
139                        && __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
140                        break;
141                Pause();
142        }
143
144        __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p done, id %lu\n", proc, n);
145
146        // Record the new spot in the processor.
147        /*paranoid*/ verify(n < ready);
148        /*paranoid*/ verify(__alignof__(data[n]) == (2 * cache_line_size));
149        /*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
150        proc->id = n;
151}
152
153void unregister_proc_id( struct __processor_id_t * proc ) with(*__scheduler_lock) {
154        unsigned id = proc->id;
155        /*paranoid*/ verify(id < ready);
156        /*paranoid*/ verify(proc == __atomic_load_n(&data[id].handle, __ATOMIC_RELAXED));
157        __atomic_store_n(&data[id].handle, 0p, __ATOMIC_RELEASE);
158
159        __cfadbg_print_safe(ready_queue, "Kernel : Unregister proc %p\n", proc);
160}
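
// --- Illustrative sketch (not part of the original file) ---------------
// The registration scheme above, reduced to plain C11 atomics: first try to
// reuse a slot whose owner unregistered, otherwise claim a fresh slot with a
// fetch-and-add and publish it by advancing the ready count in order.  All
// names here (g_handles, g_alloc, g_ready, MAX_SLOTS, slot_register) are
// hypothetical stand-ins, not the kernel's own symbols.
#include <stdatomic.h>
#include <stddef.h>
#include <stdlib.h>

#define MAX_SLOTS 1024
static _Atomic(void *) g_handles[MAX_SLOTS];      // published owner of each slot
static atomic_uint     g_alloc;                   // next never-used slot
static atomic_uint     g_ready;                   // number of slots visible to readers

static unsigned slot_register(void * owner) {
        // Step 1 : look for a slot that was freed by an unregister (handle == NULL)
        unsigned ready = atomic_load_explicit(&g_ready, memory_order_relaxed);
        for(unsigned i = 0; i < ready; i++) {
                void * expected = NULL;           // reset every iteration: a failed CAS overwrites it
                if(atomic_load_explicit(&g_handles[i], memory_order_relaxed) == NULL
                        && atomic_compare_exchange_strong(&g_handles[i], &expected, owner))
                        return i;                 // reuse slot i
        }

        // Step 2 : fetch-and-add to claim a brand-new slot
        unsigned n = atomic_fetch_add(&g_alloc, 1);
        if(n >= MAX_SLOTS) abort();

        // Step 3 : mark the slot as used, then publish it by moving g_ready from n to n + 1
        atomic_store(&g_handles[n], owner);
        unsigned expected = n;
        while(!atomic_compare_exchange_weak(&g_ready, &expected, n + 1))
                expected = n;                     // an earlier registration is still publishing; retry
        return n;
}
// -----------------------------------------------------------------------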
161
162//-----------------------------------------------------------------------
163// Writer side : acquire when changing the ready queue, e.g. adding more
164//  queues or removing them.
165uint_fast32_t ready_mutate_lock( void ) with(*__scheduler_lock) {
166        /* paranoid */ verify( ! __preemption_enabled() );
167
168        // Step 1 : lock global lock
169        // It is needed to prevent processors that register mid critical-section
170        //   from simply locking their own lock and entering.
171        __atomic_acquire( &lock );
172
173        // Step 2 : lock per-proc lock
174        // Processors that are currently being registered are not counted,
175        //   but they cannot be holding the read lock or be inside the critical section.
176        // All other processors are counted
177        uint_fast32_t s = ready;
178        for(uint_fast32_t i = 0; i < s; i++) {
179                __atomic_acquire( &data[i].lock );
180        }
181
182        /* paranoid */ verify( ! __preemption_enabled() );
183        return s;
184}
185
186void ready_mutate_unlock( uint_fast32_t last_s ) with(*__scheduler_lock) {
187        /* paranoid */ verify( ! __preemption_enabled() );
188
189        // Step 1 : release local locks
190        // This must be done while the global lock is held to prevent
191        //   threads that were created mid critical-section
192        //   from racing to lock their local locks and having the writer
193        //   immediately unlock them
194        // Alternative solution : return s in write_lock and pass it to write_unlock
195        for(uint_fast32_t i = 0; i < last_s; i++) {
196                verify(data[i].lock);
197                __atomic_store_n(&data[i].lock, (bool)false, __ATOMIC_RELEASE);
198        }
199
200        // Step 2 : release global lock
201        /*paranoid*/ assert(true == lock);
202        __atomic_store_n(&lock, (bool)false, __ATOMIC_RELEASE);
203
204        /* paranoid */ verify( ! __preemption_enabled() );
205}
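
// --- Illustrative usage sketch (not part of the original file) ---------
// Intended pairing of the two functions above around a structural change to
// the ready queue; the body of the critical section is only a placeholder.
static __attribute__((unused)) void example_resize(void) {
        uint_fast32_t last_size = ready_mutate_lock();   // every registered processor is now excluded
        // ... add or remove ready-queue lanes here ...
        ready_mutate_unlock( last_size );                // readers may enter again
}
// -----------------------------------------------------------------------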
206
207//=======================================================================
208// Cforall Ready Queue used for scheduling
209//=======================================================================
210void ?{}(__ready_queue_t & this) with (this) {
211        lanes.data  = 0p;
212        lanes.tscs  = 0p;
213        lanes.count = 0;
214}
215
216void ^?{}(__ready_queue_t & this) with (this) {
217        verify( 1 == lanes.count );
218        free(lanes.data);
219        free(lanes.tscs);
220}
221
222//-----------------------------------------------------------------------
223__attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
224        __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
225
226        const bool external = (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
227        /* paranoid */ verify(external || kernelTLS().this_processor->cltr_id < lanes.count );
228
229        // write timestamp
230        thrd->link.ts = rdtscl();
231
232        bool local;
233        int preferred = external ? -1 : kernelTLS().this_processor->cltr_id;
234
235        // Try to pick a lane and lock it
236        unsigned i;
237        do {
238                // Pick the index of a lane
239                unsigned r = __tls_rand_fwd();
240                [i, local] = idx_from_r(r, preferred);
241
242                i %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
243
244                #if !defined(__CFA_NO_STATISTICS__)
245                        if(external) {
246                                if(local) __atomic_fetch_add(&cltr->stats->ready.pick.ext.local, 1, __ATOMIC_RELAXED);
247                                __atomic_fetch_add(&cltr->stats->ready.pick.ext.attempt, 1, __ATOMIC_RELAXED);
248                        }
249                        else {
250                                if(local) __tls_stats()->ready.pick.push.local++;
251                                __tls_stats()->ready.pick.push.attempt++;
252                        }
253                #endif
254
255        #if defined(USE_MPSC)
256                // mpsc always succeeds
257        } while( false );
258        #else
259                // If we can't lock it retry
260        } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
261        #endif
262
263        // Actually push it
264        push(lanes.data[i], thrd);
265
266        #if !defined(USE_MPSC)
267                // Unlock and return
268                __atomic_unlock( &lanes.data[i].lock );
269        #endif
270
271        // Mark the current index in the tls rng instance as having an item
272        __tls_rand_advance_bck();
273
274        __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
275
276        // Update statistics
277        #if !defined(__CFA_NO_STATISTICS__)
278                if(external) {
279                        if(local) __atomic_fetch_add(&cltr->stats->ready.pick.ext.lsuccess, 1, __ATOMIC_RELAXED);
280                        __atomic_fetch_add(&cltr->stats->ready.pick.ext.success, 1, __ATOMIC_RELAXED);
281                }
282                else {
283                        if(local) __tls_stats()->ready.pick.push.lsuccess++;
284                        __tls_stats()->ready.pick.push.success++;
285                }
286        #endif
287}
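
// --- Illustrative sketch (not part of the original file) ---------------
// The core of push() above in plain C11: pick a random lane, try-lock it,
// and simply pick another lane if the try-lock fails, so a producer never
// waits on a contended lane.  struct node, struct lane, rng_next and
// sharded_push are hypothetical stand-ins; the sketch pushes LIFO only to
// stay short, whereas the real lanes are FIFO intrusive queues.
#include <stdatomic.h>
#include <stdbool.h>
#include <stdlib.h>

struct node { struct node * next; };
struct lane { atomic_bool lock; struct node * head; };

static unsigned rng_next(void) { return (unsigned)rand(); }   // stand-in for the per-thread RNG

static void sharded_push(struct lane * lanes, unsigned count, struct node * item) {
        unsigned i;
        do {
                // Pick the index of a lane
                i = rng_next() % count;
                // Try to lock it; on failure retry with a different lane
        } while( atomic_exchange_explicit(&lanes[i].lock, true, memory_order_acquire) );

        // Actually push it (under the lane lock)
        item->next = lanes[i].head;
        lanes[i].head = item;

        // Unlock and return
        atomic_store_explicit(&lanes[i].lock, false, memory_order_release);
}
// -----------------------------------------------------------------------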
288
289// Pop from the ready queue from a given cluster
290__attribute__((hot)) $thread * pop(struct cluster * cltr) with (cltr->ready_queue) {
291        /* paranoid */ verify( lanes.count > 0 );
292        /* paranoid */ verify(kernelTLS().this_processor->cltr_id < lanes.count );
293
294        unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
295        int preferred = kernelTLS().this_processor->cltr_id;
296
297
298        // Try a bounded number of times (25) to find a non-empty lane and pop from it
299        for(25) {
300                // Pick two lists at random
301                unsigned ri = __tls_rand_bck();
302                unsigned rj = __tls_rand_bck();
303
304                unsigned i, j;
305                __attribute__((unused)) bool locali, localj;
306                [i, locali] = idx_from_r(ri, preferred);
307                [j, localj] = idx_from_r(rj, preferred);
308
309                #if !defined(__CFA_NO_STATISTICS__)
310                        if(locali && localj) {
311                                __tls_stats()->ready.pick.pop.local++;
312                        }
313                #endif
314
315                i %= count;
316                j %= count;
317
318                // try popping from the 2 picked lists
319                struct $thread * thrd = try_pop(cltr, i, j);
320                if(thrd) {
321                        #if !defined(__CFA_NO_STATISTICS__)
322                                if( locali || localj ) __tls_stats()->ready.pick.pop.lsuccess++;
323                        #endif
324                        return thrd;
325                }
326        }
327
328        // All lanes were empty, return 0p
329        return 0p;
330}
331
332static void fix_times( struct cluster * cltr ) with( cltr->ready_queue ) {
333        lanes.tscs = alloc(lanes.count, lanes.tscs`realloc);
334        for(i; lanes.count) {
335                lanes.tscs[i].tv = ts(lanes.data[i]);
336        }
337
338}
339
340//=======================================================================
341// Various Ready Queue utilities
342//=======================================================================
343// these functions work the same, or almost the same,
344// whether work-stealing or relaxed-FIFO scheduling is used
345
346//-----------------------------------------------------------------------
347// get index from random number with or without bias towards queues
348static inline [unsigned, bool] idx_from_r(unsigned r, int preferred) {
349        unsigned i;
350        bool local;
351        unsigned rlow  = r % BIAS;
352        unsigned rhigh = r / BIAS;
353        if((0 != rlow) && preferred >= 0) {
354                // (BIAS - 1) out of BIAS chances
355                // Use preferred queues
356                i = preferred + (rhigh % READYQ_SHARD_FACTOR);
357                local = true;
358        }
359        else {
360                // 1 out of BIAS chances
361                // Use all queues
362                i = rhigh;
363                local = false;
364        }
365        return [i, local];
366}
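
// --- Worked example (not part of the original file) --------------------
// With the relaxed-FIFO constants above (BIAS = 4, READYQ_SHARD_FACTOR = 4)
// and a hypothetical preferred = 8:
//   r = 13  ->  rlow = 1, rhigh = 3  ->  local pick,  i = 8 + (3 % 4) = 11
//   r = 20  ->  rlow = 0, rhigh = 5  ->  global pick, i = 5
// so roughly (BIAS - 1) out of BIAS picks, i.e. 3 out of 4, stay inside the
// caller's own shard of READYQ_SHARD_FACTOR lanes; the callers then reduce
// i modulo lanes.count.
// -----------------------------------------------------------------------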
367
368//-----------------------------------------------------------------------
369// try to pop from a lane given by index w
370static inline struct $thread * try_pop(struct cluster * cltr, unsigned w) with (cltr->ready_queue) {
371        // Get relevant elements locally
372        __intrusive_lane_t & lane = lanes.data[w];
373
374        // If list looks empty retry
375        if( is_empty(lane) ) return 0p;
376
377        // If we can't get the lock retry
378        if( !__atomic_try_acquire(&lane.lock) ) return 0p;
379
380        // If list is empty, unlock and retry
381        if( is_empty(lane) ) {
382                __atomic_unlock(&lane.lock);
383                return 0p;
384        }
385
386        // Actually pop the list
387        struct $thread * thrd;
388        thrd = pop(lane);
389
390        /* paranoid */ verify(thrd);
391        /* paranoid */ verify(lane.lock);
392
393        // Unlock and return
394        __atomic_unlock(&lane.lock);
395
396        // Update statistics
397        #if !defined(__CFA_NO_STATISTICS__)
398                __tls_stats()->ready.pick.pop.success++;
399        #endif
400
401        #if defined(USE_WORK_STEALING)
402                lanes.tscs[w].tv = thrd->link.ts;
403        #endif
404
405        // return the popped thread
406        return thrd;
407}
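
// --- Illustrative sketch (not part of the original file) ---------------
// The "check, try-lock, re-check" sequence above in plain C11.  The first,
// unlocked emptiness test is only a cheap hint that filters out most useless
// lock attempts; the second test, done while holding the lane lock, is the
// authoritative one.  Types are duplicated (and renamed) from the push
// sketch earlier so this fragment stays self-contained.
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

struct node2 { struct node2 * next; };
struct lane2 { atomic_bool lock; struct node2 * head; };

static struct node2 * lane_try_pop(struct lane2 * l) {
        // If the lane looks empty, give up without touching the lock (racy read, hint only)
        if(l->head == NULL) return NULL;

        // If we can't get the lock, give up; the caller simply tries another lane
        if(atomic_exchange_explicit(&l->lock, true, memory_order_acquire)) return NULL;

        // Re-check under the lock, then actually pop
        struct node2 * item = l->head;
        if(item != NULL) l->head = item->next;

        atomic_store_explicit(&l->lock, false, memory_order_release);
        return item;                              // may still be NULL: the lane emptied in between
}
// -----------------------------------------------------------------------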
408
409//-----------------------------------------------------------------------
410// try to pop from any lane, making sure not to miss any thread pushed
411// before the start of the function
412__attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
413        /* paranoid */ verify( lanes.count > 0 );
414        unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
415        unsigned offset = __tls_rand();
416        for(i; count) {
417                unsigned idx = (offset + i) % count;
418                struct $thread * thrd = try_pop(cltr, idx);
419                if(thrd) {
420                        return thrd;
421                }
422        }
423
424        // All lanes were empty, return 0p
425        return 0p;
426}
427
428//-----------------------------------------------------------------------
429// Check that all the intrusive queues in the data structure are still consistent
430static void check( __ready_queue_t & q ) with (q) {
431        #if defined(__CFA_WITH_VERIFY__) && !defined(USE_MPSC)
432                {
433                        for( idx ; lanes.count ) {
434                                __intrusive_lane_t & sl = lanes.data[idx];
435                                assert(!lanes.data[idx].lock);
436
437                                assert(head(sl)->link.prev == 0p );
438                                assert(head(sl)->link.next->link.prev == head(sl) );
439                                assert(tail(sl)->link.next == 0p );
440                                assert(tail(sl)->link.prev->link.next == tail(sl) );
441
442                                if(is_empty(sl)) {
443                                        assert(tail(sl)->link.prev == head(sl));
444                                        assert(head(sl)->link.next == tail(sl));
445                                } else {
446                                        assert(tail(sl)->link.prev != head(sl));
447                                        assert(head(sl)->link.next != tail(sl));
448                                }
449                        }
450                }
451        #endif
452}
453
454//-----------------------------------------------------------------------
455// Given 2 indexes, pick the list with the oldest push and try to pop from it
456static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j) with (cltr->ready_queue) {
457        #if !defined(__CFA_NO_STATISTICS__)
458                __tls_stats()->ready.pick.pop.attempt++;
459        #endif
460
461        // Pick the better of the two lists
462        int w = i;
463        if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
464                w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
465        }
466
467        return try_pop(cltr, w);
468}
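
// --- Illustrative sketch (not part of the original file) ---------------
// The selection step above is the classic "power of two choices" heuristic:
// sampling two random lanes and popping from the one whose head was pushed
// longest ago keeps the lanes balanced without scanning all of them.  A
// hypothetical stand-alone version, given per-lane head timestamps and
// emptiness flags:
#include <stdbool.h>

static unsigned pick_lane(const unsigned long long * head_ts, const bool * empty, unsigned i, unsigned j) {
        if(empty[j]) return i;                            // j has nothing to offer
        return (head_ts[i] < head_ts[j]) ? i : j;         // prefer the older (smaller) timestamp
}
// -----------------------------------------------------------------------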
469
470// Call this function if the intrusive list was moved using memcpy;
471// it fixes the list so that the pointers back to the anchors aren't left dangling
472static inline void fix(__intrusive_lane_t & ll) {
473        #if !defined(USE_MPSC)
474                // if the list is not empty then follow the pointers and fix the reverse links
475                if(!is_empty(ll)) {
476                        head(ll)->link.next->link.prev = head(ll);
477                        tail(ll)->link.prev->link.next = tail(ll);
478                }
479                // Otherwise just reset the list
480                else {
481                        verify(tail(ll)->link.next == 0p);
482                        tail(ll)->link.prev = head(ll);
483                        head(ll)->link.next = tail(ll);
484                        verify(head(ll)->link.prev == 0p);
485                }
486        #endif
487}
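
// --- Illustrative sketch (not part of the original file) ---------------
// Why fix() is needed: the head and tail anchors are stored by value inside
// each lane, so when the lanes array is realloc'ed/memcpy'ed the anchors move
// but the elements do not -- the first element's prev and the last element's
// next still point at the anchors' old addresses.  Plain-C stand-in types
// (struct dlink, struct dlane); the explicit count field exists only so this
// sketch can tell an empty lane from a non-empty one after a move.
#include <stddef.h>

struct dlink { struct dlink * next, * prev; };
struct dlane {
        struct dlink head, tail;          // anchor nodes stored by value, like the lanes above
        size_t count;                     // element count (sketch-only helper)
};

static void dlane_fix(struct dlane * l) {
        if(l->count != 0) {
                l->head.next->prev = &l->head;    // first element pointed at the old head anchor
                l->tail.prev->next = &l->tail;    // last element pointed at the old tail anchor
        } else {
                l->head.next = &l->tail;          // empty lane: re-link the anchors to each other
                l->tail.prev = &l->head;
        }
}
// -----------------------------------------------------------------------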
488
489static void assign_list(unsigned & value, dlist(processor, processor) & list, unsigned count) {
490        processor * it = &list`first;
491        for(unsigned i = 0; i < count; i++) {
492                /* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count);
493                it->cltr_id = value;
494                value += READYQ_SHARD_FACTOR;
495                it = &(*it)`next;
496        }
497}
498
499static void reassign_cltr_id(struct cluster * cltr) {
500        unsigned preferred = 0;
501        assign_list(preferred, cltr->procs.actives, cltr->procs.total - cltr->procs.idle);
502        assign_list(preferred, cltr->procs.idles  , cltr->procs.idle );
503}
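
// --- Worked example (not part of the original file) --------------------
// With READYQ_SHARD_FACTOR = 4 (the relaxed-FIFO setting above), a cluster
// with 3 active processors and 1 idle one is assigned
//   actives: cltr_id = 0, 4, 8      idles: cltr_id = 12
// i.e. each processor owns a contiguous shard of 4 lanes starting at its
// cltr_id, which is exactly the shard idx_from_r() biases pushes and pops
// towards.
// -----------------------------------------------------------------------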
504
505// Grow the ready queue
506void ready_queue_grow(struct cluster * cltr) {
507        size_t ncount;
508        int target = cltr->procs.total;
509
510        /* paranoid */ verify( ready_mutate_islocked() );
511        __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");
512
513        // Make sure that everything is consistent
514        /* paranoid */ check( cltr->ready_queue );
515
516        // grow the ready queue
517        with( cltr->ready_queue ) {
518                // Find new count
519                // Make sure we always have at least 1 list
520                if(target >= 2) {
521                        ncount = target * READYQ_SHARD_FACTOR;
522                } else {
523                        ncount = 1;
524                }
525
526                // Allocate new array (uses realloc and memcpies the data)
527                lanes.data = alloc( ncount, lanes.data`realloc );
528
529                // Fix the moved data
530                for( idx; (size_t)lanes.count ) {
531                        fix(lanes.data[idx]);
532                }
533
534                // Construct new data
535                for( idx; (size_t)lanes.count ~ ncount) {
536                        (lanes.data[idx]){};
537                }
538
539                // Update original
540                lanes.count = ncount;
541        }
542
543        fix_times(cltr);
544
545        reassign_cltr_id(cltr);
546
547        // Make sure that everything is consistent
548        /* paranoid */ check( cltr->ready_queue );
549
550        __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");
551
552        /* paranoid */ verify( ready_mutate_islocked() );
553}
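
// --- Worked example (not part of the original file) --------------------
// With READYQ_SHARD_FACTOR = 4, adding a third processor (target = 3) grows
// lanes.count from 8 to 12: the first 8 lanes survive the realloc (and are
// repaired by fix()), lanes 8..11 are freshly constructed, and
// reassign_cltr_id() then hands every processor its own 4-lane shard.
// -----------------------------------------------------------------------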
554
555// Shrink the ready queue
556void ready_queue_shrink(struct cluster * cltr) {
557        /* paranoid */ verify( ready_mutate_islocked() );
558        __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");
559
560        // Make sure that everything is consistent
561        /* paranoid */ check( cltr->ready_queue );
562
563        int target = cltr->procs.total;
564
565        with( cltr->ready_queue ) {
566                // Remember old count
567                size_t ocount = lanes.count;
568
569                // Find new count
570                // Make sure we always have at least 1 list
571                lanes.count = target >= 2 ? target * READYQ_SHARD_FACTOR: 1;
572                /* paranoid */ verify( ocount >= lanes.count );
573                /* paranoid */ verify( lanes.count == target * READYQ_SHARD_FACTOR || target < 2 );
574
575                // for printing, count the number of displaced threads
576                #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
577                        __attribute__((unused)) size_t displaced = 0;
578                #endif
579
580                // redistribute old data
581                for( idx; (size_t)lanes.count ~ ocount) {
582                        // Lock is not strictly needed but makes checking invariants much easier
583                        __attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
584                        verify(locked);
585
586                        // As long as this lane is not empty, pop threads from it and push them somewhere else in the queue
587                        while(!is_empty(lanes.data[idx])) {
588                                struct $thread * thrd;
589                                thrd = pop(lanes.data[idx]);
590
591                                push(cltr, thrd);
592
593                                // for printing, count the number of displaced threads
594                                #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
595                                        displaced++;
596                                #endif
597                        }
598
599                        // Unlock the lane
600                        __atomic_unlock(&lanes.data[idx].lock);
601
602                        // TODO print the queue statistics here
603
604                        ^(lanes.data[idx]){};
605                }
606
607                __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);
608
609                // Allocate new array (uses realloc and memcpies the data)
610                lanes.data = alloc( lanes.count, lanes.data`realloc );
611
612                // Fix the moved data
613                for( idx; (size_t)lanes.count ) {
614                        fix(lanes.data[idx]);
615                }
616        }
617
618        fix_times(cltr);
619
620        reassign_cltr_id(cltr);
621
622        // Make sure that everything is consistent
623        /* paranoid */ check( cltr->ready_queue );
624
625        __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
626        /* paranoid */ verify( ready_mutate_islocked() );
627}