Ignore:
Timestamp:
Jan 25, 2021, 3:45:42 PM (5 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
c292244
Parents:
b6a8b31 (diff), 7158202 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

Location:
libcfa/src/concurrency
Files:
20 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/coroutine.cfa

    rb6a8b31 rd95969a  
    4646
    4747//-----------------------------------------------------------------------------
    48 FORALL_DATA_INSTANCE(CoroutineCancelled, (dtype coroutine_t), (coroutine_t))
    49 
    50 forall(dtype T)
     48FORALL_DATA_INSTANCE(CoroutineCancelled, (coroutine_t &), (coroutine_t))
     49
     50forall(T &)
    5151void mark_exception(CoroutineCancelled(T) *) {}
    5252
    53 forall(dtype T)
     53forall(T &)
    5454void copy(CoroutineCancelled(T) * dst, CoroutineCancelled(T) * src) {
    5555        dst->virtual_table = src->virtual_table;
     
    5858}
    5959
    60 forall(dtype T)
     60forall(T &)
    6161const char * msg(CoroutineCancelled(T) *) {
    6262        return "CoroutineCancelled(...)";
     
    6464
    6565// This code should not be inlined. It is the error path on resume.
    66 forall(dtype T | is_coroutine(T))
     66forall(T & | is_coroutine(T))
    6767void __cfaehm_cancelled_coroutine( T & cor, $coroutine * desc ) {
    6868        verify( desc->cancellation );
     
    148148// Part of the Public API
    149149// Not inline since only ever called once per coroutine
    150 forall(dtype T | is_coroutine(T))
     150forall(T & | is_coroutine(T))
    151151void prime(T& cor) {
    152152        $coroutine* this = get_coroutine(cor);
  • libcfa/src/concurrency/coroutine.hfa

    rb6a8b31 rd95969a  
    2222//-----------------------------------------------------------------------------
    2323// Exception thrown from resume when a coroutine stack is cancelled.
    24 FORALL_DATA_EXCEPTION(CoroutineCancelled, (dtype coroutine_t), (coroutine_t)) (
     24FORALL_DATA_EXCEPTION(CoroutineCancelled, (coroutine_t &), (coroutine_t)) (
    2525        coroutine_t * the_coroutine;
    2626        exception_t * the_exception;
    2727);
    2828
    29 forall(dtype T)
     29forall(T &)
    3030void copy(CoroutineCancelled(T) * dst, CoroutineCancelled(T) * src);
    3131
    32 forall(dtype T)
     32forall(T &)
    3333const char * msg(CoroutineCancelled(T) *);
    3434
     
    3737// Anything that implements this trait can be resumed.
    3838// Anything that is resumed is a coroutine.
    39 trait is_coroutine(dtype T | IS_RESUMPTION_EXCEPTION(CoroutineCancelled, (T))) {
     39trait is_coroutine(T & | IS_RESUMPTION_EXCEPTION(CoroutineCancelled, (T))) {
    4040        void main(T & this);
    4141        $coroutine * get_coroutine(T & this);
     
    6060//-----------------------------------------------------------------------------
    6161// Public coroutine API
    62 forall(dtype T | is_coroutine(T))
     62forall(T & | is_coroutine(T))
    6363void prime(T & cor);
    6464
     
    7272        void __cfactx_invoke_coroutine(void (*main)(void *), void * this);
    7373
    74         forall(dtype T)
     74        forall(T &)
    7575        void __cfactx_start(void (*main)(T &), struct $coroutine * cor, T & this, void (*invoke)(void (*main)(void *), void *));
    7676
     
    129129}
    130130
    131 forall(dtype T | is_coroutine(T))
     131forall(T & | is_coroutine(T))
    132132void __cfaehm_cancelled_coroutine( T & cor, $coroutine * desc );
    133133
    134134// Resume implementation inlined for performance
    135 forall(dtype T | is_coroutine(T))
     135forall(T & | is_coroutine(T))
    136136static inline T & resume(T & cor) {
    137137        // optimization : read TLS once and reuse it
  • libcfa/src/concurrency/future.hfa

    rb6a8b31 rd95969a  
    1919#include "monitor.hfa"
    2020
    21 forall( otype T ) {
     21forall( T ) {
    2222        struct future {
    2323                inline future_t;
     
    5858}
    5959
    60 forall( otype T ) {
     60forall( T ) {
    6161        monitor multi_future {
    6262                inline future_t;
  • libcfa/src/concurrency/io.cfa

    rb6a8b31 rd95969a  
    4141        #include "io/types.hfa"
    4242
    43         static const char * opcodes[] = {
     43        __attribute__((unused)) static const char * opcodes[] = {
    4444                "OP_NOP",
    4545                "OP_READV",
     
    173173                        __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ring.fd, to_submit, flags);
    174174                        ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8);
     175                        __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING %d returned %d\n", ring.fd, ret);
     176
    175177                        if( ret < 0 ) {
    176178                                switch((int)errno) {
    177179                                case EAGAIN:
    178180                                case EINTR:
     181                                case EBUSY:
    179182                                        ret = -1;
    180183                                        break;
     
    318321
    319322                __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.ring->fd, &this);
     323
     324                __ioctx_unregister( this );
    320325        }
    321326
     
    389394
    390395                        block++;
    391 
    392                         abort( "Kernel I/O : all submit queue entries used, yielding\n" );
    393396
    394397                        yield();
     
    465468                                sqe->flags,
    466469                                sqe->ioprio,
    467                                 sqe->off,
    468                                 sqe->addr,
     470                                (void*)sqe->off,
     471                                (void*)sqe->addr,
    469472                                sqe->len,
    470473                                sqe->accept_flags,
     
    491494                }
    492495                else if( ring.eager_submits ) {
    493                         __u32 picked = __submit_to_ready_array( ring, idx, mask );
     496                        __attribute__((unused)) __u32 picked = __submit_to_ready_array( ring, idx, mask );
    494497
    495498                        #if defined(LEADER_LOCK)
     
    629632                                        sqe->flags,
    630633                                        sqe->ioprio,
    631                                         sqe->off,
    632                                         sqe->addr,
     634                                        (void*)sqe->off,
     635                                        (void*)sqe->addr,
    633636                                        sqe->len,
    634637                                        sqe->accept_flags,
     
    642645                        __atomic_thread_fence( __ATOMIC_SEQ_CST );
    643646                        // Release the consumed SQEs
     647
    644648                        __release_consumed_submission( ring );
    645649                        // ring.submit_q.sqes[idx].user_data = 3ul64;
  • libcfa/src/concurrency/io/setup.cfa

    rb6a8b31 rd95969a  
    4242        void ^?{}(io_context & this, bool cluster_context) {}
    4343
     44        void register_fixed_files( io_context &, int *, unsigned ) {}
     45        void register_fixed_files( cluster    &, int *, unsigned ) {}
     46
    4447#else
    4548        #include <errno.h>
     
    110113
    111114        static struct {
    112                 pthread_t     thrd;    // pthread handle to io poller thread
    113                 void *        stack;   // pthread stack for io poller thread
    114                 int           epollfd; // file descriptor to the epoll instance
    115                 volatile bool run;     // Whether or not to continue
     115                      pthread_t  thrd;    // pthread handle to io poller thread
     116                      void *     stack;   // pthread stack for io poller thread
     117                      int        epollfd; // file descriptor to the epoll instance
     118                volatile     bool run;     // Whether or not to continue
     119                volatile     bool stopped; // Whether the poller has finished running
     120                volatile uint64_t epoch;   // Epoch used for memory reclamation
    116121        } iopoll;
    117122
     
    126131                __cfadbg_print_safe(io_core, "Kernel : Starting io poller thread\n" );
    127132
    128                 iopoll.run = true;
    129                 iopoll.stack = __create_pthread( &iopoll.thrd, iopoll_loop, 0p );
     133                iopoll.stack   = __create_pthread( &iopoll.thrd, iopoll_loop, 0p );
     134                iopoll.run     = true;
     135                iopoll.stopped = false;
     136                iopoll.epoch   = 0;
    130137        }
    131138
     
    171178                while( iopoll.run ) {
    172179                        __cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n");
     180
     181                        // increment the epoch to notify any deleters we are starting a new cycle
     182                        __atomic_fetch_add(&iopoll.epoch, 1, __ATOMIC_SEQ_CST);
    173183
    174184                        // Wait for events
     
    197207                        }
    198208                }
     209
     210                __atomic_store_n(&iopoll.stopped, true, __ATOMIC_SEQ_CST);
    199211
    200212                __cfadbg_print_safe(io_core, "Kernel : IO poller thread stopping\n" );
     
    493505// I/O Context Sleep
    494506//=============================================================================================
    495         #define IOEVENTS EPOLLIN | EPOLLONESHOT
    496 
    497507        static inline void __ioctx_epoll_ctl($io_ctx_thread & ctx, int op, const char * error) {
    498508                struct epoll_event ev;
    499                 ev.events = IOEVENTS;
     509                ev.events = EPOLLIN | EPOLLONESHOT;
    500510                ev.data.u64 = (__u64)&ctx;
    501511                int ret = epoll_ctl(iopoll.epollfd, op, ctx.ring->efd, &ev);
     
    514524        }
    515525
     526        void __ioctx_unregister($io_ctx_thread & ctx) {
     527                // Read the current epoch so we know when to stop
     528                size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST);
     529
     530                // Remove the fd from the iopoller
     531                __ioctx_epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE");
     532
     533                // Notify the io poller thread of the shutdown
     534                iopoll.run = false;
     535                sigval val = { 1 };
     536                pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
     537
     538                // Make sure all this is done
     539                __atomic_thread_fence(__ATOMIC_SEQ_CST);
     540
     541                // Wait for the next epoch
     542                while(curr == iopoll.epoch && !iopoll.stopped) Pause();
     543        }
     544
    516545//=============================================================================================
    517546// I/O Context Misc Setup
     
    520549                int ret = syscall( __NR_io_uring_register, ctx.thrd.ring->fd, IORING_REGISTER_FILES, files, count );
    521550                if( ret < 0 ) {
    522                         abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
     551                        abort( "KERNEL ERROR: IO_URING REGISTER - (%d) %s\n", (int)errno, strerror(errno) );
    523552                }
    524553
  • libcfa/src/concurrency/io/types.hfa

    rb6a8b31 rd95969a  
    55// file "LICENCE" distributed with Cforall.
    66//
    7 // io/types.hfa --
     7// io/types.hfa -- PRIVATE
     8// Types used by the I/O subsystem
    89//
    910// Author           : Thierry Delisle
     
    2122
    2223#include "bits/locks.hfa"
     24#include "kernel/fwd.hfa"
    2325
    2426#if defined(CFA_HAVE_LINUX_IO_URING_H)
     
    133135        struct $io_ctx_thread;
    134136        void __ioctx_register($io_ctx_thread & ctx);
     137        void __ioctx_unregister($io_ctx_thread & ctx);
    135138        void __ioctx_prepare_block($io_ctx_thread & ctx);
    136139        void __sqe_clean( volatile struct io_uring_sqe * sqe );
  • libcfa/src/concurrency/kernel.cfa

    rb6a8b31 rd95969a  
    140140                preemption_scope scope = { this };
    141141
     142                #if !defined(__CFA_NO_STATISTICS__)
     143                        unsigned long long last_tally = rdtscl();
     144                #endif
     145
     146
    142147                __cfadbg_print_safe(runtime_core, "Kernel : core %p started\n", this);
    143148
     
    206211                        // Are we done?
    207212                        if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP;
     213
     214                        #if !defined(__CFA_NO_STATISTICS__)
     215                                unsigned long long curr = rdtscl();
     216                                if(curr > (last_tally + 500000000)) {
     217                                        __tally_stats(this->cltr->stats, __cfaabi_tls.this_stats);
     218                                        last_tally = curr;
     219                                }
     220                        #endif
    208221                }
    209222
     
    211224        }
    212225
    213         V( this->terminated );
     226        post( this->terminated );
    214227
    215228        if(this == mainProcessor) {
     
    611624// Unexpected Terminating logic
    612625//=============================================================================================
    613 static __spinlock_t kernel_abort_lock;
    614 static bool kernel_abort_called = false;
    615 
    616 void * kernel_abort(void) __attribute__ ((__nothrow__)) {
    617         // abort cannot be recursively entered by the same or different processors because all signal handlers return when
    618         // the globalAbort flag is true.
    619         lock( kernel_abort_lock __cfaabi_dbg_ctx2 );
    620 
    621         // disable interrupts, it no longer makes sense to try to interrupt this processor
    622         disable_interrupts();
    623 
    624         // first task to abort ?
    625         if ( kernel_abort_called ) {                    // not first task to abort ?
    626                 unlock( kernel_abort_lock );
    627 
    628                 sigset_t mask;
    629                 sigemptyset( &mask );
    630                 sigaddset( &mask, SIGALRM );            // block SIGALRM signals
    631                 sigaddset( &mask, SIGUSR1 );            // block SIGALRM signals
    632                 sigsuspend( &mask );                            // block the processor to prevent further damage during abort
    633                 _exit( EXIT_FAILURE );                          // if processor unblocks before it is killed, terminate it
    634         }
    635         else {
    636                 kernel_abort_called = true;
    637                 unlock( kernel_abort_lock );
    638         }
    639 
    640         return __cfaabi_tls.this_thread;
    641 }
    642 
    643 void kernel_abort_msg( void * kernel_data, char * abort_text, int abort_text_size ) {
    644         $thread * thrd = ( $thread * ) kernel_data;
     626void __kernel_abort_msg( char * abort_text, int abort_text_size ) {
     627        $thread * thrd = __cfaabi_tls.this_thread;
    645628
    646629        if(thrd) {
     
    662645}
    663646
    664 int kernel_abort_lastframe( void ) __attribute__ ((__nothrow__)) {
    665         return get_coroutine(kernelTLS().this_thread) == get_coroutine(mainThread) ? 4 : 2;
     647int __kernel_abort_lastframe( void ) __attribute__ ((__nothrow__)) {
     648        return get_coroutine(__cfaabi_tls.this_thread) == get_coroutine(mainThread) ? 4 : 2;
    666649}
    667650
     
    681664// Kernel Utilities
    682665//=============================================================================================
    683 //-----------------------------------------------------------------------------
    684 // Locks
    685 void  ?{}( semaphore & this, int count = 1 ) {
    686         (this.lock){};
    687         this.count = count;
    688         (this.waiting){};
    689 }
    690 void ^?{}(semaphore & this) {}
    691 
    692 bool P(semaphore & this) with( this ){
    693         lock( lock __cfaabi_dbg_ctx2 );
    694         count -= 1;
    695         if ( count < 0 ) {
    696                 // queue current task
    697                 append( waiting, active_thread() );
    698 
    699                 // atomically release spin lock and block
    700                 unlock( lock );
    701                 park();
    702                 return true;
    703         }
    704         else {
    705             unlock( lock );
    706             return false;
    707         }
    708 }
    709 
    710 bool V(semaphore & this) with( this ) {
    711         $thread * thrd = 0p;
    712         lock( lock __cfaabi_dbg_ctx2 );
    713         count += 1;
    714         if ( count <= 0 ) {
    715                 // remove task at head of waiting list
    716                 thrd = pop_head( waiting );
    717         }
    718 
    719         unlock( lock );
    720 
    721         // make new owner
    722         unpark( thrd );
    723 
    724         return thrd != 0p;
    725 }
    726 
    727 bool V(semaphore & this, unsigned diff) with( this ) {
    728         $thread * thrd = 0p;
    729         lock( lock __cfaabi_dbg_ctx2 );
    730         int release = max(-count, (int)diff);
    731         count += diff;
    732         for(release) {
    733                 unpark( pop_head( waiting ) );
    734         }
    735 
    736         unlock( lock );
    737 
    738         return thrd != 0p;
    739 }
    740 
    741666//-----------------------------------------------------------------------------
    742667// Debug
  • libcfa/src/concurrency/kernel.hfa

    rb6a8b31 rd95969a  
    55// file "LICENCE" distributed with Cforall.
    66//
    7 // kernel --
     7// kernel -- Header containing the core of the kernel API
    88//
    99// Author           : Thierry Delisle
     
    2424extern "C" {
    2525        #include <bits/pthreadtypes.h>
     26        #include <pthread.h>
    2627        #include <linux/types.h>
    2728}
    2829
    2930//-----------------------------------------------------------------------------
    30 // Locks
    31 struct semaphore {
    32         __spinlock_t lock;
    33         int count;
    34         __queue_t($thread) waiting;
    35 };
    36 
    37 void  ?{}(semaphore & this, int count = 1);
    38 void ^?{}(semaphore & this);
    39 bool   P (semaphore & this);
    40 bool   V (semaphore & this);
    41 bool   V (semaphore & this, unsigned count);
     31// Underlying Locks
     32#ifdef __CFA_WITH_VERIFY__
     33        extern bool __cfaabi_dbg_in_kernel();
     34#endif
     35
     36extern "C" {
     37        char * strerror(int);
     38}
     39#define CHECKED(x) { int err = x; if( err != 0 ) abort("KERNEL ERROR: Operation \"" #x "\" return error %d - %s\n", err, strerror(err)); }
     40
     41struct __bin_sem_t {
     42        pthread_mutex_t         lock;
     43        pthread_cond_t          cond;
     44        int                     val;
     45};
     46
     47static inline void ?{}(__bin_sem_t & this) with( this ) {
     48        // Create the mutex with error checking
     49        pthread_mutexattr_t mattr;
     50        pthread_mutexattr_init( &mattr );
     51        pthread_mutexattr_settype( &mattr, PTHREAD_MUTEX_ERRORCHECK_NP);
     52        pthread_mutex_init(&lock, &mattr);
     53
     54        pthread_cond_init (&cond, (const pthread_condattr_t *)0p);  // workaround trac#208: cast should not be required
     55        val = 0;
     56}
     57
     58static inline void ^?{}(__bin_sem_t & this) with( this ) {
     59        CHECKED( pthread_mutex_destroy(&lock) );
     60        CHECKED( pthread_cond_destroy (&cond) );
     61}
     62
     63static inline void wait(__bin_sem_t & this) with( this ) {
     64        verify(__cfaabi_dbg_in_kernel());
     65        CHECKED( pthread_mutex_lock(&lock) );
     66                while(val < 1) {
     67                        pthread_cond_wait(&cond, &lock);
     68                }
     69                val -= 1;
     70        CHECKED( pthread_mutex_unlock(&lock) );
     71}
     72
     73static inline bool post(__bin_sem_t & this) with( this ) {
     74        bool needs_signal = false;
     75
     76        CHECKED( pthread_mutex_lock(&lock) );
     77                if(val < 1) {
     78                        val += 1;
     79                        pthread_cond_signal(&cond);
     80                        needs_signal = true;
     81                }
     82        CHECKED( pthread_mutex_unlock(&lock) );
     83
     84        return needs_signal;
     85}
     86
     87#undef CHECKED
    4288
    4389
     
    91137
    92138        // Termination synchronisation (user semaphore)
    93         semaphore terminated;
     139        oneshot terminated;
    94140
    95141        // pthread Stack
  • libcfa/src/concurrency/kernel/fwd.hfa

    rb6a8b31 rd95969a  
    55// file "LICENCE" distributed with Cforall.
    66//
    7 // kernel/fwd.hfa --
     7// kernel/fwd.hfa -- PUBLIC
     8// Fundamental code needed to implement threading M.E.S. algorithms.
    89//
    910// Author           : Thierry Delisle
     
    134135                extern uint64_t thread_rand();
    135136
     137                // Semaphore which only supports a single thread
     138                struct single_sem {
     139                        struct $thread * volatile ptr;
     140                };
     141
     142                static inline {
     143                        void  ?{}(single_sem & this) {
     144                                this.ptr = 0p;
     145                        }
     146
     147                        void ^?{}(single_sem &) {}
     148
     149                        bool wait(single_sem & this) {
     150                                for() {
     151                                        struct $thread * expected = this.ptr;
     152                                        if(expected == 1p) {
     153                                                if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     154                                                        return false;
     155                                                }
     156                                        }
     157                                        else {
     158                                                /* paranoid */ verify( expected == 0p );
     159                                                if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     160                                                        park();
     161                                                        return true;
     162                                                }
     163                                        }
     164
     165                                }
     166                        }
     167
     168                        bool post(single_sem & this) {
     169                                for() {
     170                                        struct $thread * expected = this.ptr;
     171                                        if(expected == 1p) return false;
     172                                        if(expected == 0p) {
     173                                                if(__atomic_compare_exchange_n(&this.ptr, &expected, 1p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     174                                                        return false;
     175                                                }
     176                                        }
     177                                        else {
     178                                                if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     179                                                        unpark( expected );
     180                                                        return true;
     181                                                }
     182                                        }
     183                                }
     184                        }
     185                }
     186
     187                // Synchronozation primitive which only supports a single thread and one post
     188                // Similar to a binary semaphore with a 'one shot' semantic
     189                // is expected to be discarded after each party call their side
     190                struct oneshot {
     191                        // Internal state :
     192                        //     0p     : is initial state (wait will block)
     193                        //     1p     : fulfilled (wait won't block)
     194                        // any thread : a thread is currently waiting
     195                        struct $thread * volatile ptr;
     196                };
     197
     198                static inline {
     199                        void  ?{}(oneshot & this) {
     200                                this.ptr = 0p;
     201                        }
     202
     203                        void ^?{}(oneshot &) {}
     204
     205                        // Wait for the post, return immidiately if it already happened.
     206                        // return true if the thread was parked
     207                        bool wait(oneshot & this) {
     208                                for() {
     209                                        struct $thread * expected = this.ptr;
     210                                        if(expected == 1p) return false;
     211                                        /* paranoid */ verify( expected == 0p );
     212                                        if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     213                                                park();
     214                                                /* paranoid */ verify( this.ptr == 1p );
     215                                                return true;
     216                                        }
     217                                }
     218                        }
     219
     220                        // Mark as fulfilled, wake thread if needed
     221                        // return true if a thread was unparked
     222                        bool post(oneshot & this) {
     223                                struct $thread * got = __atomic_exchange_n( &this.ptr, 1p, __ATOMIC_SEQ_CST);
     224                                if( got == 0p ) return false;
     225                                unpark( got );
     226                                return true;
     227                        }
     228                }
     229
     230                // base types for future to build upon
     231                // It is based on the 'oneshot' type to allow multiple futures
     232                // to block on the same instance, permitting users to block a single
     233                // thread on "any of" [a given set of] futures.
     234                // does not support multiple threads waiting on the same future
     235                struct future_t {
     236                        // Internal state :
     237                        //     0p      : is initial state (wait will block)
     238                        //     1p      : fulfilled (wait won't block)
     239                        //     2p      : in progress ()
     240                        //     3p      : abandoned, server should delete
     241                        // any oneshot : a context has been setup to wait, a thread could wait on it
     242                        struct oneshot * volatile ptr;
     243                };
     244
     245                static inline {
     246                        void  ?{}(future_t & this) {
     247                                this.ptr = 0p;
     248                        }
     249
     250                        void ^?{}(future_t &) {}
     251
     252                        void reset(future_t & this) {
     253                                // needs to be in 0p or 1p
     254                                __atomic_exchange_n( &this.ptr, 0p, __ATOMIC_SEQ_CST);
     255                        }
     256
     257                        // check if the future is available
     258                        bool available( future_t & this ) {
     259                                return this.ptr == 1p;
     260                        }
     261
     262                        // Prepare the future to be waited on
     263                        // intented to be use by wait, wait_any, waitfor, etc. rather than used directly
     264                        bool setup( future_t & this, oneshot & wait_ctx ) {
     265                                /* paranoid */ verify( wait_ctx.ptr == 0p );
     266                                // The future needs to set the wait context
     267                                for() {
     268                                        struct oneshot * expected = this.ptr;
     269                                        // Is the future already fulfilled?
     270                                        if(expected == 1p) return false; // Yes, just return false (didn't block)
     271
     272                                        // The future is not fulfilled, try to setup the wait context
     273                                        /* paranoid */ verify( expected == 0p );
     274                                        if(__atomic_compare_exchange_n(&this.ptr, &expected, &wait_ctx, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     275                                                return true;
     276                                        }
     277                                }
     278                        }
     279
     280                        // Stop waiting on a future
     281                        // When multiple futures are waited for together in "any of" pattern
     282                        // futures that weren't fulfilled before the thread woke up
     283                        // should retract the wait ctx
     284                        // intented to be use by wait, wait_any, waitfor, etc. rather than used directly
     285                        void retract( future_t & this, oneshot & wait_ctx ) {
     286                                // Remove the wait context
     287                                struct oneshot * got = __atomic_exchange_n( &this.ptr, 0p, __ATOMIC_SEQ_CST);
     288
     289                                // got == 0p: future was never actually setup, just return
     290                                if( got == 0p ) return;
     291
     292                                // got == wait_ctx: since fulfil does an atomic_swap,
     293                                // if we got back the original then no one else saw context
     294                                // It is safe to delete (which could happen after the return)
     295                                if( got == &wait_ctx ) return;
     296
     297                                // got == 1p: the future is ready and the context was fully consumed
     298                                // the server won't use the pointer again
     299                                // It is safe to delete (which could happen after the return)
     300                                if( got == 1p ) return;
     301
     302                                // got == 2p: the future is ready but the context hasn't fully been consumed
     303                                // spin until it is safe to move on
     304                                if( got == 2p ) {
     305                                        while( this.ptr != 1p ) Pause();
     306                                        return;
     307                                }
     308
     309                                // got == any thing else, something wen't wrong here, abort
     310                                abort("Future in unexpected state");
     311                        }
     312
     313                        // Mark the future as abandoned, meaning it will be deleted by the server
     314                        bool abandon( future_t & this ) {
     315                                /* paranoid */ verify( this.ptr != 3p );
     316
     317                                // Mark the future as abandonned
     318                                struct oneshot * got = __atomic_exchange_n( &this.ptr, 3p, __ATOMIC_SEQ_CST);
     319
     320                                // If the future isn't already fulfilled, let the server delete it
     321                                if( got == 0p ) return false;
     322
     323                                // got == 2p: the future is ready but the context hasn't fully been consumed
     324                                // spin until it is safe to move on
     325                                if( got == 2p ) {
     326                                        while( this.ptr != 1p ) Pause();
     327                                        got = 1p;
     328                                }
     329
     330                                // The future is completed delete it now
     331                                /* paranoid */ verify( this.ptr != 1p );
     332                                free( &this );
     333                                return true;
     334                        }
     335
     336                        // from the server side, mark the future as fulfilled
     337                        // delete it if needed
     338                        bool fulfil( future_t & this ) {
     339                                for() {
     340                                        struct oneshot * expected = this.ptr;
     341                                        // was this abandoned?
     342                                        #if defined(__GNUC__) && __GNUC__ >= 7
     343                                                #pragma GCC diagnostic push
     344                                                #pragma GCC diagnostic ignored "-Wfree-nonheap-object"
     345                                        #endif
     346                                                if( expected == 3p ) { free( &this ); return false; }
     347                                        #if defined(__GNUC__) && __GNUC__ >= 7
     348                                                #pragma GCC diagnostic pop
     349                                        #endif
     350
     351                                        /* paranoid */ verify( expected != 1p ); // Future is already fulfilled, should not happen
     352                                        /* paranoid */ verify( expected != 2p ); // Future is bein fulfilled by someone else, this is even less supported then the previous case.
     353
     354                                        // If there is a wait context, we need to consume it and mark it as consumed after
     355                                        // If there is no context then we can skip the in progress phase
     356                                        struct oneshot * want = expected == 0p ? 1p : 2p;
     357                                        if(__atomic_compare_exchange_n(&this.ptr, &expected, want, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     358                                                if( expected == 0p ) { /* paranoid */ verify( this.ptr == 1p); return false; }
     359                                                bool ret = post( *expected );
     360                                                __atomic_store_n( &this.ptr, 1p, __ATOMIC_SEQ_CST);
     361                                                return ret;
     362                                        }
     363                                }
     364
     365                        }
     366
     367                        // Wait for the future to be fulfilled
     368                        bool wait( future_t & this ) {
     369                                oneshot temp;
     370                                if( !setup(this, temp) ) return false;
     371
     372                                // Wait context is setup, just wait on it
     373                                bool ret = wait( temp );
     374
     375                                // Wait for the future to tru
     376                                while( this.ptr == 2p ) Pause();
     377                                // Make sure the state makes sense
     378                                // Should be fulfilled, could be in progress but it's out of date if so
     379                                // since if that is the case, the oneshot was fulfilled (unparking this thread)
     380                                // and the oneshot should not be needed any more
     381                                __attribute__((unused)) struct oneshot * was = this.ptr;
     382                                /* paranoid */ verifyf( was == 1p, "Expected this.ptr to be 1p, was %p\n", was );
     383
     384                                // Mark the future as fulfilled, to be consistent
     385                                // with potential calls to avail
     386                                // this.ptr = 1p;
     387                                return ret;
     388                        }
     389                }
     390
    136391                //-----------------------------------------------------------------------
    137392                // Statics call at the end of each thread to register statistics
  • libcfa/src/concurrency/kernel/startup.cfa

    rb6a8b31 rd95969a  
    199199        void ?{}(processor & this) with( this ) {
    200200                ( this.idle ){};
    201                 ( this.terminated ){ 0 };
     201                ( this.terminated ){};
    202202                ( this.runner ){};
    203203                init( this, "Main Processor", *mainCluster );
     
    528528void ?{}(processor & this, const char name[], cluster & _cltr) {
    529529        ( this.idle ){};
    530         ( this.terminated ){ 0 };
     530        ( this.terminated ){};
    531531        ( this.runner ){};
    532532
     
    549549                __wake_proc( &this );
    550550
    551                 P( terminated );
     551                wait( terminated );
    552552                /* paranoid */ verify( active_processor() != &this);
    553553        }
  • libcfa/src/concurrency/locks.cfa

    rb6a8b31 rd95969a  
     1//
     2// Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
     3//
     4// The contents of this file are covered under the licence agreement in the
     5// file "LICENCE" distributed with Cforall.
     6//
     7// locks.hfa -- LIBCFATHREAD
     8// Runtime locks that used with the runtime thread system.
     9//
     10// Author           : Colby Alexander Parsons
     11// Created On       : Thu Jan 21 19:46:50 2021
     12// Last Modified By :
     13// Last Modified On :
     14// Update Count     :
     15//
     16
     17#define __cforall_thread__
     18
    119#include "locks.hfa"
    220#include "kernel_private.hfa"
     
    725//-----------------------------------------------------------------------------
    826// info_thread
    9 forall(dtype L | is_blocking_lock(L)) {
     27forall(L & | is_blocking_lock(L)) {
    1028        struct info_thread {
    1129                // used to put info_thread on a dl queue (aka sequence)
     
    5674
    5775void ^?{}( blocking_lock & this ) {}
    58 void  ?{}( single_acquisition_lock & this ) {((blocking_lock &)this){ false, false };}
    59 void ^?{}( single_acquisition_lock & this ) {}
    60 void  ?{}( owner_lock & this ) {((blocking_lock &)this){ true, true };}
    61 void ^?{}( owner_lock & this ) {}
    62 void  ?{}( multiple_acquisition_lock & this ) {((blocking_lock &)this){ true, false };}
    63 void ^?{}( multiple_acquisition_lock & this ) {}
     76
    6477
    6578void lock( blocking_lock & this ) with( this ) {
     
    170183
    171184//-----------------------------------------------------------------------------
    172 // Overloaded routines for traits
    173 // These routines are temporary until an inheritance bug is fixed
    174 void   lock      ( single_acquisition_lock & this ) { lock   ( (blocking_lock &)this ); }
    175 void   unlock    ( single_acquisition_lock & this ) { unlock ( (blocking_lock &)this ); }
    176 void   on_wait   ( single_acquisition_lock & this ) { on_wait( (blocking_lock &)this ); }
    177 void   on_notify ( single_acquisition_lock & this, struct $thread * t ) { on_notify( (blocking_lock &)this, t ); }
    178 void   set_recursion_count( single_acquisition_lock & this, size_t recursion ) { set_recursion_count( (blocking_lock &)this, recursion ); }
    179 size_t get_recursion_count( single_acquisition_lock & this ) { return get_recursion_count( (blocking_lock &)this ); }
    180 
    181 void   lock     ( owner_lock & this ) { lock   ( (blocking_lock &)this ); }
    182 void   unlock   ( owner_lock & this ) { unlock ( (blocking_lock &)this ); }
    183 void   on_wait  ( owner_lock & this ) { on_wait( (blocking_lock &)this ); }
    184 void   on_notify( owner_lock & this, struct $thread * t ) { on_notify( (blocking_lock &)this, t ); }
    185 void   set_recursion_count( owner_lock & this, size_t recursion ) { set_recursion_count( (blocking_lock &)this, recursion ); }
    186 size_t get_recursion_count( owner_lock & this ) { return get_recursion_count( (blocking_lock &)this ); }
    187 
    188 void   lock     ( multiple_acquisition_lock & this ) { lock   ( (blocking_lock &)this ); }
    189 void   unlock   ( multiple_acquisition_lock & this ) { unlock ( (blocking_lock &)this ); }
    190 void   on_wait  ( multiple_acquisition_lock & this ) { on_wait( (blocking_lock &)this ); }
    191 void   on_notify( multiple_acquisition_lock & this, struct $thread * t ){ on_notify( (blocking_lock &)this, t ); }
    192 void   set_recursion_count( multiple_acquisition_lock & this, size_t recursion ){ set_recursion_count( (blocking_lock &)this, recursion ); }
    193 size_t get_recursion_count( multiple_acquisition_lock & this ){ return get_recursion_count( (blocking_lock &)this ); }
    194 
    195 //-----------------------------------------------------------------------------
    196185// alarm node wrapper
    197 forall(dtype L | is_blocking_lock(L)) {
     186forall(L & | is_blocking_lock(L)) {
    198187        struct alarm_node_wrap {
    199188                alarm_node_t alarm_node;
     
    239228//-----------------------------------------------------------------------------
    240229// condition variable
    241 forall(dtype L | is_blocking_lock(L)) {
     230forall(L & | is_blocking_lock(L)) {
    242231
    243232        void ?{}( condition_variable(L) & this ){
     
    356345        bool wait( condition_variable(L) & this, L & l, uintptr_t info, Time time         ) with(this) { WAIT_TIME( info, &l , time ) }
    357346}
     347
     348//-----------------------------------------------------------------------------
     349// Semaphore
     350void  ?{}( semaphore & this, int count = 1 ) {
     351        (this.lock){};
     352        this.count = count;
     353        (this.waiting){};
     354}
     355void ^?{}(semaphore & this) {}
     356
     357bool P(semaphore & this) with( this ){
     358        lock( lock __cfaabi_dbg_ctx2 );
     359        count -= 1;
     360        if ( count < 0 ) {
     361                // queue current task
     362                append( waiting, active_thread() );
     363
     364                // atomically release spin lock and block
     365                unlock( lock );
     366                park();
     367                return true;
     368        }
     369        else {
     370            unlock( lock );
     371            return false;
     372        }
     373}
     374
     375bool V(semaphore & this) with( this ) {
     376        $thread * thrd = 0p;
     377        lock( lock __cfaabi_dbg_ctx2 );
     378        count += 1;
     379        if ( count <= 0 ) {
     380                // remove task at head of waiting list
     381                thrd = pop_head( waiting );
     382        }
     383
     384        unlock( lock );
     385
     386        // make new owner
     387        unpark( thrd );
     388
     389        return thrd != 0p;
     390}
     391
     392bool V(semaphore & this, unsigned diff) with( this ) {
     393        $thread * thrd = 0p;
     394        lock( lock __cfaabi_dbg_ctx2 );
     395        int release = max(-count, (int)diff);
     396        count += diff;
     397        for(release) {
     398                unpark( pop_head( waiting ) );
     399        }
     400
     401        unlock( lock );
     402
     403        return thrd != 0p;
     404}
  • libcfa/src/concurrency/locks.hfa

    rb6a8b31 rd95969a  
     1//
     2// Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
     3//
     4// The contents of this file are covered under the licence agreement in the
     5// file "LICENCE" distributed with Cforall.
     6//
     7// locks.hfa -- PUBLIC
     8// Runtime locks that used with the runtime thread system.
     9//
     10// Author           : Colby Alexander Parsons
     11// Created On       : Thu Jan 21 19:46:50 2021
     12// Last Modified By :
     13// Last Modified On :
     14// Update Count     :
     15//
     16
    117#pragma once
    218
    319#include <stdbool.h>
    420
    5 #include "bits/locks.hfa"
    6 #include "bits/sequence.hfa"
    7 
    8 #include "invoke.h"
     21#include "bits/weakso_locks.hfa"
    922
    1023#include "time_t.hfa"
    1124#include "time.hfa"
    1225
     26//----------
     27struct single_acquisition_lock {
     28        inline blocking_lock;
     29};
     30
     31static inline void  ?{}( single_acquisition_lock & this ) {((blocking_lock &)this){ false, false };}
     32static inline void ^?{}( single_acquisition_lock & this ) {}
     33static inline void   lock      ( single_acquisition_lock & this ) { lock   ( (blocking_lock &)this ); }
     34static inline void   unlock    ( single_acquisition_lock & this ) { unlock ( (blocking_lock &)this ); }
     35static inline void   on_wait   ( single_acquisition_lock & this ) { on_wait( (blocking_lock &)this ); }
     36static inline void   on_notify ( single_acquisition_lock & this, struct $thread * t ) { on_notify( (blocking_lock &)this, t ); }
     37static inline void   set_recursion_count( single_acquisition_lock & this, size_t recursion ) { set_recursion_count( (blocking_lock &)this, recursion ); }
     38static inline size_t get_recursion_count( single_acquisition_lock & this ) { return get_recursion_count( (blocking_lock &)this ); }
     39
     40//----------
     41struct owner_lock {
     42        inline blocking_lock;
     43};
     44
     45static inline void  ?{}( owner_lock & this ) {((blocking_lock &)this){ true, true };}
     46static inline void ^?{}( owner_lock & this ) {}
     47static inline void   lock     ( owner_lock & this ) { lock   ( (blocking_lock &)this ); }
     48static inline void   unlock   ( owner_lock & this ) { unlock ( (blocking_lock &)this ); }
     49static inline void   on_wait  ( owner_lock & this ) { on_wait( (blocking_lock &)this ); }
     50static inline void   on_notify( owner_lock & this, struct $thread * t ) { on_notify( (blocking_lock &)this, t ); }
     51static inline void   set_recursion_count( owner_lock & this, size_t recursion ) { set_recursion_count( (blocking_lock &)this, recursion ); }
     52static inline size_t get_recursion_count( owner_lock & this ) { return get_recursion_count( (blocking_lock &)this ); }
     53
    1354//-----------------------------------------------------------------------------
    1455// is_blocking_lock
    15 trait is_blocking_lock(dtype L | sized(L)) {
     56trait is_blocking_lock(L & | sized(L)) {
    1657        // For synchronization locks to use when acquiring
    1758        void on_notify( L &, struct $thread * );
     
    3172// the info thread is a wrapper around a thread used
    3273// to store extra data for use in the condition variable
    33 forall(dtype L | is_blocking_lock(L)) {
     74forall(L & | is_blocking_lock(L)) {
    3475        struct info_thread;
    3576
     
    4081
    4182//-----------------------------------------------------------------------------
    42 // Blocking Locks
    43 struct blocking_lock {
    44         // Spin lock used for mutual exclusion
    45         __spinlock_t lock;
    46 
    47         // List of blocked threads
    48         Sequence( $thread ) blocked_threads;
    49 
    50         // Count of current blocked threads
    51         size_t wait_count;
    52 
    53         // Flag if the lock allows multiple acquisition
    54         bool multi_acquisition;
    55 
    56         // Flag if lock can be released by non owner
    57         bool strict_owner;
    58 
    59         // Current thread owning the lock
    60         struct $thread * owner;
    61 
    62         // Number of recursion level
    63         size_t recursion_count;
    64 };
    65 
    66 struct single_acquisition_lock {
    67         inline blocking_lock;
    68 };
    69 
    70 struct owner_lock {
    71         inline blocking_lock;
    72 };
    73 
    74 struct multiple_acquisition_lock {
    75         inline blocking_lock;
    76 };
    77 
    78 void  ?{}( blocking_lock & this, bool multi_acquisition, bool strict_owner );
    79 void ^?{}( blocking_lock & this );
    80 
    81 void  ?{}( single_acquisition_lock & this );
    82 void ^?{}( single_acquisition_lock & this );
    83 
    84 void  ?{}( owner_lock & this );
    85 void ^?{}( owner_lock & this );
    86 
    87 void  ?{}( multiple_acquisition_lock & this );
    88 void ^?{}( multiple_acquisition_lock & this );
    89 
    90 void lock( blocking_lock & this );
    91 bool try_lock( blocking_lock & this );
    92 void unlock( blocking_lock & this );
    93 void on_notify( blocking_lock & this, struct $thread * t );
    94 void on_wait( blocking_lock & this );
    95 size_t wait_count( blocking_lock & this );
    96 void set_recursion_count( blocking_lock & this, size_t recursion );
    97 size_t get_recursion_count( blocking_lock & this );
    98 
    99 void lock( single_acquisition_lock & this );
    100 void unlock( single_acquisition_lock & this );
    101 void on_notify( single_acquisition_lock & this, struct $thread * t );
    102 void on_wait( single_acquisition_lock & this );
    103 void set_recursion_count( single_acquisition_lock & this, size_t recursion );
    104 size_t get_recursion_count( single_acquisition_lock & this );
    105 
    106 void lock( owner_lock & this );
    107 void unlock( owner_lock & this );
    108 void on_notify( owner_lock & this, struct $thread * t );
    109 void on_wait( owner_lock & this );
    110 void set_recursion_count( owner_lock & this, size_t recursion );
    111 size_t get_recursion_count( owner_lock & this );
    112 
    113 void lock( multiple_acquisition_lock & this );
    114 void unlock( multiple_acquisition_lock & this );
    115 void on_notify( multiple_acquisition_lock & this, struct $thread * t );
    116 void on_wait( multiple_acquisition_lock & this );
    117 void set_recursion_count( multiple_acquisition_lock & this, size_t recursion );
    118 size_t get_recursion_count( multiple_acquisition_lock & this );
    119 
    120 //-----------------------------------------------------------------------------
    12183// Synchronization Locks
    122 forall(dtype L | is_blocking_lock(L)) {
     84forall(L & | is_blocking_lock(L)) {
    12385        struct condition_variable {
    12486                // Spin lock used for mutual exclusion
     
    157119        bool wait( condition_variable(L) & this, L & l, uintptr_t info, Time time );
    158120}
     121
     122//-----------------------------------------------------------------------------
     123// Semaphore
     124struct semaphore {
     125        __spinlock_t lock;
     126        int count;
     127        __queue_t($thread) waiting;
     128};
     129
     130void  ?{}(semaphore & this, int count = 1);
     131void ^?{}(semaphore & this);
     132bool   P (semaphore & this);
     133bool   V (semaphore & this);
     134bool   V (semaphore & this, unsigned count);
  • libcfa/src/concurrency/monitor.cfa

    rb6a8b31 rd95969a  
    5050static inline [$thread *, int] search_entry_queue( const __waitfor_mask_t &, $monitor * monitors [], __lock_size_t count );
    5151
    52 forall(dtype T | sized( T ))
     52forall(T & | sized( T ))
    5353static inline __lock_size_t insert_unique( T * array [], __lock_size_t & size, T * val );
    5454static inline __lock_size_t count_max    ( const __waitfor_mask_t & mask );
     
    949949}
    950950
    951 forall(dtype T | sized( T ))
     951forall(T & | sized( T ))
    952952static inline __lock_size_t insert_unique( T * array [], __lock_size_t & size, T * val ) {
    953953        if( !val ) return size;
  • libcfa/src/concurrency/monitor.hfa

    rb6a8b31 rd95969a  
    2222#include "stdlib.hfa"
    2323
    24 trait is_monitor(dtype T) {
     24trait is_monitor(T &) {
    2525        $monitor * get_monitor( T & );
    2626        void ^?{}( T & mutex );
     
    5959void ^?{}( monitor_dtor_guard_t & this );
    6060
    61 static inline forall( dtype T | sized(T) | { void ^?{}( T & mutex ); } )
     61static inline forall( T & | sized(T) | { void ^?{}( T & mutex ); } )
    6262void delete( T * th ) {
    6363        ^(*th){};
  • libcfa/src/concurrency/mutex.cfa

    rb6a8b31 rd95969a  
    164164}
    165165
    166 forall(dtype L | is_lock(L))
     166forall(L & | is_lock(L))
    167167void wait(condition_variable & this, L & l) {
    168168        lock( this.lock __cfaabi_dbg_ctx2 );
     
    176176//-----------------------------------------------------------------------------
    177177// Scopes
    178 forall(dtype L | is_lock(L))
     178forall(L & | is_lock(L))
    179179void lock_all  ( L * locks[], size_t count) {
    180180        // Sort locks based on addresses
     
    188188}
    189189
    190 forall(dtype L | is_lock(L))
     190forall(L & | is_lock(L))
    191191void unlock_all( L * locks[], size_t count) {
    192192        // Lock all
  • libcfa/src/concurrency/mutex.hfa

    rb6a8b31 rd95969a  
    4242};
    4343
    44 void ?{}(mutex_lock & this);
    45 void ^?{}(mutex_lock & this);
    46 void lock(mutex_lock & this);
    47 bool try_lock(mutex_lock & this);
    48 void unlock(mutex_lock & this);
     44void ?{}(mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
     45void ^?{}(mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
     46void lock(mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
     47bool try_lock(mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
     48void unlock(mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
    4949
    5050// Exclusive lock - recursive
     
    6464};
    6565
    66 void ?{}(recursive_mutex_lock & this);
    67 void ^?{}(recursive_mutex_lock & this);
    68 void lock(recursive_mutex_lock & this);
    69 bool try_lock(recursive_mutex_lock & this);
    70 void unlock(recursive_mutex_lock & this);
     66void ?{}(recursive_mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
     67void ^?{}(recursive_mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
     68void lock(recursive_mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
     69bool try_lock(recursive_mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
     70void unlock(recursive_mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
    7171
    72 trait is_lock(dtype L | sized(L)) {
     72trait is_lock(L & | sized(L)) {
    7373        void lock  (L &);
    7474        void unlock(L &);
     
    8686};
    8787
    88 void ?{}(condition_variable & this);
    89 void ^?{}(condition_variable & this);
     88void ?{}(condition_variable & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
     89void ^?{}(condition_variable & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
    9090
    91 void notify_one(condition_variable & this);
    92 void notify_all(condition_variable & this);
     91void notify_one(condition_variable & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
     92void notify_all(condition_variable & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
    9393
    94 void wait(condition_variable & this);
     94void wait(condition_variable & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
    9595
    96 forall(dtype L | is_lock(L))
    97 void wait(condition_variable & this, L & l);
     96forall(L & | is_lock(L))
     97void wait(condition_variable & this, L & l) __attribute__((deprecated("use concurrency/locks.hfa instead")));
    9898
    9999//-----------------------------------------------------------------------------
    100100// Scopes
    101 forall(dtype L | is_lock(L)) {
     101forall(L & | is_lock(L)) {
    102102        #if !defined( __TUPLE_ARRAYS_EXIST__ )
    103103        void lock  ( L * locks [], size_t count);
  • libcfa/src/concurrency/preemption.cfa

    rb6a8b31 rd95969a  
    616616}
    617617
     618// Prevent preemption since we are about to start terminating things
     619void __kernel_abort_lock(void) {
     620        signal_block( SIGUSR1 );
     621}
     622
    618623// Raii ctor/dtor for the preemption_scope
    619624// Used by thread to control when they want to receive preemption signals
  • libcfa/src/concurrency/stats.hfa

    rb6a8b31 rd95969a  
    22
    33#include <stdint.h>
     4
     5enum {
     6        CFA_STATS_READY_Q  = 0x01,
     7        CFA_STATS_IO = 0x02,
     8};
    49
    510#if defined(__CFA_NO_STATISTICS__)
     
    914        static inline void __print_stats( struct __stats_t *, int, const char *, const char *, void * ) {}
    1015#else
    11         enum {
    12                 CFA_STATS_READY_Q  = 0x01,
    13                 #if defined(CFA_HAVE_LINUX_IO_URING_H)
    14                         CFA_STATS_IO = 0x02,
    15                 #endif
    16         };
    1716
    1817        struct __attribute__((aligned(64))) __stats_readQ_t {
  • libcfa/src/concurrency/thread.cfa

    rb6a8b31 rd95969a  
    6262}
    6363
    64 FORALL_DATA_INSTANCE(ThreadCancelled, (dtype thread_t), (thread_t))
     64FORALL_DATA_INSTANCE(ThreadCancelled, (thread_t &), (thread_t))
    6565
    66 forall(dtype T)
     66forall(T &)
    6767void copy(ThreadCancelled(T) * dst, ThreadCancelled(T) * src) {
    6868        dst->virtual_table = src->virtual_table;
     
    7171}
    7272
    73 forall(dtype T)
     73forall(T &)
    7474const char * msg(ThreadCancelled(T) *) {
    7575        return "ThreadCancelled";
    7676}
    7777
    78 forall(dtype T)
     78forall(T &)
    7979static void default_thread_cancel_handler(ThreadCancelled(T) & ) {
    8080        abort( "Unhandled thread cancellation.\n" );
    8181}
    8282
    83 forall(dtype T | is_thread(T) | IS_EXCEPTION(ThreadCancelled, (T)))
     83forall(T & | is_thread(T) | IS_EXCEPTION(ThreadCancelled, (T)))
    8484void ?{}( thread_dtor_guard_t & this,
    85                 T & thrd, void(*defaultResumptionHandler)(ThreadCancelled(T) &)) {
    86         $monitor * m = get_monitor(thrd);
     85                T & thrd, void(*cancelHandler)(ThreadCancelled(T) &)) {
     86        $monitor * m = get_monitor(thrd);
    8787        $thread * desc = get_thread(thrd);
    8888
    8989        // Setup the monitor guard
    9090        void (*dtor)(T& mutex this) = ^?{};
    91         bool join = defaultResumptionHandler != (void(*)(ThreadCancelled(T)&))0;
     91        bool join = cancelHandler != (void(*)(ThreadCancelled(T)&))0;
    9292        (this.mg){&m, (void(*)())dtor, join};
    9393
     
    103103        }
    104104        desc->state = Cancelled;
    105         if (!join) {
    106                 defaultResumptionHandler = default_thread_cancel_handler;
    107         }
     105        void(*defaultResumptionHandler)(ThreadCancelled(T) &) =
     106                join ? cancelHandler : default_thread_cancel_handler;
    108107
    109108        ThreadCancelled(T) except;
     
    125124//-----------------------------------------------------------------------------
    126125// Starting and stopping threads
    127 forall( dtype T | is_thread(T) )
     126forall( T & | is_thread(T) )
    128127void __thrd_start( T & this, void (*main_p)(T &) ) {
    129128        $thread * this_thrd = get_thread(this);
     
    141140//-----------------------------------------------------------------------------
    142141// Support for threads that don't ues the thread keyword
    143 forall( dtype T | sized(T) | is_thread(T) | { void ?{}(T&); } )
     142forall( T & | sized(T) | is_thread(T) | { void ?{}(T&); } )
    144143void ?{}( scoped(T)& this ) with( this ) {
    145144        handle{};
     
    147146}
    148147
    149 forall( dtype T, ttype P | sized(T) | is_thread(T) | { void ?{}(T&, P); } )
     148forall( T &, P... | sized(T) | is_thread(T) | { void ?{}(T&, P); } )
    150149void ?{}( scoped(T)& this, P params ) with( this ) {
    151150        handle{ params };
     
    153152}
    154153
    155 forall( dtype T | sized(T) | is_thread(T) )
     154forall( T & | sized(T) | is_thread(T) )
    156155void ^?{}( scoped(T)& this ) with( this ) {
    157156        ^handle{};
     
    159158
    160159//-----------------------------------------------------------------------------
    161 forall(dtype T | is_thread(T) | IS_RESUMPTION_EXCEPTION(ThreadCancelled, (T)))
     160forall(T & | is_thread(T) | IS_RESUMPTION_EXCEPTION(ThreadCancelled, (T)))
    162161T & join( T & this ) {
    163162        thread_dtor_guard_t guard = { this, defaultResumptionHandler };
  • libcfa/src/concurrency/thread.hfa

    rb6a8b31 rd95969a  
    2626//-----------------------------------------------------------------------------
    2727// thread trait
    28 trait is_thread(dtype T) {
     28trait is_thread(T &) {
    2929        void ^?{}(T& mutex this);
    3030        void main(T& this);
     
    3232};
    3333
    34 FORALL_DATA_EXCEPTION(ThreadCancelled, (dtype thread_t), (thread_t)) (
     34FORALL_DATA_EXCEPTION(ThreadCancelled, (thread_t &), (thread_t)) (
    3535        thread_t * the_thread;
    3636        exception_t * the_exception;
    3737);
    3838
    39 forall(dtype T)
     39forall(T &)
    4040void copy(ThreadCancelled(T) * dst, ThreadCancelled(T) * src);
    4141
    42 forall(dtype T)
     42forall(T &)
    4343const char * msg(ThreadCancelled(T) *);
    4444
     
    4747
    4848// Inline getters for threads/coroutines/monitors
    49 forall( dtype T | is_thread(T) )
     49forall( T & | is_thread(T) )
    5050static inline $coroutine* get_coroutine(T & this) __attribute__((const)) { return &get_thread(this)->self_cor; }
    5151
    52 forall( dtype T | is_thread(T) )
     52forall( T & | is_thread(T) )
    5353static inline $monitor  * get_monitor  (T & this) __attribute__((const)) { return &get_thread(this)->self_mon; }
    5454
     
    6060extern struct cluster * mainCluster;
    6161
    62 forall( dtype T | is_thread(T) )
     62forall( T & | is_thread(T) )
    6363void __thrd_start( T & this, void (*)(T &) );
    6464
     
    8282};
    8383
    84 forall( dtype T | is_thread(T) | IS_EXCEPTION(ThreadCancelled, (T)) )
     84forall( T & | is_thread(T) | IS_EXCEPTION(ThreadCancelled, (T)) )
    8585void ?{}( thread_dtor_guard_t & this, T & thrd, void(*)(ThreadCancelled(T) &) );
    8686void ^?{}( thread_dtor_guard_t & this );
     
    8989// thread runner
    9090// Structure that actually start and stop threads
    91 forall( dtype T | sized(T) | is_thread(T) )
     91forall( T & | sized(T) | is_thread(T) )
    9292struct scoped {
    9393        T handle;
    9494};
    9595
    96 forall( dtype T | sized(T) | is_thread(T) | { void ?{}(T&); } )
     96forall( T & | sized(T) | is_thread(T) | { void ?{}(T&); } )
    9797void ?{}( scoped(T)& this );
    9898
    99 forall( dtype T, ttype P | sized(T) | is_thread(T) | { void ?{}(T&, P); } )
     99forall( T &, P... | sized(T) | is_thread(T) | { void ?{}(T&, P); } )
    100100void ?{}( scoped(T)& this, P params );
    101101
    102 forall( dtype T | sized(T) | is_thread(T) )
     102forall( T & | sized(T) | is_thread(T) )
    103103void ^?{}( scoped(T)& this );
    104104
     
    115115void unpark( $thread * this );
    116116
    117 forall( dtype T | is_thread(T) )
     117forall( T & | is_thread(T) )
    118118static inline void unpark( T & this ) { if(!&this) return; unpark( get_thread( this ) );}
    119119
     
    128128//----------
    129129// join
    130 forall( dtype T | is_thread(T) | IS_RESUMPTION_EXCEPTION(ThreadCancelled, (T)) )
     130forall( T & | is_thread(T) | IS_RESUMPTION_EXCEPTION(ThreadCancelled, (T)) )
    131131T & join( T & this );
    132132
Note: See TracChangeset for help on using the changeset viewer.