Changeset b798713 for libcfa


Ignore:
Timestamp:
Dec 10, 2019, 4:23:09 PM (5 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
f80f840
Parents:
0f9ceacb
Message:

Working ready queue

Location:
libcfa/src
Files:
8 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/bits/defs.hfa

    r0f9ceacb rb798713  
    5454}
    5555
    56 #define __CFA_NO_BIT_TEST_AND_SET__
     56// #define __CFA_NO_BIT_TEST_AND_SET__
    5757
    5858static inline bool bts(volatile unsigned long long int * target, unsigned long long int bit ) {
     
    6565        asm volatile(
    6666            "LOCK btsq %[bit], %[target]\n\t"
    67             :"=@ccc" (result)
     67            : "=@ccc" (result)
    6868            : [target] "m" (*target), [bit] "r" (bit)
    6969        );
  • libcfa/src/concurrency/invoke.h

    r0f9ceacb rb798713  
    158158        };
    159159
     160        // Link lists fields
     161        // instrusive link field for threads
     162        struct __thread_desc_link {
     163                struct thread_desc * next;
     164                struct thread_desc * prev;
     165                unsigned long long ts;
     166        };
     167
    160168        struct thread_desc {
    161169                // Core threading fields
     
    188196                // Link lists fields
    189197                // instrusive link field for threads
    190                 struct thread_desc * next;
    191                 struct thread_desc * prev;
    192                 unsigned long long ts;
     198                struct __thread_desc_link link;
    193199
    194200                struct {
     
    201207        extern "Cforall" {
    202208                static inline thread_desc *& get_next( thread_desc & this ) {
    203                         return this.next;
     209                        return this.link.next;
    204210                }
    205211
  • libcfa/src/concurrency/kernel.cfa

    r0f9ceacb rb798713  
    185185        self_mon.recursion = 1;
    186186        self_mon_p = &self_mon;
    187         next = NULL;
     187        link.next = 0p;
     188        link.prev = 0p;
    188189
    189190        node.next = NULL;
     
    271272
    272273        // register the processor unless it's the main thread which is handled in the boot sequence
    273         if(this != mainProcessor)
     274        if(this != mainProcessor) {
    274275                this->id = doregister(this->cltr, this);
     276                ready_queue_grow( this->cltr );
     277        }
     278
    275279
    276280        {
     
    310314        V( this->terminated );
    311315
     316
    312317        // unregister the processor unless it's the main thread which is handled in the boot sequence
    313         if(this != mainProcessor)
     318        if(this != mainProcessor) {
     319                ready_queue_shrink( this->cltr );
    314320                unregister(this->cltr, this);
     321        }
    315322
    316323        __cfaabi_dbg_print_safe("Kernel : core %p terminated\n", this);
     324
     325        stats_tls_tally(this->cltr);
    317326}
    318327
     
    506515        verify( ! kernelTLS.preemption_state.enabled );
    507516
    508         verifyf( thrd->next == NULL, "Expected null got %p", thrd->next );
     517        verifyf( thrd->link.next == NULL, "Expected null got %p", thrd->link.next );
     518
     519
     520        ready_schedule_lock(thrd->curr_cluster, kernelTLS.this_processor);
     521                bool was_empty = push( thrd->curr_cluster, thrd );
     522        ready_schedule_unlock(thrd->curr_cluster, kernelTLS.this_processor);
    509523
    510524        with( *thrd->curr_cluster ) {
    511                 ready_schedule_lock(*thrd->curr_cluster, kernelTLS.this_processor);
    512                 __atomic_acquire(&ready_queue.lock);
    513                 thrd->ts = rdtscl();
    514                 bool was_empty = push( ready_queue, thrd );
    515                 __atomic_unlock(&ready_queue.lock);
    516                 ready_schedule_unlock(*thrd->curr_cluster, kernelTLS.this_processor);
    517 
    518525                if(was_empty) {
    519526                        lock      (proc_list_lock __cfaabi_dbg_ctx2);
     
    526533                        wake_fast(idle);
    527534                }
    528 
    529535        }
    530536
     
    536542        verify( ! kernelTLS.preemption_state.enabled );
    537543
    538         ready_schedule_lock(*this, kernelTLS.this_processor);
    539                 __atomic_acquire(&ready_queue.lock);
    540                         thread_desc * head;
    541                         __attribute__((unused)) bool _;
    542                         [head, _] = pop( ready_queue );
    543                 __atomic_unlock(&ready_queue.lock);
    544         ready_schedule_unlock(*this, kernelTLS.this_processor);
     544        ready_schedule_lock(this, kernelTLS.this_processor);
     545                thread_desc * head = pop( this );
     546        ready_schedule_unlock(this, kernelTLS.this_processor);
    545547
    546548        verify( ! kernelTLS.preemption_state.enabled );
     
    767769        // Destroy the main processor and its context in reverse order of construction
    768770        // These were manually constructed so we need manually destroy them
    769         ^(mainProcessor->runner){};
     771        void ^?{}(processor & this) with( this ) {
     772                //don't join the main thread here, that wouldn't make any sense
     773                __cfaabi_dbg_print_safe("Kernel : destroyed main processor context %p\n", &runner);
     774        }
     775
    770776        ^(*mainProcessor){};
    771777
  • libcfa/src/concurrency/kernel.hfa

    r0f9ceacb rb798713  
    190190        // spin lock protecting the queue
    191191        volatile bool lock;
     192        unsigned int last_id;
    192193
    193194        // anchor for the head and the tail of the queue
    194195        struct __sentinel_t {
    195                 struct thread_desc * next;
    196                 struct thread_desc * prev;
    197                 unsigned long long ts;
     196                // Link lists fields
     197                // instrusive link field for threads
     198                // must be exactly as in thread_desc
     199                __thread_desc_link link;
    198200        } before, after;
    199201
    200202        // Optional statistic counters
    201         #ifndef __CFA_NO_SCHED_STATS__
     203        #if !defined(__CFA_NO_SCHED_STATS__)
    202204                struct __attribute__((aligned(64))) {
    203205                        // difference between number of push and pops
     
    214216void ^?{}(__intrusive_ready_queue_t & this);
    215217
     218typedef unsigned long long __cfa_readyQ_mask_t;
     219
     220// enum {
     221//      __cfa_ready_queue_mask_size = (64 - sizeof(size_t)) / sizeof(size_t),
     222//      __cfa_max_ready_queues = __cfa_ready_queue_mask_size * 8 * sizeof(size_t)
     223// };
     224
     225#define __cfa_readyQ_mask_size ((64 - sizeof(size_t)) / sizeof(__cfa_readyQ_mask_t))
     226#define __cfa_max_readyQs (__cfa_readyQ_mask_size * 8 * sizeof(__cfa_readyQ_mask_t))
     227
     228//TODO adjust cache size to ARCHITECTURE
     229struct __attribute__((aligned(128))) __ready_queue_t {
     230        struct {
     231                volatile size_t count;
     232                volatile __cfa_readyQ_mask_t mask[ __cfa_readyQ_mask_size ];
     233        } empty;
     234
     235        struct __attribute__((aligned(64))) {
     236                __intrusive_ready_queue_t * volatile data;
     237                volatile size_t count;
     238        } list;
     239
     240        #if !defined(__CFA_NO_STATISTICS__)
     241                __attribute__((aligned(64))) struct {
     242                        struct {
     243                                struct {
     244                                        volatile size_t attempt;
     245                                        volatile size_t success;
     246                                } push;
     247                                struct {
     248                                        volatile size_t maskrds;
     249                                        volatile size_t attempt;
     250                                        volatile size_t success;
     251                                } pop;
     252                        } pick;
     253                        struct {
     254                                volatile size_t value;
     255                                volatile size_t count;
     256                        } full;
     257                } global_stats;
     258
     259        #endif
     260};
     261
     262void  ?{}(__ready_queue_t & this);
     263void ^?{}(__ready_queue_t & this);
     264
    216265//-----------------------------------------------------------------------------
    217266// Cluster
     
    221270
    222271        // Ready queue for threads
    223         __intrusive_ready_queue_t ready_queue;
     272        __ready_queue_t ready_queue;
    224273
    225274        // Name of the cluster
  • libcfa/src/concurrency/kernel_private.hfa

    r0f9ceacb rb798713  
    143143
    144144static inline bool __atomic_try_acquire(volatile bool * ll) {
    145         return __atomic_exchange_n(ll, (bool)true, __ATOMIC_SEQ_CST);
     145        return !__atomic_exchange_n(ll, (bool)true, __ATOMIC_SEQ_CST);
    146146}
    147147
     
    154154// Reader side : acquire when using the ready queue to schedule but not
    155155//  creating/destroying queues
    156 static inline void ready_schedule_lock( struct cluster & cltr, struct processor * proc) with(cltr.ready_lock) {
     156static inline void ready_schedule_lock( struct cluster * cltr, struct processor * proc) with(cltr->ready_lock) {
    157157        unsigned iproc = proc->id;
    158158        /*paranoid*/ verify(data[iproc].handle == proc);
     
    173173}
    174174
    175 static inline void ready_schedule_unlock( struct cluster & cltr, struct processor * proc) with(cltr.ready_lock) {
     175static inline void ready_schedule_unlock( struct cluster * cltr, struct processor * proc) with(cltr->ready_lock) {
    176176        unsigned iproc = proc->id;
    177177        /*paranoid*/ verify(data[iproc].handle == proc);
     
    188188void ready_mutate_unlock( struct cluster & cltr, uint_fast32_t );
    189189
    190 bool push(__intrusive_ready_queue_t & this, thread_desc * node);
    191 [thread_desc *, bool] pop(__intrusive_ready_queue_t & this);
     190//=======================================================================
     191// Ready-Queue API
     192
     193__attribute__((hot)) bool push(struct cluster * cltr, struct thread_desc * thrd);
     194__attribute__((hot)) thread_desc * pop(struct cluster * cltr);
     195void ready_queue_grow  (struct cluster * cltr);
     196void ready_queue_shrink(struct cluster * cltr);
     197
     198#if !defined(__CFA_NO_STATISTICS__)
     199void stats_tls_tally(struct cluster * cltr);
     200#else
     201static inline void stats_tls_tally(struct cluster * cltr) {}
     202#endif
    192203
    193204// Local Variables: //
  • libcfa/src/concurrency/monitor.cfa

    r0f9ceacb rb798713  
    841841        for(    thread_desc ** thrd_it = &entry_queue.head;
    842842                *thrd_it;
    843                 thrd_it = &(*thrd_it)->next
     843                thrd_it = &(*thrd_it)->link.next
    844844        ) {
    845845                // For each acceptable check if it matches
  • libcfa/src/concurrency/ready_queue.cfa

    r0f9ceacb rb798713  
    5555}
    5656
     57static inline unsigned rand_bit(unsigned rnum, size_t mask) {
     58        verify(sizeof(mask) == 8);
     59        unsigned bit = mask ? rnum % __builtin_popcountl(mask) : 0;
     60#if !defined(__BMI2__)
     61        uint64_t v = mask;   // Input value to find position with rank r.
     62        unsigned int r = bit + 1;// Input: bit's desired rank [1-64].
     63        unsigned int s;      // Output: Resulting position of bit with rank r [1-64]
     64        uint64_t a, b, c, d; // Intermediate temporaries for bit count.
     65        unsigned int t;      // Bit count temporary.
     66
     67        // Do a normal parallel bit count for a 64-bit integer,
     68        // but store all intermediate steps.
     69        a =  v - ((v >> 1) & ~0UL/3);
     70        b = (a & ~0UL/5) + ((a >> 2) & ~0UL/5);
     71        c = (b + (b >> 4)) & ~0UL/0x11;
     72        d = (c + (c >> 8)) & ~0UL/0x101;
     73
     74
     75        t = (d >> 32) + (d >> 48);
     76        // Now do branchless select!
     77        s  = 64;
     78        s -= ((t - r) & 256) >> 3; r -= (t & ((t - r) >> 8));
     79        t  = (d >> (s - 16)) & 0xff;
     80        s -= ((t - r) & 256) >> 4; r -= (t & ((t - r) >> 8));
     81        t  = (c >> (s - 8)) & 0xf;
     82        s -= ((t - r) & 256) >> 5; r -= (t & ((t - r) >> 8));
     83        t  = (b >> (s - 4)) & 0x7;
     84        s -= ((t - r) & 256) >> 6; r -= (t & ((t - r) >> 8));
     85        t  = (a >> (s - 2)) & 0x3;
     86        s -= ((t - r) & 256) >> 7; r -= (t & ((t - r) >> 8));
     87        t  = (v >> (s - 1)) & 0x1;
     88        s -= ((t - r) & 256) >> 8;
     89        return s - 1;
     90#else
     91        uint64_t picked = _pdep_u64(1ul << bit, mask);
     92        return picked ? __builtin_ctzl(picked) : 0;
     93#endif
     94}
     95
     96static inline __cfa_readyQ_mask_t readyQ_mask_full       () { return (8 * sizeof(__cfa_readyQ_mask_t)) - 1; }
     97static inline __cfa_readyQ_mask_t readyQ_mask_shit_length() { return (8 * sizeof(__cfa_readyQ_mask_t)) - __builtin_clzl(readyQ_mask_full()); }
     98
     99static inline [__cfa_readyQ_mask_t, __cfa_readyQ_mask_t] extract(__cfa_readyQ_mask_t idx) {
     100        __cfa_readyQ_mask_t word = idx >> readyQ_mask_shit_length();
     101        __cfa_readyQ_mask_t bit  = idx &  readyQ_mask_full();
     102        return [bit, word];
     103}
     104
    57105//=======================================================================
    58106// Cluster wide reader-writer lock
     
    170218// Intrusive Queue used by ready queue
    171219//=======================================================================
    172 static const size_t fields_offset = offsetof( thread_desc, next );
    173 
    174220// Get the head pointer (one before the first element) from the anchor
    175221static inline thread_desc * head(const __intrusive_ready_queue_t & this) {
    176222        thread_desc * rhead = (thread_desc *)(
    177                 (uintptr_t)( &this.before ) - fields_offset
     223                (uintptr_t)( &this.before ) - offsetof( thread_desc, link )
    178224        );
    179225        /* paranoid */ verify(rhead);
     
    184230static inline thread_desc * tail(const __intrusive_ready_queue_t & this) {
    185231        thread_desc * rtail = (thread_desc *)(
    186                 (uintptr_t)( &this.after ) - fields_offset
     232                (uintptr_t)( &this.after ) - offsetof( thread_desc, link )
    187233        );
    188234        /* paranoid */ verify(rtail);
     
    192238// Ctor
    193239void ?{}( __intrusive_ready_queue_t & this ) {
    194         this.before.prev = 0p;
    195         this.before.next = tail(this);
    196 
    197         this.after .prev = head(this);
    198         this.after .next = 0p;
     240        this.lock = false;
     241        this.last_id = -1u;
     242
     243        this.before.link.prev = 0p;
     244        this.before.link.next = tail(this);
     245        this.before.link.ts   = 0;
     246
     247        this.after .link.prev = head(this);
     248        this.after .link.next = 0p;
     249        this.after .link.ts   = 0;
     250
     251        #if !defined(__CFA_NO_SCHED_STATS__)
     252                this.stat.diff = 0;
     253                this.stat.push = 0;
     254                this.stat.pop  = 0;
     255        #endif
    199256
    200257        // We add a boat-load of assertions here because the anchor code is very fragile
    201         /* paranoid */ verify(((uintptr_t)( head(this) ) + fields_offset) == (uintptr_t)(&this.before));
    202         /* paranoid */ verify(((uintptr_t)( tail(this) ) + fields_offset) == (uintptr_t)(&this.after ));
    203         /* paranoid */ verify(head(this)->prev == 0p );
    204         /* paranoid */ verify(head(this)->next == tail(this) );
    205         /* paranoid */ verify(tail(this)->next == 0p );
    206         /* paranoid */ verify(tail(this)->prev == head(this) );
    207         /* paranoid */ verify(&head(this)->prev == &this.before.prev );
    208         /* paranoid */ verify(&head(this)->next == &this.before.next );
    209         /* paranoid */ verify(&tail(this)->prev == &this.after .prev );
    210         /* paranoid */ verify(&tail(this)->next == &this.after .next );
     258        /* paranoid */ verify(((uintptr_t)( head(this) ) + offsetof( thread_desc, link )) == (uintptr_t)(&this.before));
     259        /* paranoid */ verify(((uintptr_t)( tail(this) ) + offsetof( thread_desc, link )) == (uintptr_t)(&this.after ));
     260        /* paranoid */ verify(head(this)->link.prev == 0p );
     261        /* paranoid */ verify(head(this)->link.next == tail(this) );
     262        /* paranoid */ verify(tail(this)->link.next == 0p );
     263        /* paranoid */ verify(tail(this)->link.prev == head(this) );
     264        /* paranoid */ verify(&head(this)->link.prev == &this.before.link.prev );
     265        /* paranoid */ verify(&head(this)->link.next == &this.before.link.next );
     266        /* paranoid */ verify(&tail(this)->link.prev == &this.after .link.prev );
     267        /* paranoid */ verify(&tail(this)->link.next == &this.after .link.next );
    211268        /* paranoid */ verify(sizeof(__intrusive_ready_queue_t) == 128);
    212269        /* paranoid */ verify(sizeof(this) == 128);
     
    214271        /* paranoid */ verify(__alignof__(this) == 128);
    215272        /* paranoid */ verifyf(((intptr_t)(&this) % 128) == 0, "Expected address to be aligned %p %% 128 == %zd", &this, ((intptr_t)(&this) % 128));
     273
     274        /* paranoid */ verifyf(readyQ_mask_shit_length() == 6 , "%zu", readyQ_mask_shit_length());
     275        /* paranoid */ verifyf(readyQ_mask_full()        == 63, "%zu", readyQ_mask_full());
    216276}
    217277
     
    219279void ^?{}( __intrusive_ready_queue_t & this ) {
    220280        // Make sure the list is empty
    221         /* paranoid */ verify(head(this)->prev == 0p );
    222         /* paranoid */ verify(head(this)->next == tail(this) );
    223         /* paranoid */ verify(tail(this)->next == 0p );
    224         /* paranoid */ verify(tail(this)->prev == head(this) );
     281        /* paranoid */ verify(head(this)->link.prev == 0p );
     282        /* paranoid */ verify(head(this)->link.next == tail(this) );
     283        /* paranoid */ verify(tail(this)->link.next == 0p );
     284        /* paranoid */ verify(tail(this)->link.prev == head(this) );
    225285}
    226286
     
    229289bool push(__intrusive_ready_queue_t & this, thread_desc * node) {
    230290        verify(this.lock);
    231         verify(node->ts != 0);
    232         verify(node->next == 0p);
    233         verify(node->prev == 0p);
    234 
     291        verify(node->link.ts != 0);
     292        verify(node->link.next == 0p);
     293        verify(node->link.prev == 0p);
     294
     295        if(this.before.link.ts == 0l) {
     296                verify(tail(this)->link.next == 0p);
     297                verify(tail(this)->link.prev == head(this));
     298                verify(head(this)->link.next == tail(this));
     299                verify(head(this)->link.prev == 0p);
     300        }
    235301
    236302        // Get the relevant nodes locally
    237303        thread_desc * tail = tail(this);
    238         thread_desc * prev = tail->prev;
     304        thread_desc * prev = tail->link.prev;
    239305
    240306        // Do the push
    241         node->next = tail;
    242         node->prev = prev;
    243         prev->next = node;
    244         tail->prev = node;
     307        node->link.next = tail;
     308        node->link.prev = prev;
     309        prev->link.next = node;
     310        tail->link.prev = node;
    245311
    246312        // Update stats
     
    250316        #endif
    251317
     318        verify(node->link.next == tail(this));
     319
    252320        // Check if the queue used to be empty
    253         if(this.before.ts == 0l) {
    254                 this.before.ts = node->ts;
    255                 verify(node->prev == head(this));
     321        if(this.before.link.ts == 0l) {
     322                this.before.link.ts = node->link.ts;
     323                verify(node->link.prev == head(this));
    256324                return true;
    257325        }
     
    261329[thread_desc *, bool] pop(__intrusive_ready_queue_t & this) {
    262330        verify(this.lock);
     331        verify(this.before.link.ts != 0ul);
    263332        thread_desc * head = head(this);
    264333        thread_desc * tail = tail(this);
    265334
    266         thread_desc * node = head->next;
    267         thread_desc * next = node->next;
    268         if(node == tail) return [0p, false];
     335        thread_desc * node = head->link.next;
     336        thread_desc * next = node->link.next;
     337        if(node == tail) {
     338                verify(false);
     339                verify(this.before.link.ts == 0ul);
     340                verify(tail(this)->link.next == 0p);
     341                verify(tail(this)->link.prev == head(this));
     342                verify(head(this)->link.next == tail(this));
     343                verify(head(this)->link.prev == 0p);
     344                return [0p, false];
     345        }
    269346
    270347        /* paranoid */ verify(node);
    271348
    272         head->next = next;
    273         next->prev = head;
     349        head->link.next = next;
     350        next->link.prev = head;
    274351
    275352        #ifndef __CFA_NO_SCHED_STATS__
     
    279356
    280357        if(next == tail) {
    281                 this.before.ts = 0ul;
    282                 node->[next, prev] = 0p;
     358                this.before.link.ts = 0ul;
     359                verify(tail(this)->link.next == 0p);
     360                verify(tail(this)->link.prev == head(this));
     361                verify(head(this)->link.next == tail(this));
     362                verify(head(this)->link.prev == 0p);
     363                node->link.[next, prev] = 0p;
    283364                return [node, true];
    284365        }
    285366        else {
    286                 verify(next->ts != 0);
    287                 this.before.ts = next->ts;
    288                 verify(this.before.ts != 0);
    289                 node->[next, prev] = 0p;
     367                verify(next->link.ts != 0);
     368                this.before.link.ts = next->link.ts;
     369                verify(this.before.link.ts != 0);
     370                node->link.[next, prev] = 0p;
    290371                return [node, false];
    291372        }
     
    293374
    294375static inline unsigned long long ts(__intrusive_ready_queue_t & this) {
    295         return this.before.ts;
    296 }
     376        return this.before.link.ts;
     377}
     378
     379//=======================================================================
     380// Cforall Reqdy Queue used by ready queue
     381//=======================================================================
     382
     383static __attribute__((aligned(128))) thread_local struct {
     384        struct {
     385                struct {
     386                        size_t attempt;
     387                        size_t success;
     388                } push;
     389                struct {
     390                        size_t maskrds;
     391                        size_t attempt;
     392                        size_t success;
     393                } pop;
     394        } pick;
     395        struct {
     396                size_t value;
     397                size_t count;
     398        } full;
     399} tls = {
     400        /* pick */{
     401                /* push */{ 0, 0 },
     402                /* pop  */{ 0, 0, 0 },
     403        },
     404        /* full */{ 0, 0 }
     405};
     406
     407//-----------------------------------------------------------------------
     408
     409void ?{}(__ready_queue_t & this) with (this) {
     410        empty.count = 0;
     411        for( i ; __cfa_readyQ_mask_size ) {
     412                empty.mask[i] = 0;
     413        }
     414
     415        list.data = alloc(4);
     416        for( i; 4 ) {
     417                (list.data[i]){};
     418        }
     419        list.count = 4;
     420
     421        #if !defined(__CFA_NO_STATISTICS__)
     422                global_stats.pick.push.attempt = 0;
     423                global_stats.pick.push.success = 0;
     424                global_stats.pick.pop .maskrds = 0;
     425                global_stats.pick.pop .attempt = 0;
     426                global_stats.pick.pop .success = 0;
     427
     428                global_stats.full.value = 0;
     429                global_stats.full.count = 0;
     430        #endif
     431}
     432
     433void ^?{}(__ready_queue_t & this) with (this) {
     434        verify( 4  == list .count );
     435        verify( 0  == empty.count );
     436
     437        for( i; 4 ) {
     438                ^(list.data[i]){};
     439        }
     440        free(list.data);
     441
     442
     443        #if defined(__CFA_WITH_VERIFY__)
     444                for( i ; __cfa_readyQ_mask_size ) {
     445                        assert( 0 == empty.mask[i] );
     446                }
     447        #endif
     448}
     449
     450//-----------------------------------------------------------------------
     451
     452__attribute__((hot)) bool push(struct cluster * cltr, struct thread_desc * thrd) with (cltr->ready_queue) {
     453        thrd->link.ts = rdtscl();
     454
     455        while(true) {
     456                // Pick a random list
     457                unsigned i = tls_rand() % list.count;
     458
     459                #if !defined(__CFA_NO_STATISTICS__)
     460                        tls.pick.push.attempt++;
     461                #endif
     462
     463                // If we can't lock it retry
     464                if( !__atomic_try_acquire( &list.data[i].lock ) ) continue;
     465                verify(list.data[i].last_id == -1u);
     466                list.data[i].last_id = kernelTLS.this_processor->id;
     467
     468                __attribute__((unused)) size_t num = __atomic_load_n( &empty.count, __ATOMIC_RELAXED );
     469                bool first = false;
     470
     471                verify( list.data[i].last_id == kernelTLS.this_processor->id );
     472                verify( list.data[i].lock );
     473                // Actually push it
     474                if(push(list.data[i], thrd)) {
     475                        size_t ret = __atomic_fetch_add( &empty.count, 1z, __ATOMIC_SEQ_CST);
     476                        first = (ret == 0);
     477
     478                        __cfa_readyQ_mask_t word;
     479                        __cfa_readyQ_mask_t bit;
     480                        [bit, word] = extract(i);
     481                        verifyf((empty.mask[word] & (1ull << bit)) == 0, "Before set %llu:%llu (%u), %llx & %llx", word, bit, i, empty.mask[word], (1ull << bit));
     482                        __attribute__((unused)) bool ret = bts(&empty.mask[word], bit);
     483                        verify(!(bool)ret);
     484                        verifyf((empty.mask[word] & (1ull << bit)) != 0, "After set %llu:%llu (%u), %llx & %llx", word, bit, i, empty.mask[word], (1ull << bit));
     485                }
     486                verify(empty.count <= (int)list.count);
     487                verify( list.data[i].last_id == kernelTLS.this_processor->id );
     488                verify( list.data[i].lock );
     489
     490                // Unlock and return
     491                list.data[i].last_id = -1u;
     492                __atomic_unlock( &list.data[i].lock );
     493
     494                #if !defined(__CFA_NO_STATISTICS__)
     495                        tls.pick.push.success++;
     496                        tls.full.value += num;
     497                        tls.full.count += 1;
     498                #endif
     499                return first;
     500        }
     501}
     502
     503//-----------------------------------------------------------------------
     504
     505static struct thread_desc * try_pop(struct cluster * cltr, unsigned i, unsigned j) with (cltr->ready_queue) {
     506        #if !defined(__CFA_NO_STATISTICS__)
     507                tls.pick.pop.attempt++;
     508        #endif
     509
     510        // Pick the bet list
     511        int w = i;
     512        if( __builtin_expect(ts(list.data[j]) != 0, true) ) {
     513                w = (ts(list.data[i]) < ts(list.data[j])) ? i : j;
     514        }
     515
     516        __intrusive_ready_queue_t & list = list.data[w];
     517        // If list looks empty retry
     518        if( ts(list) == 0 ) return 0p;
     519
     520        // If we can't get the lock retry
     521        if( !__atomic_try_acquire(&list.lock) ) return 0p;
     522        verify(list.last_id == -1u);
     523        list.last_id = kernelTLS.this_processor->id;
     524
     525        verify(list.last_id == kernelTLS.this_processor->id);
     526
     527        __attribute__((unused)) int num = __atomic_load_n( &empty.count, __ATOMIC_RELAXED );
     528
     529
     530        // If list is empty, unlock and retry
     531        if( ts(list) == 0 ) {
     532                list.last_id = -1u;
     533                __atomic_unlock(&list.lock);
     534                return 0p;
     535        }
     536        {
     537                __cfa_readyQ_mask_t word;
     538                __cfa_readyQ_mask_t bit;
     539                [bit, word] = extract(w);
     540                verify((empty.mask[word] & (1ull << bit)) != 0);
     541        }
     542
     543        verify(list.last_id == kernelTLS.this_processor->id);
     544        verify(list.lock);
     545
     546        // Actually pop the list
     547        struct thread_desc * thrd;
     548        bool emptied;
     549        [thrd, emptied] = pop(list);
     550        verify(thrd);
     551
     552        verify(list.last_id == kernelTLS.this_processor->id);
     553        verify(list.lock);
     554
     555        if(emptied) {
     556                __atomic_fetch_sub( &empty.count, 1z, __ATOMIC_SEQ_CST);
     557
     558                __cfa_readyQ_mask_t word;
     559                __cfa_readyQ_mask_t bit;
     560                [bit, word] = extract(w);
     561                verify((empty.mask[word] & (1ull << bit)) != 0);
     562                __attribute__((unused)) bool ret = btr(&empty.mask[word], bit);
     563                verify(ret);
     564                verify((empty.mask[word] & (1ull << bit)) == 0);
     565        }
     566
     567        verify(list.lock);
     568
     569        // Unlock and return
     570        list.last_id = -1u;
     571        __atomic_unlock(&list.lock);
     572        verify(empty.count >= 0);
     573
     574        #if !defined(__CFA_NO_STATISTICS__)
     575                tls.pick.pop.success++;
     576                tls.full.value += num;
     577                tls.full.count += 1;
     578        #endif
     579
     580        return thrd;
     581}
     582
     583__attribute__((hot)) thread_desc * pop(struct cluster * cltr) with (cltr->ready_queue) {
     584        verify( list.count > 0 );
     585        while( __atomic_load_n( &empty.count, __ATOMIC_RELAXED ) != 0) {
     586                #if !defined(__CFA_READQ_NO_BITMASK__)
     587                        tls.pick.pop.maskrds++;
     588                        unsigned i, j;
     589                        {
     590                                #if !defined(__CFA_NO_SCHED_STATS__)
     591                                        tls.pick.pop.maskrds++;
     592                                #endif
     593
     594                                // Pick two lists at random
     595                                unsigned num = ((__atomic_load_n( &list.count, __ATOMIC_RELAXED ) - 1) >> 6) + 1;
     596
     597                                unsigned ri = tls_rand();
     598                                unsigned rj = tls_rand();
     599
     600                                unsigned wdxi = (ri >> 6u) % num;
     601                                unsigned wdxj = (rj >> 6u) % num;
     602
     603                                size_t maski = __atomic_load_n( &empty.mask[wdxi], __ATOMIC_RELAXED );
     604                                size_t maskj = __atomic_load_n( &empty.mask[wdxj], __ATOMIC_RELAXED );
     605
     606                                if(maski == 0 && maskj == 0) continue;
     607
     608                                unsigned bi = rand_bit(ri, maski);
     609                                unsigned bj = rand_bit(rj, maskj);
     610
     611                                verifyf(bi < 64, "%zu %u", maski, bi);
     612                                verifyf(bj < 64, "%zu %u", maskj, bj);
     613
     614                                i = bi | (wdxi << 6);
     615                                j = bj | (wdxj << 6);
     616
     617                                verifyf(i < list.count, "%u", wdxi << 6);
     618                                verifyf(j < list.count, "%u", wdxj << 6);
     619                        }
     620
     621                        struct thread_desc * thrd = try_pop(cltr, i, j);
     622                        if(thrd) return thrd;
     623                #else
     624                        // Pick two lists at random
     625                        int i = tls_rand() % __atomic_load_n( &list.count, __ATOMIC_RELAXED );
     626                        int j = tls_rand() % __atomic_load_n( &list.count, __ATOMIC_RELAXED );
     627
     628                        struct thread_desc * thrd = try_pop(cltr, i, j);
     629                        if(thrd) return thrd;
     630                #endif
     631        }
     632
     633        return 0p;
     634}
     635
     636//-----------------------------------------------------------------------
     637
     638static void check( __ready_queue_t & q ) with (q) {
     639        #if defined(__CFA_WITH_VERIFY__)
     640                {
     641                        int idx = 0;
     642                        for( w ; __cfa_readyQ_mask_size ) {
     643                                for( b ; 8 * sizeof(__cfa_readyQ_mask_t) ) {
     644                                        bool is_empty = idx < list.count ? (ts(list.data[idx]) == 0) : true;
     645                                        bool should_be_empty = 0 == (empty.mask[w] & (1z << b));
     646                                        assertf(should_be_empty == is_empty, "Inconsistent list %d, mask expect : %d, actual is got %d", idx, should_be_empty, (bool)is_empty);
     647                                        assert(__cfa_max_readyQs > idx);
     648                                        idx++;
     649                                }
     650                        }
     651                }
     652
     653                {
     654                        for( idx ; list.count ) {
     655                                __intrusive_ready_queue_t & sl = list.data[idx];
     656                                assert(!list.data[idx].lock);
     657
     658                                assert(head(sl)->link.prev == 0p );
     659                                assert(head(sl)->link.next->link.prev == head(sl) );
     660                                assert(tail(sl)->link.next == 0p );
     661                                assert(tail(sl)->link.prev->link.next == tail(sl) );
     662
     663                                if(sl.before.link.ts == 0l) {
     664                                        assert(tail(sl)->link.next == 0p);
     665                                        assert(tail(sl)->link.prev == head(sl));
     666                                        assert(head(sl)->link.next == tail(sl));
     667                                        assert(head(sl)->link.prev == 0p);
     668                                }
     669                        }
     670                }
     671        #endif
     672}
     673
     674// Call this function of the intrusive list was moved using memcpy
     675// fixes the list so that the pointers back to anchors aren't left
     676// dangling
     677static inline void fix(__intrusive_ready_queue_t & ll) {
     678        // if the list is not empty then follow he pointer
     679        // and fix its reverse
     680        if(ll.before.link.ts != 0l) {
     681                head(ll)->link.next->link.prev = head(ll);
     682                tail(ll)->link.prev->link.next = tail(ll);
     683        }
     684        // Otherwise just reset the list
     685        else {
     686                tail(ll)->link.next = 0p;
     687                tail(ll)->link.prev = head(ll);
     688                head(ll)->link.next = tail(ll);
     689                head(ll)->link.prev = 0p;
     690        }
     691}
     692
     693void ready_queue_grow  (struct cluster * cltr) {
     694        uint_fast32_t last_size = ready_mutate_lock( *cltr );
     695        check( cltr->ready_queue );
     696
     697        with( cltr->ready_queue ) {
     698                size_t ncount = list.count;
     699
     700                // Check that we have some space left
     701                if(ncount + 4 >= __cfa_max_readyQs) abort("Program attempted to create more than maximum number of Ready Queues (%zu)", __cfa_max_readyQs);
     702
     703                ncount += 4;
     704
     705                // Allocate new array
     706                list.data = alloc(list.data, ncount);
     707
     708                // Fix the moved data
     709                for( idx; (size_t)list.count ) {
     710                        fix(list.data[idx]);
     711                }
     712
     713                // Construct new data
     714                for( idx; (size_t)list.count ~ ncount) {
     715                        (list.data[idx]){};
     716                }
     717
     718                // Update original
     719                list.count = ncount;
     720                // fields in empty don't need to change
     721        }
     722
     723        // Make sure that everything is consistent
     724        check( cltr->ready_queue );
     725        ready_mutate_unlock( *cltr, last_size );
     726}
     727
     728void ready_queue_shrink(struct cluster * cltr) {
     729        uint_fast32_t last_size = ready_mutate_lock( *cltr );
     730        with( cltr->ready_queue ) {
     731                size_t ocount = list.count;
     732                // Check that we have some space left
     733                if(ocount < 8) abort("Program attempted to destroy more Ready Queues than were created");
     734
     735                list.count -= 4;
     736
     737                // redistribute old data
     738                verify(ocount > list.count);
     739                for( idx; (size_t)list.count ~ ocount) {
     740                        // This is not strictly needed but makes checking invariants much easier
     741                        bool locked = __atomic_try_acquire(&list.data[idx].lock);
     742                        verify(locked);
     743                        while(0 != ts(list.data[idx])) {
     744                                struct thread_desc * thrd;
     745                                __attribute__((unused)) bool _;
     746                                [thrd, _] = pop(list.data[idx]);
     747                                verify(thrd);
     748                                push(cltr, thrd);
     749                        }
     750
     751                        __atomic_unlock(&list.data[idx].lock);
     752
     753                        // TODO print the queue statistics here
     754
     755                        ^(list.data[idx]){};
     756                }
     757
     758                // clear the now unused masks
     759                {
     760                        __cfa_readyQ_mask_t fword, fbit, lword, lbit;
     761                        [fbit, fword] = extract(ocount);
     762                        [lbit, lword] = extract(list.count);
     763
     764                        // For now assume that all queues where coverd by the same bitmask
     765                        // This is highly probable as long as grow and shrink use groups of 4
     766                        // exclusively
     767                        verify(fword == lword);
     768                        __cfa_readyQ_mask_t clears = ~0;
     769
     770                        for( b ; fbit ~ lbit ) {
     771                                clears ^= 1 << b;
     772                        }
     773
     774                        empty.mask[fword] &= clears;
     775                }
     776
     777                // Allocate new array
     778                list.data = alloc(list.data, list.count);
     779
     780                // Fix the moved data
     781                for( idx; (size_t)list.count ) {
     782                        fix(list.data[idx]);
     783                }
     784        }
     785
     786        // Make sure that everything is consistent
     787        check( cltr->ready_queue );
     788        ready_mutate_unlock( *cltr, last_size );
     789}
     790
     791//-----------------------------------------------------------------------
     792
     793#if !defined(__CFA_NO_STATISTICS__)
     794void stats_tls_tally(struct cluster * cltr) with (cltr->ready_queue) {
     795        __atomic_fetch_add( &global_stats.pick.push.attempt, tls.pick.push.attempt, __ATOMIC_SEQ_CST );
     796        __atomic_fetch_add( &global_stats.pick.push.success, tls.pick.push.success, __ATOMIC_SEQ_CST );
     797        __atomic_fetch_add( &global_stats.pick.pop .maskrds, tls.pick.pop .maskrds, __ATOMIC_SEQ_CST );
     798        __atomic_fetch_add( &global_stats.pick.pop .attempt, tls.pick.pop .attempt, __ATOMIC_SEQ_CST );
     799        __atomic_fetch_add( &global_stats.pick.pop .success, tls.pick.pop .success, __ATOMIC_SEQ_CST );
     800
     801        __atomic_fetch_add( &global_stats.full.value, tls.full.value, __ATOMIC_SEQ_CST );
     802        __atomic_fetch_add( &global_stats.full.count, tls.full.count, __ATOMIC_SEQ_CST );
     803}
     804#endif
  • libcfa/src/concurrency/thread.cfa

    r0f9ceacb rb798713  
    4141        self_mon_p = &self_mon;
    4242        curr_cluster = &cl;
    43         next = 0p;
    44         prev = 0p;
     43        link.next = 0p;
     44        link.prev = 0p;
    4545
    4646        node.next = NULL;
Note: See TracChangeset for help on using the changeset viewer.