Ignore:
Timestamp:
Apr 16, 2021, 2:28:09 PM (5 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum, stuck-waitfor-destruct
Children:
665edf40
Parents:
857a1c6 (diff), 5f6a172 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

Location:
libcfa/src/concurrency
Files:
15 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/coroutine.cfa

    r857a1c6 rc8a0210  
    4646
    4747//-----------------------------------------------------------------------------
    48 FORALL_DATA_INSTANCE(CoroutineCancelled, (coroutine_t &), (coroutine_t))
    49 
    50 forall(T &)
    51 void mark_exception(CoroutineCancelled(T) *) {}
     48EHM_VIRTUAL_TABLE(SomeCoroutineCancelled, std_coroutine_cancelled);
    5249
    5350forall(T &)
     
    7168
    7269        // TODO: Remove explitate vtable set once trac#186 is fixed.
    73         CoroutineCancelled(T) except;
    74         except.virtual_table = &get_exception_vtable(&except);
     70        SomeCoroutineCancelled except;
     71        except.virtual_table = &std_coroutine_cancelled;
    7572        except.the_coroutine = &cor;
    7673        except.the_exception = except;
    77         throwResume except;
     74        // Why does this need a cast?
     75        throwResume (SomeCoroutineCancelled &)except;
    7876
    7977        except->virtual_table->free( except );
  • libcfa/src/concurrency/coroutine.hfa

    r857a1c6 rc8a0210  
    2222//-----------------------------------------------------------------------------
    2323// Exception thrown from resume when a coroutine stack is cancelled.
    24 FORALL_DATA_EXCEPTION(CoroutineCancelled, (coroutine_t &), (coroutine_t)) (
     24EHM_EXCEPTION(SomeCoroutineCancelled)(
     25        void * the_coroutine;
     26        exception_t * the_exception;
     27);
     28
     29EHM_EXTERN_VTABLE(SomeCoroutineCancelled, std_coroutine_cancelled);
     30
     31EHM_FORALL_EXCEPTION(CoroutineCancelled, (coroutine_t &), (coroutine_t)) (
    2532        coroutine_t * the_coroutine;
    2633        exception_t * the_exception;
     
    3744// Anything that implements this trait can be resumed.
    3845// Anything that is resumed is a coroutine.
    39 trait is_coroutine(T & | IS_RESUMPTION_EXCEPTION(CoroutineCancelled, (T))) {
     46trait is_coroutine(T & | IS_RESUMPTION_EXCEPTION(SomeCoroutineCancelled)) {
    4047        void main(T & this);
    4148        $coroutine * get_coroutine(T & this);
  • libcfa/src/concurrency/invoke.h

    r857a1c6 rc8a0210  
    148148                struct $thread * prev;
    149149                volatile unsigned long long ts;
    150                 int preferred;
    151150        };
    152151
  • libcfa/src/concurrency/io/call.cfa.in

    r857a1c6 rc8a0210  
    201201
    202202                sqe->opcode = IORING_OP_{op};
    203                 sqe->user_data = (__u64)(uintptr_t)&future;
     203                sqe->user_data = (uintptr_t)&future;
    204204                sqe->flags = sflags;
    205205                sqe->ioprio = 0;
     
    215215                asm volatile("": : :"memory");
    216216
    217                 verify( sqe->user_data == (__u64)(uintptr_t)&future );
     217                verify( sqe->user_data == (uintptr_t)&future );
    218218                cfa_io_submit( ctx, &idx, 1, 0 != (submit_flags & CFA_IO_LAZY) );
    219219        #endif
     
    238238                'fd'  : 'fd',
    239239                'off' : 'offset',
    240                 'addr': '(__u64)iov',
     240                'addr': '(uintptr_t)iov',
    241241                'len' : 'iovcnt',
    242242        }, define = 'CFA_HAVE_PREADV2'),
     
    245245                'fd'  : 'fd',
    246246                'off' : 'offset',
    247                 'addr': '(__u64)iov',
     247                'addr': '(uintptr_t)iov',
    248248                'len' : 'iovcnt'
    249249        }, define = 'CFA_HAVE_PWRITEV2'),
     
    257257                'addr': 'fd',
    258258                'len': 'op',
    259                 'off': '(__u64)event'
     259                'off': '(uintptr_t)event'
    260260        }),
    261261        # CFA_HAVE_IORING_OP_SYNC_FILE_RANGE
     
    269269        Call('SENDMSG', 'ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags)', {
    270270                'fd': 'sockfd',
    271                 'addr': '(__u64)(struct msghdr *)msg',
     271                'addr': '(uintptr_t)(struct msghdr *)msg',
    272272                'len': '1',
    273273                'msg_flags': 'flags'
     
    276276        Call('RECVMSG', 'ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags)', {
    277277                'fd': 'sockfd',
    278                 'addr': '(__u64)(struct msghdr *)msg',
     278                'addr': '(uintptr_t)(struct msghdr *)msg',
    279279                'len': '1',
    280280                'msg_flags': 'flags'
     
    283283        Call('SEND', 'ssize_t send(int sockfd, const void *buf, size_t len, int flags)', {
    284284                'fd': 'sockfd',
    285                 'addr': '(__u64)buf',
     285                'addr': '(uintptr_t)buf',
    286286                'len': 'len',
    287287                'msg_flags': 'flags'
     
    290290        Call('RECV', 'ssize_t recv(int sockfd, void *buf, size_t len, int flags)', {
    291291                'fd': 'sockfd',
    292                 'addr': '(__u64)buf',
     292                'addr': '(uintptr_t)buf',
    293293                'len': 'len',
    294294                'msg_flags': 'flags'
     
    297297        Call('ACCEPT', 'int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags)', {
    298298                'fd': 'sockfd',
    299                 'addr': '(__u64)addr',
    300                 'addr2': '(__u64)addrlen',
     299                'addr': '(uintptr_t)addr',
     300                'addr2': '(uintptr_t)addrlen',
    301301                'accept_flags': 'flags'
    302302        }),
     
    304304        Call('CONNECT', 'int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen)', {
    305305                'fd': 'sockfd',
    306                 'addr': '(__u64)addr',
     306                'addr': '(uintptr_t)addr',
    307307                'off': 'addrlen'
    308308        }),
     
    310310        Call('FALLOCATE', 'int fallocate(int fd, int mode, off_t offset, off_t len)', {
    311311                'fd': 'fd',
    312                 'addr': '(__u64)len',
     312                'addr': '(uintptr_t)len',
    313313                'len': 'mode',
    314314                'off': 'offset'
     
    323323        # CFA_HAVE_IORING_OP_MADVISE
    324324        Call('MADVISE', 'int madvise(void *addr, size_t length, int advice)', {
    325                 'addr': '(__u64)addr',
     325                'addr': '(uintptr_t)addr',
    326326                'len': 'length',
    327327                'fadvise_advice': 'advice'
     
    330330        Call('OPENAT', 'int openat(int dirfd, const char *pathname, int flags, mode_t mode)', {
    331331                'fd': 'dirfd',
    332                 'addr': '(__u64)pathname',
     332                'addr': '(uintptr_t)pathname',
    333333                'len': 'mode',
    334334                'open_flags': 'flags;'
     
    339339                'addr': 'pathname',
    340340                'len': 'sizeof(*how)',
    341                 'off': '(__u64)how',
     341                'off': '(uintptr_t)how',
    342342        }, define = 'CFA_HAVE_OPENAT2'),
    343343        # CFA_HAVE_IORING_OP_CLOSE
     
    348348        Call('STATX', 'int statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf)', {
    349349                'fd': 'dirfd',
    350                 'off': '(__u64)statxbuf',
     350                'off': '(uintptr_t)statxbuf',
    351351                'addr': 'pathname',
    352352                'len': 'mask',
     
    356356        Call('READ', 'ssize_t read(int fd, void * buf, size_t count)', {
    357357                'fd': 'fd',
    358                 'addr': '(__u64)buf',
     358                'addr': '(uintptr_t)buf',
    359359                'len': 'count'
    360360        }),
     
    362362        Call('WRITE', 'ssize_t write(int fd, void * buf, size_t count)', {
    363363                'fd': 'fd',
    364                 'addr': '(__u64)buf',
     364                'addr': '(uintptr_t)buf',
    365365                'len': 'count'
    366366        }),
  • libcfa/src/concurrency/kernel.cfa

    r857a1c6 rc8a0210  
    113113static void __wake_one(cluster * cltr);
    114114
    115 static void push  (__cluster_idles & idles, processor & proc);
    116 static void remove(__cluster_idles & idles, processor & proc);
    117 static [unsigned idle, unsigned total, * processor] query( & __cluster_idles idles );
     115static void mark_idle (__cluster_proc_list & idles, processor & proc);
     116static void mark_awake(__cluster_proc_list & idles, processor & proc);
     117static [unsigned idle, unsigned total, * processor] query_idles( & __cluster_proc_list idles );
    118118
    119119extern void __cfa_io_start( processor * );
     
    189189
    190190                                // Push self to idle stack
    191                                 push(this->cltr->idles, * this);
     191                                mark_idle(this->cltr->procs, * this);
    192192
    193193                                // Confirm the ready-queue is empty
     
    195195                                if( readyThread ) {
    196196                                        // A thread was found, cancel the halt
    197                                         remove(this->cltr->idles, * this);
     197                                        mark_awake(this->cltr->procs, * this);
    198198
    199199                                        #if !defined(__CFA_NO_STATISTICS__)
     
    225225
    226226                                // We were woken up, remove self from idle
    227                                 remove(this->cltr->idles, * this);
     227                                mark_awake(this->cltr->procs, * this);
    228228
    229229                                // DON'T just proceed, start looking again
     
    359359                                #if !defined(__CFA_NO_STATISTICS__)
    360360                                        __tls_stats()->ready.threads.threads++;
     361                                        __push_stat( __tls_stats(), __tls_stats()->ready.threads.threads, false, "Processor", this );
    361362                                #endif
    362363                                // This is case 2, the racy case, someone tried to run this thread before it finished blocking
     
    376377        #if !defined(__CFA_NO_STATISTICS__)
    377378                __tls_stats()->ready.threads.threads--;
     379                __push_stat( __tls_stats(), __tls_stats()->ready.threads.threads, false, "Processor", this );
    378380        #endif
    379381
     
    455457                if( kernelTLS().this_stats ) {
    456458                        __tls_stats()->ready.threads.threads++;
     459                        __push_stat( __tls_stats(), __tls_stats()->ready.threads.threads, false, "Processor", kernelTLS().this_processor );
    457460                }
    458461                else {
    459462                        __atomic_fetch_add(&cl->stats->ready.threads.threads, 1, __ATOMIC_RELAXED);
     463                        __push_stat( cl->stats, cl->stats->ready.threads.threads, true, "Cluster", cl );
    460464                }
    461465        #endif
     
    470474
    471475        ready_schedule_lock();
    472                 $thread * thrd = pop( this );
     476                $thread * thrd = pop_fast( this );
    473477        ready_schedule_unlock();
    474478
     
    613617        unsigned idle;
    614618        unsigned total;
    615         [idle, total, p] = query(this->idles);
     619        [idle, total, p] = query_idles(this->procs);
    616620
    617621        // If no one is sleeping, we are done
     
    650654}
    651655
    652 static void push  (__cluster_idles & this, processor & proc) {
     656static void mark_idle(__cluster_proc_list & this, processor & proc) {
    653657        /* paranoid */ verify( ! __preemption_enabled() );
    654658        lock( this );
    655659                this.idle++;
    656660                /* paranoid */ verify( this.idle <= this.total );
    657 
    658                 insert_first(this.list, proc);
     661                remove(proc);
     662                insert_first(this.idles, proc);
    659663        unlock( this );
    660664        /* paranoid */ verify( ! __preemption_enabled() );
    661665}
    662666
    663 static void remove(__cluster_idles & this, processor & proc) {
     667static void mark_awake(__cluster_proc_list & this, processor & proc) {
    664668        /* paranoid */ verify( ! __preemption_enabled() );
    665669        lock( this );
    666670                this.idle--;
    667671                /* paranoid */ verify( this.idle >= 0 );
    668 
    669672                remove(proc);
     673                insert_last(this.actives, proc);
    670674        unlock( this );
    671675        /* paranoid */ verify( ! __preemption_enabled() );
    672676}
    673677
    674 static [unsigned idle, unsigned total, * processor] query( & __cluster_idles this ) {
     678static [unsigned idle, unsigned total, * processor] query_idles( & __cluster_proc_list this ) {
     679        /* paranoid */ verify( ! __preemption_enabled() );
     680        /* paranoid */ verify( ready_schedule_islocked() );
     681
    675682        for() {
    676683                uint64_t l = __atomic_load_n(&this.lock, __ATOMIC_SEQ_CST);
     
    678685                unsigned idle    = this.idle;
    679686                unsigned total   = this.total;
    680                 processor * proc = &this.list`first;
     687                processor * proc = &this.idles`first;
    681688                // Compiler fence is unnecessary, but gcc-8 and older incorrectly reorder code without it
    682689                asm volatile("": : :"memory");
     
    684691                return [idle, total, proc];
    685692        }
     693
     694        /* paranoid */ verify( ready_schedule_islocked() );
     695        /* paranoid */ verify( ! __preemption_enabled() );
    686696}
    687697
  • libcfa/src/concurrency/kernel.hfa

    r857a1c6 rc8a0210  
    6969        struct cluster * cltr;
    7070
    71         // Id within the cluster
    72         unsigned cltr_id;
     71        // Ready Queue state per processor
     72        struct {
     73                unsigned short its;
     74                unsigned short itr;
     75                unsigned id;
     76                unsigned target;
     77                unsigned long long int cutoff;
     78        } rdq;
    7379
    7480        // Set to true to notify the processor should terminate
     
    140146// Cluster Tools
    141147
    142 // Intrusives lanes which are used by the relaxed ready queue
     148// Intrusives lanes which are used by the ready queue
    143149struct __attribute__((aligned(128))) __intrusive_lane_t;
    144150void  ?{}(__intrusive_lane_t & this);
    145151void ^?{}(__intrusive_lane_t & this);
    146152
    147 // Counter used for wether or not the lanes are all empty
    148 struct __attribute__((aligned(128))) __snzi_node_t;
    149 struct __snzi_t {
    150         unsigned mask;
    151         int root;
    152         __snzi_node_t * nodes;
    153 };
    154 
    155 void  ?{}( __snzi_t & this, unsigned depth );
    156 void ^?{}( __snzi_t & this );
     153// Aligned timestamps which are used by the relaxed ready queue
     154struct __attribute__((aligned(128))) __timestamp_t;
     155void  ?{}(__timestamp_t & this);
     156void ^?{}(__timestamp_t & this);
    157157
    158158//TODO adjust cache size to ARCHITECTURE
    159159// Structure holding the relaxed ready queue
    160160struct __ready_queue_t {
    161         // Data tracking how many/which lanes are used
    162         // Aligned to 128 for cache locality
    163         __snzi_t snzi;
    164 
    165161        // Data tracking the actual lanes
    166162        // On a seperate cacheline from the used struct since
     
    171167                __intrusive_lane_t * volatile data;
    172168
     169                // Array of times
     170                __timestamp_t * volatile tscs;
     171
    173172                // Number of lanes (empty or not)
    174173                volatile size_t count;
     
    180179
    181180// Idle Sleep
    182 struct __cluster_idles {
     181struct __cluster_proc_list {
    183182        // Spin lock protecting the queue
    184183        volatile uint64_t lock;
     
    191190
    192191        // List of idle processors
    193         dlist(processor, processor) list;
     192        dlist(processor, processor) idles;
     193
     194        // List of active processors
     195        dlist(processor, processor) actives;
    194196};
    195197
     
    207209
    208210        // List of idle processors
    209         __cluster_idles idles;
     211        __cluster_proc_list procs;
    210212
    211213        // List of threads
  • libcfa/src/concurrency/kernel/startup.cfa

    r857a1c6 rc8a0210  
    268268                        __print_stats( st, mainProcessor->print_stats, "Processor ", mainProcessor->name, (void*)mainProcessor );
    269269                }
     270                #if defined(CFA_STATS_ARRAY)
     271                        __flush_stat( st, "Processor", mainProcessor );
     272                #endif
    270273        #endif
    271274
     
    348351                        __print_stats( &local_stats, proc->print_stats, "Processor ", proc->name, (void*)proc );
    349352                }
     353                #if defined(CFA_STATS_ARRAY)
     354                        __flush_stat( &local_stats, "Processor", proc );
     355                #endif
    350356        #endif
    351357
     
    463469        this.name = name;
    464470        this.cltr = &_cltr;
     471        this.rdq.its = 0;
     472        this.rdq.itr = 0;
     473        this.rdq.id  = -1u;
     474        this.rdq.target = -1u;
     475        this.rdq.cutoff = -1ull;
    465476        do_terminate = false;
    466477        preemption_alarm = 0p;
     
    483494        #endif
    484495
    485         lock( this.cltr->idles );
    486                 int target = this.cltr->idles.total += 1u;
    487         unlock( this.cltr->idles );
    488 
    489         id = doregister((__processor_id_t*)&this);
    490 
     496        // Register and Lock the RWlock so no-one pushes/pops while we are changing the queue
     497        uint_fast32_t last_size = ready_mutate_register((__processor_id_t*)&this);
     498                this.cltr->procs.total += 1u;
     499                insert_last(this.cltr->procs.actives, this);
     500
     501                // Adjust the ready queue size
     502                ready_queue_grow( cltr );
     503
     504        // Unlock the RWlock
     505        ready_mutate_unlock( last_size );
     506
     507        __cfadbg_print_safe(runtime_core, "Kernel : core %p created\n", &this);
     508}
     509
     510// Not a ctor, it just preps the destruction but should not destroy members
     511static void deinit(processor & this) {
    491512        // Lock the RWlock so no-one pushes/pops while we are changing the queue
    492513        uint_fast32_t last_size = ready_mutate_lock();
     514                this.cltr->procs.total -= 1u;
     515                remove(this);
    493516
    494517                // Adjust the ready queue size
    495                 this.cltr_id = ready_queue_grow( cltr, target );
    496 
    497         // Unlock the RWlock
    498         ready_mutate_unlock( last_size );
    499 
    500         __cfadbg_print_safe(runtime_core, "Kernel : core %p created\n", &this);
    501 }
    502 
    503 // Not a ctor, it just preps the destruction but should not destroy members
    504 static void deinit(processor & this) {
    505         lock( this.cltr->idles );
    506                 int target = this.cltr->idles.total -= 1u;
    507         unlock( this.cltr->idles );
    508 
    509         // Lock the RWlock so no-one pushes/pops while we are changing the queue
    510         uint_fast32_t last_size = ready_mutate_lock();
    511 
    512                 // Adjust the ready queue size
    513                 ready_queue_shrink( this.cltr, target );
    514 
    515         // Unlock the RWlock
    516         ready_mutate_unlock( last_size );
    517 
    518         // Finally we don't need the read_lock any more
    519         unregister((__processor_id_t*)&this);
     518                ready_queue_shrink( this.cltr );
     519
     520        // Unlock the RWlock and unregister: we don't need the read_lock any more
     521        ready_mutate_unregister((__processor_id_t*)&this, last_size );
    520522
    521523        close(this.idle);
     
    560562//-----------------------------------------------------------------------------
    561563// Cluster
    562 static void ?{}(__cluster_idles & this) {
     564static void ?{}(__cluster_proc_list & this) {
    563565        this.lock  = 0;
    564566        this.idle  = 0;
    565567        this.total = 0;
    566         (this.list){};
    567568}
    568569
     
    590591
    591592                // Adjust the ready queue size
    592                 ready_queue_grow( &this, 0 );
     593                ready_queue_grow( &this );
    593594
    594595        // Unlock the RWlock
     
    605606
    606607                // Adjust the ready queue size
    607                 ready_queue_shrink( &this, 0 );
     608                ready_queue_shrink( &this );
    608609
    609610        // Unlock the RWlock
     
    615616                        __print_stats( this.stats, this.print_stats, "Cluster", this.name, (void*)&this );
    616617                }
     618                #if defined(CFA_STATS_ARRAY)
     619                        __flush_stat( this.stats, "Cluster", &this );
     620                #endif
    617621                free( this.stats );
    618622        #endif
  • libcfa/src/concurrency/kernel_private.hfa

    r857a1c6 rc8a0210  
    8383// Cluster lock API
    8484//=======================================================================
    85 // Cells use by the reader writer lock
    86 // while not generic it only relies on a opaque pointer
    87 struct __attribute__((aligned(128))) __scheduler_lock_id_t {
    88         // Spin lock used as the underlying lock
    89         volatile bool lock;
    90 
    91         // Handle pointing to the proc owning this cell
    92         // Used for allocating cells and debugging
    93         __processor_id_t * volatile handle;
    94 
    95         #ifdef __CFA_WITH_VERIFY__
    96                 // Debug, check if this is owned for reading
    97                 bool owned;
    98         #endif
    99 };
    100 
    101 static_assert( sizeof(struct __scheduler_lock_id_t) <= __alignof(struct __scheduler_lock_id_t));
    102 
    10385// Lock-Free registering/unregistering of threads
    10486// Register a processor to a given cluster and get its unique id in return
    105 unsigned doregister( struct __processor_id_t * proc );
     87void register_proc_id( struct __processor_id_t * );
    10688
    10789// Unregister a processor from a given cluster using its id, getting back the original pointer
    108 void     unregister( struct __processor_id_t * proc );
    109 
    110 //-----------------------------------------------------------------------
    111 // Cluster idle lock/unlock
    112 static inline void lock(__cluster_idles & this) {
    113         for() {
    114                 uint64_t l = this.lock;
    115                 if(
    116                         (0 == (l % 2))
    117                         && __atomic_compare_exchange_n(&this.lock, &l, l + 1, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
    118                 ) return;
    119                 Pause();
    120         }
    121 }
    122 
    123 static inline void unlock(__cluster_idles & this) {
    124         /* paranoid */ verify( 1 == (this.lock % 2) );
    125         __atomic_fetch_add( &this.lock, 1, __ATOMIC_SEQ_CST );
    126 }
     90void unregister_proc_id( struct __processor_id_t * proc );
    12791
    12892//=======================================================================
     
    152116        __atomic_store_n(ll, (bool)false, __ATOMIC_RELEASE);
    153117}
     118
     119// Cells use by the reader writer lock
     120// while not generic it only relies on a opaque pointer
     121struct __attribute__((aligned(128))) __scheduler_lock_id_t {
     122        // Spin lock used as the underlying lock
     123        volatile bool lock;
     124
     125        // Handle pointing to the proc owning this cell
     126        // Used for allocating cells and debugging
     127        __processor_id_t * volatile handle;
     128
     129        #ifdef __CFA_WITH_VERIFY__
     130                // Debug, check if this is owned for reading
     131                bool owned;
     132        #endif
     133};
     134
     135static_assert( sizeof(struct __scheduler_lock_id_t) <= __alignof(struct __scheduler_lock_id_t));
    154136
    155137//-----------------------------------------------------------------------
     
    247229void ready_mutate_unlock( uint_fast32_t /* value returned by lock */ );
    248230
     231//-----------------------------------------------------------------------
     232// Lock-Free registering/unregistering of threads
     233// Register a processor to a given cluster and get its unique id in return
     234// For convenience, also acquires the lock
     235static inline uint_fast32_t ready_mutate_register( struct __processor_id_t * proc ) {
     236        register_proc_id( proc );
     237        return ready_mutate_lock();
     238}
     239
     240// Unregister a processor from a given cluster using its id, getting back the original pointer
     241// assumes the lock is acquired
     242static inline void ready_mutate_unregister( struct __processor_id_t * proc, uint_fast32_t last_s ) {
     243        ready_mutate_unlock( last_s );
     244        unregister_proc_id( proc );
     245}
     246
     247//-----------------------------------------------------------------------
     248// Cluster idle lock/unlock
     249static inline void lock(__cluster_proc_list & this) {
     250        /* paranoid */ verify( ! __preemption_enabled() );
     251
     252        // Start by locking the global RWlock so that we know no-one is
     253        // adding/removing processors while we mess with the idle lock
     254        ready_schedule_lock();
     255
     256        // Simple counting lock, acquired, acquired by incrementing the counter
     257        // to an odd number
     258        for() {
     259                uint64_t l = this.lock;
     260                if(
     261                        (0 == (l % 2))
     262                        && __atomic_compare_exchange_n(&this.lock, &l, l + 1, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
     263                ) return;
     264                Pause();
     265        }
     266
     267        /* paranoid */ verify( ! __preemption_enabled() );
     268}
     269
     270static inline void unlock(__cluster_proc_list & this) {
     271        /* paranoid */ verify( ! __preemption_enabled() );
     272
     273        /* paranoid */ verify( 1 == (this.lock % 2) );
     274        // Simple couting lock, release by incrementing to an even number
     275        __atomic_fetch_add( &this.lock, 1, __ATOMIC_SEQ_CST );
     276
     277        // Release the global lock, which we acquired when locking
     278        ready_schedule_unlock();
     279
     280        /* paranoid */ verify( ! __preemption_enabled() );
     281}
     282
    249283//=======================================================================
    250284// Ready-Queue API
    251285//-----------------------------------------------------------------------
    252 // pop thread from the ready queue of a cluster
    253 // returns 0p if empty
    254 __attribute__((hot)) bool query(struct cluster * cltr);
    255 
    256 //-----------------------------------------------------------------------
    257286// push thread onto a ready queue for a cluster
    258287// returns true if the list was previously empty, false otherwise
    259 __attribute__((hot)) bool push(struct cluster * cltr, struct $thread * thrd);
     288__attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd);
    260289
    261290//-----------------------------------------------------------------------
     
    263292// returns 0p if empty
    264293// May return 0p spuriously
    265 __attribute__((hot)) struct $thread * pop(struct cluster * cltr);
     294__attribute__((hot)) struct $thread * pop_fast(struct cluster * cltr);
    266295
    267296//-----------------------------------------------------------------------
     
    272301
    273302//-----------------------------------------------------------------------
    274 // remove thread from the ready queue of a cluster
    275 // returns bool if it wasn't found
    276 bool remove_head(struct cluster * cltr, struct $thread * thrd);
    277 
    278 //-----------------------------------------------------------------------
    279303// Increase the width of the ready queue (number of lanes) by 4
    280 unsigned ready_queue_grow  (struct cluster * cltr, int target);
     304void ready_queue_grow  (struct cluster * cltr);
    281305
    282306//-----------------------------------------------------------------------
    283307// Decrease the width of the ready queue (number of lanes) by 4
    284 void ready_queue_shrink(struct cluster * cltr, int target);
     308void ready_queue_shrink(struct cluster * cltr);
    285309
    286310
  • libcfa/src/concurrency/preemption.cfa

    r857a1c6 rc8a0210  
    712712static void * alarm_loop( __attribute__((unused)) void * args ) {
    713713        __processor_id_t id;
    714         id.id = doregister(&id);
     714        register_proc_id(&id);
    715715        __cfaabi_tls.this_proc_id = &id;
    716716
     
    773773EXIT:
    774774        __cfaabi_dbg_print_safe( "Kernel : Preemption thread stopping\n" );
    775         unregister(&id);
     775        register_proc_id(&id);
    776776
    777777        return 0p;
  • libcfa/src/concurrency/ready_queue.cfa

    r857a1c6 rc8a0210  
    1717// #define __CFA_DEBUG_PRINT_READY_QUEUE__
    1818
    19 // #define USE_SNZI
    2019// #define USE_MPSC
     20
     21#define USE_RELAXED_FIFO
     22// #define USE_WORK_STEALING
    2123
    2224#include "bits/defs.hfa"
     
    2931#include <unistd.h>
    3032
    31 #include "snzi.hfa"
    3233#include "ready_subqueue.hfa"
    3334
     
    4041#endif
    4142
    42 #define BIAS 4
     43#if   defined(USE_RELAXED_FIFO)
     44        #define BIAS 4
     45        #define READYQ_SHARD_FACTOR 4
     46        #define SEQUENTIAL_SHARD 1
     47#elif defined(USE_WORK_STEALING)
     48        #define READYQ_SHARD_FACTOR 2
     49        #define SEQUENTIAL_SHARD 2
     50#else
     51        #error no scheduling strategy selected
     52#endif
     53
     54static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred);
     55static inline struct $thread * try_pop(struct cluster * cltr, unsigned w);
     56static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j);
     57static inline struct $thread * search(struct cluster * cltr);
     58
    4359
    4460// returns the maximum number of processors the RWLock support
     
    94110//=======================================================================
    95111// Lock-Free registering/unregistering of threads
    96 unsigned doregister( struct __processor_id_t * proc ) with(*__scheduler_lock) {
     112void register_proc_id( struct __processor_id_t * proc ) with(*__scheduler_lock) {
    97113        __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p for RW-Lock\n", proc);
    98114
     
    108124                        /*paranoid*/ verify(0 == (__alignof__(data[i]) % cache_line_size));
    109125                        /*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
    110                         return i;
     126                        proc->id = i;
    111127                }
    112128        }
     
    135151        /*paranoid*/ verify(__alignof__(data[n]) == (2 * cache_line_size));
    136152        /*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
    137         return n;
    138 }
    139 
    140 void unregister( struct __processor_id_t * proc ) with(*__scheduler_lock) {
     153        proc->id = n;
     154}
     155
     156void unregister_proc_id( struct __processor_id_t * proc ) with(*__scheduler_lock) {
    141157        unsigned id = proc->id;
    142158        /*paranoid*/ verify(id < ready);
     
    193209
    194210//=======================================================================
    195 // Cforall Reqdy Queue used for scheduling
     211// Cforall Ready Queue used for scheduling
    196212//=======================================================================
    197213void ?{}(__ready_queue_t & this) with (this) {
    198214        lanes.data  = 0p;
     215        lanes.tscs  = 0p;
    199216        lanes.count = 0;
    200217}
    201218
    202219void ^?{}(__ready_queue_t & this) with (this) {
    203         verify( 1 == lanes.count );
    204         #ifdef USE_SNZI
    205                 verify( !query( snzi ) );
    206         #endif
     220        verify( SEQUENTIAL_SHARD == lanes.count );
    207221        free(lanes.data);
     222        free(lanes.tscs);
    208223}
    209224
    210225//-----------------------------------------------------------------------
    211 __attribute__((hot)) bool query(struct cluster * cltr) {
    212         #ifdef USE_SNZI
    213                 return query(cltr->ready_queue.snzi);
    214         #endif
    215         return true;
    216 }
    217 
    218 static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred) {
    219         unsigned i;
    220         bool local;
    221         #if defined(BIAS)
     226#if defined(USE_RELAXED_FIFO)
     227        //-----------------------------------------------------------------------
     228        // get index from random number with or without bias towards queues
     229        static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred) {
     230                unsigned i;
     231                bool local;
    222232                unsigned rlow  = r % BIAS;
    223233                unsigned rhigh = r / BIAS;
     
    225235                        // (BIAS - 1) out of BIAS chances
    226236                        // Use perferred queues
    227                         i = preferred + (rhigh % 4);
     237                        i = preferred + (rhigh % READYQ_SHARD_FACTOR);
    228238                        local = true;
    229239                }
     
    234244                        local = false;
    235245                }
    236         #else
    237                 i = r;
    238                 local = false;
    239         #endif
    240         return [i, local];
    241 }
    242 
    243 //-----------------------------------------------------------------------
    244 __attribute__((hot)) bool push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
    245         __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
    246 
    247         const bool external = (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
    248 
    249         // write timestamp
    250         thrd->link.ts = rdtscl();
    251 
    252         bool first = false;
    253         __attribute__((unused)) bool local;
    254         __attribute__((unused)) int preferred;
    255         #if defined(BIAS)
    256                 preferred =
    257                         //*
    258                         external ? -1 : kernelTLS().this_processor->cltr_id;
    259                         /*/
    260                         thrd->link.preferred * 4;
    261                         //*/
    262         #endif
    263 
    264         // Try to pick a lane and lock it
    265         unsigned i;
    266         do {
    267                 // Pick the index of a lane
    268                 // unsigned r = __tls_rand();
    269                 unsigned r = __tls_rand_fwd();
    270                 [i, local] = idx_from_r(r, preferred);
    271 
    272                 i %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
    273 
     246                return [i, local];
     247        }
     248
     249        __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
     250                __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
     251
     252                const bool external = (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
     253                /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
     254
     255                // write timestamp
     256                thrd->link.ts = rdtscl();
     257
     258                bool local;
     259                int preferred = external ? -1 : kernelTLS().this_processor->rdq.id;
     260
     261                // Try to pick a lane and lock it
     262                unsigned i;
     263                do {
     264                        // Pick the index of a lane
     265                        unsigned r = __tls_rand_fwd();
     266                        [i, local] = idx_from_r(r, preferred);
     267
     268                        i %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
     269
     270                        #if !defined(__CFA_NO_STATISTICS__)
     271                                if(external) {
     272                                        if(local) __atomic_fetch_add(&cltr->stats->ready.pick.ext.local, 1, __ATOMIC_RELAXED);
     273                                        __atomic_fetch_add(&cltr->stats->ready.pick.ext.attempt, 1, __ATOMIC_RELAXED);
     274                                }
     275                                else {
     276                                        if(local) __tls_stats()->ready.pick.push.local++;
     277                                        __tls_stats()->ready.pick.push.attempt++;
     278                                }
     279                        #endif
     280
     281                #if defined(USE_MPSC)
     282                        // mpsc always succeeds
     283                } while( false );
     284                #else
     285                        // If we can't lock it retry
     286                } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
     287                #endif
     288
     289                // Actually push it
     290                push(lanes.data[i], thrd);
     291
     292                #if !defined(USE_MPSC)
     293                        // Unlock and return
     294                        __atomic_unlock( &lanes.data[i].lock );
     295                #endif
     296
     297                // Mark the current index in the tls rng instance as having an item
     298                __tls_rand_advance_bck();
     299
     300                __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
     301
     302                // Update statistics
    274303                #if !defined(__CFA_NO_STATISTICS__)
    275304                        if(external) {
    276                                 if(local) __atomic_fetch_add(&cltr->stats->ready.pick.ext.local, 1, __ATOMIC_RELAXED);
    277                                 __atomic_fetch_add(&cltr->stats->ready.pick.ext.attempt, 1, __ATOMIC_RELAXED);
     305                                if(local) __atomic_fetch_add(&cltr->stats->ready.pick.ext.lsuccess, 1, __ATOMIC_RELAXED);
     306                                __atomic_fetch_add(&cltr->stats->ready.pick.ext.success, 1, __ATOMIC_RELAXED);
    278307                        }
    279308                        else {
    280                                 if(local) __tls_stats()->ready.pick.push.local++;
    281                                 __tls_stats()->ready.pick.push.attempt++;
     309                                if(local) __tls_stats()->ready.pick.push.lsuccess++;
     310                                __tls_stats()->ready.pick.push.success++;
    282311                        }
    283312                #endif
    284 
    285         #if defined(USE_MPSC)
    286                 // mpsc always succeeds
    287         } while( false );
    288         #else
    289                 // If we can't lock it retry
    290         } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
    291         #endif
    292 
    293         // Actually push it
    294         #ifdef USE_SNZI
    295                 bool lane_first =
    296         #endif
    297 
    298         push(lanes.data[i], thrd);
    299 
    300         #ifdef USE_SNZI
    301                 // If this lane used to be empty we need to do more
    302                 if(lane_first) {
    303                         // Check if the entire queue used to be empty
    304                         first = !query(snzi);
    305 
    306                         // Update the snzi
    307                         arrive( snzi, i );
    308                 }
    309         #endif
    310 
    311         #if !defined(USE_MPSC)
    312                 // Unlock and return
    313                 __atomic_unlock( &lanes.data[i].lock );
    314         #endif
    315 
    316         // Mark the current index in the tls rng instance as having an item
    317         __tls_rand_advance_bck();
    318 
    319         __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
     313        }
     314
     315        // Pop from the ready queue from a given cluster
     316        __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
     317                /* paranoid */ verify( lanes.count > 0 );
     318                /* paranoid */ verify( kernelTLS().this_processor );
     319                /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );
     320
     321                unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
     322                int preferred = kernelTLS().this_processor->rdq.id;
     323
     324
     325                // As long as the list is not empty, try finding a lane that isn't empty and pop from it
     326                for(25) {
     327                        // Pick two lists at random
     328                        unsigned ri = __tls_rand_bck();
     329                        unsigned rj = __tls_rand_bck();
     330
     331                        unsigned i, j;
     332                        __attribute__((unused)) bool locali, localj;
     333                        [i, locali] = idx_from_r(ri, preferred);
     334                        [j, localj] = idx_from_r(rj, preferred);
     335
     336                        #if !defined(__CFA_NO_STATISTICS__)
     337                                if(locali && localj) {
     338                                        __tls_stats()->ready.pick.pop.local++;
     339                                }
     340                        #endif
     341
     342                        i %= count;
     343                        j %= count;
     344
     345                        // try popping from the 2 picked lists
     346                        struct $thread * thrd = try_pop(cltr, i, j);
     347                        if(thrd) {
     348                                #if !defined(__CFA_NO_STATISTICS__)
     349                                        if( locali || localj ) __tls_stats()->ready.pick.pop.lsuccess++;
     350                                #endif
     351                                return thrd;
     352                        }
     353                }
     354
     355                // All lanes where empty return 0p
     356                return 0p;
     357        }
     358
     359        __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) {
     360                return search(cltr);
     361        }
     362#endif
     363#if defined(USE_WORK_STEALING)
     364        __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
     365                __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
     366
     367                const bool external = (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
     368                /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
     369
     370                // write timestamp
     371                thrd->link.ts = rdtscl();
     372
     373                // Try to pick a lane and lock it
     374                unsigned i;
     375                do {
     376                        if(unlikely(external)) {
     377                                i = __tls_rand() % lanes.count;
     378                        }
     379                        else {
     380                                processor * proc = kernelTLS().this_processor;
     381                                unsigned r = proc->rdq.its++;
     382                                i =  proc->rdq.id + (r % READYQ_SHARD_FACTOR);
     383                        }
     384
     385
     386                #if defined(USE_MPSC)
     387                        // mpsc always succeeds
     388                } while( false );
     389                #else
     390                        // If we can't lock it retry
     391                } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
     392                #endif
     393
     394                // Actually push it
     395                push(lanes.data[i], thrd);
     396
     397                #if !defined(USE_MPSC)
     398                        // Unlock and return
     399                        __atomic_unlock( &lanes.data[i].lock );
     400                #endif
     401
     402                __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
     403        }
     404
     405        // Pop from the ready queue from a given cluster
     406        __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
     407                /* paranoid */ verify( lanes.count > 0 );
     408                /* paranoid */ verify( kernelTLS().this_processor );
     409                /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );
     410
     411                processor * proc = kernelTLS().this_processor;
     412
     413                if(proc->rdq.target == -1u) {
     414                        proc->rdq.target = __tls_rand() % lanes.count;
     415                        unsigned it1  = proc->rdq.itr;
     416                        unsigned it2  = proc->rdq.itr + 1;
     417                        unsigned idx1 = proc->rdq.id + (it1 % READYQ_SHARD_FACTOR);
     418                        unsigned idx2 = proc->rdq.id + (it1 % READYQ_SHARD_FACTOR);
     419                        unsigned long long tsc1 = ts(lanes.data[idx1]);
     420                        unsigned long long tsc2 = ts(lanes.data[idx2]);
     421                        proc->rdq.cutoff = min(tsc1, tsc2);
     422                }
     423                else if(lanes.tscs[proc->rdq.target].tv < proc->rdq.cutoff) {
     424                        $thread * t = try_pop(cltr, proc->rdq.target);
     425                        proc->rdq.target = -1u;
     426                        if(t) return t;
     427                }
     428
     429                for(READYQ_SHARD_FACTOR) {
     430                        unsigned i = proc->rdq.id + (--proc->rdq.itr % READYQ_SHARD_FACTOR);
     431                        if($thread * t = try_pop(cltr, i)) return t;
     432                }
     433                return 0p;
     434        }
     435
     436        __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
     437                for(25) {
     438                        unsigned i = __tls_rand() % lanes.count;
     439                        $thread * t = try_pop(cltr, i);
     440                        if(t) return t;
     441                }
     442
     443                return search(cltr);
     444        }
     445#endif
     446
     447//=======================================================================
     448// Various Ready Queue utilities
     449//=======================================================================
     450// these function work the same or almost the same
     451// whether they are using work-stealing or relaxed fifo scheduling
     452
     453//-----------------------------------------------------------------------
     454// try to pop from a lane given by index w
     455static inline struct $thread * try_pop(struct cluster * cltr, unsigned w) with (cltr->ready_queue) {
     456        // Get relevant elements locally
     457        __intrusive_lane_t & lane = lanes.data[w];
     458
     459        // If list looks empty retry
     460        if( is_empty(lane) ) return 0p;
     461
     462        // If we can't get the lock retry
     463        if( !__atomic_try_acquire(&lane.lock) ) return 0p;
     464
     465        // If list is empty, unlock and retry
     466        if( is_empty(lane) ) {
     467                __atomic_unlock(&lane.lock);
     468                return 0p;
     469        }
     470
     471        // Actually pop the list
     472        struct $thread * thrd;
     473        thrd = pop(lane);
     474
     475        /* paranoid */ verify(thrd);
     476        /* paranoid */ verify(lane.lock);
     477
     478        // Unlock and return
     479        __atomic_unlock(&lane.lock);
    320480
    321481        // Update statistics
    322482        #if !defined(__CFA_NO_STATISTICS__)
    323                 if(external) {
    324                         if(local) __atomic_fetch_add(&cltr->stats->ready.pick.ext.lsuccess, 1, __ATOMIC_RELAXED);
    325                         __atomic_fetch_add(&cltr->stats->ready.pick.ext.success, 1, __ATOMIC_RELAXED);
    326                 }
    327                 else {
    328                         if(local) __tls_stats()->ready.pick.push.lsuccess++;
    329                         __tls_stats()->ready.pick.push.success++;
    330                 }
     483                __tls_stats()->ready.pick.pop.success++;
    331484        #endif
    332485
    333         // return whether or not the list was empty before this push
    334         return first;
    335 }
    336 
    337 static struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j);
    338 static struct $thread * try_pop(struct cluster * cltr, unsigned i);
    339 
    340 // Pop from the ready queue from a given cluster
    341 __attribute__((hot)) $thread * pop(struct cluster * cltr) with (cltr->ready_queue) {
    342         /* paranoid */ verify( lanes.count > 0 );
    343         unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
    344         int preferred;
    345         #if defined(BIAS)
    346                 // Don't bother trying locally too much
    347                 preferred = kernelTLS().this_processor->cltr_id;
     486        #if defined(USE_WORK_STEALING)
     487                lanes.tscs[w].tv = thrd->link.ts;
    348488        #endif
    349489
    350 
    351         // As long as the list is not empty, try finding a lane that isn't empty and pop from it
    352         #ifdef USE_SNZI
    353                 while( query(snzi) ) {
    354         #else
    355                 for(25) {
    356         #endif
    357                 // Pick two lists at random
    358                 // unsigned ri = __tls_rand();
    359                 // unsigned rj = __tls_rand();
    360                 unsigned ri = __tls_rand_bck();
    361                 unsigned rj = __tls_rand_bck();
    362 
    363                 unsigned i, j;
    364                 __attribute__((unused)) bool locali, localj;
    365                 [i, locali] = idx_from_r(ri, preferred);
    366                 [j, localj] = idx_from_r(rj, preferred);
    367 
    368                 #if !defined(__CFA_NO_STATISTICS__)
    369                         if(locali && localj) {
    370                                 __tls_stats()->ready.pick.pop.local++;
    371                         }
    372                 #endif
    373 
    374                 i %= count;
    375                 j %= count;
    376 
    377                 // try popping from the 2 picked lists
    378                 struct $thread * thrd = try_pop(cltr, i, j);
    379                 if(thrd) {
    380                         #if defined(BIAS) && !defined(__CFA_NO_STATISTICS__)
    381                                 if( locali || localj ) __tls_stats()->ready.pick.pop.lsuccess++;
    382                         #endif
    383                         return thrd;
    384                 }
    385         }
    386 
    387         // All lanes where empty return 0p
    388         return 0p;
    389 }
    390 
    391 __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
     490        // return the popped thread
     491        return thrd;
     492}
     493
     494//-----------------------------------------------------------------------
     495// try to pop from any lanes making sure you don't miss any threads push
     496// before the start of the function
     497static inline struct $thread * search(struct cluster * cltr) with (cltr->ready_queue) {
    392498        /* paranoid */ verify( lanes.count > 0 );
    393499        unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
     
    405511}
    406512
    407 
    408513//-----------------------------------------------------------------------
    409 // Given 2 indexes, pick the list with the oldest push an try to pop from it
    410 static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j) with (cltr->ready_queue) {
    411         #if !defined(__CFA_NO_STATISTICS__)
    412                 __tls_stats()->ready.pick.pop.attempt++;
    413         #endif
    414 
    415         // Pick the bet list
    416         int w = i;
    417         if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
    418                 w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
    419         }
    420 
    421         return try_pop(cltr, w);
    422 }
    423 
    424 static inline struct $thread * try_pop(struct cluster * cltr, unsigned w) with (cltr->ready_queue) {
    425         // Get relevant elements locally
    426         __intrusive_lane_t & lane = lanes.data[w];
    427 
    428         // If list looks empty retry
    429         if( is_empty(lane) ) return 0p;
    430 
    431         // If we can't get the lock retry
    432         if( !__atomic_try_acquire(&lane.lock) ) return 0p;
    433 
    434 
    435         // If list is empty, unlock and retry
    436         if( is_empty(lane) ) {
    437                 __atomic_unlock(&lane.lock);
    438                 return 0p;
    439         }
    440 
    441         // Actually pop the list
    442         struct $thread * thrd;
    443         thrd = pop(lane);
    444 
    445         /* paranoid */ verify(thrd);
    446         /* paranoid */ verify(lane.lock);
    447 
    448         #ifdef USE_SNZI
    449                 // If this was the last element in the lane
    450                 if(emptied) {
    451                         depart( snzi, w );
    452                 }
    453         #endif
    454 
    455         // Unlock and return
    456         __atomic_unlock(&lane.lock);
    457 
    458         // Update statistics
    459         #if !defined(__CFA_NO_STATISTICS__)
    460                 __tls_stats()->ready.pick.pop.success++;
    461         #endif
    462 
    463         // Update the thread bias
    464         thrd->link.preferred = w / 4;
    465 
    466         // return the popped thread
    467         return thrd;
    468 }
    469 //-----------------------------------------------------------------------
    470 
    471 bool remove_head(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
    472         for(i; lanes.count) {
    473                 __intrusive_lane_t & lane = lanes.data[i];
    474 
    475                 bool removed = false;
    476 
    477                 __atomic_acquire(&lane.lock);
    478                         if(head(lane)->link.next == thrd) {
    479                                 $thread * pthrd;
    480                                 pthrd = pop(lane);
    481 
    482                                 /* paranoid */ verify( pthrd == thrd );
    483 
    484                                 removed = true;
    485                                 #ifdef USE_SNZI
    486                                         if(emptied) {
    487                                                 depart( snzi, i );
    488                                         }
    489                                 #endif
    490                         }
    491                 __atomic_unlock(&lane.lock);
    492 
    493                 if( removed ) return true;
    494         }
    495         return false;
    496 }
    497 
    498 //-----------------------------------------------------------------------
    499 
     514// Check that all the intrusive queues in the data structure are still consistent
    500515static void check( __ready_queue_t & q ) with (q) {
    501516        #if defined(__CFA_WITH_VERIFY__) && !defined(USE_MPSC)
     
    522537}
    523538
     539//-----------------------------------------------------------------------
     540// Given 2 indexes, pick the list with the oldest push an try to pop from it
     541static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j) with (cltr->ready_queue) {
     542        #if !defined(__CFA_NO_STATISTICS__)
     543                __tls_stats()->ready.pick.pop.attempt++;
     544        #endif
     545
     546        // Pick the bet list
     547        int w = i;
     548        if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
     549                w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
     550        }
     551
     552        return try_pop(cltr, w);
     553}
     554
    524555// Call this function of the intrusive list was moved using memcpy
    525556// fixes the list so that the pointers back to anchors aren't left dangling
     
    541572}
    542573
     574static void assign_list(unsigned & value, dlist(processor, processor) & list, unsigned count) {
     575        processor * it = &list`first;
     576        for(unsigned i = 0; i < count; i++) {
     577                /* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count);
     578                it->rdq.id = value;
     579                it->rdq.target = -1u;
     580                value += READYQ_SHARD_FACTOR;
     581                it = &(*it)`next;
     582        }
     583}
     584
     585static void reassign_cltr_id(struct cluster * cltr) {
     586        unsigned preferred = 0;
     587        assign_list(preferred, cltr->procs.actives, cltr->procs.total - cltr->procs.idle);
     588        assign_list(preferred, cltr->procs.idles  , cltr->procs.idle );
     589}
     590
     591static void fix_times( struct cluster * cltr ) with( cltr->ready_queue ) {
     592        #if defined(USE_WORK_STEALING)
     593                lanes.tscs = alloc(lanes.count, lanes.tscs`realloc);
     594                for(i; lanes.count) {
     595                        lanes.tscs[i].tv = ts(lanes.data[i]);
     596                }
     597        #endif
     598}
     599
    543600// Grow the ready queue
    544 unsigned ready_queue_grow(struct cluster * cltr, int target) {
    545         unsigned preferred;
     601void ready_queue_grow(struct cluster * cltr) {
    546602        size_t ncount;
     603        int target = cltr->procs.total;
    547604
    548605        /* paranoid */ verify( ready_mutate_islocked() );
     
    554611        // grow the ready queue
    555612        with( cltr->ready_queue ) {
    556                 #ifdef USE_SNZI
    557                         ^(snzi){};
    558                 #endif
    559 
    560613                // Find new count
    561614                // Make sure we always have atleast 1 list
    562615                if(target >= 2) {
    563                         ncount = target * 4;
    564                         preferred = ncount - 4;
     616                        ncount = target * READYQ_SHARD_FACTOR;
    565617                } else {
    566                         ncount = 1;
    567                         preferred = 0;
     618                        ncount = SEQUENTIAL_SHARD;
    568619                }
    569620
     
    583634                // Update original
    584635                lanes.count = ncount;
    585 
    586                 #ifdef USE_SNZI
    587                         // Re-create the snzi
    588                         snzi{ log2( lanes.count / 8 ) };
    589                         for( idx; (size_t)lanes.count ) {
    590                                 if( !is_empty(lanes.data[idx]) ) {
    591                                         arrive(snzi, idx);
    592                                 }
    593                         }
    594                 #endif
    595         }
     636        }
     637
     638        fix_times(cltr);
     639
     640        reassign_cltr_id(cltr);
    596641
    597642        // Make sure that everything is consistent
     
    601646
    602647        /* paranoid */ verify( ready_mutate_islocked() );
    603         return preferred;
    604648}
    605649
    606650// Shrink the ready queue
    607 void ready_queue_shrink(struct cluster * cltr, int target) {
     651void ready_queue_shrink(struct cluster * cltr) {
    608652        /* paranoid */ verify( ready_mutate_islocked() );
    609653        __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");
     
    612656        /* paranoid */ check( cltr->ready_queue );
    613657
     658        int target = cltr->procs.total;
     659
    614660        with( cltr->ready_queue ) {
    615                 #ifdef USE_SNZI
    616                         ^(snzi){};
    617                 #endif
    618 
    619661                // Remember old count
    620662                size_t ocount = lanes.count;
     
    622664                // Find new count
    623665                // Make sure we always have atleast 1 list
    624                 lanes.count = target >= 2 ? target * 4: 1;
     666                lanes.count = target >= 2 ? target * READYQ_SHARD_FACTOR: SEQUENTIAL_SHARD;
    625667                /* paranoid */ verify( ocount >= lanes.count );
    626                 /* paranoid */ verify( lanes.count == target * 4 || target < 2 );
     668                /* paranoid */ verify( lanes.count == target * READYQ_SHARD_FACTOR || target < 2 );
    627669
    628670                // for printing count the number of displaced threads
     
    667709                        fix(lanes.data[idx]);
    668710                }
    669 
    670                 #ifdef USE_SNZI
    671                         // Re-create the snzi
    672                         snzi{ log2( lanes.count / 8 ) };
    673                         for( idx; (size_t)lanes.count ) {
    674                                 if( !is_empty(lanes.data[idx]) ) {
    675                                         arrive(snzi, idx);
    676                                 }
    677                         }
    678                 #endif
    679         }
     711        }
     712
     713        fix_times(cltr);
     714
     715        reassign_cltr_id(cltr);
    680716
    681717        // Make sure that everything is consistent
  • libcfa/src/concurrency/ready_subqueue.hfa

    r857a1c6 rc8a0210  
    246246        #endif
    247247}
     248
     249// Aligned timestamps which are used by the relaxed ready queue
     250struct __attribute__((aligned(128))) __timestamp_t {
     251        volatile unsigned long long tv;
     252};
     253
     254void  ?{}(__timestamp_t & this) { this.tv = 0; }
     255void ^?{}(__timestamp_t & this) {}
  • libcfa/src/concurrency/stats.cfa

    r857a1c6 rc8a0210  
    55#include <inttypes.h>
    66#include "bits/debug.hfa"
     7#include "bits/locks.hfa"
    78#include "stats.hfa"
    89
     
    4445                        stats->io.calls.errors.busy = 0;
    4546                        stats->io.poller.sleeps     = 0;
     47                #endif
     48
     49                #if defined(CFA_STATS_ARRAY)
     50                        stats->array.values = alloc(CFA_STATS_ARRAY);
     51                        stats->array.cnt = 0;
    4652                #endif
    4753        }
     
    151157                #endif
    152158        }
     159
     160        #if defined(CFA_STATS_ARRAY)
     161                extern "C" {
     162                        #include <stdio.h>
     163                        #include <errno.h>
     164                        #include <sys/stat.h>
     165                        #include <fcntl.h>
     166                }
     167
     168                void __flush_stat( struct __stats_t * this, const char * name, void * handle) {
     169                        int ret = mkdir(".cfadata", 0755);
     170                        if(ret < 0 && errno != EEXIST) abort("Failed to create directory .cfadata: %d\n", errno);
     171
     172                        char filename[100];
     173                        snprintf(filename, 100, ".cfadata/%s%p.data", name, handle);
     174
     175                        int fd = open(filename, O_WRONLY | O_APPEND | O_CREAT, 0644);
     176                        if(fd < 0) abort("Failed to create file %s: %d\n", filename, errno);
     177
     178                        for(i; this->array.cnt) {
     179                                char line[100];
     180                                size_t n = snprintf(line, 100, "%llu, %lld\n", this->array.values[i].ts, this->array.values[i].value);
     181                                write(fd, line, n);
     182                        }
     183
     184                        this->array.cnt = 0;
     185                        close(fd);
     186                }
     187
     188                static __spinlock_t stats_lock;
     189
     190                void __push_stat( struct __stats_t * this, int64_t value, bool external, const char * name, void * handle ) {
     191                        if(external) lock(stats_lock __cfaabi_dbg_ctx2);
     192
     193                        if( this->array.cnt >= CFA_STATS_ARRAY ) __flush_stat( this, name, handle );
     194
     195                        size_t idx = this->array.cnt;
     196                        this->array.cnt++;
     197
     198                        if(external) unlock(stats_lock);
     199
     200                        this->array.values[idx].ts = rdtscl();
     201                        this->array.values[idx].value = value;
     202                }
     203        #endif
    153204#endif
  • libcfa/src/concurrency/stats.hfa

    r857a1c6 rc8a0210  
    11#pragma once
     2
     3// #define CFA_STATS_ARRAY 10000
    24
    35#include <stdint.h>
     
    109111        #endif
    110112
     113        #if defined(CFA_STATS_ARRAY)
     114                struct __stats_elem_t {
     115                        long long int ts;
     116                        int64_t value;
     117                };
     118        #endif
     119
    111120        struct __attribute__((aligned(128))) __stats_t {
    112121                __stats_readQ_t ready;
     
    114123                        __stats_io_t    io;
    115124                #endif
     125
     126                #if defined(CFA_STATS_ARRAY)
     127                        struct {
     128                                __stats_elem_t * values;
     129                                volatile size_t cnt;
     130                        } array;
     131                #endif
     132
    116133        };
    117134
     
    119136        void __tally_stats( struct __stats_t *, struct __stats_t * );
    120137        void __print_stats( struct __stats_t *, int, const char *, const char *, void * );
     138        #if defined(CFA_STATS_ARRAY)
     139                void __push_stat ( struct __stats_t *, int64_t value, bool external, const char * name, void * handle);
     140                void __flush_stat( struct __stats_t *, const char *, void * );
     141        #else
     142                static inline void __push_stat ( struct __stats_t *, int64_t, bool, const char *, void * ) {}
     143                static inline void __flush_stat( struct __stats_t *, const char *, void * ) {}
     144        #endif
    121145#endif
    122146
  • libcfa/src/concurrency/thread.cfa

    r857a1c6 rc8a0210  
    3939        link.next = 0p;
    4040        link.prev = 0p;
    41         link.preferred = -1;
    4241        #if defined( __CFA_WITH_VERIFY__ )
    4342                canary = 0x0D15EA5E0D15EA5Ep;
     
    6261}
    6362
    64 FORALL_DATA_INSTANCE(ThreadCancelled, (thread_t &), (thread_t))
     63EHM_VIRTUAL_TABLE(SomeThreadCancelled, std_thread_cancelled);
    6564
    6665forall(T &)
     
    7372forall(T &)
    7473const char * msg(ThreadCancelled(T) *) {
    75         return "ThreadCancelled";
     74        return "ThreadCancelled(...)";
    7675}
    7776
    7877forall(T &)
    7978static void default_thread_cancel_handler(ThreadCancelled(T) & ) {
     79        // Improve this error message, can I do formatting?
    8080        abort( "Unhandled thread cancellation.\n" );
    8181}
    8282
    83 forall(T & | is_thread(T) | IS_EXCEPTION(ThreadCancelled, (T)))
     83static void default_thread_cancel_handler(SomeThreadCancelled & ) {
     84        // Improve this error message, can I do formatting?
     85        abort( "Unhandled thread cancellation.\n" );
     86}
     87
     88forall(T & | is_thread(T) | IS_EXCEPTION(SomeThreadCancelled))
    8489void ?{}( thread_dtor_guard_t & this,
    85                 T & thrd, void(*cancelHandler)(ThreadCancelled(T) &)) {
    86         $monitor * m = get_monitor(thrd);
     90                T & thrd, void(*cancelHandler)(SomeThreadCancelled &)) {
     91        $monitor * m = get_monitor(thrd);
    8792        $thread * desc = get_thread(thrd);
    8893
    8994        // Setup the monitor guard
    9095        void (*dtor)(T& mutex this) = ^?{};
    91         bool join = cancelHandler != (void(*)(ThreadCancelled(T)&))0;
     96        bool join = cancelHandler != (void(*)(SomeThreadCancelled&))0;
    9297        (this.mg){&m, (void(*)())dtor, join};
    9398
     
    103108        }
    104109        desc->state = Cancelled;
    105         void(*defaultResumptionHandler)(ThreadCancelled(T) &) =
     110        void(*defaultResumptionHandler)(SomeThreadCancelled &) =
    106111                join ? cancelHandler : default_thread_cancel_handler;
    107112
    108         ThreadCancelled(T) except;
    109113        // TODO: Remove explitate vtable set once trac#186 is fixed.
    110         except.virtual_table = &get_exception_vtable(&except);
     114        SomeThreadCancelled except;
     115        except.virtual_table = &std_thread_cancelled;
    111116        except.the_thread = &thrd;
    112117        except.the_exception = __cfaehm_cancellation_exception( cancellation );
    113         throwResume except;
     118        // Why is this cast required?
     119        throwResume (SomeThreadCancelled &)except;
    114120
    115121        except.the_exception->virtual_table->free( except.the_exception );
     
    158164
    159165//-----------------------------------------------------------------------------
    160 forall(T & | is_thread(T) | IS_RESUMPTION_EXCEPTION(ThreadCancelled, (T)))
     166forall(T & | is_thread(T) | IS_RESUMPTION_EXCEPTION(SomeThreadCancelled))
    161167T & join( T & this ) {
    162168        thread_dtor_guard_t guard = { this, defaultResumptionHandler };
  • libcfa/src/concurrency/thread.hfa

    r857a1c6 rc8a0210  
    3232};
    3333
    34 FORALL_DATA_EXCEPTION(ThreadCancelled, (thread_t &), (thread_t)) (
     34EHM_EXCEPTION(SomeThreadCancelled) (
     35        void * the_thread;
     36        exception_t * the_exception;
     37);
     38
     39EHM_EXTERN_VTABLE(SomeThreadCancelled, std_thread_cancelled);
     40
     41EHM_FORALL_EXCEPTION(ThreadCancelled, (thread_t &), (thread_t)) (
    3542        thread_t * the_thread;
    3643        exception_t * the_exception;
     
    7986};
    8087
    81 forall( T & | is_thread(T) | IS_EXCEPTION(ThreadCancelled, (T)) )
    82 void ?{}( thread_dtor_guard_t & this, T & thrd, void(*)(ThreadCancelled(T) &) );
     88forall( T & | is_thread(T) | IS_EXCEPTION(SomeThreadCancelled) )
     89void ?{}( thread_dtor_guard_t & this, T & thrd, void(*)(SomeThreadCancelled &) );
    8390void ^?{}( thread_dtor_guard_t & this );
    8491
     
    125132//----------
    126133// join
    127 forall( T & | is_thread(T) | IS_RESUMPTION_EXCEPTION(ThreadCancelled, (T)) )
     134forall( T & | is_thread(T) | IS_RESUMPTION_EXCEPTION(SomeThreadCancelled) )
    128135T & join( T & this );
    129136
Note: See TracChangeset for help on using the changeset viewer.