Ignore:
Timestamp:
May 21, 2021, 4:48:10 PM (5 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
f1bce515
Parents:
5407cdc (diff), 7404cdc (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/ready_queue.cfa

    r5407cdc r8d66610  
    1717// #define __CFA_DEBUG_PRINT_READY_QUEUE__
    1818
    19 // #define USE_MPSC
    2019
    2120#define USE_RELAXED_FIFO
     
    9392        this.alloc = 0;
    9493        this.ready = 0;
    95         this.lock  = false;
    9694        this.data  = alloc(this.max);
    97 
    98         /*paranoid*/ verify( 0 == (((uintptr_t)(this.data    )) % 64) );
    99         /*paranoid*/ verify( 0 == (((uintptr_t)(this.data + 1)) % 64) );
     95        this.write_lock  = false;
     96
    10097        /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
    10198        /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));
     
    106103}
    107104
    108 void ?{}( __scheduler_lock_id_t & this, __processor_id_t * proc ) {
    109         this.handle = proc;
    110         this.lock   = false;
    111         #ifdef __CFA_WITH_VERIFY__
    112                 this.owned  = false;
    113         #endif
    114 }
    115105
    116106//=======================================================================
    117107// Lock-Free registering/unregistering of threads
    118 void register_proc_id( struct __processor_id_t * proc ) with(*__scheduler_lock) {
     108unsigned register_proc_id( void ) with(*__scheduler_lock) {
    119109        __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p for RW-Lock\n", proc);
     110        bool * handle = (bool *)&kernelTLS().sched_lock;
    120111
    121112        // Step - 1 : check if there is already space in the data
     
    124115        // Check among all the ready
    125116        for(uint_fast32_t i = 0; i < s; i++) {
    126                 __processor_id_t * null = 0p; // Re-write every loop since compare thrashes it
    127                 if( __atomic_load_n(&data[i].handle, (int)__ATOMIC_RELAXED) == null
    128                         && __atomic_compare_exchange_n( &data[i].handle, &null, proc, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
    129                         /*paranoid*/ verify(i < ready);
    130                         /*paranoid*/ verify(0 == (__alignof__(data[i]) % cache_line_size));
    131                         /*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
    132                         proc->id = i;
     117                bool * volatile * cell = (bool * volatile *)&data[i]; // Cforall is bugged and the double volatiles causes problems
     118                /* paranoid */ verify( handle != *cell );
     119
     120                bool * null = 0p; // Re-write every loop since compare thrashes it
     121                if( __atomic_load_n(cell, (int)__ATOMIC_RELAXED) == null
     122                        && __atomic_compare_exchange_n( cell, &null, handle, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     123                        /* paranoid */ verify(i < ready);
     124                        /* paranoid */ verify( (kernelTLS().sched_id = i, true) );
     125                        return i;
    133126                }
    134127        }
     
    141134
    142135        // Step - 3 : Mark space as used and then publish it.
    143         __scheduler_lock_id_t * storage = (__scheduler_lock_id_t *)&data[n];
    144         (*storage){ proc };
     136        data[n] = handle;
    145137        while() {
    146138                unsigned copy = n;
     
    154146
    155147        // Return new spot.
    156         /*paranoid*/ verify(n < ready);
    157         /*paranoid*/ verify(__alignof__(data[n]) == (2 * cache_line_size));
    158         /*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
    159         proc->id = n;
    160 }
    161 
    162 void unregister_proc_id( struct __processor_id_t * proc ) with(*__scheduler_lock) {
    163         unsigned id = proc->id;
    164         /*paranoid*/ verify(id < ready);
    165         /*paranoid*/ verify(proc == __atomic_load_n(&data[id].handle, __ATOMIC_RELAXED));
    166         __atomic_store_n(&data[id].handle, 0p, __ATOMIC_RELEASE);
     148        /* paranoid */ verify(n < ready);
     149        /* paranoid */ verify( (kernelTLS().sched_id = n, true) );
     150        return n;
     151}
     152
     153void unregister_proc_id( unsigned id ) with(*__scheduler_lock) {
     154        /* paranoid */ verify(id < ready);
     155        /* paranoid */ verify(id == kernelTLS().sched_id);
     156        /* paranoid */ verify(data[id] == &kernelTLS().sched_lock);
     157
     158        bool * volatile * cell = (bool * volatile *)&data[id]; // Cforall is bugged and the double volatiles causes problems
     159
     160        __atomic_store_n(cell, 0p, __ATOMIC_RELEASE);
    167161
    168162        __cfadbg_print_safe(ready_queue, "Kernel : Unregister proc %p\n", proc);
     
    174168uint_fast32_t ready_mutate_lock( void ) with(*__scheduler_lock) {
    175169        /* paranoid */ verify( ! __preemption_enabled() );
     170        /* paranoid */ verify( ! kernelTLS().sched_lock );
    176171
    177172        // Step 1 : lock global lock
    178173        // It is needed to avoid processors that register mid Critical-Section
    179174        //   to simply lock their own lock and enter.
    180         __atomic_acquire( &lock );
     175        __atomic_acquire( &write_lock );
    181176
    182177        // Step 2 : lock per-proc lock
     
    186181        uint_fast32_t s = ready;
    187182        for(uint_fast32_t i = 0; i < s; i++) {
    188                 __atomic_acquire( &data[i].lock );
     183                volatile bool * llock = data[i];
     184                if(llock) __atomic_acquire( llock );
    189185        }
    190186
     
    203199        // Alternative solution : return s in write_lock and pass it to write_unlock
    204200        for(uint_fast32_t i = 0; i < last_s; i++) {
    205                 verify(data[i].lock);
    206                 __atomic_store_n(&data[i].lock, (bool)false, __ATOMIC_RELEASE);
     201                volatile bool * llock = data[i];
     202                if(llock) __atomic_store_n(llock, (bool)false, __ATOMIC_RELEASE);
    207203        }
    208204
    209205        // Step 2 : release global lock
    210         /*paranoid*/ assert(true == lock);
    211         __atomic_store_n(&lock, (bool)false, __ATOMIC_RELEASE);
     206        /*paranoid*/ assert(true == write_lock);
     207        __atomic_store_n(&write_lock, (bool)false, __ATOMIC_RELEASE);
    212208
    213209        /* paranoid */ verify( ! __preemption_enabled() );
     
    253249        }
    254250
    255         __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
     251        __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {
    256252                __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
    257253
    258                 const bool external = (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
     254                const bool external = !push_local || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
    259255                /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
    260 
    261                 // write timestamp
    262                 thrd->link.ts = rdtscl();
    263256
    264257                bool local;
     
    280273                        #endif
    281274
    282                 #if defined(USE_MPSC)
    283                         // mpsc always succeeds
    284                 } while( false );
    285                 #else
    286275                        // If we can't lock it retry
    287276                } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
    288                 #endif
    289277
    290278                // Actually push it
    291279                push(lanes.data[i], thrd);
    292280
    293                 #if !defined(USE_MPSC)
    294                         // Unlock and return
    295                         __atomic_unlock( &lanes.data[i].lock );
    296                 #endif
     281                // Unlock and return
     282                __atomic_unlock( &lanes.data[i].lock );
    297283
    298284                // Mark the current index in the tls rng instance as having an item
     
    350336#endif
    351337#if defined(USE_WORK_STEALING)
    352         __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
     338        __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {
    353339                __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
    354340
    355                 const bool external = (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
     341                // #define USE_PREFERRED
     342                #if !defined(USE_PREFERRED)
     343                const bool external = !push_local || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
    356344                /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
    357 
    358                 // write timestamp
    359                 thrd->link.ts = rdtscl();
     345                #else
     346                        unsigned preferred = thrd->preferred;
     347                        const bool external = push_local || (!kernelTLS().this_processor) || preferred == -1u || thrd->curr_cluster != cltr;
     348                        /* paranoid */ verifyf(external || preferred < lanes.count, "Invalid preferred queue %u for %u lanes", preferred, lanes.count );
     349
     350                        unsigned r = preferred % READYQ_SHARD_FACTOR;
     351                        const unsigned start = preferred - r;
     352                #endif
    360353
    361354                // Try to pick a lane and lock it
     
    371364                        }
    372365                        else {
    373                                 processor * proc = kernelTLS().this_processor;
    374                                 unsigned r = proc->rdq.its++;
    375                                 i =  proc->rdq.id + (r % READYQ_SHARD_FACTOR);
     366                                #if !defined(USE_PREFERRED)
     367                                        processor * proc = kernelTLS().this_processor;
     368                                        unsigned r = proc->rdq.its++;
     369                                        i =  proc->rdq.id + (r % READYQ_SHARD_FACTOR);
     370                                #else
     371                                        i = start + (r++ % READYQ_SHARD_FACTOR);
     372                                #endif
    376373                        }
    377 
    378 
    379                 #if defined(USE_MPSC)
    380                         // mpsc always succeeds
    381                 } while( false );
    382                 #else
    383374                        // If we can't lock it retry
    384375                } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
    385                 #endif
    386376
    387377                // Actually push it
    388378                push(lanes.data[i], thrd);
    389379
    390                 #if !defined(USE_MPSC)
    391                         // Unlock and return
    392                         __atomic_unlock( &lanes.data[i].lock );
    393                 #endif
     380                // Unlock and return
     381                __atomic_unlock( &lanes.data[i].lock );
    394382
    395383                #if !defined(__CFA_NO_STATISTICS__)
     
    410398
    411399                if(proc->rdq.target == -1u) {
     400                        unsigned long long min = ts(lanes.data[proc->rdq.id]);
     401                        for(int i = 0; i < READYQ_SHARD_FACTOR; i++) {
     402                                unsigned long long tsc = ts(lanes.data[proc->rdq.id + i]);
     403                                if(tsc < min) min = tsc;
     404                        }
     405                        proc->rdq.cutoff = min;
    412406                        proc->rdq.target = __tls_rand() % lanes.count;
    413                         unsigned it1  = proc->rdq.itr;
    414                         unsigned it2  = proc->rdq.itr + 1;
    415                         unsigned idx1 = proc->rdq.id + (it1 % READYQ_SHARD_FACTOR);
    416                         unsigned idx2 = proc->rdq.id + (it2 % READYQ_SHARD_FACTOR);
    417                         unsigned long long tsc1 = ts(lanes.data[idx1]);
    418                         unsigned long long tsc2 = ts(lanes.data[idx2]);
    419                         proc->rdq.cutoff = min(tsc1, tsc2);
    420                         if(proc->rdq.cutoff == 0) proc->rdq.cutoff = -1ull;
    421407                }
    422408                else {
    423409                        unsigned target = proc->rdq.target;
    424410                        proc->rdq.target = -1u;
    425                         if(lanes.tscs[target].tv < proc->rdq.cutoff) {
     411                        const unsigned long long bias = 0; //2_500_000_000;
     412                        const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff;
     413                        if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) {
    426414                                $thread * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
    427415                                if(t) return t;
     
    430418
    431419                for(READYQ_SHARD_FACTOR) {
    432                         unsigned i = proc->rdq.id + (--proc->rdq.itr % READYQ_SHARD_FACTOR);
     420                        unsigned i = proc->rdq.id + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
    433421                        if($thread * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
    434422                }
     
    462450        // If list looks empty retry
    463451        if( is_empty(lane) ) {
    464                 __STATS( stats.espec++; )
    465452                return 0p;
    466453        }
     
    468455        // If we can't get the lock retry
    469456        if( !__atomic_try_acquire(&lane.lock) ) {
    470                 __STATS( stats.elock++; )
    471457                return 0p;
    472458        }
     
    475461        if( is_empty(lane) ) {
    476462                __atomic_unlock(&lane.lock);
    477                 __STATS( stats.eempty++; )
    478463                return 0p;
    479464        }
     
    481466        // Actually pop the list
    482467        struct $thread * thrd;
    483         thrd = pop(lane);
     468        unsigned long long tsv;
     469        [thrd, tsv] = pop(lane);
    484470
    485471        /* paranoid */ verify(thrd);
     472        /* paranoid */ verify(tsv);
    486473        /* paranoid */ verify(lane.lock);
    487474
     
    493480
    494481        #if defined(USE_WORK_STEALING)
    495                 lanes.tscs[w].tv = thrd->link.ts;
     482                lanes.tscs[w].tv = tsv;
    496483        #endif
     484
     485        thrd->preferred = w;
    497486
    498487        // return the popped thread
     
    522511// Check that all the intrusive queues in the data structure are still consistent
    523512static void check( __ready_queue_t & q ) with (q) {
    524         #if defined(__CFA_WITH_VERIFY__) && !defined(USE_MPSC)
     513        #if defined(__CFA_WITH_VERIFY__)
    525514                {
    526515                        for( idx ; lanes.count ) {
     
    528517                                assert(!lanes.data[idx].lock);
    529518
    530                                 assert(head(sl)->link.prev == 0p );
    531                                 assert(head(sl)->link.next->link.prev == head(sl) );
    532                                 assert(tail(sl)->link.next == 0p );
    533                                 assert(tail(sl)->link.prev->link.next == tail(sl) );
    534 
    535                                 if(is_empty(sl)) {
    536                                         assert(tail(sl)->link.prev == head(sl));
    537                                         assert(head(sl)->link.next == tail(sl));
    538                                 } else {
    539                                         assert(tail(sl)->link.prev != head(sl));
    540                                         assert(head(sl)->link.next != tail(sl));
    541                                 }
     519                                        if(is_empty(sl)) {
     520                                                assert( sl.anchor.next == 0p );
     521                                                assert( sl.anchor.ts   == 0  );
     522                                                assert( mock_head(sl)  == sl.prev );
     523                                        } else {
     524                                                assert( sl.anchor.next != 0p );
     525                                                assert( sl.anchor.ts   != 0  );
     526                                                assert( mock_head(sl)  != sl.prev );
     527                                        }
    542528                        }
    543529                }
     
    560546// fixes the list so that the pointers back to anchors aren't left dangling
    561547static inline void fix(__intrusive_lane_t & ll) {
    562         #if !defined(USE_MPSC)
    563                 // if the list is not empty then follow he pointer and fix its reverse
    564                 if(!is_empty(ll)) {
    565                         head(ll)->link.next->link.prev = head(ll);
    566                         tail(ll)->link.prev->link.next = tail(ll);
    567                 }
    568                 // Otherwise just reset the list
    569                 else {
    570                         verify(tail(ll)->link.next == 0p);
    571                         tail(ll)->link.prev = head(ll);
    572                         head(ll)->link.next = tail(ll);
    573                         verify(head(ll)->link.prev == 0p);
    574                 }
    575         #endif
    576 }
    577 
    578 static void assign_list(unsigned & value, dlist(processor, processor) & list, unsigned count) {
     548                        if(is_empty(ll)) {
     549                                verify(ll.anchor.next == 0p);
     550                                ll.prev = mock_head(ll);
     551                        }
     552}
     553
     554static void assign_list(unsigned & value, dlist(processor) & list, unsigned count) {
    579555        processor * it = &list`first;
    580556        for(unsigned i = 0; i < count; i++) {
     
    597573                lanes.tscs = alloc(lanes.count, lanes.tscs`realloc);
    598574                for(i; lanes.count) {
    599                         lanes.tscs[i].tv = ts(lanes.data[i]);
     575                        unsigned long long tsc = ts(lanes.data[i]);
     576                        lanes.tscs[i].tv = tsc != 0 ? tsc : rdtscl();
    600577                }
    601578        #endif
     
    686663                        while(!is_empty(lanes.data[idx])) {
    687664                                struct $thread * thrd;
    688                                 thrd = pop(lanes.data[idx]);
    689 
    690                                 push(cltr, thrd);
     665                                unsigned long long _;
     666                                [thrd, _] = pop(lanes.data[idx]);
     667
     668                                push(cltr, thrd, true);
    691669
    692670                                // for printing count the number of displaced threads
     
    725703        /* paranoid */ verify( ready_mutate_islocked() );
    726704}
     705
     706#if !defined(__CFA_NO_STATISTICS__)
     707        unsigned cnt(const __ready_queue_t & this, unsigned idx) {
     708                /* paranoid */ verify(this.lanes.count > idx);
     709                return this.lanes.data[idx].cnt;
     710        }
     711#endif
Note: See TracChangeset for help on using the changeset viewer.