Changeset 454f478 for libcfa/src


Ignore:
Timestamp:
Jan 20, 2021, 5:35:39 PM (3 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
9db2c92
Parents:
dafbde8
Message:

Re-arranged and commented low-level headers.
Main goal was for better support of weakso locks that are comming.

Location:
libcfa/src
Files:
9 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/bits/defs.hfa

    rdafbde8 r454f478  
    55// file "LICENCE" distributed with Cforall.
    66//
    7 // defs.hfa --
     7// defs.hfa -- Commen macros, functions and typedefs
     8// Most files depend on them and they are always useful to have.
     9//
     10//  *** Must not contain code specific to libcfathread ***
    811//
    912// Author           : Thierry Delisle
     
    6265        #endif
    6366}
     67
     68// pause to prevent excess processor bus usage
     69#if defined( __i386 ) || defined( __x86_64 )
     70        #define Pause() __asm__ __volatile__ ( "pause" : : : )
     71#elif defined( __ARM_ARCH )
     72        #define Pause() __asm__ __volatile__ ( "YIELD" : : : )
     73#else
     74        #error unsupported architecture
     75#endif
  • libcfa/src/bits/locks.hfa

    rdafbde8 r454f478  
    55// file "LICENCE" distributed with Cforall.
    66//
    7 // bits/locks.hfa -- Fast internal locks.
     7// bits/locks.hfa -- Basic spinlocks that are reused in the system.
     8// Used for locks that aren't specific to cforall threads and can be used anywhere
     9//
     10//  *** Must not contain code specific to libcfathread ***
    811//
    912// Author           : Thierry Delisle
     
    1922#include "bits/defs.hfa"
    2023#include <assert.h>
    21 
    22 #ifdef __cforall
    23         extern "C" {
    24                 #include <pthread.h>
    25         }
    26 #endif
    27 
    28 // pause to prevent excess processor bus usage
    29 #if defined( __i386 ) || defined( __x86_64 )
    30         #define Pause() __asm__ __volatile__ ( "pause" : : : )
    31 #elif defined( __ARM_ARCH )
    32         #define Pause() __asm__ __volatile__ ( "YIELD" : : : )
    33 #else
    34         #error unsupported architecture
    35 #endif
    3624
    3725struct __spinlock_t {
     
    10492                enable_interrupts_noPoll();
    10593        }
    106 
    107 
    108         #ifdef __CFA_WITH_VERIFY__
    109                 extern bool __cfaabi_dbg_in_kernel();
    110         #endif
    111 
    112         extern "C" {
    113                 char * strerror(int);
    114         }
    115         #define CHECKED(x) { int err = x; if( err != 0 ) abort("KERNEL ERROR: Operation \"" #x "\" return error %d - %s\n", err, strerror(err)); }
    116 
    117         struct __bin_sem_t {
    118                 pthread_mutex_t         lock;
    119                 pthread_cond_t          cond;
    120                 int                     val;
    121         };
    122 
    123         static inline void ?{}(__bin_sem_t & this) with( this ) {
    124                 // Create the mutex with error checking
    125                 pthread_mutexattr_t mattr;
    126                 pthread_mutexattr_init( &mattr );
    127                 pthread_mutexattr_settype( &mattr, PTHREAD_MUTEX_ERRORCHECK_NP);
    128                 pthread_mutex_init(&lock, &mattr);
    129 
    130                 pthread_cond_init (&cond, (const pthread_condattr_t *)0p);  // workaround trac#208: cast should not be required
    131                 val = 0;
    132         }
    133 
    134         static inline void ^?{}(__bin_sem_t & this) with( this ) {
    135                 CHECKED( pthread_mutex_destroy(&lock) );
    136                 CHECKED( pthread_cond_destroy (&cond) );
    137         }
    138 
    139         static inline void wait(__bin_sem_t & this) with( this ) {
    140                 verify(__cfaabi_dbg_in_kernel());
    141                 CHECKED( pthread_mutex_lock(&lock) );
    142                         while(val < 1) {
    143                                 pthread_cond_wait(&cond, &lock);
    144                         }
    145                         val -= 1;
    146                 CHECKED( pthread_mutex_unlock(&lock) );
    147         }
    148 
    149         static inline bool post(__bin_sem_t & this) with( this ) {
    150                 bool needs_signal = false;
    151 
    152                 CHECKED( pthread_mutex_lock(&lock) );
    153                         if(val < 1) {
    154                                 val += 1;
    155                                 pthread_cond_signal(&cond);
    156                                 needs_signal = true;
    157                         }
    158                 CHECKED( pthread_mutex_unlock(&lock) );
    159 
    160                 return needs_signal;
    161         }
    162 
    163         #undef CHECKED
    164 
    165         struct $thread;
    166         extern void park( void );
    167         extern void unpark( struct $thread * this );
    168         static inline struct $thread * active_thread ();
    169 
    170         // Semaphore which only supports a single thread
    171         struct single_sem {
    172                 struct $thread * volatile ptr;
    173         };
    174 
    175         static inline {
    176                 void  ?{}(single_sem & this) {
    177                         this.ptr = 0p;
    178                 }
    179 
    180                 void ^?{}(single_sem &) {}
    181 
    182                 bool wait(single_sem & this) {
    183                         for() {
    184                                 struct $thread * expected = this.ptr;
    185                                 if(expected == 1p) {
    186                                         if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
    187                                                 return false;
    188                                         }
    189                                 }
    190                                 else {
    191                                         /* paranoid */ verify( expected == 0p );
    192                                         if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
    193                                                 park();
    194                                                 return true;
    195                                         }
    196                                 }
    197 
    198                         }
    199                 }
    200 
    201                 bool post(single_sem & this) {
    202                         for() {
    203                                 struct $thread * expected = this.ptr;
    204                                 if(expected == 1p) return false;
    205                                 if(expected == 0p) {
    206                                         if(__atomic_compare_exchange_n(&this.ptr, &expected, 1p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
    207                                                 return false;
    208                                         }
    209                                 }
    210                                 else {
    211                                         if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
    212                                                 unpark( expected );
    213                                                 return true;
    214                                         }
    215                                 }
    216                         }
    217                 }
    218         }
    219 
    220         // Synchronozation primitive which only supports a single thread and one post
    221         // Similar to a binary semaphore with a 'one shot' semantic
    222         // is expected to be discarded after each party call their side
    223         struct oneshot {
    224                 // Internal state :
    225                 //     0p     : is initial state (wait will block)
    226                 //     1p     : fulfilled (wait won't block)
    227                 // any thread : a thread is currently waiting
    228                 struct $thread * volatile ptr;
    229         };
    230 
    231         static inline {
    232                 void  ?{}(oneshot & this) {
    233                         this.ptr = 0p;
    234                 }
    235 
    236                 void ^?{}(oneshot &) {}
    237 
    238                 // Wait for the post, return immidiately if it already happened.
    239                 // return true if the thread was parked
    240                 bool wait(oneshot & this) {
    241                         for() {
    242                                 struct $thread * expected = this.ptr;
    243                                 if(expected == 1p) return false;
    244                                 /* paranoid */ verify( expected == 0p );
    245                                 if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
    246                                         park();
    247                                         /* paranoid */ verify( this.ptr == 1p );
    248                                         return true;
    249                                 }
    250                         }
    251                 }
    252 
    253                 // Mark as fulfilled, wake thread if needed
    254                 // return true if a thread was unparked
    255                 bool post(oneshot & this) {
    256                         struct $thread * got = __atomic_exchange_n( &this.ptr, 1p, __ATOMIC_SEQ_CST);
    257                         if( got == 0p ) return false;
    258                         unpark( got );
    259                         return true;
    260                 }
    261         }
    262 
    263         // base types for future to build upon
    264         // It is based on the 'oneshot' type to allow multiple futures
    265         // to block on the same instance, permitting users to block a single
    266         // thread on "any of" [a given set of] futures.
    267         // does not support multiple threads waiting on the same future
    268         struct future_t {
    269                 // Internal state :
    270                 //     0p      : is initial state (wait will block)
    271                 //     1p      : fulfilled (wait won't block)
    272                 //     2p      : in progress ()
    273                 //     3p      : abandoned, server should delete
    274                 // any oneshot : a context has been setup to wait, a thread could wait on it
    275                 struct oneshot * volatile ptr;
    276         };
    277 
    278         static inline {
    279                 void  ?{}(future_t & this) {
    280                         this.ptr = 0p;
    281                 }
    282 
    283                 void ^?{}(future_t &) {}
    284 
    285                 void reset(future_t & this) {
    286                         // needs to be in 0p or 1p
    287                         __atomic_exchange_n( &this.ptr, 0p, __ATOMIC_SEQ_CST);
    288                 }
    289 
    290                 // check if the future is available
    291                 bool available( future_t & this ) {
    292                         return this.ptr == 1p;
    293                 }
    294 
    295                 // Prepare the future to be waited on
    296                 // intented to be use by wait, wait_any, waitfor, etc. rather than used directly
    297                 bool setup( future_t & this, oneshot & wait_ctx ) {
    298                         /* paranoid */ verify( wait_ctx.ptr == 0p );
    299                         // The future needs to set the wait context
    300                         for() {
    301                                 struct oneshot * expected = this.ptr;
    302                                 // Is the future already fulfilled?
    303                                 if(expected == 1p) return false; // Yes, just return false (didn't block)
    304 
    305                                 // The future is not fulfilled, try to setup the wait context
    306                                 /* paranoid */ verify( expected == 0p );
    307                                 if(__atomic_compare_exchange_n(&this.ptr, &expected, &wait_ctx, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
    308                                         return true;
    309                                 }
    310                         }
    311                 }
    312 
    313                 // Stop waiting on a future
    314                 // When multiple futures are waited for together in "any of" pattern
    315                 // futures that weren't fulfilled before the thread woke up
    316                 // should retract the wait ctx
    317                 // intented to be use by wait, wait_any, waitfor, etc. rather than used directly
    318                 void retract( future_t & this, oneshot & wait_ctx ) {
    319                         // Remove the wait context
    320                         struct oneshot * got = __atomic_exchange_n( &this.ptr, 0p, __ATOMIC_SEQ_CST);
    321 
    322                         // got == 0p: future was never actually setup, just return
    323                         if( got == 0p ) return;
    324 
    325                         // got == wait_ctx: since fulfil does an atomic_swap,
    326                         // if we got back the original then no one else saw context
    327                         // It is safe to delete (which could happen after the return)
    328                         if( got == &wait_ctx ) return;
    329 
    330                         // got == 1p: the future is ready and the context was fully consumed
    331                         // the server won't use the pointer again
    332                         // It is safe to delete (which could happen after the return)
    333                         if( got == 1p ) return;
    334 
    335                         // got == 2p: the future is ready but the context hasn't fully been consumed
    336                         // spin until it is safe to move on
    337                         if( got == 2p ) {
    338                                 while( this.ptr != 1p ) Pause();
    339                                 return;
    340                         }
    341 
    342                         // got == any thing else, something wen't wrong here, abort
    343                         abort("Future in unexpected state");
    344                 }
    345 
    346                 // Mark the future as abandoned, meaning it will be deleted by the server
    347                 bool abandon( future_t & this ) {
    348                         /* paranoid */ verify( this.ptr != 3p );
    349 
    350                         // Mark the future as abandonned
    351                         struct oneshot * got = __atomic_exchange_n( &this.ptr, 3p, __ATOMIC_SEQ_CST);
    352 
    353                         // If the future isn't already fulfilled, let the server delete it
    354                         if( got == 0p ) return false;
    355 
    356                         // got == 2p: the future is ready but the context hasn't fully been consumed
    357                         // spin until it is safe to move on
    358                         if( got == 2p ) {
    359                                 while( this.ptr != 1p ) Pause();
    360                                 got = 1p;
    361                         }
    362 
    363                         // The future is completed delete it now
    364                         /* paranoid */ verify( this.ptr != 1p );
    365                         free( &this );
    366                         return true;
    367                 }
    368 
    369                 // from the server side, mark the future as fulfilled
    370                 // delete it if needed
    371                 bool fulfil( future_t & this ) {
    372                         for() {
    373                                 struct oneshot * expected = this.ptr;
    374                                 // was this abandoned?
    375                                 #if defined(__GNUC__) && __GNUC__ >= 7
    376                                         #pragma GCC diagnostic push
    377                                         #pragma GCC diagnostic ignored "-Wfree-nonheap-object"
    378                                 #endif
    379                                         if( expected == 3p ) { free( &this ); return false; }
    380                                 #if defined(__GNUC__) && __GNUC__ >= 7
    381                                         #pragma GCC diagnostic pop
    382                                 #endif
    383 
    384                                 /* paranoid */ verify( expected != 1p ); // Future is already fulfilled, should not happen
    385                                 /* paranoid */ verify( expected != 2p ); // Future is bein fulfilled by someone else, this is even less supported then the previous case.
    386 
    387                                 // If there is a wait context, we need to consume it and mark it as consumed after
    388                                 // If there is no context then we can skip the in progress phase
    389                                 struct oneshot * want = expected == 0p ? 1p : 2p;
    390                                 if(__atomic_compare_exchange_n(&this.ptr, &expected, want, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
    391                                         if( expected == 0p ) { /* paranoid */ verify( this.ptr == 1p); return false; }
    392                                         bool ret = post( *expected );
    393                                         __atomic_store_n( &this.ptr, 1p, __ATOMIC_SEQ_CST);
    394                                         return ret;
    395                                 }
    396                         }
    397 
    398                 }
    399 
    400                 // Wait for the future to be fulfilled
    401                 bool wait( future_t & this ) {
    402                         oneshot temp;
    403                         if( !setup(this, temp) ) return false;
    404 
    405                         // Wait context is setup, just wait on it
    406                         bool ret = wait( temp );
    407 
    408                         // Wait for the future to tru
    409                         while( this.ptr == 2p ) Pause();
    410                         // Make sure the state makes sense
    411                         // Should be fulfilled, could be in progress but it's out of date if so
    412                         // since if that is the case, the oneshot was fulfilled (unparking this thread)
    413                         // and the oneshot should not be needed any more
    414                         __attribute__((unused)) struct oneshot * was = this.ptr;
    415                         /* paranoid */ verifyf( was == 1p, "Expected this.ptr to be 1p, was %p\n", was );
    416 
    417                         // Mark the future as fulfilled, to be consistent
    418                         // with potential calls to avail
    419                         // this.ptr = 1p;
    420                         return ret;
    421                 }
    422         }
    42394#endif
  • libcfa/src/concurrency/io/types.hfa

    rdafbde8 r454f478  
    55// file "LICENCE" distributed with Cforall.
    66//
    7 // io/types.hfa --
     7// io/types.hfa -- PRIVATE
     8// Types used by the I/O subsystem
    89//
    910// Author           : Thierry Delisle
     
    2021}
    2122
    22 #include "bits/locks.hfa"
     23#include "kernel/fwd.hfa"
    2324
    2425#if defined(CFA_HAVE_LINUX_IO_URING_H)
  • libcfa/src/concurrency/kernel.cfa

    rdafbde8 r454f478  
    224224        }
    225225
    226         V( this->terminated );
     226        post( this->terminated );
    227227
    228228        if(this == mainProcessor) {
     
    665665//=============================================================================================
    666666//-----------------------------------------------------------------------------
    667 // Locks
    668 void  ?{}( semaphore & this, int count = 1 ) {
    669         (this.lock){};
    670         this.count = count;
    671         (this.waiting){};
    672 }
    673 void ^?{}(semaphore & this) {}
    674 
    675 bool P(semaphore & this) with( this ){
    676         lock( lock __cfaabi_dbg_ctx2 );
    677         count -= 1;
    678         if ( count < 0 ) {
    679                 // queue current task
    680                 append( waiting, active_thread() );
    681 
    682                 // atomically release spin lock and block
    683                 unlock( lock );
    684                 park();
    685                 return true;
    686         }
    687         else {
    688             unlock( lock );
    689             return false;
    690         }
    691 }
    692 
    693 bool V(semaphore & this) with( this ) {
    694         $thread * thrd = 0p;
    695         lock( lock __cfaabi_dbg_ctx2 );
    696         count += 1;
    697         if ( count <= 0 ) {
    698                 // remove task at head of waiting list
    699                 thrd = pop_head( waiting );
    700         }
    701 
    702         unlock( lock );
    703 
    704         // make new owner
    705         unpark( thrd );
    706 
    707         return thrd != 0p;
    708 }
    709 
    710 bool V(semaphore & this, unsigned diff) with( this ) {
    711         $thread * thrd = 0p;
    712         lock( lock __cfaabi_dbg_ctx2 );
    713         int release = max(-count, (int)diff);
    714         count += diff;
    715         for(release) {
    716                 unpark( pop_head( waiting ) );
    717         }
    718 
    719         unlock( lock );
    720 
    721         return thrd != 0p;
    722 }
    723 
    724 //-----------------------------------------------------------------------------
    725667// Debug
    726668__cfaabi_dbg_debug_do(
  • libcfa/src/concurrency/kernel.hfa

    rdafbde8 r454f478  
    55// file "LICENCE" distributed with Cforall.
    66//
    7 // kernel --
     7// kernel -- Header containing the core of the kernel API
    88//
    99// Author           : Thierry Delisle
     
    2424extern "C" {
    2525        #include <bits/pthreadtypes.h>
     26        #include <pthread.h>
    2627        #include <linux/types.h>
    2728}
    2829
    2930//-----------------------------------------------------------------------------
    30 // Locks
    31 struct semaphore {
    32         __spinlock_t lock;
    33         int count;
    34         __queue_t($thread) waiting;
    35 };
    36 
    37 void  ?{}(semaphore & this, int count = 1);
    38 void ^?{}(semaphore & this);
    39 bool   P (semaphore & this);
    40 bool   V (semaphore & this);
    41 bool   V (semaphore & this, unsigned count);
     31// Underlying Locks
     32#ifdef __CFA_WITH_VERIFY__
     33        extern bool __cfaabi_dbg_in_kernel();
     34#endif
     35
     36extern "C" {
     37        char * strerror(int);
     38}
     39#define CHECKED(x) { int err = x; if( err != 0 ) abort("KERNEL ERROR: Operation \"" #x "\" return error %d - %s\n", err, strerror(err)); }
     40
     41struct __bin_sem_t {
     42        pthread_mutex_t         lock;
     43        pthread_cond_t          cond;
     44        int                     val;
     45};
     46
     47static inline void ?{}(__bin_sem_t & this) with( this ) {
     48        // Create the mutex with error checking
     49        pthread_mutexattr_t mattr;
     50        pthread_mutexattr_init( &mattr );
     51        pthread_mutexattr_settype( &mattr, PTHREAD_MUTEX_ERRORCHECK_NP);
     52        pthread_mutex_init(&lock, &mattr);
     53
     54        pthread_cond_init (&cond, (const pthread_condattr_t *)0p);  // workaround trac#208: cast should not be required
     55        val = 0;
     56}
     57
     58static inline void ^?{}(__bin_sem_t & this) with( this ) {
     59        CHECKED( pthread_mutex_destroy(&lock) );
     60        CHECKED( pthread_cond_destroy (&cond) );
     61}
     62
     63static inline void wait(__bin_sem_t & this) with( this ) {
     64        verify(__cfaabi_dbg_in_kernel());
     65        CHECKED( pthread_mutex_lock(&lock) );
     66                while(val < 1) {
     67                        pthread_cond_wait(&cond, &lock);
     68                }
     69                val -= 1;
     70        CHECKED( pthread_mutex_unlock(&lock) );
     71}
     72
     73static inline bool post(__bin_sem_t & this) with( this ) {
     74        bool needs_signal = false;
     75
     76        CHECKED( pthread_mutex_lock(&lock) );
     77                if(val < 1) {
     78                        val += 1;
     79                        pthread_cond_signal(&cond);
     80                        needs_signal = true;
     81                }
     82        CHECKED( pthread_mutex_unlock(&lock) );
     83
     84        return needs_signal;
     85}
     86
     87#undef CHECKED
    4288
    4389
     
    91137
    92138        // Termination synchronisation (user semaphore)
    93         semaphore terminated;
     139        oneshot terminated;
    94140
    95141        // pthread Stack
  • libcfa/src/concurrency/kernel/fwd.hfa

    rdafbde8 r454f478  
    55// file "LICENCE" distributed with Cforall.
    66//
    7 // kernel/fwd.hfa --
     7// kernel/fwd.hfa -- PUBLIC
     8// Fundamental code needed to implement threading M.E.S. algorithms.
    89//
    910// Author           : Thierry Delisle
     
    134135                extern uint64_t thread_rand();
    135136
     137                // Semaphore which only supports a single thread
     138                struct single_sem {
     139                        struct $thread * volatile ptr;
     140                };
     141
     142                static inline {
     143                        void  ?{}(single_sem & this) {
     144                                this.ptr = 0p;
     145                        }
     146
     147                        void ^?{}(single_sem &) {}
     148
     149                        bool wait(single_sem & this) {
     150                                for() {
     151                                        struct $thread * expected = this.ptr;
     152                                        if(expected == 1p) {
     153                                                if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     154                                                        return false;
     155                                                }
     156                                        }
     157                                        else {
     158                                                /* paranoid */ verify( expected == 0p );
     159                                                if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     160                                                        park();
     161                                                        return true;
     162                                                }
     163                                        }
     164
     165                                }
     166                        }
     167
     168                        bool post(single_sem & this) {
     169                                for() {
     170                                        struct $thread * expected = this.ptr;
     171                                        if(expected == 1p) return false;
     172                                        if(expected == 0p) {
     173                                                if(__atomic_compare_exchange_n(&this.ptr, &expected, 1p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     174                                                        return false;
     175                                                }
     176                                        }
     177                                        else {
     178                                                if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     179                                                        unpark( expected );
     180                                                        return true;
     181                                                }
     182                                        }
     183                                }
     184                        }
     185                }
     186
     187                // Synchronozation primitive which only supports a single thread and one post
     188                // Similar to a binary semaphore with a 'one shot' semantic
     189                // is expected to be discarded after each party call their side
     190                struct oneshot {
     191                        // Internal state :
     192                        //     0p     : is initial state (wait will block)
     193                        //     1p     : fulfilled (wait won't block)
     194                        // any thread : a thread is currently waiting
     195                        struct $thread * volatile ptr;
     196                };
     197
     198                static inline {
     199                        void  ?{}(oneshot & this) {
     200                                this.ptr = 0p;
     201                        }
     202
     203                        void ^?{}(oneshot &) {}
     204
     205                        // Wait for the post, return immidiately if it already happened.
     206                        // return true if the thread was parked
     207                        bool wait(oneshot & this) {
     208                                for() {
     209                                        struct $thread * expected = this.ptr;
     210                                        if(expected == 1p) return false;
     211                                        /* paranoid */ verify( expected == 0p );
     212                                        if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     213                                                park();
     214                                                /* paranoid */ verify( this.ptr == 1p );
     215                                                return true;
     216                                        }
     217                                }
     218                        }
     219
     220                        // Mark as fulfilled, wake thread if needed
     221                        // return true if a thread was unparked
     222                        bool post(oneshot & this) {
     223                                struct $thread * got = __atomic_exchange_n( &this.ptr, 1p, __ATOMIC_SEQ_CST);
     224                                if( got == 0p ) return false;
     225                                unpark( got );
     226                                return true;
     227                        }
     228                }
     229
     230                // base types for future to build upon
     231                // It is based on the 'oneshot' type to allow multiple futures
     232                // to block on the same instance, permitting users to block a single
     233                // thread on "any of" [a given set of] futures.
     234                // does not support multiple threads waiting on the same future
     235                struct future_t {
     236                        // Internal state :
     237                        //     0p      : is initial state (wait will block)
     238                        //     1p      : fulfilled (wait won't block)
     239                        //     2p      : in progress ()
     240                        //     3p      : abandoned, server should delete
     241                        // any oneshot : a context has been setup to wait, a thread could wait on it
     242                        struct oneshot * volatile ptr;
     243                };
     244
     245                static inline {
     246                        void  ?{}(future_t & this) {
     247                                this.ptr = 0p;
     248                        }
     249
     250                        void ^?{}(future_t &) {}
     251
     252                        void reset(future_t & this) {
     253                                // needs to be in 0p or 1p
     254                                __atomic_exchange_n( &this.ptr, 0p, __ATOMIC_SEQ_CST);
     255                        }
     256
     257                        // check if the future is available
     258                        bool available( future_t & this ) {
     259                                return this.ptr == 1p;
     260                        }
     261
     262                        // Prepare the future to be waited on
     263                        // intented to be use by wait, wait_any, waitfor, etc. rather than used directly
     264                        bool setup( future_t & this, oneshot & wait_ctx ) {
     265                                /* paranoid */ verify( wait_ctx.ptr == 0p );
     266                                // The future needs to set the wait context
     267                                for() {
     268                                        struct oneshot * expected = this.ptr;
     269                                        // Is the future already fulfilled?
     270                                        if(expected == 1p) return false; // Yes, just return false (didn't block)
     271
     272                                        // The future is not fulfilled, try to setup the wait context
     273                                        /* paranoid */ verify( expected == 0p );
     274                                        if(__atomic_compare_exchange_n(&this.ptr, &expected, &wait_ctx, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     275                                                return true;
     276                                        }
     277                                }
     278                        }
     279
     280                        // Stop waiting on a future
     281                        // When multiple futures are waited for together in "any of" pattern
     282                        // futures that weren't fulfilled before the thread woke up
     283                        // should retract the wait ctx
     284                        // intented to be use by wait, wait_any, waitfor, etc. rather than used directly
     285                        void retract( future_t & this, oneshot & wait_ctx ) {
     286                                // Remove the wait context
     287                                struct oneshot * got = __atomic_exchange_n( &this.ptr, 0p, __ATOMIC_SEQ_CST);
     288
     289                                // got == 0p: future was never actually setup, just return
     290                                if( got == 0p ) return;
     291
     292                                // got == wait_ctx: since fulfil does an atomic_swap,
     293                                // if we got back the original then no one else saw context
     294                                // It is safe to delete (which could happen after the return)
     295                                if( got == &wait_ctx ) return;
     296
     297                                // got == 1p: the future is ready and the context was fully consumed
     298                                // the server won't use the pointer again
     299                                // It is safe to delete (which could happen after the return)
     300                                if( got == 1p ) return;
     301
     302                                // got == 2p: the future is ready but the context hasn't fully been consumed
     303                                // spin until it is safe to move on
     304                                if( got == 2p ) {
     305                                        while( this.ptr != 1p ) Pause();
     306                                        return;
     307                                }
     308
     309                                // got == any thing else, something wen't wrong here, abort
     310                                abort("Future in unexpected state");
     311                        }
     312
     313                        // Mark the future as abandoned, meaning it will be deleted by the server
     314                        bool abandon( future_t & this ) {
     315                                /* paranoid */ verify( this.ptr != 3p );
     316
     317                                // Mark the future as abandonned
     318                                struct oneshot * got = __atomic_exchange_n( &this.ptr, 3p, __ATOMIC_SEQ_CST);
     319
     320                                // If the future isn't already fulfilled, let the server delete it
     321                                if( got == 0p ) return false;
     322
     323                                // got == 2p: the future is ready but the context hasn't fully been consumed
     324                                // spin until it is safe to move on
     325                                if( got == 2p ) {
     326                                        while( this.ptr != 1p ) Pause();
     327                                        got = 1p;
     328                                }
     329
     330                                // The future is completed delete it now
     331                                /* paranoid */ verify( this.ptr != 1p );
     332                                free( &this );
     333                                return true;
     334                        }
     335
     336                        // from the server side, mark the future as fulfilled
     337                        // delete it if needed
     338                        bool fulfil( future_t & this ) {
     339                                for() {
     340                                        struct oneshot * expected = this.ptr;
     341                                        // was this abandoned?
     342                                        #if defined(__GNUC__) && __GNUC__ >= 7
     343                                                #pragma GCC diagnostic push
     344                                                #pragma GCC diagnostic ignored "-Wfree-nonheap-object"
     345                                        #endif
     346                                                if( expected == 3p ) { free( &this ); return false; }
     347                                        #if defined(__GNUC__) && __GNUC__ >= 7
     348                                                #pragma GCC diagnostic pop
     349                                        #endif
     350
     351                                        /* paranoid */ verify( expected != 1p ); // Future is already fulfilled, should not happen
     352                                        /* paranoid */ verify( expected != 2p ); // Future is bein fulfilled by someone else, this is even less supported then the previous case.
     353
     354                                        // If there is a wait context, we need to consume it and mark it as consumed after
     355                                        // If there is no context then we can skip the in progress phase
     356                                        struct oneshot * want = expected == 0p ? 1p : 2p;
     357                                        if(__atomic_compare_exchange_n(&this.ptr, &expected, want, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     358                                                if( expected == 0p ) { /* paranoid */ verify( this.ptr == 1p); return false; }
     359                                                bool ret = post( *expected );
     360                                                __atomic_store_n( &this.ptr, 1p, __ATOMIC_SEQ_CST);
     361                                                return ret;
     362                                        }
     363                                }
     364
     365                        }
     366
     367                        // Wait for the future to be fulfilled
     368                        bool wait( future_t & this ) {
     369                                oneshot temp;
     370                                if( !setup(this, temp) ) return false;
     371
     372                                // Wait context is setup, just wait on it
     373                                bool ret = wait( temp );
     374
     375                                // Wait for the future to tru
     376                                while( this.ptr == 2p ) Pause();
     377                                // Make sure the state makes sense
     378                                // Should be fulfilled, could be in progress but it's out of date if so
     379                                // since if that is the case, the oneshot was fulfilled (unparking this thread)
     380                                // and the oneshot should not be needed any more
     381                                __attribute__((unused)) struct oneshot * was = this.ptr;
     382                                /* paranoid */ verifyf( was == 1p, "Expected this.ptr to be 1p, was %p\n", was );
     383
     384                                // Mark the future as fulfilled, to be consistent
     385                                // with potential calls to avail
     386                                // this.ptr = 1p;
     387                                return ret;
     388                        }
     389                }
     390
    136391                //-----------------------------------------------------------------------
    137392                // Statics call at the end of each thread to register statistics
  • libcfa/src/concurrency/kernel/startup.cfa

    rdafbde8 r454f478  
    199199        void ?{}(processor & this) with( this ) {
    200200                ( this.idle ){};
    201                 ( this.terminated ){ 0 };
     201                ( this.terminated ){};
    202202                ( this.runner ){};
    203203                init( this, "Main Processor", *mainCluster );
     
    528528void ?{}(processor & this, const char name[], cluster & _cltr) {
    529529        ( this.idle ){};
    530         ( this.terminated ){ 0 };
     530        ( this.terminated ){};
    531531        ( this.runner ){};
    532532
     
    549549                __wake_proc( &this );
    550550
    551                 P( terminated );
     551                wait( terminated );
    552552                /* paranoid */ verify( active_processor() != &this);
    553553        }
  • libcfa/src/concurrency/locks.cfa

    rdafbde8 r454f478  
    356356        bool wait( condition_variable(L) & this, L & l, uintptr_t info, Time time         ) with(this) { WAIT_TIME( info, &l , time ) }
    357357}
     358
     359//-----------------------------------------------------------------------------
     360// Semaphore
     361void  ?{}( semaphore & this, int count = 1 ) {
     362        (this.lock){};
     363        this.count = count;
     364        (this.waiting){};
     365}
     366void ^?{}(semaphore & this) {}
     367
     368bool P(semaphore & this) with( this ){
     369        lock( lock __cfaabi_dbg_ctx2 );
     370        count -= 1;
     371        if ( count < 0 ) {
     372                // queue current task
     373                append( waiting, active_thread() );
     374
     375                // atomically release spin lock and block
     376                unlock( lock );
     377                park();
     378                return true;
     379        }
     380        else {
     381            unlock( lock );
     382            return false;
     383        }
     384}
     385
     386bool V(semaphore & this) with( this ) {
     387        $thread * thrd = 0p;
     388        lock( lock __cfaabi_dbg_ctx2 );
     389        count += 1;
     390        if ( count <= 0 ) {
     391                // remove task at head of waiting list
     392                thrd = pop_head( waiting );
     393        }
     394
     395        unlock( lock );
     396
     397        // make new owner
     398        unpark( thrd );
     399
     400        return thrd != 0p;
     401}
     402
     403bool V(semaphore & this, unsigned diff) with( this ) {
     404        $thread * thrd = 0p;
     405        lock( lock __cfaabi_dbg_ctx2 );
     406        int release = max(-count, (int)diff);
     407        count += diff;
     408        for(release) {
     409                unpark( pop_head( waiting ) );
     410        }
     411
     412        unlock( lock );
     413
     414        return thrd != 0p;
     415}
  • libcfa/src/concurrency/locks.hfa

    rdafbde8 r454f478  
    157157        bool wait( condition_variable(L) & this, L & l, uintptr_t info, Time time );
    158158}
     159
     160//-----------------------------------------------------------------------------
     161// Semaphore
     162struct semaphore {
     163        __spinlock_t lock;
     164        int count;
     165        __queue_t($thread) waiting;
     166};
     167
     168void  ?{}(semaphore & this, int count = 1);
     169void ^?{}(semaphore & this);
     170bool   P (semaphore & this);
     171bool   V (semaphore & this);
     172bool   V (semaphore & this, unsigned count);
Note: See TracChangeset for help on using the changeset viewer.