Ignore:
Timestamp:
Apr 28, 2021, 4:56:50 PM (5 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
8d66610
Parents:
feacef9 (diff), b7fd2db6 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

Location:
libcfa/src/concurrency
Files:
28 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/alarm.cfa

    rfeacef9 r5407cdc  
    1515
    1616#define __cforall_thread__
     17// #define __CFA_DEBUG_PRINT_PREEMPTION__
    1718
    1819#include <errno.h>
     
    107108                bool first = ! & alarms`first;
    108109
     110                __cfadbg_print_safe( preemption, " KERNEL: alarm inserting %p (%lu).\n", this, this->alarm.tn );
    109111                insert( &alarms, this );
    110112                if( first ) {
     
    114116        unlock( event_kernel->lock );
    115117        this->set = true;
    116         enable_interrupts( __cfaabi_dbg_ctx );
     118        enable_interrupts();
    117119}
    118120
     
    125127        }
    126128        unlock( event_kernel->lock );
    127         enable_interrupts( __cfaabi_dbg_ctx );
     129        enable_interrupts();
    128130        this->set = false;
    129131}
  • libcfa/src/concurrency/clib/cfathread.cfa

    rfeacef9 r5407cdc  
    1414//
    1515
     16#include "fstream.hfa"
     17#include "locks.hfa"
    1618#include "kernel.hfa"
     19#include "stats.hfa"
    1720#include "thread.hfa"
    18 
    19 thread CRunner {
    20         void (*themain)( CRunner * );
     21#include "time.hfa"
     22
     23#include "cfathread.h"
     24
     25extern void ?{}(processor &, const char[], cluster &, $thread *);
     26extern "C" {
     27      extern void __cfactx_invoke_thread(void (*main)(void *), void * this);
     28}
     29
     30//================================================================================
     31// Thread run y the C Interface
     32
     33struct cfathread_object {
     34        $thread self;
     35        void * (*themain)( void * );
     36        void * arg;
     37        void * ret;
    2138};
    22 
    23 static void ?{}( CRunner & this, void (*themain)( CRunner * ) ) {
     39void main(cfathread_object & this);
     40void ^?{}(cfathread_object & mutex this);
     41
     42static inline $thread * get_thread( cfathread_object & this ) { return &this.self; }
     43
     44typedef ThreadCancelled(cfathread_object) cfathread_exception;
     45typedef ThreadCancelled_vtable(cfathread_object) cfathread_vtable;
     46
     47void defaultResumptionHandler(ThreadCancelled(cfathread_object) & except) {
     48        abort | "A thread was cancelled";
     49}
     50
     51cfathread_vtable _cfathread_vtable_instance;
     52
     53cfathread_vtable & const _default_vtable = _cfathread_vtable_instance;
     54
     55cfathread_vtable const & get_exception_vtable(cfathread_exception *) {
     56        return _cfathread_vtable_instance;
     57}
     58
     59static void ?{}( cfathread_object & this, cluster & cl, void *(*themain)( void * ), void * arg ) {
    2460        this.themain = themain;
    25 }
    26 
    27 void main( CRunner & this ) {
    28         this.themain( &this );
    29 }
    30 
    31 processor * procs = 0p;
    32 int proc_cnt = 1;
    33 
     61        this.arg = arg;
     62        (this.self){"C-thread", cl};
     63        __thrd_start(this, main);
     64}
     65
     66void ^?{}(cfathread_object & mutex this) {
     67        ^(this.self){};
     68}
     69
     70void main( cfathread_object & this ) {
     71        __attribute__((unused)) void * const thrd_obj = (void*)&this;
     72        __attribute__((unused)) void * const thrd_hdl = (void*)active_thread();
     73        /* paranoid */ verify( thrd_obj == thrd_hdl );
     74
     75        this.ret = this.themain( this.arg );
     76}
     77
     78//================================================================================
     79// Special Init Thread responsible for the initialization or processors
     80struct __cfainit {
     81        $thread self;
     82        void (*init)( void * );
     83        void * arg;
     84};
     85void main(__cfainit & this);
     86void ^?{}(__cfainit & mutex this);
     87
     88static inline $thread * get_thread( __cfainit & this ) { return &this.self; }
     89
     90typedef ThreadCancelled(__cfainit) __cfainit_exception;
     91typedef ThreadCancelled_vtable(__cfainit) __cfainit_vtable;
     92
     93void defaultResumptionHandler(ThreadCancelled(__cfainit) & except) {
     94        abort | "The init thread was cancelled";
     95}
     96
     97__cfainit_vtable ___cfainit_vtable_instance;
     98
     99__cfainit_vtable const & get_exception_vtable(__cfainit_exception *) {
     100        return ___cfainit_vtable_instance;
     101}
     102
     103static void ?{}( __cfainit & this, void (*init)( void * ), void * arg ) {
     104        this.init = init;
     105        this.arg = arg;
     106        (this.self){"Processir Init"};
     107
     108        // Don't use __thrd_start! just prep the context manually
     109        $thread * this_thrd = get_thread(this);
     110        void (*main_p)(__cfainit &) = main;
     111
     112        disable_interrupts();
     113        __cfactx_start(main_p, get_coroutine(this), this, __cfactx_invoke_thread);
     114
     115        this_thrd->context.[SP, FP] = this_thrd->self_cor.context.[SP, FP];
     116        /* paranoid */ verify( this_thrd->context.SP );
     117
     118        this_thrd->state = Ready;
     119        enable_interrupts();
     120}
     121
     122void ^?{}(__cfainit & mutex this) {
     123        ^(this.self){};
     124}
     125
     126void main( __cfainit & this ) {
     127        __attribute__((unused)) void * const thrd_obj = (void*)&this;
     128        __attribute__((unused)) void * const thrd_hdl = (void*)active_thread();
     129        /* paranoid */ verify( thrd_obj == thrd_hdl );
     130
     131        this.init( this.arg );
     132}
     133
     134//================================================================================
     135// Main Api
    34136extern "C" {
    35         //--------------------
    36         // Basic thread management
    37         CRunner * cfathread_create( void (*main)( CRunner * ) ) {
    38                 return new( main );
    39         }
    40 
    41         void cfathread_join( CRunner * thrd ) {
    42                 delete( thrd );
     137        int cfathread_cluster_create(cfathread_cluster_t * cl) __attribute__((nonnull(1))) {
     138                *cl = new();
     139                return 0;
     140        }
     141
     142        cfathread_cluster_t cfathread_cluster_self(void) {
     143                return active_cluster();
     144        }
     145
     146        int cfathread_cluster_print_stats( cfathread_cluster_t cl ) {
     147                #if !defined(__CFA_NO_STATISTICS__)
     148                        print_stats_at_exit( *cl, CFA_STATS_READY_Q | CFA_STATS_IO );
     149                        print_stats_now( *cl, CFA_STATS_READY_Q | CFA_STATS_IO );
     150                #endif
     151                return 0;
     152        }
     153
     154        int cfathread_cluster_add_worker(cfathread_cluster_t cl, pthread_t* tid, void (*init_routine) (void *), void * arg) {
     155                __cfainit * it = 0p;
     156                if(init_routine) {
     157                        it = alloc();
     158                        (*it){init_routine, arg};
     159                }
     160                processor * proc = alloc();
     161                (*proc){ "C-processor", *cl, get_thread(*it) };
     162
     163                // Wait for the init thread to return before continuing
     164                if(it) {
     165                        ^(*it){};
     166                        free(it);
     167                }
     168
     169                if(tid) *tid = proc->kernel_thread;
     170                return 0;
     171        }
     172
     173        int cfathread_cluster_pause (cfathread_cluster_t) {
     174                abort | "Pausing clusters is not supported";
     175                exit(1);
     176        }
     177
     178        int cfathread_cluster_resume(cfathread_cluster_t) {
     179                abort | "Resuming clusters is not supported";
     180                exit(1);
     181        }
     182
     183        //--------------------
     184        // Thread attributes
     185        int cfathread_attr_init(cfathread_attr_t *attr) __attribute__((nonnull (1))) {
     186                attr->cl = active_cluster();
     187                return 0;
     188        }
     189
     190        //--------------------
     191        // Thread
     192        int cfathread_create( cfathread_t * handle, const cfathread_attr_t * attr, void *(*main)( void * ), void * arg ) __attribute__((nonnull (1))) {
     193                cluster * cl = attr ? attr->cl : active_cluster();
     194                cfathread_t thrd = alloc();
     195                (*thrd){ *cl, main, arg };
     196                *handle = thrd;
     197                return 0;
     198        }
     199
     200        int cfathread_join( cfathread_t thrd, void ** retval ) {
     201                void * ret = join( *thrd ).ret;
     202                ^( *thrd ){};
     203                free(thrd);
     204                if(retval) {
     205                        *retval = ret;
     206                }
     207                return 0;
     208        }
     209
     210        int cfathread_get_errno(void) {
     211                return errno;
     212        }
     213
     214        cfathread_t cfathread_self(void) {
     215                return (cfathread_t)active_thread();
     216        }
     217
     218        int cfathread_usleep(useconds_t usecs) {
     219                sleep(usecs`us);
     220                return 0;
     221        }
     222
     223        int cfathread_sleep(unsigned int secs) {
     224                sleep(secs`s);
     225                return 0;
    43226        }
    44227
     
    47230        }
    48231
    49         void cfathread_unpark( CRunner * thrd ) {
     232        void cfathread_unpark( cfathread_t thrd ) {
    50233                unpark( *thrd );
    51234        }
     
    55238        }
    56239
    57         //--------------------
    58         // Basic kernel features
    59         void cfathread_setproccnt( int ncnt ) {
    60                 assert( ncnt >= 1 );
    61                 adelete( procs );
    62 
    63                 proc_cnt = ncnt - 1;
    64                 procs = anew(proc_cnt);
    65         }
    66 }
     240        typedef struct cfathread_mutex * cfathread_mutex_t;
     241
     242        //--------------------
     243        // Mutex
     244        struct cfathread_mutex {
     245                fast_lock impl;
     246        };
     247        int cfathread_mutex_init(cfathread_mutex_t *restrict mut, const cfathread_mutexattr_t *restrict) __attribute__((nonnull (1))) { *mut = new(); return 0; }
     248        int cfathread_mutex_destroy(cfathread_mutex_t *mut) __attribute__((nonnull (1))) { delete( *mut ); return 0; }
     249        int cfathread_mutex_lock   (cfathread_mutex_t *mut) __attribute__((nonnull (1))) { lock( (*mut)->impl ); return 0; }
     250        int cfathread_mutex_unlock (cfathread_mutex_t *mut) __attribute__((nonnull (1))) { unlock( (*mut)->impl ); return 0; }
     251        int cfathread_mutex_trylock(cfathread_mutex_t *mut) __attribute__((nonnull (1))) {
     252                bool ret = try_lock( (*mut)->impl );
     253                if( ret ) return 0;
     254                else return EBUSY;
     255        }
     256
     257        //--------------------
     258        // Condition
     259        struct cfathread_condition {
     260                condition_variable(fast_lock) impl;
     261        };
     262        int cfathread_cond_init(cfathread_cond_t *restrict cond, const cfathread_condattr_t *restrict) __attribute__((nonnull (1))) { *cond = new(); return 0; }
     263        int cfathread_cond_signal(cfathread_cond_t *cond) __attribute__((nonnull (1)))  { notify_one( (*cond)->impl ); return 0; }
     264        int cfathread_cond_wait(cfathread_cond_t *restrict cond, cfathread_mutex_t *restrict mut) __attribute__((nonnull (1,2))) { wait( (*cond)->impl, (*mut)->impl ); return 0; }
     265        int cfathread_cond_timedwait(cfathread_cond_t *restrict cond, cfathread_mutex_t *restrict mut, const struct timespec *restrict abstime) __attribute__((nonnull (1,2,3))) {
     266                Time t = { *abstime };
     267                if( wait( (*cond)->impl, (*mut)->impl, t ) ) {
     268                        return 0;
     269                }
     270                errno = ETIMEDOUT;
     271                return ETIMEDOUT;
     272        }
     273}
     274
     275#include <iofwd.hfa>
     276
     277extern "C" {
     278        #include <unistd.h>
     279        #include <sys/types.h>
     280        #include <sys/socket.h>
     281
     282        //--------------------
     283        // IO operations
     284        int cfathread_socket(int domain, int type, int protocol) {
     285                return socket(domain, type, protocol);
     286        }
     287        int cfathread_bind(int socket, const struct sockaddr *address, socklen_t address_len) {
     288                return bind(socket, address, address_len);
     289        }
     290
     291        int cfathread_listen(int socket, int backlog) {
     292                return listen(socket, backlog);
     293        }
     294
     295        int cfathread_accept(int socket, struct sockaddr *restrict address, socklen_t *restrict address_len) {
     296                return cfa_accept4(socket, address, address_len, 0, CFA_IO_LAZY);
     297        }
     298
     299        int cfathread_connect(int socket, const struct sockaddr *address, socklen_t address_len) {
     300                return cfa_connect(socket, address, address_len, CFA_IO_LAZY);
     301        }
     302
     303        int cfathread_dup(int fildes) {
     304                return dup(fildes);
     305        }
     306
     307        int cfathread_close(int fildes) {
     308                return cfa_close(fildes, CFA_IO_LAZY);
     309        }
     310
     311        ssize_t cfathread_sendmsg(int socket, const struct msghdr *message, int flags) {
     312                return cfa_sendmsg(socket, message, flags, CFA_IO_LAZY);
     313        }
     314
     315        ssize_t cfathread_write(int fildes, const void *buf, size_t nbyte) {
     316                // Use send rather then write for socket since it's faster
     317                return cfa_send(fildes, buf, nbyte, 0, CFA_IO_LAZY);
     318        }
     319
     320        ssize_t cfathread_recvfrom(int socket, void *restrict buffer, size_t length, int flags, struct sockaddr *restrict address, socklen_t *restrict address_len)  {
     321                struct iovec iov;
     322                iov.iov_base = buffer;
     323                iov.iov_len = length;
     324
     325                struct msghdr msg;
     326                msg.msg_name = address;
     327                msg.msg_namelen = address_len ? (socklen_t)*address_len : (socklen_t)0;
     328                msg.msg_iov = &iov;
     329                msg.msg_iovlen = 1;
     330                msg.msg_control = 0p;
     331                msg.msg_controllen = 0;
     332
     333                ssize_t ret = cfa_recvmsg(socket, &msg, flags, CFA_IO_LAZY);
     334
     335                if(address_len) *address_len = msg.msg_namelen;
     336                return ret;
     337        }
     338
     339        ssize_t cfathread_read(int fildes, void *buf, size_t nbyte) {
     340                // Use recv rather then read for socket since it's faster
     341                return cfa_recv(fildes, buf, nbyte, 0, CFA_IO_LAZY);
     342        }
     343
     344}
  • libcfa/src/concurrency/clib/cfathread.h

    rfeacef9 r5407cdc  
    1414//
    1515
    16 #include "stddef.h"
    17 #include "invoke.h"
    18 
    1916#if defined(__cforall) || defined(__cplusplus)
    2017extern "C" {
    2118#endif
     19        #include <asm/types.h>
     20        #include <errno.h>
     21        #include <unistd.h>
     22
     23
    2224        //--------------------
    2325        // Basic types
    24         struct cfathread_CRunner_t;
    25         typedef struct cfathread_CRunner_t * cfathread_t;
     26
     27        typedef struct cluster * cfathread_cluster_t;
     28
     29        int cfathread_cluster_create(cfathread_cluster_t * cluster);
     30        cfathread_cluster_t cfathread_cluster_self(void);
     31        int cfathread_cluster_print_stats(cfathread_cluster_t cluster);
     32        int cfathread_cluster_add_worker(cfathread_cluster_t cluster, pthread_t* tid, void (*init_routine) (void *), void * arg);
     33        int cfathread_cluster_pause (cfathread_cluster_t cluster);
     34        int cfathread_cluster_resume(cfathread_cluster_t cluster);
    2635
    2736        //--------------------
    28         // Basic thread support
    29         cfathread_t cfathread_create( void (*main)( cfathread_t ) );
    30         void cfathread_join( cfathread_t );
     37        // thread attribute
     38        typedef struct cfathread_attr {
     39                cfathread_cluster_t cl;
     40        } cfathread_attr_t;
     41
     42        int cfathread_attr_init(cfathread_attr_t * attr) __attribute__((nonnull (1)));
     43        static inline int cfathread_attr_destroy(cfathread_attr_t * attr) __attribute__((nonnull (1)));
     44        static inline int cfathread_attr_destroy(cfathread_attr_t * attr) { return 0; }
     45        static inline int cfathread_attr_setbackground(cfathread_attr_t * attr, int background) __attribute__((nonnull (1)));
     46        static inline int cfathread_attr_setbackground(cfathread_attr_t * attr, int background) { return 0; }
     47        static inline int cfathread_attr_setcluster(cfathread_attr_t * attr, cfathread_cluster_t cl) __attribute__((nonnull (1)));
     48        static inline int cfathread_attr_setcluster(cfathread_attr_t * attr, cfathread_cluster_t cl) { attr->cl = cl; return 0; }
     49
     50        //--------------------
     51        // thread type
     52        struct cfathread_object;
     53        typedef struct cfathread_object * cfathread_t;
     54
     55        int cfathread_create( cfathread_t * h, const cfathread_attr_t * a, void *(*main)( void * ), void * arg ) __attribute__((nonnull (1)));
     56        int cfathread_join( cfathread_t, void ** retval );
     57
     58        int cfathread_get_errno(void);
     59        cfathread_t cfathread_self(void);
     60
     61        int cfathread_usleep(useconds_t usecs);
     62        int cfathread_sleep(unsigned int secs);
    3163
    3264        void cfathread_park( void );
     
    3567
    3668        //--------------------
    37         // Basic kernel features
    38         void cfathread_setproccnt( int );
     69        // mutex and condition
     70        struct timespec;
    3971
     72        typedef struct cfathread_mutex_attr {
     73        } cfathread_mutexattr_t;
     74        typedef struct cfathread_mutex * cfathread_mutex_t;
     75        int cfathread_mutex_init(cfathread_mutex_t *restrict mut, const cfathread_mutexattr_t *restrict attr) __attribute__((nonnull (1)));
     76        int cfathread_mutex_destroy(cfathread_mutex_t *mut) __attribute__((nonnull (1)));
     77        int cfathread_mutex_lock(cfathread_mutex_t *mut) __attribute__((nonnull (1)));
     78        int cfathread_mutex_trylock(cfathread_mutex_t *mut) __attribute__((nonnull (1)));
     79        int cfathread_mutex_unlock(cfathread_mutex_t *mut) __attribute__((nonnull (1)));
     80
     81        typedef struct cfathread_cond_attr {
     82        } cfathread_condattr_t;
     83        typedef struct cfathread_condition * cfathread_cond_t;
     84        int cfathread_cond_init(cfathread_cond_t *restrict cond, const cfathread_condattr_t *restrict attr) __attribute__((nonnull (1)));
     85        int cfathread_cond_wait(cfathread_cond_t *restrict cond, cfathread_mutex_t *restrict mut) __attribute__((nonnull (1,2)));
     86        int cfathread_cond_timedwait(cfathread_cond_t *restrict cond, cfathread_mutex_t *restrict mut, const struct timespec *restrict abstime) __attribute__((nonnull (1,2,3)));
     87        int cfathread_cond_signal(cfathread_cond_t *cond) __attribute__((nonnull (1)));
     88
     89        //--------------------
     90        // IO operations
     91        struct sockaddr;
     92        struct msghdr;
     93        int cfathread_socket(int domain, int type, int protocol);
     94        int cfathread_bind(int socket, const struct sockaddr *address, socklen_t address_len);
     95        int cfathread_listen(int socket, int backlog);
     96        int cfathread_accept(int socket, struct sockaddr *restrict address, socklen_t *restrict address_len);
     97        int cfathread_connect(int socket, const struct sockaddr *address, socklen_t address_len);
     98        int cfathread_dup(int fildes);
     99        int cfathread_close(int fildes);
     100        ssize_t cfathread_sendmsg(int socket, const struct msghdr *message, int flags);
     101        ssize_t cfathread_write(int fildes, const void *buf, size_t nbyte);
     102        ssize_t cfathread_recvfrom(int socket, void *restrict buffer, size_t length, int flags, struct sockaddr *restrict address, socklen_t *restrict address_len);
     103        ssize_t cfathread_read(int fildes, void *buf, size_t nbyte);
    40104
    41105#if defined(__cforall) || defined(__cplusplus)
  • libcfa/src/concurrency/coroutine.cfa

    rfeacef9 r5407cdc  
    4646
    4747//-----------------------------------------------------------------------------
    48 FORALL_DATA_INSTANCE(CoroutineCancelled, (coroutine_t &), (coroutine_t))
    49 
    50 forall(T &)
    51 void mark_exception(CoroutineCancelled(T) *) {}
    52 
    5348forall(T &)
    5449void copy(CoroutineCancelled(T) * dst, CoroutineCancelled(T) * src) {
     
    6560// This code should not be inlined. It is the error path on resume.
    6661forall(T & | is_coroutine(T))
    67 void __cfaehm_cancelled_coroutine( T & cor, $coroutine * desc ) {
     62void __cfaehm_cancelled_coroutine(
     63                T & cor, $coroutine * desc, EHM_DEFAULT_VTABLE(CoroutineCancelled, (T)) ) {
    6864        verify( desc->cancellation );
    6965        desc->state = Cancelled;
     
    7268        // TODO: Remove explitate vtable set once trac#186 is fixed.
    7369        CoroutineCancelled(T) except;
    74         except.virtual_table = &get_exception_vtable(&except);
     70        except.virtual_table = &_default_vtable;
    7571        except.the_coroutine = &cor;
    7672        except.the_exception = except;
    77         throwResume except;
     73        // Why does this need a cast?
     74        throwResume (CoroutineCancelled(T) &)except;
    7875
    7976        except->virtual_table->free( except );
     
    148145// Part of the Public API
    149146// Not inline since only ever called once per coroutine
    150 forall(T & | is_coroutine(T))
     147forall(T & | is_coroutine(T) | { EHM_DEFAULT_VTABLE(CoroutineCancelled, (T)); })
    151148void prime(T& cor) {
    152149        $coroutine* this = get_coroutine(cor);
     
    196193
    197194void __stack_clean  ( __stack_info_t * this ) {
    198         size_t size = ((intptr_t)this->storage->base) - ((intptr_t)this->storage->limit) + sizeof(__stack_t);
    199195        void * storage = this->storage->limit;
    200196
    201197        #if CFA_COROUTINE_USE_MMAP
     198                size_t size = ((intptr_t)this->storage->base) - ((intptr_t)this->storage->limit) + sizeof(__stack_t);
    202199                storage = (void *)(((intptr_t)storage) - __page_size);
    203200                if(munmap(storage, size + __page_size) == -1) {
  • libcfa/src/concurrency/coroutine.hfa

    rfeacef9 r5407cdc  
    2222//-----------------------------------------------------------------------------
    2323// Exception thrown from resume when a coroutine stack is cancelled.
    24 FORALL_DATA_EXCEPTION(CoroutineCancelled, (coroutine_t &), (coroutine_t)) (
     24EHM_FORALL_EXCEPTION(CoroutineCancelled, (coroutine_t &), (coroutine_t)) (
    2525        coroutine_t * the_coroutine;
    2626        exception_t * the_exception;
     
    6060//-----------------------------------------------------------------------------
    6161// Public coroutine API
    62 forall(T & | is_coroutine(T))
     62forall(T & | is_coroutine(T) | { EHM_DEFAULT_VTABLE(CoroutineCancelled, (T)); })
    6363void prime(T & cor);
    6464
     
    130130
    131131forall(T & | is_coroutine(T))
    132 void __cfaehm_cancelled_coroutine( T & cor, $coroutine * desc );
     132void __cfaehm_cancelled_coroutine(
     133        T & cor, $coroutine * desc, EHM_DEFAULT_VTABLE(CoroutineCancelled, (T)) );
    133134
    134135// Resume implementation inlined for performance
    135 forall(T & | is_coroutine(T))
     136forall(T & | is_coroutine(T) | { EHM_DEFAULT_VTABLE(CoroutineCancelled, (T)); })
    136137static inline T & resume(T & cor) {
    137138        // optimization : read TLS once and reuse it
     
    163164        $ctx_switch( src, dst );
    164165        if ( unlikely(dst->cancellation) ) {
    165                 __cfaehm_cancelled_coroutine( cor, dst );
     166                __cfaehm_cancelled_coroutine( cor, dst, _default_vtable );
    166167        }
    167168
  • libcfa/src/concurrency/future.hfa

    rfeacef9 r5407cdc  
    3737
    3838                // Fulfil the future, returns whether or not someone was unblocked
    39                 bool fulfil( future(T) & this, T result ) {
     39                $thread * fulfil( future(T) & this, T result ) {
    4040                        this.result = result;
    4141                        return fulfil( (future_t&)this );
     
    9696                bool fulfil( multi_future(T) & this, T result ) {
    9797                        this.result = result;
    98                         return fulfil( (future_t&)this );
     98                        return fulfil( (future_t&)this ) != 0p;
    9999                }
    100100
  • libcfa/src/concurrency/invoke.c

    rfeacef9 r5407cdc  
    3434
    3535extern void disable_interrupts() OPTIONAL_THREAD;
    36 extern void enable_interrupts( __cfaabi_dbg_ctx_param );
     36extern void enable_interrupts( _Bool poll );
    3737
    3838void __cfactx_invoke_coroutine(
     
    8282) {
    8383        // Officially start the thread by enabling preemption
    84         enable_interrupts( __cfaabi_dbg_ctx );
     84        enable_interrupts( true );
    8585
    8686        // Call the main of the thread
  • libcfa/src/concurrency/invoke.h

    rfeacef9 r5407cdc  
    148148                struct $thread * prev;
    149149                volatile unsigned long long ts;
    150                 int preferred;
     150                unsigned preferred;
    151151        };
    152152
     
    200200                } node;
    201201
     202                struct processor * last_proc;
     203
    202204                #if defined( __CFA_WITH_VERIFY__ )
    203205                        void * canary;
     
    224226                }
    225227
     228                static inline $thread * volatile & ?`next ( $thread * this )  __attribute__((const)) {
     229                        return this->seqable.next;
     230                }
     231
    226232                static inline $thread *& Back( $thread * this ) __attribute__((const)) {
    227233                        return this->seqable.back;
  • libcfa/src/concurrency/io.cfa

    rfeacef9 r5407cdc  
    3232        extern "C" {
    3333                #include <sys/syscall.h>
     34                #include <sys/eventfd.h>
    3435
    3536                #include <linux/io_uring.h>
     
    3940        #include "kernel.hfa"
    4041        #include "kernel/fwd.hfa"
     42        #include "kernel_private.hfa"
    4143        #include "io/types.hfa"
    4244
     
    7981        };
    8082
    81         // returns true of acquired as leader or second leader
    82         static inline bool try_lock( __leaderlock_t & this ) {
    83                 const uintptr_t thrd = 1z | (uintptr_t)active_thread();
    84                 bool block;
    85                 disable_interrupts();
    86                 for() {
    87                         struct $thread * expected = this.value;
    88                         if( 1p != expected && 0p != expected ) {
    89                                 /* paranoid */ verify( thrd != (uintptr_t)expected ); // We better not already be the next leader
    90                                 enable_interrupts( __cfaabi_dbg_ctx );
    91                                 return false;
    92                         }
    93                         struct $thread * desired;
    94                         if( 0p == expected ) {
    95                                 // If the lock isn't locked acquire it, no need to block
    96                                 desired = 1p;
    97                                 block = false;
    98                         }
    99                         else {
    100                                 // If the lock is already locked try becomming the next leader
    101                                 desired = (struct $thread *)thrd;
    102                                 block = true;
    103                         }
    104                         if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break;
    105                 }
    106                 if( block ) {
    107                         enable_interrupts( __cfaabi_dbg_ctx );
    108                         park();
    109                         disable_interrupts();
    110                 }
    111                 return true;
    112         }
    113 
    114         static inline bool next( __leaderlock_t & this ) {
     83        static $io_context * __ioarbiter_allocate( $io_arbiter & this, __u32 idxs[], __u32 want );
     84        static void __ioarbiter_submit( $io_context * , __u32 idxs[], __u32 have, bool lazy );
     85        static void __ioarbiter_flush ( $io_context & );
     86        static inline void __ioarbiter_notify( $io_context & ctx );
     87//=============================================================================================
     88// I/O Polling
     89//=============================================================================================
     90        static inline unsigned __flush( struct $io_context & );
     91        static inline __u32 __release_sqes( struct $io_context & );
     92        extern void __kernel_unpark( $thread * thrd );
     93
     94        bool __cfa_io_drain( processor * proc ) {
    11595                /* paranoid */ verify( ! __preemption_enabled() );
    116                 struct $thread * nextt;
    117                 for() {
    118                         struct $thread * expected = this.value;
    119                         /* paranoid */ verify( (1 & (uintptr_t)expected) == 1 ); // The lock better be locked
    120 
    121                         struct $thread * desired;
    122                         if( 1p == expected ) {
    123                                 // No next leader, just unlock
    124                                 desired = 0p;
    125                                 nextt   = 0p;
    126                         }
    127                         else {
    128                                 // There is a next leader, remove but keep locked
    129                                 desired = 1p;
    130                                 nextt   = (struct $thread *)(~1z & (uintptr_t)expected);
    131                         }
    132                         if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break;
    133                 }
    134 
    135                 if(nextt) {
    136                         unpark( nextt );
    137                         enable_interrupts( __cfaabi_dbg_ctx );
    138                         return true;
    139                 }
    140                 enable_interrupts( __cfaabi_dbg_ctx );
    141                 return false;
    142         }
    143 
    144 //=============================================================================================
    145 // I/O Syscall
    146 //=============================================================================================
    147         static int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get ) {
    148                 bool need_sys_to_submit = false;
    149                 bool need_sys_to_complete = false;
    150                 unsigned flags = 0;
    151 
    152                 TO_SUBMIT:
    153                 if( to_submit > 0 ) {
    154                         if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
    155                                 need_sys_to_submit = true;
    156                                 break TO_SUBMIT;
    157                         }
    158                         if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) {
    159                                 need_sys_to_submit = true;
    160                                 flags |= IORING_ENTER_SQ_WAKEUP;
    161                         }
    162                 }
    163 
    164                 if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
    165                         flags |= IORING_ENTER_GETEVENTS;
    166                         if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {
    167                                 need_sys_to_complete = true;
    168                         }
    169                 }
    170 
    171                 int ret = 0;
    172                 if( need_sys_to_submit || need_sys_to_complete ) {
    173                         __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ring.fd, to_submit, flags);
    174                         ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8);
    175                         __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING %d returned %d\n", ring.fd, ret);
    176 
    177                         if( ret < 0 ) {
    178                                 switch((int)errno) {
    179                                 case EAGAIN:
    180                                 case EINTR:
    181                                 case EBUSY:
    182                                         ret = -1;
    183                                         break;
    184                                 default:
    185                                         abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
    186                                 }
    187                         }
    188                 }
    189 
    190                 // Memory barrier
    191                 __atomic_thread_fence( __ATOMIC_SEQ_CST );
    192                 return ret;
    193         }
    194 
    195 //=============================================================================================
    196 // I/O Polling
    197 //=============================================================================================
    198         static unsigned __collect_submitions( struct __io_data & ring );
    199         static __u32 __release_consumed_submission( struct __io_data & ring );
    200         static inline void __clean( volatile struct io_uring_sqe * sqe );
    201 
    202         // Process a single completion message from the io_uring
    203         // This is NOT thread-safe
    204         static inline void process( volatile struct io_uring_cqe & cqe ) {
    205                 struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
    206                 __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
    207 
    208                 fulfil( *future, cqe.res );
    209         }
    210 
    211         static [int, bool] __drain_io( & struct __io_data ring ) {
    212                 /* paranoid */ verify( ! __preemption_enabled() );
    213 
    214                 unsigned to_submit = 0;
    215                 if( ring.poller_submits ) {
    216                         // If the poller thread also submits, then we need to aggregate the submissions which are ready
    217                         to_submit = __collect_submitions( ring );
    218                 }
    219 
    220                 int ret = __io_uring_enter(ring, to_submit, true);
    221                 if( ret < 0 ) {
    222                         return [0, true];
    223                 }
    224 
    225                 // update statistics
    226                 if (to_submit > 0) {
    227                         __STATS__( true,
    228                                 if( to_submit > 0 ) {
    229                                         io.submit_q.submit_avg.rdy += to_submit;
    230                                         io.submit_q.submit_avg.csm += ret;
    231                                         io.submit_q.submit_avg.cnt += 1;
    232                                 }
    233                         )
    234                 }
    235 
    236                 __atomic_thread_fence( __ATOMIC_SEQ_CST );
    237 
    238                 // Release the consumed SQEs
    239                 __release_consumed_submission( ring );
     96                /* paranoid */ verify( ready_schedule_islocked() );
     97                /* paranoid */ verify( proc );
     98                /* paranoid */ verify( proc->io.ctx );
    24099
    241100                // Drain the queue
    242                 unsigned head = *ring.completion_q.head;
    243                 unsigned tail = *ring.completion_q.tail;
    244                 const __u32 mask = *ring.completion_q.mask;
    245 
    246                 // Nothing was new return 0
    247                 if (head == tail) {
    248                         return [0, to_submit > 0];
    249                 }
     101                $io_context * ctx = proc->io.ctx;
     102                unsigned head = *ctx->cq.head;
     103                unsigned tail = *ctx->cq.tail;
     104                const __u32 mask = *ctx->cq.mask;
    250105
    251106                __u32 count = tail - head;
    252                 /* paranoid */ verify( count != 0 );
     107                __STATS__( false, io.calls.drain++; io.calls.completed += count; )
     108
     109                if(count == 0) return false;
     110
    253111                for(i; count) {
    254112                        unsigned idx = (head + i) & mask;
    255                         volatile struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
     113                        volatile struct io_uring_cqe & cqe = ctx->cq.cqes[idx];
    256114
    257115                        /* paranoid */ verify(&cqe);
    258116
    259                         process( cqe );
    260                 }
     117                        struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
     118                        __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
     119
     120                        __kernel_unpark( fulfil( *future, cqe.res, false ) );
     121                }
     122
     123                __cfadbg_print_safe(io, "Kernel I/O : %u completed\n", count);
    261124
    262125                // Mark to the kernel that the cqe has been seen
    263126                // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
    264                 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_SEQ_CST );
    265 
    266                 return [count, count > 0 || to_submit > 0];
    267         }
    268 
    269         void main( $io_ctx_thread & this ) {
    270                 __ioctx_register( this );
    271 
    272                 __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %d (%p) ready\n", this.ring->fd, &this);
    273 
    274                 const int reset_cnt = 5;
    275                 int reset = reset_cnt;
    276                 // Then loop until we need to start
    277                 LOOP:
    278                 while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) {
    279                         // Drain the io
    280                         int count;
    281                         bool again;
    282                         disable_interrupts();
    283                                 [count, again] = __drain_io( *this.ring );
    284 
    285                                 if(!again) reset--;
    286 
     127                __atomic_store_n( ctx->cq.head, head + count, __ATOMIC_SEQ_CST );
     128
     129                /* paranoid */ verify( ready_schedule_islocked() );
     130                /* paranoid */ verify( ! __preemption_enabled() );
     131
     132                return true;
     133        }
     134
     135        void __cfa_io_flush( processor * proc ) {
     136                /* paranoid */ verify( ! __preemption_enabled() );
     137                /* paranoid */ verify( proc );
     138                /* paranoid */ verify( proc->io.ctx );
     139
     140                $io_context & ctx = *proc->io.ctx;
     141
     142                __ioarbiter_flush( ctx );
     143
     144                __STATS__( true, io.calls.flush++; )
     145                int ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, 0, 0, (sigset_t *)0p, _NSIG / 8);
     146                if( ret < 0 ) {
     147                        switch((int)errno) {
     148                        case EAGAIN:
     149                        case EINTR:
     150                        case EBUSY:
    287151                                // Update statistics
    288                                 __STATS__( true,
    289                                         io.complete_q.completed_avg.val += count;
    290                                         io.complete_q.completed_avg.cnt += 1;
    291                                 )
    292                         enable_interrupts( __cfaabi_dbg_ctx );
    293 
    294                         // If we got something, just yield and check again
    295                         if(reset > 1) {
    296                                 yield();
    297                                 continue LOOP;
     152                                __STATS__( false, io.calls.errors.busy ++; )
     153                                return;
     154                        default:
     155                                abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
    298156                        }
    299 
    300                         // We alread failed to find completed entries a few time.
    301                         if(reset == 1) {
    302                                 // Rearm the context so it can block
    303                                 // but don't block right away
    304                                 // we need to retry one last time in case
    305                                 // something completed *just now*
    306                                 __ioctx_prepare_block( this );
    307                                 continue LOOP;
    308                         }
    309 
    310                                 __STATS__( false,
    311                                         io.complete_q.blocks += 1;
    312                                 )
    313                                 __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %d (%p)\n", this.ring->fd, &this);
    314 
    315                                 // block this thread
    316                                 wait( this.sem );
    317 
    318                         // restore counter
    319                         reset = reset_cnt;
    320                 }
    321 
    322                 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.ring->fd, &this);
    323 
    324                 __ioctx_unregister( this );
     157                }
     158
     159                __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring %d\n", ret, ctx.fd);
     160                __STATS__( true, io.calls.submitted += ret; )
     161                /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
     162                /* paranoid */ verify( ctx.sq.to_submit >= ret );
     163
     164                ctx.sq.to_submit -= ret;
     165
     166                /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
     167
     168                // Release the consumed SQEs
     169                __release_sqes( ctx );
     170
     171                /* paranoid */ verify( ! __preemption_enabled() );
     172
     173                ctx.proc->io.pending = false;
    325174        }
    326175
     
    344193//         head and tail must be fully filled and shouldn't ever be touched again.
    345194//
     195        //=============================================================================================
     196        // Allocation
     197        // for user's convenience fill the sqes from the indexes
     198        static inline void __fill(struct io_uring_sqe * out_sqes[], __u32 want, __u32 idxs[], struct $io_context * ctx)  {
     199                struct io_uring_sqe * sqes = ctx->sq.sqes;
     200                for(i; want) {
     201                        __cfadbg_print_safe(io, "Kernel I/O : filling loop\n");
     202                        out_sqes[i] = &sqes[idxs[i]];
     203                }
     204        }
     205
     206        // Try to directly allocate from the a given context
     207        // Not thread-safe
     208        static inline bool __alloc(struct $io_context * ctx, __u32 idxs[], __u32 want) {
     209                __sub_ring_t & sq = ctx->sq;
     210                const __u32 mask  = *sq.mask;
     211                __u32 fhead = sq.free_ring.head;    // get the current head of the queue
     212                __u32 ftail = sq.free_ring.tail;    // get the current tail of the queue
     213
     214                // If we don't have enough sqes, fail
     215                if((ftail - fhead) < want) { return false; }
     216
     217                // copy all the indexes we want from the available list
     218                for(i; want) {
     219                        __cfadbg_print_safe(io, "Kernel I/O : allocating loop\n");
     220                        idxs[i] = sq.free_ring.array[(fhead + i) & mask];
     221                }
     222
     223                // Advance the head to mark the indexes as consumed
     224                __atomic_store_n(&sq.free_ring.head, fhead + want, __ATOMIC_RELEASE);
     225
     226                // return success
     227                return true;
     228        }
    346229
    347230        // Allocate an submit queue entry.
     
    350233        // for convenience, return both the index and the pointer to the sqe
    351234        // sqe == &sqes[idx]
    352         [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
    353                 /* paranoid */ verify( data != 0 );
    354 
    355                 // Prepare the data we need
    356                 __attribute((unused)) int len   = 0;
    357                 __attribute((unused)) int block = 0;
    358                 __u32 cnt = *ring.submit_q.num;
    359                 __u32 mask = *ring.submit_q.mask;
    360 
    361                 __u32 off = thread_rand();
    362 
    363                 // Loop around looking for an available spot
    364                 for() {
    365                         // Look through the list starting at some offset
    366                         for(i; cnt) {
    367                                 __u64 expected = 3;
    368                                 __u32 idx = (i + off) & mask; // Get an index from a random
    369                                 volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
    370                                 volatile __u64 * udata = &sqe->user_data;
    371 
    372                                 // Allocate the entry by CASing the user_data field from 0 to the future address
    373                                 if( *udata == expected &&
    374                                         __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
    375                                 {
    376                                         // update statistics
    377                                         __STATS__( false,
    378                                                 io.submit_q.alloc_avg.val   += len;
    379                                                 io.submit_q.alloc_avg.block += block;
    380                                                 io.submit_q.alloc_avg.cnt   += 1;
    381                                         )
    382 
    383                                         // debug log
    384                                         __cfadbg_print_safe( io, "Kernel I/O : allocated [%p, %u] for %p (%p)\n", sqe, idx, active_thread(), (void*)data );
    385 
    386                                         // Success return the data
    387                                         return [sqe, idx];
    388                                 }
    389                                 verify(expected != data);
    390 
    391                                 // This one was used
    392                                 len ++;
    393                         }
    394 
    395                         block++;
    396 
    397                         yield();
    398                 }
    399         }
    400 
    401         static inline __u32 __submit_to_ready_array( struct __io_data & ring, __u32 idx, const __u32 mask ) {
    402                 /* paranoid */ verify( idx <= mask   );
    403                 /* paranoid */ verify( idx != -1ul32 );
    404 
    405                 // We need to find a spot in the ready array
    406                 __attribute((unused)) int len   = 0;
    407                 __attribute((unused)) int block = 0;
    408                 __u32 ready_mask = ring.submit_q.ready_cnt - 1;
    409 
    410                 __u32 off = thread_rand();
    411 
    412                 __u32 picked;
    413                 LOOKING: for() {
    414                         for(i; ring.submit_q.ready_cnt) {
    415                                 picked = (i + off) & ready_mask;
    416                                 __u32 expected = -1ul32;
    417                                 if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
    418                                         break LOOKING;
    419                                 }
    420                                 verify(expected != idx);
    421 
    422                                 len ++;
    423                         }
    424 
    425                         block++;
    426 
    427                         __u32 released = __release_consumed_submission( ring );
    428                         if( released == 0 ) {
    429                                 yield();
    430                         }
    431                 }
    432 
    433                 // update statistics
    434                 __STATS__( false,
    435                         io.submit_q.look_avg.val   += len;
    436                         io.submit_q.look_avg.block += block;
    437                         io.submit_q.look_avg.cnt   += 1;
    438                 )
    439 
    440                 return picked;
    441         }
    442 
    443         void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) {
    444                 __io_data & ring = *ctx->thrd.ring;
    445 
     235        struct $io_context * cfa_io_allocate(struct io_uring_sqe * sqes[], __u32 idxs[], __u32 want) {
     236                __cfadbg_print_safe(io, "Kernel I/O : attempting to allocate %u\n", want);
     237
     238                disable_interrupts();
     239                processor * proc = __cfaabi_tls.this_processor;
     240                $io_context * ctx = proc->io.ctx;
     241                /* paranoid */ verify( __cfaabi_tls.this_processor );
     242                /* paranoid */ verify( ctx );
     243
     244                __cfadbg_print_safe(io, "Kernel I/O : attempting to fast allocation\n");
     245
     246                // We can proceed to the fast path
     247                if( __alloc(ctx, idxs, want) ) {
     248                        // Allocation was successful
     249                        __STATS__( true, io.alloc.fast += 1; )
     250                        enable_interrupts();
     251
     252                        __cfadbg_print_safe(io, "Kernel I/O : fast allocation successful from ring %d\n", ctx->fd);
     253
     254                        __fill( sqes, want, idxs, ctx );
     255                        return ctx;
     256                }
     257                // The fast path failed, fallback
     258                __STATS__( true, io.alloc.fail += 1; )
     259
     260                // Fast path failed, fallback on arbitration
     261                __STATS__( true, io.alloc.slow += 1; )
     262                enable_interrupts();
     263
     264                $io_arbiter * ioarb = proc->cltr->io.arbiter;
     265                /* paranoid */ verify( ioarb );
     266
     267                __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for allocation\n");
     268
     269                struct $io_context * ret = __ioarbiter_allocate(*ioarb, idxs, want);
     270
     271                __cfadbg_print_safe(io, "Kernel I/O : slow allocation completed from ring %d\n", ret->fd);
     272
     273                __fill( sqes, want, idxs,ret );
     274                return ret;
     275        }
     276
     277
     278        //=============================================================================================
     279        // submission
     280        static inline void __submit( struct $io_context * ctx, __u32 idxs[], __u32 have, bool lazy) {
     281                // We can proceed to the fast path
     282                // Get the right objects
     283                __sub_ring_t & sq = ctx->sq;
     284                const __u32 mask  = *sq.mask;
     285                __u32 tail = *sq.kring.tail;
     286
     287                // Add the sqes to the array
     288                for( i; have ) {
     289                        __cfadbg_print_safe(io, "Kernel I/O : __submit loop\n");
     290                        sq.kring.array[ (tail + i) & mask ] = idxs[i];
     291                }
     292
     293                // Make the sqes visible to the submitter
     294                __atomic_store_n(sq.kring.tail, tail + have, __ATOMIC_RELEASE);
     295                sq.to_submit++;
     296
     297                ctx->proc->io.pending = true;
     298                ctx->proc->io.dirty   = true;
     299                if(sq.to_submit > 30 || !lazy) {
     300                        __cfa_io_flush( ctx->proc );
     301                }
     302        }
     303
     304        void cfa_io_submit( struct $io_context * inctx, __u32 idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1))) {
     305                __cfadbg_print_safe(io, "Kernel I/O : attempting to submit %u (%s)\n", have, lazy ? "lazy" : "eager");
     306
     307                disable_interrupts();
     308                processor * proc = __cfaabi_tls.this_processor;
     309                $io_context * ctx = proc->io.ctx;
     310                /* paranoid */ verify( __cfaabi_tls.this_processor );
     311                /* paranoid */ verify( ctx );
     312
     313                // Can we proceed to the fast path
     314                if( ctx == inctx )              // We have the right instance?
    446315                {
    447                         __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
    448                         __cfadbg_print_safe( io,
    449                                 "Kernel I/O : submitting %u (%p) for %p\n"
    450                                 "    data: %p\n"
    451                                 "    opcode: %s\n"
    452                                 "    fd: %d\n"
    453                                 "    flags: %d\n"
    454                                 "    prio: %d\n"
    455                                 "    off: %p\n"
    456                                 "    addr: %p\n"
    457                                 "    len: %d\n"
    458                                 "    other flags: %d\n"
    459                                 "    splice fd: %d\n"
    460                                 "    pad[0]: %llu\n"
    461                                 "    pad[1]: %llu\n"
    462                                 "    pad[2]: %llu\n",
    463                                 idx, sqe,
    464                                 active_thread(),
    465                                 (void*)sqe->user_data,
    466                                 opcodes[sqe->opcode],
    467                                 sqe->fd,
    468                                 sqe->flags,
    469                                 sqe->ioprio,
    470                                 (void*)sqe->off,
    471                                 (void*)sqe->addr,
    472                                 sqe->len,
    473                                 sqe->accept_flags,
    474                                 sqe->splice_fd_in,
    475                                 sqe->__pad2[0],
    476                                 sqe->__pad2[1],
    477                                 sqe->__pad2[2]
    478                         );
    479                 }
    480 
    481 
    482                 // Get now the data we definetely need
    483                 volatile __u32 * const tail = ring.submit_q.tail;
    484                 const __u32 mask  = *ring.submit_q.mask;
    485 
    486                 // There are 2 submission schemes, check which one we are using
    487                 if( ring.poller_submits ) {
    488                         // If the poller thread submits, then we just need to add this to the ready array
    489                         __submit_to_ready_array( ring, idx, mask );
    490 
    491                         post( ctx->thrd.sem );
    492 
    493                         __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
    494                 }
    495                 else if( ring.eager_submits ) {
    496                         __attribute__((unused)) __u32 picked = __submit_to_ready_array( ring, idx, mask );
    497 
    498                         #if defined(LEADER_LOCK)
    499                                 if( !try_lock(ring.submit_q.submit_lock) ) {
    500                                         __STATS__( false,
    501                                                 io.submit_q.helped += 1;
    502                                         )
    503                                         return;
    504                                 }
    505                                 /* paranoid */ verify( ! __preemption_enabled() );
    506                                 __STATS__( true,
    507                                         io.submit_q.leader += 1;
    508                                 )
    509                         #else
    510                                 for() {
    511                                         yield();
    512 
    513                                         if( try_lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2) ) {
    514                                                 __STATS__( false,
    515                                                         io.submit_q.leader += 1;
    516                                                 )
    517                                                 break;
    518                                         }
    519 
    520                                         // If some one else collected our index, we are done
    521                                         #warning ABA problem
    522                                         if( ring.submit_q.ready[picked] != idx ) {
    523                                                 __STATS__( false,
    524                                                         io.submit_q.helped += 1;
    525                                                 )
    526                                                 return;
    527                                         }
    528 
    529                                         __STATS__( false,
    530                                                 io.submit_q.busy += 1;
    531                                         )
    532                                 }
    533                         #endif
    534 
    535                         // We got the lock
    536                         // Collect the submissions
    537                         unsigned to_submit = __collect_submitions( ring );
    538 
    539                         // Actually submit
    540                         int ret = __io_uring_enter( ring, to_submit, false );
    541 
    542                         #if defined(LEADER_LOCK)
    543                                 /* paranoid */ verify( ! __preemption_enabled() );
    544                                 next(ring.submit_q.submit_lock);
    545                         #else
    546                                 unlock(ring.submit_q.submit_lock);
    547                         #endif
    548                         if( ret < 0 ) {
    549                                 return;
    550                         }
    551 
    552                         // Release the consumed SQEs
    553                         __release_consumed_submission( ring );
    554 
    555                         // update statistics
    556                         __STATS__( false,
    557                                 io.submit_q.submit_avg.rdy += to_submit;
    558                                 io.submit_q.submit_avg.csm += ret;
    559                                 io.submit_q.submit_avg.cnt += 1;
    560                         )
    561 
    562                         __cfadbg_print_safe( io, "Kernel I/O : submitted %u (among %u) for %p\n", idx, ret, active_thread() );
    563                 }
    564                 else
    565                 {
    566                         // get mutual exclusion
    567                         #if defined(LEADER_LOCK)
    568                                 while(!try_lock(ring.submit_q.submit_lock));
    569                         #else
    570                                 lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2);
    571                         #endif
    572 
    573                         /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 3ul64,
    574                         /* paranoid */  "index %u already reclaimed\n"
    575                         /* paranoid */  "head %u, prev %u, tail %u\n"
    576                         /* paranoid */  "[-0: %u,-1: %u,-2: %u,-3: %u]\n",
    577                         /* paranoid */  idx,
    578                         /* paranoid */  *ring.submit_q.head, ring.submit_q.prev_head, *tail
    579                         /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ]
    580                         /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ]
    581                         /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ]
    582                         /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ]
    583                         /* paranoid */ );
    584 
    585                         // Append to the list of ready entries
    586 
    587                         /* paranoid */ verify( idx <= mask );
    588                         ring.submit_q.array[ (*tail) & mask ] = idx;
    589                         __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
    590 
    591                         // Submit however, many entries need to be submitted
    592                         int ret = __io_uring_enter( ring, 1, false );
    593                         if( ret < 0 ) {
    594                                 switch((int)errno) {
    595                                 default:
    596                                         abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
    597                                 }
    598                         }
    599 
    600                         /* paranoid */ verify(ret == 1);
    601 
    602                         // update statistics
    603                         __STATS__( false,
    604                                 io.submit_q.submit_avg.csm += 1;
    605                                 io.submit_q.submit_avg.cnt += 1;
    606                         )
    607 
    608                         {
    609                                 __attribute__((unused)) volatile __u32 * const head = ring.submit_q.head;
    610                                 __attribute__((unused)) __u32 last_idx = ring.submit_q.array[ ((*head) - 1) & mask ];
    611                                 __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[last_idx];
    612 
    613                                 __cfadbg_print_safe( io,
    614                                         "Kernel I/O : last submitted is %u (%p)\n"
    615                                         "    data: %p\n"
    616                                         "    opcode: %s\n"
    617                                         "    fd: %d\n"
    618                                         "    flags: %d\n"
    619                                         "    prio: %d\n"
    620                                         "    off: %p\n"
    621                                         "    addr: %p\n"
    622                                         "    len: %d\n"
    623                                         "    other flags: %d\n"
    624                                         "    splice fd: %d\n"
    625                                         "    pad[0]: %llu\n"
    626                                         "    pad[1]: %llu\n"
    627                                         "    pad[2]: %llu\n",
    628                                         last_idx, sqe,
    629                                         (void*)sqe->user_data,
    630                                         opcodes[sqe->opcode],
    631                                         sqe->fd,
    632                                         sqe->flags,
    633                                         sqe->ioprio,
    634                                         (void*)sqe->off,
    635                                         (void*)sqe->addr,
    636                                         sqe->len,
    637                                         sqe->accept_flags,
    638                                         sqe->splice_fd_in,
    639                                         sqe->__pad2[0],
    640                                         sqe->__pad2[1],
    641                                         sqe->__pad2[2]
    642                                 );
    643                         }
    644 
    645                         __atomic_thread_fence( __ATOMIC_SEQ_CST );
    646                         // Release the consumed SQEs
    647 
    648                         __release_consumed_submission( ring );
    649                         // ring.submit_q.sqes[idx].user_data = 3ul64;
    650 
    651                         #if defined(LEADER_LOCK)
    652                                 next(ring.submit_q.submit_lock);
    653                         #else
    654                                 unlock(ring.submit_q.submit_lock);
    655                         #endif
    656 
    657                         __cfadbg_print_safe( io, "Kernel I/O : submitted %u for %p\n", idx, active_thread() );
    658                 }
    659         }
    660 
    661         // #define PARTIAL_SUBMIT 32
    662 
    663         // go through the list of submissions in the ready array and moved them into
    664         // the ring's submit queue
    665         static unsigned __collect_submitions( struct __io_data & ring ) {
    666                 /* paranoid */ verify( ring.submit_q.ready != 0p );
    667                 /* paranoid */ verify( ring.submit_q.ready_cnt > 0 );
    668 
    669                 unsigned to_submit = 0;
    670                 __u32 tail = *ring.submit_q.tail;
    671                 const __u32 mask = *ring.submit_q.mask;
    672                 #if defined(PARTIAL_SUBMIT)
    673                         #if defined(LEADER_LOCK)
    674                                 #error PARTIAL_SUBMIT and LEADER_LOCK cannot co-exist
    675                         #endif
    676                         const __u32 cnt = ring.submit_q.ready_cnt > PARTIAL_SUBMIT ? PARTIAL_SUBMIT : ring.submit_q.ready_cnt;
    677                         const __u32 offset = ring.submit_q.prev_ready;
    678                         ring.submit_q.prev_ready += cnt;
    679                 #else
    680                         const __u32 cnt = ring.submit_q.ready_cnt;
    681                         const __u32 offset = 0;
    682                 #endif
    683 
    684                 // Go through the list of ready submissions
    685                 for( c; cnt ) {
    686                         __u32 i = (offset + c) % ring.submit_q.ready_cnt;
    687 
    688                         // replace any submission with the sentinel, to consume it.
    689                         __u32 idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
    690 
    691                         // If it was already the sentinel, then we are done
    692                         if( idx == -1ul32 ) continue;
    693 
    694                         // If we got a real submission, append it to the list
    695                         ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
    696                         to_submit++;
    697                 }
    698 
    699                 // Increment the tail based on how many we are ready to submit
    700                 __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);
    701 
    702                 return to_submit;
    703         }
    704 
     316                        __submit(ctx, idxs, have, lazy);
     317
     318                        // Mark the instance as no longer in-use, re-enable interrupts and return
     319                        __STATS__( true, io.submit.fast += 1; )
     320                        enable_interrupts();
     321
     322                        __cfadbg_print_safe(io, "Kernel I/O : submitted on fast path\n");
     323                        return;
     324                }
     325
     326                // Fast path failed, fallback on arbitration
     327                __STATS__( true, io.submit.slow += 1; )
     328                enable_interrupts();
     329
     330                __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for submission\n");
     331
     332                __ioarbiter_submit(inctx, idxs, have, lazy);
     333        }
     334
     335        //=============================================================================================
     336        // Flushing
    705337        // Go through the ring's submit queue and release everything that has already been consumed
    706338        // by io_uring
    707         static __u32 __release_consumed_submission( struct __io_data & ring ) {
    708                 const __u32 smask = *ring.submit_q.mask;
    709 
    710                 // We need to get the lock to copy the old head and new head
    711                 if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
     339        // This cannot be done by multiple threads
     340        static __u32 __release_sqes( struct $io_context & ctx ) {
     341                const __u32 mask = *ctx.sq.mask;
     342
    712343                __attribute__((unused))
    713                 __u32 ctail = *ring.submit_q.tail;        // get the current tail of the queue
    714                 __u32 chead = *ring.submit_q.head;              // get the current head of the queue
    715                 __u32 phead = ring.submit_q.prev_head;  // get the head the last time we were here
    716                 ring.submit_q.prev_head = chead;                // note up to were we processed
    717                 unlock(ring.submit_q.release_lock);
     344                __u32 ctail = *ctx.sq.kring.tail;    // get the current tail of the queue
     345                __u32 chead = *ctx.sq.kring.head;        // get the current head of the queue
     346                __u32 phead = ctx.sq.kring.released; // get the head the last time we were here
     347
     348                __u32 ftail = ctx.sq.free_ring.tail;  // get the current tail of the queue
    718349
    719350                // the 3 fields are organized like this diagram
     
    734365                __u32 count = chead - phead;
    735366
     367                if(count == 0) {
     368                        return 0;
     369                }
     370
    736371                // We acquired an previous-head/current-head range
    737372                // go through the range and release the sqes
    738373                for( i; count ) {
    739                         __u32 idx = ring.submit_q.array[ (phead + i) & smask ];
    740 
    741                         /* paranoid */ verify( 0 != ring.submit_q.sqes[ idx ].user_data );
    742                         __clean( &ring.submit_q.sqes[ idx ] );
    743                 }
     374                        __cfadbg_print_safe(io, "Kernel I/O : release loop\n");
     375                        __u32 idx = ctx.sq.kring.array[ (phead + i) & mask ];
     376                        ctx.sq.free_ring.array[ (ftail + i) & mask ] = idx;
     377                }
     378
     379                ctx.sq.kring.released = chead;          // note up to were we processed
     380                __atomic_store_n(&ctx.sq.free_ring.tail, ftail + count, __ATOMIC_SEQ_CST);
     381
     382                __ioarbiter_notify(ctx);
     383
    744384                return count;
    745385        }
    746386
    747         void __sqe_clean( volatile struct io_uring_sqe * sqe ) {
    748                 __clean( sqe );
    749         }
    750 
    751         static inline void __clean( volatile struct io_uring_sqe * sqe ) {
    752                 // If we are in debug mode, thrash the fields to make sure we catch reclamation errors
    753                 __cfaabi_dbg_debug_do(
    754                         memset(sqe, 0xde, sizeof(*sqe));
    755                         sqe->opcode = (sizeof(opcodes) / sizeof(const char *)) - 1;
    756                 );
    757 
    758                 // Mark the entry as unused
    759                 __atomic_store_n(&sqe->user_data, 3ul64, __ATOMIC_SEQ_CST);
     387//=============================================================================================
     388// I/O Arbiter
     389//=============================================================================================
     390        static inline void block(__outstanding_io_queue & queue, __outstanding_io & item) {
     391                // Lock the list, it's not thread safe
     392                lock( queue.lock __cfaabi_dbg_ctx2 );
     393                {
     394                        // Add our request to the list
     395                        add( queue.queue, item );
     396
     397                        // Mark as pending
     398                        __atomic_store_n( &queue.empty, false, __ATOMIC_SEQ_CST );
     399                }
     400                unlock( queue.lock );
     401
     402                wait( item.sem );
     403        }
     404
     405        static inline bool empty(__outstanding_io_queue & queue ) {
     406                return __atomic_load_n( &queue.empty, __ATOMIC_SEQ_CST);
     407        }
     408
     409        static $io_context * __ioarbiter_allocate( $io_arbiter & this, __u32 idxs[], __u32 want ) {
     410                __cfadbg_print_safe(io, "Kernel I/O : arbiter allocating\n");
     411
     412                __STATS__( false, io.alloc.block += 1; )
     413
     414                // No one has any resources left, wait for something to finish
     415                // We need to add ourself to a list of pending allocs and wait for an answer
     416                __pending_alloc pa;
     417                pa.idxs = idxs;
     418                pa.want = want;
     419
     420                block(this.pending, (__outstanding_io&)pa);
     421
     422                return pa.ctx;
     423
     424        }
     425
     426        static void __ioarbiter_notify( $io_arbiter & this, $io_context * ctx ) {
     427                /* paranoid */ verify( !empty(this.pending.queue) );
     428
     429                lock( this.pending.lock __cfaabi_dbg_ctx2 );
     430                {
     431                        while( !empty(this.pending.queue) ) {
     432                                __cfadbg_print_safe(io, "Kernel I/O : notifying\n");
     433                                __u32 have = ctx->sq.free_ring.tail - ctx->sq.free_ring.head;
     434                                __pending_alloc & pa = (__pending_alloc&)head( this.pending.queue );
     435
     436                                if( have > pa.want ) goto DONE;
     437                                drop( this.pending.queue );
     438
     439                                /* paranoid */__attribute__((unused)) bool ret =
     440
     441                                __alloc(ctx, pa.idxs, pa.want);
     442
     443                                /* paranoid */ verify( ret );
     444
     445                                pa.ctx = ctx;
     446
     447                                post( pa.sem );
     448                        }
     449
     450                        this.pending.empty = true;
     451                        DONE:;
     452                }
     453                unlock( this.pending.lock );
     454        }
     455
     456        static void __ioarbiter_notify( $io_context & ctx ) {
     457                if(!empty( ctx.arbiter->pending )) {
     458                        __ioarbiter_notify( *ctx.arbiter, &ctx );
     459                }
     460        }
     461
     462        // Simply append to the pending
     463        static void __ioarbiter_submit( $io_context * ctx, __u32 idxs[], __u32 have, bool lazy ) {
     464                __cfadbg_print_safe(io, "Kernel I/O : submitting %u from the arbiter to context %u\n", have, ctx->fd);
     465
     466                __cfadbg_print_safe(io, "Kernel I/O : waiting to submit %u\n", have);
     467
     468                __external_io ei;
     469                ei.idxs = idxs;
     470                ei.have = have;
     471                ei.lazy = lazy;
     472
     473                block(ctx->ext_sq, (__outstanding_io&)ei);
     474
     475                __cfadbg_print_safe(io, "Kernel I/O : %u submitted from arbiter\n", have);
     476        }
     477
     478        static void __ioarbiter_flush( $io_context & ctx ) {
     479                if(!empty( ctx.ext_sq )) {
     480                        __STATS__( false, io.flush.external += 1; )
     481
     482                        __cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n");
     483
     484                        lock( ctx.ext_sq.lock __cfaabi_dbg_ctx2 );
     485                        {
     486                                while( !empty(ctx.ext_sq.queue) ) {
     487                                        __external_io & ei = (__external_io&)drop( ctx.ext_sq.queue );
     488
     489                                        __submit(&ctx, ei.idxs, ei.have, ei.lazy);
     490
     491                                        post( ei.sem );
     492                                }
     493
     494                                ctx.ext_sq.empty = true;
     495                        }
     496                        unlock(ctx.ext_sq.lock );
     497                }
    760498        }
    761499#endif
  • libcfa/src/concurrency/io/call.cfa.in

    rfeacef9 r5407cdc  
    5454                        | IOSQE_IO_DRAIN
    5555                #endif
     56                #if defined(CFA_HAVE_IOSQE_IO_LINK)
     57                        | IOSQE_IO_LINK
     58                #endif
     59                #if defined(CFA_HAVE_IOSQE_IO_HARDLINK)
     60                        | IOSQE_IO_HARDLINK
     61                #endif
    5662                #if defined(CFA_HAVE_IOSQE_ASYNC)
    5763                        | IOSQE_ASYNC
    5864                #endif
    59         ;
    60 
    61         static const __u32 LINK_FLAGS = 0
    62                 #if defined(CFA_HAVE_IOSQE_IO_LINK)
    63                         | IOSQE_IO_LINK
    64                 #endif
    65                 #if defined(CFA_HAVE_IOSQE_IO_HARDLINK)
    66                         | IOSQE_IO_HARDLINK
     65                #if defined(CFA_HAVE_IOSQE_BUFFER_SELECTED)
     66                        | IOSQE_BUFFER_SELECTED
    6767                #endif
    6868        ;
     
    7474        ;
    7575
    76         extern [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data );
    77         extern void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1)));
    78 
    79         static inline io_context * __get_io_context( void ) {
    80                 cluster * cltr = active_cluster();
    81 
    82                 /* paranoid */ verifyf( cltr, "No active cluster for io operation\\n");
    83                 assertf( cltr->io.cnt > 0, "Cluster %p has no default io contexts and no context was specified\\n", cltr );
    84 
    85                 /* paranoid */ verifyf( cltr->io.ctxs, "default io contexts for cluster %p are missing\\n", cltr);
    86                 return &cltr->io.ctxs[ thread_rand() % cltr->io.cnt ];
    87         }
     76        extern struct $io_context * cfa_io_allocate(struct io_uring_sqe * out_sqes[], __u32 out_idxs[], __u32 want)  __attribute__((nonnull (1,2)));
     77        extern void cfa_io_submit( struct $io_context * in_ctx, __u32 in_idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1,2)));
    8878#endif
    8979
     
    9888
    9989extern "C" {
    100         #include <sys/types.h>
     90        #include <asm/types.h>
    10191        #include <sys/socket.h>
    10292        #include <sys/syscall.h>
     
    142132        extern int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    143133
    144         extern ssize_t splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags);
     134        extern ssize_t splice(int fd_in, __off64_t *off_in, int fd_out, __off64_t *off_out, size_t len, unsigned int flags);
    145135        extern ssize_t tee(int fd_in, int fd_out, size_t len, unsigned int flags);
    146136}
     
    195185                return ', '.join(args_a)
    196186
    197 AsyncTemplate = """inline void async_{name}(io_future_t & future, {params}, int submit_flags, io_cancellation * cancellation, io_context * context) {{
     187AsyncTemplate = """inline void async_{name}(io_future_t & future, {params}, __u64 submit_flags) {{
    198188        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_{op})
    199189                ssize_t res = {name}({args});
     
    205195                }}
    206196        #else
    207                 // we don't support LINK yet
    208                 if( 0 != (submit_flags & LINK_FLAGS) ) {{
    209                         errno = ENOTSUP; return -1;
    210                 }}
    211 
    212                 if( !context ) {{
    213                         context = __get_io_context();
    214                 }}
    215                 if(cancellation) {{
    216                         cancellation->target = (__u64)(uintptr_t)&future;
    217                 }}
    218 
    219197                __u8 sflags = REGULAR_FLAGS & submit_flags;
    220                 struct __io_data & ring = *context->thrd.ring;
    221 
    222198                __u32 idx;
    223199                struct io_uring_sqe * sqe;
    224                 [(volatile struct io_uring_sqe *) sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
     200                struct $io_context * ctx = cfa_io_allocate( &sqe, &idx, 1 );
    225201
    226202                sqe->opcode = IORING_OP_{op};
     203                sqe->user_data = (uintptr_t)&future;
    227204                sqe->flags = sflags;
    228205                sqe->ioprio = 0;
     
    238215                asm volatile("": : :"memory");
    239216
    240                 verify( sqe->user_data == (__u64)(uintptr_t)&future );
    241                 __submit( context, idx );
     217                verify( sqe->user_data == (uintptr_t)&future );
     218                cfa_io_submit( ctx, &idx, 1, 0 != (submit_flags & CFA_IO_LAZY) );
    242219        #endif
    243220}}"""
    244221
    245 SyncTemplate = """{ret} cfa_{name}({params}, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {{
    246         if( timeout >= 0 ) {{
    247                 errno = ENOTSUP;
    248                 return -1;
    249         }}
     222SyncTemplate = """{ret} cfa_{name}({params}, __u64 submit_flags) {{
    250223        io_future_t future;
    251224
    252         async_{name}( future, {args}, submit_flags, cancellation, context );
     225        async_{name}( future, {args}, submit_flags );
    253226
    254227        wait( future );
     
    265238                'fd'  : 'fd',
    266239                'off' : 'offset',
    267                 'addr': '(__u64)iov',
     240                'addr': '(uintptr_t)iov',
    268241                'len' : 'iovcnt',
    269242        }, define = 'CFA_HAVE_PREADV2'),
     
    272245                'fd'  : 'fd',
    273246                'off' : 'offset',
    274                 'addr': '(__u64)iov',
     247                'addr': '(uintptr_t)iov',
    275248                'len' : 'iovcnt'
    276249        }, define = 'CFA_HAVE_PWRITEV2'),
     
    284257                'addr': 'fd',
    285258                'len': 'op',
    286                 'off': '(__u64)event'
     259                'off': '(uintptr_t)event'
    287260        }),
    288261        # CFA_HAVE_IORING_OP_SYNC_FILE_RANGE
     
    296269        Call('SENDMSG', 'ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags)', {
    297270                'fd': 'sockfd',
    298                 'addr': '(__u64)(struct msghdr *)msg',
     271                'addr': '(uintptr_t)(struct msghdr *)msg',
    299272                'len': '1',
    300273                'msg_flags': 'flags'
     
    303276        Call('RECVMSG', 'ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags)', {
    304277                'fd': 'sockfd',
    305                 'addr': '(__u64)(struct msghdr *)msg',
     278                'addr': '(uintptr_t)(struct msghdr *)msg',
    306279                'len': '1',
    307280                'msg_flags': 'flags'
     
    310283        Call('SEND', 'ssize_t send(int sockfd, const void *buf, size_t len, int flags)', {
    311284                'fd': 'sockfd',
    312                 'addr': '(__u64)buf',
     285                'addr': '(uintptr_t)buf',
    313286                'len': 'len',
    314287                'msg_flags': 'flags'
     
    317290        Call('RECV', 'ssize_t recv(int sockfd, void *buf, size_t len, int flags)', {
    318291                'fd': 'sockfd',
    319                 'addr': '(__u64)buf',
     292                'addr': '(uintptr_t)buf',
    320293                'len': 'len',
    321294                'msg_flags': 'flags'
     
    324297        Call('ACCEPT', 'int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags)', {
    325298                'fd': 'sockfd',
    326                 'addr': '(__u64)addr',
    327                 'addr2': '(__u64)addrlen',
     299                'addr': '(uintptr_t)addr',
     300                'addr2': '(uintptr_t)addrlen',
    328301                'accept_flags': 'flags'
    329302        }),
     
    331304        Call('CONNECT', 'int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen)', {
    332305                'fd': 'sockfd',
    333                 'addr': '(__u64)addr',
     306                'addr': '(uintptr_t)addr',
    334307                'off': 'addrlen'
    335308        }),
     
    337310        Call('FALLOCATE', 'int fallocate(int fd, int mode, off_t offset, off_t len)', {
    338311                'fd': 'fd',
    339                 'addr': '(__u64)len',
     312                'addr': '(uintptr_t)len',
    340313                'len': 'mode',
    341314                'off': 'offset'
     
    350323        # CFA_HAVE_IORING_OP_MADVISE
    351324        Call('MADVISE', 'int madvise(void *addr, size_t length, int advice)', {
    352                 'addr': '(__u64)addr',
     325                'addr': '(uintptr_t)addr',
    353326                'len': 'length',
    354327                'fadvise_advice': 'advice'
     
    357330        Call('OPENAT', 'int openat(int dirfd, const char *pathname, int flags, mode_t mode)', {
    358331                'fd': 'dirfd',
    359                 'addr': '(__u64)pathname',
     332                'addr': '(uintptr_t)pathname',
    360333                'len': 'mode',
    361334                'open_flags': 'flags;'
     
    366339                'addr': 'pathname',
    367340                'len': 'sizeof(*how)',
    368                 'off': '(__u64)how',
     341                'off': '(uintptr_t)how',
    369342        }, define = 'CFA_HAVE_OPENAT2'),
    370343        # CFA_HAVE_IORING_OP_CLOSE
     
    375348        Call('STATX', 'int statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf)', {
    376349                'fd': 'dirfd',
    377                 'off': '(__u64)statxbuf',
     350                'off': '(uintptr_t)statxbuf',
    378351                'addr': 'pathname',
    379352                'len': 'mask',
     
    383356        Call('READ', 'ssize_t read(int fd, void * buf, size_t count)', {
    384357                'fd': 'fd',
    385                 'addr': '(__u64)buf',
     358                'addr': '(uintptr_t)buf',
    386359                'len': 'count'
    387360        }),
     
    389362        Call('WRITE', 'ssize_t write(int fd, void * buf, size_t count)', {
    390363                'fd': 'fd',
    391                 'addr': '(__u64)buf',
     364                'addr': '(uintptr_t)buf',
    392365                'len': 'count'
    393366        }),
    394367        # CFA_HAVE_IORING_OP_SPLICE
    395         Call('SPLICE', 'ssize_t splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags)', {
     368        Call('SPLICE', 'ssize_t splice(int fd_in, __off64_t *off_in, int fd_out, __off64_t *off_out, size_t len, unsigned int flags)', {
    396369                'splice_fd_in': 'fd_in',
    397370                'splice_off_in': 'off_in ? (__u64)*off_in : (__u64)-1',
     
    415388        if c.define:
    416389                print("""#if defined({define})
    417         {ret} cfa_{name}({params}, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
     390        {ret} cfa_{name}({params}, __u64 submit_flags);
    418391#endif""".format(define=c.define,ret=c.ret, name=c.name, params=c.params))
    419392        else:
    420                 print("{ret} cfa_{name}({params}, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);"
     393                print("{ret} cfa_{name}({params}, __u64 submit_flags);"
    421394                .format(ret=c.ret, name=c.name, params=c.params))
    422395
     
    426399        if c.define:
    427400                print("""#if defined({define})
    428         void async_{name}(io_future_t & future, {params}, int submit_flags, io_cancellation * cancellation, io_context * context);
     401        void async_{name}(io_future_t & future, {params}, __u64 submit_flags);
    429402#endif""".format(define=c.define,name=c.name, params=c.params))
    430403        else:
    431                 print("void async_{name}(io_future_t & future, {params}, int submit_flags, io_cancellation * cancellation, io_context * context);"
     404                print("void async_{name}(io_future_t & future, {params}, __u64 submit_flags);"
    432405                .format(name=c.name, params=c.params))
    433406print("\n")
     
    474447
    475448print("""
    476 //-----------------------------------------------------------------------------
    477 bool cancel(io_cancellation & this) {
    478         #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ASYNC_CANCEL)
    479                 return false;
    480         #else
    481                 io_future_t future;
    482 
    483                 io_context * context = __get_io_context();
    484 
    485                 __u8 sflags = 0;
    486                 struct __io_data & ring = *context->thrd.ring;
    487 
    488                 __u32 idx;
    489                 volatile struct io_uring_sqe * sqe;
    490                 [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
    491 
    492                 sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
    493                 sqe->opcode = IORING_OP_ASYNC_CANCEL;
    494                 sqe->flags = sflags;
    495                 sqe->addr = this.target;
    496 
    497                 verify( sqe->user_data == (__u64)(uintptr_t)&future );
    498                 __submit( context, idx );
    499 
    500                 wait(future);
    501 
    502                 if( future.result == 0 ) return true; // Entry found
    503                 if( future.result == -EALREADY) return true; // Entry found but in progress
    504                 if( future.result == -ENOENT ) return false; // Entry not found
    505                 return false;
    506         #endif
    507 }
    508 
    509449//-----------------------------------------------------------------------------
    510450// Check if a function is has asynchronous
  • libcfa/src/concurrency/io/setup.cfa

    rfeacef9 r5407cdc  
    2626
    2727#if !defined(CFA_HAVE_LINUX_IO_URING_H)
    28         void __kernel_io_startup() {
    29                 // Nothing to do without io_uring
    30         }
    31 
    32         void __kernel_io_shutdown() {
    33                 // Nothing to do without io_uring
    34         }
    35 
    3628        void ?{}(io_context_params & this) {}
    3729
    38         void ?{}(io_context & this, struct cluster & cl) {}
    39         void ?{}(io_context & this, struct cluster & cl, const io_context_params & params) {}
    40 
    41         void ^?{}(io_context & this) {}
    42         void ^?{}(io_context & this, bool cluster_context) {}
    43 
    44         void register_fixed_files( io_context &, int *, unsigned ) {}
    45         void register_fixed_files( cluster    &, int *, unsigned ) {}
     30        void  ?{}($io_context & this, struct cluster & cl) {}
     31        void ^?{}($io_context & this) {}
     32
     33        void __cfa_io_start( processor * proc ) {}
     34        void __cfa_io_flush( processor * proc ) {}
     35        void __cfa_io_stop ( processor * proc ) {}
     36
     37        $io_arbiter * create(void) { return 0p; }
     38        void destroy($io_arbiter *) {}
    4639
    4740#else
     
    6861        void ?{}(io_context_params & this) {
    6962                this.num_entries = 256;
    70                 this.num_ready = 256;
    71                 this.submit_aff = -1;
    72                 this.eager_submits = false;
    73                 this.poller_submits = false;
    74                 this.poll_submit = false;
    75                 this.poll_complete = false;
    7663        }
    7764
     
    10693
    10794//=============================================================================================
    108 // I/O Startup / Shutdown logic + Master Poller
    109 //=============================================================================================
    110 
    111         // IO Master poller loop forward
    112         static void * iopoll_loop( __attribute__((unused)) void * args );
    113 
    114         static struct {
    115                       pthread_t  thrd;    // pthread handle to io poller thread
    116                       void *     stack;   // pthread stack for io poller thread
    117                       int        epollfd; // file descriptor to the epoll instance
    118                 volatile     bool run;     // Whether or not to continue
    119                 volatile     bool stopped; // Whether the poller has finished running
    120                 volatile uint64_t epoch;   // Epoch used for memory reclamation
    121         } iopoll;
    122 
    123         void __kernel_io_startup(void) {
    124                 __cfadbg_print_safe(io_core, "Kernel : Creating EPOLL instance\n" );
    125 
    126                 iopoll.epollfd = epoll_create1(0);
    127                 if (iopoll.epollfd == -1) {
    128                         abort( "internal error, epoll_create1\n");
    129                 }
    130 
    131                 __cfadbg_print_safe(io_core, "Kernel : Starting io poller thread\n" );
    132 
    133                 iopoll.stack   = __create_pthread( &iopoll.thrd, iopoll_loop, 0p );
    134                 iopoll.run     = true;
    135                 iopoll.stopped = false;
    136                 iopoll.epoch   = 0;
    137         }
    138 
    139         void __kernel_io_shutdown(void) {
    140                 // Notify the io poller thread of the shutdown
    141                 iopoll.run = false;
    142                 sigval val = { 1 };
    143                 pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
    144 
    145                 // Wait for the io poller thread to finish
    146 
    147                 __destroy_pthread( iopoll.thrd, iopoll.stack, 0p );
    148 
    149                 int ret = close(iopoll.epollfd);
    150                 if (ret == -1) {
    151                         abort( "internal error, close epoll\n");
    152                 }
    153 
    154                 // Io polling is now fully stopped
    155 
    156                 __cfadbg_print_safe(io_core, "Kernel : IO poller stopped\n" );
    157         }
    158 
    159         static void * iopoll_loop( __attribute__((unused)) void * args ) {
    160                 __processor_id_t id;
    161                 id.full_proc = false;
    162                 id.id = doregister(&id);
    163                 __cfaabi_tls.this_proc_id = &id;
    164                 __cfadbg_print_safe(io_core, "Kernel : IO poller thread starting\n" );
    165 
    166                 // Block signals to control when they arrive
    167                 sigset_t mask;
    168                 sigfillset(&mask);
    169                 if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
    170                 abort( "internal error, pthread_sigmask" );
    171                 }
    172 
    173                 sigdelset( &mask, SIGUSR1 );
    174 
    175                 // Create sufficient events
    176                 struct epoll_event events[10];
    177                 // Main loop
    178                 while( iopoll.run ) {
    179                         __cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n");
    180 
    181                         // increment the epoch to notify any deleters we are starting a new cycle
    182                         __atomic_fetch_add(&iopoll.epoch, 1, __ATOMIC_SEQ_CST);
    183 
    184                         // Wait for events
    185                         int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask );
    186 
    187                         __cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io contexts events, waking up\n", nfds);
    188 
    189                         // Check if an error occured
    190                         if (nfds == -1) {
    191                                 if( errno == EINTR ) continue;
    192                                 abort( "internal error, pthread_sigmask" );
    193                         }
    194 
    195                         for(i; nfds) {
    196                                 $io_ctx_thread * io_ctx = ($io_ctx_thread *)(uintptr_t)events[i].data.u64;
    197                                 /* paranoid */ verify( io_ctx );
    198                                 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->ring->fd, io_ctx);
    199                                 #if !defined( __CFA_NO_STATISTICS__ )
    200                                         __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats;
    201                                 #endif
    202 
    203                                 eventfd_t v;
    204                                 eventfd_read(io_ctx->ring->efd, &v);
    205 
    206                                 post( io_ctx->sem );
    207                         }
    208                 }
    209 
    210                 __atomic_store_n(&iopoll.stopped, true, __ATOMIC_SEQ_CST);
    211 
    212                 __cfadbg_print_safe(io_core, "Kernel : IO poller thread stopping\n" );
    213                 unregister(&id);
    214                 return 0p;
    215         }
    216 
    217 //=============================================================================================
    21895// I/O Context Constrution/Destruction
    21996//=============================================================================================
    22097
    221         void ?{}($io_ctx_thread & this, struct cluster & cl) { (this.self){ "IO Poller", cl }; }
    222         void main( $io_ctx_thread & this );
    223         static inline $thread * get_thread( $io_ctx_thread & this ) { return &this.self; }
    224         void ^?{}( $io_ctx_thread & mutex this ) {}
    225 
    226         static void __io_create ( __io_data & this, const io_context_params & params_in );
    227         static void __io_destroy( __io_data & this );
    228 
    229         void ?{}(io_context & this, struct cluster & cl, const io_context_params & params) {
    230                 (this.thrd){ cl };
    231                 this.thrd.ring = malloc();
    232                 __cfadbg_print_safe(io_core, "Kernel I/O : Creating ring for io_context %p\n", &this);
    233                 __io_create( *this.thrd.ring, params );
    234 
    235                 __cfadbg_print_safe(io_core, "Kernel I/O : Starting poller thread for io_context %p\n", &this);
    236                 this.thrd.done = false;
    237                 __thrd_start( this.thrd, main );
    238 
    239                 __cfadbg_print_safe(io_core, "Kernel I/O : io_context %p ready\n", &this);
    240         }
    241 
    242         void ?{}(io_context & this, struct cluster & cl) {
    243                 io_context_params params;
    244                 (this){ cl, params };
    245         }
    246 
    247         void ^?{}(io_context & this, bool cluster_context) {
    248                 __cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %p\n", &this);
    249 
    250                 // Notify the thread of the shutdown
    251                 __atomic_store_n(&this.thrd.done, true, __ATOMIC_SEQ_CST);
    252 
    253                 // If this is an io_context within a cluster, things get trickier
    254                 $thread & thrd = this.thrd.self;
    255                 if( cluster_context ) {
    256                         // We are about to do weird things with the threads
    257                         // we don't need interrupts to complicate everything
    258                         disable_interrupts();
    259 
    260                         // Get cluster info
    261                         cluster & cltr = *thrd.curr_cluster;
    262                         /* paranoid */ verify( cltr.idles.total == 0 || &cltr == mainCluster );
    263                         /* paranoid */ verify( !ready_mutate_islocked() );
    264 
    265                         // We need to adjust the clean-up based on where the thread is
    266                         if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
    267                                 // This is the tricky case
    268                                 // The thread was preempted or ready to run and now it is on the ready queue
    269                                 // but the cluster is shutting down, so there aren't any processors to run the ready queue
    270                                 // the solution is to steal the thread from the ready-queue and pretend it was blocked all along
    271 
    272                                 ready_schedule_lock();
    273                                         // The thread should on the list
    274                                         /* paranoid */ verify( thrd.link.next != 0p );
    275 
    276                                         // Remove the thread from the ready queue of this cluster
    277                                         // The thread should be the last on the list
    278                                         __attribute__((unused)) bool removed = remove_head( &cltr, &thrd );
    279                                         /* paranoid */ verify( removed );
    280                                         thrd.link.next = 0p;
    281                                         thrd.link.prev = 0p;
    282 
    283                                         // Fixup the thread state
    284                                         thrd.state = Blocked;
    285                                         thrd.ticket = TICKET_BLOCKED;
    286                                         thrd.preempted = __NO_PREEMPTION;
    287 
    288                                 ready_schedule_unlock();
    289 
    290                                 // Pretend like the thread was blocked all along
    291                         }
    292                         // !!! This is not an else if !!!
    293                         // Ok, now the thread is blocked (whether we cheated to get here or not)
    294                         if( thrd.state == Blocked ) {
    295                                 // This is the "easy case"
    296                                 // The thread is parked and can easily be moved to active cluster
    297                                 verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster );
    298                                 thrd.curr_cluster = active_cluster();
    299 
    300                                 // unpark the fast io_poller
    301                                 unpark( &thrd );
    302                         }
    303                         else {
    304                                 // The thread is in a weird state
    305                                 // I don't know what to do here
    306                                 abort("io_context poller thread is in unexpected state, cannot clean-up correctly\n");
    307                         }
    308 
    309                         // The weird thread kidnapping stuff is over, restore interrupts.
    310                         enable_interrupts( __cfaabi_dbg_ctx );
    311                 } else {
    312                         post( this.thrd.sem );
    313                 }
    314 
    315                 ^(this.thrd){};
    316                 __cfadbg_print_safe(io_core, "Kernel I/O : Stopped poller thread for io_context %p\n", &this);
    317 
    318                 __io_destroy( *this.thrd.ring );
    319                 __cfadbg_print_safe(io_core, "Kernel I/O : Destroyed ring for io_context %p\n", &this);
    320 
    321                 free(this.thrd.ring);
    322         }
    323 
    324         void ^?{}(io_context & this) {
    325                 ^(this){ false };
     98
     99
     100        static void __io_uring_setup ( $io_context & this, const io_context_params & params_in, int procfd );
     101        static void __io_uring_teardown( $io_context & this );
     102        static void __epoll_register($io_context & ctx);
     103        static void __epoll_unregister($io_context & ctx);
     104        void __ioarbiter_register( $io_arbiter & mutex, $io_context & ctx );
     105        void __ioarbiter_unregister( $io_arbiter & mutex, $io_context & ctx );
     106
     107        void ?{}($io_context & this, processor * proc, struct cluster & cl) {
     108                /* paranoid */ verify( cl.io.arbiter );
     109                this.proc = proc;
     110                this.arbiter = cl.io.arbiter;
     111                this.ext_sq.empty = true;
     112                (this.ext_sq.queue){};
     113                __io_uring_setup( this, cl.io.params, proc->idle );
     114                __cfadbg_print_safe(io_core, "Kernel I/O : Created ring for io_context %u (%p)\n", this.fd, &this);
     115        }
     116
     117        void ^?{}($io_context & this) {
     118                __cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %u\n", this.fd);
     119
     120                __io_uring_teardown( this );
     121                __cfadbg_print_safe(io_core, "Kernel I/O : Destroyed ring for io_context %u\n", this.fd);
    326122        }
    327123
     
    329125        extern void __enable_interrupts_hard();
    330126
    331         static void __io_create( __io_data & this, const io_context_params & params_in ) {
     127        static void __io_uring_setup( $io_context & this, const io_context_params & params_in, int procfd ) {
    332128                // Step 1 : call to setup
    333129                struct io_uring_params params;
    334130                memset(&params, 0, sizeof(params));
    335                 if( params_in.poll_submit   ) params.flags |= IORING_SETUP_SQPOLL;
    336                 if( params_in.poll_complete ) params.flags |= IORING_SETUP_IOPOLL;
     131                // if( params_in.poll_submit   ) params.flags |= IORING_SETUP_SQPOLL;
     132                // if( params_in.poll_complete ) params.flags |= IORING_SETUP_IOPOLL;
    337133
    338134                __u32 nentries = params_in.num_entries != 0 ? params_in.num_entries : 256;
     
    340136                        abort("ERROR: I/O setup 'num_entries' must be a power of 2\n");
    341137                }
    342                 if( params_in.poller_submits && params_in.eager_submits ) {
    343                         abort("ERROR: I/O setup 'poller_submits' and 'eager_submits' cannot be used together\n");
    344                 }
    345138
    346139                int fd = syscall(__NR_io_uring_setup, nentries, &params );
     
    350143
    351144                // Step 2 : mmap result
    352                 memset( &this, 0, sizeof(struct __io_data) );
    353                 struct __submition_data  & sq = this.submit_q;
    354                 struct __completion_data & cq = this.completion_q;
     145                struct __sub_ring_t & sq = this.sq;
     146                struct __cmp_ring_t & cq = this.cq;
    355147
    356148                // calculate the right ring size
     
    401193                // Get the pointers from the kernel to fill the structure
    402194                // submit queue
    403                 sq.head    = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
    404                 sq.tail    = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
    405                 sq.mask    = (   const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
    406                 sq.num     = (   const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
    407                 sq.flags   = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
    408                 sq.dropped = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
    409                 sq.array   = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
    410                 sq.prev_head = *sq.head;
    411 
    412                 {
    413                         const __u32 num = *sq.num;
    414                         for( i; num ) {
    415                                 __sqe_clean( &sq.sqes[i] );
    416                         }
    417                 }
    418 
    419                 (sq.submit_lock){};
    420                 (sq.release_lock){};
    421 
    422                 if( params_in.poller_submits || params_in.eager_submits ) {
    423                         /* paranoid */ verify( is_pow2( params_in.num_ready ) || (params_in.num_ready < 8) );
    424                         sq.ready_cnt = max( params_in.num_ready, 8 );
    425                         sq.ready = alloc( sq.ready_cnt, 64`align );
    426                         for(i; sq.ready_cnt) {
    427                                 sq.ready[i] = -1ul32;
    428                         }
    429                         sq.prev_ready = 0;
    430                 }
    431                 else {
    432                         sq.ready_cnt = 0;
    433                         sq.ready = 0p;
    434                         sq.prev_ready = 0;
    435                 }
     195                sq.kring.head  = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
     196                sq.kring.tail  = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
     197                sq.kring.array = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
     198                sq.mask        = (   const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
     199                sq.num         = (   const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
     200                sq.flags       = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
     201                sq.dropped     = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
     202
     203                sq.kring.released = 0;
     204
     205                sq.free_ring.head = 0;
     206                sq.free_ring.tail = *sq.num;
     207                sq.free_ring.array = alloc( *sq.num, 128`align );
     208                for(i; (__u32)*sq.num) {
     209                        sq.free_ring.array[i] = i;
     210                }
     211
     212                sq.to_submit = 0;
    436213
    437214                // completion queue
     
    446223                // io_uring_register is so f*cking slow on some machine that it
    447224                // will never succeed if preemption isn't hard blocked
     225                __cfadbg_print_safe(io_core, "Kernel I/O : registering %d for completion with ring %d\n", procfd, fd);
     226
    448227                __disable_interrupts_hard();
    449228
    450                 int efd = eventfd(0, 0);
    451                 if (efd < 0) {
    452                         abort("KERNEL ERROR: IO_URING EVENTFD - %s\n", strerror(errno));
    453                 }
    454 
    455                 int ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &efd, 1);
     229                int ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &procfd, 1);
    456230                if (ret < 0) {
    457231                        abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno));
     
    459233
    460234                __enable_interrupts_hard();
     235
     236                __cfadbg_print_safe(io_core, "Kernel I/O : registered %d for completion with ring %d\n", procfd, fd);
    461237
    462238                // some paranoid checks
     
    468244                /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
    469245                /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
    470                 /* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );
    471                 /* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );
     246                /* paranoid */ verifyf( (*sq.kring.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.kring.head );
     247                /* paranoid */ verifyf( (*sq.kring.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.kring.tail );
    472248
    473249                // Update the global ring info
    474                 this.ring_flags = params.flags;
     250                this.ring_flags = 0;
    475251                this.fd         = fd;
    476                 this.efd        = efd;
    477                 this.eager_submits  = params_in.eager_submits;
    478                 this.poller_submits = params_in.poller_submits;
    479         }
    480 
    481         static void __io_destroy( __io_data & this ) {
     252        }
     253
     254        static void __io_uring_teardown( $io_context & this ) {
    482255                // Shutdown the io rings
    483                 struct __submition_data  & sq = this.submit_q;
    484                 struct __completion_data & cq = this.completion_q;
     256                struct __sub_ring_t & sq = this.sq;
     257                struct __cmp_ring_t & cq = this.cq;
    485258
    486259                // unmap the submit queue entries
     
    497270                // close the file descriptor
    498271                close(this.fd);
    499                 close(this.efd);
    500 
    501                 free( this.submit_q.ready ); // Maybe null, doesn't matter
     272
     273                free( this.sq.free_ring.array ); // Maybe null, doesn't matter
     274        }
     275
     276        void __cfa_io_start( processor * proc ) {
     277                proc->io.ctx = alloc();
     278                (*proc->io.ctx){proc, *proc->cltr};
     279        }
     280        void __cfa_io_stop ( processor * proc ) {
     281                ^(*proc->io.ctx){};
     282                free(proc->io.ctx);
    502283        }
    503284
     
    505286// I/O Context Sleep
    506287//=============================================================================================
    507         static inline void __ioctx_epoll_ctl($io_ctx_thread & ctx, int op, const char * error) {
    508                 struct epoll_event ev;
    509                 ev.events = EPOLLIN | EPOLLONESHOT;
    510                 ev.data.u64 = (__u64)&ctx;
    511                 int ret = epoll_ctl(iopoll.epollfd, op, ctx.ring->efd, &ev);
    512                 if (ret < 0) {
    513                         abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) );
    514                 }
    515         }
    516 
    517         void __ioctx_register($io_ctx_thread & ctx) {
    518                 __ioctx_epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");
    519         }
    520 
    521         void __ioctx_prepare_block($io_ctx_thread & ctx) {
    522                 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.ring->fd, &ctx);
    523                 __ioctx_epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");
    524         }
    525 
    526         void __ioctx_unregister($io_ctx_thread & ctx) {
    527                 // Read the current epoch so we know when to stop
    528                 size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST);
    529 
    530                 // Remove the fd from the iopoller
    531                 __ioctx_epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE");
    532 
    533                 // Notify the io poller thread of the shutdown
    534                 iopoll.run = false;
    535                 sigval val = { 1 };
    536                 pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
    537 
    538                 // Make sure all this is done
    539                 __atomic_thread_fence(__ATOMIC_SEQ_CST);
    540 
    541                 // Wait for the next epoch
    542                 while(curr == iopoll.epoch && !iopoll.stopped) Pause();
    543         }
     288        // static inline void __epoll_ctl($io_context & ctx, int op, const char * error) {
     289        //      struct epoll_event ev;
     290        //      ev.events = EPOLLIN | EPOLLONESHOT;
     291        //      ev.data.u64 = (__u64)&ctx;
     292        //      int ret = epoll_ctl(iopoll.epollfd, op, ctx.efd, &ev);
     293        //      if (ret < 0) {
     294        //              abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) );
     295        //      }
     296        // }
     297
     298        // static void __epoll_register($io_context & ctx) {
     299        //      __epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");
     300        // }
     301
     302        // static void __epoll_unregister($io_context & ctx) {
     303        //      // Read the current epoch so we know when to stop
     304        //      size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST);
     305
     306        //      // Remove the fd from the iopoller
     307        //      __epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE");
     308
     309        //      // Notify the io poller thread of the shutdown
     310        //      iopoll.run = false;
     311        //      sigval val = { 1 };
     312        //      pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
     313
     314        //      // Make sure all this is done
     315        //      __atomic_thread_fence(__ATOMIC_SEQ_CST);
     316
     317        //      // Wait for the next epoch
     318        //      while(curr == iopoll.epoch && !iopoll.stopped) Pause();
     319        // }
     320
     321        // void __ioctx_prepare_block($io_context & ctx) {
     322        //      __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.fd, &ctx);
     323        //      __epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");
     324        // }
     325
    544326
    545327//=============================================================================================
    546328// I/O Context Misc Setup
    547329//=============================================================================================
    548         void register_fixed_files( io_context & ctx, int * files, unsigned count ) {
    549                 int ret = syscall( __NR_io_uring_register, ctx.thrd.ring->fd, IORING_REGISTER_FILES, files, count );
    550                 if( ret < 0 ) {
    551                         abort( "KERNEL ERROR: IO_URING REGISTER - (%d) %s\n", (int)errno, strerror(errno) );
    552                 }
    553 
    554                 __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret );
    555         }
    556 
    557         void register_fixed_files( cluster & cltr, int * files, unsigned count ) {
    558                 for(i; cltr.io.cnt) {
    559                         register_fixed_files( cltr.io.ctxs[i], files, count );
    560                 }
    561         }
     330        void ?{}( $io_arbiter & this ) {
     331                this.pending.empty = true;
     332        }
     333
     334        void ^?{}( $io_arbiter & this ) {}
     335
     336        $io_arbiter * create(void) {
     337                return new();
     338        }
     339        void destroy($io_arbiter * arbiter) {
     340                delete(arbiter);
     341        }
     342
     343//=============================================================================================
     344// I/O Context Misc Setup
     345//=============================================================================================
     346
    562347#endif
  • libcfa/src/concurrency/io/types.hfa

    rfeacef9 r5407cdc  
    2222
    2323#include "bits/locks.hfa"
     24#include "bits/queue.hfa"
    2425#include "kernel/fwd.hfa"
    2526
    2627#if defined(CFA_HAVE_LINUX_IO_URING_H)
    27         #define LEADER_LOCK
    28         struct __leaderlock_t {
    29                 struct $thread * volatile value;        // ($thread) next_leader | (bool:1) is_locked
    30         };
     28        #include "bits/sequence.hfa"
     29        #include "monitor.hfa"
    3130
    32         static inline void ?{}( __leaderlock_t & this ) { this.value = 0p; }
     31        struct processor;
     32        monitor $io_arbiter;
    3333
    3434        //-----------------------------------------------------------------------
    3535        // Ring Data structure
    36       struct __submition_data {
    37                 // Head and tail of the ring (associated with array)
    38                 volatile __u32 * head;
    39                 volatile __u32 * tail;
    40                 volatile __u32 prev_head;
     36      struct __sub_ring_t {
     37                struct {
     38                        // Head and tail of the ring (associated with array)
     39                        volatile __u32 * head;   // one passed last index consumed by the kernel
     40                        volatile __u32 * tail;   // one passed last index visible to the kernel
     41                        volatile __u32 released; // one passed last index released back to the free list
    4142
    42                 // The actual kernel ring which uses head/tail
    43                 // indexes into the sqes arrays
    44                 __u32 * array;
     43                        // The actual kernel ring which uses head/tail
     44                        // indexes into the sqes arrays
     45                        __u32 * array;
     46                } kring;
     47
     48                struct {
     49                        volatile __u32 head;
     50                        volatile __u32 tail;
     51                        // The ring which contains free allocations
     52                        // indexes into the sqes arrays
     53                        __u32 * array;
     54                } free_ring;
     55
     56                // number of sqes to submit on next system call.
     57                __u32 to_submit;
    4558
    4659                // number of entries and mask to go with it
     
    4861                const __u32 * mask;
    4962
    50                 // Submission flags (Not sure what for)
     63                // Submission flags, currently only IORING_SETUP_SQPOLL
    5164                __u32 * flags;
    5265
    53                 // number of sqes not submitted (whatever that means)
     66                // number of sqes not submitted
     67                // From documentation : [dropped] is incremented for each invalid submission queue entry encountered in the ring buffer.
    5468                __u32 * dropped;
    5569
    56                 // Like head/tail but not seen by the kernel
    57                 volatile __u32 * ready;
    58                 __u32 ready_cnt;
    59                 __u32 prev_ready;
    60 
    61                 #if defined(LEADER_LOCK)
    62                         __leaderlock_t submit_lock;
    63                 #else
    64                         __spinlock_t submit_lock;
    65                 #endif
    66                 __spinlock_t  release_lock;
    67 
    6870                // A buffer of sqes (not the actual ring)
    69                 volatile struct io_uring_sqe * sqes;
     71                struct io_uring_sqe * sqes;
    7072
    7173                // The location and size of the mmaped area
     
    7476        };
    7577
    76         struct __completion_data {
     78        struct __cmp_ring_t {
    7779                // Head and tail of the ring
    7880                volatile __u32 * head;
     
    8385                const __u32 * num;
    8486
    85                 // number of cqes not submitted (whatever that means)
     87                // I don't know what this value is for
    8688                __u32 * overflow;
    8789
     
    9496        };
    9597
    96         struct __io_data {
    97                 struct __submition_data submit_q;
    98                 struct __completion_data completion_q;
     98        struct __outstanding_io {
     99                inline Colable;
     100                single_sem sem;
     101        };
     102        static inline __outstanding_io *& Next( __outstanding_io * n ) { return (__outstanding_io *)Next( (Colable *)n ); }
     103
     104        struct __outstanding_io_queue {
     105                __spinlock_t lock;
     106                Queue(__outstanding_io) queue;
     107                volatile bool empty;
     108        };
     109
     110        struct __external_io {
     111                inline __outstanding_io;
     112                __u32 * idxs;
     113                __u32 have;
     114                bool lazy;
     115        };
     116
     117
     118        struct __attribute__((aligned(128))) $io_context {
     119                $io_arbiter * arbiter;
     120                processor * proc;
     121
     122                __outstanding_io_queue ext_sq;
     123
     124                struct __sub_ring_t sq;
     125                struct __cmp_ring_t cq;
    99126                __u32 ring_flags;
    100127                int fd;
    101                 int efd;
    102                 bool eager_submits:1;
    103                 bool poller_submits:1;
     128        };
     129
     130        struct __pending_alloc {
     131                inline __outstanding_io;
     132                __u32 * idxs;
     133                __u32 want;
     134                $io_context * ctx;
     135        };
     136
     137        struct __attribute__((aligned(128))) $io_arbiter {
     138                __outstanding_io_queue pending;
    104139        };
    105140
     
    133168        #endif
    134169
    135         struct $io_ctx_thread;
    136         void __ioctx_register($io_ctx_thread & ctx);
    137         void __ioctx_unregister($io_ctx_thread & ctx);
    138         void __ioctx_prepare_block($io_ctx_thread & ctx);
    139         void __sqe_clean( volatile struct io_uring_sqe * sqe );
     170        // void __ioctx_prepare_block($io_context & ctx);
    140171#endif
    141172
     
    148179
    149180static inline {
    150         bool fulfil( io_future_t & this, __s32 result ) {
     181        $thread * fulfil( io_future_t & this, __s32 result, bool do_unpark = true ) {
    151182                this.result = result;
    152                 return fulfil(this.self);
     183                return fulfil(this.self, do_unpark);
    153184        }
    154185
  • libcfa/src/concurrency/iofwd.hfa

    rfeacef9 r5407cdc  
    1818#include <unistd.h>
    1919extern "C" {
    20         #include <sys/types.h>
     20        #include <asm/types.h>
    2121        #if CFA_HAVE_LINUX_IO_URING_H
    2222                #include <linux/io_uring.h>
     
    4848struct cluster;
    4949struct io_future_t;
    50 struct io_context;
    51 struct io_cancellation;
     50struct $io_context;
    5251
    5352struct iovec;
     
    5554struct sockaddr;
    5655struct statx;
     56struct epoll_event;
     57
     58struct io_uring_sqe;
     59
     60//----------
     61// underlying calls
     62extern struct $io_context * cfa_io_allocate(struct io_uring_sqe * out_sqes[], __u32 out_idxs[], __u32 want)  __attribute__((nonnull (1,2)));
     63extern void cfa_io_submit( struct $io_context * in_ctx, __u32 in_idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1,2)));
    5764
    5865//----------
    5966// synchronous calls
    6067#if defined(CFA_HAVE_PREADV2)
    61         extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
     68        extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, __u64 submit_flags);
    6269#endif
    6370#if defined(CFA_HAVE_PWRITEV2)
    64         extern ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
     71        extern ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, __u64 submit_flags);
    6572#endif
    66 extern int cfa_fsync(int fd, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    67 extern int cfa_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    68 extern int cfa_sync_file_range(int fd, off64_t offset, off64_t nbytes, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    69 extern  ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    70 extern ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    71 extern ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    72 extern ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    73 extern int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    74 extern int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    75 extern int cfa_fallocate(int fd, int mode, off_t offset, off_t len, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    76 extern int cfa_posix_fadvise(int fd, off_t offset, off_t len, int advice, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    77 extern int cfa_madvise(void *addr, size_t length, int advice, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    78 extern int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
     73extern int cfa_fsync(int fd, __u64 submit_flags);
     74extern int cfa_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event, __u64 submit_flags);
     75extern int cfa_sync_file_range(int fd, off64_t offset, off64_t nbytes, unsigned int flags, __u64 submit_flags);
     76extern  ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags, __u64 submit_flags);
     77extern ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags, __u64 submit_flags);
     78extern ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags, __u64 submit_flags);
     79extern ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags, __u64 submit_flags);
     80extern int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, __u64 submit_flags);
     81extern int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen, __u64 submit_flags);
     82extern int cfa_fallocate(int fd, int mode, off_t offset, off_t len, __u64 submit_flags);
     83extern int cfa_posix_fadvise(int fd, off_t offset, off_t len, int advice, __u64 submit_flags);
     84extern int cfa_madvise(void *addr, size_t length, int advice, __u64 submit_flags);
     85extern int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode, __u64 submit_flags);
    7986#if defined(CFA_HAVE_OPENAT2)
    80         extern int cfa_openat2(int dirfd, const char *pathname, struct open_how * how, size_t size, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
     87        extern int cfa_openat2(int dirfd, const char *pathname, struct open_how * how, size_t size, __u64 submit_flags);
    8188#endif
    82 extern int cfa_close(int fd, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
     89extern int cfa_close(int fd, __u64 submit_flags);
    8390#if defined(CFA_HAVE_STATX)
    84         extern int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
     91        extern int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, __u64 submit_flags);
    8592#endif
    86 extern ssize_t cfa_read(int fd, void * buf, size_t count, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    87 extern ssize_t cfa_write(int fd, void * buf, size_t count, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    88 extern ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    89 extern ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
     93extern ssize_t cfa_read(int fd, void * buf, size_t count, __u64 submit_flags);
     94extern ssize_t cfa_write(int fd, void * buf, size_t count, __u64 submit_flags);
     95extern ssize_t cfa_splice(int fd_in, __off64_t *off_in, int fd_out, __off64_t *off_out, size_t len, unsigned int flags, __u64 submit_flags);
     96extern ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, __u64 submit_flags);
    9097
    9198//----------
    9299// asynchronous calls
    93100#if defined(CFA_HAVE_PREADV2)
    94         extern void async_preadv2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
     101        extern void async_preadv2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, __u64 submit_flags);
    95102#endif
    96103#if defined(CFA_HAVE_PWRITEV2)
    97         extern void async_pwritev2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
     104        extern void async_pwritev2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, __u64 submit_flags);
    98105#endif
    99 extern void async_fsync(io_future_t & future, int fd, int submit_flags, io_cancellation * cancellation, io_context * context);
    100 extern void async_epoll_ctl(io_future_t & future, int epfd, int op, int fd, struct epoll_event *event, int submit_flags, io_cancellation * cancellation, io_context * context);
    101 extern void async_sync_file_range(io_future_t & future, int fd, off64_t offset, off64_t nbytes, unsigned int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
    102 extern void async_sendmsg(io_future_t & future, int sockfd, const struct msghdr *msg, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
    103 extern void async_recvmsg(io_future_t & future, int sockfd, struct msghdr *msg, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
    104 extern void async_send(io_future_t & future, int sockfd, const void *buf, size_t len, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
    105 extern void async_recv(io_future_t & future, int sockfd, void *buf, size_t len, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
    106 extern void async_accept4(io_future_t & future, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
    107 extern void async_connect(io_future_t & future, int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags, io_cancellation * cancellation, io_context * context);
    108 extern void async_fallocate(io_future_t & future, int fd, int mode, off_t offset, off_t len, int submit_flags, io_cancellation * cancellation, io_context * context);
    109 extern void async_posix_fadvise(io_future_t & future, int fd, off_t offset, off_t len, int advice, int submit_flags, io_cancellation * cancellation, io_context * context);
    110 extern void async_madvise(io_future_t & future, void *addr, size_t length, int advice, int submit_flags, io_cancellation * cancellation, io_context * context);
    111 extern void async_openat(io_future_t & future, int dirfd, const char *pathname, int flags, mode_t mode, int submit_flags, io_cancellation * cancellation, io_context * context);
     106extern void async_fsync(io_future_t & future, int fd, __u64 submit_flags);
     107extern void async_epoll_ctl(io_future_t & future, int epfd, int op, int fd, struct epoll_event *event, __u64 submit_flags);
     108extern void async_sync_file_range(io_future_t & future, int fd, off64_t offset, off64_t nbytes, unsigned int flags, __u64 submit_flags);
     109extern void async_sendmsg(io_future_t & future, int sockfd, const struct msghdr *msg, int flags, __u64 submit_flags);
     110extern void async_recvmsg(io_future_t & future, int sockfd, struct msghdr *msg, int flags, __u64 submit_flags);
     111extern void async_send(io_future_t & future, int sockfd, const void *buf, size_t len, int flags, __u64 submit_flags);
     112extern void async_recv(io_future_t & future, int sockfd, void *buf, size_t len, int flags, __u64 submit_flags);
     113extern void async_accept4(io_future_t & future, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, __u64 submit_flags);
     114extern void async_connect(io_future_t & future, int sockfd, const struct sockaddr *addr, socklen_t addrlen, __u64 submit_flags);
     115extern void async_fallocate(io_future_t & future, int fd, int mode, off_t offset, off_t len, __u64 submit_flags);
     116extern void async_posix_fadvise(io_future_t & future, int fd, off_t offset, off_t len, int advice, __u64 submit_flags);
     117extern void async_madvise(io_future_t & future, void *addr, size_t length, int advice, __u64 submit_flags);
     118extern void async_openat(io_future_t & future, int dirfd, const char *pathname, int flags, mode_t mode, __u64 submit_flags);
    112119#if defined(CFA_HAVE_OPENAT2)
    113         extern void async_openat2(io_future_t & future, int dirfd, const char *pathname, struct open_how * how, size_t size, int submit_flags, io_cancellation * cancellation, io_context * context);
     120        extern void async_openat2(io_future_t & future, int dirfd, const char *pathname, struct open_how * how, size_t size, __u64 submit_flags);
    114121#endif
    115 extern void async_close(io_future_t & future, int fd, int submit_flags, io_cancellation * cancellation, io_context * context);
     122extern void async_close(io_future_t & future, int fd, __u64 submit_flags);
    116123#if defined(CFA_HAVE_STATX)
    117         extern void async_statx(io_future_t & future, int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags, io_cancellation * cancellation, io_context * context);
     124        extern void async_statx(io_future_t & future, int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, __u64 submit_flags);
    118125#endif
    119 void async_read(io_future_t & future, int fd, void * buf, size_t count, int submit_flags, io_cancellation * cancellation, io_context * context);
    120 extern void async_write(io_future_t & future, int fd, void * buf, size_t count, int submit_flags, io_cancellation * cancellation, io_context * context);
    121 extern void async_splice(io_future_t & future, int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
    122 extern void async_tee(io_future_t & future, int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
     126void async_read(io_future_t & future, int fd, void * buf, size_t count, __u64 submit_flags);
     127extern void async_write(io_future_t & future, int fd, void * buf, size_t count, __u64 submit_flags);
     128extern void async_splice(io_future_t & future, int fd_in, __off64_t *off_in, int fd_out, __off64_t *off_out, size_t len, unsigned int flags, __u64 submit_flags);
     129extern void async_tee(io_future_t & future, int fd_in, int fd_out, size_t len, unsigned int flags, __u64 submit_flags);
    123130
    124131
     
    126133// Check if a function is blocks a only the user thread
    127134bool has_user_level_blocking( fptr_t func );
    128 
    129 //-----------------------------------------------------------------------------
    130 void register_fixed_files( io_context & ctx , int * files, unsigned count );
    131 void register_fixed_files( cluster    & cltr, int * files, unsigned count );
  • libcfa/src/concurrency/kernel.cfa

    rfeacef9 r5407cdc  
    2222#include <signal.h>
    2323#include <unistd.h>
     24extern "C" {
     25        #include <sys/eventfd.h>
     26}
    2427
    2528//CFA Includes
     
    3134#include "invoke.h"
    3235
     36#if !defined(__CFA_NO_STATISTICS__)
     37        #define __STATS( ...) __VA_ARGS__
     38#else
     39        #define __STATS( ...)
     40#endif
    3341
    3442//-----------------------------------------------------------------------------
     
    107115static $thread * __next_thread(cluster * this);
    108116static $thread * __next_thread_slow(cluster * this);
     117static inline bool __must_unpark( $thread * thrd ) __attribute((nonnull(1)));
    109118static void __run_thread(processor * this, $thread * dst);
    110119static void __wake_one(cluster * cltr);
    111120
    112 static void push  (__cluster_idles & idles, processor & proc);
    113 static void remove(__cluster_idles & idles, processor & proc);
    114 static [unsigned idle, unsigned total, * processor] query( & __cluster_idles idles );
    115 
     121static void mark_idle (__cluster_proc_list & idles, processor & proc);
     122static void mark_awake(__cluster_proc_list & idles, processor & proc);
     123static [unsigned idle, unsigned total, * processor] query_idles( & __cluster_proc_list idles );
     124
     125extern void __cfa_io_start( processor * );
     126extern bool __cfa_io_drain( processor * );
     127extern void __cfa_io_flush( processor * );
     128extern void __cfa_io_stop ( processor * );
     129static inline bool __maybe_io_drain( processor * );
     130
     131extern void __disable_interrupts_hard();
     132extern void __enable_interrupts_hard();
     133
     134static inline void __disable_interrupts_checked() {
     135        /* paranoid */ verify( __preemption_enabled() );
     136        disable_interrupts();
     137        /* paranoid */ verify( ! __preemption_enabled() );
     138}
     139
     140static inline void __enable_interrupts_checked( bool poll = true ) {
     141        /* paranoid */ verify( ! __preemption_enabled() );
     142        enable_interrupts( poll );
     143        /* paranoid */ verify( __preemption_enabled() );
     144}
    116145
    117146//=============================================================================================
     
    129158        verify(this);
    130159
     160        __cfa_io_start( this );
     161
    131162        __cfadbg_print_safe(runtime_core, "Kernel : core %p starting\n", this);
    132163        #if !defined(__CFA_NO_STATISTICS__)
     
    140171                preemption_scope scope = { this };
    141172
    142                 #if !defined(__CFA_NO_STATISTICS__)
    143                         unsigned long long last_tally = rdtscl();
    144                 #endif
    145 
     173                __STATS( unsigned long long last_tally = rdtscl(); )
     174
     175                // if we need to run some special setup, now is the time to do it.
     176                if(this->init.thrd) {
     177                        this->init.thrd->curr_cluster = this->cltr;
     178                        __run_thread(this, this->init.thrd);
     179                }
    146180
    147181                __cfadbg_print_safe(runtime_core, "Kernel : core %p started\n", this);
     
    150184                MAIN_LOOP:
    151185                for() {
     186                        // Check if there is pending io
     187                        __maybe_io_drain( this );
     188
    152189                        // Try to get the next thread
    153190                        readyThread = __next_thread( this->cltr );
    154191
    155192                        if( !readyThread ) {
     193                                __cfa_io_flush( this );
    156194                                readyThread = __next_thread_slow( this->cltr );
    157195                        }
     
    167205
    168206                                // Push self to idle stack
    169                                 push(this->cltr->idles, * this);
     207                                mark_idle(this->cltr->procs, * this);
    170208
    171209                                // Confirm the ready-queue is empty
     
    173211                                if( readyThread ) {
    174212                                        // A thread was found, cancel the halt
    175                                         remove(this->cltr->idles, * this);
     213                                        mark_awake(this->cltr->procs, * this);
    176214
    177215                                        #if !defined(__CFA_NO_STATISTICS__)
     
    189227                                #endif
    190228
    191                                 wait( this->idle );
     229                                __cfadbg_print_safe(runtime_core, "Kernel : core %p waiting on eventfd %d\n", this, this->idle);
     230
     231                                __disable_interrupts_hard();
     232                                eventfd_t val;
     233                                eventfd_read( this->idle, &val );
     234                                __enable_interrupts_hard();
    192235
    193236                                #if !defined(__CFA_NO_STATISTICS__)
     
    198241
    199242                                // We were woken up, remove self from idle
    200                                 remove(this->cltr->idles, * this);
     243                                mark_awake(this->cltr->procs, * this);
    201244
    202245                                // DON'T just proceed, start looking again
     
    205248
    206249                        /* paranoid */ verify( readyThread );
     250
     251                        // Reset io dirty bit
     252                        this->io.dirty = false;
    207253
    208254                        // We found a thread run it
     
    219265                                }
    220266                        #endif
     267
     268                        if(this->io.pending && !this->io.dirty) {
     269                                __cfa_io_flush( this );
     270                        }
     271
     272                //      SEARCH: {
     273                //              /* paranoid */ verify( ! __preemption_enabled() );
     274                //              /* paranoid */ verify( kernelTLS().this_proc_id );
     275
     276                //              // First, lock the scheduler since we are searching for a thread
     277
     278                //              // Try to get the next thread
     279                //              ready_schedule_lock();
     280                //              readyThread = pop_fast( this->cltr );
     281                //              ready_schedule_unlock();
     282                //              if(readyThread) {  break SEARCH; }
     283
     284                //              // If we can't find a thread, might as well flush any outstanding I/O
     285                //              if(this->io.pending) { __cfa_io_flush( this ); }
     286
     287                //              // Spin a little on I/O, just in case
     288                //              for(25) {
     289                //                      __maybe_io_drain( this );
     290                //                      ready_schedule_lock();
     291                //                      readyThread = pop_fast( this->cltr );
     292                //                      ready_schedule_unlock();
     293                //                      if(readyThread) {  break SEARCH; }
     294                //              }
     295
     296                //              // no luck, try stealing a few times
     297                //              for(25) {
     298                //                      if( __maybe_io_drain( this ) ) {
     299                //                              ready_schedule_lock();
     300                //                              readyThread = pop_fast( this->cltr );
     301                //                      } else {
     302                //                              ready_schedule_lock();
     303                //                              readyThread = pop_slow( this->cltr );
     304                //                      }
     305                //                      ready_schedule_unlock();
     306                //                      if(readyThread) {  break SEARCH; }
     307                //              }
     308
     309                //              // still no luck, search for a thread
     310                //              ready_schedule_lock();
     311                //              readyThread = pop_search( this->cltr );
     312                //              ready_schedule_unlock();
     313                //              if(readyThread) { break SEARCH; }
     314
     315                //              // Don't block if we are done
     316                //              if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP;
     317
     318                //              __STATS( __tls_stats()->ready.sleep.halts++; )
     319
     320                //              // Push self to idle stack
     321                //              mark_idle(this->cltr->procs, * this);
     322
     323                //              // Confirm the ready-queue is empty
     324                //              __maybe_io_drain( this );
     325                //              ready_schedule_lock();
     326                //              readyThread = pop_search( this->cltr );
     327                //              ready_schedule_unlock();
     328
     329                //              if( readyThread ) {
     330                //                      // A thread was found, cancel the halt
     331                //                      mark_awake(this->cltr->procs, * this);
     332
     333                //                      __STATS( __tls_stats()->ready.sleep.cancels++; )
     334
     335                //                      // continue the main loop
     336                //                      break SEARCH;
     337                //              }
     338
     339                //              __STATS( if(this->print_halts) __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 0\n", this->id, rdtscl()); )
     340                //              __cfadbg_print_safe(runtime_core, "Kernel : core %p waiting on eventfd %d\n", this, this->idle);
     341
     342                //              // __disable_interrupts_hard();
     343                //              eventfd_t val;
     344                //              eventfd_read( this->idle, &val );
     345                //              // __enable_interrupts_hard();
     346
     347                //              __STATS( if(this->print_halts) __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 1\n", this->id, rdtscl()); )
     348
     349                //              // We were woken up, remove self from idle
     350                //              mark_awake(this->cltr->procs, * this);
     351
     352                //              // DON'T just proceed, start looking again
     353                //              continue MAIN_LOOP;
     354                //      }
     355
     356                // RUN_THREAD:
     357                //      /* paranoid */ verify( kernelTLS().this_proc_id );
     358                //      /* paranoid */ verify( ! __preemption_enabled() );
     359                //      /* paranoid */ verify( readyThread );
     360
     361                //      // Reset io dirty bit
     362                //      this->io.dirty = false;
     363
     364                //      // We found a thread run it
     365                //      __run_thread(this, readyThread);
     366
     367                //      // Are we done?
     368                //      if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP;
     369
     370                //      #if !defined(__CFA_NO_STATISTICS__)
     371                //              unsigned long long curr = rdtscl();
     372                //              if(curr > (last_tally + 500000000)) {
     373                //                      __tally_stats(this->cltr->stats, __cfaabi_tls.this_stats);
     374                //                      last_tally = curr;
     375                //              }
     376                //      #endif
     377
     378                //      if(this->io.pending && !this->io.dirty) {
     379                //              __cfa_io_flush( this );
     380                //      }
     381
     382                //      // Check if there is pending io
     383                //      __maybe_io_drain( this );
    221384                }
    222385
     
    224387        }
    225388
     389        __cfa_io_stop( this );
     390
    226391        post( this->terminated );
     392
    227393
    228394        if(this == mainProcessor) {
     
    247413        /* paranoid */ verifyf( thrd_dst->link.next == 0p, "Expected null got %p", thrd_dst->link.next );
    248414        __builtin_prefetch( thrd_dst->context.SP );
     415
     416        __cfadbg_print_safe(runtime_core, "Kernel : core %p running thread %p (%s)\n", this, thrd_dst, thrd_dst->self_cor.name);
    249417
    250418        $coroutine * proc_cor = get_coroutine(this->runner);
     
    297465                if(unlikely(thrd_dst->preempted != __NO_PREEMPTION)) {
    298466                        // The thread was preempted, reschedule it and reset the flag
    299                         __schedule_thread( thrd_dst );
     467                        schedule_thread$( thrd_dst );
    300468                        break RUNNING;
    301469                }
     
    318486                                break RUNNING;
    319487                        case TICKET_UNBLOCK:
     488                                #if !defined(__CFA_NO_STATISTICS__)
     489                                        __tls_stats()->ready.threads.threads++;
     490                                        __push_stat( __tls_stats(), __tls_stats()->ready.threads.threads, false, "Processor", this );
     491                                #endif
    320492                                // This is case 2, the racy case, someone tried to run this thread before it finished blocking
    321493                                // In this case, just run it again.
     
    330502        proc_cor->state = Active;
    331503
     504        __cfadbg_print_safe(runtime_core, "Kernel : core %p finished running thread %p\n", this, thrd_dst);
     505
     506        #if !defined(__CFA_NO_STATISTICS__)
     507                __tls_stats()->ready.threads.threads--;
     508                __push_stat( __tls_stats(), __tls_stats()->ready.threads.threads, false, "Processor", this );
     509        #endif
     510
    332511        /* paranoid */ verify( ! __preemption_enabled() );
    333512}
     
    339518        $thread * thrd_src = kernelTLS().this_thread;
    340519
    341         #if !defined(__CFA_NO_STATISTICS__)
    342                 struct processor * last_proc = kernelTLS().this_processor;
    343         #endif
     520        __STATS( thrd_src->last_proc = kernelTLS().this_processor; )
    344521
    345522        // Run the thread on this processor
     
    360537
    361538        #if !defined(__CFA_NO_STATISTICS__)
    362                 if(last_proc != kernelTLS().this_processor) {
     539                /* paranoid */ verify( thrd_src->last_proc != 0p );
     540                if(thrd_src->last_proc != kernelTLS().this_processor) {
    363541                        __tls_stats()->ready.threads.migration++;
    364542                }
     
    373551// Scheduler routines
    374552// KERNEL ONLY
    375 void __schedule_thread( $thread * thrd ) {
     553static void __schedule_thread( $thread * thrd ) {
    376554        /* paranoid */ verify( ! __preemption_enabled() );
    377555        /* paranoid */ verify( kernelTLS().this_proc_id );
     556        /* paranoid */ verify( ready_schedule_islocked());
    378557        /* paranoid */ verify( thrd );
    379558        /* paranoid */ verify( thrd->state != Halted );
     
    391570        if (thrd->preempted == __NO_PREEMPTION) thrd->state = Ready;
    392571
     572        // Dereference the thread now because once we push it, there is not guaranteed it's still valid.
     573        struct cluster * cl = thrd->curr_cluster;
     574        __STATS(bool outside = thrd->last_proc && thrd->last_proc != kernelTLS().this_processor; )
     575
     576        // push the thread to the cluster ready-queue
     577        push( cl, thrd );
     578
     579        // variable thrd is no longer safe to use
     580        thrd = 0xdeaddeaddeaddeadp;
     581
     582        // wake the cluster using the save variable.
     583        __wake_one( cl );
     584
     585        #if !defined(__CFA_NO_STATISTICS__)
     586                if( kernelTLS().this_stats ) {
     587                        __tls_stats()->ready.threads.threads++;
     588                        if(outside) {
     589                                __tls_stats()->ready.threads.extunpark++;
     590                        }
     591                        __push_stat( __tls_stats(), __tls_stats()->ready.threads.threads, false, "Processor", kernelTLS().this_processor );
     592                }
     593                else {
     594                        __atomic_fetch_add(&cl->stats->ready.threads.threads, 1, __ATOMIC_RELAXED);
     595                        __atomic_fetch_add(&cl->stats->ready.threads.extunpark, 1, __ATOMIC_RELAXED);
     596                        __push_stat( cl->stats, cl->stats->ready.threads.threads, true, "Cluster", cl );
     597                }
     598        #endif
     599
     600        /* paranoid */ verify( ready_schedule_islocked());
     601        /* paranoid */ verify( ! __preemption_enabled() );
     602}
     603
     604void schedule_thread$( $thread * thrd ) {
    393605        ready_schedule_lock();
    394                 // Dereference the thread now because once we push it, there is not guaranteed it's still valid.
    395                 struct cluster * cl = thrd->curr_cluster;
    396 
    397                 // push the thread to the cluster ready-queue
    398                 push( cl, thrd );
    399 
    400                 // variable thrd is no longer safe to use
    401 
    402                 // wake the cluster using the save variable.
    403                 __wake_one( cl );
     606                __schedule_thread( thrd );
    404607        ready_schedule_unlock();
    405 
    406         /* paranoid */ verify( ! __preemption_enabled() );
    407608}
    408609
     
    413614
    414615        ready_schedule_lock();
    415                 $thread * thrd = pop( this );
     616                $thread * thrd = pop_fast( this );
    416617        ready_schedule_unlock();
    417618
     
    427628
    428629        ready_schedule_lock();
    429                 $thread * thrd = pop_slow( this );
     630                $thread * thrd;
     631                for(25) {
     632                        thrd = pop_slow( this );
     633                        if(thrd) goto RET;
     634                }
     635                thrd = pop_search( this );
     636
     637                RET:
    430638        ready_schedule_unlock();
    431639
     
    435643}
    436644
    437 void unpark( $thread * thrd ) {
    438         if( !thrd ) return;
    439 
     645static inline bool __must_unpark( $thread * thrd ) {
    440646        int old_ticket = __atomic_fetch_add(&thrd->ticket, 1, __ATOMIC_SEQ_CST);
    441647        switch(old_ticket) {
    442648                case TICKET_RUNNING:
    443649                        // Wake won the race, the thread will reschedule/rerun itself
    444                         break;
     650                        return false;
    445651                case TICKET_BLOCKED:
    446652                        /* paranoid */ verify( ! thrd->preempted != __NO_PREEMPTION );
    447653                        /* paranoid */ verify( thrd->state == Blocked );
    448 
    449                         {
    450                                 /* paranoid */ verify( publicTLS_get(this_proc_id) );
    451                                 bool full = publicTLS_get(this_proc_id)->full_proc;
    452                                 if(full) disable_interrupts();
    453 
    454                                 /* paranoid */ verify( ! __preemption_enabled() );
    455 
    456                                 // Wake lost the race,
    457                                 __schedule_thread( thrd );
    458 
    459                                 /* paranoid */ verify( ! __preemption_enabled() );
    460 
    461                                 if(full) enable_interrupts( __cfaabi_dbg_ctx );
    462                                 /* paranoid */ verify( publicTLS_get(this_proc_id) );
    463                         }
    464 
    465                         break;
     654                        return true;
    466655                default:
    467656                        // This makes no sense, something is wrong abort
     
    470659}
    471660
     661void __kernel_unpark( $thread * thrd ) {
     662        /* paranoid */ verify( ! __preemption_enabled() );
     663        /* paranoid */ verify( ready_schedule_islocked());
     664
     665        if( !thrd ) return;
     666
     667        if(__must_unpark(thrd)) {
     668                // Wake lost the race,
     669                __schedule_thread( thrd );
     670        }
     671
     672        /* paranoid */ verify( ready_schedule_islocked());
     673        /* paranoid */ verify( ! __preemption_enabled() );
     674}
     675
     676void unpark( $thread * thrd ) {
     677        if( !thrd ) return;
     678
     679        if(__must_unpark(thrd)) {
     680                disable_interrupts();
     681                        // Wake lost the race,
     682                        schedule_thread$( thrd );
     683                enable_interrupts(false);
     684        }
     685}
     686
    472687void park( void ) {
    473         /* paranoid */ verify( __preemption_enabled() );
    474         disable_interrupts();
    475         /* paranoid */ verify( ! __preemption_enabled() );
    476         /* paranoid */ verify( kernelTLS().this_thread->preempted == __NO_PREEMPTION );
    477 
    478         returnToKernel();
    479 
    480         /* paranoid */ verify( ! __preemption_enabled() );
    481         enable_interrupts( __cfaabi_dbg_ctx );
    482         /* paranoid */ verify( __preemption_enabled() );
     688        __disable_interrupts_checked();
     689                /* paranoid */ verify( kernelTLS().this_thread->preempted == __NO_PREEMPTION );
     690                returnToKernel();
     691        __enable_interrupts_checked();
    483692
    484693}
     
    520729// KERNEL ONLY
    521730bool force_yield( __Preemption_Reason reason ) {
    522         /* paranoid */ verify( __preemption_enabled() );
    523         disable_interrupts();
    524         /* paranoid */ verify( ! __preemption_enabled() );
    525 
    526         $thread * thrd = kernelTLS().this_thread;
    527         /* paranoid */ verify(thrd->state == Active);
    528 
    529         // SKULLDUGGERY: It is possible that we are preempting this thread just before
    530         // it was going to park itself. If that is the case and it is already using the
    531         // intrusive fields then we can't use them to preempt the thread
    532         // If that is the case, abandon the preemption.
    533         bool preempted = false;
    534         if(thrd->link.next == 0p) {
    535                 preempted = true;
    536                 thrd->preempted = reason;
    537                 returnToKernel();
    538         }
    539 
    540         /* paranoid */ verify( ! __preemption_enabled() );
    541         enable_interrupts_noPoll();
    542         /* paranoid */ verify( __preemption_enabled() );
    543 
     731        __disable_interrupts_checked();
     732                $thread * thrd = kernelTLS().this_thread;
     733                /* paranoid */ verify(thrd->state == Active);
     734
     735                // SKULLDUGGERY: It is possible that we are preempting this thread just before
     736                // it was going to park itself. If that is the case and it is already using the
     737                // intrusive fields then we can't use them to preempt the thread
     738                // If that is the case, abandon the preemption.
     739                bool preempted = false;
     740                if(thrd->link.next == 0p) {
     741                        preempted = true;
     742                        thrd->preempted = reason;
     743                        returnToKernel();
     744                }
     745        __enable_interrupts_checked( false );
    544746        return preempted;
    545747}
     
    557759        unsigned idle;
    558760        unsigned total;
    559         [idle, total, p] = query(this->idles);
     761        [idle, total, p] = query_idles(this->procs);
    560762
    561763        // If no one is sleeping, we are done
     
    563765
    564766        // We found a processor, wake it up
    565         post( p->idle );
     767        eventfd_t val;
     768        val = 1;
     769        eventfd_write( p->idle, val );
    566770
    567771        #if !defined(__CFA_NO_STATISTICS__)
    568                 __tls_stats()->ready.sleep.wakes++;
     772                if( kernelTLS().this_stats ) {
     773                        __tls_stats()->ready.sleep.wakes++;
     774                }
     775                else {
     776                        __atomic_fetch_add(&this->stats->ready.sleep.wakes, 1, __ATOMIC_RELAXED);
     777                }
    569778        #endif
    570779
     
    579788        __cfadbg_print_safe(runtime_core, "Kernel : waking Processor %p\n", this);
    580789
    581         disable_interrupts();
     790        __disable_interrupts_checked();
    582791                /* paranoid */ verify( ! __preemption_enabled() );
    583                 post( this->idle );
    584         enable_interrupts( __cfaabi_dbg_ctx );
    585 }
    586 
    587 static void push  (__cluster_idles & this, processor & proc) {
     792                eventfd_t val;
     793                val = 1;
     794                eventfd_write( this->idle, val );
     795        __enable_interrupts_checked();
     796}
     797
     798static void mark_idle(__cluster_proc_list & this, processor & proc) {
    588799        /* paranoid */ verify( ! __preemption_enabled() );
    589800        lock( this );
    590801                this.idle++;
    591802                /* paranoid */ verify( this.idle <= this.total );
    592 
    593                 insert_first(this.list, proc);
     803                remove(proc);
     804                insert_first(this.idles, proc);
    594805        unlock( this );
    595806        /* paranoid */ verify( ! __preemption_enabled() );
    596807}
    597808
    598 static void remove(__cluster_idles & this, processor & proc) {
     809static void mark_awake(__cluster_proc_list & this, processor & proc) {
    599810        /* paranoid */ verify( ! __preemption_enabled() );
    600811        lock( this );
    601812                this.idle--;
    602813                /* paranoid */ verify( this.idle >= 0 );
    603 
    604814                remove(proc);
     815                insert_last(this.actives, proc);
    605816        unlock( this );
    606817        /* paranoid */ verify( ! __preemption_enabled() );
    607818}
    608819
    609 static [unsigned idle, unsigned total, * processor] query( & __cluster_idles this ) {
     820static [unsigned idle, unsigned total, * processor] query_idles( & __cluster_proc_list this ) {
     821        /* paranoid */ verify( ! __preemption_enabled() );
     822        /* paranoid */ verify( ready_schedule_islocked() );
     823
    610824        for() {
    611825                uint64_t l = __atomic_load_n(&this.lock, __ATOMIC_SEQ_CST);
     
    613827                unsigned idle    = this.idle;
    614828                unsigned total   = this.total;
    615                 processor * proc = &this.list`first;
     829                processor * proc = &this.idles`first;
    616830                // Compiler fence is unnecessary, but gcc-8 and older incorrectly reorder code without it
    617831                asm volatile("": : :"memory");
     
    619833                return [idle, total, proc];
    620834        }
     835
     836        /* paranoid */ verify( ready_schedule_islocked() );
     837        /* paranoid */ verify( ! __preemption_enabled() );
    621838}
    622839
     
    664881// Kernel Utilities
    665882//=============================================================================================
     883#if defined(CFA_HAVE_LINUX_IO_URING_H)
     884#include "io/types.hfa"
     885#endif
     886
     887static inline bool __maybe_io_drain( processor * proc ) {
     888        bool ret = false;
     889        #if defined(CFA_HAVE_LINUX_IO_URING_H)
     890                __cfadbg_print_safe(runtime_core, "Kernel : core %p checking io for ring %d\n", proc, proc->io.ctx->fd);
     891
     892                // Check if we should drain the queue
     893                $io_context * ctx = proc->io.ctx;
     894                unsigned head = *ctx->cq.head;
     895                unsigned tail = *ctx->cq.tail;
     896                if(head == tail) return false;
     897                ready_schedule_lock();
     898                ret = __cfa_io_drain( proc );
     899                ready_schedule_unlock();
     900        #endif
     901        return ret;
     902}
     903
    666904//-----------------------------------------------------------------------------
    667905// Debug
     
    691929                __print_stats( this.stats, this.print_stats, "Cluster", this.name, (void*)&this );
    692930        }
    693 
    694         extern int __print_alarm_stats;
    695         void print_alarm_stats() {
    696                 __print_alarm_stats = -1;
    697         }
    698931#endif
    699932// Local Variables: //
  • libcfa/src/concurrency/kernel.hfa

    rfeacef9 r5407cdc  
    2828}
    2929
    30 //-----------------------------------------------------------------------------
    31 // Underlying Locks
    3230#ifdef __CFA_WITH_VERIFY__
    3331        extern bool __cfaabi_dbg_in_kernel();
    3432#endif
    3533
    36 extern "C" {
    37         char * strerror(int);
    38 }
    39 #define CHECKED(x) { int err = x; if( err != 0 ) abort("KERNEL ERROR: Operation \"" #x "\" return error %d - %s\n", err, strerror(err)); }
    40 
    41 struct __bin_sem_t {
    42         pthread_mutex_t         lock;
    43         pthread_cond_t          cond;
    44         int                     val;
    45 };
    46 
    47 static inline void ?{}(__bin_sem_t & this) with( this ) {
    48         // Create the mutex with error checking
    49         pthread_mutexattr_t mattr;
    50         pthread_mutexattr_init( &mattr );
    51         pthread_mutexattr_settype( &mattr, PTHREAD_MUTEX_ERRORCHECK_NP);
    52         pthread_mutex_init(&lock, &mattr);
    53 
    54         pthread_cond_init (&cond, (const pthread_condattr_t *)0p);  // workaround trac#208: cast should not be required
    55         val = 0;
    56 }
    57 
    58 static inline void ^?{}(__bin_sem_t & this) with( this ) {
    59         CHECKED( pthread_mutex_destroy(&lock) );
    60         CHECKED( pthread_cond_destroy (&cond) );
    61 }
    62 
    63 static inline void wait(__bin_sem_t & this) with( this ) {
    64         verify(__cfaabi_dbg_in_kernel());
    65         CHECKED( pthread_mutex_lock(&lock) );
    66                 while(val < 1) {
    67                         pthread_cond_wait(&cond, &lock);
    68                 }
    69                 val -= 1;
    70         CHECKED( pthread_mutex_unlock(&lock) );
    71 }
    72 
    73 static inline bool post(__bin_sem_t & this) with( this ) {
    74         bool needs_signal = false;
    75 
    76         CHECKED( pthread_mutex_lock(&lock) );
    77                 if(val < 1) {
    78                         val += 1;
    79                         pthread_cond_signal(&cond);
    80                         needs_signal = true;
    81                 }
    82         CHECKED( pthread_mutex_unlock(&lock) );
    83 
    84         return needs_signal;
    85 }
    86 
    87 #undef CHECKED
    88 
     34//-----------------------------------------------------------------------------
     35// I/O
     36struct cluster;
     37struct $io_context;
     38struct $io_arbiter;
     39
     40struct io_context_params {
     41        int num_entries;
     42};
     43
     44void  ?{}(io_context_params & this);
    8945
    9046//-----------------------------------------------------------------------------
     
    9551struct __processor_id_t {
    9652        unsigned id:24;
    97         bool full_proc:1;
    9853
    9954        #if !defined(__CFA_NO_STATISTICS__)
     
    11469        struct cluster * cltr;
    11570
     71        // Ready Queue state per processor
     72        struct {
     73                unsigned short its;
     74                unsigned short itr;
     75                unsigned id;
     76                unsigned target;
     77                unsigned long long int cutoff;
     78        } rdq;
     79
    11680        // Set to true to notify the processor should terminate
    11781        volatile bool do_terminate;
     
    12589        // Handle to pthreads
    12690        pthread_t kernel_thread;
     91
     92        struct {
     93                $io_context * ctx;
     94                bool pending;
     95                bool dirty;
     96        } io;
    12797
    12898        // Preemption data
     
    134104
    135105        // Idle lock (kernel semaphore)
    136         __bin_sem_t idle;
     106        int idle;
    137107
    138108        // Termination synchronisation (user semaphore)
     
    144114        // Link lists fields
    145115        DLISTED_MGD_IMPL_IN(processor)
     116
     117        // special init fields
     118        // This is needed for memcached integration
     119        // once memcached experiments are done this should probably be removed
     120        // it is not a particularly safe scheme as it can make processors less homogeneous
     121        struct {
     122                $thread * thrd;
     123        } init;
    146124
    147125        #if !defined(__CFA_NO_STATISTICS__)
     
    159137void ^?{}(processor & this);
    160138
    161 static inline void  ?{}(processor & this)                    { this{ "Anonymous Processor", *mainCluster}; }
    162 static inline void  ?{}(processor & this, struct cluster & cltr)    { this{ "Anonymous Processor", cltr}; }
    163 static inline void  ?{}(processor & this, const char name[]) { this{name, *mainCluster }; }
     139static inline void  ?{}(processor & this)                        { this{ "Anonymous Processor", *mainCluster}; }
     140static inline void  ?{}(processor & this, struct cluster & cltr) { this{ "Anonymous Processor", cltr}; }
     141static inline void  ?{}(processor & this, const char name[])     { this{name, *mainCluster}; }
    164142
    165143DLISTED_MGD_IMPL_OUT(processor)
    166144
    167145//-----------------------------------------------------------------------------
    168 // I/O
    169 struct __io_data;
    170 
    171 // IO poller user-thread
    172 // Not using the "thread" keyword because we want to control
    173 // more carefully when to start/stop it
    174 struct $io_ctx_thread {
    175         struct __io_data * ring;
    176         single_sem sem;
    177         volatile bool done;
    178         $thread self;
    179 };
    180 
    181 
    182 struct io_context {
    183         $io_ctx_thread thrd;
    184 };
    185 
    186 struct io_context_params {
    187         int num_entries;
    188         int num_ready;
    189         int submit_aff;
    190         bool eager_submits:1;
    191         bool poller_submits:1;
    192         bool poll_submit:1;
    193         bool poll_complete:1;
    194 };
    195 
    196 void  ?{}(io_context_params & this);
    197 
    198 void  ?{}(io_context & this, struct cluster & cl);
    199 void  ?{}(io_context & this, struct cluster & cl, const io_context_params & params);
    200 void ^?{}(io_context & this);
    201 
    202 struct io_cancellation {
    203         __u64 target;
    204 };
    205 
    206 static inline void  ?{}(io_cancellation & this) { this.target = -1u; }
    207 static inline void ^?{}(io_cancellation &) {}
    208 bool cancel(io_cancellation & this);
    209 
    210 //-----------------------------------------------------------------------------
    211146// Cluster Tools
    212147
    213 // Intrusives lanes which are used by the relaxed ready queue
     148// Intrusives lanes which are used by the ready queue
    214149struct __attribute__((aligned(128))) __intrusive_lane_t;
    215150void  ?{}(__intrusive_lane_t & this);
    216151void ^?{}(__intrusive_lane_t & this);
    217152
    218 // Counter used for wether or not the lanes are all empty
    219 struct __attribute__((aligned(128))) __snzi_node_t;
    220 struct __snzi_t {
    221         unsigned mask;
    222         int root;
    223         __snzi_node_t * nodes;
    224 };
    225 
    226 void  ?{}( __snzi_t & this, unsigned depth );
    227 void ^?{}( __snzi_t & this );
     153// Aligned timestamps which are used by the relaxed ready queue
     154struct __attribute__((aligned(128))) __timestamp_t;
     155void  ?{}(__timestamp_t & this);
     156void ^?{}(__timestamp_t & this);
    228157
    229158//TODO adjust cache size to ARCHITECTURE
    230159// Structure holding the relaxed ready queue
    231160struct __ready_queue_t {
    232         // Data tracking how many/which lanes are used
    233         // Aligned to 128 for cache locality
    234         __snzi_t snzi;
    235 
    236161        // Data tracking the actual lanes
    237162        // On a seperate cacheline from the used struct since
     
    242167                __intrusive_lane_t * volatile data;
    243168
     169                // Array of times
     170                __timestamp_t * volatile tscs;
     171
    244172                // Number of lanes (empty or not)
    245173                volatile size_t count;
     
    251179
    252180// Idle Sleep
    253 struct __cluster_idles {
     181struct __cluster_proc_list {
    254182        // Spin lock protecting the queue
    255183        volatile uint64_t lock;
     
    262190
    263191        // List of idle processors
    264         dlist(processor, processor) list;
     192        dlist(processor, processor) idles;
     193
     194        // List of active processors
     195        dlist(processor, processor) actives;
    265196};
    266197
     
    278209
    279210        // List of idle processors
    280         __cluster_idles idles;
     211        __cluster_proc_list procs;
    281212
    282213        // List of threads
     
    292223
    293224        struct {
    294                 io_context * ctxs;
    295                 unsigned cnt;
     225                $io_arbiter * arbiter;
     226                io_context_params params;
    296227        } io;
    297228
  • libcfa/src/concurrency/kernel/fwd.hfa

    rfeacef9 r5407cdc  
    108108
    109109        extern void disable_interrupts();
    110         extern void enable_interrupts_noPoll();
    111         extern void enable_interrupts( __cfaabi_dbg_ctx_param );
     110        extern void enable_interrupts( bool poll = false );
    112111
    113112        extern "Cforall" {
     
    220219                        // Mark as fulfilled, wake thread if needed
    221220                        // return true if a thread was unparked
    222                         bool post(oneshot & this) {
     221                        $thread * post(oneshot & this, bool do_unpark = true) {
    223222                                struct $thread * got = __atomic_exchange_n( &this.ptr, 1p, __ATOMIC_SEQ_CST);
    224                                 if( got == 0p ) return false;
    225                                 unpark( got );
    226                                 return true;
     223                                if( got == 0p ) return 0p;
     224                                if(do_unpark) unpark( got );
     225                                return got;
    227226                        }
    228227                }
     
    336335                        // from the server side, mark the future as fulfilled
    337336                        // delete it if needed
    338                         bool fulfil( future_t & this ) {
     337                        $thread * fulfil( future_t & this, bool do_unpark = true ) {
    339338                                for() {
    340339                                        struct oneshot * expected = this.ptr;
     
    344343                                                #pragma GCC diagnostic ignored "-Wfree-nonheap-object"
    345344                                        #endif
    346                                                 if( expected == 3p ) { free( &this ); return false; }
     345                                                if( expected == 3p ) { free( &this ); return 0p; }
    347346                                        #if defined(__GNUC__) && __GNUC__ >= 7
    348347                                                #pragma GCC diagnostic pop
     
    356355                                        struct oneshot * want = expected == 0p ? 1p : 2p;
    357356                                        if(__atomic_compare_exchange_n(&this.ptr, &expected, want, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
    358                                                 if( expected == 0p ) { /* paranoid */ verify( this.ptr == 1p); return false; }
    359                                                 bool ret = post( *expected );
     357                                                if( expected == 0p ) { /* paranoid */ verify( this.ptr == 1p); return 0p; }
     358                                                $thread * ret = post( *expected, do_unpark );
    360359                                                __atomic_store_n( &this.ptr, 1p, __ATOMIC_SEQ_CST);
    361360                                                return ret;
     
    403402                                        __VA_ARGS__ \
    404403                                } \
    405                                 if( !(in_kernel) ) enable_interrupts( __cfaabi_dbg_ctx ); \
     404                                if( !(in_kernel) ) enable_interrupts(); \
    406405                        }
    407406                #else
  • libcfa/src/concurrency/kernel/startup.cfa

    rfeacef9 r5407cdc  
    2222extern "C" {
    2323      #include <limits.h>       // PTHREAD_STACK_MIN
     24        #include <sys/eventfd.h>  // eventfd
    2425      #include <sys/mman.h>     // mprotect
    2526      #include <sys/resource.h> // getrlimit
     
    7273static void __kernel_first_resume( processor * this );
    7374static void __kernel_last_resume ( processor * this );
    74 static void init(processor & this, const char name[], cluster & _cltr);
     75static void init(processor & this, const char name[], cluster & _cltr, $thread * initT);
    7576static void deinit(processor & this);
    7677static void doregister( struct cluster & cltr );
     
    8990extern void __kernel_alarm_startup(void);
    9091extern void __kernel_alarm_shutdown(void);
    91 extern void __kernel_io_startup (void);
    92 extern void __kernel_io_shutdown(void);
    9392
    9493//-----------------------------------------------------------------------------
     
    102101KERNEL_STORAGE($thread,              mainThread);
    103102KERNEL_STORAGE(__stack_t,            mainThreadCtx);
    104 KERNEL_STORAGE(io_context,           mainPollerThread);
    105103KERNEL_STORAGE(__scheduler_RWLock_t, __scheduler_lock);
    106104#if !defined(__CFA_NO_STATISTICS__)
     
    198196
    199197        void ?{}(processor & this) with( this ) {
    200                 ( this.idle ){};
    201198                ( this.terminated ){};
    202199                ( this.runner ){};
    203                 init( this, "Main Processor", *mainCluster );
     200                init( this, "Main Processor", *mainCluster, 0p );
    204201                kernel_thread = pthread_self();
    205202
     
    226223        __kernel_alarm_startup();
    227224
    228         // Start IO
    229         __kernel_io_startup();
    230 
    231225        // Add the main thread to the ready queue
    232226        // once resume is called on mainProcessor->runner the mainThread needs to be scheduled like any normal thread
    233         __schedule_thread(mainThread);
     227        schedule_thread$(mainThread);
    234228
    235229        // SKULLDUGGERY: Force a context switch to the main processor to set the main thread's context to the current UNIX
     
    241235        // THE SYSTEM IS NOW COMPLETELY RUNNING
    242236
    243 
    244         // SKULLDUGGERY: The constructor for the mainCluster will call alloc with a dimension of 0
    245         // malloc *can* return a non-null value, we should free it if that is the case
    246         free( mainCluster->io.ctxs );
    247 
    248         // Now that the system is up, finish creating systems that need threading
    249         mainCluster->io.ctxs = (io_context *)&storage_mainPollerThread;
    250         mainCluster->io.cnt  = 1;
    251         (*mainCluster->io.ctxs){ *mainCluster };
    252 
    253237        __cfadbg_print_safe(runtime_core, "Kernel : Started\n--------------------------------------------------\n\n");
    254238
    255239        /* paranoid */ verify( ! __preemption_enabled() );
    256         enable_interrupts( __cfaabi_dbg_ctx );
     240        enable_interrupts();
    257241        /* paranoid */ verify( __preemption_enabled() );
    258242
     
    260244
    261245static void __kernel_shutdown(void) {
    262         //Before we start shutting things down, wait for systems that need threading to shutdown
    263         ^(*mainCluster->io.ctxs){};
    264         mainCluster->io.cnt  = 0;
    265         mainCluster->io.ctxs = 0p;
    266 
    267246        /* paranoid */ verify( __preemption_enabled() );
    268247        disable_interrupts();
     
    283262        __kernel_alarm_shutdown();
    284263
    285         // Stop IO
    286         __kernel_io_shutdown();
     264        #if !defined( __CFA_NO_STATISTICS__ )
     265                __stats_t * st = (__stats_t *)& storage_mainProcStats;
     266                __tally_stats(mainCluster->stats, st);
     267                if( 0 != mainProcessor->print_stats ) {
     268                        __print_stats( st, mainProcessor->print_stats, "Processor ", mainProcessor->name, (void*)mainProcessor );
     269                }
     270                #if defined(CFA_STATS_ARRAY)
     271                        __flush_stat( st, "Processor", mainProcessor );
     272                #endif
     273        #endif
    287274
    288275        // Destroy the main processor and its context in reverse order of construction
     
    364351                        __print_stats( &local_stats, proc->print_stats, "Processor ", proc->name, (void*)proc );
    365352                }
     353                #if defined(CFA_STATS_ARRAY)
     354                        __flush_stat( &local_stats, "Processor", proc );
     355                #endif
    366356        #endif
    367357
     
    457447        link.next = 0p;
    458448        link.prev = 0p;
     449        link.preferred = -1u;
     450        last_proc = 0p;
    459451        #if defined( __CFA_WITH_VERIFY__ )
    460452                canary = 0x0D15EA5E0D15EA5Ep;
     
    476468}
    477469
    478 static void init(processor & this, const char name[], cluster & _cltr) with( this ) {
     470static void init(processor & this, const char name[], cluster & _cltr, $thread * initT) with( this ) {
    479471        this.name = name;
    480472        this.cltr = &_cltr;
    481         full_proc = true;
     473        this.rdq.its = 0;
     474        this.rdq.itr = 0;
     475        this.rdq.id  = -1u;
     476        this.rdq.target = -1u;
     477        this.rdq.cutoff = -1ull;
    482478        do_terminate = false;
    483479        preemption_alarm = 0p;
    484480        pending_preemption = false;
    485481
     482        this.io.ctx = 0p;
     483        this.io.pending = false;
     484        this.io.dirty   = false;
     485
     486        this.init.thrd = initT;
     487
     488        this.idle = eventfd(0, 0);
     489        if (idle < 0) {
     490                abort("KERNEL ERROR: PROCESSOR EVENTFD - %s\n", strerror(errno));
     491        }
     492
    486493        #if !defined(__CFA_NO_STATISTICS__)
    487494                print_stats = 0;
     
    489496        #endif
    490497
    491         lock( this.cltr->idles );
    492                 int target = this.cltr->idles.total += 1u;
    493         unlock( this.cltr->idles );
    494 
    495         id = doregister((__processor_id_t*)&this);
    496 
     498        // Register and Lock the RWlock so no-one pushes/pops while we are changing the queue
     499        uint_fast32_t last_size = ready_mutate_register((__processor_id_t*)&this);
     500                this.cltr->procs.total += 1u;
     501                insert_last(this.cltr->procs.actives, this);
     502
     503                // Adjust the ready queue size
     504                ready_queue_grow( cltr );
     505
     506        // Unlock the RWlock
     507        ready_mutate_unlock( last_size );
     508
     509        __cfadbg_print_safe(runtime_core, "Kernel : core %p created\n", &this);
     510}
     511
     512// Not a ctor, it just preps the destruction but should not destroy members
     513static void deinit(processor & this) {
    497514        // Lock the RWlock so no-one pushes/pops while we are changing the queue
    498515        uint_fast32_t last_size = ready_mutate_lock();
     516                this.cltr->procs.total -= 1u;
     517                remove(this);
    499518
    500519                // Adjust the ready queue size
    501                 ready_queue_grow( cltr, target );
    502 
    503         // Unlock the RWlock
    504         ready_mutate_unlock( last_size );
    505 
    506         __cfadbg_print_safe(runtime_core, "Kernel : core %p created\n", &this);
    507 }
    508 
    509 // Not a ctor, it just preps the destruction but should not destroy members
    510 static void deinit(processor & this) {
    511         lock( this.cltr->idles );
    512                 int target = this.cltr->idles.total -= 1u;
    513         unlock( this.cltr->idles );
    514 
    515         // Lock the RWlock so no-one pushes/pops while we are changing the queue
    516         uint_fast32_t last_size = ready_mutate_lock();
    517 
    518                 // Adjust the ready queue size
    519                 ready_queue_shrink( this.cltr, target );
    520 
    521         // Unlock the RWlock
    522         ready_mutate_unlock( last_size );
    523 
    524         // Finally we don't need the read_lock any more
    525         unregister((__processor_id_t*)&this);
    526 }
    527 
    528 void ?{}(processor & this, const char name[], cluster & _cltr) {
    529         ( this.idle ){};
     520                ready_queue_shrink( this.cltr );
     521
     522        // Unlock the RWlock and unregister: we don't need the read_lock any more
     523        ready_mutate_unregister((__processor_id_t*)&this, last_size );
     524
     525        close(this.idle);
     526}
     527
     528void ?{}(processor & this, const char name[], cluster & _cltr, $thread * initT) {
    530529        ( this.terminated ){};
    531530        ( this.runner ){};
    532531
    533532        disable_interrupts();
    534                 init( this, name, _cltr );
    535         enable_interrupts( __cfaabi_dbg_ctx );
     533                init( this, name, _cltr, initT );
     534        enable_interrupts();
    536535
    537536        __cfadbg_print_safe(runtime_core, "Kernel : Starting core %p\n", &this);
    538537
    539538        this.stack = __create_pthread( &this.kernel_thread, __invoke_processor, (void *)&this );
    540 
     539}
     540
     541void ?{}(processor & this, const char name[], cluster & _cltr) {
     542        (this){name, _cltr, 0p};
    541543}
    542544
     
    557559        disable_interrupts();
    558560                deinit( this );
    559         enable_interrupts( __cfaabi_dbg_ctx );
     561        enable_interrupts();
    560562}
    561563
    562564//-----------------------------------------------------------------------------
    563565// Cluster
    564 static void ?{}(__cluster_idles & this) {
     566static void ?{}(__cluster_proc_list & this) {
    565567        this.lock  = 0;
    566568        this.idle  = 0;
    567569        this.total = 0;
    568         (this.list){};
    569570}
    570571
     
    582583        threads{ __get };
    583584
     585        io.arbiter = create();
     586        io.params = io_params;
     587
    584588        doregister(this);
    585589
     
    589593
    590594                // Adjust the ready queue size
    591                 ready_queue_grow( &this, 0 );
     595                ready_queue_grow( &this );
    592596
    593597        // Unlock the RWlock
    594598        ready_mutate_unlock( last_size );
    595         enable_interrupts_noPoll(); // Don't poll, could be in main cluster
    596 
    597 
    598         this.io.cnt  = num_io;
    599         this.io.ctxs = aalloc(num_io);
    600         for(i; this.io.cnt) {
    601                 (this.io.ctxs[i]){ this, io_params };
    602         }
     599        enable_interrupts( false ); // Don't poll, could be in main cluster
    603600}
    604601
    605602void ^?{}(cluster & this) {
    606         for(i; this.io.cnt) {
    607                 ^(this.io.ctxs[i]){ true };
    608         }
    609         free(this.io.ctxs);
     603        destroy(this.io.arbiter);
    610604
    611605        // Lock the RWlock so no-one pushes/pops while we are changing the queue
     
    614608
    615609                // Adjust the ready queue size
    616                 ready_queue_shrink( &this, 0 );
     610                ready_queue_shrink( &this );
    617611
    618612        // Unlock the RWlock
    619613        ready_mutate_unlock( last_size );
    620         enable_interrupts_noPoll(); // Don't poll, could be in main cluster
     614        enable_interrupts( false ); // Don't poll, could be in main cluster
    621615
    622616        #if !defined(__CFA_NO_STATISTICS__)
     
    624618                        __print_stats( this.stats, this.print_stats, "Cluster", this.name, (void*)&this );
    625619                }
     620                #if defined(CFA_STATS_ARRAY)
     621                        __flush_stat( this.stats, "Cluster", &this );
     622                #endif
    626623                free( this.stats );
    627624        #endif
     
    736733}
    737734
    738 
    739735#if defined(__CFA_WITH_VERIFY__)
    740736static bool verify_fwd_bck_rng(void) {
  • libcfa/src/concurrency/kernel_private.hfa

    rfeacef9 r5407cdc  
    2929extern "C" {
    3030        void disable_interrupts() OPTIONAL_THREAD;
    31         void enable_interrupts_noPoll();
    32         void enable_interrupts( __cfaabi_dbg_ctx_param );
    33 }
    34 
    35 void __schedule_thread( $thread * )
    36 #if defined(NDEBUG) || (!defined(__CFA_DEBUG__) && !defined(__CFA_VERIFY__))
    37         __attribute__((nonnull (1)))
    38 #endif
    39 ;
     31        void enable_interrupts( bool poll = true );
     32}
     33
     34void schedule_thread$( $thread * ) __attribute__((nonnull (1)));
    4035
    4136extern bool __preemption_enabled();
     
    7772//-----------------------------------------------------------------------------
    7873// I/O
    79 void ^?{}(io_context & this, bool );
     74$io_arbiter * create(void);
     75void destroy($io_arbiter *);
    8076
    8177//=======================================================================
    8278// Cluster lock API
    8379//=======================================================================
    84 // Cells use by the reader writer lock
    85 // while not generic it only relies on a opaque pointer
    86 struct __attribute__((aligned(128))) __scheduler_lock_id_t {
    87         // Spin lock used as the underlying lock
    88         volatile bool lock;
    89 
    90         // Handle pointing to the proc owning this cell
    91         // Used for allocating cells and debugging
    92         __processor_id_t * volatile handle;
    93 
    94         #ifdef __CFA_WITH_VERIFY__
    95                 // Debug, check if this is owned for reading
    96                 bool owned;
    97         #endif
    98 };
    99 
    100 static_assert( sizeof(struct __scheduler_lock_id_t) <= __alignof(struct __scheduler_lock_id_t));
    101 
    10280// Lock-Free registering/unregistering of threads
    10381// Register a processor to a given cluster and get its unique id in return
    104 unsigned doregister( struct __processor_id_t * proc );
     82void register_proc_id( struct __processor_id_t * );
    10583
    10684// Unregister a processor from a given cluster using its id, getting back the original pointer
    107 void     unregister( struct __processor_id_t * proc );
    108 
    109 //-----------------------------------------------------------------------
    110 // Cluster idle lock/unlock
    111 static inline void lock(__cluster_idles & this) {
    112         for() {
    113                 uint64_t l = this.lock;
    114                 if(
    115                         (0 == (l % 2))
    116                         && __atomic_compare_exchange_n(&this.lock, &l, l + 1, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
    117                 ) return;
    118                 Pause();
    119         }
    120 }
    121 
    122 static inline void unlock(__cluster_idles & this) {
    123         /* paranoid */ verify( 1 == (this.lock % 2) );
    124         __atomic_fetch_add( &this.lock, 1, __ATOMIC_SEQ_CST );
    125 }
     85void unregister_proc_id( struct __processor_id_t * proc );
    12686
    12787//=======================================================================
     
    151111        __atomic_store_n(ll, (bool)false, __ATOMIC_RELEASE);
    152112}
     113
     114// Cells use by the reader writer lock
     115// while not generic it only relies on a opaque pointer
     116struct __attribute__((aligned(128))) __scheduler_lock_id_t {
     117        // Spin lock used as the underlying lock
     118        volatile bool lock;
     119
     120        // Handle pointing to the proc owning this cell
     121        // Used for allocating cells and debugging
     122        __processor_id_t * volatile handle;
     123
     124        #ifdef __CFA_WITH_VERIFY__
     125                // Debug, check if this is owned for reading
     126                bool owned;
     127        #endif
     128};
     129
     130static_assert( sizeof(struct __scheduler_lock_id_t) <= __alignof(struct __scheduler_lock_id_t));
    153131
    154132//-----------------------------------------------------------------------
     
    246224void ready_mutate_unlock( uint_fast32_t /* value returned by lock */ );
    247225
     226//-----------------------------------------------------------------------
     227// Lock-Free registering/unregistering of threads
     228// Register a processor to a given cluster and get its unique id in return
     229// For convenience, also acquires the lock
     230static inline uint_fast32_t ready_mutate_register( struct __processor_id_t * proc ) {
     231        register_proc_id( proc );
     232        return ready_mutate_lock();
     233}
     234
     235// Unregister a processor from a given cluster using its id, getting back the original pointer
     236// assumes the lock is acquired
     237static inline void ready_mutate_unregister( struct __processor_id_t * proc, uint_fast32_t last_s ) {
     238        ready_mutate_unlock( last_s );
     239        unregister_proc_id( proc );
     240}
     241
     242//-----------------------------------------------------------------------
     243// Cluster idle lock/unlock
     244static inline void lock(__cluster_proc_list & this) {
     245        /* paranoid */ verify( ! __preemption_enabled() );
     246
     247        // Start by locking the global RWlock so that we know no-one is
     248        // adding/removing processors while we mess with the idle lock
     249        ready_schedule_lock();
     250
     251        // Simple counting lock, acquired, acquired by incrementing the counter
     252        // to an odd number
     253        for() {
     254                uint64_t l = this.lock;
     255                if(
     256                        (0 == (l % 2))
     257                        && __atomic_compare_exchange_n(&this.lock, &l, l + 1, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)
     258                ) return;
     259                Pause();
     260        }
     261
     262        /* paranoid */ verify( ! __preemption_enabled() );
     263}
     264
     265static inline void unlock(__cluster_proc_list & this) {
     266        /* paranoid */ verify( ! __preemption_enabled() );
     267
     268        /* paranoid */ verify( 1 == (this.lock % 2) );
     269        // Simple couting lock, release by incrementing to an even number
     270        __atomic_fetch_add( &this.lock, 1, __ATOMIC_SEQ_CST );
     271
     272        // Release the global lock, which we acquired when locking
     273        ready_schedule_unlock();
     274
     275        /* paranoid */ verify( ! __preemption_enabled() );
     276}
     277
    248278//=======================================================================
    249279// Ready-Queue API
    250280//-----------------------------------------------------------------------
    251 // pop thread from the ready queue of a cluster
    252 // returns 0p if empty
    253 __attribute__((hot)) bool query(struct cluster * cltr);
    254 
    255 //-----------------------------------------------------------------------
    256281// push thread onto a ready queue for a cluster
    257282// returns true if the list was previously empty, false otherwise
    258 __attribute__((hot)) bool push(struct cluster * cltr, struct $thread * thrd);
    259 
    260 //-----------------------------------------------------------------------
    261 // pop thread from the ready queue of a cluster
     283__attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd);
     284
     285//-----------------------------------------------------------------------
     286// pop thread from the local queues of a cluster
    262287// returns 0p if empty
    263288// May return 0p spuriously
    264 __attribute__((hot)) struct $thread * pop(struct cluster * cltr);
    265 
    266 //-----------------------------------------------------------------------
    267 // pop thread from the ready queue of a cluster
     289__attribute__((hot)) struct $thread * pop_fast(struct cluster * cltr);
     290
     291//-----------------------------------------------------------------------
     292// pop thread from any ready queue of a cluster
     293// returns 0p if empty
     294// May return 0p spuriously
     295__attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr);
     296
     297//-----------------------------------------------------------------------
     298// search all ready queues of a cluster for any thread
    268299// returns 0p if empty
    269300// guaranteed to find any threads added before this call
    270 __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr);
    271 
    272 //-----------------------------------------------------------------------
    273 // remove thread from the ready queue of a cluster
    274 // returns bool if it wasn't found
    275 bool remove_head(struct cluster * cltr, struct $thread * thrd);
     301__attribute__((hot)) struct $thread * pop_search(struct cluster * cltr);
    276302
    277303//-----------------------------------------------------------------------
    278304// Increase the width of the ready queue (number of lanes) by 4
    279 void ready_queue_grow  (struct cluster * cltr, int target);
     305void ready_queue_grow  (struct cluster * cltr);
    280306
    281307//-----------------------------------------------------------------------
    282308// Decrease the width of the ready queue (number of lanes) by 4
    283 void ready_queue_shrink(struct cluster * cltr, int target);
     309void ready_queue_shrink(struct cluster * cltr);
    284310
    285311
  • libcfa/src/concurrency/locks.cfa

    rfeacef9 r5407cdc  
    134134        lock( lock __cfaabi_dbg_ctx2 );
    135135        /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
    136         /* paranoid */ verifyf( owner == active_thread() || !strict_owner, "Thread %p other than the owner %p attempted to release owner lock %p", owner, active_thread(), &this );
     136        /* paranoid */ verifyf( owner == active_thread() || !strict_owner , "Thread %p other than the owner %p attempted to release owner lock %p", owner, active_thread(), &this );
     137        /* paranoid */ verifyf( recursion_count == 1 || multi_acquisition, "Thread %p attempted to release owner lock %p which is not recursive but has a recursive count of %zu", active_thread(), &this, recursion_count );
    137138
    138139        // if recursion count is zero release lock and set new owner if one is waiting
     
    146147size_t wait_count( blocking_lock & this ) with( this ) {
    147148        return wait_count;
    148 }
    149 
    150 void set_recursion_count( blocking_lock & this, size_t recursion ) with( this ) {
    151         recursion_count = recursion;
    152 }
    153 
    154 size_t get_recursion_count( blocking_lock & this ) with( this ) {
    155         return recursion_count;
    156149}
    157150
     
    173166}
    174167
    175 void on_wait( blocking_lock & this ) with( this ) {
     168size_t on_wait( blocking_lock & this ) with( this ) {
    176169        lock( lock __cfaabi_dbg_ctx2 );
    177170        /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
    178171        /* paranoid */ verifyf( owner == active_thread() || !strict_owner, "Thread %p other than the owner %p attempted to release owner lock %p", owner, active_thread(), &this );
    179172
     173        size_t ret = recursion_count;
     174
    180175        pop_and_set_new_owner( this );
    181176        unlock( lock );
     177        return ret;
     178}
     179
     180void on_wakeup( blocking_lock & this, size_t recursion ) with( this ) {
     181        recursion_count = recursion;
    182182}
    183183
     
    274274        }
    275275
    276         bool empty( condition_variable(L) & this ) with(this) { return empty(blocked_threads); }
     276        bool empty( condition_variable(L) & this ) with(this) {
     277                lock( lock __cfaabi_dbg_ctx2 );
     278                bool ret = empty(blocked_threads);
     279                unlock( lock );
     280                return ret;
     281        }
    277282
    278283        int counter( condition_variable(L) & this ) with(this) { return count; }
     
    285290                if (i->lock) {
    286291                        // if lock was passed get recursion count to reset to after waking thread
    287                         recursion_count = get_recursion_count(*i->lock);
    288                         on_wait( *i->lock );
     292                        recursion_count = on_wait( *i->lock );
    289293                }
    290294                return recursion_count;
     
    301305
    302306                // resets recursion count here after waking
    303                 if (i.lock) set_recursion_count(*i.lock, recursion_count);
     307                if (i.lock) on_wakeup(*i.lock, recursion_count);
    304308        }
    305309
     
    323327
    324328                // resets recursion count here after waking
    325                 if (info.lock) set_recursion_count(*info.lock, recursion_count);
     329                if (info.lock) on_wakeup(*info.lock, recursion_count);
    326330        }
    327331
     
    373377}
    374378
    375 bool V(semaphore & this) with( this ) {
     379$thread * V (semaphore & this, const bool doUnpark ) with( this ) {
    376380        $thread * thrd = 0p;
    377381        lock( lock __cfaabi_dbg_ctx2 );
     
    385389
    386390        // make new owner
    387         unpark( thrd );
    388 
     391        if( doUnpark ) unpark( thrd );
     392
     393        return thrd;
     394}
     395
     396bool V(semaphore & this) with( this ) {
     397        $thread * thrd = V(this, true);
    389398        return thrd != 0p;
    390399}
  • libcfa/src/concurrency/locks.hfa

    rfeacef9 r5407cdc  
    2020
    2121#include "bits/weakso_locks.hfa"
     22#include "containers/queueLockFree.hfa"
     23
     24#include "thread.hfa"
    2225
    2326#include "time_t.hfa"
    2427#include "time.hfa"
     28
     29//-----------------------------------------------------------------------------
     30// Semaphores
     31
     32// '0-nary' semaphore
     33// Similar to a counting semaphore except the value of one is never reached
     34// as a consequence, a V() that would bring the value to 1 *spins* until
     35// a P consumes it
     36struct Semaphore0nary {
     37        __spinlock_t lock; // needed to protect
     38        mpsc_queue($thread) queue;
     39};
     40
     41static inline bool P(Semaphore0nary & this, $thread * thrd) {
     42        /* paranoid */ verify(!(thrd->seqable.next));
     43        /* paranoid */ verify(!(thrd`next));
     44
     45        push(this.queue, thrd);
     46        return true;
     47}
     48
     49static inline bool P(Semaphore0nary & this) {
     50    $thread * thrd = active_thread();
     51    P(this, thrd);
     52    park();
     53    return true;
     54}
     55
     56static inline $thread * V(Semaphore0nary & this, bool doUnpark = true) {
     57        $thread * next;
     58        lock(this.lock __cfaabi_dbg_ctx2);
     59                for (;;) {
     60                        next = pop(this.queue);
     61                        if (next) break;
     62                        Pause();
     63                }
     64        unlock(this.lock);
     65
     66        if (doUnpark) unpark(next);
     67        return next;
     68}
     69
     70// Wrapper used on top of any sempahore to avoid potential locking
     71struct BinaryBenaphore {
     72        volatile ssize_t counter;
     73};
     74
     75static inline {
     76        void ?{}(BinaryBenaphore & this) { this.counter = 0; }
     77        void ?{}(BinaryBenaphore & this, zero_t) { this.counter = 0; }
     78        void ?{}(BinaryBenaphore & this, one_t ) { this.counter = 1; }
     79
     80        // returns true if no blocking needed
     81        bool P(BinaryBenaphore & this) {
     82                return __atomic_fetch_sub(&this.counter, 1, __ATOMIC_SEQ_CST) > 0;
     83        }
     84
     85        bool tryP(BinaryBenaphore & this) {
     86                ssize_t c = this.counter;
     87                return (c >= 1) && __atomic_compare_exchange_n(&this.counter, &c, c-1, false, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED);
     88        }
     89
     90        // returns true if notify needed
     91        bool V(BinaryBenaphore & this) {
     92                ssize_t c = 0;
     93                for () {
     94                        if (__atomic_compare_exchange_n(&this.counter, &c, c+1, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     95                                if (c == 0) return true;
     96                                /* paranoid */ verify(c < 0);
     97                                return false;
     98                        } else {
     99                                if (c == 1) return true;
     100                                /* paranoid */ verify(c < 1);
     101                                Pause();
     102                        }
     103                }
     104        }
     105}
     106
     107// Binary Semaphore based on the BinaryBenaphore on top of the 0-nary Semaphore
     108struct ThreadBenaphore {
     109        BinaryBenaphore ben;
     110        Semaphore0nary  sem;
     111};
     112
     113static inline void ?{}(ThreadBenaphore & this) {}
     114static inline void ?{}(ThreadBenaphore & this, zero_t) { (this.ben){ 0 }; }
     115static inline void ?{}(ThreadBenaphore & this, one_t ) { (this.ben){ 1 }; }
     116
     117static inline bool P(ThreadBenaphore & this)              { return P(this.ben) ? false : P(this.sem); }
     118static inline bool tryP(ThreadBenaphore & this)           { return tryP(this.ben); }
     119static inline bool P(ThreadBenaphore & this, bool wait)   { return wait ? P(this) : tryP(this); }
     120
     121static inline $thread * V(ThreadBenaphore & this, bool doUnpark = true) {
     122        if (V(this.ben)) return 0p;
     123        return V(this.sem, doUnpark);
     124}
     125
     126//-----------------------------------------------------------------------------
     127// Semaphore
     128struct semaphore {
     129        __spinlock_t lock;
     130        int count;
     131        __queue_t($thread) waiting;
     132};
     133
     134void  ?{}(semaphore & this, int count = 1);
     135void ^?{}(semaphore & this);
     136bool   P (semaphore & this);
     137bool   V (semaphore & this);
     138bool   V (semaphore & this, unsigned count);
     139$thread * V (semaphore & this, bool );
    25140
    26141//----------
     
    31146static inline void  ?{}( single_acquisition_lock & this ) {((blocking_lock &)this){ false, false };}
    32147static inline void ^?{}( single_acquisition_lock & this ) {}
    33 static inline void   lock      ( single_acquisition_lock & this ) { lock   ( (blocking_lock &)this ); }
    34 static inline void   unlock    ( single_acquisition_lock & this ) { unlock ( (blocking_lock &)this ); }
    35 static inline void   on_wait   ( single_acquisition_lock & this ) { on_wait( (blocking_lock &)this ); }
    36 static inline void   on_notify ( single_acquisition_lock & this, struct $thread * t ) { on_notify( (blocking_lock &)this, t ); }
    37 static inline void   set_recursion_count( single_acquisition_lock & this, size_t recursion ) { set_recursion_count( (blocking_lock &)this, recursion ); }
    38 static inline size_t get_recursion_count( single_acquisition_lock & this ) { return get_recursion_count( (blocking_lock &)this ); }
     148static inline void   lock     ( single_acquisition_lock & this ) { lock    ( (blocking_lock &)this ); }
     149static inline bool   try_lock ( single_acquisition_lock & this ) { return try_lock( (blocking_lock &)this ); }
     150static inline void   unlock   ( single_acquisition_lock & this ) { unlock  ( (blocking_lock &)this ); }
     151static inline size_t on_wait  ( single_acquisition_lock & this ) { return on_wait ( (blocking_lock &)this ); }
     152static inline void   on_wakeup( single_acquisition_lock & this, size_t v ) { on_wakeup ( (blocking_lock &)this, v ); }
     153static inline void   on_notify( single_acquisition_lock & this, struct $thread * t ) { on_notify( (blocking_lock &)this, t ); }
    39154
    40155//----------
     
    45160static inline void  ?{}( owner_lock & this ) {((blocking_lock &)this){ true, true };}
    46161static inline void ^?{}( owner_lock & this ) {}
    47 static inline void   lock     ( owner_lock & this ) { lock   ( (blocking_lock &)this ); }
    48 static inline void   unlock   ( owner_lock & this ) { unlock ( (blocking_lock &)this ); }
    49 static inline void   on_wait  ( owner_lock & this ) { on_wait( (blocking_lock &)this ); }
     162static inline void   lock     ( owner_lock & this ) { lock    ( (blocking_lock &)this ); }
     163static inline bool   try_lock ( owner_lock & this ) { return try_lock( (blocking_lock &)this ); }
     164static inline void   unlock   ( owner_lock & this ) { unlock  ( (blocking_lock &)this ); }
     165static inline size_t on_wait  ( owner_lock & this ) { return on_wait ( (blocking_lock &)this ); }
     166static inline void   on_wakeup( owner_lock & this, size_t v ) { on_wakeup ( (blocking_lock &)this, v ); }
    50167static inline void   on_notify( owner_lock & this, struct $thread * t ) { on_notify( (blocking_lock &)this, t ); }
    51 static inline void   set_recursion_count( owner_lock & this, size_t recursion ) { set_recursion_count( (blocking_lock &)this, recursion ); }
    52 static inline size_t get_recursion_count( owner_lock & this ) { return get_recursion_count( (blocking_lock &)this ); }
     168
     169struct fast_lock {
     170        $thread * volatile owner;
     171        ThreadBenaphore sem;
     172};
     173
     174static inline bool $try_lock(fast_lock & this, $thread * thrd) {
     175    $thread * exp = 0p;
     176    return __atomic_compare_exchange_n(&this.owner, &exp, thrd, false, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED);
     177}
     178
     179static inline void lock( fast_lock & this ) __attribute__((artificial));
     180static inline void lock( fast_lock & this ) {
     181        $thread * thrd = active_thread();
     182        /* paranoid */verify(thrd != this.owner);
     183
     184        for (;;) {
     185                if ($try_lock(this, thrd)) return;
     186                P(this.sem);
     187        }
     188}
     189
     190static inline bool try_lock( fast_lock & this ) __attribute__((artificial));
     191static inline bool try_lock ( fast_lock & this ) {
     192        $thread * thrd = active_thread();
     193        /* paranoid */ verify(thrd != this.owner);
     194        return $try_lock(this, thrd);
     195}
     196
     197static inline $thread * unlock( fast_lock & this ) __attribute__((artificial));
     198static inline $thread * unlock( fast_lock & this ) {
     199        /* paranoid */ verify(active_thread() == this.owner);
     200
     201        // open 'owner' before unlocking anyone
     202        // so new and unlocked threads don't park incorrectly.
     203        // This may require additional fencing on ARM.
     204        this.owner = 0p;
     205
     206        return V(this.sem);
     207}
     208
     209static inline size_t on_wait( fast_lock & this ) { unlock(this); return 0; }
     210static inline void on_wakeup( fast_lock & this, size_t ) { lock(this); }
     211static inline void on_notify( fast_lock &, struct $thread * t ) { unpark(t); }
     212
     213struct mcs_node {
     214        mcs_node * volatile next;
     215        single_sem sem;
     216};
     217
     218static inline void ?{}(mcs_node & this) { this.next = 0p; }
     219
     220static inline mcs_node * volatile & ?`next ( mcs_node * node ) {
     221        return node->next;
     222}
     223
     224struct mcs_lock {
     225        mcs_queue(mcs_node) queue;
     226};
     227
     228static inline void lock(mcs_lock & l, mcs_node & n) {
     229        if(push(l.queue, &n))
     230                wait(n.sem);
     231}
     232
     233static inline void unlock(mcs_lock & l, mcs_node & n) {
     234        mcs_node * next = advance(l.queue, &n);
     235        if(next) post(next->sem);
     236}
    53237
    54238//-----------------------------------------------------------------------------
     
    59243
    60244        // For synchronization locks to use when releasing
    61         void on_wait( L & );
    62 
    63         // to get recursion count for cond lock to reset after waking
    64         size_t get_recursion_count( L & );
     245        size_t on_wait( L & );
    65246
    66247        // to set recursion count after getting signalled;
    67         void set_recursion_count( L &, size_t recursion );
     248        void on_wakeup( L &, size_t recursion );
    68249};
    69250
     
    119300        bool wait( condition_variable(L) & this, L & l, uintptr_t info, Time time );
    120301}
    121 
    122 //-----------------------------------------------------------------------------
    123 // Semaphore
    124 struct semaphore {
    125         __spinlock_t lock;
    126         int count;
    127         __queue_t($thread) waiting;
    128 };
    129 
    130 void  ?{}(semaphore & this, int count = 1);
    131 void ^?{}(semaphore & this);
    132 bool   P (semaphore & this);
    133 bool   V (semaphore & this);
    134 bool   V (semaphore & this, unsigned count);
  • libcfa/src/concurrency/monitor.hfa

    rfeacef9 r5407cdc  
    6161static inline forall( T & | sized(T) | { void ^?{}( T & mutex ); } )
    6262void delete( T * th ) {
    63         ^(*th){};
     63        if(th) ^(*th){};
    6464        free( th );
    6565}
  • libcfa/src/concurrency/preemption.cfa

    rfeacef9 r5407cdc  
    1515
    1616#define __cforall_thread__
     17// #define __CFA_DEBUG_PRINT_PREEMPTION__
    1718
    1819#include "preemption.hfa"
     
    2829#include "kernel_private.hfa"
    2930
     31
    3032#if !defined(__CFA_DEFAULT_PREEMPTION__)
    3133#define __CFA_DEFAULT_PREEMPTION__ 10`ms
    3234#endif
    3335
    34 Duration default_preemption() __attribute__((weak)) {
    35         return __CFA_DEFAULT_PREEMPTION__;
     36__attribute__((weak)) Duration default_preemption() {
     37        const char * preempt_rate_s = getenv("CFA_DEFAULT_PREEMPTION");
     38        if(!preempt_rate_s) {
     39                __cfadbg_print_safe(preemption, "No CFA_DEFAULT_PREEMPTION in ENV\n");
     40                return __CFA_DEFAULT_PREEMPTION__;
     41        }
     42
     43        char * endptr = 0p;
     44        long int preempt_rate_l = strtol(preempt_rate_s, &endptr, 10);
     45        if(preempt_rate_l < 0 || preempt_rate_l > 65535) {
     46                __cfadbg_print_safe(preemption, "CFA_DEFAULT_PREEMPTION out of range : %ld\n", preempt_rate_l);
     47                return __CFA_DEFAULT_PREEMPTION__;
     48        }
     49        if('\0' != *endptr) {
     50                __cfadbg_print_safe(preemption, "CFA_DEFAULT_PREEMPTION not a decimal number : %s\n", preempt_rate_s);
     51                return __CFA_DEFAULT_PREEMPTION__;
     52        }
     53
     54        return preempt_rate_l`ms;
    3655}
    3756
     
    98117        //Loop throught every thing expired
    99118        while( node = get_expired( alarms, currtime ) ) {
    100                 // __cfaabi_dbg_print_buffer_decl( " KERNEL: preemption tick.\n" );
     119                __cfadbg_print_buffer_decl( preemption, " KERNEL: preemption tick %lu\n", currtime.tn);
    101120                Duration period = node->period;
    102121                if( period == 0) {
     
    104123                }
    105124
     125                __cfadbg_print_buffer_local( preemption, " KERNEL: alarm ticking node %p.\n", node );
     126
     127
    106128                // Check if this is a kernel
    107129                if( node->type == Kernel ) {
     
    109131                }
    110132                else if( node->type == User ) {
     133                        __cfadbg_print_buffer_local( preemption, " KERNEL: alarm unparking %p.\n", node->thrd );
    111134                        timeout( node->thrd );
    112135                }
     
    117140                // Check if this is a periodic alarm
    118141                if( period > 0 ) {
    119                         // __cfaabi_dbg_print_buffer_local( " KERNEL: alarm period is %lu.\n", period.tv );
     142                        __cfadbg_print_buffer_local( preemption, " KERNEL: alarm period is %lu.\n", period`ns );
    120143                        node->alarm = currtime + period;    // Alarm is periodic, add currtime to it (used cached current time)
    121144                        insert( alarms, node );             // Reinsert the node for the next time it triggers
     
    125148        // If there are still alarms pending, reset the timer
    126149        if( & (*alarms)`first ) {
    127                 __cfadbg_print_buffer_decl(preemption, " KERNEL: @%ju(%ju) resetting alarm to %ju.\n", currtime.tv, __kernel_get_time().tv, (alarms->head->alarm - currtime).tv);
    128150                Duration delta = (*alarms)`first.alarm - currtime;
    129151                Duration capped = max(delta, 50`us);
    130                 // itimerval tim  = { caped };
    131                 // __cfaabi_dbg_print_buffer_local( "    Values are %lu, %lu, %lu %lu.\n", delta.tv, caped.tv, tim.it_value.tv_sec, tim.it_value.tv_usec);
    132 
    133152                __kernel_set_timer( capped );
    134153        }
     
    296315        // Enable interrupts by decrementing the counter
    297316        // If counter reaches 0, execute any pending __cfactx_switch
    298         void enable_interrupts( __cfaabi_dbg_ctx_param ) {
     317        void enable_interrupts( bool poll ) {
    299318                // Cache the processor now since interrupts can start happening after the atomic store
    300319                processor   * proc = __cfaabi_tls.this_processor;
    301                 /* paranoid */ verify( proc );
     320                /* paranoid */ verify( !poll || proc );
    302321
    303322                with( __cfaabi_tls.preemption_state ){
     
    321340                                // Signal the compiler that a fence is needed but only for signal handlers
    322341                                __atomic_signal_fence(__ATOMIC_RELEASE);
    323                                 if( proc->pending_preemption ) {
     342                                if( poll && proc->pending_preemption ) {
    324343                                        proc->pending_preemption = false;
    325344                                        force_yield( __POLL_PREEMPTION );
    326345                                }
    327346                        }
    328                 }
    329 
    330                 // For debugging purposes : keep track of the last person to enable the interrupts
    331                 __cfaabi_dbg_debug_do( proc->last_enable = caller; )
    332         }
    333 
    334         // Disable interrupts by incrementint the counter
    335         // Don't execute any pending __cfactx_switch even if counter reaches 0
    336         void enable_interrupts_noPoll() {
    337                 unsigned short prev = __cfaabi_tls.preemption_state.disable_count;
    338                 __cfaabi_tls.preemption_state.disable_count -= 1;
    339                 // If this triggers someone is enabled already enabled interrupts
    340                 /* paranoid */ verifyf( prev != 0u, "Incremented from %u\n", prev );
    341                 if( prev == 1 ) {
    342                         #if GCC_VERSION > 50000
    343                                 static_assert(__atomic_always_lock_free(sizeof(__cfaabi_tls.preemption_state.enabled), &__cfaabi_tls.preemption_state.enabled), "Must be lock-free");
    344                         #endif
    345                         // Set enabled flag to true
    346                         // should be atomic to avoid preemption in the middle of the operation.
    347                         // use memory order RELAXED since there is no inter-thread on this variable requirements
    348                         __atomic_store_n(&__cfaabi_tls.preemption_state.enabled, true, __ATOMIC_RELAXED);
    349 
    350                         // Signal the compiler that a fence is needed but only for signal handlers
    351                         __atomic_signal_fence(__ATOMIC_RELEASE);
    352347                }
    353348        }
     
    585580
    586581        // Setup proper signal handlers
    587         __cfaabi_sigaction( SIGUSR1, sigHandler_ctxSwitch, SA_SIGINFO | SA_RESTART ); // __cfactx_switch handler
    588         __cfaabi_sigaction( SIGALRM, sigHandler_alarm    , SA_SIGINFO | SA_RESTART ); // debug handler
     582        __cfaabi_sigaction( SIGUSR1, sigHandler_ctxSwitch, SA_SIGINFO ); // __cfactx_switch handler
     583        __cfaabi_sigaction( SIGALRM, sigHandler_alarm    , SA_SIGINFO ); // debug handler
    589584
    590585        signal_block( SIGALRM );
     
    689684}
    690685
    691 #if !defined(__CFA_NO_STATISTICS__)
    692         int __print_alarm_stats = 0;
    693 #endif
    694 
    695686// Main of the alarm thread
    696687// Waits on SIGALRM and send SIGUSR1 to whom ever needs it
    697688static void * alarm_loop( __attribute__((unused)) void * args ) {
    698689        __processor_id_t id;
    699         id.full_proc = false;
    700         id.id = doregister(&id);
     690        register_proc_id(&id);
    701691        __cfaabi_tls.this_proc_id = &id;
    702692
    703         #if !defined(__CFA_NO_STATISTICS__)
    704                 struct __stats_t local_stats;
    705                 __cfaabi_tls.this_stats = &local_stats;
    706                 __init_stats( &local_stats );
    707         #endif
    708693
    709694        // Block sigalrms to control when they arrive
     
    764749EXIT:
    765750        __cfaabi_dbg_print_safe( "Kernel : Preemption thread stopping\n" );
    766         unregister(&id);
    767 
    768         #if !defined(__CFA_NO_STATISTICS__)
    769                 if( 0 != __print_alarm_stats ) {
    770                         __print_stats( &local_stats, __print_alarm_stats, "Alarm", "Thread", 0p );
    771                 }
    772         #endif
     751        register_proc_id(&id);
     752
    773753        return 0p;
    774754}
  • libcfa/src/concurrency/ready_queue.cfa

    rfeacef9 r5407cdc  
    1717// #define __CFA_DEBUG_PRINT_READY_QUEUE__
    1818
    19 // #define USE_SNZI
     19// #define USE_MPSC
     20
     21#define USE_RELAXED_FIFO
     22// #define USE_WORK_STEALING
    2023
    2124#include "bits/defs.hfa"
     
    2831#include <unistd.h>
    2932
    30 #include "snzi.hfa"
    3133#include "ready_subqueue.hfa"
    3234
    3335static const size_t cache_line_size = 64;
     36
     37#if !defined(__CFA_NO_STATISTICS__)
     38        #define __STATS(...) __VA_ARGS__
     39#else
     40        #define __STATS(...)
     41#endif
    3442
    3543// No overriden function, no environment variable, no define
     
    3947#endif
    4048
    41 #define BIAS 16
     49#if   defined(USE_RELAXED_FIFO)
     50        #define BIAS 4
     51        #define READYQ_SHARD_FACTOR 4
     52        #define SEQUENTIAL_SHARD 1
     53#elif defined(USE_WORK_STEALING)
     54        #define READYQ_SHARD_FACTOR 2
     55        #define SEQUENTIAL_SHARD 2
     56#else
     57        #error no scheduling strategy selected
     58#endif
     59
     60static inline struct $thread * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats));
     61static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats));
     62static inline struct $thread * search(struct cluster * cltr);
     63static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred);
     64
    4265
    4366// returns the maximum number of processors the RWLock support
     
    93116//=======================================================================
    94117// Lock-Free registering/unregistering of threads
    95 unsigned doregister( struct __processor_id_t * proc ) with(*__scheduler_lock) {
     118void register_proc_id( struct __processor_id_t * proc ) with(*__scheduler_lock) {
    96119        __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p for RW-Lock\n", proc);
    97120
     
    107130                        /*paranoid*/ verify(0 == (__alignof__(data[i]) % cache_line_size));
    108131                        /*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
    109                         return i;
     132                        proc->id = i;
    110133                }
    111134        }
     
    134157        /*paranoid*/ verify(__alignof__(data[n]) == (2 * cache_line_size));
    135158        /*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
    136         return n;
    137 }
    138 
    139 void unregister( struct __processor_id_t * proc ) with(*__scheduler_lock) {
     159        proc->id = n;
     160}
     161
     162void unregister_proc_id( struct __processor_id_t * proc ) with(*__scheduler_lock) {
    140163        unsigned id = proc->id;
    141164        /*paranoid*/ verify(id < ready);
     
    192215
    193216//=======================================================================
    194 // Cforall Reqdy Queue used for scheduling
     217// Cforall Ready Queue used for scheduling
    195218//=======================================================================
    196219void ?{}(__ready_queue_t & this) with (this) {
    197220        lanes.data  = 0p;
     221        lanes.tscs  = 0p;
    198222        lanes.count = 0;
    199223}
    200224
    201225void ^?{}(__ready_queue_t & this) with (this) {
    202         verify( 1 == lanes.count );
    203         #ifdef USE_SNZI
    204                 verify( !query( snzi ) );
    205         #endif
     226        verify( SEQUENTIAL_SHARD == lanes.count );
    206227        free(lanes.data);
     228        free(lanes.tscs);
    207229}
    208230
    209231//-----------------------------------------------------------------------
    210 __attribute__((hot)) bool query(struct cluster * cltr) {
    211         #ifdef USE_SNZI
    212                 return query(cltr->ready_queue.snzi);
    213         #endif
    214         return true;
    215 }
    216 
    217 static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred) {
    218         unsigned i;
    219         bool local;
    220         #if defined(BIAS)
     232#if defined(USE_RELAXED_FIFO)
     233        //-----------------------------------------------------------------------
     234        // get index from random number with or without bias towards queues
     235        static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred) {
     236                unsigned i;
     237                bool local;
    221238                unsigned rlow  = r % BIAS;
    222239                unsigned rhigh = r / BIAS;
     
    224241                        // (BIAS - 1) out of BIAS chances
    225242                        // Use perferred queues
    226                         i = preferred + (rhigh % 4);
     243                        i = preferred + (rhigh % READYQ_SHARD_FACTOR);
    227244                        local = true;
    228245                }
     
    233250                        local = false;
    234251                }
    235         #else
    236                 i = r;
    237                 local = false;
     252                return [i, local];
     253        }
     254
     255        __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
     256                __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
     257
     258                const bool external = (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
     259                /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
     260
     261                // write timestamp
     262                thrd->link.ts = rdtscl();
     263
     264                bool local;
     265                int preferred = external ? -1 : kernelTLS().this_processor->rdq.id;
     266
     267                // Try to pick a lane and lock it
     268                unsigned i;
     269                do {
     270                        // Pick the index of a lane
     271                        unsigned r = __tls_rand_fwd();
     272                        [i, local] = idx_from_r(r, preferred);
     273
     274                        i %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
     275
     276                        #if !defined(__CFA_NO_STATISTICS__)
     277                                if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.attempt, 1, __ATOMIC_RELAXED);
     278                                else if(local) __tls_stats()->ready.push.local.attempt++;
     279                                else __tls_stats()->ready.push.share.attempt++;
     280                        #endif
     281
     282                #if defined(USE_MPSC)
     283                        // mpsc always succeeds
     284                } while( false );
     285                #else
     286                        // If we can't lock it retry
     287                } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
     288                #endif
     289
     290                // Actually push it
     291                push(lanes.data[i], thrd);
     292
     293                #if !defined(USE_MPSC)
     294                        // Unlock and return
     295                        __atomic_unlock( &lanes.data[i].lock );
     296                #endif
     297
     298                // Mark the current index in the tls rng instance as having an item
     299                __tls_rand_advance_bck();
     300
     301                __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
     302
     303                // Update statistics
     304                #if !defined(__CFA_NO_STATISTICS__)
     305                        if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
     306                        else if(local) __tls_stats()->ready.push.local.success++;
     307                        else __tls_stats()->ready.push.share.success++;
     308                #endif
     309        }
     310
     311        // Pop from the ready queue from a given cluster
     312        __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
     313                /* paranoid */ verify( lanes.count > 0 );
     314                /* paranoid */ verify( kernelTLS().this_processor );
     315                /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );
     316
     317                unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
     318                int preferred = kernelTLS().this_processor->rdq.id;
     319
     320
     321                // As long as the list is not empty, try finding a lane that isn't empty and pop from it
     322                for(25) {
     323                        // Pick two lists at random
     324                        unsigned ri = __tls_rand_bck();
     325                        unsigned rj = __tls_rand_bck();
     326
     327                        unsigned i, j;
     328                        __attribute__((unused)) bool locali, localj;
     329                        [i, locali] = idx_from_r(ri, preferred);
     330                        [j, localj] = idx_from_r(rj, preferred);
     331
     332                        i %= count;
     333                        j %= count;
     334
     335                        // try popping from the 2 picked lists
     336                        struct $thread * thrd = try_pop(cltr, i, j __STATS(, *(locali || localj ? &__tls_stats()->ready.pop.local : &__tls_stats()->ready.pop.help)));
     337                        if(thrd) {
     338                                return thrd;
     339                        }
     340                }
     341
     342                // All lanes where empty return 0p
     343                return 0p;
     344        }
     345
     346        __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) { return pop_fast(cltr); }
     347        __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) {
     348                return search(cltr);
     349        }
     350#endif
     351#if defined(USE_WORK_STEALING)
     352        __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
     353                __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
     354
     355                const bool external = (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
     356                /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
     357
     358                // write timestamp
     359                thrd->link.ts = rdtscl();
     360
     361                // Try to pick a lane and lock it
     362                unsigned i;
     363                do {
     364                        #if !defined(__CFA_NO_STATISTICS__)
     365                                if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.attempt, 1, __ATOMIC_RELAXED);
     366                                else __tls_stats()->ready.push.local.attempt++;
     367                        #endif
     368
     369                        if(unlikely(external)) {
     370                                i = __tls_rand() % lanes.count;
     371                        }
     372                        else {
     373                                processor * proc = kernelTLS().this_processor;
     374                                unsigned r = proc->rdq.its++;
     375                                i =  proc->rdq.id + (r % READYQ_SHARD_FACTOR);
     376                        }
     377
     378
     379                #if defined(USE_MPSC)
     380                        // mpsc always succeeds
     381                } while( false );
     382                #else
     383                        // If we can't lock it retry
     384                } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
     385                #endif
     386
     387                // Actually push it
     388                push(lanes.data[i], thrd);
     389
     390                #if !defined(USE_MPSC)
     391                        // Unlock and return
     392                        __atomic_unlock( &lanes.data[i].lock );
     393                #endif
     394
     395                #if !defined(__CFA_NO_STATISTICS__)
     396                        if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
     397                        else __tls_stats()->ready.push.local.success++;
     398                #endif
     399
     400                __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
     401        }
     402
     403        // Pop from the ready queue from a given cluster
     404        __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
     405                /* paranoid */ verify( lanes.count > 0 );
     406                /* paranoid */ verify( kernelTLS().this_processor );
     407                /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );
     408
     409                processor * proc = kernelTLS().this_processor;
     410
     411                if(proc->rdq.target == -1u) {
     412                        proc->rdq.target = __tls_rand() % lanes.count;
     413                        unsigned it1  = proc->rdq.itr;
     414                        unsigned it2  = proc->rdq.itr + 1;
     415                        unsigned idx1 = proc->rdq.id + (it1 % READYQ_SHARD_FACTOR);
     416                        unsigned idx2 = proc->rdq.id + (it2 % READYQ_SHARD_FACTOR);
     417                        unsigned long long tsc1 = ts(lanes.data[idx1]);
     418                        unsigned long long tsc2 = ts(lanes.data[idx2]);
     419                        proc->rdq.cutoff = min(tsc1, tsc2);
     420                        if(proc->rdq.cutoff == 0) proc->rdq.cutoff = -1ull;
     421                }
     422                else {
     423                        unsigned target = proc->rdq.target;
     424                        proc->rdq.target = -1u;
     425                        if(lanes.tscs[target].tv < proc->rdq.cutoff) {
     426                                $thread * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
     427                                if(t) return t;
     428                        }
     429                }
     430
     431                for(READYQ_SHARD_FACTOR) {
     432                        unsigned i = proc->rdq.id + (--proc->rdq.itr % READYQ_SHARD_FACTOR);
     433                        if($thread * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
     434                }
     435                return 0p;
     436        }
     437
     438        __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
     439                unsigned i = __tls_rand() % lanes.count;
     440                return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
     441        }
     442
     443        __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) with (cltr->ready_queue) {
     444                return search(cltr);
     445        }
     446#endif
     447
     448//=======================================================================
     449// Various Ready Queue utilities
     450//=======================================================================
     451// these function work the same or almost the same
     452// whether they are using work-stealing or relaxed fifo scheduling
     453
     454//-----------------------------------------------------------------------
     455// try to pop from a lane given by index w
     456static inline struct $thread * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
     457        __STATS( stats.attempt++; )
     458
     459        // Get relevant elements locally
     460        __intrusive_lane_t & lane = lanes.data[w];
     461
     462        // If list looks empty retry
     463        if( is_empty(lane) ) {
     464                __STATS( stats.espec++; )
     465                return 0p;
     466        }
     467
     468        // If we can't get the lock retry
     469        if( !__atomic_try_acquire(&lane.lock) ) {
     470                __STATS( stats.elock++; )
     471                return 0p;
     472        }
     473
     474        // If list is empty, unlock and retry
     475        if( is_empty(lane) ) {
     476                __atomic_unlock(&lane.lock);
     477                __STATS( stats.eempty++; )
     478                return 0p;
     479        }
     480
     481        // Actually pop the list
     482        struct $thread * thrd;
     483        thrd = pop(lane);
     484
     485        /* paranoid */ verify(thrd);
     486        /* paranoid */ verify(lane.lock);
     487
     488        // Unlock and return
     489        __atomic_unlock(&lane.lock);
     490
     491        // Update statistics
     492        __STATS( stats.success++; )
     493
     494        #if defined(USE_WORK_STEALING)
     495                lanes.tscs[w].tv = thrd->link.ts;
    238496        #endif
    239         return [i, local];
     497
     498        // return the popped thread
     499        return thrd;
    240500}
    241501
    242502//-----------------------------------------------------------------------
    243 __attribute__((hot)) bool push(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
    244         __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
    245 
    246         // write timestamp
    247         thrd->link.ts = rdtscl();
    248 
    249         __attribute__((unused)) bool local;
    250         __attribute__((unused)) int preferred;
    251         #if defined(BIAS)
    252                 preferred =
    253                         //*
    254                         kernelTLS().this_processor ? kernelTLS().this_processor->id * 4 : -1;
    255                         /*/
    256                         thrd->link.preferred * 4;
    257                         //*/
    258         #endif
    259 
    260         // Try to pick a lane and lock it
    261         unsigned i;
    262         do {
    263                 // Pick the index of a lane
    264                 // unsigned r = __tls_rand();
    265                 unsigned r = __tls_rand_fwd();
    266                 [i, local] = idx_from_r(r, preferred);
    267 
    268                 #if !defined(__CFA_NO_STATISTICS__)
    269                         if(local) {
    270                                 __tls_stats()->ready.pick.push.local++;
    271                         }
    272                 #endif
    273 
    274                 i %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
    275 
    276                 #if !defined(__CFA_NO_STATISTICS__)
    277                         __tls_stats()->ready.pick.push.attempt++;
    278                 #endif
    279 
    280                 // If we can't lock it retry
    281         } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
    282 
    283         bool first = false;
    284 
    285         // Actually push it
    286         #ifdef USE_SNZI
    287                 bool lane_first =
    288         #endif
    289 
    290         push(lanes.data[i], thrd);
    291 
    292         #ifdef USE_SNZI
    293                 // If this lane used to be empty we need to do more
    294                 if(lane_first) {
    295                         // Check if the entire queue used to be empty
    296                         first = !query(snzi);
    297 
    298                         // Update the snzi
    299                         arrive( snzi, i );
    300                 }
    301         #endif
    302 
    303         __tls_rand_advance_bck();
    304 
    305         // Unlock and return
    306         __atomic_unlock( &lanes.data[i].lock );
    307 
    308         __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
    309 
    310         // Update statistics
    311         #if !defined(__CFA_NO_STATISTICS__)
    312                 #if defined(BIAS)
    313                         if( local ) __tls_stats()->ready.pick.push.lsuccess++;
    314                 #endif
    315                 __tls_stats()->ready.pick.push.success++;
    316         #endif
    317 
    318         // return whether or not the list was empty before this push
    319         return first;
    320 }
    321 
    322 static struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j);
    323 static struct $thread * try_pop(struct cluster * cltr, unsigned i);
    324 
    325 // Pop from the ready queue from a given cluster
    326 __attribute__((hot)) $thread * pop(struct cluster * cltr) with (cltr->ready_queue) {
    327         /* paranoid */ verify( lanes.count > 0 );
    328         unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
    329         int preferred;
    330         #if defined(BIAS)
    331                 // Don't bother trying locally too much
    332                 int local_tries = 8;
    333                 preferred = kernelTLS().this_processor->id * 4;
    334         #endif
    335 
    336 
    337         // As long as the list is not empty, try finding a lane that isn't empty and pop from it
    338         #ifdef USE_SNZI
    339                 while( query(snzi) ) {
    340         #else
    341                 for(25) {
    342         #endif
    343                 // Pick two lists at random
    344                 // unsigned ri = __tls_rand();
    345                 // unsigned rj = __tls_rand();
    346                 unsigned ri = __tls_rand_bck();
    347                 unsigned rj = __tls_rand_bck();
    348 
    349                 unsigned i, j;
    350                 __attribute__((unused)) bool locali, localj;
    351                 [i, locali] = idx_from_r(ri, preferred);
    352                 [j, localj] = idx_from_r(rj, preferred);
    353 
    354                 #if !defined(__CFA_NO_STATISTICS__)
    355                         if(locali) {
    356                                 __tls_stats()->ready.pick.pop.local++;
    357                         }
    358                         if(localj) {
    359                                 __tls_stats()->ready.pick.pop.local++;
    360                         }
    361                 #endif
    362 
    363                 i %= count;
    364                 j %= count;
    365 
    366                 // try popping from the 2 picked lists
    367                 struct $thread * thrd = try_pop(cltr, i, j);
    368                 if(thrd) {
    369                         #if defined(BIAS) && !defined(__CFA_NO_STATISTICS__)
    370                                 if( locali || localj ) __tls_stats()->ready.pick.pop.lsuccess++;
    371                         #endif
    372                         return thrd;
    373                 }
    374         }
    375 
    376         // All lanes where empty return 0p
    377         return 0p;
    378 }
    379 
    380 __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
     503// try to pop from any lanes making sure you don't miss any threads push
     504// before the start of the function
     505static inline struct $thread * search(struct cluster * cltr) with (cltr->ready_queue) {
    381506        /* paranoid */ verify( lanes.count > 0 );
    382507        unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
     
    384509        for(i; count) {
    385510                unsigned idx = (offset + i) % count;
    386                 struct $thread * thrd = try_pop(cltr, idx);
     511                struct $thread * thrd = try_pop(cltr, idx __STATS(, __tls_stats()->ready.pop.search));
    387512                if(thrd) {
    388513                        return thrd;
     
    394519}
    395520
    396 
    397521//-----------------------------------------------------------------------
    398 // Given 2 indexes, pick the list with the oldest push an try to pop from it
    399 static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j) with (cltr->ready_queue) {
    400         #if !defined(__CFA_NO_STATISTICS__)
    401                 __tls_stats()->ready.pick.pop.attempt++;
    402         #endif
    403 
    404         // Pick the bet list
    405         int w = i;
    406         if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
    407                 w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
    408         }
    409 
    410         return try_pop(cltr, w);
    411 }
    412 
    413 static inline struct $thread * try_pop(struct cluster * cltr, unsigned w) with (cltr->ready_queue) {
    414         // Get relevant elements locally
    415         __intrusive_lane_t & lane = lanes.data[w];
    416 
    417         // If list looks empty retry
    418         if( is_empty(lane) ) return 0p;
    419 
    420         // If we can't get the lock retry
    421         if( !__atomic_try_acquire(&lane.lock) ) return 0p;
    422 
    423 
    424         // If list is empty, unlock and retry
    425         if( is_empty(lane) ) {
    426                 __atomic_unlock(&lane.lock);
    427                 return 0p;
    428         }
    429 
    430         // Actually pop the list
    431         struct $thread * thrd;
    432         thrd = pop(lane);
    433 
    434         /* paranoid */ verify(thrd);
    435         /* paranoid */ verify(lane.lock);
    436 
    437         #ifdef USE_SNZI
    438                 // If this was the last element in the lane
    439                 if(emptied) {
    440                         depart( snzi, w );
    441                 }
    442         #endif
    443 
    444         // Unlock and return
    445         __atomic_unlock(&lane.lock);
    446 
    447         // Update statistics
    448         #if !defined(__CFA_NO_STATISTICS__)
    449                 __tls_stats()->ready.pick.pop.success++;
    450         #endif
    451 
    452         // Update the thread bias
    453         thrd->link.preferred = w / 4;
    454 
    455         // return the popped thread
    456         return thrd;
    457 }
    458 //-----------------------------------------------------------------------
    459 
    460 bool remove_head(struct cluster * cltr, struct $thread * thrd) with (cltr->ready_queue) {
    461         for(i; lanes.count) {
    462                 __intrusive_lane_t & lane = lanes.data[i];
    463 
    464                 bool removed = false;
    465 
    466                 __atomic_acquire(&lane.lock);
    467                         if(head(lane)->link.next == thrd) {
    468                                 $thread * pthrd;
    469                                 pthrd = pop(lane);
    470 
    471                                 /* paranoid */ verify( pthrd == thrd );
    472 
    473                                 removed = true;
    474                                 #ifdef USE_SNZI
    475                                         if(emptied) {
    476                                                 depart( snzi, i );
    477                                         }
    478                                 #endif
    479                         }
    480                 __atomic_unlock(&lane.lock);
    481 
    482                 if( removed ) return true;
    483         }
    484         return false;
    485 }
    486 
    487 //-----------------------------------------------------------------------
    488 
     522// Check that all the intrusive queues in the data structure are still consistent
    489523static void check( __ready_queue_t & q ) with (q) {
    490         #if defined(__CFA_WITH_VERIFY__)
     524        #if defined(__CFA_WITH_VERIFY__) && !defined(USE_MPSC)
    491525                {
    492526                        for( idx ; lanes.count ) {
     
    499533                                assert(tail(sl)->link.prev->link.next == tail(sl) );
    500534
    501                                 if(sl.before.link.ts == 0l) {
     535                                if(is_empty(sl)) {
    502536                                        assert(tail(sl)->link.prev == head(sl));
    503537                                        assert(head(sl)->link.next == tail(sl));
     
    511545}
    512546
     547//-----------------------------------------------------------------------
     548// Given 2 indexes, pick the list with the oldest push an try to pop from it
     549static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
     550        // Pick the bet list
     551        int w = i;
     552        if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
     553                w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
     554        }
     555
     556        return try_pop(cltr, w __STATS(, stats));
     557}
     558
    513559// Call this function of the intrusive list was moved using memcpy
    514560// fixes the list so that the pointers back to anchors aren't left dangling
    515561static inline void fix(__intrusive_lane_t & ll) {
    516         // if the list is not empty then follow he pointer and fix its reverse
    517         if(!is_empty(ll)) {
    518                 head(ll)->link.next->link.prev = head(ll);
    519                 tail(ll)->link.prev->link.next = tail(ll);
    520         }
    521         // Otherwise just reset the list
    522         else {
    523                 verify(tail(ll)->link.next == 0p);
    524                 tail(ll)->link.prev = head(ll);
    525                 head(ll)->link.next = tail(ll);
    526                 verify(head(ll)->link.prev == 0p);
    527         }
     562        #if !defined(USE_MPSC)
     563                // if the list is not empty then follow he pointer and fix its reverse
     564                if(!is_empty(ll)) {
     565                        head(ll)->link.next->link.prev = head(ll);
     566                        tail(ll)->link.prev->link.next = tail(ll);
     567                }
     568                // Otherwise just reset the list
     569                else {
     570                        verify(tail(ll)->link.next == 0p);
     571                        tail(ll)->link.prev = head(ll);
     572                        head(ll)->link.next = tail(ll);
     573                        verify(head(ll)->link.prev == 0p);
     574                }
     575        #endif
     576}
     577
     578static void assign_list(unsigned & value, dlist(processor, processor) & list, unsigned count) {
     579        processor * it = &list`first;
     580        for(unsigned i = 0; i < count; i++) {
     581                /* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count);
     582                it->rdq.id = value;
     583                it->rdq.target = -1u;
     584                value += READYQ_SHARD_FACTOR;
     585                it = &(*it)`next;
     586        }
     587}
     588
     589static void reassign_cltr_id(struct cluster * cltr) {
     590        unsigned preferred = 0;
     591        assign_list(preferred, cltr->procs.actives, cltr->procs.total - cltr->procs.idle);
     592        assign_list(preferred, cltr->procs.idles  , cltr->procs.idle );
     593}
     594
     595static void fix_times( struct cluster * cltr ) with( cltr->ready_queue ) {
     596        #if defined(USE_WORK_STEALING)
     597                lanes.tscs = alloc(lanes.count, lanes.tscs`realloc);
     598                for(i; lanes.count) {
     599                        lanes.tscs[i].tv = ts(lanes.data[i]);
     600                }
     601        #endif
    528602}
    529603
    530604// Grow the ready queue
    531 void ready_queue_grow  (struct cluster * cltr, int target) {
     605void ready_queue_grow(struct cluster * cltr) {
     606        size_t ncount;
     607        int target = cltr->procs.total;
     608
    532609        /* paranoid */ verify( ready_mutate_islocked() );
    533610        __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");
     
    538615        // grow the ready queue
    539616        with( cltr->ready_queue ) {
    540                 #ifdef USE_SNZI
    541                         ^(snzi){};
    542                 #endif
    543 
    544617                // Find new count
    545618                // Make sure we always have atleast 1 list
    546                 size_t ncount = target >= 2 ? target * 4: 1;
     619                if(target >= 2) {
     620                        ncount = target * READYQ_SHARD_FACTOR;
     621                } else {
     622                        ncount = SEQUENTIAL_SHARD;
     623                }
    547624
    548625                // Allocate new array (uses realloc and memcpies the data)
     
    561638                // Update original
    562639                lanes.count = ncount;
    563 
    564                 #ifdef USE_SNZI
    565                         // Re-create the snzi
    566                         snzi{ log2( lanes.count / 8 ) };
    567                         for( idx; (size_t)lanes.count ) {
    568                                 if( !is_empty(lanes.data[idx]) ) {
    569                                         arrive(snzi, idx);
    570                                 }
    571                         }
    572                 #endif
    573         }
     640        }
     641
     642        fix_times(cltr);
     643
     644        reassign_cltr_id(cltr);
    574645
    575646        // Make sure that everything is consistent
     
    582653
    583654// Shrink the ready queue
    584 void ready_queue_shrink(struct cluster * cltr, int target) {
     655void ready_queue_shrink(struct cluster * cltr) {
    585656        /* paranoid */ verify( ready_mutate_islocked() );
    586657        __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");
     
    589660        /* paranoid */ check( cltr->ready_queue );
    590661
     662        int target = cltr->procs.total;
     663
    591664        with( cltr->ready_queue ) {
    592                 #ifdef USE_SNZI
    593                         ^(snzi){};
    594                 #endif
    595 
    596665                // Remember old count
    597666                size_t ocount = lanes.count;
     
    599668                // Find new count
    600669                // Make sure we always have atleast 1 list
    601                 lanes.count = target >= 2 ? target * 4: 1;
     670                lanes.count = target >= 2 ? target * READYQ_SHARD_FACTOR: SEQUENTIAL_SHARD;
    602671                /* paranoid */ verify( ocount >= lanes.count );
    603                 /* paranoid */ verify( lanes.count == target * 4 || target < 2 );
     672                /* paranoid */ verify( lanes.count == target * READYQ_SHARD_FACTOR || target < 2 );
    604673
    605674                // for printing count the number of displaced threads
     
    644713                        fix(lanes.data[idx]);
    645714                }
    646 
    647                 #ifdef USE_SNZI
    648                         // Re-create the snzi
    649                         snzi{ log2( lanes.count / 8 ) };
    650                         for( idx; (size_t)lanes.count ) {
    651                                 if( !is_empty(lanes.data[idx]) ) {
    652                                         arrive(snzi, idx);
    653                                 }
    654                         }
    655                 #endif
    656         }
     715        }
     716
     717        fix_times(cltr);
     718
     719        reassign_cltr_id(cltr);
    657720
    658721        // Make sure that everything is consistent
  • libcfa/src/concurrency/ready_subqueue.hfa

    rfeacef9 r5407cdc  
    22
    33#define __CFA_NO_SCHED_STATS__
     4
     5#include "containers/queueLockFree.hfa"
    46
    57// Intrusives lanes which are used by the relaxed ready queue
    68struct __attribute__((aligned(128))) __intrusive_lane_t {
    79
    8         // anchor for the head and the tail of the queue
    9         __attribute__((aligned(128))) struct __sentinel_t {
    10                 // Link lists fields
    11                 // instrusive link field for threads
    12                 // must be exactly as in $thread
    13                 __thread_desc_link link;
    14         } before, after;
     10        #if defined(USE_MPSC)
     11                mpsc_queue($thread) queue;
     12                __attribute__((aligned(128)))
     13        #else
     14                // anchor for the head and the tail of the queue
     15                __attribute__((aligned(128))) struct __sentinel_t {
     16                        // Link lists fields
     17                        // instrusive link field for threads
     18                        // must be exactly as in $thread
     19                        __thread_desc_link link;
     20                } before, after;
     21        #endif
    1522
    1623        // spin lock protecting the queue
     
    3542// Get the head pointer (one before the first element) from the anchor
    3643static inline $thread * head(const __intrusive_lane_t & this) {
    37         $thread * rhead = ($thread *)(
    38                 (uintptr_t)( &this.before ) - offsetof( $thread, link )
    39         );
    40         /* paranoid */ verify(rhead);
    41         return rhead;
     44        #if defined(USE_MPSC)
     45                return this.queue.head;
     46        #else
     47                $thread * rhead = ($thread *)(
     48                        (uintptr_t)( &this.before ) - offsetof( $thread, link )
     49                );
     50                /* paranoid */ verify(rhead);
     51                return rhead;
     52        #endif
    4253}
    4354
    4455// Get the tail pointer (one after the last element) from the anchor
    4556static inline $thread * tail(const __intrusive_lane_t & this) {
    46         $thread * rtail = ($thread *)(
    47                 (uintptr_t)( &this.after ) - offsetof( $thread, link )
    48         );
    49         /* paranoid */ verify(rtail);
    50         return rtail;
     57        #if defined(USE_MPSC)
     58                return this.queue.tail;
     59        #else
     60                $thread * rtail = ($thread *)(
     61                        (uintptr_t)( &this.after ) - offsetof( $thread, link )
     62                );
     63                /* paranoid */ verify(rtail);
     64                return rtail;
     65        #endif
    5166}
    5267
     
    5570        this.lock = false;
    5671
    57         this.before.link.prev = 0p;
    58         this.before.link.next = tail(this);
    59         this.before.link.ts   = 0;
    60 
    61         this.after .link.prev = head(this);
    62         this.after .link.next = 0p;
    63         this.after .link.ts   = 0;
    64 
    65         #if !defined(__CFA_NO_SCHED_STATS__)
    66                 this.stat.diff = 0;
    67                 this.stat.push = 0;
    68                 this.stat.pop  = 0;
    69         #endif
    70 
    71         // We add a boat-load of assertions here because the anchor code is very fragile
    72         /* paranoid */ verify(((uintptr_t)( head(this) ) + offsetof( $thread, link )) == (uintptr_t)(&this.before));
    73         /* paranoid */ verify(((uintptr_t)( tail(this) ) + offsetof( $thread, link )) == (uintptr_t)(&this.after ));
    74         /* paranoid */ verify(head(this)->link.prev == 0p );
    75         /* paranoid */ verify(head(this)->link.next == tail(this) );
    76         /* paranoid */ verify(tail(this)->link.next == 0p );
    77         /* paranoid */ verify(tail(this)->link.prev == head(this) );
    78         /* paranoid */ verify(&head(this)->link.prev == &this.before.link.prev );
    79         /* paranoid */ verify(&head(this)->link.next == &this.before.link.next );
    80         /* paranoid */ verify(&tail(this)->link.prev == &this.after .link.prev );
    81         /* paranoid */ verify(&tail(this)->link.next == &this.after .link.next );
    82         /* paranoid */ verify(__alignof__(__intrusive_lane_t) == 128);
    83         /* paranoid */ verify(__alignof__(this) == 128);
    84         /* paranoid */ verifyf(((intptr_t)(&this) % 128) == 0, "Expected address to be aligned %p %% 128 == %zd", &this, ((intptr_t)(&this) % 128));
     72        #if !defined(USE_MPSC)
     73                this.before.link.prev = 0p;
     74                this.before.link.next = tail(this);
     75                this.before.link.ts   = 0;
     76
     77                this.after .link.prev = head(this);
     78                this.after .link.next = 0p;
     79                this.after .link.ts   = 0;
     80
     81                #if !defined(__CFA_NO_SCHED_STATS__)
     82                        this.stat.diff = 0;
     83                        this.stat.push = 0;
     84                        this.stat.pop  = 0;
     85                #endif
     86
     87                // We add a boat-load of assertions here because the anchor code is very fragile
     88                /* paranoid */ verify(((uintptr_t)( head(this) ) + offsetof( $thread, link )) == (uintptr_t)(&this.before));
     89                /* paranoid */ verify(((uintptr_t)( tail(this) ) + offsetof( $thread, link )) == (uintptr_t)(&this.after ));
     90                /* paranoid */ verify(head(this)->link.prev == 0p );
     91                /* paranoid */ verify(head(this)->link.next == tail(this) );
     92                /* paranoid */ verify(tail(this)->link.next == 0p );
     93                /* paranoid */ verify(tail(this)->link.prev == head(this) );
     94                /* paranoid */ verify(&head(this)->link.prev == &this.before.link.prev );
     95                /* paranoid */ verify(&head(this)->link.next == &this.before.link.next );
     96                /* paranoid */ verify(&tail(this)->link.prev == &this.after .link.prev );
     97                /* paranoid */ verify(&tail(this)->link.next == &this.after .link.next );
     98                /* paranoid */ verify(__alignof__(__intrusive_lane_t) == 128);
     99                /* paranoid */ verify(__alignof__(this) == 128);
     100                /* paranoid */ verifyf(((intptr_t)(&this) % 128) == 0, "Expected address to be aligned %p %% 128 == %zd", &this, ((intptr_t)(&this) % 128));
     101        #endif
    85102}
    86103
    87104// Dtor is trivial
    88105void ^?{}( __intrusive_lane_t & this ) {
    89         // Make sure the list is empty
    90         /* paranoid */ verify(head(this)->link.prev == 0p );
    91         /* paranoid */ verify(head(this)->link.next == tail(this) );
    92         /* paranoid */ verify(tail(this)->link.next == 0p );
    93         /* paranoid */ verify(tail(this)->link.prev == head(this) );
     106        #if !defined(USE_MPSC)
     107                // Make sure the list is empty
     108                /* paranoid */ verify(head(this)->link.prev == 0p );
     109                /* paranoid */ verify(head(this)->link.next == tail(this) );
     110                /* paranoid */ verify(tail(this)->link.next == 0p );
     111                /* paranoid */ verify(tail(this)->link.prev == head(this) );
     112        #endif
    94113}
    95114
     
    97116// returns true of lane was empty before push, false otherwise
    98117bool push(__intrusive_lane_t & this, $thread * node) {
    99         #if defined(__CFA_WITH_VERIFY__)
    100                 /* paranoid */ verify(this.lock);
    101                 /* paranoid */ verify(node->link.ts != 0);
    102                 /* paranoid */ verify(node->link.next == 0p);
    103                 /* paranoid */ verify(node->link.prev == 0p);
    104                 /* paranoid */ verify(tail(this)->link.next == 0p);
    105                 /* paranoid */ verify(head(this)->link.prev == 0p);
    106 
     118        #if defined(USE_MPSC)
     119                inline $thread * volatile & ?`next ( $thread * this )  __attribute__((const)) {
     120                        return this->link.next;
     121                }
     122                push(this.queue, node);
     123        #else
     124                #if defined(__CFA_WITH_VERIFY__)
     125                        /* paranoid */ verify(this.lock);
     126                        /* paranoid */ verify(node->link.ts != 0);
     127                        /* paranoid */ verify(node->link.next == 0p);
     128                        /* paranoid */ verify(node->link.prev == 0p);
     129                        /* paranoid */ verify(tail(this)->link.next == 0p);
     130                        /* paranoid */ verify(head(this)->link.prev == 0p);
     131
     132                        if(this.before.link.ts == 0l) {
     133                                /* paranoid */ verify(tail(this)->link.prev == head(this));
     134                                /* paranoid */ verify(head(this)->link.next == tail(this));
     135                        } else {
     136                                /* paranoid */ verify(tail(this)->link.prev != head(this));
     137                                /* paranoid */ verify(head(this)->link.next != tail(this));
     138                        }
     139                #endif
     140
     141                // Get the relevant nodes locally
     142                $thread * tail = tail(this);
     143                $thread * prev = tail->link.prev;
     144
     145                // Do the push
     146                node->link.next = tail;
     147                node->link.prev = prev;
     148                prev->link.next = node;
     149                tail->link.prev = node;
     150
     151                // Update stats
     152                #if !defined(__CFA_NO_SCHED_STATS__)
     153                        this.stat.diff++;
     154                        this.stat.push++;
     155                #endif
     156
     157                verify(node->link.next == tail(this));
     158
     159                // Check if the queue used to be empty
    107160                if(this.before.link.ts == 0l) {
    108                         /* paranoid */ verify(tail(this)->link.prev == head(this));
    109                         /* paranoid */ verify(head(this)->link.next == tail(this));
    110                 } else {
    111                         /* paranoid */ verify(tail(this)->link.prev != head(this));
    112                         /* paranoid */ verify(head(this)->link.next != tail(this));
    113                 }
    114         #endif
    115 
    116         // Get the relevant nodes locally
    117         $thread * tail = tail(this);
    118         $thread * prev = tail->link.prev;
    119 
    120         // Do the push
    121         node->link.next = tail;
    122         node->link.prev = prev;
    123         prev->link.next = node;
    124         tail->link.prev = node;
    125 
    126         // Update stats
    127         #if !defined(__CFA_NO_SCHED_STATS__)
    128                 this.stat.diff++;
    129                 this.stat.push++;
    130         #endif
    131 
    132         verify(node->link.next == tail(this));
    133 
    134         // Check if the queue used to be empty
    135         if(this.before.link.ts == 0l) {
    136                 this.before.link.ts = node->link.ts;
    137                 /* paranoid */ verify(node->link.prev == head(this));
    138                 return true;
    139         }
    140         return false;
     161                        this.before.link.ts = node->link.ts;
     162                        /* paranoid */ verify(node->link.prev == head(this));
     163                        return true;
     164                }
     165                return false;
     166        #endif
    141167}
    142168
     
    146172$thread * pop(__intrusive_lane_t & this) {
    147173        /* paranoid */ verify(this.lock);
    148         /* paranoid */ verify(this.before.link.ts != 0ul);
    149 
    150         // Get anchors locally
    151         $thread * head = head(this);
    152         $thread * tail = tail(this);
    153 
    154         // Get the relevant nodes locally
    155         $thread * node = head->link.next;
    156         $thread * next = node->link.next;
    157 
    158         /* paranoid */ verify(node != tail);
    159         /* paranoid */ verify(node);
    160 
    161         // Do the pop
    162         head->link.next = next;
    163         next->link.prev = head;
    164         node->link.next = 0p;
    165         node->link.prev = 0p;
    166 
    167         // Update head time stamp
    168         this.before.link.ts = next->link.ts;
    169 
    170         // Update stats
    171         #ifndef __CFA_NO_SCHED_STATS__
    172                 this.stat.diff--;
    173                 this.stat.pop ++;
    174         #endif
    175 
    176         // Check if we emptied list and return accordingly
    177         /* paranoid */ verify(tail(this)->link.next == 0p);
    178         /* paranoid */ verify(head(this)->link.prev == 0p);
    179         if(next == tail) {
    180                 /* paranoid */ verify(this.before.link.ts == 0);
    181                 /* paranoid */ verify(tail(this)->link.prev == head(this));
    182                 /* paranoid */ verify(head(this)->link.next == tail(this));
    183                 return node;
    184         }
    185         else {
    186                 /* paranoid */ verify(next->link.ts != 0);
    187                 /* paranoid */ verify(tail(this)->link.prev != head(this));
    188                 /* paranoid */ verify(head(this)->link.next != tail(this));
    189                 /* paranoid */ verify(this.before.link.ts != 0);
    190                 return node;
    191         }
     174        #if defined(USE_MPSC)
     175                inline $thread * volatile & ?`next ( $thread * this )  __attribute__((const)) {
     176                        return this->link.next;
     177                }
     178                return pop(this.queue);
     179        #else
     180                /* paranoid */ verify(this.before.link.ts != 0ul);
     181
     182                // Get anchors locally
     183                $thread * head = head(this);
     184                $thread * tail = tail(this);
     185
     186                // Get the relevant nodes locally
     187                $thread * node = head->link.next;
     188                $thread * next = node->link.next;
     189
     190                /* paranoid */ verify(node != tail);
     191                /* paranoid */ verify(node);
     192
     193                // Do the pop
     194                head->link.next = next;
     195                next->link.prev = head;
     196                node->link.next = 0p;
     197                node->link.prev = 0p;
     198
     199                // Update head time stamp
     200                this.before.link.ts = next->link.ts;
     201
     202                // Update stats
     203                #ifndef __CFA_NO_SCHED_STATS__
     204                        this.stat.diff--;
     205                        this.stat.pop ++;
     206                #endif
     207
     208                // Check if we emptied list and return accordingly
     209                /* paranoid */ verify(tail(this)->link.next == 0p);
     210                /* paranoid */ verify(head(this)->link.prev == 0p);
     211                if(next == tail) {
     212                        /* paranoid */ verify(this.before.link.ts == 0);
     213                        /* paranoid */ verify(tail(this)->link.prev == head(this));
     214                        /* paranoid */ verify(head(this)->link.next == tail(this));
     215                        return node;
     216                }
     217                else {
     218                        /* paranoid */ verify(next->link.ts != 0);
     219                        /* paranoid */ verify(tail(this)->link.prev != head(this));
     220                        /* paranoid */ verify(head(this)->link.next != tail(this));
     221                        /* paranoid */ verify(this.before.link.ts != 0);
     222                        return node;
     223                }
     224        #endif
    192225}
    193226
    194227// Check whether or not list is empty
    195228static inline bool is_empty(__intrusive_lane_t & this) {
    196         // Cannot verify here since it may not be locked
    197         return this.before.link.ts == 0;
     229        #if defined(USE_MPSC)
     230                return this.queue.head == 0p;
     231        #else
     232                // Cannot verify here since it may not be locked
     233                return this.before.link.ts == 0;
     234        #endif
    198235}
    199236
    200237// Return the timestamp
    201238static inline unsigned long long ts(__intrusive_lane_t & this) {
    202         // Cannot verify here since it may not be locked
    203         return this.before.link.ts;
    204 }
     239        #if defined(USE_MPSC)
     240                $thread * tl = this.queue.head;
     241                if(!tl) return -1ull;
     242                return tl->link.ts;
     243        #else
     244                // Cannot verify here since it may not be locked
     245                return this.before.link.ts;
     246        #endif
     247}
     248
     249// Aligned timestamps which are used by the relaxed ready queue
     250struct __attribute__((aligned(128))) __timestamp_t {
     251        volatile unsigned long long tv;
     252};
     253
     254void  ?{}(__timestamp_t & this) { this.tv = 0; }
     255void ^?{}(__timestamp_t & this) {}
  • libcfa/src/concurrency/stats.cfa

    rfeacef9 r5407cdc  
    55#include <inttypes.h>
    66#include "bits/debug.hfa"
     7#include "bits/locks.hfa"
    78#include "stats.hfa"
     9#include "strstream.hfa"
    810
    911#if !defined(__CFA_NO_STATISTICS__)
    1012        void __init_stats( struct __stats_t * stats ) {
    11                 stats->ready.pick.push.attempt  = 0;
    12                 stats->ready.pick.push.success  = 0;
    13                 stats->ready.pick.push.local    = 0;
    14                 stats->ready.pick.push.lsuccess = 0;
    15                 stats->ready.pick.pop .probe    = 0;
    16                 stats->ready.pick.pop .attempt  = 0;
    17                 stats->ready.pick.pop .success  = 0;
    18                 stats->ready.pick.pop .local    = 0;
    19                 stats->ready.pick.pop .lsuccess = 0;
     13                stats->ready.push.local.attempt = 0;
     14                stats->ready.push.local.success = 0;
     15                stats->ready.push.share.attempt = 0;
     16                stats->ready.push.share.success = 0;
     17                stats->ready.push.extrn.attempt = 0;
     18                stats->ready.push.extrn.success = 0;
     19                stats->ready.pop.local .attempt = 0;
     20                stats->ready.pop.local .success = 0;
     21                stats->ready.pop.local .elock   = 0;
     22                stats->ready.pop.local .eempty  = 0;
     23                stats->ready.pop.local .espec   = 0;
     24                stats->ready.pop.help  .attempt = 0;
     25                stats->ready.pop.help  .success = 0;
     26                stats->ready.pop.help  .elock   = 0;
     27                stats->ready.pop.help  .eempty  = 0;
     28                stats->ready.pop.help  .espec   = 0;
     29                stats->ready.pop.steal .attempt = 0;
     30                stats->ready.pop.steal .success = 0;
     31                stats->ready.pop.steal .elock   = 0;
     32                stats->ready.pop.steal .eempty  = 0;
     33                stats->ready.pop.steal .espec   = 0;
     34                stats->ready.pop.search.attempt = 0;
     35                stats->ready.pop.search.success = 0;
     36                stats->ready.pop.search.elock   = 0;
     37                stats->ready.pop.search.eempty  = 0;
     38                stats->ready.pop.search.espec   = 0;
    2039                stats->ready.threads.migration = 0;
     40                stats->ready.threads.extunpark = 0;
     41                stats->ready.threads.threads   = 0;
    2142                stats->ready.sleep.halts   = 0;
    2243                stats->ready.sleep.cancels = 0;
     
    2546
    2647                #if defined(CFA_HAVE_LINUX_IO_URING_H)
    27                         stats->io.submit_q.submit_avg.rdy = 0;
    28                         stats->io.submit_q.submit_avg.csm = 0;
    29                         stats->io.submit_q.submit_avg.cnt = 0;
    30                         stats->io.submit_q.look_avg.val   = 0;
    31                         stats->io.submit_q.look_avg.cnt   = 0;
    32                         stats->io.submit_q.look_avg.block = 0;
    33                         stats->io.submit_q.alloc_avg.val   = 0;
    34                         stats->io.submit_q.alloc_avg.cnt   = 0;
    35                         stats->io.submit_q.alloc_avg.block = 0;
    36                         stats->io.submit_q.helped = 0;
    37                         stats->io.submit_q.leader = 0;
    38                         stats->io.submit_q.busy   = 0;
    39                         stats->io.complete_q.completed_avg.val = 0;
    40                         stats->io.complete_q.completed_avg.cnt = 0;
    41                         stats->io.complete_q.blocks = 0;
     48                        stats->io.alloc.fast        = 0;
     49                        stats->io.alloc.slow        = 0;
     50                        stats->io.alloc.fail        = 0;
     51                        stats->io.alloc.revoke      = 0;
     52                        stats->io.alloc.block       = 0;
     53                        stats->io.submit.fast       = 0;
     54                        stats->io.submit.slow       = 0;
     55                        stats->io.flush.external    = 0;
     56                        stats->io.calls.flush       = 0;
     57                        stats->io.calls.submitted   = 0;
     58                        stats->io.calls.drain       = 0;
     59                        stats->io.calls.completed   = 0;
     60                        stats->io.calls.errors.busy = 0;
     61                        stats->io.poller.sleeps     = 0;
     62                #endif
     63
     64                #if defined(CFA_STATS_ARRAY)
     65                        stats->array.values = alloc(CFA_STATS_ARRAY);
     66                        stats->array.cnt = 0;
    4267                #endif
    4368        }
    4469
    4570        void __tally_stats( struct __stats_t * cltr, struct __stats_t * proc ) {
    46                 __atomic_fetch_add( &cltr->ready.pick.push.attempt , proc->ready.pick.push.attempt , __ATOMIC_SEQ_CST ); proc->ready.pick.push.attempt  = 0;
    47                 __atomic_fetch_add( &cltr->ready.pick.push.success , proc->ready.pick.push.success , __ATOMIC_SEQ_CST ); proc->ready.pick.push.success  = 0;
    48                 __atomic_fetch_add( &cltr->ready.pick.push.local   , proc->ready.pick.push.local   , __ATOMIC_SEQ_CST ); proc->ready.pick.push.local    = 0;
    49                 __atomic_fetch_add( &cltr->ready.pick.push.lsuccess, proc->ready.pick.push.lsuccess, __ATOMIC_SEQ_CST ); proc->ready.pick.push.lsuccess = 0;
    50                 __atomic_fetch_add( &cltr->ready.pick.pop .probe   , proc->ready.pick.pop .probe   , __ATOMIC_SEQ_CST ); proc->ready.pick.pop .probe    = 0;
    51                 __atomic_fetch_add( &cltr->ready.pick.pop .attempt , proc->ready.pick.pop .attempt , __ATOMIC_SEQ_CST ); proc->ready.pick.pop .attempt  = 0;
    52                 __atomic_fetch_add( &cltr->ready.pick.pop .success , proc->ready.pick.pop .success , __ATOMIC_SEQ_CST ); proc->ready.pick.pop .success  = 0;
    53                 __atomic_fetch_add( &cltr->ready.pick.pop .local   , proc->ready.pick.pop .local   , __ATOMIC_SEQ_CST ); proc->ready.pick.pop .local    = 0;
    54                 __atomic_fetch_add( &cltr->ready.pick.pop .lsuccess, proc->ready.pick.pop .lsuccess, __ATOMIC_SEQ_CST ); proc->ready.pick.pop .lsuccess = 0;
     71                __atomic_fetch_add( &cltr->ready.push.local.attempt, proc->ready.push.local.attempt, __ATOMIC_SEQ_CST ); proc->ready.push.local.attempt = 0;
     72                __atomic_fetch_add( &cltr->ready.push.local.success, proc->ready.push.local.success, __ATOMIC_SEQ_CST ); proc->ready.push.local.success = 0;
     73                __atomic_fetch_add( &cltr->ready.push.share.attempt, proc->ready.push.share.attempt, __ATOMIC_SEQ_CST ); proc->ready.push.share.attempt = 0;
     74                __atomic_fetch_add( &cltr->ready.push.share.success, proc->ready.push.share.success, __ATOMIC_SEQ_CST ); proc->ready.push.share.success = 0;
     75                __atomic_fetch_add( &cltr->ready.push.extrn.attempt, proc->ready.push.extrn.attempt, __ATOMIC_SEQ_CST ); proc->ready.push.extrn.attempt = 0;
     76                __atomic_fetch_add( &cltr->ready.push.extrn.success, proc->ready.push.extrn.success, __ATOMIC_SEQ_CST ); proc->ready.push.extrn.success = 0;
     77                __atomic_fetch_add( &cltr->ready.pop.local .attempt, proc->ready.pop.local .attempt, __ATOMIC_SEQ_CST ); proc->ready.pop.local .attempt = 0;
     78                __atomic_fetch_add( &cltr->ready.pop.local .success, proc->ready.pop.local .success, __ATOMIC_SEQ_CST ); proc->ready.pop.local .success = 0;
     79                __atomic_fetch_add( &cltr->ready.pop.local .elock  , proc->ready.pop.local .elock  , __ATOMIC_SEQ_CST ); proc->ready.pop.local .elock   = 0;
     80                __atomic_fetch_add( &cltr->ready.pop.local .eempty , proc->ready.pop.local .eempty , __ATOMIC_SEQ_CST ); proc->ready.pop.local .eempty  = 0;
     81                __atomic_fetch_add( &cltr->ready.pop.local .espec  , proc->ready.pop.local .espec  , __ATOMIC_SEQ_CST ); proc->ready.pop.local .espec   = 0;
     82                __atomic_fetch_add( &cltr->ready.pop.help  .attempt, proc->ready.pop.help  .attempt, __ATOMIC_SEQ_CST ); proc->ready.pop.help  .attempt = 0;
     83                __atomic_fetch_add( &cltr->ready.pop.help  .success, proc->ready.pop.help  .success, __ATOMIC_SEQ_CST ); proc->ready.pop.help  .success = 0;
     84                __atomic_fetch_add( &cltr->ready.pop.help  .elock  , proc->ready.pop.help  .elock  , __ATOMIC_SEQ_CST ); proc->ready.pop.help  .elock   = 0;
     85                __atomic_fetch_add( &cltr->ready.pop.help  .eempty , proc->ready.pop.help  .eempty , __ATOMIC_SEQ_CST ); proc->ready.pop.help  .eempty  = 0;
     86                __atomic_fetch_add( &cltr->ready.pop.help  .espec  , proc->ready.pop.help  .espec  , __ATOMIC_SEQ_CST ); proc->ready.pop.help  .espec   = 0;
     87                __atomic_fetch_add( &cltr->ready.pop.steal .attempt, proc->ready.pop.steal .attempt, __ATOMIC_SEQ_CST ); proc->ready.pop.steal .attempt = 0;
     88                __atomic_fetch_add( &cltr->ready.pop.steal .success, proc->ready.pop.steal .success, __ATOMIC_SEQ_CST ); proc->ready.pop.steal .success = 0;
     89                __atomic_fetch_add( &cltr->ready.pop.steal .elock  , proc->ready.pop.steal .elock  , __ATOMIC_SEQ_CST ); proc->ready.pop.steal .elock   = 0;
     90                __atomic_fetch_add( &cltr->ready.pop.steal .eempty , proc->ready.pop.steal .eempty , __ATOMIC_SEQ_CST ); proc->ready.pop.steal .eempty  = 0;
     91                __atomic_fetch_add( &cltr->ready.pop.steal .espec  , proc->ready.pop.steal .espec  , __ATOMIC_SEQ_CST ); proc->ready.pop.steal .espec   = 0;
     92                __atomic_fetch_add( &cltr->ready.pop.search.attempt, proc->ready.pop.search.attempt, __ATOMIC_SEQ_CST ); proc->ready.pop.search.attempt = 0;
     93                __atomic_fetch_add( &cltr->ready.pop.search.success, proc->ready.pop.search.success, __ATOMIC_SEQ_CST ); proc->ready.pop.search.success = 0;
     94                __atomic_fetch_add( &cltr->ready.pop.search.elock  , proc->ready.pop.search.elock  , __ATOMIC_SEQ_CST ); proc->ready.pop.search.elock   = 0;
     95                __atomic_fetch_add( &cltr->ready.pop.search.eempty , proc->ready.pop.search.eempty , __ATOMIC_SEQ_CST ); proc->ready.pop.search.eempty  = 0;
     96                __atomic_fetch_add( &cltr->ready.pop.search.espec  , proc->ready.pop.search.espec  , __ATOMIC_SEQ_CST ); proc->ready.pop.search.espec   = 0;
    5597                __atomic_fetch_add( &cltr->ready.threads.migration , proc->ready.threads.migration , __ATOMIC_SEQ_CST ); proc->ready.threads.migration  = 0;
     98                __atomic_fetch_add( &cltr->ready.threads.extunpark , proc->ready.threads.extunpark , __ATOMIC_SEQ_CST ); proc->ready.threads.extunpark  = 0;
     99                __atomic_fetch_add( &cltr->ready.threads.threads   , proc->ready.threads.threads   , __ATOMIC_SEQ_CST ); proc->ready.threads.threads    = 0;
    56100                __atomic_fetch_add( &cltr->ready.sleep.halts       , proc->ready.sleep.halts       , __ATOMIC_SEQ_CST ); proc->ready.sleep.halts        = 0;
    57101                __atomic_fetch_add( &cltr->ready.sleep.cancels     , proc->ready.sleep.cancels     , __ATOMIC_SEQ_CST ); proc->ready.sleep.cancels      = 0;
     
    60104
    61105                #if defined(CFA_HAVE_LINUX_IO_URING_H)
    62                         __atomic_fetch_add( &cltr->io.submit_q.submit_avg.rdy     , proc->io.submit_q.submit_avg.rdy     , __ATOMIC_SEQ_CST ); proc->io.submit_q.submit_avg.rdy      = 0;
    63                         __atomic_fetch_add( &cltr->io.submit_q.submit_avg.csm     , proc->io.submit_q.submit_avg.csm     , __ATOMIC_SEQ_CST ); proc->io.submit_q.submit_avg.csm      = 0;
    64                         __atomic_fetch_add( &cltr->io.submit_q.submit_avg.avl     , proc->io.submit_q.submit_avg.avl     , __ATOMIC_SEQ_CST ); proc->io.submit_q.submit_avg.avl      = 0;
    65                         __atomic_fetch_add( &cltr->io.submit_q.submit_avg.cnt     , proc->io.submit_q.submit_avg.cnt     , __ATOMIC_SEQ_CST ); proc->io.submit_q.submit_avg.cnt      = 0;
    66                         __atomic_fetch_add( &cltr->io.submit_q.look_avg.val       , proc->io.submit_q.look_avg.val       , __ATOMIC_SEQ_CST ); proc->io.submit_q.look_avg.val        = 0;
    67                         __atomic_fetch_add( &cltr->io.submit_q.look_avg.cnt       , proc->io.submit_q.look_avg.cnt       , __ATOMIC_SEQ_CST ); proc->io.submit_q.look_avg.cnt        = 0;
    68                         __atomic_fetch_add( &cltr->io.submit_q.look_avg.block     , proc->io.submit_q.look_avg.block     , __ATOMIC_SEQ_CST ); proc->io.submit_q.look_avg.block      = 0;
    69                         __atomic_fetch_add( &cltr->io.submit_q.alloc_avg.val      , proc->io.submit_q.alloc_avg.val      , __ATOMIC_SEQ_CST ); proc->io.submit_q.alloc_avg.val       = 0;
    70                         __atomic_fetch_add( &cltr->io.submit_q.alloc_avg.cnt      , proc->io.submit_q.alloc_avg.cnt      , __ATOMIC_SEQ_CST ); proc->io.submit_q.alloc_avg.cnt       = 0;
    71                         __atomic_fetch_add( &cltr->io.submit_q.alloc_avg.block    , proc->io.submit_q.alloc_avg.block    , __ATOMIC_SEQ_CST ); proc->io.submit_q.alloc_avg.block     = 0;
    72                         __atomic_fetch_add( &cltr->io.submit_q.helped             , proc->io.submit_q.helped             , __ATOMIC_SEQ_CST ); proc->io.submit_q.helped              = 0;
    73                         __atomic_fetch_add( &cltr->io.submit_q.leader             , proc->io.submit_q.leader             , __ATOMIC_SEQ_CST ); proc->io.submit_q.leader              = 0;
    74                         __atomic_fetch_add( &cltr->io.submit_q.busy               , proc->io.submit_q.busy               , __ATOMIC_SEQ_CST ); proc->io.submit_q.busy                = 0;
    75                         __atomic_fetch_add( &cltr->io.complete_q.completed_avg.val, proc->io.complete_q.completed_avg.val, __ATOMIC_SEQ_CST ); proc->io.complete_q.completed_avg.val = 0;
    76                         __atomic_fetch_add( &cltr->io.complete_q.completed_avg.cnt, proc->io.complete_q.completed_avg.cnt, __ATOMIC_SEQ_CST ); proc->io.complete_q.completed_avg.cnt = 0;
    77                         __atomic_fetch_add( &cltr->io.complete_q.blocks           , proc->io.complete_q.blocks           , __ATOMIC_SEQ_CST ); proc->io.complete_q.blocks            = 0;
     106                        __atomic_fetch_add( &cltr->io.alloc.fast       , proc->io.alloc.fast       , __ATOMIC_SEQ_CST ); proc->io.alloc.fast        = 0;
     107                        __atomic_fetch_add( &cltr->io.alloc.slow       , proc->io.alloc.slow       , __ATOMIC_SEQ_CST ); proc->io.alloc.slow        = 0;
     108                        __atomic_fetch_add( &cltr->io.alloc.fail       , proc->io.alloc.fail       , __ATOMIC_SEQ_CST ); proc->io.alloc.fail        = 0;
     109                        __atomic_fetch_add( &cltr->io.alloc.revoke     , proc->io.alloc.revoke     , __ATOMIC_SEQ_CST ); proc->io.alloc.revoke      = 0;
     110                        __atomic_fetch_add( &cltr->io.alloc.block      , proc->io.alloc.block      , __ATOMIC_SEQ_CST ); proc->io.alloc.block       = 0;
     111                        __atomic_fetch_add( &cltr->io.submit.fast      , proc->io.submit.fast      , __ATOMIC_SEQ_CST ); proc->io.submit.fast       = 0;
     112                        __atomic_fetch_add( &cltr->io.submit.slow      , proc->io.submit.slow      , __ATOMIC_SEQ_CST ); proc->io.submit.slow       = 0;
     113                        __atomic_fetch_add( &cltr->io.flush.external   , proc->io.flush.external   , __ATOMIC_SEQ_CST ); proc->io.flush.external    = 0;
     114                        __atomic_fetch_add( &cltr->io.calls.flush      , proc->io.calls.flush      , __ATOMIC_SEQ_CST ); proc->io.calls.flush       = 0;
     115                        __atomic_fetch_add( &cltr->io.calls.submitted  , proc->io.calls.submitted  , __ATOMIC_SEQ_CST ); proc->io.calls.submitted   = 0;
     116                        __atomic_fetch_add( &cltr->io.calls.drain      , proc->io.calls.drain      , __ATOMIC_SEQ_CST ); proc->io.calls.drain       = 0;
     117                        __atomic_fetch_add( &cltr->io.calls.completed  , proc->io.calls.completed  , __ATOMIC_SEQ_CST ); proc->io.calls.completed   = 0;
     118                        __atomic_fetch_add( &cltr->io.calls.errors.busy, proc->io.calls.errors.busy, __ATOMIC_SEQ_CST ); proc->io.calls.errors.busy = 0;
     119                        __atomic_fetch_add( &cltr->io.poller.sleeps    , proc->io.poller.sleeps    , __ATOMIC_SEQ_CST ); proc->io.poller.sleeps     = 0;
    78120                #endif
    79121        }
    80122
     123        #define eng3(X) (ws(3, 3, unit(eng( X ))))
     124
    81125        void __print_stats( struct __stats_t * stats, int flags, const char * type, const char * name, void * id ) with( *stats ) {
    82126
     127                char buf[1024];
     128                ostrstream sstr = { buf, 1024 };
     129
    83130                if( flags & CFA_STATS_READY_Q ) {
    84                         double push_sur = (100.0 * ((double)ready.pick.push.success) / ready.pick.push.attempt);
    85                         double pop_sur  = (100.0 * ((double)ready.pick.pop .success) / ready.pick.pop .attempt);
    86 
    87                         double push_len = ((double)ready.pick.push.attempt) / ready.pick.push.success;
    88                         double pop_len  = ((double)ready.pick.pop .attempt) / ready.pick.pop .success;
    89 
    90                         double lpush_sur = (100.0 * ((double)ready.pick.push.lsuccess) / ready.pick.push.local);
    91                         double lpop_sur  = (100.0 * ((double)ready.pick.pop .lsuccess) / ready.pick.pop .local);
    92 
    93                         double lpush_len = ((double)ready.pick.push.local) / ready.pick.push.lsuccess;
    94                         double lpop_len  = ((double)ready.pick.pop .local) / ready.pick.pop .lsuccess;
    95 
    96                         __cfaabi_bits_print_safe( STDOUT_FILENO,
    97                                 "----- %s \"%s\" (%p) - Ready Q Stats -----\n"
    98                                 "- total threads run      : %'15" PRIu64 "\n"
    99                                 "- total threads scheduled: %'15" PRIu64 "\n"
    100                                 "- push average probe len : %'18.2lf, %'18.2lf%% (%'15" PRIu64 " attempts)\n"
    101                                 "- pop  average probe len : %'18.2lf, %'18.2lf%% (%'15" PRIu64 " attempts)\n"
    102                                 "- local push avg prb len : %'18.2lf, %'18.2lf%% (%'15" PRIu64 " attempts)\n"
    103                                 "- local pop  avg prb len : %'18.2lf, %'18.2lf%% (%'15" PRIu64 " attempts)\n"
    104                                 "- thread migrations      : %'15" PRIu64 "\n"
    105                                 "- Idle Sleep -\n"
    106                                 "-- halts                 : %'15" PRIu64 "\n"
    107                                 "-- cancelled halts       : %'15" PRIu64 "\n"
    108                                 "-- schedule wake         : %'15" PRIu64 "\n"
    109                                 "-- wake on exit          : %'15" PRIu64 "\n"
    110                                 "\n"
    111                                 , type, name, id
    112                                 , ready.pick.pop.success
    113                                 , ready.pick.push.success
    114                                 , push_len, push_sur, ready.pick.push.attempt
    115                                 , pop_len , pop_sur , ready.pick.pop .attempt
    116                                 , lpush_len, lpush_sur, ready.pick.push.local
    117                                 , lpop_len , lpop_sur , ready.pick.pop .local
    118                                 , ready.threads.migration
    119                                 , ready.sleep.halts, ready.sleep.cancels, ready.sleep.wakes, ready.sleep.exits
    120                         );
     131
     132                        sstr | "----- " | type | "\"" | name | "\" (" | "" | id | "" | ") - Ready Q Stats -----";
     133
     134                        uint64_t totalR = ready.pop.local.success + ready.pop.help.success + ready.pop.steal.success + ready.pop.search.success;
     135                        uint64_t totalS = ready.push.local.success + ready.push.share.success + ready.push.extrn.success;
     136                        sstr | "- totals   : " | eng3(totalR) | "run," | eng3(totalS) | "schd (" | eng3(ready.push.extrn.success) | "ext," | eng3(ready.threads.migration) | "mig," | eng3(ready.threads.extunpark) | " eupk)";
     137
     138                        double push_len = ((double)ready.push.local.attempt + ready.push.share.attempt + ready.push.extrn.attempt) / totalS;
     139                        double sLcl_len = ready.push.local.success ? ((double)ready.push.local.attempt) / ready.push.local.success : 0;
     140                        double sOth_len = ready.push.share.success ? ((double)ready.push.share.attempt) / ready.push.share.success : 0;
     141                        double sExt_len = ready.push.extrn.success ? ((double)ready.push.extrn.attempt) / ready.push.extrn.success : 0;
     142                        sstr | "- push avg : " | ws(3, 3, push_len)
     143                             | "- l: " | eng3(ready.push.local.attempt) | " (" | ws(3, 3, sLcl_len) | ")"
     144                             | ", s: " | eng3(ready.push.share.attempt) | " (" | ws(3, 3, sOth_len) | ")"
     145                             | ", e: " | eng3(ready.push.extrn.attempt) | " (" | ws(3, 3, sExt_len) | ")";
     146
     147                        double rLcl_pc = (100.0 * (double)ready.pop.local .success) / totalR;
     148                        sstr | "- local    : " | eng3(ready.pop.local .success) | "-"| ws(3, 3, rLcl_pc) | '%'
     149                             | " (" | eng3(ready.pop.local .attempt) | " try," | eng3(ready.pop.local .espec) | " spc," | eng3(ready.pop.local .elock) | " lck," | eng3(ready.pop.local .eempty) | " ept)";
     150                        double rHlp_pc = (100.0 * (double)ready.pop.help  .success) / totalR;
     151                        sstr | "- help     : " | eng3(ready.pop.help  .success) | "-"| ws(3, 3, rHlp_pc) | '%'
     152                             | " (" | eng3(ready.pop.help  .attempt) | " try," | eng3(ready.pop.help  .espec) | " spc," | eng3(ready.pop.help  .elock) | " lck," | eng3(ready.pop.help  .eempty) | " ept)";
     153                        double rStl_pc = (100.0 * (double)ready.pop.steal .success) / totalR;
     154                        sstr | "- steal    : " | eng3(ready.pop.steal .success) | "-"| ws(3, 3, rStl_pc) | '%'
     155                             | " (" | eng3(ready.pop.steal .attempt) | " try," | eng3(ready.pop.steal .espec) | " spc," | eng3(ready.pop.steal .elock) | " lck," | eng3(ready.pop.steal .eempty) | " ept)";
     156                        double rSch_pc = (100.0 * (double)ready.pop.search.success) / totalR;
     157                        sstr | "- search   : " | eng3(ready.pop.search.success) | "-"| ws(3, 3, rSch_pc) | '%'
     158                             | " (" | eng3(ready.pop.search.attempt) | " try," | eng3(ready.pop.search.espec) | " spc," | eng3(ready.pop.search.elock) | " lck," | eng3(ready.pop.search.eempty) | " ept)";
     159
     160                        sstr | "- Idle Slp : " | eng3(ready.sleep.halts) | "halt," | eng3(ready.sleep.cancels) | "cancel," | eng3(ready.sleep.wakes) | "wake," | eng3(ready.sleep.exits) | "exit";
     161                        sstr | nl;
    121162                }
    122163
    123164                #if defined(CFA_HAVE_LINUX_IO_URING_H)
    124165                        if( flags & CFA_STATS_IO ) {
    125                                 double avgrdy = ((double)io.submit_q.submit_avg.rdy) / io.submit_q.submit_avg.cnt;
    126                                 double avgcsm = ((double)io.submit_q.submit_avg.csm) / io.submit_q.submit_avg.cnt;
    127 
    128                                 double lavgv = 0;
    129                                 double lavgb = 0;
    130                                 if(io.submit_q.look_avg.cnt != 0) {
    131                                         lavgv = ((double)io.submit_q.look_avg.val  ) / io.submit_q.look_avg.cnt;
    132                                         lavgb = ((double)io.submit_q.look_avg.block) / io.submit_q.look_avg.cnt;
    133                                 }
    134 
    135                                 double aavgv = 0;
    136                                 double aavgb = 0;
    137                                 if(io.submit_q.alloc_avg.cnt != 0) {
    138                                         aavgv = ((double)io.submit_q.alloc_avg.val  ) / io.submit_q.alloc_avg.cnt;
    139                                         aavgb = ((double)io.submit_q.alloc_avg.block) / io.submit_q.alloc_avg.cnt;
    140                                 }
    141 
    142                                 __cfaabi_bits_print_safe( STDOUT_FILENO,
    143                                         "----- %s \"%s\" (%p) - I/O Stats -----\n"
    144                                         "- total submit calls     : %'15" PRIu64 "\n"
    145                                         "- avg ready entries      : %'18.2lf\n"
    146                                         "- avg submitted entries  : %'18.2lf\n"
    147                                         "- total helped entries   : %'15" PRIu64 "\n"
    148                                         "- total leader entries   : %'15" PRIu64 "\n"
    149                                         "- total busy submit      : %'15" PRIu64 "\n"
    150                                         "- total ready search     : %'15" PRIu64 "\n"
    151                                         "- avg ready search len   : %'18.2lf\n"
    152                                         "- avg ready search block : %'18.2lf\n"
    153                                         "- total alloc search     : %'15" PRIu64 "\n"
    154                                         "- avg alloc search len   : %'18.2lf\n"
    155                                         "- avg alloc search block : %'18.2lf\n"
    156                                         "- total wait calls       : %'15" PRIu64 "\n"
    157                                         "- avg completion/wait    : %'18.2lf\n"
    158                                         "- total completion blocks: %'15" PRIu64 "\n"
    159                                         "\n"
    160                                         , type,  name, id
    161                                         , io.submit_q.submit_avg.cnt
    162                                         , avgrdy, avgcsm
    163                                         , io.submit_q.helped, io.submit_q.leader, io.submit_q.busy
    164                                         , io.submit_q.look_avg.cnt
    165                                         , lavgv, lavgb
    166                                         , io.submit_q.alloc_avg.cnt
    167                                         , aavgv, aavgb
    168                                         , io.complete_q.completed_avg.cnt
    169                                         , ((double)io.complete_q.completed_avg.val) / io.complete_q.completed_avg.cnt
    170                                         , io.complete_q.blocks
    171                                 );
     166                                sstr | "----- " | type | "\"" | name | "\" (" | "" | id | "" | ") - I/O Stats -----";
     167
     168                                uint64_t total_allocs = io.alloc.fast + io.alloc.slow;
     169                                double avgfasta = (100.0 * (double)io.alloc.fast) / total_allocs;
     170                                sstr | "- total allocations : " | eng3(io.alloc.fast) | "fast," | eng3(io.alloc.slow) | "slow (" | ws(3, 3, avgfasta) | "%)";
     171                                sstr | "-     failures      : " | eng3(io.alloc.fail) | "oom, " | eng3(io.alloc.revoke) | "rvk, " | eng3(io.alloc.block) | "blk";
     172
     173                                uint64_t total_submits = io.submit.fast + io.submit.slow;
     174                                double avgfasts = (100.0 * (double)io.submit.fast) / total_submits;
     175                                sstr | "- total submits     : " | eng3(io.submit.fast) | "fast," | eng3(io.submit.slow) | "slow (" | ws(3, 3, avgfasts) | "%)";
     176                                sstr | "- flush external    : " | eng3(io.flush.external);
     177
     178                                sstr | "- io_uring_enter    : " | eng3(io.calls.flush) | " (" | eng3(io.calls.drain) | ", " | eng3(io.calls.errors.busy) | " EBUSY)";
     179
     180                                double avgsubs = ((double)io.calls.submitted) / io.calls.flush;
     181                                double avgcomp = ((double)io.calls.completed) / io.calls.drain;
     182                                sstr | "-     submits       : " | eng3(io.calls.submitted) | "(" | ws(3, 3, avgsubs) | "/flush)";
     183                                sstr | "-     completes     : " | eng3(io.calls.completed) | "(" | ws(3, 3, avgcomp) | "/drain)";
     184
     185                                sstr | "- poller sleeping   : " | eng3(io.poller.sleeps);
     186                                sstr | nl;
    172187                        }
    173188                #endif
     189
     190                if(flags) write( sstr, stdout );
    174191        }
     192
     193        #if defined(CFA_STATS_ARRAY)
     194                extern "C" {
     195                        #include <stdio.h>
     196                        #include <errno.h>
     197                        #include <sys/stat.h>
     198                        #include <fcntl.h>
     199                }
     200
     201                void __flush_stat( struct __stats_t * this, const char * name, void * handle) {
     202                        int ret = mkdir(".cfadata", 0755);
     203                        if(ret < 0 && errno != EEXIST) abort("Failed to create directory .cfadata: %d\n", errno);
     204
     205                        char filename[100];
     206                        snprintf(filename, 100, ".cfadata/%s%p.data", name, handle);
     207
     208                        int fd = open(filename, O_WRONLY | O_APPEND | O_CREAT, 0644);
     209                        if(fd < 0) abort("Failed to create file %s: %d\n", filename, errno);
     210
     211                        for(i; this->array.cnt) {
     212                                char line[100];
     213                                size_t n = snprintf(line, 100, "%llu, %lld\n", this->array.values[i].ts, this->array.values[i].value);
     214                                write(fd, line, n);
     215                        }
     216
     217                        this->array.cnt = 0;
     218                        close(fd);
     219                }
     220
     221                static __spinlock_t stats_lock;
     222
     223                void __push_stat( struct __stats_t * this, int64_t value, bool external, const char * name, void * handle ) {
     224                        if(external) lock(stats_lock __cfaabi_dbg_ctx2);
     225
     226                        if( this->array.cnt >= CFA_STATS_ARRAY ) __flush_stat( this, name, handle );
     227
     228                        size_t idx = this->array.cnt;
     229                        this->array.cnt++;
     230
     231                        if(external) unlock(stats_lock);
     232
     233                        this->array.values[idx].ts = rdtscl();
     234                        this->array.values[idx].value = value;
     235                }
     236        #endif
    175237#endif
  • libcfa/src/concurrency/stats.hfa

    rfeacef9 r5407cdc  
    11#pragma once
     2
     3// #define CFA_STATS_ARRAY 10000
    24
    35#include <stdint.h>
     
    1416        static inline void __print_stats( struct __stats_t *, int, const char *, const char *, void * ) {}
    1517#else
     18        struct __stats_readyQ_pop_t {
     19                // number of attemps at poping something
     20                volatile uint64_t attempt;
    1621
    17         struct __attribute__((aligned(64))) __stats_readQ_t {
     22                // number of successes at poping
     23                volatile uint64_t success;
     24
     25                // number of attempts failed due to the lock being held
     26                volatile uint64_t elock;
     27
     28                // number of attempts failed due to the queue being empty (lock held)
     29                volatile uint64_t eempty;
     30
     31                // number of attempts failed due to the queue looking empty (lock not held)
     32                volatile uint64_t espec;
     33        };
     34
     35        struct __attribute__((aligned(64))) __stats_readyQ_t {
     36                // Push statistic
    1837                struct {
    19                         // Push statistic
    2038                        struct {
    21                                 // number of attemps at pushing something
     39                                // number of attemps at pushing something to preferred queues
    2240                                volatile uint64_t attempt;
    2341
    24                                 // number of successes at pushing
     42                                // number of successes at pushing to preferred queues
    2543                                volatile uint64_t success;
     44                        }
     45                        // Stats for local queue within cluster
     46                        local,
    2647
    27                                 // number of attemps at pushing something to preferred queues
    28                                 volatile uint64_t local;
     48                        // Stats for non-local queues within cluster
     49                        share,
    2950
    30                                 // number of successes at pushing to preferred queues
    31                                 volatile uint64_t lsuccess;
    32                         } push;
     51                        // Stats from outside cluster
     52                        extrn;
     53                } push;
    3354
    34                         // Pop statistic
    35                         struct {
    36                                 // number of reads of the mask
    37                                 // picking an empty __cfa_readyQ_mask_t counts here
    38                                 // but not as an attempt
    39                                 volatile uint64_t probe;
     55                // Pop statistic
     56                struct {
     57                        // pop from local queue
     58                        __stats_readyQ_pop_t local;
    4059
    41                                 // number of attemps at poping something
    42                                 volatile uint64_t attempt;
     60                        // pop before looking at local queue
     61                        __stats_readyQ_pop_t help;
    4362
    44                                 // number of successes at poping
    45                                 volatile uint64_t success;
     63                        // pop from some other queue
     64                        __stats_readyQ_pop_t steal;
    4665
    47                                 // number of attemps at poping something to preferred queues
    48                                 volatile uint64_t local;
     66                        // pop when searching queues sequentially
     67                        __stats_readyQ_pop_t search;
     68                } pop;
    4969
    50                                 // number of successes at poping to preferred queues
    51                                 volatile uint64_t lsuccess;
    52                         } pop;
    53                 } pick;
    5470                struct {
    5571                        volatile uint64_t migration;
     72                        volatile uint64_t extunpark;
     73                        volatile  int64_t threads; // number of threads in the system, includes only local change
    5674                } threads;
    5775                struct {
     
    6684                struct __attribute__((aligned(64))) __stats_io_t{
    6785                        struct {
     86                                volatile uint64_t fast;
     87                                volatile uint64_t slow;
     88                                volatile uint64_t fail;
     89                                volatile uint64_t revoke;
     90                                volatile uint64_t block;
     91                        } alloc;
     92                        struct {
     93                                volatile uint64_t fast;
     94                                volatile uint64_t slow;
     95                        } submit;
     96                        struct {
     97                                volatile uint64_t external;
     98                        } flush;
     99                        struct {
     100                                volatile uint64_t drain;
     101                                volatile uint64_t completed;
     102                                volatile uint64_t flush;
     103                                volatile uint64_t submitted;
    68104                                struct {
    69                                         volatile uint64_t rdy;
    70                                         volatile uint64_t csm;
    71                                         volatile uint64_t avl;
    72                                         volatile uint64_t cnt;
    73                                 } submit_avg;
    74                                 struct {
    75                                         volatile uint64_t val;
    76                                         volatile uint64_t cnt;
    77                                         volatile uint64_t block;
    78                                 } look_avg;
    79                                 struct {
    80                                         volatile uint64_t val;
    81                                         volatile uint64_t cnt;
    82                                         volatile uint64_t block;
    83                                 } alloc_avg;
    84                                 volatile uint64_t helped;
    85                                 volatile uint64_t leader;
    86                                 volatile uint64_t busy;
    87                         } submit_q;
     105                                        volatile uint64_t busy;
     106                                } errors;
     107                        } calls;
    88108                        struct {
    89                                 struct {
    90                                         volatile uint64_t val;
    91                                         volatile uint64_t cnt;
    92                                 } completed_avg;
    93                                 volatile uint64_t blocks;
    94                         } complete_q;
     109                                volatile uint64_t sleeps;
     110                        } poller;
     111                };
     112        #endif
     113
     114        #if defined(CFA_STATS_ARRAY)
     115                struct __stats_elem_t {
     116                        long long int ts;
     117                        int64_t value;
    95118                };
    96119        #endif
    97120
    98121        struct __attribute__((aligned(128))) __stats_t {
    99                 __stats_readQ_t ready;
     122                __stats_readyQ_t ready;
    100123                #if defined(CFA_HAVE_LINUX_IO_URING_H)
    101124                        __stats_io_t    io;
    102125                #endif
     126
     127                #if defined(CFA_STATS_ARRAY)
     128                        struct {
     129                                __stats_elem_t * values;
     130                                volatile size_t cnt;
     131                        } array;
     132                #endif
     133
    103134        };
    104135
     
    106137        void __tally_stats( struct __stats_t *, struct __stats_t * );
    107138        void __print_stats( struct __stats_t *, int, const char *, const char *, void * );
     139        #if defined(CFA_STATS_ARRAY)
     140                void __push_stat ( struct __stats_t *, int64_t value, bool external, const char * name, void * handle);
     141                void __flush_stat( struct __stats_t *, const char *, void * );
     142        #else
     143                static inline void __push_stat ( struct __stats_t *, int64_t, bool, const char *, void * ) {}
     144                static inline void __flush_stat( struct __stats_t *, const char *, void * ) {}
     145        #endif
    108146#endif
    109147
  • libcfa/src/concurrency/thread.cfa

    rfeacef9 r5407cdc  
    3939        link.next = 0p;
    4040        link.prev = 0p;
    41         link.preferred = -1;
     41        link.preferred = -1u;
     42        last_proc = 0p;
    4243        #if defined( __CFA_WITH_VERIFY__ )
    4344                canary = 0x0D15EA5E0D15EA5Ep;
     
    6263}
    6364
    64 FORALL_DATA_INSTANCE(ThreadCancelled, (thread_t &), (thread_t))
    65 
    6665forall(T &)
    6766void copy(ThreadCancelled(T) * dst, ThreadCancelled(T) * src) {
     
    7372forall(T &)
    7473const char * msg(ThreadCancelled(T) *) {
    75         return "ThreadCancelled";
     74        return "ThreadCancelled(...)";
    7675}
    7776
    7877forall(T &)
    7978static void default_thread_cancel_handler(ThreadCancelled(T) & ) {
     79        // Improve this error message, can I do formatting?
    8080        abort( "Unhandled thread cancellation.\n" );
    8181}
    8282
    83 forall(T & | is_thread(T) | IS_EXCEPTION(ThreadCancelled, (T)))
     83forall(T & | is_thread(T) | IS_EXCEPTION(ThreadCancelled, (T))
     84    | { EHM_DEFAULT_VTABLE(ThreadCancelled, (T)); })
    8485void ?{}( thread_dtor_guard_t & this,
    8586                T & thrd, void(*cancelHandler)(ThreadCancelled(T) &)) {
    86         $monitor * m = get_monitor(thrd);
     87        $monitor * m = get_monitor(thrd);
    8788        $thread * desc = get_thread(thrd);
    8889
     
    103104        }
    104105        desc->state = Cancelled;
    105         void(*defaultResumptionHandler)(ThreadCancelled(T) &) = 
     106        void(*defaultResumptionHandler)(ThreadCancelled(T) &) =
    106107                join ? cancelHandler : default_thread_cancel_handler;
    107108
     109        // TODO: Remove explitate vtable set once trac#186 is fixed.
    108110        ThreadCancelled(T) except;
    109         // TODO: Remove explitate vtable set once trac#186 is fixed.
    110         except.virtual_table = &get_exception_vtable(&except);
     111        except.virtual_table = &_default_vtable;
    111112        except.the_thread = &thrd;
    112113        except.the_exception = __cfaehm_cancellation_exception( cancellation );
    113         throwResume except;
     114        // Why is this cast required?
     115        throwResume (ThreadCancelled(T) &)except;
    114116
    115117        except.the_exception->virtual_table->free( except.the_exception );
     
    134136        /* paranoid */ verify( this_thrd->context.SP );
    135137
    136         __schedule_thread( this_thrd );
    137         enable_interrupts( __cfaabi_dbg_ctx );
     138        schedule_thread$( this_thrd );
     139        enable_interrupts();
    138140}
    139141
     
    158160
    159161//-----------------------------------------------------------------------------
    160 forall(T & | is_thread(T) | IS_RESUMPTION_EXCEPTION(ThreadCancelled, (T)))
     162forall(T & | is_thread(T) | IS_RESUMPTION_EXCEPTION(ThreadCancelled, (T))
     163    | { EHM_DEFAULT_VTABLE(ThreadCancelled, (T)); })
    161164T & join( T & this ) {
    162165        thread_dtor_guard_t guard = { this, defaultResumptionHandler };
     
    167170        disable_interrupts();
    168171        uint64_t ret = __tls_rand();
    169         enable_interrupts( __cfaabi_dbg_ctx );
     172        enable_interrupts();
    170173        return ret;
    171174}
  • libcfa/src/concurrency/thread.hfa

    rfeacef9 r5407cdc  
    3232};
    3333
    34 FORALL_DATA_EXCEPTION(ThreadCancelled, (thread_t &), (thread_t)) (
     34EHM_FORALL_EXCEPTION(ThreadCancelled, (thread_t &), (thread_t)) (
    3535        thread_t * the_thread;
    3636        exception_t * the_exception;
     
    4242forall(T &)
    4343const char * msg(ThreadCancelled(T) *);
    44 
    45 // define that satisfies the trait without using the thread keyword
    46 #define DECL_THREAD(X) $thread* get_thread(X& this) __attribute__((const)) { return &this.__thrd; } void main(X& this)
    4744
    4845// Inline getters for threads/coroutines/monitors
     
    8279};
    8380
    84 forall( T & | is_thread(T) | IS_EXCEPTION(ThreadCancelled, (T)) )
     81forall( T & | is_thread(T) | IS_EXCEPTION(ThreadCancelled, (T))
     82    | { EHM_DEFAULT_VTABLE(ThreadCancelled, (T)); })
    8583void ?{}( thread_dtor_guard_t & this, T & thrd, void(*)(ThreadCancelled(T) &) );
    8684void ^?{}( thread_dtor_guard_t & this );
     
    128126//----------
    129127// join
    130 forall( T & | is_thread(T) | IS_RESUMPTION_EXCEPTION(ThreadCancelled, (T)) )
     128forall( T & | is_thread(T) | IS_RESUMPTION_EXCEPTION(ThreadCancelled, (T))
     129    | { EHM_DEFAULT_VTABLE(ThreadCancelled, (T)); })
    131130T & join( T & this );
    132131
Note: See TracChangeset for help on using the changeset viewer.