Ignore:
Timestamp:
Jan 20, 2021, 8:46:31 PM (3 years ago)
Author:
Peter A. Buhr <pabuhr@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
481cf3a
Parents:
467c8b7 (diff), 9db2c92 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

fix conflict

Location:
libcfa/src/concurrency
Files:
17 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/coroutine.cfa

    r467c8b7 rc08c3cf  
    4646
    4747//-----------------------------------------------------------------------------
    48 FORALL_DATA_INSTANCE(CoroutineCancelled, (dtype coroutine_t), (coroutine_t))
    49 
    50 forall(dtype T)
     48FORALL_DATA_INSTANCE(CoroutineCancelled, (coroutine_t &), (coroutine_t))
     49
     50forall(T &)
    5151void mark_exception(CoroutineCancelled(T) *) {}
    5252
    53 forall(dtype T)
     53forall(T &)
    5454void copy(CoroutineCancelled(T) * dst, CoroutineCancelled(T) * src) {
    5555        dst->virtual_table = src->virtual_table;
     
    5858}
    5959
    60 forall(dtype T)
     60forall(T &)
    6161const char * msg(CoroutineCancelled(T) *) {
    6262        return "CoroutineCancelled(...)";
     
    6464
    6565// This code should not be inlined. It is the error path on resume.
    66 forall(dtype T | is_coroutine(T))
     66forall(T & | is_coroutine(T))
    6767void __cfaehm_cancelled_coroutine( T & cor, $coroutine * desc ) {
    6868        verify( desc->cancellation );
     
    148148// Part of the Public API
    149149// Not inline since only ever called once per coroutine
    150 forall(dtype T | is_coroutine(T))
     150forall(T & | is_coroutine(T))
    151151void prime(T& cor) {
    152152        $coroutine* this = get_coroutine(cor);
  • libcfa/src/concurrency/coroutine.hfa

    r467c8b7 rc08c3cf  
    2222//-----------------------------------------------------------------------------
    2323// Exception thrown from resume when a coroutine stack is cancelled.
    24 FORALL_DATA_EXCEPTION(CoroutineCancelled, (dtype coroutine_t), (coroutine_t)) (
     24FORALL_DATA_EXCEPTION(CoroutineCancelled, (coroutine_t &), (coroutine_t)) (
    2525        coroutine_t * the_coroutine;
    2626        exception_t * the_exception;
    2727);
    2828
    29 forall(dtype T)
     29forall(T &)
    3030void copy(CoroutineCancelled(T) * dst, CoroutineCancelled(T) * src);
    3131
    32 forall(dtype T)
     32forall(T &)
    3333const char * msg(CoroutineCancelled(T) *);
    3434
     
    3737// Anything that implements this trait can be resumed.
    3838// Anything that is resumed is a coroutine.
    39 trait is_coroutine(dtype T | IS_RESUMPTION_EXCEPTION(CoroutineCancelled, (T))) {
     39trait is_coroutine(T & | IS_RESUMPTION_EXCEPTION(CoroutineCancelled, (T))) {
    4040        void main(T & this);
    4141        $coroutine * get_coroutine(T & this);
     
    6060//-----------------------------------------------------------------------------
    6161// Public coroutine API
    62 forall(dtype T | is_coroutine(T))
     62forall(T & | is_coroutine(T))
    6363void prime(T & cor);
    6464
     
    7272        void __cfactx_invoke_coroutine(void (*main)(void *), void * this);
    7373
    74         forall(dtype T)
     74        forall(T &)
    7575        void __cfactx_start(void (*main)(T &), struct $coroutine * cor, T & this, void (*invoke)(void (*main)(void *), void *));
    7676
     
    129129}
    130130
    131 forall(dtype T | is_coroutine(T))
     131forall(T & | is_coroutine(T))
    132132void __cfaehm_cancelled_coroutine( T & cor, $coroutine * desc );
    133133
    134134// Resume implementation inlined for performance
    135 forall(dtype T | is_coroutine(T))
     135forall(T & | is_coroutine(T))
    136136static inline T & resume(T & cor) {
    137137        // optimization : read TLS once and reuse it
  • libcfa/src/concurrency/future.hfa

    r467c8b7 rc08c3cf  
    1919#include "monitor.hfa"
    2020
    21 forall( otype T ) {
     21forall( T ) {
    2222        struct future {
    2323                inline future_t;
     
    5858}
    5959
    60 forall( otype T ) {
     60forall( T ) {
    6161        monitor multi_future {
    6262                inline future_t;
  • libcfa/src/concurrency/io/types.hfa

    r467c8b7 rc08c3cf  
    55// file "LICENCE" distributed with Cforall.
    66//
    7 // io/types.hfa --
     7// io/types.hfa -- PRIVATE
     8// Types used by the I/O subsystem
    89//
    910// Author           : Thierry Delisle
     
    2122
    2223#include "bits/locks.hfa"
     24#include "kernel/fwd.hfa"
    2325
    2426#if defined(CFA_HAVE_LINUX_IO_URING_H)
  • libcfa/src/concurrency/kernel.cfa

    r467c8b7 rc08c3cf  
    224224        }
    225225
    226         V( this->terminated );
     226        post( this->terminated );
    227227
    228228        if(this == mainProcessor) {
     
    624624// Unexpected Terminating logic
    625625//=============================================================================================
    626 
    627 extern "C" {
    628         extern void __cfaabi_real_abort(void);
    629 }
    630 static volatile bool kernel_abort_called = false;
    631 
    632 void * kernel_abort(void) __attribute__ ((__nothrow__)) {
    633         // abort cannot be recursively entered by the same or different processors because all signal handlers return when
    634         // the globalAbort flag is true.
    635         bool first = !__atomic_test_and_set( &kernel_abort_called, __ATOMIC_SEQ_CST);
    636 
    637         // first task to abort ?
    638         if ( !first ) {
    639                 // We aren't the first to abort.
    640                 // I give up, just let C handle it
    641                 __cfaabi_real_abort();
    642         }
    643 
    644         // disable interrupts, it no longer makes sense to try to interrupt this processor
    645         disable_interrupts();
    646 
    647         return __cfaabi_tls.this_thread;
    648 }
    649 
    650 void kernel_abort_msg( void * kernel_data, char * abort_text, int abort_text_size ) {
    651         $thread * thrd = ( $thread * ) kernel_data;
     626void __kernel_abort_msg( char * abort_text, int abort_text_size ) {
     627        $thread * thrd = __cfaabi_tls.this_thread;
    652628
    653629        if(thrd) {
     
    669645}
    670646
    671 int kernel_abort_lastframe( void ) __attribute__ ((__nothrow__)) {
    672         return get_coroutine(kernelTLS().this_thread) == get_coroutine(mainThread) ? 4 : 2;
     647int __kernel_abort_lastframe( void ) __attribute__ ((__nothrow__)) {
     648        return get_coroutine(__cfaabi_tls.this_thread) == get_coroutine(mainThread) ? 4 : 2;
    673649}
    674650
     
    688664// Kernel Utilities
    689665//=============================================================================================
    690 //-----------------------------------------------------------------------------
    691 // Locks
    692 void  ?{}( semaphore & this, int count = 1 ) {
    693         (this.lock){};
    694         this.count = count;
    695         (this.waiting){};
    696 }
    697 void ^?{}(semaphore & this) {}
    698 
    699 bool P(semaphore & this) with( this ){
    700         lock( lock __cfaabi_dbg_ctx2 );
    701         count -= 1;
    702         if ( count < 0 ) {
    703                 // queue current task
    704                 append( waiting, active_thread() );
    705 
    706                 // atomically release spin lock and block
    707                 unlock( lock );
    708                 park();
    709                 return true;
    710         }
    711         else {
    712             unlock( lock );
    713             return false;
    714         }
    715 }
    716 
    717 bool V(semaphore & this) with( this ) {
    718         $thread * thrd = 0p;
    719         lock( lock __cfaabi_dbg_ctx2 );
    720         count += 1;
    721         if ( count <= 0 ) {
    722                 // remove task at head of waiting list
    723                 thrd = pop_head( waiting );
    724         }
    725 
    726         unlock( lock );
    727 
    728         // make new owner
    729         unpark( thrd );
    730 
    731         return thrd != 0p;
    732 }
    733 
    734 bool V(semaphore & this, unsigned diff) with( this ) {
    735         $thread * thrd = 0p;
    736         lock( lock __cfaabi_dbg_ctx2 );
    737         int release = max(-count, (int)diff);
    738         count += diff;
    739         for(release) {
    740                 unpark( pop_head( waiting ) );
    741         }
    742 
    743         unlock( lock );
    744 
    745         return thrd != 0p;
    746 }
    747 
    748666//-----------------------------------------------------------------------------
    749667// Debug
  • libcfa/src/concurrency/kernel.hfa

    r467c8b7 rc08c3cf  
    55// file "LICENCE" distributed with Cforall.
    66//
    7 // kernel --
     7// kernel -- Header containing the core of the kernel API
    88//
    99// Author           : Thierry Delisle
     
    2424extern "C" {
    2525        #include <bits/pthreadtypes.h>
     26        #include <pthread.h>
    2627        #include <linux/types.h>
    2728}
    2829
    2930//-----------------------------------------------------------------------------
    30 // Locks
    31 struct semaphore {
    32         __spinlock_t lock;
    33         int count;
    34         __queue_t($thread) waiting;
    35 };
    36 
    37 void  ?{}(semaphore & this, int count = 1);
    38 void ^?{}(semaphore & this);
    39 bool   P (semaphore & this);
    40 bool   V (semaphore & this);
    41 bool   V (semaphore & this, unsigned count);
     31// Underlying Locks
     32#ifdef __CFA_WITH_VERIFY__
     33        extern bool __cfaabi_dbg_in_kernel();
     34#endif
     35
     36extern "C" {
     37        char * strerror(int);
     38}
     39#define CHECKED(x) { int err = x; if( err != 0 ) abort("KERNEL ERROR: Operation \"" #x "\" return error %d - %s\n", err, strerror(err)); }
     40
     41struct __bin_sem_t {
     42        pthread_mutex_t         lock;
     43        pthread_cond_t          cond;
     44        int                     val;
     45};
     46
     47static inline void ?{}(__bin_sem_t & this) with( this ) {
     48        // Create the mutex with error checking
     49        pthread_mutexattr_t mattr;
     50        pthread_mutexattr_init( &mattr );
     51        pthread_mutexattr_settype( &mattr, PTHREAD_MUTEX_ERRORCHECK_NP);
     52        pthread_mutex_init(&lock, &mattr);
     53
     54        pthread_cond_init (&cond, (const pthread_condattr_t *)0p);  // workaround trac#208: cast should not be required
     55        val = 0;
     56}
     57
     58static inline void ^?{}(__bin_sem_t & this) with( this ) {
     59        CHECKED( pthread_mutex_destroy(&lock) );
     60        CHECKED( pthread_cond_destroy (&cond) );
     61}
     62
     63static inline void wait(__bin_sem_t & this) with( this ) {
     64        verify(__cfaabi_dbg_in_kernel());
     65        CHECKED( pthread_mutex_lock(&lock) );
     66                while(val < 1) {
     67                        pthread_cond_wait(&cond, &lock);
     68                }
     69                val -= 1;
     70        CHECKED( pthread_mutex_unlock(&lock) );
     71}
     72
     73static inline bool post(__bin_sem_t & this) with( this ) {
     74        bool needs_signal = false;
     75
     76        CHECKED( pthread_mutex_lock(&lock) );
     77                if(val < 1) {
     78                        val += 1;
     79                        pthread_cond_signal(&cond);
     80                        needs_signal = true;
     81                }
     82        CHECKED( pthread_mutex_unlock(&lock) );
     83
     84        return needs_signal;
     85}
     86
     87#undef CHECKED
    4288
    4389
     
    91137
    92138        // Termination synchronisation (user semaphore)
    93         semaphore terminated;
     139        oneshot terminated;
    94140
    95141        // pthread Stack
  • libcfa/src/concurrency/kernel/fwd.hfa

    r467c8b7 rc08c3cf  
    55// file "LICENCE" distributed with Cforall.
    66//
    7 // kernel/fwd.hfa --
     7// kernel/fwd.hfa -- PUBLIC
     8// Fundamental code needed to implement threading M.E.S. algorithms.
    89//
    910// Author           : Thierry Delisle
     
    134135                extern uint64_t thread_rand();
    135136
     137                // Semaphore which only supports a single thread
     138                struct single_sem {
     139                        struct $thread * volatile ptr;
     140                };
     141
     142                static inline {
     143                        void  ?{}(single_sem & this) {
     144                                this.ptr = 0p;
     145                        }
     146
     147                        void ^?{}(single_sem &) {}
     148
     149                        bool wait(single_sem & this) {
     150                                for() {
     151                                        struct $thread * expected = this.ptr;
     152                                        if(expected == 1p) {
     153                                                if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     154                                                        return false;
     155                                                }
     156                                        }
     157                                        else {
     158                                                /* paranoid */ verify( expected == 0p );
     159                                                if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     160                                                        park();
     161                                                        return true;
     162                                                }
     163                                        }
     164
     165                                }
     166                        }
     167
     168                        bool post(single_sem & this) {
     169                                for() {
     170                                        struct $thread * expected = this.ptr;
     171                                        if(expected == 1p) return false;
     172                                        if(expected == 0p) {
     173                                                if(__atomic_compare_exchange_n(&this.ptr, &expected, 1p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     174                                                        return false;
     175                                                }
     176                                        }
     177                                        else {
     178                                                if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     179                                                        unpark( expected );
     180                                                        return true;
     181                                                }
     182                                        }
     183                                }
     184                        }
     185                }
     186
     187                // Synchronozation primitive which only supports a single thread and one post
     188                // Similar to a binary semaphore with a 'one shot' semantic
     189                // is expected to be discarded after each party call their side
     190                struct oneshot {
     191                        // Internal state :
     192                        //     0p     : is initial state (wait will block)
     193                        //     1p     : fulfilled (wait won't block)
     194                        // any thread : a thread is currently waiting
     195                        struct $thread * volatile ptr;
     196                };
     197
     198                static inline {
     199                        void  ?{}(oneshot & this) {
     200                                this.ptr = 0p;
     201                        }
     202
     203                        void ^?{}(oneshot &) {}
     204
     205                        // Wait for the post, return immidiately if it already happened.
     206                        // return true if the thread was parked
     207                        bool wait(oneshot & this) {
     208                                for() {
     209                                        struct $thread * expected = this.ptr;
     210                                        if(expected == 1p) return false;
     211                                        /* paranoid */ verify( expected == 0p );
     212                                        if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     213                                                park();
     214                                                /* paranoid */ verify( this.ptr == 1p );
     215                                                return true;
     216                                        }
     217                                }
     218                        }
     219
     220                        // Mark as fulfilled, wake thread if needed
     221                        // return true if a thread was unparked
     222                        bool post(oneshot & this) {
     223                                struct $thread * got = __atomic_exchange_n( &this.ptr, 1p, __ATOMIC_SEQ_CST);
     224                                if( got == 0p ) return false;
     225                                unpark( got );
     226                                return true;
     227                        }
     228                }
     229
     230                // base types for future to build upon
     231                // It is based on the 'oneshot' type to allow multiple futures
     232                // to block on the same instance, permitting users to block a single
     233                // thread on "any of" [a given set of] futures.
     234                // does not support multiple threads waiting on the same future
     235                struct future_t {
     236                        // Internal state :
     237                        //     0p      : is initial state (wait will block)
     238                        //     1p      : fulfilled (wait won't block)
     239                        //     2p      : in progress ()
     240                        //     3p      : abandoned, server should delete
     241                        // any oneshot : a context has been setup to wait, a thread could wait on it
     242                        struct oneshot * volatile ptr;
     243                };
     244
     245                static inline {
     246                        void  ?{}(future_t & this) {
     247                                this.ptr = 0p;
     248                        }
     249
     250                        void ^?{}(future_t &) {}
     251
     252                        void reset(future_t & this) {
     253                                // needs to be in 0p or 1p
     254                                __atomic_exchange_n( &this.ptr, 0p, __ATOMIC_SEQ_CST);
     255                        }
     256
     257                        // check if the future is available
     258                        bool available( future_t & this ) {
     259                                return this.ptr == 1p;
     260                        }
     261
     262                        // Prepare the future to be waited on
     263                        // intented to be use by wait, wait_any, waitfor, etc. rather than used directly
     264                        bool setup( future_t & this, oneshot & wait_ctx ) {
     265                                /* paranoid */ verify( wait_ctx.ptr == 0p );
     266                                // The future needs to set the wait context
     267                                for() {
     268                                        struct oneshot * expected = this.ptr;
     269                                        // Is the future already fulfilled?
     270                                        if(expected == 1p) return false; // Yes, just return false (didn't block)
     271
     272                                        // The future is not fulfilled, try to setup the wait context
     273                                        /* paranoid */ verify( expected == 0p );
     274                                        if(__atomic_compare_exchange_n(&this.ptr, &expected, &wait_ctx, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     275                                                return true;
     276                                        }
     277                                }
     278                        }
     279
     280                        // Stop waiting on a future
     281                        // When multiple futures are waited for together in "any of" pattern
     282                        // futures that weren't fulfilled before the thread woke up
     283                        // should retract the wait ctx
     284                        // intented to be use by wait, wait_any, waitfor, etc. rather than used directly
     285                        void retract( future_t & this, oneshot & wait_ctx ) {
     286                                // Remove the wait context
     287                                struct oneshot * got = __atomic_exchange_n( &this.ptr, 0p, __ATOMIC_SEQ_CST);
     288
     289                                // got == 0p: future was never actually setup, just return
     290                                if( got == 0p ) return;
     291
     292                                // got == wait_ctx: since fulfil does an atomic_swap,
     293                                // if we got back the original then no one else saw context
     294                                // It is safe to delete (which could happen after the return)
     295                                if( got == &wait_ctx ) return;
     296
     297                                // got == 1p: the future is ready and the context was fully consumed
     298                                // the server won't use the pointer again
     299                                // It is safe to delete (which could happen after the return)
     300                                if( got == 1p ) return;
     301
     302                                // got == 2p: the future is ready but the context hasn't fully been consumed
     303                                // spin until it is safe to move on
     304                                if( got == 2p ) {
     305                                        while( this.ptr != 1p ) Pause();
     306                                        return;
     307                                }
     308
     309                                // got == any thing else, something wen't wrong here, abort
     310                                abort("Future in unexpected state");
     311                        }
     312
     313                        // Mark the future as abandoned, meaning it will be deleted by the server
     314                        bool abandon( future_t & this ) {
     315                                /* paranoid */ verify( this.ptr != 3p );
     316
     317                                // Mark the future as abandonned
     318                                struct oneshot * got = __atomic_exchange_n( &this.ptr, 3p, __ATOMIC_SEQ_CST);
     319
     320                                // If the future isn't already fulfilled, let the server delete it
     321                                if( got == 0p ) return false;
     322
     323                                // got == 2p: the future is ready but the context hasn't fully been consumed
     324                                // spin until it is safe to move on
     325                                if( got == 2p ) {
     326                                        while( this.ptr != 1p ) Pause();
     327                                        got = 1p;
     328                                }
     329
     330                                // The future is completed delete it now
     331                                /* paranoid */ verify( this.ptr != 1p );
     332                                free( &this );
     333                                return true;
     334                        }
     335
     336                        // from the server side, mark the future as fulfilled
     337                        // delete it if needed
     338                        bool fulfil( future_t & this ) {
     339                                for() {
     340                                        struct oneshot * expected = this.ptr;
     341                                        // was this abandoned?
     342                                        #if defined(__GNUC__) && __GNUC__ >= 7
     343                                                #pragma GCC diagnostic push
     344                                                #pragma GCC diagnostic ignored "-Wfree-nonheap-object"
     345                                        #endif
     346                                                if( expected == 3p ) { free( &this ); return false; }
     347                                        #if defined(__GNUC__) && __GNUC__ >= 7
     348                                                #pragma GCC diagnostic pop
     349                                        #endif
     350
     351                                        /* paranoid */ verify( expected != 1p ); // Future is already fulfilled, should not happen
     352                                        /* paranoid */ verify( expected != 2p ); // Future is bein fulfilled by someone else, this is even less supported then the previous case.
     353
     354                                        // If there is a wait context, we need to consume it and mark it as consumed after
     355                                        // If there is no context then we can skip the in progress phase
     356                                        struct oneshot * want = expected == 0p ? 1p : 2p;
     357                                        if(__atomic_compare_exchange_n(&this.ptr, &expected, want, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     358                                                if( expected == 0p ) { /* paranoid */ verify( this.ptr == 1p); return false; }
     359                                                bool ret = post( *expected );
     360                                                __atomic_store_n( &this.ptr, 1p, __ATOMIC_SEQ_CST);
     361                                                return ret;
     362                                        }
     363                                }
     364
     365                        }
     366
     367                        // Wait for the future to be fulfilled
     368                        bool wait( future_t & this ) {
     369                                oneshot temp;
     370                                if( !setup(this, temp) ) return false;
     371
     372                                // Wait context is setup, just wait on it
     373                                bool ret = wait( temp );
     374
     375                                // Wait for the future to tru
     376                                while( this.ptr == 2p ) Pause();
     377                                // Make sure the state makes sense
     378                                // Should be fulfilled, could be in progress but it's out of date if so
     379                                // since if that is the case, the oneshot was fulfilled (unparking this thread)
     380                                // and the oneshot should not be needed any more
     381                                __attribute__((unused)) struct oneshot * was = this.ptr;
     382                                /* paranoid */ verifyf( was == 1p, "Expected this.ptr to be 1p, was %p\n", was );
     383
     384                                // Mark the future as fulfilled, to be consistent
     385                                // with potential calls to avail
     386                                // this.ptr = 1p;
     387                                return ret;
     388                        }
     389                }
     390
    136391                //-----------------------------------------------------------------------
    137392                // Statics call at the end of each thread to register statistics
  • libcfa/src/concurrency/kernel/startup.cfa

    r467c8b7 rc08c3cf  
    199199        void ?{}(processor & this) with( this ) {
    200200                ( this.idle ){};
    201                 ( this.terminated ){ 0 };
     201                ( this.terminated ){};
    202202                ( this.runner ){};
    203203                init( this, "Main Processor", *mainCluster );
     
    528528void ?{}(processor & this, const char name[], cluster & _cltr) {
    529529        ( this.idle ){};
    530         ( this.terminated ){ 0 };
     530        ( this.terminated ){};
    531531        ( this.runner ){};
    532532
     
    549549                __wake_proc( &this );
    550550
    551                 P( terminated );
     551                wait( terminated );
    552552                /* paranoid */ verify( active_processor() != &this);
    553553        }
  • libcfa/src/concurrency/locks.cfa

    r467c8b7 rc08c3cf  
    77//-----------------------------------------------------------------------------
    88// info_thread
    9 forall(dtype L | is_blocking_lock(L)) {
     9forall(L & | is_blocking_lock(L)) {
    1010        struct info_thread {
    1111                // used to put info_thread on a dl queue (aka sequence)
     
    195195//-----------------------------------------------------------------------------
    196196// alarm node wrapper
    197 forall(dtype L | is_blocking_lock(L)) {
     197forall(L & | is_blocking_lock(L)) {
    198198        struct alarm_node_wrap {
    199199                alarm_node_t alarm_node;
     
    239239//-----------------------------------------------------------------------------
    240240// condition variable
    241 forall(dtype L | is_blocking_lock(L)) {
     241forall(L & | is_blocking_lock(L)) {
    242242
    243243        void ?{}( condition_variable(L) & this ){
     
    356356        bool wait( condition_variable(L) & this, L & l, uintptr_t info, Time time         ) with(this) { WAIT_TIME( info, &l , time ) }
    357357}
     358
     359//-----------------------------------------------------------------------------
     360// Semaphore
     361void  ?{}( semaphore & this, int count = 1 ) {
     362        (this.lock){};
     363        this.count = count;
     364        (this.waiting){};
     365}
     366void ^?{}(semaphore & this) {}
     367
     368bool P(semaphore & this) with( this ){
     369        lock( lock __cfaabi_dbg_ctx2 );
     370        count -= 1;
     371        if ( count < 0 ) {
     372                // queue current task
     373                append( waiting, active_thread() );
     374
     375                // atomically release spin lock and block
     376                unlock( lock );
     377                park();
     378                return true;
     379        }
     380        else {
     381            unlock( lock );
     382            return false;
     383        }
     384}
     385
     386bool V(semaphore & this) with( this ) {
     387        $thread * thrd = 0p;
     388        lock( lock __cfaabi_dbg_ctx2 );
     389        count += 1;
     390        if ( count <= 0 ) {
     391                // remove task at head of waiting list
     392                thrd = pop_head( waiting );
     393        }
     394
     395        unlock( lock );
     396
     397        // make new owner
     398        unpark( thrd );
     399
     400        return thrd != 0p;
     401}
     402
     403bool V(semaphore & this, unsigned diff) with( this ) {
     404        $thread * thrd = 0p;
     405        lock( lock __cfaabi_dbg_ctx2 );
     406        int release = max(-count, (int)diff);
     407        count += diff;
     408        for(release) {
     409                unpark( pop_head( waiting ) );
     410        }
     411
     412        unlock( lock );
     413
     414        return thrd != 0p;
     415}
  • libcfa/src/concurrency/locks.hfa

    r467c8b7 rc08c3cf  
    1313//-----------------------------------------------------------------------------
    1414// is_blocking_lock
    15 trait is_blocking_lock(dtype L | sized(L)) {
     15trait is_blocking_lock(L & | sized(L)) {
    1616        // For synchronization locks to use when acquiring
    1717        void on_notify( L &, struct $thread * );
     
    3131// the info thread is a wrapper around a thread used
    3232// to store extra data for use in the condition variable
    33 forall(dtype L | is_blocking_lock(L)) {
     33forall(L & | is_blocking_lock(L)) {
    3434        struct info_thread;
    3535
     
    120120//-----------------------------------------------------------------------------
    121121// Synchronization Locks
    122 forall(dtype L | is_blocking_lock(L)) {
     122forall(L & | is_blocking_lock(L)) {
    123123        struct condition_variable {
    124124                // Spin lock used for mutual exclusion
     
    157157        bool wait( condition_variable(L) & this, L & l, uintptr_t info, Time time );
    158158}
     159
     160//-----------------------------------------------------------------------------
     161// Semaphore
     162struct semaphore {
     163        __spinlock_t lock;
     164        int count;
     165        __queue_t($thread) waiting;
     166};
     167
     168void  ?{}(semaphore & this, int count = 1);
     169void ^?{}(semaphore & this);
     170bool   P (semaphore & this);
     171bool   V (semaphore & this);
     172bool   V (semaphore & this, unsigned count);
  • libcfa/src/concurrency/monitor.cfa

    r467c8b7 rc08c3cf  
    5050static inline [$thread *, int] search_entry_queue( const __waitfor_mask_t &, $monitor * monitors [], __lock_size_t count );
    5151
    52 forall(dtype T | sized( T ))
     52forall(T & | sized( T ))
    5353static inline __lock_size_t insert_unique( T * array [], __lock_size_t & size, T * val );
    5454static inline __lock_size_t count_max    ( const __waitfor_mask_t & mask );
     
    949949}
    950950
    951 forall(dtype T | sized( T ))
     951forall(T & | sized( T ))
    952952static inline __lock_size_t insert_unique( T * array [], __lock_size_t & size, T * val ) {
    953953        if( !val ) return size;
  • libcfa/src/concurrency/monitor.hfa

    r467c8b7 rc08c3cf  
    2222#include "stdlib.hfa"
    2323
    24 trait is_monitor(dtype T) {
     24trait is_monitor(T &) {
    2525        $monitor * get_monitor( T & );
    2626        void ^?{}( T & mutex );
     
    5959void ^?{}( monitor_dtor_guard_t & this );
    6060
    61 static inline forall( dtype T | sized(T) | { void ^?{}( T & mutex ); } )
     61static inline forall( T & | sized(T) | { void ^?{}( T & mutex ); } )
    6262void delete( T * th ) {
    6363        ^(*th){};
  • libcfa/src/concurrency/mutex.cfa

    r467c8b7 rc08c3cf  
    164164}
    165165
    166 forall(dtype L | is_lock(L))
     166forall(L & | is_lock(L))
    167167void wait(condition_variable & this, L & l) {
    168168        lock( this.lock __cfaabi_dbg_ctx2 );
     
    176176//-----------------------------------------------------------------------------
    177177// Scopes
    178 forall(dtype L | is_lock(L))
     178forall(L & | is_lock(L))
    179179void lock_all  ( L * locks[], size_t count) {
    180180        // Sort locks based on addresses
     
    188188}
    189189
    190 forall(dtype L | is_lock(L))
     190forall(L & | is_lock(L))
    191191void unlock_all( L * locks[], size_t count) {
    192192        // Lock all
  • libcfa/src/concurrency/mutex.hfa

    r467c8b7 rc08c3cf  
    7070void unlock(recursive_mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
    7171
    72 trait is_lock(dtype L | sized(L)) {
     72trait is_lock(L & | sized(L)) {
    7373        void lock  (L &);
    7474        void unlock(L &);
     
    9494void wait(condition_variable & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
    9595
    96 forall(dtype L | is_lock(L))
     96forall(L & | is_lock(L))
    9797void wait(condition_variable & this, L & l) __attribute__((deprecated("use concurrency/locks.hfa instead")));
    9898
    9999//-----------------------------------------------------------------------------
    100100// Scopes
    101 forall(dtype L | is_lock(L)) {
     101forall(L & | is_lock(L)) {
    102102        #if !defined( __TUPLE_ARRAYS_EXIST__ )
    103103        void lock  ( L * locks [], size_t count);
  • libcfa/src/concurrency/preemption.cfa

    r467c8b7 rc08c3cf  
    616616}
    617617
     618// Prevent preemption since we are about to start terminating things
     619void __kernel_abort_lock(void) {
     620        signal_block( SIGUSR1 );
     621}
     622
    618623// Raii ctor/dtor for the preemption_scope
    619624// Used by thread to control when they want to receive preemption signals
  • libcfa/src/concurrency/thread.cfa

    r467c8b7 rc08c3cf  
    6262}
    6363
    64 FORALL_DATA_INSTANCE(ThreadCancelled, (dtype thread_t), (thread_t))
     64FORALL_DATA_INSTANCE(ThreadCancelled, (thread_t &), (thread_t))
    6565
    66 forall(dtype T)
     66forall(T &)
    6767void copy(ThreadCancelled(T) * dst, ThreadCancelled(T) * src) {
    6868        dst->virtual_table = src->virtual_table;
     
    7171}
    7272
    73 forall(dtype T)
     73forall(T &)
    7474const char * msg(ThreadCancelled(T) *) {
    7575        return "ThreadCancelled";
    7676}
    7777
    78 forall(dtype T)
     78forall(T &)
    7979static void default_thread_cancel_handler(ThreadCancelled(T) & ) {
    8080        abort( "Unhandled thread cancellation.\n" );
    8181}
    8282
    83 forall(dtype T | is_thread(T) | IS_EXCEPTION(ThreadCancelled, (T)))
     83forall(T & | is_thread(T) | IS_EXCEPTION(ThreadCancelled, (T)))
    8484void ?{}( thread_dtor_guard_t & this,
    8585                T & thrd, void(*cancelHandler)(ThreadCancelled(T) &)) {
     
    124124//-----------------------------------------------------------------------------
    125125// Starting and stopping threads
    126 forall( dtype T | is_thread(T) )
     126forall( T & | is_thread(T) )
    127127void __thrd_start( T & this, void (*main_p)(T &) ) {
    128128        $thread * this_thrd = get_thread(this);
     
    140140//-----------------------------------------------------------------------------
    141141// Support for threads that don't ues the thread keyword
    142 forall( dtype T | sized(T) | is_thread(T) | { void ?{}(T&); } )
     142forall( T & | sized(T) | is_thread(T) | { void ?{}(T&); } )
    143143void ?{}( scoped(T)& this ) with( this ) {
    144144        handle{};
     
    146146}
    147147
    148 forall( dtype T, ttype P | sized(T) | is_thread(T) | { void ?{}(T&, P); } )
     148forall( T &, P... | sized(T) | is_thread(T) | { void ?{}(T&, P); } )
    149149void ?{}( scoped(T)& this, P params ) with( this ) {
    150150        handle{ params };
     
    152152}
    153153
    154 forall( dtype T | sized(T) | is_thread(T) )
     154forall( T & | sized(T) | is_thread(T) )
    155155void ^?{}( scoped(T)& this ) with( this ) {
    156156        ^handle{};
     
    158158
    159159//-----------------------------------------------------------------------------
    160 forall(dtype T | is_thread(T) | IS_RESUMPTION_EXCEPTION(ThreadCancelled, (T)))
     160forall(T & | is_thread(T) | IS_RESUMPTION_EXCEPTION(ThreadCancelled, (T)))
    161161T & join( T & this ) {
    162162        thread_dtor_guard_t guard = { this, defaultResumptionHandler };
  • libcfa/src/concurrency/thread.hfa

    r467c8b7 rc08c3cf  
    2626//-----------------------------------------------------------------------------
    2727// thread trait
    28 trait is_thread(dtype T) {
     28trait is_thread(T &) {
    2929        void ^?{}(T& mutex this);
    3030        void main(T& this);
     
    3232};
    3333
    34 FORALL_DATA_EXCEPTION(ThreadCancelled, (dtype thread_t), (thread_t)) (
     34FORALL_DATA_EXCEPTION(ThreadCancelled, (thread_t &), (thread_t)) (
    3535        thread_t * the_thread;
    3636        exception_t * the_exception;
    3737);
    3838
    39 forall(dtype T)
     39forall(T &)
    4040void copy(ThreadCancelled(T) * dst, ThreadCancelled(T) * src);
    4141
    42 forall(dtype T)
     42forall(T &)
    4343const char * msg(ThreadCancelled(T) *);
    4444
     
    4747
    4848// Inline getters for threads/coroutines/monitors
    49 forall( dtype T | is_thread(T) )
     49forall( T & | is_thread(T) )
    5050static inline $coroutine* get_coroutine(T & this) __attribute__((const)) { return &get_thread(this)->self_cor; }
    5151
    52 forall( dtype T | is_thread(T) )
     52forall( T & | is_thread(T) )
    5353static inline $monitor  * get_monitor  (T & this) __attribute__((const)) { return &get_thread(this)->self_mon; }
    5454
     
    6060extern struct cluster * mainCluster;
    6161
    62 forall( dtype T | is_thread(T) )
     62forall( T & | is_thread(T) )
    6363void __thrd_start( T & this, void (*)(T &) );
    6464
     
    8282};
    8383
    84 forall( dtype T | is_thread(T) | IS_EXCEPTION(ThreadCancelled, (T)) )
     84forall( T & | is_thread(T) | IS_EXCEPTION(ThreadCancelled, (T)) )
    8585void ?{}( thread_dtor_guard_t & this, T & thrd, void(*)(ThreadCancelled(T) &) );
    8686void ^?{}( thread_dtor_guard_t & this );
     
    8989// thread runner
    9090// Structure that actually start and stop threads
    91 forall( dtype T | sized(T) | is_thread(T) )
     91forall( T & | sized(T) | is_thread(T) )
    9292struct scoped {
    9393        T handle;
    9494};
    9595
    96 forall( dtype T | sized(T) | is_thread(T) | { void ?{}(T&); } )
     96forall( T & | sized(T) | is_thread(T) | { void ?{}(T&); } )
    9797void ?{}( scoped(T)& this );
    9898
    99 forall( dtype T, ttype P | sized(T) | is_thread(T) | { void ?{}(T&, P); } )
     99forall( T &, P... | sized(T) | is_thread(T) | { void ?{}(T&, P); } )
    100100void ?{}( scoped(T)& this, P params );
    101101
    102 forall( dtype T | sized(T) | is_thread(T) )
     102forall( T & | sized(T) | is_thread(T) )
    103103void ^?{}( scoped(T)& this );
    104104
     
    115115void unpark( $thread * this );
    116116
    117 forall( dtype T | is_thread(T) )
     117forall( T & | is_thread(T) )
    118118static inline void unpark( T & this ) { if(!&this) return; unpark( get_thread( this ) );}
    119119
     
    128128//----------
    129129// join
    130 forall( dtype T | is_thread(T) | IS_RESUMPTION_EXCEPTION(ThreadCancelled, (T)) )
     130forall( T & | is_thread(T) | IS_RESUMPTION_EXCEPTION(ThreadCancelled, (T)) )
    131131T & join( T & this );
    132132
Note: See TracChangeset for help on using the changeset viewer.