Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/ready_queue.cfa

    rb808625 rfc59df78  
    1717// #define __CFA_DEBUG_PRINT_READY_QUEUE__
    1818
     19// #define USE_MPSC
    1920
    2021#define USE_RELAXED_FIFO
     
    9293        this.alloc = 0;
    9394        this.ready = 0;
     95        this.lock  = false;
    9496        this.data  = alloc(this.max);
    95         this.write_lock  = false;
    96 
     97
     98        /*paranoid*/ verify( 0 == (((uintptr_t)(this.data    )) % 64) );
     99        /*paranoid*/ verify( 0 == (((uintptr_t)(this.data + 1)) % 64) );
    97100        /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
    98101        /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));
     
    103106}
    104107
     108void ?{}( __scheduler_lock_id_t & this, __processor_id_t * proc ) {
     109        this.handle = proc;
     110        this.lock   = false;
     111        #ifdef __CFA_WITH_VERIFY__
     112                this.owned  = false;
     113        #endif
     114}
    105115
    106116//=======================================================================
    107117// Lock-Free registering/unregistering of threads
    108 unsigned register_proc_id( void ) with(*__scheduler_lock) {
     118void register_proc_id( struct __processor_id_t * proc ) with(*__scheduler_lock) {
    109119        __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p for RW-Lock\n", proc);
    110         bool * handle = (bool *)&kernelTLS().sched_lock;
    111120
    112121        // Step - 1 : check if there is already space in the data
     
    115124        // Check among all the ready
    116125        for(uint_fast32_t i = 0; i < s; i++) {
    117                 bool * volatile * cell = (bool * volatile *)&data[i]; // Cforall is bugged and the double volatiles causes problems
    118                 /* paranoid */ verify( handle != *cell );
    119 
    120                 bool * null = 0p; // Re-write every loop since compare thrashes it
    121                 if( __atomic_load_n(cell, (int)__ATOMIC_RELAXED) == null
    122                         && __atomic_compare_exchange_n( cell, &null, handle, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
    123                         /* paranoid */ verify(i < ready);
    124                         /* paranoid */ verify( (kernelTLS().sched_id = i, true) );
    125                         return i;
     126                __processor_id_t * null = 0p; // Re-write every loop since compare thrashes it
     127                if( __atomic_load_n(&data[i].handle, (int)__ATOMIC_RELAXED) == null
     128                        && __atomic_compare_exchange_n( &data[i].handle, &null, proc, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     129                        /*paranoid*/ verify(i < ready);
     130                        /*paranoid*/ verify(0 == (__alignof__(data[i]) % cache_line_size));
     131                        /*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
     132                        proc->id = i;
    126133                }
    127134        }
     
    134141
    135142        // Step - 3 : Mark space as used and then publish it.
    136         data[n] = handle;
     143        __scheduler_lock_id_t * storage = (__scheduler_lock_id_t *)&data[n];
     144        (*storage){ proc };
    137145        while() {
    138146                unsigned copy = n;
     
    146154
    147155        // Return new spot.
    148         /* paranoid */ verify(n < ready);
    149         /* paranoid */ verify( (kernelTLS().sched_id = n, true) );
    150         return n;
    151 }
    152 
    153 void unregister_proc_id( unsigned id ) with(*__scheduler_lock) {
    154         /* paranoid */ verify(id < ready);
    155         /* paranoid */ verify(id == kernelTLS().sched_id);
    156         /* paranoid */ verify(data[id] == &kernelTLS().sched_lock);
    157 
    158         bool * volatile * cell = (bool * volatile *)&data[id]; // Cforall is bugged and the double volatiles causes problems
    159 
    160         __atomic_store_n(cell, 0p, __ATOMIC_RELEASE);
     156        /*paranoid*/ verify(n < ready);
     157        /*paranoid*/ verify(__alignof__(data[n]) == (2 * cache_line_size));
     158        /*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
     159        proc->id = n;
     160}
     161
     162void unregister_proc_id( struct __processor_id_t * proc ) with(*__scheduler_lock) {
     163        unsigned id = proc->id;
     164        /*paranoid*/ verify(id < ready);
     165        /*paranoid*/ verify(proc == __atomic_load_n(&data[id].handle, __ATOMIC_RELAXED));
     166        __atomic_store_n(&data[id].handle, 0p, __ATOMIC_RELEASE);
    161167
    162168        __cfadbg_print_safe(ready_queue, "Kernel : Unregister proc %p\n", proc);
     
    168174uint_fast32_t ready_mutate_lock( void ) with(*__scheduler_lock) {
    169175        /* paranoid */ verify( ! __preemption_enabled() );
    170         /* paranoid */ verify( ! kernelTLS().sched_lock );
    171176
    172177        // Step 1 : lock global lock
    173178        // It is needed to avoid processors that register mid Critical-Section
    174179        //   to simply lock their own lock and enter.
    175         __atomic_acquire( &write_lock );
     180        __atomic_acquire( &lock );
    176181
    177182        // Step 2 : lock per-proc lock
     
    181186        uint_fast32_t s = ready;
    182187        for(uint_fast32_t i = 0; i < s; i++) {
    183                 volatile bool * llock = data[i];
    184                 if(llock) __atomic_acquire( llock );
     188                __atomic_acquire( &data[i].lock );
    185189        }
    186190
     
    199203        // Alternative solution : return s in write_lock and pass it to write_unlock
    200204        for(uint_fast32_t i = 0; i < last_s; i++) {
    201                 volatile bool * llock = data[i];
    202                 if(llock) __atomic_store_n(llock, (bool)false, __ATOMIC_RELEASE);
     205                verify(data[i].lock);
     206                __atomic_store_n(&data[i].lock, (bool)false, __ATOMIC_RELEASE);
    203207        }
    204208
    205209        // Step 2 : release global lock
    206         /*paranoid*/ assert(true == write_lock);
    207         __atomic_store_n(&write_lock, (bool)false, __ATOMIC_RELEASE);
     210        /*paranoid*/ assert(true == lock);
     211        __atomic_store_n(&lock, (bool)false, __ATOMIC_RELEASE);
    208212
    209213        /* paranoid */ verify( ! __preemption_enabled() );
     
    249253        }
    250254
    251         __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {
     255        __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
    252256                __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
    253257
    254                 const bool external = !push_local || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
     258                const bool external = (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
    255259                /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
     260
     261                // write timestamp
     262                thrd->link.ts = rdtscl();
    256263
    257264                bool local;
     
    273280                        #endif
    274281
     282                #if defined(USE_MPSC)
     283                        // mpsc always succeeds
     284                } while( false );
     285                #else
    275286                        // If we can't lock it retry
    276287                } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
     288                #endif
    277289
    278290                // Actually push it
    279291                push(lanes.data[i], thrd);
    280292
    281                 // Unlock and return
    282                 __atomic_unlock( &lanes.data[i].lock );
     293                #if !defined(USE_MPSC)
     294                        // Unlock and return
     295                        __atomic_unlock( &lanes.data[i].lock );
     296                #endif
    283297
    284298                // Mark the current index in the tls rng instance as having an item
     
    336350#endif
    337351#if defined(USE_WORK_STEALING)
    338         __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {
     352        __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
    339353                __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
    340354
    341                 // #define USE_PREFERRED
    342                 #if !defined(USE_PREFERRED)
    343                 const bool external = !push_local || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
     355                const bool external = (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
    344356                /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
    345                 #else
    346                         unsigned preferred = thrd->preferred;
    347                         const bool external = push_local || (!kernelTLS().this_processor) || preferred == -1u || thrd->curr_cluster != cltr;
    348                         /* paranoid */ verifyf(external || preferred < lanes.count, "Invalid preferred queue %u for %u lanes", preferred, lanes.count );
    349 
    350                         unsigned r = preferred % READYQ_SHARD_FACTOR;
    351                         const unsigned start = preferred - r;
    352                 #endif
     357
     358                // write timestamp
     359                thrd->link.ts = rdtscl();
    353360
    354361                // Try to pick a lane and lock it
     
    364371                        }
    365372                        else {
    366                                 #if !defined(USE_PREFERRED)
    367                                         processor * proc = kernelTLS().this_processor;
    368                                         unsigned r = proc->rdq.its++;
    369                                         i =  proc->rdq.id + (r % READYQ_SHARD_FACTOR);
    370                                 #else
    371                                         i = start + (r++ % READYQ_SHARD_FACTOR);
    372                                 #endif
     373                                processor * proc = kernelTLS().this_processor;
     374                                unsigned r = proc->rdq.its++;
     375                                i =  proc->rdq.id + (r % READYQ_SHARD_FACTOR);
    373376                        }
     377
     378
     379                #if defined(USE_MPSC)
     380                        // mpsc always succeeds
     381                } while( false );
     382                #else
    374383                        // If we can't lock it retry
    375384                } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
     385                #endif
    376386
    377387                // Actually push it
    378388                push(lanes.data[i], thrd);
    379389
    380                 // Unlock and return
    381                 __atomic_unlock( &lanes.data[i].lock );
     390                #if !defined(USE_MPSC)
     391                        // Unlock and return
     392                        __atomic_unlock( &lanes.data[i].lock );
     393                #endif
    382394
    383395                #if !defined(__CFA_NO_STATISTICS__)
     
    398410
    399411                if(proc->rdq.target == -1u) {
    400                         unsigned long long min = ts(lanes.data[proc->rdq.id]);
    401                         for(int i = 0; i < READYQ_SHARD_FACTOR; i++) {
    402                                 unsigned long long tsc = ts(lanes.data[proc->rdq.id + i]);
    403                                 if(tsc < min) min = tsc;
    404                         }
    405                         proc->rdq.cutoff = min;
    406412                        proc->rdq.target = __tls_rand() % lanes.count;
     413                        unsigned it1  = proc->rdq.itr;
     414                        unsigned it2  = proc->rdq.itr + 1;
     415                        unsigned idx1 = proc->rdq.id + (it1 % READYQ_SHARD_FACTOR);
     416                        unsigned idx2 = proc->rdq.id + (it2 % READYQ_SHARD_FACTOR);
     417                        unsigned long long tsc1 = ts(lanes.data[idx1]);
     418                        unsigned long long tsc2 = ts(lanes.data[idx2]);
     419                        proc->rdq.cutoff = min(tsc1, tsc2);
     420                        if(proc->rdq.cutoff == 0) proc->rdq.cutoff = -1ull;
    407421                }
    408422                else {
    409423                        unsigned target = proc->rdq.target;
    410424                        proc->rdq.target = -1u;
    411                         const unsigned long long bias = 0; //2_500_000_000;
    412                         const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff;
    413                         if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) {
     425                        if(lanes.tscs[target].tv < proc->rdq.cutoff) {
    414426                                $thread * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
    415427                                if(t) return t;
     
    418430
    419431                for(READYQ_SHARD_FACTOR) {
    420                         unsigned i = proc->rdq.id + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
     432                        unsigned i = proc->rdq.id + (--proc->rdq.itr % READYQ_SHARD_FACTOR);
    421433                        if($thread * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
    422434                }
     
    450462        // If list looks empty retry
    451463        if( is_empty(lane) ) {
     464                __STATS( stats.espec++; )
    452465                return 0p;
    453466        }
     
    455468        // If we can't get the lock retry
    456469        if( !__atomic_try_acquire(&lane.lock) ) {
     470                __STATS( stats.elock++; )
    457471                return 0p;
    458472        }
     
    461475        if( is_empty(lane) ) {
    462476                __atomic_unlock(&lane.lock);
     477                __STATS( stats.eempty++; )
    463478                return 0p;
    464479        }
     
    466481        // Actually pop the list
    467482        struct $thread * thrd;
    468         unsigned long long tsv;
    469         [thrd, tsv] = pop(lane);
     483        thrd = pop(lane);
    470484
    471485        /* paranoid */ verify(thrd);
    472         /* paranoid */ verify(tsv);
    473486        /* paranoid */ verify(lane.lock);
    474487
     
    480493
    481494        #if defined(USE_WORK_STEALING)
    482                 lanes.tscs[w].tv = tsv;
     495                lanes.tscs[w].tv = thrd->link.ts;
    483496        #endif
    484 
    485         thrd->preferred = w;
    486497
    487498        // return the popped thread
     
    511522// Check that all the intrusive queues in the data structure are still consistent
    512523static void check( __ready_queue_t & q ) with (q) {
    513         #if defined(__CFA_WITH_VERIFY__)
     524        #if defined(__CFA_WITH_VERIFY__) && !defined(USE_MPSC)
    514525                {
    515526                        for( idx ; lanes.count ) {
     
    517528                                assert(!lanes.data[idx].lock);
    518529
    519                                         if(is_empty(sl)) {
    520                                                 assert( sl.anchor.next == 0p );
    521                                                 assert( sl.anchor.ts   == 0  );
    522                                                 assert( mock_head(sl)  == sl.prev );
    523                                         } else {
    524                                                 assert( sl.anchor.next != 0p );
    525                                                 assert( sl.anchor.ts   != 0  );
    526                                                 assert( mock_head(sl)  != sl.prev );
    527                                         }
     530                                assert(head(sl)->link.prev == 0p );
     531                                assert(head(sl)->link.next->link.prev == head(sl) );
     532                                assert(tail(sl)->link.next == 0p );
     533                                assert(tail(sl)->link.prev->link.next == tail(sl) );
     534
     535                                if(is_empty(sl)) {
     536                                        assert(tail(sl)->link.prev == head(sl));
     537                                        assert(head(sl)->link.next == tail(sl));
     538                                } else {
     539                                        assert(tail(sl)->link.prev != head(sl));
     540                                        assert(head(sl)->link.next != tail(sl));
     541                                }
    528542                        }
    529543                }
     
    546560// fixes the list so that the pointers back to anchors aren't left dangling
    547561static inline void fix(__intrusive_lane_t & ll) {
    548                         if(is_empty(ll)) {
    549                                 verify(ll.anchor.next == 0p);
    550                                 ll.prev = mock_head(ll);
    551                         }
    552 }
    553 
    554 static void assign_list(unsigned & value, dlist(processor) & list, unsigned count) {
     562        #if !defined(USE_MPSC)
     563                // if the list is not empty then follow he pointer and fix its reverse
     564                if(!is_empty(ll)) {
     565                        head(ll)->link.next->link.prev = head(ll);
     566                        tail(ll)->link.prev->link.next = tail(ll);
     567                }
     568                // Otherwise just reset the list
     569                else {
     570                        verify(tail(ll)->link.next == 0p);
     571                        tail(ll)->link.prev = head(ll);
     572                        head(ll)->link.next = tail(ll);
     573                        verify(head(ll)->link.prev == 0p);
     574                }
     575        #endif
     576}
     577
     578static void assign_list(unsigned & value, dlist(processor, processor) & list, unsigned count) {
    555579        processor * it = &list`first;
    556580        for(unsigned i = 0; i < count; i++) {
     
    573597                lanes.tscs = alloc(lanes.count, lanes.tscs`realloc);
    574598                for(i; lanes.count) {
    575                         unsigned long long tsc = ts(lanes.data[i]);
    576                         lanes.tscs[i].tv = tsc != 0 ? tsc : rdtscl();
     599                        lanes.tscs[i].tv = ts(lanes.data[i]);
    577600                }
    578601        #endif
     
    663686                        while(!is_empty(lanes.data[idx])) {
    664687                                struct $thread * thrd;
    665                                 unsigned long long _;
    666                                 [thrd, _] = pop(lanes.data[idx]);
    667 
    668                                 push(cltr, thrd, true);
     688                                thrd = pop(lanes.data[idx]);
     689
     690                                push(cltr, thrd);
    669691
    670692                                // for printing count the number of displaced threads
     
    703725        /* paranoid */ verify( ready_mutate_islocked() );
    704726}
    705 
    706 #if !defined(__CFA_NO_STATISTICS__)
    707         unsigned cnt(const __ready_queue_t & this, unsigned idx) {
    708                 /* paranoid */ verify(this.lanes.count > idx);
    709                 return this.lanes.data[idx].cnt;
    710         }
    711 #endif
Note: See TracChangeset for help on using the changeset viewer.