source: libcfa/src/concurrency/ready_queue.cfa @ d2b5d2d

ADTarm-ehast-experimentalenumforall-pointer-decayjacob/cs343-translationnew-astnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since d2b5d2d was 62502cc4, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Fixed deadlock where threads could acquire the central scheduler lock for writing while preemption was enabled, leading to any attempt at running any thread to deadlock.
Also added runtime checks to catch new code which could forget to disable interrupts

  • Property mode set to 100644
File size: 18.0 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2019 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// ready_queue.cfa --
8//
9// Author           : Thierry Delisle
10// Created On       : Mon Nov dd 16:29:18 2019
11// Last Modified By :
12// Last Modified On :
13// Update Count     :
14//
15
16#define __cforall_thread__
17// #define __CFA_DEBUG_PRINT_READY_QUEUE__
18
19// #define USE_SNZI
20
21#include "bits/defs.hfa"
22#include "kernel_private.hfa"
23
24#define _GNU_SOURCE
25#include "stdlib.hfa"
26#include "math.hfa"
27
28#include <unistd.h>
29
30#include "snzi.hfa"
31#include "ready_subqueue.hfa"
32
33static const size_t cache_line_size = 64;
34
35// No overriden function, no environment variable, no define
36// fall back to a magic number
37#ifndef __CFA_MAX_PROCESSORS__
38        #define __CFA_MAX_PROCESSORS__ 1024
39#endif
40
41#define BIAS 16
42
43// returns the maximum number of processors the RWLock support
44__attribute__((weak)) unsigned __max_processors() {
45        const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
46        if(!max_cores_s) {
47                __cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
48                return __CFA_MAX_PROCESSORS__;
49        }
50
51        char * endptr = 0p;
52        long int max_cores_l = strtol(max_cores_s, &endptr, 10);
53        if(max_cores_l < 1 || max_cores_l > 65535) {
54                __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", max_cores_l);
55                return __CFA_MAX_PROCESSORS__;
56        }
57        if('\0' != *endptr) {
58                __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", max_cores_s);
59                return __CFA_MAX_PROCESSORS__;
60        }
61
62        return max_cores_l;
63}
64
65//=======================================================================
66// Cluster wide reader-writer lock
67//=======================================================================
68void  ?{}(__scheduler_RWLock_t & this) {
69        this.max   = __max_processors();
70        this.alloc = 0;
71        this.ready = 0;
72        this.lock  = false;
73        this.data  = alloc(this.max);
74
75        /*paranoid*/ verify( 0 == (((uintptr_t)(this.data    )) % 64) );
76        /*paranoid*/ verify( 0 == (((uintptr_t)(this.data + 1)) % 64) );
77        /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
78        /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));
79
80}
81void ^?{}(__scheduler_RWLock_t & this) {
82        free(this.data);
83}
84
85void ?{}( __scheduler_lock_id_t & this, __processor_id_t * proc ) {
86        this.handle = proc;
87        this.lock   = false;
88        #ifdef __CFA_WITH_VERIFY__
89                this.owned  = false;
90        #endif
91}
92
93//=======================================================================
94// Lock-Free registering/unregistering of threads
95unsigned doregister( struct __processor_id_t * proc ) with(*__scheduler_lock) {
96        __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p for RW-Lock\n", proc);
97
98        // Step - 1 : check if there is already space in the data
99        uint_fast32_t s = ready;
100
101        // Check among all the ready
102        for(uint_fast32_t i = 0; i < s; i++) {
103                __processor_id_t * null = 0p; // Re-write every loop since compare thrashes it
104                if( __atomic_load_n(&data[i].handle, (int)__ATOMIC_RELAXED) == null
105                        && __atomic_compare_exchange_n( &data[i].handle, &null, proc, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
106                        /*paranoid*/ verify(i < ready);
107                        /*paranoid*/ verify(0 == (__alignof__(data[i]) % cache_line_size));
108                        /*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
109                        return i;
110                }
111        }
112
113        if(max <= alloc) abort("Trying to create more than %ud processors", __scheduler_lock->max);
114
115        // Step - 2 : F&A to get a new spot in the array.
116        uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
117        if(max <= n) abort("Trying to create more than %ud processors", __scheduler_lock->max);
118
119        // Step - 3 : Mark space as used and then publish it.
120        __scheduler_lock_id_t * storage = (__scheduler_lock_id_t *)&data[n];
121        (*storage){ proc };
122        while(true) {
123                unsigned copy = n;
124                if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
125                        && __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
126                        break;
127                asm volatile("pause");
128        }
129
130        __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p done, id %lu\n", proc, n);
131
132        // Return new spot.
133        /*paranoid*/ verify(n < ready);
134        /*paranoid*/ verify(__alignof__(data[n]) == (2 * cache_line_size));
135        /*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
136        return n;
137}
138
139void unregister( struct __processor_id_t * proc ) with(*__scheduler_lock) {
140        unsigned id = proc->id;
141        /*paranoid*/ verify(id < ready);
142        /*paranoid*/ verify(proc == __atomic_load_n(&data[id].handle, __ATOMIC_RELAXED));
143        __atomic_store_n(&data[id].handle, 0p, __ATOMIC_RELEASE);
144
145        __cfadbg_print_safe(ready_queue, "Kernel : Unregister proc %p\n", proc);
146}
147
148//-----------------------------------------------------------------------
149// Writer side : acquire when changing the ready queue, e.g. adding more
150//  queues or removing them.
151uint_fast32_t ready_mutate_lock( void ) with(*__scheduler_lock) {
152        /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
153
154        // Step 1 : lock global lock
155        // It is needed to avoid processors that register mid Critical-Section
156        //   to simply lock their own lock and enter.
157        __atomic_acquire( &lock );
158
159        // Step 2 : lock per-proc lock
160        // Processors that are currently being registered aren't counted
161        //   but can't be in read_lock or in the critical section.
162        // All other processors are counted
163        uint_fast32_t s = ready;
164        for(uint_fast32_t i = 0; i < s; i++) {
165                __atomic_acquire( &data[i].lock );
166        }
167
168        /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
169        return s;
170}
171
172void ready_mutate_unlock( uint_fast32_t last_s ) with(*__scheduler_lock) {
173        /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
174
175        // Step 1 : release local locks
176        // This must be done while the global lock is held to avoid
177        //   threads that where created mid critical section
178        //   to race to lock their local locks and have the writer
179        //   immidiately unlock them
180        // Alternative solution : return s in write_lock and pass it to write_unlock
181        for(uint_fast32_t i = 0; i < last_s; i++) {
182                verify(data[i].lock);
183                __atomic_store_n(&data[i].lock, (bool)false, __ATOMIC_RELEASE);
184        }
185
186        // Step 2 : release global lock
187        /*paranoid*/ assert(true == lock);
188        __atomic_store_n(&lock, (bool)false, __ATOMIC_RELEASE);
189
190        /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
191}
192
193//=======================================================================
194// Cforall Reqdy Queue used for scheduling
195//=======================================================================
196void ?{}(__ready_queue_t & this) with (this) {
197        lanes.data  = 0p;
198        lanes.count = 0;
199}
200
201void ^?{}(__ready_queue_t & this) with (this) {
202        verify( 1 == lanes.count );
203        #ifdef USE_SNZI
204                verify( !query( snzi ) );
205        #endif
206        free(lanes.data);
207}
208
209//-----------------------------------------------------------------------
210__attribute__((hot)) bool query(struct cluster * cltr) {
211        #ifdef USE_SNZI
212                return query(cltr->ready_queue.snzi);
213        #endif
214        return true;
215}
216
217//-----------------------------------------------------------------------
218__attribute__((hot)) bool push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
219        __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
220
221        // write timestamp
222        thrd->link.ts = rdtscl();
223
224        #if defined(BIAS) && !defined(__CFA_NO_STATISTICS__)
225                bool local = false;
226                int preferred =
227                        //*
228                        kernelTLS.this_processor ? kernelTLS.this_processor->id * 4 : -1;
229                        /*/
230                        thrd->link.preferred * 4;
231                        //*/
232
233
234        #endif
235
236        // Try to pick a lane and lock it
237        unsigned i;
238        do {
239                // Pick the index of a lane
240                #if defined(BIAS)
241                        unsigned r = __tls_rand();
242                        unsigned rlow  = r % BIAS;
243                        unsigned rhigh = r / BIAS;
244                        if((0 != rlow) && preferred >= 0) {
245                                // (BIAS - 1) out of BIAS chances
246                                // Use perferred queues
247                                i = preferred + (rhigh % 4);
248
249                                #if !defined(__CFA_NO_STATISTICS__)
250                                        local = true;
251                                        __tls_stats()->ready.pick.push.local++;
252                                #endif
253                        }
254                        else {
255                                // 1 out of BIAS chances
256                                // Use all queues
257                                i = rhigh;
258                                local = false;
259                        }
260                #else
261                        i = __tls_rand();
262                #endif
263
264                i %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
265
266                #if !defined(__CFA_NO_STATISTICS__)
267                        __tls_stats()->ready.pick.push.attempt++;
268                #endif
269
270                // If we can't lock it retry
271        } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
272
273        bool first = false;
274
275        // Actually push it
276        bool lane_first = push(lanes.data[i], thrd);
277
278        #ifdef USE_SNZI
279                // If this lane used to be empty we need to do more
280                if(lane_first) {
281                        // Check if the entire queue used to be empty
282                        first = !query(snzi);
283
284                        // Update the snzi
285                        arrive( snzi, i );
286                }
287        #endif
288
289        // Unlock and return
290        __atomic_unlock( &lanes.data[i].lock );
291
292        __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
293
294        // Update statistics
295        #if !defined(__CFA_NO_STATISTICS__)
296                #if defined(BIAS)
297                        if( local ) __tls_stats()->ready.pick.push.lsuccess++;
298                #endif
299                __tls_stats()->ready.pick.push.success++;
300        #endif
301
302        // return whether or not the list was empty before this push
303        return first;
304}
305
306static struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j);
307static struct $thread * try_pop(struct cluster * cltr, unsigned i);
308
309// Pop from the ready queue from a given cluster
310__attribute__((hot)) $thread * pop(struct cluster * cltr) with (cltr->ready_queue) {
311        /* paranoid */ verify( lanes.count > 0 );
312        unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
313        #if defined(BIAS)
314                // Don't bother trying locally too much
315                int local_tries = 8;
316        #endif
317
318        // As long as the list is not empty, try finding a lane that isn't empty and pop from it
319        #ifdef USE_SNZI
320                while( query(snzi) ) {
321        #else
322                for(25) {
323        #endif
324                // Pick two lists at random
325                unsigned i,j;
326                #if defined(BIAS)
327                        #if !defined(__CFA_NO_STATISTICS__)
328                                bool local = false;
329                        #endif
330                        uint64_t r = __tls_rand();
331                        unsigned rlow  = r % BIAS;
332                        uint64_t rhigh = r / BIAS;
333                        if(local_tries && 0 != rlow) {
334                                // (BIAS - 1) out of BIAS chances
335                                // Use perferred queues
336                                unsigned pid = kernelTLS.this_processor->id * 4;
337                                i = pid + (rhigh % 4);
338                                j = pid + ((rhigh >> 32ull) % 4);
339
340                                // count the tries
341                                local_tries--;
342
343                                #if !defined(__CFA_NO_STATISTICS__)
344                                        local = true;
345                                        __tls_stats()->ready.pick.pop.local++;
346                                #endif
347                        }
348                        else {
349                                // 1 out of BIAS chances
350                                // Use all queues
351                                i = rhigh;
352                                j = rhigh >> 32ull;
353                        }
354                #else
355                        i = __tls_rand();
356                        j = __tls_rand();
357                #endif
358
359                i %= count;
360                j %= count;
361
362                // try popping from the 2 picked lists
363                struct $thread * thrd = try_pop(cltr, i, j);
364                if(thrd) {
365                        #if defined(BIAS) && !defined(__CFA_NO_STATISTICS__)
366                                if( local ) __tls_stats()->ready.pick.pop.lsuccess++;
367                        #endif
368                        return thrd;
369                }
370        }
371
372        // All lanes where empty return 0p
373        return 0p;
374}
375
376__attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
377        /* paranoid */ verify( lanes.count > 0 );
378        unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
379        unsigned offset = __tls_rand();
380        for(i; count) {
381                unsigned idx = (offset + i) % count;
382                struct $thread * thrd = try_pop(cltr, idx);
383                if(thrd) {
384                        return thrd;
385                }
386        }
387
388        // All lanes where empty return 0p
389        return 0p;
390}
391
392
393//-----------------------------------------------------------------------
394// Given 2 indexes, pick the list with the oldest push an try to pop from it
395static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j) with (cltr->ready_queue) {
396        #if !defined(__CFA_NO_STATISTICS__)
397                __tls_stats()->ready.pick.pop.attempt++;
398        #endif
399
400        // Pick the bet list
401        int w = i;
402        if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
403                w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
404        }
405
406        return try_pop(cltr, w);
407}
408
409static inline struct $thread * try_pop(struct cluster * cltr, unsigned w) with (cltr->ready_queue) {
410        // Get relevant elements locally
411        __intrusive_lane_t & lane = lanes.data[w];
412
413        // If list looks empty retry
414        if( is_empty(lane) ) return 0p;
415
416        // If we can't get the lock retry
417        if( !__atomic_try_acquire(&lane.lock) ) return 0p;
418
419
420        // If list is empty, unlock and retry
421        if( is_empty(lane) ) {
422                __atomic_unlock(&lane.lock);
423                return 0p;
424        }
425
426        // Actually pop the list
427        struct $thread * thrd;
428        thrd = pop(lane);
429
430        /* paranoid */ verify(thrd);
431        /* paranoid */ verify(lane.lock);
432
433        #ifdef USE_SNZI
434                // If this was the last element in the lane
435                if(emptied) {
436                        depart( snzi, w );
437                }
438        #endif
439
440        // Unlock and return
441        __atomic_unlock(&lane.lock);
442
443        // Update statistics
444        #if !defined(__CFA_NO_STATISTICS__)
445                __tls_stats()->ready.pick.pop.success++;
446        #endif
447
448        // Update the thread bias
449        thrd->link.preferred = w / 4;
450
451        // return the popped thread
452        return thrd;
453}
454//-----------------------------------------------------------------------
455
456bool remove_head(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
457        for(i; lanes.count) {
458                __intrusive_lane_t & lane = lanes.data[i];
459
460                bool removed = false;
461
462                __atomic_acquire(&lane.lock);
463                        if(head(lane)->link.next == thrd) {
464                                $thread * pthrd;
465                                pthrd = pop(lane);
466
467                                /* paranoid */ verify( pthrd == thrd );
468
469                                removed = true;
470                                #ifdef USE_SNZI
471                                        if(emptied) {
472                                                depart( snzi, i );
473                                        }
474                                #endif
475                        }
476                __atomic_unlock(&lane.lock);
477
478                if( removed ) return true;
479        }
480        return false;
481}
482
483//-----------------------------------------------------------------------
484
485static void check( __ready_queue_t & q ) with (q) {
486        #if defined(__CFA_WITH_VERIFY__)
487                {
488                        for( idx ; lanes.count ) {
489                                __intrusive_lane_t & sl = lanes.data[idx];
490                                assert(!lanes.data[idx].lock);
491
492                                assert(head(sl)->link.prev == 0p );
493                                assert(head(sl)->link.next->link.prev == head(sl) );
494                                assert(tail(sl)->link.next == 0p );
495                                assert(tail(sl)->link.prev->link.next == tail(sl) );
496
497                                if(sl.before.link.ts == 0l) {
498                                        assert(tail(sl)->link.prev == head(sl));
499                                        assert(head(sl)->link.next == tail(sl));
500                                } else {
501                                        assert(tail(sl)->link.prev != head(sl));
502                                        assert(head(sl)->link.next != tail(sl));
503                                }
504                        }
505                }
506        #endif
507}
508
509// Call this function of the intrusive list was moved using memcpy
510// fixes the list so that the pointers back to anchors aren't left dangling
511static inline void fix(__intrusive_lane_t & ll) {
512        // if the list is not empty then follow he pointer and fix its reverse
513        if(!is_empty(ll)) {
514                head(ll)->link.next->link.prev = head(ll);
515                tail(ll)->link.prev->link.next = tail(ll);
516        }
517        // Otherwise just reset the list
518        else {
519                verify(tail(ll)->link.next == 0p);
520                tail(ll)->link.prev = head(ll);
521                head(ll)->link.next = tail(ll);
522                verify(head(ll)->link.prev == 0p);
523        }
524}
525
526// Grow the ready queue
527void ready_queue_grow  (struct cluster * cltr, int target) {
528        /* paranoid */ verify( ready_mutate_islocked() );
529        __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");
530
531        // Make sure that everything is consistent
532        /* paranoid */ check( cltr->ready_queue );
533
534        // grow the ready queue
535        with( cltr->ready_queue ) {
536                #ifdef USE_SNZI
537                        ^(snzi){};
538                #endif
539
540                // Find new count
541                // Make sure we always have atleast 1 list
542                size_t ncount = target >= 2 ? target * 4: 1;
543
544                // Allocate new array (uses realloc and memcpies the data)
545                lanes.data = alloc(lanes.data, ncount);
546
547                // Fix the moved data
548                for( idx; (size_t)lanes.count ) {
549                        fix(lanes.data[idx]);
550                }
551
552                // Construct new data
553                for( idx; (size_t)lanes.count ~ ncount) {
554                        (lanes.data[idx]){};
555                }
556
557                // Update original
558                lanes.count = ncount;
559
560                #ifdef USE_SNZI
561                        // Re-create the snzi
562                        snzi{ log2( lanes.count / 8 ) };
563                        for( idx; (size_t)lanes.count ) {
564                                if( !is_empty(lanes.data[idx]) ) {
565                                        arrive(snzi, idx);
566                                }
567                        }
568                #endif
569        }
570
571        // Make sure that everything is consistent
572        /* paranoid */ check( cltr->ready_queue );
573
574        __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");
575
576        /* paranoid */ verify( ready_mutate_islocked() );
577}
578
579// Shrink the ready queue
580void ready_queue_shrink(struct cluster * cltr, int target) {
581        /* paranoid */ verify( ready_mutate_islocked() );
582        __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");
583
584        // Make sure that everything is consistent
585        /* paranoid */ check( cltr->ready_queue );
586
587        with( cltr->ready_queue ) {
588                #ifdef USE_SNZI
589                        ^(snzi){};
590                #endif
591
592                // Remember old count
593                size_t ocount = lanes.count;
594
595                // Find new count
596                // Make sure we always have atleast 1 list
597                lanes.count = target >= 2 ? target * 4: 1;
598                /* paranoid */ verify( ocount >= lanes.count );
599                /* paranoid */ verify( lanes.count == target * 4 || target < 2 );
600
601                // for printing count the number of displaced threads
602                #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
603                        __attribute__((unused)) size_t displaced = 0;
604                #endif
605
606                // redistribute old data
607                for( idx; (size_t)lanes.count ~ ocount) {
608                        // Lock is not strictly needed but makes checking invariants much easier
609                        __attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
610                        verify(locked);
611
612                        // As long as we can pop from this lane to push the threads somewhere else in the queue
613                        while(!is_empty(lanes.data[idx])) {
614                                struct $thread * thrd;
615                                thrd = pop(lanes.data[idx]);
616
617                                push(cltr, thrd);
618
619                                // for printing count the number of displaced threads
620                                #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
621                                        displaced++;
622                                #endif
623                        }
624
625                        // Unlock the lane
626                        __atomic_unlock(&lanes.data[idx].lock);
627
628                        // TODO print the queue statistics here
629
630                        ^(lanes.data[idx]){};
631                }
632
633                __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);
634
635                // Allocate new array (uses realloc and memcpies the data)
636                lanes.data = alloc(lanes.data, lanes.count);
637
638                // Fix the moved data
639                for( idx; (size_t)lanes.count ) {
640                        fix(lanes.data[idx]);
641                }
642
643                #ifdef USE_SNZI
644                        // Re-create the snzi
645                        snzi{ log2( lanes.count / 8 ) };
646                        for( idx; (size_t)lanes.count ) {
647                                if( !is_empty(lanes.data[idx]) ) {
648                                        arrive(snzi, idx);
649                                }
650                        }
651                #endif
652        }
653
654        // Make sure that everything is consistent
655        /* paranoid */ check( cltr->ready_queue );
656
657        __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
658        /* paranoid */ verify( ready_mutate_islocked() );
659}
Note: See TracBrowser for help on using the repository browser.