Changeset 24e321c


Ignore:
Timestamp:
Sep 23, 2021, 2:18:01 PM (3 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, ast-experimental, enum, forall-pointer-decay, master, pthread-emulation, qualifiedEnum
Children:
75c7252
Parents:
fcd65ca
Message:

Unpark now takes a hint on locality.

Location:
libcfa/src/concurrency
Files:
8 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/invoke.h

    rfcd65ca r24e321c  
    175175                struct cluster * curr_cluster;
    176176
    177                 // preferred ready-queue
     177                // preferred ready-queue or CPU
    178178                unsigned preferred;
    179179
  • libcfa/src/concurrency/io.cfa

    rfcd65ca r24e321c  
    9090        static inline unsigned __flush( struct $io_context & );
    9191        static inline __u32 __release_sqes( struct $io_context & );
    92         extern void __kernel_unpark( thread$ * thrd );
     92        extern void __kernel_unpark( thread$ * thrd, unpark_hint );
    9393
    9494        bool __cfa_io_drain( processor * proc ) {
     
    118118                        __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
    119119
    120                         __kernel_unpark( fulfil( *future, cqe.res, false ) );
     120                        __kernel_unpark( fulfil( *future, cqe.res, false ), UNPARK_LOCAL );
    121121                }
    122122
  • libcfa/src/concurrency/kernel.cfa

    rfcd65ca r24e321c  
    476476                if(unlikely(thrd_dst->preempted != __NO_PREEMPTION)) {
    477477                        // The thread was preempted, reschedule it and reset the flag
    478                         schedule_thread$( thrd_dst );
     478                        schedule_thread$( thrd_dst, UNPARK_LOCAL );
    479479                        break RUNNING;
    480480                }
     
    560560// Scheduler routines
    561561// KERNEL ONLY
    562 static void __schedule_thread( thread$ * thrd ) {
     562static void __schedule_thread( thread$ * thrd, unpark_hint hint ) {
    563563        /* paranoid */ verify( ! __preemption_enabled() );
    564564        /* paranoid */ verify( ready_schedule_islocked());
     
    580580        // Dereference the thread now because once we push it, there is not guaranteed it's still valid.
    581581        struct cluster * cl = thrd->curr_cluster;
    582         __STATS(bool outside = thrd->last_proc && thrd->last_proc != kernelTLS().this_processor; )
     582        __STATS(bool outside = hint == UNPARK_LOCAL && thrd->last_proc && thrd->last_proc != kernelTLS().this_processor; )
    583583
    584584        // push the thread to the cluster ready-queue
    585         push( cl, thrd, local );
     585        push( cl, thrd, hint );
    586586
    587587        // variable thrd is no longer safe to use
     
    608608}
    609609
    610 void schedule_thread$( thread$ * thrd ) {
     610void schedule_thread$( thread$ * thrd, unpark_hint hint ) {
    611611        ready_schedule_lock();
    612                 __schedule_thread( thrd );
     612                __schedule_thread( thrd, hint );
    613613        ready_schedule_unlock();
    614614}
     
    661661}
    662662
    663 void __kernel_unpark( thread$ * thrd ) {
     663void __kernel_unpark( thread$ * thrd, unpark_hint hint ) {
    664664        /* paranoid */ verify( ! __preemption_enabled() );
    665665        /* paranoid */ verify( ready_schedule_islocked());
     
    669669        if(__must_unpark(thrd)) {
    670670                // Wake lost the race,
    671                 __schedule_thread( thrd );
     671                __schedule_thread( thrd, hint );
    672672        }
    673673
     
    676676}
    677677
    678 void unpark( thread$ * thrd ) {
     678void unpark( thread$ * thrd, unpark_hint hint ) {
    679679        if( !thrd ) return;
    680680
     
    682682                disable_interrupts();
    683683                        // Wake lost the race,
    684                         schedule_thread$( thrd );
     684                        schedule_thread$( thrd, hint );
    685685                enable_interrupts(false);
    686686        }
  • libcfa/src/concurrency/kernel/fwd.hfa

    rfcd65ca r24e321c  
    119119
    120120        extern "Cforall" {
     121                enum unpark_hint { UNPARK_LOCAL, UNPARK_REMOTE };
     122
    121123                extern void park( void );
    122                 extern void unpark( struct thread$ * this );
     124                extern void unpark( struct thread$ *, unpark_hint );
     125                static inline void unpark( struct thread$ * thrd ) { unpark(thrd, UNPARK_LOCAL); }
    123126                static inline struct thread$ * active_thread () {
    124127                        struct thread$ * t = publicTLS_get( this_thread );
  • libcfa/src/concurrency/kernel/startup.cfa

    rfcd65ca r24e321c  
    200200        __cfadbg_print_safe(runtime_core, "Kernel : Main cluster ready\n");
    201201
     202        // Construct the processor context of the main processor
     203        void ?{}(processorCtx_t & this, processor * proc) {
     204                (this.__cor){ "Processor" };
     205                this.__cor.starter = 0p;
     206                this.proc = proc;
     207        }
     208
     209        void ?{}(processor & this) with( this ) {
     210                ( this.terminated ){};
     211                ( this.runner ){};
     212                init( this, "Main Processor", *mainCluster, 0p );
     213                kernel_thread = pthread_self();
     214
     215                runner{ &this };
     216                __cfadbg_print_safe(runtime_core, "Kernel : constructed main processor context %p\n", &runner);
     217        }
     218
     219        // Initialize the main processor and the main processor ctx
     220        // (the coroutine that contains the processing control flow)
     221        mainProcessor = (processor *)&storage_mainProcessor;
     222        (*mainProcessor){};
     223
     224        register_tls( mainProcessor );
     225
    202226        // Start by initializing the main thread
    203227        // SKULLDUGGERY: the mainThread steals the process main thread
     
    210234        __cfadbg_print_safe(runtime_core, "Kernel : Main thread ready\n");
    211235
    212 
    213 
    214         // Construct the processor context of the main processor
    215         void ?{}(processorCtx_t & this, processor * proc) {
    216                 (this.__cor){ "Processor" };
    217                 this.__cor.starter = 0p;
    218                 this.proc = proc;
    219         }
    220 
    221         void ?{}(processor & this) with( this ) {
    222                 ( this.terminated ){};
    223                 ( this.runner ){};
    224                 init( this, "Main Processor", *mainCluster, 0p );
    225                 kernel_thread = pthread_self();
    226 
    227                 runner{ &this };
    228                 __cfadbg_print_safe(runtime_core, "Kernel : constructed main processor context %p\n", &runner);
    229         }
    230 
    231         // Initialize the main processor and the main processor ctx
    232         // (the coroutine that contains the processing control flow)
    233         mainProcessor = (processor *)&storage_mainProcessor;
    234         (*mainProcessor){};
    235 
    236         register_tls( mainProcessor );
    237 
    238236        //initialize the global state variables
    239237        __cfaabi_tls.this_processor = mainProcessor;
     
    251249        // Add the main thread to the ready queue
    252250        // once resume is called on mainProcessor->runner the mainThread needs to be scheduled like any normal thread
    253         schedule_thread$(mainThread);
     251        schedule_thread$(mainThread, UNPARK_LOCAL);
    254252
    255253        // SKULLDUGGERY: Force a context switch to the main processor to set the main thread's context to the current UNIX
     
    485483        link.next = 0p;
    486484        link.ts   = -1llu;
    487         preferred = -1u;
     485        preferred = ready_queue_new_preferred();
    488486        last_proc = 0p;
    489487        #if defined( __CFA_WITH_VERIFY__ )
  • libcfa/src/concurrency/kernel_private.hfa

    rfcd65ca r24e321c  
    4646}
    4747
    48 void schedule_thread$( thread$ * ) __attribute__((nonnull (1)));
     48void schedule_thread$( thread$ *, unpark_hint hint ) __attribute__((nonnull (1)));
    4949
    5050extern bool __preemption_enabled();
     
    300300// push thread onto a ready queue for a cluster
    301301// returns true if the list was previously empty, false otherwise
    302 __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, bool local);
     302__attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint);
    303303
    304304//-----------------------------------------------------------------------
     
    321321
    322322//-----------------------------------------------------------------------
     323// get preferred ready-queue for new thread
     324unsigned ready_queue_new_preferred();
     325
     326//-----------------------------------------------------------------------
    323327// Increase the width of the ready queue (number of lanes) by 4
    324328void ready_queue_grow  (struct cluster * cltr);
  • libcfa/src/concurrency/ready_queue.cfa

    rfcd65ca r24e321c  
    290290//-----------------------------------------------------------------------
    291291#if defined(USE_CPU_WORK_STEALING)
    292         __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, bool push_local) with (cltr->ready_queue) {
     292        __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
    293293                __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
    294294
     
    450450        }
    451451
    452         __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, bool push_local) with (cltr->ready_queue) {
     452        __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
    453453                __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
    454454
    455                 const bool external = !push_local || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
     455                const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
    456456                /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
    457457
     
    537537#endif
    538538#if defined(USE_WORK_STEALING)
    539         __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, bool push_local) with (cltr->ready_queue) {
     539        __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) {
    540540                __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
    541541
    542542                // #define USE_PREFERRED
    543543                #if !defined(USE_PREFERRED)
    544                 const bool external = !push_local || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
     544                const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
    545545                /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
    546546                #else
    547547                        unsigned preferred = thrd->preferred;
    548                         const bool external = push_local || (!kernelTLS().this_processor) || preferred == -1u || thrd->curr_cluster != cltr;
     548                        const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || preferred == -1u || thrd->curr_cluster != cltr;
    549549                        /* paranoid */ verifyf(external || preferred < lanes.count, "Invalid preferred queue %u for %u lanes", preferred, lanes.count );
    550550
     
    687687        #endif
    688688
    689         thrd->preferred = w;
     689        #if defined(USE_CPU_WORK_STEALING)
     690                thrd->preferred = w / READYQ_SHARD_FACTOR;
     691        #else
     692                thrd->preferred = w;
     693        #endif
    690694
    691695        // return the popped thread
     
    713717
    714718//-----------------------------------------------------------------------
     719// get preferred ready-queue for new thread
     720unsigned ready_queue_new_preferred() {
     721        unsigned pref = 0;
     722        if(struct thread$ * thrd = publicTLS_get( this_thread )) {
     723                pref = thrd->preferred;
     724        }
     725        else {
     726                #if defined(USE_CPU_WORK_STEALING)
     727                        pref = __kernel_getcpu();
     728                #endif
     729        }
     730
     731        #if defined(USE_CPU_WORK_STEALING)
     732                /* paranoid */ verify(pref >= 0);
     733                /* paranoid */ verify(pref < cpu_info.hthrd_count);
     734        #endif
     735
     736        return pref;
     737}
     738
     739//-----------------------------------------------------------------------
    715740// Check that all the intrusive queues in the data structure are still consistent
    716741static void check( __ready_queue_t & q ) with (q) {
  • libcfa/src/concurrency/thread.cfa

    rfcd65ca r24e321c  
    4343        link.next = 0p;
    4444        link.ts   = -1llu;
    45         preferred = thread_rand() % cl.ready_queue.lanes.count;
     45        preferred = ready_queue_new_preferred();
    4646        last_proc = 0p;
    4747        #if defined( __CFA_WITH_VERIFY__ )
     
    140140        /* paranoid */ verify( this_thrd->context.SP );
    141141
    142         schedule_thread$( this_thrd );
     142        schedule_thread$( this_thrd, UNPARK_LOCAL );
    143143        enable_interrupts();
    144144}
Note: See TracChangeset for help on using the changeset viewer.