Timestamp: Jun 12, 2023, 2:45:32 PM
Author: Fangren Yu <f37yu@…>
Branches: ast-experimental, master
Children: 62d62db
Parents: 34b4268 (diff), 251ce80 (diff)
Note: this is a merge changeset; the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message: Merge branch 'master' into ast-experimental
Location: libcfa/src/concurrency
Files: 5 added, 28 edited

  • libcfa/src/concurrency/clib/cfathread.cfa

    r34b4268 r24d6572  
    1616// #define EPOLL_FOR_SOCKETS
    1717
     18#include <string.h>
     19
    1820#include "fstream.hfa"
    1921#include "locks.hfa"
     
    2325#include "time.hfa"
    2426#include "stdlib.hfa"
    25 
     27#include "iofwd.hfa"
    2628#include "cfathread.h"
    27 
    28 extern "C" {
    29                 #include <string.h>
    30                 #include <errno.h>
    31 }
    3229
    3330extern void ?{}(processor &, const char[], cluster &, thread$ *);
    3431extern "C" {
    35       extern void __cfactx_invoke_thread(void (*main)(void *), void * this);
    36         extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
     32        extern void __cfactx_invoke_thread(void (*main)(void *), void * this);
    3733}
    3834
     
    439435        // Mutex
    440436        struct cfathread_mutex {
    441                 linear_backoff_then_block_lock impl;
     437                exp_backoff_then_block_lock impl;
    442438        };
    443439        int cfathread_mutex_init(cfathread_mutex_t *restrict mut, const cfathread_mutexattr_t *restrict) __attribute__((nonnull (1))) { *mut = new(); return 0; }
     
    454450        // Condition
    455451        struct cfathread_condition {
    456                 condition_variable(linear_backoff_then_block_lock) impl;
     452                condition_variable(exp_backoff_then_block_lock) impl;
    457453        };
    458454        int cfathread_cond_init(cfathread_cond_t *restrict cond, const cfathread_condattr_t *restrict) __attribute__((nonnull (1))) { *cond = new(); return 0; }
     
    472468}
    473469
    474 #include <iofwd.hfa>
    475 
    476470extern "C" {
    477         #include <unistd.h>
    478         #include <sys/types.h>
    479         #include <sys/socket.h>
    480 
    481471        //--------------------
    482472        // IO operations
     
    488478                , protocol);
    489479        }
    490         int cfathread_bind(int socket, const struct sockaddr *address, socklen_t address_len) {
     480        int cfathread_bind(int socket, __CONST_SOCKADDR_ARG address, socklen_t address_len) {
    491481                return bind(socket, address, address_len);
    492482        }
     
    496486        }
    497487
    498         int cfathread_accept(int socket, struct sockaddr *restrict address, socklen_t *restrict address_len) {
     488        int cfathread_accept(int socket, __SOCKADDR_ARG address, socklen_t *restrict address_len) {
    499489                #if defined(EPOLL_FOR_SOCKETS)
    500490                        int ret;
     
    513503        }
    514504
    515         int cfathread_connect(int socket, const struct sockaddr *address, socklen_t address_len) {
     505        int cfathread_connect(int socket, __CONST_SOCKADDR_ARG address, socklen_t address_len) {
    516506                #if defined(EPOLL_FOR_SOCKETS)
    517507                        int ret;
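
The clib wrappers above also switch their mutex and condition-variable implementations from linear_backoff_then_block_lock to exp_backoff_then_block_lock. As a rough illustration of the underlying idea only (this is not the runtime's implementation; the helper name, the spin budget, and the x86 pause intrinsic are all assumptions), an exponential-backoff lock retries with a doubling pause before giving up and letting the caller block:

    // Illustrative sketch: spin on the flag, doubling the pause between
    // attempts, and report failure so the caller can fall back to blocking.
    static bool try_lock_exp_backoff( volatile int * flag, int max_spins ) {
            int delay = 1;
            for ( int spins = 0; spins < max_spins; spins += delay ) {
                    if ( __atomic_exchange_n( flag, 1, __ATOMIC_ACQUIRE ) == 0 )
                            return true;                    // acquired
                    for ( int i = 0; i < delay; i += 1 )
                            __builtin_ia32_pause();         // assumes x86; any cheap pause works
                    delay *= 2;                             // exponential backoff
            }
            return false;                                   // caller blocks on the scheduler instead
    }
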
  • libcfa/src/concurrency/clib/cfathread.h

    r34b4268 r24d6572  
    99// Author           : Thierry Delisle
    1010// Created On       : Tue Sep 22 15:31:20 2020
    11 // Last Modified By :
    12 // Last Modified On :
    13 // Update Count     :
     11// Last Modified By : Peter A. Buhr
     12// Last Modified On : Mon Mar 13 23:48:40 2023
     13// Update Count     : 7
    1414//
    1515
     16#pragma once
     17
    1618#if defined(__cforall) || defined(__cplusplus)
     19#include <unistd.h>
     20#include <errno.h>
     21#include <sys/socket.h>
     22
    1723extern "C" {
    1824#endif
    19         #include <asm/types.h>
    20         #include <errno.h>
    21         #include <unistd.h>
    22 
    23 
    2425        //--------------------
    2526        // Basic types
     
    7374        } cfathread_mutexattr_t;
    7475        typedef struct cfathread_mutex * cfathread_mutex_t;
    75         int cfathread_mutex_init(cfathread_mutex_t *restrict mut, const cfathread_mutexattr_t *restrict attr) __attribute__((nonnull (1)));
     76        int cfathread_mutex_init(cfathread_mutex_t * restrict mut, const cfathread_mutexattr_t * restrict attr) __attribute__((nonnull (1)));
    7677        int cfathread_mutex_destroy(cfathread_mutex_t *mut) __attribute__((nonnull (1)));
    7778        int cfathread_mutex_lock(cfathread_mutex_t *mut) __attribute__((nonnull (1)));
     
    9192        //--------------------
    9293        // IO operations
    93         struct sockaddr;
    94         struct msghdr;
    9594        int cfathread_socket(int domain, int type, int protocol);
    96         int cfathread_bind(int socket, const struct sockaddr *address, socklen_t address_len);
     95        int cfathread_bind(int socket, __CONST_SOCKADDR_ARG address, socklen_t address_len);
    9796        int cfathread_listen(int socket, int backlog);
    98         int cfathread_accept(int socket, struct sockaddr *restrict address, socklen_t *restrict address_len);
    99         int cfathread_connect(int socket, const struct sockaddr *address, socklen_t address_len);
     97        int cfathread_accept(int socket, __SOCKADDR_ARG address, socklen_t * restrict address_len);
     98        int cfathread_connect(int socket, __CONST_SOCKADDR_ARG address, socklen_t address_len);
    10099        int cfathread_dup(int fildes);
    101100        int cfathread_close(int fildes);
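
Both files above change cfathread_bind, cfathread_accept, and cfathread_connect to take glibc's __CONST_SOCKADDR_ARG and __SOCKADDR_ARG instead of plain struct sockaddr pointers, matching the declarations in <sys/socket.h>. For readers unfamiliar with those macros, the following paraphrase (abbreviated, and not part of this changeset) shows roughly what glibc defines: with GCC they are transparent unions over the sockaddr flavours, so callers can pass any sockaddr_* pointer without a cast; otherwise they fall back to plain pointers.

    /* Paraphrased from glibc <sys/socket.h>; the member list is abbreviated. */
    #ifdef __GNUC__
    typedef union {
            struct sockaddr     * __restrict __sockaddr__;
            struct sockaddr_in  * __restrict __sockaddr_in__;
            struct sockaddr_in6 * __restrict __sockaddr_in6__;
            struct sockaddr_un  * __restrict __sockaddr_un__;
            /* ... one member per supported address family ... */
    } __SOCKADDR_ARG __attribute__((__transparent_union__));
    #else
    #define __SOCKADDR_ARG        struct sockaddr * __restrict
    #define __CONST_SOCKADDR_ARG  const struct sockaddr *
    #endif
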
  • libcfa/src/concurrency/coroutine.cfa

    r34b4268 r24d6572  
    1010// Created On       : Mon Nov 28 12:27:26 2016
    1111// Last Modified By : Peter A. Buhr
    12 // Last Modified On : Tue Dec 15 12:06:04 2020
    13 // Update Count     : 23
     12// Last Modified On : Thu Feb 16 15:34:46 2023
     13// Update Count     : 24
    1414//
    1515
    1616#define __cforall_thread__
    17 #define _GNU_SOURCE
    1817
    1918#include "coroutine.hfa"
  • libcfa/src/concurrency/coroutine.hfa

    r34b4268 r24d6572  
    1010// Created On       : Mon Nov 28 12:27:26 2016
    1111// Last Modified By : Peter A. Buhr
    12 // Last Modified On : Thu Jan  6 16:33:16 2022
    13 // Update Count     : 12
     12// Last Modified On : Thu Feb  2 11:31:42 2023
     13// Update Count     : 13
    1414//
    1515
     
    3838// Anything that implements this trait can be resumed.
    3939// Anything that is resumed is a coroutine.
    40 trait is_coroutine(T & | IS_RESUMPTION_EXCEPTION(CoroutineCancelled(T))) {
     40forall( T & | IS_RESUMPTION_EXCEPTION(CoroutineCancelled(T)) )
     41trait is_coroutine {
    4142        void main(T & this);
    4243        coroutine$ * get_coroutine(T & this);
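
coroutine.hfa above changes how the polymorphic parameter of is_coroutine is declared: instead of a parameter list on the trait itself, the type variable and its assertions now come from a forall clause in front of the trait. A small hypothetical trait in the same style (the trait and function names below are illustrative and not part of the library):

    // New-style trait declaration, mirroring is_coroutine above.
    forall( T )
    trait is_printable {
            void print( T & this );
    };

    // Assertion lists that use the trait are written the same way as before.
    forall( T | is_printable( T ) )
    void print_twice( T & x ) {
            print( x );
            print( x );
    }
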
  • libcfa/src/concurrency/future.hfa

    r34b4268 r24d6572  
    55// file "LICENCE" distributed with Cforall.
    66//
    7 // io/types.hfa --
    8 //
    9 // Author           : Thierry Delisle & Peiran Hong
     7// concurrency/future.hfa --
     8//
     9// Author           : Thierry Delisle & Peiran Hong & Colby Parsons
    1010// Created On       : Wed Jan 06 17:33:18 2021
    1111// Last Modified By :
     
    1818#include "bits/locks.hfa"
    1919#include "monitor.hfa"
    20 
     20#include "select.hfa"
     21#include "locks.hfa"
     22
     23//----------------------------------------------------------------------------
     24// future
     25// I don't use future_t here since I need to use a lock for this future
     26//  since it supports multiple consumers
     27//  future_t is lockfree and uses atomics which aren't needed given we use locks here
    2128forall( T ) {
     29    // enum { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 }; // Enums seem to be broken so feel free to add this back afterwards
     30
     31    // temporary enum replacement
     32    const int FUTURE_EMPTY = 0;
     33    const int FUTURE_FULFILLED = 1;
     34
    2235        struct future {
     36                int state;
     37                T result;
     38                dlist( select_node ) waiters;
     39        futex_mutex lock;
     40        };
     41
     42    struct future_node {
     43        inline select_node;
     44        T * my_result;
     45    };
     46
     47        static inline {
     48
     49        void ?{}( future_node(T) & this, thread$ * blocked_thread, T * my_result ) {
     50            ((select_node &)this){ blocked_thread };
     51            this.my_result = my_result;
     52        }
     53
     54        void ?{}( future(T) & this ) {
     55                        this.waiters{};
     56            this.state = FUTURE_EMPTY;
     57            this.lock{};
     58                }
     59
     60                // Reset future back to original state
     61                void reset( future(T) & this ) with(this)
     62        {
     63            lock( lock );
     64            if( ! waiters`isEmpty )
     65                abort("Attempting to reset a future with blocked waiters");
     66            state = FUTURE_EMPTY;
     67            unlock( lock );
     68        }
     69
     70                // check if the future is available
     71        // currently no mutual exclusion because I can't see when you need this call to be synchronous or protected
     72                bool available( future(T) & this ) { return __atomic_load_n( &this.state, __ATOMIC_RELAXED ); }
     73
     74
     75        // memcpy wrapper to help copy values
     76        void copy_T( T & from, T & to ) {
     77            memcpy((void *)&to, (void *)&from, sizeof(T));
     78        }
     79
     80        // internal helper to signal waiters off of the future
     81        void _internal_flush( future(T) & this ) with(this) {
     82            while( ! waiters`isEmpty ) {
     83                if ( !__handle_waituntil_OR( waiters ) ) // handle special waituntil OR case
     84                    break; // if handle_OR returns false then waiters is empty so break
     85                select_node &s = try_pop_front( waiters );
     86
     87                if ( s.clause_status == 0p ) // poke in result so that woken threads do not need to reacquire any locks
     88                    copy_T( result, *(((future_node(T) &)s).my_result) );
     89               
     90                wake_one( waiters, s );
     91            }
     92        }
     93
     94                // Fulfil the future, returns whether or not someone was unblocked
     95                bool fulfil( future(T) & this, T val ) with(this) {
     96            lock( lock );
     97            if( state != FUTURE_EMPTY )
     98                abort("Attempting to fulfil a future that has already been fulfilled");
     99
     100            copy_T( val, result );
     101
     102            bool ret_val = ! waiters`isEmpty;
     103            state = FUTURE_FULFILLED;
     104                        _internal_flush( this );
     105            unlock( lock );
     106            return ret_val;
     107                }
     108
     109                // Wait for the future to be fulfilled
     110                // Also return whether the thread had to block or not
     111                [T, bool] get( future(T) & this ) with( this ) {
     112            lock( lock );
     113            T ret_val;
     114            if( state == FUTURE_FULFILLED ) {
     115                copy_T( result, ret_val );
     116                unlock( lock );
     117                return [ret_val, false];
     118            }
     119
     120            future_node(T) node = { active_thread(), &ret_val };
     121            insert_last( waiters, ((select_node &)node) );
     122            unlock( lock );
     123            park( );
     124
     125                        return [ret_val, true];
     126                }
     127
     128                // Wait for the future to be fulfilled
     129                T get( future(T) & this ) {
     130                        [T, bool] tt;
     131                        tt = get(this);
     132                        return tt.0;
     133                }
     134
     135        // Gets value if it is available and returns [ val, true ]
     136        // otherwise returns [ default_val, false]
     137        // will not block
     138        [T, bool] try_get( future(T) & this ) with(this) {
     139            lock( lock );
     140            T ret_val;
     141            if( state == FUTURE_FULFILLED ) {
     142                copy_T( result, ret_val );
     143                unlock( lock );
     144                return [ret_val, true];
     145            }
     146            unlock( lock );
     147           
     148            return [ret_val, false];
     149        }
     150
     151        bool register_select( future(T) & this, select_node & s ) with(this) {
     152            lock( lock );
     153
     154            // check if we can complete operation. If so race to establish winner in special OR case
     155            if ( !s.park_counter && state != FUTURE_EMPTY ) {
     156                if ( !__make_select_node_available( s ) ) { // we didn't win the race so give up on registering
     157                    unlock( lock );
     158                    return false;
     159                }
     160            }
     161
     162            // future not ready -> insert select node and return
     163            if( state == FUTURE_EMPTY ) {
     164                insert_last( waiters, s );
     165                unlock( lock );
     166                return false;
     167            }
     168
     169            __make_select_node_available( s );
     170            unlock( lock );
     171            return true;
     172        }
     173
     174        bool unregister_select( future(T) & this, select_node & s ) with(this) {
     175            if ( ! s`isListed ) return false;
     176            lock( lock );
     177            if ( s`isListed ) remove( s );
     178            unlock( lock );
     179            return false;
     180        }
     181               
     182        void on_selected( future(T) & this, select_node & node ) {}
     183        }
     184}
     185
     186//--------------------------------------------------------------------------------------------------------
     187// These futures below do not support select statements so they may not have as many features as 'future'
     188//  however the 'single_future' is cheap and cheerful and is most likely more performant than 'future'
     189//  since it uses raw atomics and no locks
     190//
     191// As far as 'multi_future' goes I can't see many use cases as it will be less performant than 'future'
     192//  since it is monitor based and also is not compatible with select statements
     193//--------------------------------------------------------------------------------------------------------
     194
     195forall( T ) {
     196        struct single_future {
    23197                inline future_t;
    24198                T result;
     
    27201        static inline {
    28202                // Reset future back to original state
    29                 void reset(future(T) & this) { reset( (future_t&)this ); }
     203                void reset(single_future(T) & this) { reset( (future_t&)this ); }
    30204
    31205                // check if the future is available
    32                 bool available( future(T) & this ) { return available( (future_t&)this ); }
     206                bool available( single_future(T) & this ) { return available( (future_t&)this ); }
    33207
    34208                // Mark the future as abandoned, meaning it will be deleted by the server
    35209                // This doesn't work because of the potential need for a destructor
    36                 void abandon( future(T) & this );
     210                void abandon( single_future(T) & this );
    37211
    38212                // Fulfil the future, returns whether or not someone was unblocked
    39                 thread$ * fulfil( future(T) & this, T result ) {
     213                thread$ * fulfil( single_future(T) & this, T result ) {
    40214                        this.result = result;
    41215                        return fulfil( (future_t&)this );
     
    44218                // Wait for the future to be fulfilled
    45219                // Also return whether the thread had to block or not
    46                 [T, bool] wait( future(T) & this ) {
     220                [T, bool] wait( single_future(T) & this ) {
    47221                        bool r = wait( (future_t&)this );
    48222                        return [this.result, r];
     
    50224
    51225                // Wait for the future to be fulfilled
    52                 T wait( future(T) & this ) {
     226                T wait( single_future(T) & this ) {
    53227                        [T, bool] tt;
    54228                        tt = wait(this);
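
future.hfa above is largely rewritten: the new select-capable future(T) keeps an explicit state word, a waiter list of select_nodes, and a futex_mutex, while the old lock-free future_t wrapper survives under the name single_future(T). A minimal, hypothetical usage sketch of the new interface follows (the thread names and the printed output are illustrative; only future, fulfil, and get come from the header above):

    #include <fstream.hfa>
    #include <thread.hfa>
    #include "future.hfa"

    future( int ) answer;                       // starts in FUTURE_EMPTY

    thread Producer {};
    void main( Producer & ) {
            fulfil( answer, 42 );               // copies the value in and wakes any blocked waiters
    }

    thread Consumer {};
    void main( Consumer & ) {
            int v = get( answer );              // blocks until fulfilled, or returns immediately if already full
            sout | v;
    }

    int main() {
            Producer prod;  Consumer cons;      // threads start at declaration and join at block exit
    }
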
  • libcfa/src/concurrency/invoke.h

    r34b4268 r24d6572  
    1010// Created On       : Tue Jan 17 12:27:26 2016
    1111// Last Modified By : Peter A. Buhr
    12 // Last Modified On : Tue Nov 29 20:42:21 2022
    13 // Update Count     : 56
    14 //
     12// Last Modified On : Tue Mar 14 13:39:31 2023
     13// Update Count     : 59
     14//
     15
     16// Do not use #pragma once as this file is included twice in some places. It has its own guard system.
    1517
    1618#include "bits/containers.hfa"
     
    215217                struct __thread_user_link cltr_link;
    216218
    217                 // used to store state between clh lock/unlock
    218                 volatile bool * clh_prev;
    219 
    220                 // used to point to this thd's current clh node
    221                 volatile bool * clh_node;
    222 
    223219                struct processor * last_proc;
     220
     221        // ptr used during handover between blocking lists to allow for stack allocation of intrusive nodes
     222        // main use case is wait-morphing to allow a different node to be used to block on condvar vs lock
     223        void * link_node;
    224224
    225225                PRNG_STATE_T random_state;                                              // fast random numbers
  • libcfa/src/concurrency/io.cfa

    r34b4268 r24d6572  
    1515
    1616#define __cforall_thread__
    17 #define _GNU_SOURCE
    1817
    1918#if defined(__CFA_DEBUG__)
     
    8584        static io_context$ * __ioarbiter_allocate( io_arbiter$ & this, __u32 idxs[], __u32 want );
    8685        static void __ioarbiter_submit( io_context$ * , __u32 idxs[], __u32 have, bool lazy );
    87         static void __ioarbiter_flush ( io_context$ & );
     86        static void __ioarbiter_flush ( io_context$ &, bool kernel );
    8887        static inline void __ioarbiter_notify( io_context$ & ctx );
    8988//=============================================================================================
     
    9493        extern void __kernel_unpark( thread$ * thrd, unpark_hint );
    9594
     95        static inline void __post(oneshot & this, bool kernel, unpark_hint hint) {
     96                thread$ * t = post( this, false );
     97                if(kernel) __kernel_unpark( t, hint );
     98                else unpark( t, hint );
     99        }
     100
     101        // actual system call of io uring
     102        // wrap so everything that needs to happen around it is always done
     103        //   i.e., stats, book keeping, sqe reclamation, etc.
    96104        static void ioring_syscsll( struct io_context$ & ctx, unsigned int min_comp, unsigned int flags ) {
    97105                __STATS__( true, io.calls.flush++; )
    98106                int ret;
    99107                for() {
     108                        // do the system call in a loop, repeat on interrupts
    100109                        ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, min_comp, flags, (sigset_t *)0p, _NSIG / 8);
    101110                        if( ret < 0 ) {
     
    120129                /* paranoid */ verify( ctx.sq.to_submit >= ret );
    121130
    122                 ctx.sq.to_submit -= ret;
     131                // keep track of how many still need submitting
     132                __atomic_fetch_sub(&ctx.sq.to_submit, ret, __ATOMIC_SEQ_CST);
    123133
    124134                /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
     
    129139                /* paranoid */ verify( ! __preemption_enabled() );
    130140
     141                // mark that there is no pending io left
    131142                __atomic_store_n(&ctx.proc->io.pending, false, __ATOMIC_RELAXED);
    132143        }
    133144
     145        // try to acquire an io context for draining, helping means we never *need* to drain, we can always do it later
    134146        static bool try_acquire( io_context$ * ctx ) __attribute__((nonnull(1))) {
    135147                /* paranoid */ verify( ! __preemption_enabled() );
     
    138150
    139151                {
     152                        // if there is nothing to drain there is no point in acquiring anything
    140153                        const __u32 head = *ctx->cq.head;
    141154                        const __u32 tail = *ctx->cq.tail;
     
    144157                }
    145158
    146                 // Drain the queue
    147                 if(!__atomic_try_acquire(&ctx->cq.lock)) {
     159                // try a simple spinlock acquire, it's likely there are completions to drain
     160                if(!__atomic_try_acquire(&ctx->cq.try_lock)) {
     161                        // some other processor already has it
    148162                        __STATS__( false, io.calls.locked++; )
    149163                        return false;
    150164                }
    151165
     166                // acquired!!
    152167                return true;
    153168        }
    154169
     170        // actually drain the completion
    155171        static bool __cfa_do_drain( io_context$ * ctx, cluster * cltr ) __attribute__((nonnull(1, 2))) {
    156172                /* paranoid */ verify( ! __preemption_enabled() );
    157173                /* paranoid */ verify( ready_schedule_islocked() );
    158                 /* paranoid */ verify( ctx->cq.lock == true );
    159 
     174                /* paranoid */ verify( ctx->cq.try_lock == true );
     175
     176                // get all the invariants and initial state
    160177                const __u32 mask = *ctx->cq.mask;
    161178                const __u32 num  = *ctx->cq.num;
     
    166183                for() {
    167184                        // re-read the head and tail in case it already changed.
     185                        // count the difference between the two
    168186                        const __u32 head = *ctx->cq.head;
    169187                        const __u32 tail = *ctx->cq.tail;
     
    171189                        __STATS__( false, io.calls.drain++; io.calls.completed += count; )
    172190
     191                        // for everything between head and tail, drain it
    173192                        for(i; count) {
    174193                                unsigned idx = (head + i) & mask;
     
    177196                                /* paranoid */ verify(&cqe);
    178197
     198                                // find the future in the completion
    179199                                struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
    180200                                // __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
    181201
     202                                // don't directly fulfill the future, preemption is disabled so we need to use kernel_unpark
    182203                                __kernel_unpark( fulfil( *future, cqe.res, false ), UNPARK_LOCAL );
    183204                        }
    184205
     206                        // update the timestamps accordingly
     207                        // keep a local copy so we can update the relaxed copy
    185208                        ts_next = ctx->cq.ts = rdtscl();
    186209
     
    190213                        ctx->proc->idle_wctx.drain_time = ts_next;
    191214
     215                        // we finished draining the completions... unless the ring buffer was full and there are more secret completions in the kernel.
    192216                        if(likely(count < num)) break;
    193217
     218                        // the ring buffer was full, there could be more stuff in the kernel.
    194219                        ioring_syscsll( *ctx, 0, IORING_ENTER_GETEVENTS);
    195220                }
     
    199224                /* paranoid */ verify( ! __preemption_enabled() );
    200225
    201                 __atomic_unlock(&ctx->cq.lock);
    202 
     226                // everything is drained, we can release the lock
     227                __atomic_unlock(&ctx->cq.try_lock);
     228
     229                // update the relaxed timestamp
    203230                touch_tsc( cltr->sched.io.tscs, ctx->cq.id, ts_prev, ts_next, false );
    204231
     
    206233        }
    207234
     235        // call from a processor to flush
     236        // contains all the bookkeeping a proc must do, not just the barebones flushing logic
     237        void __cfa_do_flush( io_context$ & ctx, bool kernel ) {
     238                /* paranoid */ verify( ! __preemption_enabled() );
     239
     240                // flush any external requests
     241                ctx.sq.last_external = false; // clear the external bit, the arbiter will reset it if needed
     242                __ioarbiter_flush( ctx, kernel );
     243
      244                // if anything must be submitted, do the system call
     245                if(ctx.sq.to_submit != 0) {
     246                        ioring_syscsll(ctx, 0, 0);
     247                }
     248        }
     249
     250        // call from a processor to drain
     251        // contains all the bookkeeping a proc must do, not just the barebones draining logic
    208252        bool __cfa_io_drain( struct processor * proc ) {
    209253                bool local = false;
    210254                bool remote = false;
    211255
     256                // make sure no ones creates/destroys io contexts
    212257                ready_schedule_lock();
    213258
     
    217262                /* paranoid */ verify( ctx );
    218263
     264                // Help if needed
    219265                with(cltr->sched) {
    220266                        const size_t ctxs_count = io.count;
     
    230276                        const unsigned long long ctsc = rdtscl();
    231277
     278                        // only help once every other time
     279                        // pick a target when not helping
    232280                        if(proc->io.target == UINT_MAX) {
    233281                                uint64_t chaos = __tls_rand();
     282                                // choose who to help and whether to accept helping far processors
    234283                                unsigned ext = chaos & 0xff;
    235284                                unsigned other  = (chaos >> 8) % (ctxs_count);
    236285
     286                                // if the processor is on the same cache line or is lucky ( 3 out of 256 odds ) help it
    237287                                if(ext < 3 || __atomic_load_n(&caches[other / __shard_factor.io].id, __ATOMIC_RELAXED) == this_cache) {
    238288                                        proc->io.target = other;
     
    240290                        }
    241291                        else {
     292                                // a target was picked last time, help it
    242293                                const unsigned target = proc->io.target;
    243294                                /* paranoid */ verify( io.tscs[target].t.tv != ULLONG_MAX );
     295                                // make sure the target hasn't stopped existing since last time
    244296                                HELP: if(target < ctxs_count) {
      297                                        // calculate its age and how young it could be before we give up on helping
    245298                                        const __readyQ_avg_t cutoff = calc_cutoff(ctsc, ctx->cq.id, ctxs_count, io.data, io.tscs, __shard_factor.io, false);
    246299                                        const __readyQ_avg_t age = moving_average(ctsc, io.tscs[target].t.tv, io.tscs[target].t.ma, false);
    247300                                        __cfadbg_print_safe(io, "Kernel I/O: Help attempt on %u from %u, age %'llu vs cutoff %'llu, %s\n", target, ctx->cq.id, age, cutoff, age > cutoff ? "yes" : "no");
     301                                        // is the target older than the cutoff, recall 0 is oldest and bigger ints are younger
    248302                                        if(age <= cutoff) break HELP;
    249303
    250                                         if(!try_acquire(io.data[target])) break HELP;
    251 
     304                                        // attempt to help the submission side
     305                                        __cfa_do_flush( *io.data[target], true );
     306
     307                                        // attempt to help the completion side
     308                                        if(!try_acquire(io.data[target])) break HELP; // already acquire no help needed
     309
     310                                        // actually help
    252311                                        if(!__cfa_do_drain( io.data[target], cltr )) break HELP;
    253312
     313                                        // track we did help someone
    254314                                        remote = true;
    255315                                        __STATS__( true, io.calls.helped++; )
    256316                                }
     317
     318                                // reset the target
    257319                                proc->io.target = UINT_MAX;
    258320                        }
    259321                }
    260 
    261322
    262323                // Drain the local queue
     
    270331
    271332                ready_schedule_unlock();
     333
     334                // return true if some completion entry, local or remote, was drained
    272335                return local || remote;
    273336        }
    274337
     338
     339
     340        // call from a processor to flush
     341        // contains all the bookkeeping a proc must do, not just the barebones flushing logic
    275342        bool __cfa_io_flush( struct processor * proc ) {
    276343                /* paranoid */ verify( ! __preemption_enabled() );
     
    278345                /* paranoid */ verify( proc->io.ctx );
    279346
    280                 io_context$ & ctx = *proc->io.ctx;
    281 
    282                 __ioarbiter_flush( ctx );
    283 
    284                 if(ctx.sq.to_submit != 0) {
    285                         ioring_syscsll(ctx, 0, 0);
    286 
    287                 }
    288 
     347                __cfa_do_flush( *proc->io.ctx, false );
     348
     349                // also drain since some stuff will immediately complete
    289350                return __cfa_io_drain( proc );
    290351        }
     
    393454        //=============================================================================================
    394455        // submission
    395         static inline void __submit_only( struct io_context$ * ctx, __u32 idxs[], __u32 have) {
     456        // barebones logic to submit a group of sqes
     457        static inline void __submit_only( struct io_context$ * ctx, __u32 idxs[], __u32 have, bool lock) {
     458                if(!lock)
     459                        lock( ctx->ext_sq.lock __cfaabi_dbg_ctx2 );
    396460                // We can proceed to the fast path
    397461                // Get the right objects
     
    408472                // Make the sqes visible to the submitter
    409473                __atomic_store_n(sq.kring.tail, tail + have, __ATOMIC_RELEASE);
    410                 sq.to_submit += have;
    411 
     474                __atomic_fetch_add(&sq.to_submit, have, __ATOMIC_SEQ_CST);
     475
     476                // set the bit to mark things need to be flushed
    412477                __atomic_store_n(&ctx->proc->io.pending, true, __ATOMIC_RELAXED);
    413478                __atomic_store_n(&ctx->proc->io.dirty  , true, __ATOMIC_RELAXED);
    414         }
    415 
     479
     480                if(!lock)
     481                        unlock( ctx->ext_sq.lock );
     482        }
     483
     484        // submission logic + maybe flushing
    416485        static inline void __submit( struct io_context$ * ctx, __u32 idxs[], __u32 have, bool lazy) {
    417486                __sub_ring_t & sq = ctx->sq;
    418                 __submit_only(ctx, idxs, have);
     487                __submit_only(ctx, idxs, have, false);
    419488
    420489                if(sq.to_submit > 30) {
     
    428497        }
    429498
     499        // call from a processor to flush
     500        // might require arbitration if the thread was migrated after the allocation
    430501        void cfa_io_submit( struct io_context$ * inctx, __u32 idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1))) libcfa_public {
    431502                // __cfadbg_print_safe(io, "Kernel I/O : attempting to submit %u (%s)\n", have, lazy ? "lazy" : "eager");
     
    441512                if( ctx == inctx )              // We have the right instance?
    442513                {
     514                        // yes! fast submit
    443515                        __submit(ctx, idxs, have, lazy);
    444516
     
    507579                __atomic_store_n(&ctx.sq.free_ring.tail, ftail + count, __ATOMIC_SEQ_CST);
    508580
     581                // notify the allocator that new allocations can be made
    509582                __ioarbiter_notify(ctx);
    510583
     
    557630        }
    558631
     632        // notify the arbiter that new allocations are available
    559633        static void __ioarbiter_notify( io_arbiter$ & this, io_context$ * ctx ) {
    560634                /* paranoid */ verify( !empty(this.pending.queue) );
    561 
     635                /* paranoid */ verify( __preemption_enabled() );
     636
     637                // mutual exclusion is needed
    562638                lock( this.pending.lock __cfaabi_dbg_ctx2 );
    563639                {
     640                        __cfadbg_print_safe(io, "Kernel I/O : notifying\n");
     641
     642                        // as long as there are pending allocations try to satisfy them
     643                        // for simplicity do it in FIFO order
    564644                        while( !empty(this.pending.queue) ) {
    565                                 __cfadbg_print_safe(io, "Kernel I/O : notifying\n");
     645                                // get first pending allocs
    566646                                __u32 have = ctx->sq.free_ring.tail - ctx->sq.free_ring.head;
    567647                                __pending_alloc & pa = (__pending_alloc&)head( this.pending.queue );
    568648
     649                                // check if we have enough to satisfy the request
    569650                                if( have > pa.want ) goto DONE;
     651
     652                                // if there are enough allocations it means we can drop the request
    570653                                drop( this.pending.queue );
    571654
    572655                                /* paranoid */__attribute__((unused)) bool ret =
    573656
     657                                // actually do the alloc
    574658                                __alloc(ctx, pa.idxs, pa.want);
    575659
    576660                                /* paranoid */ verify( ret );
    577661
      662                                // write out which context satisfied the request and post
     663                                // this
    578664                                pa.ctx = ctx;
    579 
    580665                                post( pa.waitctx );
    581666                        }
     
    585670                }
    586671                unlock( this.pending.lock );
    587         }
    588 
     672
     673                /* paranoid */ verify( __preemption_enabled() );
     674        }
     675
     676        // short hand to avoid the mutual exclusion of the pending is empty regardless
    589677        static void __ioarbiter_notify( io_context$ & ctx ) {
    590                 if(!empty( ctx.arbiter->pending )) {
    591                         __ioarbiter_notify( *ctx.arbiter, &ctx );
    592                 }
    593         }
    594 
    595         // Simply append to the pending
     678                if(empty( ctx.arbiter->pending )) return;
     679                __ioarbiter_notify( *ctx.arbiter, &ctx );
     680        }
     681
     682        // Submit from outside the local processor: append to the outstanding list
    596683        static void __ioarbiter_submit( io_context$ * ctx, __u32 idxs[], __u32 have, bool lazy ) {
    597684                __cfadbg_print_safe(io, "Kernel I/O : submitting %u from the arbiter to context %u\n", have, ctx->fd);
     
    599686                __cfadbg_print_safe(io, "Kernel I/O : waiting to submit %u\n", have);
    600687
     688                // create the intrusive object to append
    601689                __external_io ei;
    602690                ei.idxs = idxs;
     
    604692                ei.lazy = lazy;
    605693
     694                // enqueue the io
    606695                bool we = enqueue(ctx->ext_sq, (__outstanding_io&)ei);
    607696
     697                // mark pending
    608698                __atomic_store_n(&ctx->proc->io.pending, true, __ATOMIC_SEQ_CST);
    609699
     700                // if this is the first to be enqueued, signal the processor in an attempt to speed up flushing
     701                // if it's not the first enqueue, a signal is already in transit
    610702                if( we ) {
    611703                        sigval_t value = { PREEMPT_IO };
    612704                        __cfaabi_pthread_sigqueue(ctx->proc->kernel_thread, SIGUSR1, value);
    613                 }
    614 
     705                        __STATS__( false, io.flush.signal += 1; )
     706                }
     707                __STATS__( false, io.submit.extr += 1; )
     708
     709                // to avoid dynamic allocation/memory reclamation headaches, wait for it to have been submitted
    615710                wait( ei.waitctx );
    616711
     
    618713        }
    619714
    620         static void __ioarbiter_flush( io_context$ & ctx ) {
    621                 if(!empty( ctx.ext_sq )) {
    622                         __STATS__( false, io.flush.external += 1; )
    623 
    624                         __cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n");
    625 
    626                         lock( ctx.ext_sq.lock __cfaabi_dbg_ctx2 );
    627                         {
    628                                 while( !empty(ctx.ext_sq.queue) ) {
    629                                         __external_io & ei = (__external_io&)drop( ctx.ext_sq.queue );
    630 
    631                                         __submit_only(&ctx, ei.idxs, ei.have);
    632 
    633                                         post( ei.waitctx );
    634                                 }
    635 
    636                                 ctx.ext_sq.empty = true;
     715        // flush the io arbiter: move all external io operations to the submission ring
     716        static void __ioarbiter_flush( io_context$ & ctx, bool kernel ) {
     717                // if there are no external operations just return
     718                if(empty( ctx.ext_sq )) return;
     719
     720                // stats and logs
     721                __STATS__( false, io.flush.external += 1; )
     722                __cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n");
     723
     724                // this can happen from multiple processors, mutual exclusion is needed
     725                lock( ctx.ext_sq.lock __cfaabi_dbg_ctx2 );
     726                {
     727                        // pop each operation one at a time.
     728                        // There is no wait morphing because of the io sq ring
     729                        while( !empty(ctx.ext_sq.queue) ) {
     730                                // drop the element from the queue
     731                                __external_io & ei = (__external_io&)drop( ctx.ext_sq.queue );
     732
     733                                // submit it
     734                                __submit_only(&ctx, ei.idxs, ei.have, true);
     735
     736                                // wake the thread that was waiting on it
     737                                // since this can both be called from kernel and user, check the flag before posting
     738                                __post( ei.waitctx, kernel, UNPARK_LOCAL );
    637739                        }
    638                         unlock(ctx.ext_sq.lock );
     740
     741                        // mark the queue as empty
     742                        ctx.ext_sq.empty = true;
     743                        ctx.sq.last_external = true;
     744                }
     745                unlock(ctx.ext_sq.lock );
     746        }
     747
     748        extern "C" {
     749                // debug functions used for gdb
      750                // io_uring doesn't yet support gdb so the kernel-shared data structures aren't viewable in gdb
     751                // these functions read the data that gdb can't and should be removed once the support is added
     752                static __u32 __cfagdb_cq_head( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->cq.head; }
     753                static __u32 __cfagdb_cq_tail( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->cq.tail; }
     754                static __u32 __cfagdb_cq_mask( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->cq.mask; }
     755                static __u32 __cfagdb_sq_head( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->sq.kring.head; }
     756                static __u32 __cfagdb_sq_tail( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->sq.kring.tail; }
     757                static __u32 __cfagdb_sq_mask( io_context$ * ctx ) __attribute__((nonnull(1),used,noinline)) { return *ctx->sq.mask; }
     758
     759                // fancier version that reads an sqe and copies it out.
     760                static struct io_uring_sqe __cfagdb_sq_at( io_context$ * ctx, __u32 at ) __attribute__((nonnull(1),used,noinline)) {
     761                        __u32 ax = at & *ctx->sq.mask;
     762                        __u32 ix = ctx->sq.kring.array[ax];
     763                        return ctx->sq.sqes[ix];
    639764                }
    640765        }
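
Several of the io.cfa changes above hinge on the completion ring's lock being renamed to try_lock and only ever taken with __atomic_try_acquire: a processor that loses the race simply skips draining (or goes to help another ring) rather than waiting. A rough stand-alone equivalent of that handshake using plain GCC builtins (the helper names below are made up; the runtime's __atomic_try_acquire/__atomic_unlock are assumed to behave similarly):

    // Non-blocking acquire: a cheap relaxed read first, then one test-and-set.
    // Losing the race is not an error; the caller just moves on.
    static inline bool cq_try_acquire( volatile bool * try_lock ) {
            if ( __atomic_load_n( try_lock, __ATOMIC_RELAXED ) ) return false;   // someone else is draining
            return ! __atomic_test_and_set( (void *)try_lock, __ATOMIC_ACQUIRE );
    }

    static inline void cq_release( volatile bool * try_lock ) {
            __atomic_clear( (void *)try_lock, __ATOMIC_RELEASE );
    }
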
  • libcfa/src/concurrency/io/call.cfa.in

    r34b4268 r24d6572  
    3131Prelude = """#define __cforall_thread__
    3232
     33#include <unistd.h>
     34#include <errno.h>
     35#include <sys/socket.h>
     36#include <time.hfa>
     37
    3338#include "bits/defs.hfa"
    3439#include "kernel.hfa"
     
    4348        #include <assert.h>
    4449        #include <stdint.h>
    45         #include <errno.h>
    4650        #include <linux/io_uring.h>
    47 
    4851        #include "kernel/fwd.hfa"
    4952
     
    8285// I/O Forwards
    8386//=============================================================================================
    84 #include <time.hfa>
    85 
    86 // Some forward declarations
    87 #include <errno.h>
    88 #include <unistd.h>
    8987
    9088extern "C" {
    91         #include <asm/types.h>
    92         #include <sys/socket.h>
    93         #include <sys/syscall.h>
    94 
    9589#if defined(CFA_HAVE_PREADV2)
    9690        struct iovec;
    97         extern ssize_t preadv2 (int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
     91        extern ssize_t preadv2 (int fd, const struct iovec * iov, int iovcnt, off_t offset, int flags);
    9892#endif
    9993#if defined(CFA_HAVE_PWRITEV2)
    10094        struct iovec;
    101         extern ssize_t pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
     95        extern ssize_t pwritev2(int fd, const struct iovec * iov, int iovcnt, off_t offset, int flags);
    10296#endif
    10397
     
    114108        struct msghdr;
    115109        struct sockaddr;
    116         extern ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);
    117         extern ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
    118         extern ssize_t send(int sockfd, const void *buf, size_t len, int flags);
    119         extern ssize_t recv(int sockfd, void *buf, size_t len, int flags);
    120         extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);
    121         extern int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
     110        extern ssize_t sendmsg(int sockfd, const struct msghdr * msg, int flags);
     111        extern ssize_t recvmsg(int sockfd, struct msghdr * msg, int flags);
     112        extern ssize_t send(int sockfd, const void * buf, size_t len, int flags);
     113        extern ssize_t recv(int sockfd, void * buf, size_t len, int flags);
    122114
    123115        extern int fallocate(int fd, int mode, off_t offset, off_t len);
    124116        extern int posix_fadvise(int fd, off_t offset, off_t len, int advice);
    125         extern int madvise(void *addr, size_t length, int advice);
    126 
    127         extern int openat(int dirfd, const char *pathname, int flags, mode_t mode);
     117        extern int madvise(void * addr, size_t length, int advice);
     118
     119        extern int openat(int dirfd, const char * pathname, int flags, mode_t mode);
    128120        extern int close(int fd);
    129121
    130         extern ssize_t read (int fd, void *buf, size_t count);
     122        extern ssize_t read (int fd, void * buf, size_t count);
    131123
    132124        struct epoll_event;
    133         extern int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    134 
    135         extern ssize_t splice(int fd_in, __off64_t *off_in, int fd_out, __off64_t *off_out, size_t len, unsigned int flags);
     125        extern int epoll_ctl(int epfd, int op, int fd, struct epoll_event * event);
     126
     127        extern ssize_t splice(int fd_in, __off64_t * off_in, int fd_out, __off64_t * off_out, size_t len, unsigned int flags);
    136128        extern ssize_t tee(int fd_in, int fd_out, size_t len, unsigned int flags);
    137129}
     
    232224calls = [
    233225        # CFA_HAVE_IORING_OP_READV
    234         Call('READV', 'ssize_t preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags)', {
     226        Call('READV', 'ssize_t preadv2(int fd, const struct iovec * iov, int iovcnt, off_t offset, int flags)', {
    235227                'fd'  : 'fd',
     228                'addr': '(typeof(sqe->addr))iov',
     229                'len' : 'iovcnt',
    236230                'off' : 'offset',
    237                 'addr': '(uintptr_t)iov',
    238                 'len' : 'iovcnt',
     231                'rw_flags' : 'flags'
    239232        }, define = 'CFA_HAVE_PREADV2'),
    240233        # CFA_HAVE_IORING_OP_WRITEV
    241         Call('WRITEV', 'ssize_t pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags)', {
     234        Call('WRITEV', 'ssize_t pwritev2(int fd, const struct iovec * iov, int iovcnt, off_t offset, int flags)', {
    242235                'fd'  : 'fd',
     236                'addr': '(typeof(sqe->addr))iov',
     237                'len' : 'iovcnt',
    243238                'off' : 'offset',
    244                 'addr': '(uintptr_t)iov',
    245                 'len' : 'iovcnt'
     239                'rw_flags' : 'flags'
    246240        }, define = 'CFA_HAVE_PWRITEV2'),
    247241        # CFA_HAVE_IORING_OP_FSYNC
     
    250244        }),
    251245        # CFA_HAVE_IORING_OP_EPOLL_CTL
    252         Call('EPOLL_CTL', 'int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)', {
     246        Call('EPOLL_CTL', 'int epoll_ctl(int epfd, int op, int fd, struct epoll_event * event)', {
    253247                'fd': 'epfd',
     248                'len': 'op',
    254249                'addr': 'fd',
    255                 'len': 'op',
    256                 'off': '(uintptr_t)event'
     250                'off': '(typeof(sqe->off))event'
    257251        }),
    258252        # CFA_HAVE_IORING_OP_SYNC_FILE_RANGE
     
    264258        }),
    265259        # CFA_HAVE_IORING_OP_SENDMSG
    266         Call('SENDMSG', 'ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags)', {
    267                 'fd': 'sockfd',
    268                 'addr': '(uintptr_t)(struct msghdr *)msg',
     260        Call('SENDMSG', 'ssize_t sendmsg(int sockfd, const struct msghdr * msg, int flags)', {
     261                'fd': 'sockfd',
     262                'addr': '(typeof(sqe->addr))(struct msghdr *)msg',
    269263                'len': '1',
    270264                'msg_flags': 'flags'
    271265        }),
    272266        # CFA_HAVE_IORING_OP_RECVMSG
    273         Call('RECVMSG', 'ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags)', {
    274                 'fd': 'sockfd',
    275                 'addr': '(uintptr_t)(struct msghdr *)msg',
     267        Call('RECVMSG', 'ssize_t recvmsg(int sockfd, struct msghdr * msg, int flags)', {
     268                'fd': 'sockfd',
     269                'addr': '(typeof(sqe->addr))(struct msghdr *)msg',
    276270                'len': '1',
    277271                'msg_flags': 'flags'
    278272        }),
    279273        # CFA_HAVE_IORING_OP_SEND
    280         Call('SEND', 'ssize_t send(int sockfd, const void *buf, size_t len, int flags)', {
    281                 'fd': 'sockfd',
    282                 'addr': '(uintptr_t)buf',
     274        Call('SEND', 'ssize_t send(int sockfd, const void * buf, size_t len, int flags)', {
     275                'fd': 'sockfd',
     276                'addr': '(typeof(sqe->addr))buf',
    283277                'len': 'len',
    284278                'msg_flags': 'flags'
    285279        }),
    286280        # CFA_HAVE_IORING_OP_RECV
    287         Call('RECV', 'ssize_t recv(int sockfd, void *buf, size_t len, int flags)', {
    288                 'fd': 'sockfd',
    289                 'addr': '(uintptr_t)buf',
     281        Call('RECV', 'ssize_t recv(int sockfd, void * buf, size_t len, int flags)', {
     282                'fd': 'sockfd',
     283                'addr': '(typeof(sqe->addr))buf',
    290284                'len': 'len',
    291285                'msg_flags': 'flags'
    292286        }),
    293287        # CFA_HAVE_IORING_OP_ACCEPT
    294         Call('ACCEPT', 'int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags)', {
    295                 'fd': 'sockfd',
    296                 'addr': '(uintptr_t)addr',
    297                 'addr2': '(uintptr_t)addrlen',
     288        Call('ACCEPT', 'int accept4(int sockfd, __SOCKADDR_ARG addr, socklen_t * restrict addrlen, int flags)', {
     289                'fd': 'sockfd',
     290                'addr': '(typeof(sqe->addr))&addr',
     291                'addr2': '(typeof(sqe->addr2))addrlen',
    298292                'accept_flags': 'flags'
    299293        }),
    300294        # CFA_HAVE_IORING_OP_CONNECT
    301         Call('CONNECT', 'int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen)', {
    302                 'fd': 'sockfd',
    303                 'addr': '(uintptr_t)addr',
     295        Call('CONNECT', 'int connect(int sockfd, __CONST_SOCKADDR_ARG addr, socklen_t addrlen)', {
     296                'fd': 'sockfd',
     297                'addr': '(typeof(sqe->addr))&addr',
    304298                'off': 'addrlen'
    305299        }),
     
    307301        Call('FALLOCATE', 'int fallocate(int fd, int mode, off_t offset, off_t len)', {
    308302                'fd': 'fd',
    309                 'addr': '(uintptr_t)len',
    310303                'len': 'mode',
    311                 'off': 'offset'
     304                'off': 'offset',
     305                'addr': 'len'
    312306        }),
    313307        # CFA_HAVE_IORING_OP_FADVISE
     
    319313        }),
    320314        # CFA_HAVE_IORING_OP_MADVISE
    321         Call('MADVISE', 'int madvise(void *addr, size_t length, int advice)', {
    322                 'addr': '(uintptr_t)addr',
     315        Call('MADVISE', 'int madvise(void * addr, size_t length, int advice)', {
     316                'addr': '(typeof(sqe->addr))addr',
    323317                'len': 'length',
    324318                'fadvise_advice': 'advice'
    325319        }),
    326320        # CFA_HAVE_IORING_OP_OPENAT
    327         Call('OPENAT', 'int openat(int dirfd, const char *pathname, int flags, mode_t mode)', {
     321        Call('OPENAT', 'int openat(int dirfd, const char * pathname, int flags, mode_t mode)', {
    328322                'fd': 'dirfd',
    329                 'addr': '(uintptr_t)pathname',
    330                 'len': 'mode',
    331                 'open_flags': 'flags;'
     323                'addr': '(typeof(sqe->addr))pathname',
     324                'open_flags': 'flags;',
     325                'len': 'mode'
    332326        }),
    333327        # CFA_HAVE_IORING_OP_OPENAT2
    334         Call('OPENAT2', 'int openat2(int dirfd, const char *pathname, struct open_how * how, size_t size)', {
     328        Call('OPENAT2', 'int openat2(int dirfd, const char * pathname, struct open_how * how, size_t size)', {
    335329                'fd': 'dirfd',
    336                 'addr': 'pathname',
    337                 'len': 'sizeof(*how)',
    338                 'off': '(uintptr_t)how',
     330                'addr': '(typeof(sqe->addr))pathname',
     331                'off': '(typeof(sqe->off))how',
     332                'len': 'sizeof(*how)'
    339333        }, define = 'CFA_HAVE_OPENAT2'),
    340334        # CFA_HAVE_IORING_OP_CLOSE
     
    343337        }),
    344338        # CFA_HAVE_IORING_OP_STATX
    345         Call('STATX', 'int statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf)', {
     339        Call('STATX', 'int statx(int dirfd, const char * pathname, int flags, unsigned int mask, struct statx * statxbuf)', {
    346340                'fd': 'dirfd',
    347                 'off': '(uintptr_t)statxbuf',
    348                 'addr': 'pathname',
     341                'addr': '(typeof(sqe->addr))pathname',
     342                'statx_flags': 'flags',
    349343                'len': 'mask',
    350                 'statx_flags': 'flags'
     344                'off': '(typeof(sqe->off))statxbuf'
    351345        }, define = 'CFA_HAVE_STATX'),
    352346        # CFA_HAVE_IORING_OP_READ
    353347        Call('READ', 'ssize_t read(int fd, void * buf, size_t count)', {
    354348                'fd': 'fd',
    355                 'addr': '(uintptr_t)buf',
     349                'addr': '(typeof(sqe->addr))buf',
    356350                'len': 'count'
    357351        }),
     
    359353        Call('WRITE', 'ssize_t write(int fd, void * buf, size_t count)', {
    360354                'fd': 'fd',
    361                 'addr': '(uintptr_t)buf',
     355                'addr': '(typeof(sqe->addr))buf',
    362356                'len': 'count'
    363357        }),
    364358        # CFA_HAVE_IORING_OP_SPLICE
    365         Call('SPLICE', 'ssize_t splice(int fd_in, __off64_t *off_in, int fd_out, __off64_t *off_out, size_t len, unsigned int flags)', {
     359        Call('SPLICE', 'ssize_t splice(int fd_in, __off64_t * off_in, int fd_out, __off64_t * off_out, size_t len, unsigned int flags)', {
    366360                'splice_fd_in': 'fd_in',
    367                 'splice_off_in': 'off_in ? (__u64)*off_in : (__u64)-1',
     361                'splice_off_in': 'off_in ? (typeof(sqe->splice_off_in))*off_in : (typeof(sqe->splice_off_in))-1',
    368362                'fd': 'fd_out',
    369                 'off': 'off_out ? (__u64)*off_out : (__u64)-1',
     363                'off': 'off_out ? (typeof(sqe->off))*off_out : (typeof(sqe->off))-1',
    370364                'len': 'len',
    371365                'splice_flags': 'flags'
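
Each Call(...) entry above pairs a POSIX-style prototype with the io_uring sqe fields the wrapper must fill. As a rough hand-written C sketch (an illustration, not the generator's actual output), the READ entry corresponds to preparing a submission along these lines:

    #include <linux/io_uring.h>   /* struct io_uring_sqe, IORING_OP_READ */
    #include <stddef.h>
    #include <string.h>

    /* Hypothetical helper, for illustration only: fill an sqe the way the READ
       entry above describes (fd -> sqe->fd, buf -> sqe->addr, count -> sqe->len). */
    static void sketch_prep_read( struct io_uring_sqe * sqe, int fd, void * buf, size_t count ) {
            memset( sqe, 0, sizeof(*sqe) );
            sqe->opcode = IORING_OP_READ;
            sqe->fd     = fd;
            sqe->addr   = (typeof(sqe->addr))buf;
            sqe->len    = count;
    }
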
  • libcfa/src/concurrency/io/setup.cfa

    r34b4268 r24d6572  
    1515
    1616#define __cforall_thread__
    17 #define _GNU_SOURCE
    1817
    1918#if defined(__CFA_DEBUG__)
     
    216215
    217216                // completion queue
    218                 cq.lock      = false;
     217                cq.try_lock  = false;
    219218                cq.id        = MAX;
    220219                cq.ts        = rdtscl();
  • libcfa/src/concurrency/io/types.hfa

    r34b4268 r24d6572  
    3737        //-----------------------------------------------------------------------
    3838        // Ring Data structure
    39       struct __sub_ring_t {
      39        // represents the io_uring submission ring, which contains operations that will be sent to io_uring for processing
     40        struct __sub_ring_t {
     41                // lock needed because remote processors might need to flush the instance
     42                __spinlock_t lock;
     43
    4044                struct {
    4145                        // Head and tail of the ring (associated with array)
     
    5862
    5963                // number of sqes to submit on next system call.
    60                 __u32 to_submit;
     64                volatile __u32 to_submit;
    6165
    6266                // number of entries and mask to go with it
     
    7781                void * ring_ptr;
    7882                size_t ring_sz;
    79         };
    80 
     83
      84                // for debug purposes, whether or not the last flush was due to an arbiter flush
     85                bool last_external;
     86        };
     87
      88        // represents the io_uring completion ring, which contains operations that have completed
    8189        struct __cmp_ring_t {
    82                 volatile bool lock;
    83 
     90                // needed because remote processors can help drain the buffer
     91                volatile bool try_lock;
     92
     93                // id of the ring, used for the helping/topology algorithms
    8494                unsigned id;
    8595
     96                // timestamp from last time it was drained
    8697                unsigned long long ts;
    8798
     
    105116        };
    106117
     118        // struct representing an io operation that still needs processing
     119        // actual operations are expected to inherit from this
    107120        struct __outstanding_io {
     121                // intrusive link fields
    108122                inline Colable;
     123
     124                // primitive on which to block until the io is processed
    109125                oneshot waitctx;
    110126        };
    111127        static inline __outstanding_io *& Next( __outstanding_io * n ) { return (__outstanding_io *)Next( (Colable *)n ); }
    112128
     129        // queue of operations that are outstanding
    113130        struct __outstanding_io_queue {
     131                // spinlock for protection
     132                // TODO: changing to a lock that blocks, I haven't examined whether it should be a kernel or user lock
    114133                __spinlock_t lock;
     134
     135                // the actual queue
    115136                Queue(__outstanding_io) queue;
     137
     138                // volatile used to avoid the need for taking the lock if it's empty
    116139                volatile bool empty;
    117140        };
    118141
     142        // struct representing an operation that was submitted
    119143        struct __external_io {
     144                // inherits from outstanding io
    120145                inline __outstanding_io;
     146
     147                // pointer and count to an array of ids to be submitted
    121148                __u32 * idxs;
    122149                __u32 have;
     150
     151                // whether or not these can be accumulated before flushing the buffer
    123152                bool lazy;
    124153        };
    125154
    126 
     155        // complete io_context, contains all the data for io submission and completion
    127156        struct __attribute__((aligned(64))) io_context$ {
      157                // arbiter, used in cases where threads are migrated at unfortunate moments
    128158                io_arbiter$ * arbiter;
     159
      160                // which processor the context is tied to
    129161                struct processor * proc;
    130162
      163                // queue of io submissions that haven't been processed.
    131164                __outstanding_io_queue ext_sq;
    132165
     166                // io_uring ring data structures
    133167                struct __sub_ring_t sq;
    134168                struct __cmp_ring_t cq;
     169
      170                // flags the io_uring rings were created with
    135171                __u32 ring_flags;
     172
     173                // file descriptor that identifies the io_uring instance
    136174                int fd;
    137175        };
    138176
     177        // short hand to check when the io_context was last processed (io drained)
    139178        static inline unsigned long long ts(io_context$ *& this) {
    140179                const __u32 head = *this->cq.head;
    141180                const __u32 tail = *this->cq.tail;
    142181
      182                // if there are no pending completions, just pretend it's infinitely recent
    143183                if(head == tail) return ULLONG_MAX;
    144184
     
    146186        }
    147187
      188        // structure representing allocations that couldn't succeed locally
    148189        struct __pending_alloc {
     190                // inherit from outstanding io
    149191                inline __outstanding_io;
     192
     193                // array and size of the desired allocation
    150194                __u32 * idxs;
    151195                __u32 want;
     196
     197                // output param, the context the io was allocated from
    152198                io_context$ * ctx;
    153199        };
    154200
     201        // arbiter that handles cases where the context tied to the local processor is unable to satisfy the io
    155202        monitor __attribute__((aligned(64))) io_arbiter$ {
     203                // contains a queue of io for pending allocations
    156204                __outstanding_io_queue pending;
    157205        };
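
The volatile empty flag documented above exists so that callers can skip the spinlock entirely when the queue holds nothing. A self-contained sketch of that check-then-lock pattern, using hypothetical stand-in types rather than the runtime's own:

    #include <stdatomic.h>
    #include <stdbool.h>

    /* Hypothetical stand-in for __outstanding_io_queue, illustration only:
       a counter guarded by a spinlock, plus a volatile `empty` flag so the
       common no-work case never touches the lock. */
    struct demo_queue {
            atomic_flag lock;
            int pending;
            volatile bool empty;
    };

    static int demo_drain( struct demo_queue * q ) {
            if ( q->empty ) return 0;        /* fast path: flag read without the lock */
            while ( atomic_flag_test_and_set_explicit( &q->lock, memory_order_acquire ) ) {}
            int n = q->pending;              /* drain under the lock */
            q->pending = 0;
            q->empty = true;
            atomic_flag_clear_explicit( &q->lock, memory_order_release );
            return n;
    }
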
  • libcfa/src/concurrency/iofwd.hfa

    r34b4268 r24d6572  
    99// Author           : Thierry Delisle
    1010// Created On       : Thu Apr 23 17:31:00 2020
    11 // Last Modified By :
    12 // Last Modified On :
    13 // Update Count     :
     11// Last Modified By : Peter A. Buhr
     12// Last Modified On : Mon Mar 13 23:54:57 2023
     13// Update Count     : 1
    1414//
    1515
     
    1717
    1818#include <unistd.h>
     19#include <sys/socket.h>
     20
    1921extern "C" {
    2022        #include <asm/types.h>
     
    4850typedef __off64_t off64_t;
    4951
    50 struct cluster;
    51 struct io_context$;
    52 
    53 struct iovec;
    54 struct msghdr;
    55 struct sockaddr;
    56 struct statx;
    5752struct epoll_event;
    58 
    59 struct io_uring_sqe;
    6053
    6154//-----------------------------------------------------------------------
     
    8881// synchronous calls
    8982#if defined(CFA_HAVE_PREADV2)
    90         extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, __u64 submit_flags);
     83        extern ssize_t cfa_preadv2(int fd, const struct iovec * iov, int iovcnt, off_t offset, int flags, __u64 submit_flags);
    9184#endif
    9285#if defined(CFA_HAVE_PWRITEV2)
    93         extern ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, __u64 submit_flags);
     86        extern ssize_t cfa_pwritev2(int fd, const struct iovec * iov, int iovcnt, off_t offset, int flags, __u64 submit_flags);
    9487#endif
    9588extern int cfa_fsync(int fd, __u64 submit_flags);
    96 extern int cfa_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event, __u64 submit_flags);
     89extern int cfa_epoll_ctl(int epfd, int op, int fd, struct epoll_event * event, __u64 submit_flags);
    9790extern int cfa_sync_file_range(int fd, off64_t offset, off64_t nbytes, unsigned int flags, __u64 submit_flags);
    98 extern  ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags, __u64 submit_flags);
    99 extern ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags, __u64 submit_flags);
    100 extern ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags, __u64 submit_flags);
    101 extern ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags, __u64 submit_flags);
    102 extern int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, __u64 submit_flags);
    103 extern int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen, __u64 submit_flags);
     91extern  ssize_t cfa_sendmsg(int sockfd, const struct msghdr * msg, int flags, __u64 submit_flags);
     92extern ssize_t cfa_recvmsg(int sockfd, struct msghdr * msg, int flags, __u64 submit_flags);
     93extern ssize_t cfa_send(int sockfd, const void * buf, size_t len, int flags, __u64 submit_flags);
     94extern ssize_t cfa_recv(int sockfd, void * buf, size_t len, int flags, __u64 submit_flags);
     95extern int cfa_accept4(int sockfd, __SOCKADDR_ARG addr, socklen_t * restrict addrlen, int flags, __u64 submit_flags);
     96extern int cfa_connect(int sockfd, __CONST_SOCKADDR_ARG addr, socklen_t addrlen, __u64 submit_flags);
    10497extern int cfa_fallocate(int fd, int mode, off_t offset, off_t len, __u64 submit_flags);
    10598extern int cfa_posix_fadvise(int fd, off_t offset, off_t len, int advice, __u64 submit_flags);
    106 extern int cfa_madvise(void *addr, size_t length, int advice, __u64 submit_flags);
    107 extern int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode, __u64 submit_flags);
     99extern int cfa_madvise(void * addr, size_t length, int advice, __u64 submit_flags);
     100extern int cfa_openat(int dirfd, const char * pathname, int flags, mode_t mode, __u64 submit_flags);
    108101#if defined(CFA_HAVE_OPENAT2)
    109         extern int cfa_openat2(int dirfd, const char *pathname, struct open_how * how, size_t size, __u64 submit_flags);
     102        extern int cfa_openat2(int dirfd, const char * pathname, struct open_how * how, size_t size, __u64 submit_flags);
    110103#endif
    111104extern int cfa_close(int fd, __u64 submit_flags);
    112105#if defined(CFA_HAVE_STATX)
    113         extern int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, __u64 submit_flags);
     106        extern int cfa_statx(int dirfd, const char * pathname, int flags, unsigned int mask, struct statx * statxbuf, __u64 submit_flags);
    114107#endif
    115108extern ssize_t cfa_read(int fd, void * buf, size_t count, __u64 submit_flags);
    116109extern ssize_t cfa_write(int fd, void * buf, size_t count, __u64 submit_flags);
    117 extern ssize_t cfa_splice(int fd_in, __off64_t *off_in, int fd_out, __off64_t *off_out, size_t len, unsigned int flags, __u64 submit_flags);
     110extern ssize_t cfa_splice(int fd_in, __off64_t * off_in, int fd_out, __off64_t * off_out, size_t len, unsigned int flags, __u64 submit_flags);
    118111extern ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, __u64 submit_flags);
    119112
     
    121114// asynchronous calls
    122115#if defined(CFA_HAVE_PREADV2)
    123         extern void async_preadv2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, __u64 submit_flags);
     116        extern void async_preadv2(io_future_t & future, int fd, const struct iovec * iov, int iovcnt, off_t offset, int flags, __u64 submit_flags);
    124117#endif
    125118#if defined(CFA_HAVE_PWRITEV2)
    126         extern void async_pwritev2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, __u64 submit_flags);
     119        extern void async_pwritev2(io_future_t & future, int fd, const struct iovec * iov, int iovcnt, off_t offset, int flags, __u64 submit_flags);
    127120#endif
    128121extern void async_fsync(io_future_t & future, int fd, __u64 submit_flags);
    129 extern void async_epoll_ctl(io_future_t & future, int epfd, int op, int fd, struct epoll_event *event, __u64 submit_flags);
     122extern void async_epoll_ctl(io_future_t & future, int epfd, int op, int fd, struct epoll_event * event, __u64 submit_flags);
    130123extern void async_sync_file_range(io_future_t & future, int fd, off64_t offset, off64_t nbytes, unsigned int flags, __u64 submit_flags);
    131 extern void async_sendmsg(io_future_t & future, int sockfd, const struct msghdr *msg, int flags, __u64 submit_flags);
    132 extern void async_recvmsg(io_future_t & future, int sockfd, struct msghdr *msg, int flags, __u64 submit_flags);
    133 extern void async_send(io_future_t & future, int sockfd, const void *buf, size_t len, int flags, __u64 submit_flags);
    134 extern void async_recv(io_future_t & future, int sockfd, void *buf, size_t len, int flags, __u64 submit_flags);
    135 extern void async_accept4(io_future_t & future, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, __u64 submit_flags);
    136 extern void async_connect(io_future_t & future, int sockfd, const struct sockaddr *addr, socklen_t addrlen, __u64 submit_flags);
     124extern void async_sendmsg(io_future_t & future, int sockfd, const struct msghdr * msg, int flags, __u64 submit_flags);
     125extern void async_recvmsg(io_future_t & future, int sockfd, struct msghdr * msg, int flags, __u64 submit_flags);
     126extern void async_send(io_future_t & future, int sockfd, const void * buf, size_t len, int flags, __u64 submit_flags);
     127extern void async_recv(io_future_t & future, int sockfd, void * buf, size_t len, int flags, __u64 submit_flags);
     128extern void async_accept4(io_future_t & future, int sockfd, __SOCKADDR_ARG addr, socklen_t * restrict addrlen, int flags, __u64 submit_flags);
     129extern void async_connect(io_future_t & future, int sockfd, __CONST_SOCKADDR_ARG addr, socklen_t addrlen, __u64 submit_flags);
    137130extern void async_fallocate(io_future_t & future, int fd, int mode, off_t offset, off_t len, __u64 submit_flags);
    138131extern void async_posix_fadvise(io_future_t & future, int fd, off_t offset, off_t len, int advice, __u64 submit_flags);
    139 extern void async_madvise(io_future_t & future, void *addr, size_t length, int advice, __u64 submit_flags);
    140 extern void async_openat(io_future_t & future, int dirfd, const char *pathname, int flags, mode_t mode, __u64 submit_flags);
     132extern void async_madvise(io_future_t & future, void * addr, size_t length, int advice, __u64 submit_flags);
     133extern void async_openat(io_future_t & future, int dirfd, const char * pathname, int flags, mode_t mode, __u64 submit_flags);
    141134#if defined(CFA_HAVE_OPENAT2)
    142         extern void async_openat2(io_future_t & future, int dirfd, const char *pathname, struct open_how * how, size_t size, __u64 submit_flags);
     135        extern void async_openat2(io_future_t & future, int dirfd, const char * pathname, struct open_how * how, size_t size, __u64 submit_flags);
    143136#endif
    144137extern void async_close(io_future_t & future, int fd, __u64 submit_flags);
    145138#if defined(CFA_HAVE_STATX)
    146         extern void async_statx(io_future_t & future, int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, __u64 submit_flags);
     139        extern void async_statx(io_future_t & future, int dirfd, const char * pathname, int flags, unsigned int mask, struct statx * statxbuf, __u64 submit_flags);
    147140#endif
    148141void async_read(io_future_t & future, int fd, void * buf, size_t count, __u64 submit_flags);
    149142extern void async_write(io_future_t & future, int fd, void * buf, size_t count, __u64 submit_flags);
    150 extern void async_splice(io_future_t & future, int fd_in, __off64_t *off_in, int fd_out, __off64_t *off_out, size_t len, unsigned int flags, __u64 submit_flags);
     143extern void async_splice(io_future_t & future, int fd_in, __off64_t * off_in, int fd_out, __off64_t * off_out, size_t len, unsigned int flags, __u64 submit_flags);
    151144extern void async_tee(io_future_t & future, int fd_in, int fd_out, size_t len, unsigned int flags, __u64 submit_flags);
    152145
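
The cfa_* entry points above mirror their POSIX counterparts but are serviced through io_uring and take an extra submit_flags argument. A minimal usage sketch, using a hypothetical copy_once helper and assuming two already-open file descriptors:

    /* Hypothetical helper, illustration only: copy one buffer's worth of data
       using the synchronous wrappers declared above (0 = no special submit flags). */
    static ssize_t copy_once( int fd_in, int fd_out ) {
            char buf[4096];
            ssize_t n = cfa_read( fd_in, buf, sizeof(buf), 0 );    // like read(2)
            if ( n <= 0 ) return n;
            return cfa_write( fd_out, buf, (size_t)n, 0 );         // like write(2)
    }
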
  • libcfa/src/concurrency/kernel.cfa

    r34b4268 r24d6572  
    1010// Created On       : Tue Jan 17 12:27:26 2017
    1111// Last Modified By : Peter A. Buhr
    12 // Last Modified On : Wed Nov 30 18:14:08 2022
    13 // Update Count     : 76
     12// Last Modified On : Mon Jan  9 08:42:05 2023
     13// Update Count     : 77
    1414//
    1515
    1616#define __cforall_thread__
    17 #define _GNU_SOURCE
    1817
    1918// #define __CFA_DEBUG_PRINT_RUNTIME_CORE__
     
    258257                __cfadbg_print_safe(runtime_core, "Kernel : core %p stopping\n", this);
    259258        }
     259
     260        __cfa_io_flush( this );
     261        __cfa_io_drain( this );
    260262
    261263        post( this->terminated );
  • libcfa/src/concurrency/kernel/cluster.cfa

    r34b4268 r24d6572  
    1515
    1616#define __cforall_thread__
    17 #define _GNU_SOURCE
    1817
    1918#include "bits/defs.hfa"
     
    6968        return max_cores_l;
    7069}
    71 
    72 #if   defined(CFA_HAVE_LINUX_LIBRSEQ)
    73         // No forward declaration needed
    74         #define __kernel_rseq_register rseq_register_current_thread
    75         #define __kernel_rseq_unregister rseq_unregister_current_thread
    76 #elif defined(CFA_HAVE_LINUX_RSEQ_H)
    77         static void __kernel_raw_rseq_register  (void);
    78         static void __kernel_raw_rseq_unregister(void);
    79 
    80         #define __kernel_rseq_register __kernel_raw_rseq_register
    81         #define __kernel_rseq_unregister __kernel_raw_rseq_unregister
    82 #else
    83         // No forward declaration needed
    84         // No initialization needed
    85         static inline void noop(void) {}
    86 
    87         #define __kernel_rseq_register noop
    88         #define __kernel_rseq_unregister noop
    89 #endif
    9070
    9171//=======================================================================
     
    11191// Lock-Free registering/unregistering of threads
    11292unsigned register_proc_id( void ) with(__scheduler_lock.lock) {
    113         __kernel_rseq_register();
    114 
    11593        bool * handle = (bool *)&kernelTLS().sched_lock;
    11694
     
    162140
    163141        __atomic_store_n(cell, 0p, __ATOMIC_RELEASE);
    164 
    165         __kernel_rseq_unregister();
    166142}
    167143
     
    505481        /* paranoid */ verify( mock_head(this)    == this.l.prev );
    506482}
    507 
    508 #if   defined(CFA_HAVE_LINUX_LIBRSEQ)
    509         // No definition needed
    510 #elif defined(CFA_HAVE_LINUX_RSEQ_H)
    511 
    512         #if defined( __x86_64 ) || defined( __i386 )
    513                 #define RSEQ_SIG        0x53053053
    514         #elif defined( __ARM_ARCH )
    515                 #ifdef __ARMEB__
    516                 #define RSEQ_SIG    0xf3def5e7      /* udf    #24035    ; 0x5de3 (ARMv6+) */
    517                 #else
    518                 #define RSEQ_SIG    0xe7f5def3      /* udf    #24035    ; 0x5de3 */
    519                 #endif
    520         #endif
    521 
    522         extern void __disable_interrupts_hard();
    523         extern void __enable_interrupts_hard();
    524 
    525         static void __kernel_raw_rseq_register  (void) {
    526                 /* paranoid */ verify( __cfaabi_rseq.cpu_id == RSEQ_CPU_ID_UNINITIALIZED );
    527 
    528                 // int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), 0, (sigset_t *)0p, _NSIG / 8);
    529                 int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), 0, RSEQ_SIG);
    530                 if(ret != 0) {
    531                         int e = errno;
    532                         switch(e) {
    533                         case EINVAL: abort("KERNEL ERROR: rseq register invalid argument");
    534                         case ENOSYS: abort("KERNEL ERROR: rseq register no supported");
    535                         case EFAULT: abort("KERNEL ERROR: rseq register with invalid argument");
    536                         case EBUSY : abort("KERNEL ERROR: rseq register already registered");
    537                         case EPERM : abort("KERNEL ERROR: rseq register sig  argument  on unregistration does not match the signature received on registration");
    538                         default: abort("KERNEL ERROR: rseq register unexpected return %d", e);
    539                         }
    540                 }
    541         }
    542 
    543         static void __kernel_raw_rseq_unregister(void) {
    544                 /* paranoid */ verify( __cfaabi_rseq.cpu_id >= 0 );
    545 
    546                 // int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), RSEQ_FLAG_UNREGISTER, (sigset_t *)0p, _NSIG / 8);
    547                 int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), RSEQ_FLAG_UNREGISTER, RSEQ_SIG);
    548                 if(ret != 0) {
    549                         int e = errno;
    550                         switch(e) {
    551                         case EINVAL: abort("KERNEL ERROR: rseq unregister invalid argument");
    552                         case ENOSYS: abort("KERNEL ERROR: rseq unregister no supported");
    553                         case EFAULT: abort("KERNEL ERROR: rseq unregister with invalid argument");
    554                         case EBUSY : abort("KERNEL ERROR: rseq unregister already registered");
    555                         case EPERM : abort("KERNEL ERROR: rseq unregister sig  argument  on unregistration does not match the signature received on registration");
    556                         default: abort("KERNEL ERROR: rseq unregisteunexpected return %d", e);
    557                         }
    558                 }
    559         }
    560 #else
    561         // No definition needed
    562 #endif
  • libcfa/src/concurrency/kernel/cluster.hfa

    r34b4268 r24d6572  
    4040
    4141// convert to log2 scale but using double
    42 static inline __readyQ_avg_t __to_readyQ_avg(unsigned long long intsc) { if(unlikely(0 == intsc)) return 0.0; else return log2(intsc); }
     42static inline __readyQ_avg_t __to_readyQ_avg(unsigned long long intsc) { if(unlikely(0 == intsc)) return 0.0; else return log2((__readyQ_avg_t)intsc); }
    4343
    4444#define warn_large_before warnf( !strict || old_avg < 35.0, "Suspiciously large previous average: %'lf, %'" PRId64 "ms \n", old_avg, program()`ms )
     
    146146}
    147147
    148 static struct {
    149         const unsigned readyq;
    150         const unsigned io;
     148const static struct {
     149        unsigned readyq;
     150        unsigned io;
    151151} __shard_factor = { 2, 1 };
    152152
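
Since __to_readyQ_avg above stores intervals on a log2 scale (with 0 pinned to 0.0 to avoid log2(0)), a 1024-cycle interval is recorded as 10.0. A tiny standalone restatement of that conversion, for illustration only:

    #include <math.h>
    #include <stdio.h>

    /* Standalone restatement of __to_readyQ_avg above, illustration only. */
    static double to_readyQ_avg( unsigned long long intsc ) {
            return intsc == 0 ? 0.0 : log2( (double)intsc );
    }

    int main() {
            printf( "%g %g\n", to_readyQ_avg( 0 ), to_readyQ_avg( 1024 ) );   /* prints: 0 10 */
            return 0;
    }
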
  • libcfa/src/concurrency/kernel/private.hfa

    r34b4268 r24d6572  
    1010// Created On       : Mon Feb 13 12:27:26 2017
    1111// Last Modified By : Peter A. Buhr
    12 // Last Modified On : Wed Aug 12 08:21:33 2020
    13 // Update Count     : 9
     12// Last Modified On : Thu Mar  2 16:04:46 2023
     13// Update Count     : 11
    1414//
    1515
     
    2929
    3030extern "C" {
    31 #if   defined(CFA_HAVE_LINUX_LIBRSEQ)
    32         #include <rseq/rseq.h>
    33 #elif defined(CFA_HAVE_LINUX_RSEQ_H)
    34         #include <linux/rseq.h>
    35 #else
    36         #ifndef _GNU_SOURCE
    37         #error kernel/private requires gnu_source
    38         #endif
    3931        #include <sched.h>
    40 #endif
    4132}
    4233
     
    110101// Hardware
    111102
    112 #if   defined(CFA_HAVE_LINUX_LIBRSEQ)
    113         // No data needed
    114 #elif defined(CFA_HAVE_LINUX_RSEQ_H)
    115         extern "Cforall" {
    116                 extern __attribute__((aligned(64))) __thread volatile struct rseq __cfaabi_rseq;
    117         }
    118 #else
    119         // No data needed
    120 #endif
    121 
    122103static inline int __kernel_getcpu() {
    123104        /* paranoid */ verify( ! __preemption_enabled() );
    124 #if   defined(CFA_HAVE_LINUX_LIBRSEQ)
    125         return rseq_current_cpu();
    126 #elif defined(CFA_HAVE_LINUX_RSEQ_H)
    127         int r = __cfaabi_rseq.cpu_id;
    128         /* paranoid */ verify( r >= 0 );
    129         return r;
    130 #else
    131105        return sched_getcpu();
    132 #endif
    133106}
    134107
  • libcfa/src/concurrency/kernel/startup.cfa

    r34b4268 r24d6572  
    1515
    1616#define __cforall_thread__
    17 #define _GNU_SOURCE
    1817
    1918// #define __CFA_DEBUG_PRINT_RUNTIME_CORE__
    2019
    2120// C Includes
    22 #include <errno.h>                                      // errno
     21#include <errno.h>                                                                              // errno
    2322#include <signal.h>
    24 #include <string.h>                                     // strerror
    25 #include <unistd.h>                                     // sysconf
    26 
     23#include <string.h>                                                                             // strerror
     24#include <unistd.h>
     25#include <limits.h>                                                                             // PTHREAD_STACK_MIN
    2726extern "C" {
    28         #include <limits.h>                             // PTHREAD_STACK_MIN
    29         #include <unistd.h>                             // syscall
    30         #include <sys/eventfd.h>                        // eventfd
    31         #include <sys/mman.h>                           // mprotect
    32         #include <sys/resource.h>                       // getrlimit
     27        #include <sys/eventfd.h>                                                        // eventfd
     28        #include <sys/mman.h>                                                           // mprotect
     29        #include <sys/resource.h>                                                       // getrlimit
    3330}
    3431
     
    3633#include "kernel/private.hfa"
    3734#include "iofwd.hfa"
    38 #include "startup.hfa"                                  // STARTUP_PRIORITY_XXX
     35#include "startup.hfa"                                                                  // STARTUP_PRIORITY_XXX
    3936#include "limits.hfa"
    4037#include "math.hfa"
     
    150147__scheduler_RWLock_t __scheduler_lock @= { 0 };
    151148
    152 #if   defined(CFA_HAVE_LINUX_LIBRSEQ)
    153         // No data needed
    154 #elif defined(CFA_HAVE_LINUX_RSEQ_H)
    155         extern "Cforall" {
    156                 __attribute__((aligned(64))) __thread volatile struct rseq __cfaabi_rseq @= {
    157                         .cpu_id : RSEQ_CPU_ID_UNINITIALIZED,
    158                 };
    159         }
    160 #else
    161         // No data needed
    162 #endif
    163 
    164149//-----------------------------------------------------------------------------
    165150// Struct to steal stack
  • libcfa/src/concurrency/locks.cfa

    r34b4268 r24d6572  
    55// file "LICENCE" distributed with Cforall.
    66//
    7 // locks.hfa -- LIBCFATHREAD
     7// locks.cfa -- LIBCFATHREAD
    88// Runtime locks that used with the runtime thread system.
    99//
     
    1616
    1717#define __cforall_thread__
    18 #define _GNU_SOURCE
    1918
    2019#include "locks.hfa"
     
    8079        // lock is held by some other thread
    8180        if ( owner != 0p && owner != thrd ) {
    82                 insert_last( blocked_threads, *thrd );
     81        select_node node;
     82                insert_last( blocked_threads, node );
    8383                wait_count++;
    8484                unlock( lock );
    8585                park( );
    86         }
    87         // multi acquisition lock is held by current thread
    88         else if ( owner == thrd && multi_acquisition ) {
     86        return;
     87        } else if ( owner == thrd && multi_acquisition ) { // multi acquisition lock is held by current thread
    8988                recursion_count++;
    90                 unlock( lock );
    91         }
    92         // lock isn't held
    93         else {
     89        } else {  // lock isn't held
    9490                owner = thrd;
    9591                recursion_count = 1;
    96                 unlock( lock );
    97         }
     92        }
     93    unlock( lock );
    9894}
    9995
     
    118114}
    119115
    120 static void pop_and_set_new_owner( blocking_lock & this ) with( this ) {
    121         thread$ * t = &try_pop_front( blocked_threads );
    122         owner = t;
    123         recursion_count = ( t ? 1 : 0 );
    124         if ( t ) wait_count--;
    125         unpark( t );
     116static inline void pop_node( blocking_lock & this ) with( this ) {
     117    __handle_waituntil_OR( blocked_threads );
     118    select_node * node = &try_pop_front( blocked_threads );
     119    if ( node ) {
     120        wait_count--;
     121        owner = node->blocked_thread;
     122        recursion_count = 1;
     123        // if ( !node->clause_status || __make_select_node_available( *node ) ) unpark( node->blocked_thread );
     124        wake_one( blocked_threads, *node );
     125    } else {
     126        owner = 0p;
     127        recursion_count = 0;
     128    }
    126129}
    127130
     
    135138        recursion_count--;
    136139        if ( recursion_count == 0 ) {
    137                 pop_and_set_new_owner( this );
     140                pop_node( this );
    138141        }
    139142        unlock( lock );
     
    148151        // lock held
    149152        if ( owner != 0p ) {
    150                 insert_last( blocked_threads, *t );
     153                insert_last( blocked_threads, *(select_node *)t->link_node );
    151154                wait_count++;
    152                 unlock( lock );
    153155        }
    154156        // lock not held
     
    157159                recursion_count = 1;
    158160                unpark( t );
    159                 unlock( lock );
    160         }
    161 }
    162 
    163 size_t on_wait( blocking_lock & this ) with( this ) {
     161        }
     162    unlock( lock );
     163}
     164
     165size_t on_wait( blocking_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) with( this ) {
    164166        lock( lock __cfaabi_dbg_ctx2 );
    165167        /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
     
    168170        size_t ret = recursion_count;
    169171
    170         pop_and_set_new_owner( this );
     172        pop_node( this );
     173
     174    select_node node;
     175    active_thread()->link_node = (void *)&node;
    171176        unlock( lock );
     177
     178    pre_park_then_park( pp_fn, pp_datum );
     179
    172180        return ret;
    173181}
     
    176184        recursion_count = recursion;
    177185}
     186
     187// waituntil() support
     188bool register_select( blocking_lock & this, select_node & node ) with(this) {
     189    lock( lock __cfaabi_dbg_ctx2 );
     190        thread$ * thrd = active_thread();
     191
     192        // single acquisition lock is held by current thread
     193        /* paranoid */ verifyf( owner != thrd || multi_acquisition, "Single acquisition lock holder (%p) attempted to reacquire the lock %p resulting in a deadlock.", owner, &this );
     194
     195    if ( !node.park_counter && ( (owner == thrd && multi_acquisition) || owner == 0p ) ) { // OR special case
     196        if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
     197           unlock( lock );
     198           return false;
     199        }
     200    }
     201
     202        // lock is held by some other thread
     203        if ( owner != 0p && owner != thrd ) {
     204                insert_last( blocked_threads, node );
     205                wait_count++;
     206                unlock( lock );
     207        return false;
     208        } else if ( owner == thrd && multi_acquisition ) { // multi acquisition lock is held by current thread
     209                recursion_count++;
     210        } else {  // lock isn't held
     211                owner = thrd;
     212                recursion_count = 1;
     213        }
     214
     215    if ( node.park_counter ) __make_select_node_available( node );
     216    unlock( lock );
     217    return true;
     218}
     219
     220bool unregister_select( blocking_lock & this, select_node & node ) with(this) {
     221    lock( lock __cfaabi_dbg_ctx2 );
     222    if ( node`isListed ) {
     223        remove( node );
     224        wait_count--;
     225        unlock( lock );
     226        return false;
     227    }
     228   
     229    if ( owner == active_thread() ) {
     230        /* paranoid */ verifyf( recursion_count == 1 || multi_acquisition, "Thread %p attempted to unlock owner lock %p in waituntil unregister, which is not recursive but has a recursive count of %zu", active_thread(), &this, recursion_count );
     231        // if recursion count is zero release lock and set new owner if one is waiting
     232        recursion_count--;
     233        if ( recursion_count == 0 ) {
     234            pop_node( this );
     235        }
     236    }
     237        unlock( lock );
     238    return false;
     239}
     240
     241void on_selected( blocking_lock & this, select_node & node ) {}
    178242
    179243//-----------------------------------------------------------------------------
     
    312376        int counter( condition_variable(L) & this ) with(this) { return count; }
    313377
    314         static size_t queue_and_get_recursion( condition_variable(L) & this, info_thread(L) * i ) with(this) {
     378        static void enqueue_thread( condition_variable(L) & this, info_thread(L) * i ) with(this) {
    315379                // add info_thread to waiting queue
    316380                insert_last( blocked_threads, *i );
    317381                count++;
    318                 size_t recursion_count = 0;
    319                 if (i->lock) {
    320                         // if lock was passed get recursion count to reset to after waking thread
    321                         recursion_count = on_wait( *i->lock );
    322                 }
    323                 return recursion_count;
    324         }
     382        }
     383
     384    static size_t block_and_get_recursion( info_thread(L) & i, __cfa_pre_park pp_fn, void * pp_datum ) {
     385        size_t recursion_count = 0;
     386                if ( i.lock ) // if lock was passed get recursion count to reset to after waking thread
     387                        recursion_count = on_wait( *i.lock, pp_fn, pp_datum ); // this call blocks
     388                else
     389            pre_park_then_park( pp_fn, pp_datum );
     390        return recursion_count;
     391    }
     392    static size_t block_and_get_recursion( info_thread(L) & i ) { return block_and_get_recursion( i, pre_park_noop, 0p ); }
    325393
    326394        // helper for wait()'s' with no timeout
    327395        static void queue_info_thread( condition_variable(L) & this, info_thread(L) & i ) with(this) {
    328396                lock( lock __cfaabi_dbg_ctx2 );
    329                 size_t recursion_count = queue_and_get_recursion(this, &i);
     397        enqueue_thread( this, &i );
    330398                unlock( lock );
    331399
    332400                // blocks here
    333                 park( );
     401        size_t recursion_count = block_and_get_recursion( i );
    334402
    335403                // resets recursion count here after waking
    336                 if (i.lock) on_wakeup(*i.lock, recursion_count);
     404                if ( i.lock ) on_wakeup( *i.lock, recursion_count );
    337405        }
    338406
     
    341409                queue_info_thread( this, i );
    342410
     411    static void cond_alarm_register( void * node_ptr ) { register_self( (alarm_node_t *)node_ptr ); }
     412
    343413        // helper for wait()'s' with a timeout
    344414        static void queue_info_thread_timeout( condition_variable(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) {
    345415                lock( lock __cfaabi_dbg_ctx2 );
    346                 size_t recursion_count = queue_and_get_recursion(this, &info);
     416        enqueue_thread( this, &info );
    347417                alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info };
    348418                unlock( lock );
    349419
    350                 // registers alarm outside cond lock to avoid deadlock
    351                 register_self( &node_wrap.alarm_node );
    352 
    353                 // blocks here
    354                 park();
      420                // registers the alarm node via the pre-park hook after the locks are released (avoids deadlock), then blocks here
     421        size_t recursion_count = block_and_get_recursion( info, cond_alarm_register, (void *)(&node_wrap.alarm_node) );
     422                // park();
    355423
    356424                // unregisters alarm so it doesn't go off if this happens first
     
    358426
    359427                // resets recursion count here after waking
    360                 if (info.lock) on_wakeup(*info.lock, recursion_count);
     428                if ( info.lock ) on_wakeup( *info.lock, recursion_count );
    361429        }
    362430
     
    418486                info_thread( L ) i = { active_thread(), info, &l };
    419487                insert_last( blocked_threads, i );
    420                 size_t recursion_count = on_wait( *i.lock );
    421                 park( );
     488                size_t recursion_count = on_wait( *i.lock, pre_park_noop, 0p ); // blocks here
     489                // park( );
    422490                on_wakeup(*i.lock, recursion_count);
    423491        }
     
    460528        bool empty ( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty; }
    461529
    462         static size_t queue_and_get_recursion( pthread_cond_var(L) & this, info_thread(L) * i ) with(this) {
    463                 // add info_thread to waiting queue
    464                 insert_last( blocked_threads, *i );
    465                 size_t recursion_count = 0;
    466                 recursion_count = on_wait( *i->lock );
    467                 return recursion_count;
    468         }
    469        
    470530        static void queue_info_thread_timeout( pthread_cond_var(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) {
    471531                lock( lock __cfaabi_dbg_ctx2 );
    472                 size_t recursion_count = queue_and_get_recursion(this, &info);
     532        insert_last( blocked_threads, info );
    473533                pthread_alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info };
    474534                unlock( lock );
    475535
    476                 // registers alarm outside cond lock to avoid deadlock
    477                 register_self( &node_wrap.alarm_node );
    478 
    479                 // blocks here
    480                 park();
    481 
    482                 // unregisters alarm so it doesn't go off if this happens first
     536                // blocks here and registers alarm node before blocking after releasing locks to avoid deadlock
     537        size_t recursion_count = block_and_get_recursion( info, cond_alarm_register, (void *)(&node_wrap.alarm_node) );
     538
     539                // unregisters alarm so it doesn't go off if signal happens first
    483540                unregister_self( &node_wrap.alarm_node );
    484541
    485542                // resets recursion count here after waking
    486                 if (info.lock) on_wakeup(*info.lock, recursion_count);
     543                if ( info.lock ) on_wakeup( *info.lock, recursion_count );
    487544        }
    488545
     
    494551                lock( lock __cfaabi_dbg_ctx2 );
    495552                info_thread( L ) i = { active_thread(), info, &l };
    496                 size_t recursion_count = queue_and_get_recursion(this, &i);
    497                 unlock( lock );
    498                 park( );
    499                 on_wakeup(*i.lock, recursion_count);
     553        insert_last( blocked_threads, i );
     554                unlock( lock );
     555
     556        // blocks here
     557                size_t recursion_count = block_and_get_recursion( i );
     558
     559                on_wakeup( *i.lock, recursion_count );
    500560        }
    501561
     
    585645        return thrd != 0p;
    586646}
     647
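
The on_wait/on_wakeup changes above preserve the contract the condition variables rely on: wait releases the passed lock (saving its recursion count) before parking, and the recursion count is restored after waking. A hedged usage sketch, assuming the wait()/notify_one() interface declared in locks.hfa:

    owner_lock m;
    condition_variable( owner_lock ) c;

    void waiter() {
            lock( m );
            wait( c, m );     // on_wait releases m and saves its recursion count; blocks here
            // m is held again here: ownership is handed off on notify and
            // on_wakeup restores the saved recursion count
            unlock( m );
    }

    void waker() {
            notify_one( c );  // wakes one waiter, which then holds m as above
    }
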
  • libcfa/src/concurrency/locks.hfa

    r34b4268 r24d6572  
    3030#include "time.hfa"
    3131
     32#include "select.hfa"
     33
    3234#include <fstream.hfa>
    33 
    3435
    3536// futex headers
     
    3839#include <unistd.h>
    3940
    40 // undef to make a number of the locks not reacquire upon waking from a condlock
    41 #define REACQ 1
     41typedef void (*__cfa_pre_park)( void * );
     42
     43static inline void pre_park_noop( void * ) {}
     44
     45//-----------------------------------------------------------------------------
     46// is_blocking_lock
     47forall( L & | sized(L) )
     48trait is_blocking_lock {
     49        // For synchronization locks to use when acquiring
     50        void on_notify( L &, struct thread$ * );
     51
     52        // For synchronization locks to use when releasing
     53        size_t on_wait( L &, __cfa_pre_park pp_fn, void * pp_datum );
     54
     55        // to set recursion count after getting signalled;
     56        void on_wakeup( L &, size_t recursion );
     57};
     58
     59static inline void pre_park_then_park( __cfa_pre_park pp_fn, void * pp_datum ) {
     60    pp_fn( pp_datum );
     61    park();
     62}
     63
     64// macros for default routine impls for is_blocking_lock trait that do not wait-morph
     65
     66#define DEFAULT_ON_NOTIFY( lock_type ) \
     67    static inline void on_notify( lock_type & this, thread$ * t ){ unpark(t); }
     68
     69#define DEFAULT_ON_WAIT( lock_type ) \
     70    static inline size_t on_wait( lock_type & this, __cfa_pre_park pp_fn, void * pp_datum ) { \
     71        unlock( this ); \
     72        pre_park_then_park( pp_fn, pp_datum ); \
     73        return 0; \
     74    }
     75
     76// on_wakeup impl if lock should be reacquired after waking up
     77#define DEFAULT_ON_WAKEUP_REACQ( lock_type ) \
     78    static inline void on_wakeup( lock_type & this, size_t recursion ) { lock( this ); }
     79
     80// on_wakeup impl if lock will not be reacquired after waking up
     81#define DEFAULT_ON_WAKEUP_NO_REACQ( lock_type ) \
     82    static inline void on_wakeup( lock_type & this, size_t recursion ) {}
     83
     84
    4285
    4386//-----------------------------------------------------------------------------
     
    66109static inline bool   try_lock ( single_acquisition_lock & this ) { return try_lock( (blocking_lock &)this ); }
    67110static inline void   unlock   ( single_acquisition_lock & this ) { unlock  ( (blocking_lock &)this ); }
    68 static inline size_t on_wait  ( single_acquisition_lock & this ) { return on_wait ( (blocking_lock &)this ); }
     111static inline size_t on_wait  ( single_acquisition_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) { return on_wait ( (blocking_lock &)this, pp_fn, pp_datum ); }
    69112static inline void   on_wakeup( single_acquisition_lock & this, size_t v ) { on_wakeup ( (blocking_lock &)this, v ); }
    70113static inline void   on_notify( single_acquisition_lock & this, struct thread$ * t ) { on_notify( (blocking_lock &)this, t ); }
     114static inline bool   register_select( single_acquisition_lock & this, select_node & node ) { return register_select( (blocking_lock &)this, node ); }
     115static inline bool   unregister_select( single_acquisition_lock & this, select_node & node ) { return unregister_select( (blocking_lock &)this, node ); }
     116static inline void   on_selected( single_acquisition_lock & this, select_node & node ) { on_selected( (blocking_lock &)this, node ); }
    71117
    72118//----------
     
    80126static inline bool   try_lock ( owner_lock & this ) { return try_lock( (blocking_lock &)this ); }
    81127static inline void   unlock   ( owner_lock & this ) { unlock  ( (blocking_lock &)this ); }
    82 static inline size_t on_wait  ( owner_lock & this ) { return on_wait ( (blocking_lock &)this ); }
     128static inline size_t on_wait  ( owner_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) { return on_wait ( (blocking_lock &)this, pp_fn, pp_datum ); }
    83129static inline void   on_wakeup( owner_lock & this, size_t v ) { on_wakeup ( (blocking_lock &)this, v ); }
    84130static inline void   on_notify( owner_lock & this, struct thread$ * t ) { on_notify( (blocking_lock &)this, t ); }
     131static inline bool   register_select( owner_lock & this, select_node & node ) { return register_select( (blocking_lock &)this, node ); }
     132static inline bool   unregister_select( owner_lock & this, select_node & node ) { return unregister_select( (blocking_lock &)this, node ); }
     133static inline void   on_selected( owner_lock & this, select_node & node ) { on_selected( (blocking_lock &)this, node ); }
    85134
    86135//-----------------------------------------------------------------------------
     
    127176static inline void ?{}(mcs_spin_node & this) { this.next = 0p; this.locked = true; }
    128177
    129 static inline mcs_spin_node * volatile & ?`next ( mcs_spin_node * node ) {
    130         return node->next;
    131 }
    132 
    133178struct mcs_spin_lock {
    134179        mcs_spin_queue queue;
     
    136181
    137182static inline void lock(mcs_spin_lock & l, mcs_spin_node & n) {
     183    n.locked = true;
    138184        mcs_spin_node * prev = __atomic_exchange_n(&l.queue.tail, &n, __ATOMIC_SEQ_CST);
    139         n.locked = true;
    140         if(prev == 0p) return;
     185        if( prev == 0p ) return;
    141186        prev->next = &n;
    142         while(__atomic_load_n(&n.locked, __ATOMIC_RELAXED)) Pause();
     187        while( __atomic_load_n(&n.locked, __ATOMIC_RELAXED) ) Pause();
    143188}
    144189
     
    146191        mcs_spin_node * n_ptr = &n;
    147192        if (__atomic_compare_exchange_n(&l.queue.tail, &n_ptr, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) return;
    148         while (__atomic_load_n(&n.next, __ATOMIC_RELAXED) == 0p) {}
     193        while (__atomic_load_n(&n.next, __ATOMIC_RELAXED) == 0p) Pause();
    149194        n.next->locked = false;
    150195}
     
    153198// futex_mutex
    154199
    155 // - No cond var support
    156200// - Kernel thd blocking alternative to the spinlock
    157201// - No ownership (will deadlock on reacq)
     202// - no reacq on wakeup
    158203struct futex_mutex {
    159204        // lock state any state other than UNLOCKED is locked
     
    169214}
    170215
    171 static inline void  ?{}( futex_mutex & this ) with(this) { val = 0; }
    172 
    173 static inline bool internal_try_lock(futex_mutex & this, int & compare_val) with(this) {
     216static inline void ?{}( futex_mutex & this ) with(this) { val = 0; }
     217
     218static inline bool internal_try_lock( futex_mutex & this, int & compare_val) with(this) {
    174219        return __atomic_compare_exchange_n((int*)&val, (int*)&compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE);
    175220}
    176221
    177 static inline int internal_exchange(futex_mutex & this) with(this) {
     222static inline int internal_exchange( futex_mutex & this ) with(this) {
    178223        return __atomic_exchange_n((int*)&val, 2, __ATOMIC_ACQUIRE);
    179224}
    180225
    181226// if this is called recursively IT WILL DEADLOCK!!!!!
    182 static inline void lock(futex_mutex & this) with(this) {
     227static inline void lock( futex_mutex & this ) with(this) {
    183228        int state;
    184229
    185        
    186         // // linear backoff omitted for now
    187         // for( int spin = 4; spin < 1024; spin += spin) {
    188         //      state = 0;
    189         //      // if unlocked, lock and return
    190         //      if (internal_try_lock(this, state)) return;
    191         //      if (2 == state) break;
    192         //      for (int i = 0; i < spin; i++) Pause();
    193         // }
    194 
    195         // no contention try to acquire
    196         if (internal_try_lock(this, state)) return;
     230        for( int spin = 4; spin < 1024; spin += spin) {
     231                state = 0;
     232                // if unlocked, lock and return
     233                if (internal_try_lock(this, state)) return;
     234                if (2 == state) break;
     235                for (int i = 0; i < spin; i++) Pause();
     236        }
    197237       
    198238        // if not in contended state, set to be in contended state
     
    207247
    208248static inline void unlock(futex_mutex & this) with(this) {
    209         // if uncontended do atomice unlock and then return
    210         if (__atomic_fetch_sub(&val, 1, __ATOMIC_RELEASE) == 1) return; // TODO: try acq/rel
     249        // if uncontended do atomic unlock and then return
     250    if (__atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1) return;
    211251       
    212252        // otherwise threads are blocked so we must wake one
    213         __atomic_store_n((int *)&val, 0, __ATOMIC_RELEASE);
    214253        futex((int *)&val, FUTEX_WAKE, 1);
    215254}
    216255
    217 static inline void on_notify( futex_mutex & f, thread$ * t){ unpark(t); }
    218 static inline size_t on_wait( futex_mutex & f ) {unlock(f); return 0;}
    219 
    220 // to set recursion count after getting signalled;
    221 static inline void on_wakeup( futex_mutex & f, size_t recursion ) {}
    222 
    223 //-----------------------------------------------------------------------------
    224 // CLH Spinlock
    225 // - No recursive acquisition
    226 // - Needs to be released by owner
    227 
    228 struct clh_lock {
    229         volatile bool * volatile tail;
    230 };
    231 
    232 static inline void  ?{}( clh_lock & this ) { this.tail = malloc(); *this.tail = true; }
    233 static inline void ^?{}( clh_lock & this ) { free(this.tail); }
    234 
    235 static inline void lock(clh_lock & l) {
    236         thread$ * curr_thd = active_thread();
    237         *(curr_thd->clh_node) = false;
    238         volatile bool * prev = __atomic_exchange_n((bool **)(&l.tail), (bool *)(curr_thd->clh_node), __ATOMIC_SEQ_CST);
    239         while(!__atomic_load_n(prev, __ATOMIC_ACQUIRE)) Pause();
    240         curr_thd->clh_prev = prev;
    241 }
    242 
    243 static inline void unlock(clh_lock & l) {
    244         thread$ * curr_thd = active_thread();
    245         __atomic_store_n(curr_thd->clh_node, true, __ATOMIC_RELEASE);
    246         curr_thd->clh_node = curr_thd->clh_prev;
    247 }
    248 
    249 static inline void on_notify(clh_lock & this, struct thread$ * t ) { unpark(t); }
    250 static inline size_t on_wait(clh_lock & this) { unlock(this); return 0; }
    251 static inline void on_wakeup(clh_lock & this, size_t recursion ) {
    252         #ifdef REACQ
    253         lock(this);
    254         #endif
    255 }
    256 
    257 
    258 //-----------------------------------------------------------------------------
    259 // Linear backoff Spinlock
    260 struct linear_backoff_then_block_lock {
     256DEFAULT_ON_NOTIFY( futex_mutex )
     257DEFAULT_ON_WAIT( futex_mutex )
     258DEFAULT_ON_WAKEUP_NO_REACQ( futex_mutex )
     259
     260//-----------------------------------------------------------------------------
     261// go_mutex
     262
     263// - Kernel thd blocking alternative to the spinlock
     264// - No ownership (will deadlock on reacq)
     265// - Golang's flavour of mutex
     266// - Impl taken from Golang: src/runtime/lock_futex.go
     267struct go_mutex {
     268        // lock state any state other than UNLOCKED is locked
     269        // enum LockState { UNLOCKED = 0, LOCKED = 1, SLEEPING = 2 };
     270       
     271        // stores a lock state
     272        int val;
     273};
     274static inline void  ?{}( go_mutex & this ) with(this) { val = 0; }
     275// static inline void ?{}( go_mutex & this, go_mutex this2 ) = void; // these don't compile correctly at the moment so they should be omitted
     276// static inline void ?=?( go_mutex & this, go_mutex this2 ) = void;
     277
     278static inline bool internal_try_lock(go_mutex & this, int & compare_val, int new_val ) with(this) {
     279        return __atomic_compare_exchange_n((int*)&val, (int*)&compare_val, new_val, false, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE);
     280}
     281
     282static inline int internal_exchange(go_mutex & this, int swap ) with(this) {
     283        return __atomic_exchange_n((int*)&val, swap, __ATOMIC_ACQUIRE);
     284}
     285
     286// if this is called recursively IT WILL DEADLOCK!!!!!
     287static inline void lock( go_mutex & this ) with( this ) {
     288        int state, init_state;
     289
     290    // speculative grab
     291    state = internal_exchange(this, 1);
     292    if ( !state ) return; // state == 0
     293    init_state = state;
     294    for (;;) {
     295        for( int i = 0; i < 4; i++ ) {
     296            while( !val ) { // lock unlocked
     297                state = 0;
     298                if ( internal_try_lock( this, state, init_state ) ) return;
     299            }
     300            for (int i = 0; i < 30; i++) Pause();
     301        }
     302
     303        while( !val ) { // lock unlocked
     304            state = 0;
     305            if ( internal_try_lock( this, state, init_state ) ) return;
     306        }
     307        sched_yield();
     308       
     309        // if not in contended state, set to be in contended state
     310        state = internal_exchange( this, 2 );
     311        if ( !state ) return; // state == 0
     312        init_state = 2;
     313        futex( (int*)&val, FUTEX_WAIT, 2 ); // if val is not 2 this returns with EWOULDBLOCK
     314    }
     315}
     316
     317static inline void unlock( go_mutex & this ) with(this) {
     318        // if uncontended do atomic unlock and then return
     319    if ( __atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1 ) return;
     320       
     321        // otherwise threads are blocked so we must wake one
     322        futex( (int *)&val, FUTEX_WAKE, 1 );
     323}
     324
     325DEFAULT_ON_NOTIFY( go_mutex )
     326DEFAULT_ON_WAIT( go_mutex )
     327DEFAULT_ON_WAKEUP_NO_REACQ( go_mutex )
     328
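Since go_mutex tracks no owner, a second lock() by the holding thread deadlocks, exactly as the comment block above warns. Below is a short usage sketch of the lock()/unlock() pair defined in this hunk; the function name is hypothetical and the sketch is not part of the changeset.

// Usage sketch only: go_mutex must be released with unlock() before the same
// thread may call lock() again; there is no recursion or ownership support.
go_mutex m;

void with_go_mutex() {
	lock( m );              // spins briefly, then futex-waits if the lock stays contended
	// ... critical section ...
	unlock( m );    // wakes one futex waiter if the lock was contended
}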
     329//-----------------------------------------------------------------------------
     330// Exponential backoff then block lock
     331struct exp_backoff_then_block_lock {
    261332        // Spin lock used for mutual exclusion
    262333        __spinlock_t spinlock;
     
    269340};
    270341
    271 static inline void  ?{}( linear_backoff_then_block_lock & this ) {
     342static inline void  ?{}( exp_backoff_then_block_lock & this ) {
    272343        this.spinlock{};
    273344        this.blocked_threads{};
    274345        this.lock_value = 0;
    275346}
    276 static inline void ^?{}( linear_backoff_then_block_lock & this ) {}
    277 // static inline void ?{}( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void;
    278 // static inline void ?=?( linear_backoff_then_block_lock & this, linear_backoff_then_block_lock this2 ) = void;
    279 
    280 static inline bool internal_try_lock(linear_backoff_then_block_lock & this, size_t & compare_val) with(this) {
    281         if (__atomic_compare_exchange_n(&lock_value, &compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
    282                 return true;
    283         }
    284         return false;
    285 }
    286 
    287 static inline bool try_lock(linear_backoff_then_block_lock & this) { size_t compare_val = 0; return internal_try_lock(this, compare_val); }
    288 
    289 static inline bool try_lock_contention(linear_backoff_then_block_lock & this) with(this) {
    290         if (__atomic_exchange_n(&lock_value, 2, __ATOMIC_ACQUIRE) == 0) {
    291                 return true;
    292         }
    293         return false;
    294 }
    295 
    296 static inline bool block(linear_backoff_then_block_lock & this) with(this) {
    297         lock( spinlock __cfaabi_dbg_ctx2 ); // TODO change to lockfree queue (MPSC)
    298         if (lock_value != 2) {
    299                 unlock( spinlock );
    300                 return true;
    301         }
    302         insert_last( blocked_threads, *active_thread() );
    303         unlock( spinlock );
     347static inline void ?{}( exp_backoff_then_block_lock & this, exp_backoff_then_block_lock this2 ) = void;
     348static inline void ?=?( exp_backoff_then_block_lock & this, exp_backoff_then_block_lock this2 ) = void;
     349
     350static inline void  ^?{}( exp_backoff_then_block_lock & this ){}
     351
     352static inline bool internal_try_lock( exp_backoff_then_block_lock & this, size_t & compare_val ) with(this) {
     353        return __atomic_compare_exchange_n(&lock_value, &compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
     354}
     355
     356static inline bool try_lock( exp_backoff_then_block_lock & this ) { size_t compare_val = 0; return internal_try_lock( this, compare_val ); }
     357
     358static inline bool try_lock_contention( exp_backoff_then_block_lock & this ) with(this) {
     359        return !__atomic_exchange_n( &lock_value, 2, __ATOMIC_ACQUIRE );
     360}
     361
     362static inline bool block( exp_backoff_then_block_lock & this ) with(this) {
     363    lock( spinlock __cfaabi_dbg_ctx2 );
     364    if (__atomic_load_n( &lock_value, __ATOMIC_SEQ_CST) != 2) {
     365        unlock( spinlock );
     366        return true;
     367    }
     368    insert_last( blocked_threads, *active_thread() );
     369    unlock( spinlock );
    304370        park( );
    305371        return true;
    306372}
    307373
    308 static inline void lock(linear_backoff_then_block_lock & this) with(this) {
     374static inline void lock( exp_backoff_then_block_lock & this ) with(this) {
    309375        size_t compare_val = 0;
    310376        int spin = 4;
     377
    311378        // linear backoff
    312379        for( ;; ) {
     
    324391}
    325392
    326 static inline void unlock(linear_backoff_then_block_lock & this) with(this) {
     393static inline void unlock( exp_backoff_then_block_lock & this ) with(this) {
    327394    if (__atomic_exchange_n(&lock_value, 0, __ATOMIC_RELEASE) == 1) return;
    328         lock( spinlock __cfaabi_dbg_ctx2 );
    329         thread$ * t = &try_pop_front( blocked_threads );
    330         unlock( spinlock );
    331         unpark( t );
    332 }
    333 
    334 static inline void on_notify(linear_backoff_then_block_lock & this, struct thread$ * t ) { unpark(t); }
    335 static inline size_t on_wait(linear_backoff_then_block_lock & this) { unlock(this); return 0; }
    336 static inline void on_wakeup(linear_backoff_then_block_lock & this, size_t recursion ) {
    337         #ifdef REACQ
    338         lock(this);
    339         #endif
    340 }
     395    lock( spinlock __cfaabi_dbg_ctx2 );
     396    thread$ * t = &try_pop_front( blocked_threads );
     397    unlock( spinlock );
     398    unpark( t );
     399}
     400
     401DEFAULT_ON_NOTIFY( exp_backoff_then_block_lock )
     402DEFAULT_ON_WAIT( exp_backoff_then_block_lock )
     403DEFAULT_ON_WAKEUP_REACQ( exp_backoff_then_block_lock )
    341404
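The renamed exp_backoff_then_block_lock exposes three building blocks: internal_try_lock (CAS 0 to 1), try_lock_contention (exchange to 2), and block (enqueue on the spinlock-protected list and park). The body of the real acquire loop is elided in this hunk, so the following is only a rough sketch of how such primitives are commonly composed; the function name, spin budget, and growth cap are assumptions, not the library's values.

// Hypothetical sketch only, not the changeset's lock(): spin with a doubling
// budget, then mark the lock contended and park until a release wakes us.
static inline void lock_sketch( exp_backoff_then_block_lock & this ) {
	size_t compare_val = 0;
	int spin = 4;                                              // assumed initial spin budget
	for ( ;; ) {
		compare_val = 0;
		if ( internal_try_lock( this, compare_val ) ) return;  // fast path: 0 -> 1
		if ( spin < 1024 ) {                                   // assumed cap on spinning
			for ( int i = 0; i < spin; i += 1 ) Pause();
			spin += spin;                                      // exponential growth of the budget
		} else {
			if ( try_lock_contention( this ) ) return;         // acquired while marking contended: 0 -> 2
			block( this );                                     // enqueue and park; retry after wakeup
		}
	}
}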
    342405//-----------------------------------------------------------------------------
     
    368431
    369432// if this is called recursively IT WILL DEADLOCK!!!!!
    370 static inline void lock(fast_block_lock & this) with(this) {
     433static inline void lock( fast_block_lock & this ) with(this) {
    371434        lock( lock __cfaabi_dbg_ctx2 );
    372435        if ( held ) {
     
    380443}
    381444
    382 static inline void unlock(fast_block_lock & this) with(this) {
     445static inline void unlock( fast_block_lock & this ) with(this) {
    383446        lock( lock __cfaabi_dbg_ctx2 );
    384447        /* paranoid */ verifyf( held != false, "Attempt to release lock %p that isn't held", &this );
     
    389452}
    390453
    391 static inline void on_notify(fast_block_lock & this, struct thread$ * t ) with(this) {
    392         #ifdef REACQ
    393                 lock( lock __cfaabi_dbg_ctx2 );
    394                 insert_last( blocked_threads, *t );
    395                 unlock( lock );
    396         #else
    397                 unpark(t);
    398         #endif
    399 }
    400 static inline size_t on_wait(fast_block_lock & this) { unlock(this); return 0; }
    401 static inline void on_wakeup(fast_block_lock & this, size_t recursion ) { }
     454static inline void on_notify( fast_block_lock & this, struct thread$ * t ) with(this) {
     455    lock( lock __cfaabi_dbg_ctx2 );
     456    insert_last( blocked_threads, *t );
     457    unlock( lock );
     458}
     459DEFAULT_ON_WAIT( fast_block_lock )
     460DEFAULT_ON_WAKEUP_NO_REACQ( fast_block_lock )
    402461
    403462//-----------------------------------------------------------------------------
     
    410469struct simple_owner_lock {
    411470        // List of blocked threads
    412         dlist( thread$ ) blocked_threads;
     471        dlist( select_node ) blocked_threads;
    413472
    414473        // Spin lock used for mutual exclusion
     
    431490static inline void ?=?( simple_owner_lock & this, simple_owner_lock this2 ) = void;
    432491
    433 static inline void lock(simple_owner_lock & this) with(this) {
    434         if (owner == active_thread()) {
     492static inline void lock( simple_owner_lock & this ) with(this) {
     493        if ( owner == active_thread() ) {
    435494                recursion_count++;
    436495                return;
     
    438497        lock( lock __cfaabi_dbg_ctx2 );
    439498
    440         if (owner != 0p) {
    441                 insert_last( blocked_threads, *active_thread() );
     499        if ( owner != 0p ) {
     500        select_node node;
     501                insert_last( blocked_threads, node );
    442502                unlock( lock );
    443503                park( );
     
    449509}
    450510
    451 // TODO: fix duplicate def issue and bring this back
    452 // void pop_and_set_new_owner( simple_owner_lock & this ) with( this ) {
    453         // thread$ * t = &try_pop_front( blocked_threads );
    454         // owner = t;
    455         // recursion_count = ( t ? 1 : 0 );
    456         // unpark( t );
    457 // }
    458 
    459 static inline void unlock(simple_owner_lock & this) with(this) {
     511static inline void pop_node( simple_owner_lock & this ) with(this) {
     512    __handle_waituntil_OR( blocked_threads );
     513    select_node * node = &try_pop_front( blocked_threads );
     514    if ( node ) {
     515        owner = node->blocked_thread;
     516        recursion_count = 1;
     517        // if ( !node->clause_status || __make_select_node_available( *node ) ) unpark( node->blocked_thread );
     518        wake_one( blocked_threads, *node );
     519    } else {
     520        owner = 0p;
     521        recursion_count = 0;
     522    }
     523}
     524
     525static inline void unlock( simple_owner_lock & this ) with(this) {
    460526        lock( lock __cfaabi_dbg_ctx2 );
    461527        /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
     
    464530        recursion_count--;
    465531        if ( recursion_count == 0 ) {
    466                 // pop_and_set_new_owner( this );
    467                 thread$ * t = &try_pop_front( blocked_threads );
    468                 owner = t;
    469                 recursion_count = ( t ? 1 : 0 );
    470                 unpark( t );
     532                pop_node( this );
    471533        }
    472534        unlock( lock );
    473535}
    474536
    475 static inline void on_notify(simple_owner_lock & this, struct thread$ * t ) with(this) {
     537static inline void on_notify( simple_owner_lock & this, thread$ * t ) with(this) {
    476538        lock( lock __cfaabi_dbg_ctx2 );
    477539        // lock held
    478540        if ( owner != 0p ) {
    479                 insert_last( blocked_threads, *t );
     541                insert_last( blocked_threads, *(select_node *)t->link_node );
    480542        }
    481543        // lock not held
     
    488550}
    489551
    490 static inline size_t on_wait(simple_owner_lock & this) with(this) {
     552static inline size_t on_wait( simple_owner_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) with(this) {
    491553        lock( lock __cfaabi_dbg_ctx2 );
    492554        /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
     
    495557        size_t ret = recursion_count;
    496558
    497         // pop_and_set_new_owner( this );
    498 
    499         thread$ * t = &try_pop_front( blocked_threads );
    500         owner = t;
    501         recursion_count = ( t ? 1 : 0 );
    502         unpark( t );
    503 
     559        pop_node( this );
     560
     561    select_node node;
     562    active_thread()->link_node = (void *)&node;
    504563        unlock( lock );
     564
     565    pre_park_then_park( pp_fn, pp_datum );
     566
    505567        return ret;
    506568}
    507569
    508 static inline void on_wakeup(simple_owner_lock & this, size_t recursion ) with(this) { recursion_count = recursion; }
     570static inline void on_wakeup( simple_owner_lock & this, size_t recursion ) with(this) { recursion_count = recursion; }
     571
     572// waituntil() support
     573static inline bool register_select( simple_owner_lock & this, select_node & node ) with(this) {
     574    lock( lock __cfaabi_dbg_ctx2 );
     575
      576    // check if we can complete the operation; if so, race to establish the winner in the special OR case
     577    if ( !node.park_counter && ( owner == active_thread() || owner == 0p ) ) {
     578        if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
     579           unlock( lock );
     580           return false;
     581        }
     582    }
     583
     584    if ( owner == active_thread() ) {
     585                recursion_count++;
     586        if ( node.park_counter ) __make_select_node_available( node );
     587        unlock( lock );
     588                return true;
     589        }
     590
     591    if ( owner != 0p ) {
     592                insert_last( blocked_threads, node );
     593                unlock( lock );
     594                return false;
     595        }
     596   
     597        owner = active_thread();
     598        recursion_count = 1;
     599
     600    if ( node.park_counter ) __make_select_node_available( node );
     601    unlock( lock );
     602    return true;
     603}
     604
     605static inline bool unregister_select( simple_owner_lock & this, select_node & node ) with(this) {
     606    lock( lock __cfaabi_dbg_ctx2 );
     607    if ( node`isListed ) {
     608        remove( node );
     609        unlock( lock );
     610        return false;
     611    }
     612
     613    if ( owner == active_thread() ) {
     614        recursion_count--;
     615        if ( recursion_count == 0 ) {
     616            pop_node( this );
     617        }
     618    }
     619    unlock( lock );
     620    return false;
     621}
     622
     623static inline void on_selected( simple_owner_lock & this, select_node & node ) {}
     624
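Unlike the locks above, simple_owner_lock records its owner and a recursion count, so the owning thread may nest acquisitions. A brief usage sketch follows; the function name is hypothetical and the sketch is not part of the changeset.

// Usage sketch only: nested lock()/unlock() calls must balance; ownership is
// handed to the next waiter only when recursion_count returns to zero.
simple_owner_lock l;

void nested_use() {
	lock( l );      // owner = active_thread(), recursion_count = 1
	lock( l );      // owner re-acquires: recursion_count = 2, no blocking
	unlock( l );    // recursion_count = 1, lock still held
	unlock( l );    // recursion_count = 0, ownership passes to a waiter if any
}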
    509625
    510626//-----------------------------------------------------------------------------
     
    521637        // flag showing if lock is held
    522638        volatile bool held;
    523 
    524         #ifdef __CFA_DEBUG__
    525         // for deadlock detection
    526         struct thread$ * owner;
    527         #endif
    528639};
    529640
     
    536647static inline void ?=?( spin_queue_lock & this, spin_queue_lock this2 ) = void;
    537648
    538 // if this is called recursively IT WILL DEADLOCK!!!!!
    539 static inline void lock(spin_queue_lock & this) with(this) {
     649// if this is called recursively IT WILL DEADLOCK!
     650static inline void lock( spin_queue_lock & this ) with(this) {
    540651        mcs_spin_node node;
    541652        lock( lock, node );
     
    545656}
    546657
    547 static inline void unlock(spin_queue_lock & this) with(this) {
     658static inline void unlock( spin_queue_lock & this ) with(this) {
    548659        __atomic_store_n(&held, false, __ATOMIC_RELEASE);
    549660}
    550661
    551 static inline void on_notify(spin_queue_lock & this, struct thread$ * t ) {
    552         unpark(t);
    553 }
    554 static inline size_t on_wait(spin_queue_lock & this) { unlock(this); return 0; }
    555 static inline void on_wakeup(spin_queue_lock & this, size_t recursion ) {
    556         #ifdef REACQ
    557         lock(this);
    558         #endif
    559 }
    560 
     662DEFAULT_ON_NOTIFY( spin_queue_lock )
     663DEFAULT_ON_WAIT( spin_queue_lock )
     664DEFAULT_ON_WAKEUP_REACQ( spin_queue_lock )
    561665
    562666//-----------------------------------------------------------------------------
     
    584688
    585689// if this is called recursively IT WILL DEADLOCK!!!!!
    586 static inline void lock(mcs_block_spin_lock & this) with(this) {
     690static inline void lock( mcs_block_spin_lock & this ) with(this) {
    587691        mcs_node node;
    588692        lock( lock, node );
     
    596700}
    597701
    598 static inline void on_notify(mcs_block_spin_lock & this, struct thread$ * t ) { unpark(t); }
    599 static inline size_t on_wait(mcs_block_spin_lock & this) { unlock(this); return 0; }
    600 static inline void on_wakeup(mcs_block_spin_lock & this, size_t recursion ) {
    601         #ifdef REACQ
    602         lock(this);
    603         #endif
    604 }
     702DEFAULT_ON_NOTIFY( mcs_block_spin_lock )
     703DEFAULT_ON_WAIT( mcs_block_spin_lock )
     704DEFAULT_ON_WAKEUP_REACQ( mcs_block_spin_lock )
    605705
    606706//-----------------------------------------------------------------------------
     
    628728
    629729// if this is called recursively IT WILL DEADLOCK!!!!!
    630 static inline void lock(block_spin_lock & this) with(this) {
     730static inline void lock( block_spin_lock & this ) with(this) {
    631731        lock( lock );
    632732        while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
     
    635735}
    636736
    637 static inline void unlock(block_spin_lock & this) with(this) {
     737static inline void unlock( block_spin_lock & this ) with(this) {
    638738        __atomic_store_n(&held, false, __ATOMIC_RELEASE);
    639739}
    640740
    641 static inline void on_notify(block_spin_lock & this, struct thread$ * t ) with(this.lock) {
    642   #ifdef REACQ
     741static inline void on_notify( block_spin_lock & this, struct thread$ * t ) with(this.lock) {
    643742        // first we acquire internal fast_block_lock
    644743        lock( lock __cfaabi_dbg_ctx2 );
     
    652751        unlock( lock );
    653752
    654   #endif
    655753        unpark(t);
    656        
    657 }
    658 static inline size_t on_wait(block_spin_lock & this) { unlock(this); return 0; }
    659 static inline void on_wakeup(block_spin_lock & this, size_t recursion ) with(this) {
    660   #ifdef REACQ
     754}
     755DEFAULT_ON_WAIT( block_spin_lock )
     756static inline void on_wakeup( block_spin_lock & this, size_t recursion ) with(this) {
    661757        // now we acquire the entire block_spin_lock upon waking up
    662758        while(__atomic_load_n(&held, __ATOMIC_SEQ_CST)) Pause();
    663759        __atomic_store_n(&held, true, __ATOMIC_RELEASE);
     664760        unlock( lock ); // now we release the internal fast_block_lock
    665   #endif
    666 }
    667 
    668 //-----------------------------------------------------------------------------
    669 // is_blocking_lock
    670 trait is_blocking_lock(L & | sized(L)) {
    671         // For synchronization locks to use when acquiring
    672         void on_notify( L &, struct thread$ * );
    673 
    674         // For synchronization locks to use when releasing
    675         size_t on_wait( L & );
    676 
    677         // to set recursion count after getting signalled;
    678         void on_wakeup( L &, size_t recursion );
    679 };
     761}
    680762
    681763//-----------------------------------------------------------------------------
     
    685767forall(L & | is_blocking_lock(L)) {
    686768        struct info_thread;
    687 
    688         // // for use by sequence
    689         // info_thread(L) *& Back( info_thread(L) * this );
    690         // info_thread(L) *& Next( info_thread(L) * this );
    691769}
    692770
  • libcfa/src/concurrency/monitor.cfa

    r34b4268 r24d6572  
    1010// Created On       : Thd Feb 23 12:27:26 2017
    1111// Last Modified By : Peter A. Buhr
    12 // Last Modified On : Wed Dec  4 07:55:14 2019
    13 // Update Count     : 10
     12// Last Modified On : Sun Feb 19 17:00:59 2023
     13// Update Count     : 12
    1414//
    1515
    1616#define __cforall_thread__
    17 #define _GNU_SOURCE
    1817
    1918#include "monitor.hfa"
  • libcfa/src/concurrency/monitor.hfa

    r34b4268 r24d6572  
    1010// Created On       : Thd Feb 23 12:27:26 2017
    1111// Last Modified By : Peter A. Buhr
    12 // Last Modified On : Wed Dec  4 07:55:32 2019
    13 // Update Count     : 11
     12// Last Modified On : Thu Feb  2 11:29:21 2023
     13// Update Count     : 12
    1414//
    1515
     
    2222#include "stdlib.hfa"
    2323
    24 trait is_monitor(T &) {
     24forall( T & )
     25trait is_monitor {
    2526        monitor$ * get_monitor( T & );
    2627        void ^?{}( T & mutex );
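Several headers in this changeset (monitor.hfa, mutex.hfa, mutex_stmt.hfa, thread.hfa) migrate traits from the postfix parameter form, trait name(T &) { ... }, to a leading forall clause. A minimal sketch of the new declaration style, with hypothetical trait and function names:

// Hypothetical trait, shown only to illustrate the updated declaration style.
forall( T & )
trait is_printable {
	void print( T & );
};

// Assertion syntax at use sites is unchanged.
forall( T & | is_printable(T) )
void print_twice( T & x ) { print( x ); print( x ); }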
  • libcfa/src/concurrency/mutex.cfa

    r34b4268 r24d6572  
    1212// Created On       : Fri May 25 01:37:11 2018
    1313// Last Modified By : Peter A. Buhr
    14 // Last Modified On : Wed Dec  4 09:16:39 2019
    15 // Update Count     : 1
     14// Last Modified On : Sun Feb 19 17:01:36 2023
     15// Update Count     : 3
    1616//
    1717
    1818#define __cforall_thread__
    19 #define _GNU_SOURCE
    2019
    2120#include "mutex.hfa"
  • libcfa/src/concurrency/mutex.hfa

    r34b4268 r24d6572  
    1212// Created On       : Fri May 25 01:24:09 2018
    1313// Last Modified By : Peter A. Buhr
    14 // Last Modified On : Wed Dec  4 09:16:53 2019
    15 // Update Count     : 1
     14// Last Modified On : Thu Feb  2 11:46:08 2023
     15// Update Count     : 2
    1616//
    1717
     
    7070void unlock(recursive_mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
    7171
    72 trait is_lock(L & | sized(L)) {
     72forall( L & | sized(L) )
     73trait is_lock {
    7374        void lock  (L &);
    7475        void unlock(L &);
  • libcfa/src/concurrency/mutex_stmt.hfa

    r34b4268 r24d6572  
     1#pragma once
     2
    13#include "bits/algorithm.hfa"
    24#include "bits/defs.hfa"
     
    46//-----------------------------------------------------------------------------
    57// is_lock
    6 trait is_lock(L & | sized(L)) {
     8forall(L & | sized(L))
     9trait is_lock {
    710        // For acquiring a lock
    811        void lock( L & );
     
    1114        void unlock( L & );
    1215};
    13 
    1416
    1517struct __mutex_stmt_lock_guard {
     
    2426    // Sort locks based on address
    2527    __libcfa_small_sort(this.lockarr, count);
    26 
    27     // acquire locks in order
    28     // for ( size_t i = 0; i < count; i++ ) {
    29     //     lock(*this.lockarr[i]);
    30     // }
    31 }
    32 
    33 static inline void ^?{}( __mutex_stmt_lock_guard & this ) with(this) {
    34     // for ( size_t i = count; i > 0; i-- ) {
    35     //     unlock(*lockarr[i - 1]);
    36     // }
    3728}
    3829
    3930forall(L & | is_lock(L)) {
    40 
    41     struct scoped_lock {
    42         L * internal_lock;
    43     };
    44 
    45     static inline void ?{}( scoped_lock(L) & this, L & internal_lock ) {
    46         this.internal_lock = &internal_lock;
    47         lock(internal_lock);
    48     }
    49    
    50     static inline void ^?{}( scoped_lock(L) & this ) with(this) {
    51         unlock(*internal_lock);
    52     }
    53 
    54     static inline void * __get_mutexstmt_lock_ptr( L & this ) {
    55         return &this;
    56     }
    57 
    58     static inline L __get_mutexstmt_lock_type( L & this );
    59 
    60     static inline L __get_mutexstmt_lock_type( L * this );
     31    static inline void * __get_mutexstmt_lock_ptr( L & this ) { return &this; }
     32    static inline L __get_mutexstmt_lock_type( L & this ) {}
     33    static inline L __get_mutexstmt_lock_type( L * this ) {}
    6134}
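The guard above sorts the requested locks by address before acquiring them, giving every mutex statement the same canonical acquisition order. A hedged usage sketch, assuming the mutex( ... ) { ... } statement form and reusing a lock type from this changeset; the function names are hypothetical.

// Usage sketch only: because the guard sorts by address, these two statements
// acquire a and b in the same order and cannot deadlock against each other.
simple_owner_lock a, b;

void first()  { mutex( a, b ) { /* both locks held */ } }
void second() { mutex( b, a ) { /* same acquisition order as first() */ } }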
  • libcfa/src/concurrency/preemption.cfa

    r34b4268 r24d6572  
    1010// Created On       : Mon Jun 5 14:20:42 2017
    1111// Last Modified By : Peter A. Buhr
    12 // Last Modified On : Thu Feb 17 11:18:57 2022
    13 // Update Count     : 59
     12// Last Modified On : Mon Jan  9 08:42:59 2023
     13// Update Count     : 60
    1414//
    1515
    1616#define __cforall_thread__
    17 #define _GNU_SOURCE
    1817
    1918// #define __CFA_DEBUG_PRINT_PREEMPTION__
     
    118117                __cfadbg_print_buffer_decl( preemption, " KERNEL: preemption tick %lu\n", currtime.tn);
    119118                Duration period = node->period;
    120                 if( period == 0) {
     119                if( period == 0 ) {
    121120                        node->set = false;                  // Node is one-shot, just mark it as not pending
    122121                }
  • libcfa/src/concurrency/pthread.cfa

    r34b4268 r24d6572  
    1515
    1616#define __cforall_thread__
    17 #define _GNU_SOURCE
    1817
    1918#include <signal.h>
     
    3534struct pthread_values{
    3635        inline Seqable;
    37         void* value;
     36        void * value;
    3837        bool in_use;
    3938};
     
    5150struct pthread_keys {
    5251        bool in_use;
    53         void (*destructor)( void * );
     52        void (* destructor)( void * );
    5453        Sequence(pthread_values) threads;
    5554};
    5655
    57 static void ?{}(pthread_keys& k){
     56static void ?{}(pthread_keys& k) {
    5857        k.threads{};
    5958}
     
    6261static pthread_keys cfa_pthread_keys_storage[PTHREAD_KEYS_MAX] __attribute__((aligned (16)));
    6362
    64 static void init_pthread_storage(){
    65         for (int i = 0; i < PTHREAD_KEYS_MAX; i++){
     63static void init_pthread_storage() {
     64        for ( int i = 0; i < PTHREAD_KEYS_MAX; i++ ) {
    6665                cfa_pthread_keys_storage[i]{};
    6766        }
     
    9695
    9796/* condvar helper routines */
    98 static void init(pthread_cond_t* pcond){
     97static void init(pthread_cond_t * pcond) {
     9998        static_assert(sizeof(pthread_cond_t) >= sizeof(cfa2pthr_cond_var_t),"sizeof(pthread_cond_t) < sizeof(cfa2pthr_cond_var_t)");
    100         cfa2pthr_cond_var_t* _cond = (cfa2pthr_cond_var_t*)pcond;
     99        cfa2pthr_cond_var_t * _cond = (cfa2pthr_cond_var_t *)pcond;
    101100        ?{}(*_cond);
    102101}
    103102
    104 static cfa2pthr_cond_var_t* get(pthread_cond_t* pcond){
     103static cfa2pthr_cond_var_t * get(pthread_cond_t * pcond) {
     105104        static_assert(sizeof(pthread_cond_t) >= sizeof(cfa2pthr_cond_var_t),"sizeof(pthread_cond_t) < sizeof(cfa2pthr_cond_var_t)");
    106         return (cfa2pthr_cond_var_t*)pcond;
    107 }
    108 
    109 static void destroy(pthread_cond_t* cond){
     105        return (cfa2pthr_cond_var_t *)pcond;
     106}
     107
     108static void destroy(pthread_cond_t * cond) {
     110109        static_assert(sizeof(pthread_cond_t) >= sizeof(cfa2pthr_cond_var_t),"sizeof(pthread_cond_t) < sizeof(cfa2pthr_cond_var_t)");
    111110        ^?{}(*get(cond));
     
    116115
    117116/* mutex helper routines */
    118 static void mutex_check(pthread_mutex_t* t){
     117static void mutex_check(pthread_mutex_t * t) {
    119118        // Use double check to improve performance.
    120119        // Check is safe on x86; volatile prevents compiler reordering
    121         volatile pthread_mutex_t *const mutex_ = t;
     120        volatile pthread_mutex_t * const mutex_ = t;
    122121
    123122        // SKULLDUGGERY: not a portable way to access the kind field, /usr/include/x86_64-linux-gnu/bits/pthreadtypes.h
     
    136135
    137136
    138 static void init(pthread_mutex_t* plock){
     137static void init(pthread_mutex_t * plock) {
    139138        static_assert(sizeof(pthread_mutex_t) >= sizeof(simple_owner_lock),"sizeof(pthread_mutex_t) < sizeof(simple_owner_lock)");
    140         simple_owner_lock* _lock = (simple_owner_lock*)plock;
     139        simple_owner_lock * _lock = (simple_owner_lock *)plock;
    141140        ?{}(*_lock);
    142141}
    143142
    144 static simple_owner_lock* get(pthread_mutex_t* plock){
     143static simple_owner_lock * get(pthread_mutex_t * plock) {
    145144        static_assert(sizeof(pthread_mutex_t) >= sizeof(simple_owner_lock),"sizeof(pthread_mutex_t) < sizeof(simple_owner_lock)");
    146         return (simple_owner_lock*)plock;
    147 }
    148 
    149 static void destroy(pthread_mutex_t* plock){
     145        return (simple_owner_lock *)plock;
     146}
     147
     148static void destroy(pthread_mutex_t * plock) {
    150149        static_assert(sizeof(pthread_mutex_t) >= sizeof(simple_owner_lock),"sizeof(pthread_mutex_t) < sizeof(simple_owner_lock)");
    151150        ^?{}(*get(plock));
     
    153152
    154153//######################### Attr helpers #########################
    155 struct cfaPthread_attr_t {                                                              // thread attributes
     154typedef struct cfaPthread_attr_t {                                              // thread attributes
    156155                int contentionscope;
    157156                int detachstate;
    158157                size_t stacksize;
    159                 void *stackaddr;
     158                void * stackaddr;
    160159                int policy;
    161160                int inheritsched;
    162161                struct sched_param param;
    163 } typedef cfaPthread_attr_t;
    164 
    165 static const cfaPthread_attr_t default_attrs{
     162} cfaPthread_attr_t;
     163
     164static const cfaPthread_attr_t default_attrs {
    166165        0,
    167166        0,
    168         (size_t)65000,
    169         (void *)NULL,
     167        65_000,
     168        NULL,
    170169        0,
    171170        0,
     
    173172};
    174173
    175 static cfaPthread_attr_t* get(const pthread_attr_t* attr){
    176         static_assert(sizeof(pthread_attr_t) >= sizeof(cfaPthread_attr_t),"sizeof(pthread_attr_t) < sizeof(cfaPthread_attr_t)");
    177         return (cfaPthread_attr_t*)attr;
     174static cfaPthread_attr_t * get(const pthread_attr_t * attr) {
     175        static_assert(sizeof(pthread_attr_t) >= sizeof(cfaPthread_attr_t), "sizeof(pthread_attr_t) < sizeof(cfaPthread_attr_t)");
     176        return (cfaPthread_attr_t *)attr;
    178177}
    179178
     
    190189
    191190        // pthreads return value
    192         void *joinval;
     191        void * joinval;
    193192
    194193        // pthread attributes
    195194        pthread_attr_t pthread_attr;
    196195
    197         void *(*start_routine)(void *);
    198         void *start_arg;
     196        void *(* start_routine)(void *);
     197        void * start_arg;
    199198
    200199        // thread local data
    201         pthread_values* pthreadData;
     200        pthread_values * pthreadData;
    202201
    203202        // flag used for tryjoin
     
    207206/* thread part routines */
    208207//  cfaPthread entry point
    209 void main(cfaPthread& _thread) with(_thread){
    210         joinval =  start_routine(start_arg);
     208void main(cfaPthread & _thread) with(_thread) {
     209        joinval = start_routine(start_arg);
    211210        isTerminated = true;
    212211}
    213212
    214 static cfaPthread *lookup( pthread_t p ){
    215         static_assert(sizeof(pthread_t) >= sizeof(cfaPthread*),"sizeof(pthread_t) < sizeof(cfaPthread*)");
    216         return (cfaPthread*)p;
    217 }
    218 
    219 static void pthread_deletespecific_( pthread_values* values )  { // see uMachContext::invokeTask
    220         pthread_values* value;
    221         pthread_keys* key;
     213static cfaPthread * lookup( pthread_t p ) {
     214        static_assert(sizeof(pthread_t) >= sizeof(cfaPthread *),"sizeof(pthread_t) < sizeof(cfaPthread *)");
     215        return (cfaPthread *)p;
     216}
     217
     218static void pthread_deletespecific_( pthread_values * values )  { // see uMachContext::invokeTask
     219        pthread_values * value;
     220        pthread_keys * key;
    222221        bool destcalled = true;
    223         if (values != NULL){
     222        if (values != NULL) {
    224223                for ( int attempts = 0; attempts < PTHREAD_DESTRUCTOR_ITERATIONS && destcalled ; attempts += 1 ) {
    225224                        destcalled = false;
    226225                        lock(key_lock);
    227                         for (int i = 0; i < PTHREAD_KEYS_MAX; i++){
     226                        for ( int i = 0; i < PTHREAD_KEYS_MAX; i++ ) {
    228227                                // for each valid key
    229                                 if ( values[i].in_use){
     228                                if ( values[i].in_use) {
    230229                                        value = &values[i];
    231230                                        key = &cfa_pthread_keys[i];
     
    234233                                        // if  a  key  value  has  a  non-NULL  destructor pointer,  and  the  thread  has  a  non-NULL  value associated with that key,
    235234                                        // the value of the key is set to NULL, and then the function pointed to is called with the previously associated value as its sole argument.
    236                                         if (value->value != NULL && key->destructor != NULL){
     235                                        if (value->value != NULL && key->destructor != NULL) {
    237236                                                unlock(key_lock);
    238237                                                key->destructor(value->value); // run destructor
     
    249248}
    250249
    251 static void ^?{}(cfaPthread & mutex t){
     250static void ^?{}(cfaPthread & mutex t) {
    252251        // delete pthread local storage
    253252        pthread_values * values = t.pthreadData;
     
    255254}
    256255
    257 static void ?{}(cfaPthread &t, pthread_t* _thread, const pthread_attr_t * _attr,void *(*start_routine)(void *), void * arg) {
    258         static_assert(sizeof(pthread_t) >= sizeof(cfaPthread*), "pthread_t too small to hold a pointer: sizeof(pthread_t) < sizeof(cfaPthread*)");
     256static void ?{}(cfaPthread & t, pthread_t * _thread, const pthread_attr_t * _attr,void *(* start_routine)(void *), void * arg) {
     257        static_assert(sizeof(pthread_t) >= sizeof(cfaPthread *), "pthread_t too small to hold a pointer: sizeof(pthread_t) < sizeof(cfaPthread *)");
    259258
    260259        // set up user thread stackSize
     
    278277        //######################### Pthread Attrs #########################
    279278
    280         int pthread_attr_init(pthread_attr_t *attr) libcfa_public __THROW {
    281                 cfaPthread_attr_t* _attr = get(attr);
     279        int pthread_attr_init(pthread_attr_t * attr) libcfa_public __THROW {
     280                cfaPthread_attr_t * _attr = get(attr);
    282281                ?{}(*_attr, default_attrs);
    283282                return 0;
    284283        }
    285         int pthread_attr_destroy(pthread_attr_t *attr) libcfa_public __THROW {
     284        int pthread_attr_destroy(pthread_attr_t * attr) libcfa_public __THROW {
    286285                ^?{}(*get(attr));
    287286                return 0;
    288287        }
    289288
    290         int pthread_attr_setscope( pthread_attr_t *attr, int contentionscope ) libcfa_public __THROW {
     289        int pthread_attr_setscope( pthread_attr_t * attr, int contentionscope ) libcfa_public __THROW {
    291290                get( attr )->contentionscope = contentionscope;
    292291                return 0;
    293292        } // pthread_attr_setscope
    294293
    295         int pthread_attr_getscope( const pthread_attr_t *attr, int *contentionscope ) libcfa_public __THROW {
     294        int pthread_attr_getscope( const pthread_attr_t * attr, int * contentionscope ) libcfa_public __THROW {
    296295                *contentionscope = get( attr )->contentionscope;
    297296                return 0;
    298297        } // pthread_attr_getscope
    299298
    300         int pthread_attr_setdetachstate( pthread_attr_t *attr, int detachstate ) libcfa_public __THROW {
     299        int pthread_attr_setdetachstate( pthread_attr_t * attr, int detachstate ) libcfa_public __THROW {
    301300                get( attr )->detachstate = detachstate;
    302301                return 0;
    303302        } // pthread_attr_setdetachstate
    304303
    305         int pthread_attr_getdetachstate( const pthread_attr_t *attr, int *detachstate ) libcfa_public __THROW {
     304        int pthread_attr_getdetachstate( const pthread_attr_t * attr, int * detachstate ) libcfa_public __THROW {
    306305                *detachstate = get( attr )->detachstate;
    307306                return 0;
    308307        } // pthread_attr_getdetachstate
    309308
    310         int pthread_attr_setstacksize( pthread_attr_t *attr, size_t stacksize ) libcfa_public __THROW {
     309        int pthread_attr_setstacksize( pthread_attr_t * attr, size_t stacksize ) libcfa_public __THROW {
    311310                get( attr )->stacksize = stacksize;
    312311                return 0;
    313312        } // pthread_attr_setstacksize
    314313
    315         int pthread_attr_getstacksize( const pthread_attr_t *attr, size_t *stacksize ) libcfa_public __THROW {
     314        int pthread_attr_getstacksize( const pthread_attr_t * attr, size_t * stacksize ) libcfa_public __THROW {
    316315                *stacksize = get( attr )->stacksize;
    317316                return 0;
     
    326325        } // pthread_attr_setguardsize
    327326
    328         int pthread_attr_setstackaddr( pthread_attr_t *attr, void *stackaddr ) libcfa_public __THROW {
     327        int pthread_attr_setstackaddr( pthread_attr_t * attr, void * stackaddr ) libcfa_public __THROW {
    329328                get( attr )->stackaddr = stackaddr;
    330329                return 0;
    331330        } // pthread_attr_setstackaddr
    332331
    333         int pthread_attr_getstackaddr( const pthread_attr_t *attr, void **stackaddr ) libcfa_public __THROW {
     332        int pthread_attr_getstackaddr( const pthread_attr_t * attr, void ** stackaddr ) libcfa_public __THROW {
    334333                *stackaddr = get( attr )->stackaddr;
    335334                return 0;
    336335        } // pthread_attr_getstackaddr
    337336
    338         int pthread_attr_setstack( pthread_attr_t *attr, void *stackaddr, size_t stacksize ) libcfa_public __THROW {
     337        int pthread_attr_setstack( pthread_attr_t * attr, void * stackaddr, size_t stacksize ) libcfa_public __THROW {
    339338                get( attr )->stackaddr = stackaddr;
    340339                get( attr )->stacksize = stacksize;
     
    342341        } // pthread_attr_setstack
    343342
    344         int pthread_attr_getstack( const pthread_attr_t *attr, void **stackaddr, size_t *stacksize ) libcfa_public __THROW {
     343        int pthread_attr_getstack( const pthread_attr_t * attr, void ** stackaddr, size_t * stacksize ) libcfa_public __THROW {
    345344                *stackaddr = get( attr )->stackaddr;
    346345                *stacksize = get( attr )->stacksize;
     
     351350        // already running thread threadID. It shall be called on an uninitialized attr
    352351        // and destroyed with pthread_attr_destroy when no longer needed.
    353         int pthread_getattr_np( pthread_t threadID, pthread_attr_t *attr ) libcfa_public __THROW { // GNU extension
     352        int pthread_getattr_np( pthread_t threadID, pthread_attr_t * attr ) libcfa_public __THROW { // GNU extension
    354353                check_nonnull(attr);
    355354
     
    363362        //######################### Threads #########################
    364363
    365         int pthread_create(pthread_t * _thread, const pthread_attr_t * attr, void *(*start_routine)(void *), void * arg) libcfa_public __THROW {
    366                 cfaPthread *t = alloc();
     364        int pthread_create(pthread_t * _thread, const pthread_attr_t * attr, void *(* start_routine)(void *), void * arg) libcfa_public __THROW {
     365                cfaPthread * t = alloc();
    367366                (*t){_thread, attr, start_routine, arg};
    368367                return 0;
    369368        }
    370369
    371 
    372         int pthread_join(pthread_t _thread, void **value_ptr) libcfa_public __THROW {
     370        int pthread_join(pthread_t _thread, void ** value_ptr) libcfa_public __THROW {
    373371                // if thread is invalid
    374372                if (_thread == NULL) return EINVAL;
     
    376374
    377375                // get user thr pointer
    378                 cfaPthread* p = lookup(_thread);
     376                cfaPthread * p = lookup(_thread);
    379377                try {
    380378                        join(*p);
     
    389387        }
    390388
    391         int pthread_tryjoin_np(pthread_t _thread, void **value_ptr) libcfa_public __THROW {
     389        int pthread_tryjoin_np(pthread_t _thread, void ** value_ptr) libcfa_public __THROW {
    392390                // if thread is invalid
    393391                if (_thread == NULL) return EINVAL;
    394392                if (_thread == pthread_self()) return EDEADLK;
    395393
    396                 cfaPthread* p = lookup(_thread);
     394                cfaPthread * p = lookup(_thread);
    397395
    398396                // thread not finished ?
     
    412410        void pthread_exit(void * status) libcfa_public __THROW {
    413411                pthread_t pid = pthread_self();
    414                 cfaPthread* _thread = (cfaPthread*)pid;
     412                cfaPthread * _thread = (cfaPthread *)pid;
    415413                _thread->joinval = status;  // set return value
    416414                _thread->isTerminated = 1;  // set terminated flag
     
    426424        //######################### Mutex #########################
    427425
    428         int pthread_mutex_init(pthread_mutex_t *_mutex, const pthread_mutexattr_t *attr) libcfa_public __THROW {
     426        int pthread_mutex_init(pthread_mutex_t *_mutex, const pthread_mutexattr_t * attr) libcfa_public __THROW {
    429427                check_nonnull(_mutex);
    430428                init(_mutex);
     
    435433        int pthread_mutex_destroy(pthread_mutex_t *_mutex) libcfa_public __THROW {
    436434                check_nonnull(_mutex);
    437                 simple_owner_lock* _lock = get(_mutex);
    438                 if (_lock->owner != NULL){
     435                simple_owner_lock * _lock = get(_mutex);
     436                if (_lock->owner != NULL) {
    439437                        return EBUSY;
    440438                }
     
    446444                check_nonnull(_mutex);
    447445                mutex_check(_mutex);
    448                 simple_owner_lock* _lock = get(_mutex);
     446                simple_owner_lock * _lock = get(_mutex);
    449447                lock(*_lock);
    450448                return 0;
     
    453451        int pthread_mutex_unlock(pthread_mutex_t *_mutex) libcfa_public __THROW {
    454452                check_nonnull(_mutex);
    455                 simple_owner_lock* _lock = get(_mutex);
    456                 if (_lock->owner != active_thread()){
     453                simple_owner_lock * _lock = get(_mutex);
     454                if (_lock->owner != active_thread()) {
    457455                        return EPERM;
    458456                } // current thread does not hold the mutex
     
    463461        int pthread_mutex_trylock(pthread_mutex_t *_mutex) libcfa_public __THROW {
    464462                check_nonnull(_mutex);
    465                 simple_owner_lock* _lock = get(_mutex);
    466                 if (_lock->owner != active_thread() && _lock->owner != NULL){
     463                simple_owner_lock * _lock = get(_mutex);
     464                if (_lock->owner != active_thread() && _lock->owner != NULL) {
    467465                        return EBUSY;
    468466                }   // if mutex is owned
     
    474472
    475473        /* conditional variable routines */
    476         int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr) libcfa_public __THROW {
     474        int pthread_cond_init(pthread_cond_t * cond, const pthread_condattr_t * attr) libcfa_public __THROW {
    477475                check_nonnull(cond);
    478476                init(cond);
     
    480478        }  //pthread_cond_init
    481479
    482         int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *_mutex) libcfa_public __THROW {
     480        int pthread_cond_wait(pthread_cond_t * cond, pthread_mutex_t *_mutex) libcfa_public __THROW {
    483481                check_nonnull(_mutex);
    484482                check_nonnull(cond);
     
    494492        } // pthread_cond_timedwait
    495493
    496         int pthread_cond_signal(pthread_cond_t *cond) libcfa_public __THROW {
     494        int pthread_cond_signal(pthread_cond_t * cond) libcfa_public __THROW {
    497495                check_nonnull(cond);
    498496                return notify_one(*get(cond));
    499497        } // pthread_cond_signal
    500498
    501         int pthread_cond_broadcast(pthread_cond_t *cond) libcfa_public __THROW {
     499        int pthread_cond_broadcast(pthread_cond_t * cond) libcfa_public __THROW {
    502500                check_nonnull(cond);
    503501                return notify_all(*get(cond));
    504502        } // pthread_cond_broadcast
    505503
    506         int pthread_cond_destroy(pthread_cond_t *cond) libcfa_public __THROW {
     504        int pthread_cond_destroy(pthread_cond_t * cond) libcfa_public __THROW {
    507505                check_nonnull(cond);
    508506                destroy(cond);
     
    514512        //######################### Local storage #########################
    515513
    516         int pthread_once(pthread_once_t *once_control, void (*init_routine)(void)) libcfa_public __THROW {
     514        int pthread_once(pthread_once_t * once_control, void (* init_routine)(void)) libcfa_public __THROW {
    517515                static_assert(sizeof(pthread_once_t) >= sizeof(int),"sizeof(pthread_once_t) < sizeof(int)");
    518516                check_nonnull(once_control);
     
    527525        } // pthread_once
    528526
    529         int pthread_key_create( pthread_key_t *key, void (*destructor)( void * ) ) libcfa_public __THROW {
     527        int pthread_key_create( pthread_key_t * key, void (* destructor)( void * ) ) libcfa_public __THROW {
    530528                lock(key_lock);
    531529                for ( int i = 0; i < PTHREAD_KEYS_MAX; i += 1 ) {
     
    562560        }   // pthread_key_delete
    563561
    564         int pthread_setspecific( pthread_key_t key, const void *value ) libcfa_public __THROW {
     562        int pthread_setspecific( pthread_key_t key, const void * value ) libcfa_public __THROW {
    565563                // get current thread
    566                 cfaPthread* t = lookup(pthread_self());
     564                cfaPthread * t = lookup(pthread_self());
    567565                // if current thread's pthreadData is NULL; initialize it
    568                 pthread_values* values;
    569                 if (t->pthreadData == NULL){
     566                pthread_values * values;
     567                if (t->pthreadData == NULL) {
    570568                        values = anew( PTHREAD_KEYS_MAX);
    571569                        t->pthreadData = values;
    572                         for (int i = 0;i < PTHREAD_KEYS_MAX; i++){
     570                        for ( int i = 0;i < PTHREAD_KEYS_MAX; i++ ) {
    573571                                t->pthreadData[i].in_use = false;
    574572                        }   // for
     
    593591        } //pthread_setspecific
    594592
    595         void* pthread_getspecific(pthread_key_t key) libcfa_public __THROW {
     593        void * pthread_getspecific(pthread_key_t key) libcfa_public __THROW {
    596594                if (key >= PTHREAD_KEYS_MAX || ! cfa_pthread_keys[key].in_use) return NULL;
    597595
    598596                // get current thread
    599                 cfaPthread* t = lookup(pthread_self());
     597                cfaPthread * t = lookup(pthread_self());
    600598                if (t->pthreadData == NULL) return NULL;
    601599                lock(key_lock);
    602                 pthread_values &entry = ((pthread_values *)t->pthreadData)[key];
     600                pthread_values & entry = ((pthread_values *)t->pthreadData)[key];
    603601                if ( ! entry.in_use ) {
    604602                        unlock( key_lock );
    605603                        return NULL;
    606604                } // if
    607                 void *value = entry.value;
     605                void * value = entry.value;
    608606                unlock(key_lock);
    609607
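The key routines above emulate POSIX thread-specific storage on top of per-thread pthread_values arrays. For context, a standard usage sketch of that API; the key name, stored value, and helper functions are hypothetical, and the behaviour shown is plain POSIX rather than anything CFA-specific.

// Usage sketch only: create a key with a destructor, set a per-thread value,
// and read it back later on the same thread.
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

static pthread_key_t request_id_key;

static void setup( void ) {
	pthread_key_create( &request_id_key, free );               // free() runs on the stored value at thread exit
	pthread_setspecific( request_id_key, strdup( "req-42" ) );
}

static void use( void ) {
	char * id = pthread_getspecific( request_id_key );         // NULL if never set on this thread
	(void)id;
}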
     
    875873        //######################### Parallelism #########################
    876874
    877         int pthread_setaffinity_np( pthread_t /* __th */, size_t /* __cpusetsize */, __const cpu_set_t * /* __cpuset */ ) libcfa_public __THROW {
    878                 abort( "pthread_setaffinity_np" );
    879         } // pthread_setaffinity_np
    880 
    881         int pthread_getaffinity_np( pthread_t /* __th */, size_t /* __cpusetsize */, cpu_set_t * /* __cpuset */ ) libcfa_public __THROW {
    882                 abort( "pthread_getaffinity_np" );
    883         } // pthread_getaffinity_np
    884 
    885         int pthread_attr_setaffinity_np( pthread_attr_t * /* __attr */, size_t /* __cpusetsize */, __const cpu_set_t * /* __cpuset */ ) libcfa_public __THROW {
    886                 abort( "pthread_attr_setaffinity_np" );
    887         } // pthread_attr_setaffinity_np
    888 
    889         int pthread_attr_getaffinity_np( __const pthread_attr_t * /* __attr */, size_t /* __cpusetsize */, cpu_set_t * /* __cpuset */ ) libcfa_public __THROW {
    890                 abort( "pthread_attr_getaffinity_np" );
    891         } // pthread_attr_getaffinity_np
     875        // int pthread_setaffinity_np( pthread_t /* __th */, size_t /* __cpusetsize */, __const cpu_set_t * /* __cpuset */ ) libcfa_public __THROW {
     876        //      abort( "pthread_setaffinity_np" );
     877        // } // pthread_setaffinity_np
     878
     879        // int pthread_getaffinity_np( pthread_t /* __th */, size_t /* __cpusetsize */, cpu_set_t * /* __cpuset */ ) libcfa_public __THROW {
     880        //      abort( "pthread_getaffinity_np" );
     881        // } // pthread_getaffinity_np
     882
     883        // int pthread_attr_setaffinity_np( pthread_attr_t * /* __attr */, size_t /* __cpusetsize */, __const cpu_set_t * /* __cpuset */ ) libcfa_public __THROW {
     884        //      abort( "pthread_attr_setaffinity_np" );
     885        // } // pthread_attr_setaffinity_np
     886
     887        // int pthread_attr_getaffinity_np( __const pthread_attr_t * /* __attr */, size_t /* __cpusetsize */, cpu_set_t * /* __cpuset */ ) libcfa_public __THROW {
     888        //      abort( "pthread_attr_getaffinity_np" );
     889        // } // pthread_attr_getaffinity_np
    892890
    893891        //######################### Cancellation #########################
     
    906904        } // pthread_cancel
    907905
    908         int pthread_setcancelstate( int state, int *oldstate ) libcfa_public __THROW {
     906        int pthread_setcancelstate( int state, int * oldstate ) libcfa_public __THROW {
    909907                abort("pthread_setcancelstate not implemented");
    910908                return 0;
    911909        } // pthread_setcancelstate
    912910
    913         int pthread_setcanceltype( int type, int *oldtype ) libcfa_public __THROW {
     911        int pthread_setcanceltype( int type, int * oldtype ) libcfa_public __THROW {
    914912                abort("pthread_setcanceltype not implemented");
    915913                return 0;
     
    918916
    919917#pragma GCC diagnostic pop
    920 
  • libcfa/src/concurrency/ready_queue.cfa

    r34b4268 r24d6572  
    1515
    1616#define __cforall_thread__
    17 #define _GNU_SOURCE
    1817
    1918// #define __CFA_DEBUG_PRINT_READY_QUEUE__
  • libcfa/src/concurrency/thread.cfa

    r34b4268 r24d6572  
    1010// Created On       : Tue Jan 17 12:27:26 2017
    1111// Last Modified By : Peter A. Buhr
    12 // Last Modified On : Sun Dec 11 20:56:54 2022
    13 // Update Count     : 102
     12// Last Modified On : Mon Jan  9 08:42:33 2023
     13// Update Count     : 103
    1414//
    1515
    1616#define __cforall_thread__
    17 #define _GNU_SOURCE
    1817
    1918#include "thread.hfa"
     
    5453        preferred = ready_queue_new_preferred();
    5554        last_proc = 0p;
     55    link_node = 0p;
    5656        PRNG_SET_SEED( random_state, __global_random_mask ? __global_random_prime : __global_random_prime ^ rdtscl() );
    5757        #if defined( __CFA_WITH_VERIFY__ )
     
    6060        #endif
    6161
    62         clh_node = malloc( );
    63         *clh_node = false;
    64 
    6562        doregister(curr_cluster, this);
    6663        monitors{ &self_mon_p, 1, (fptr_t)0 };
     
    7168                canary = 0xDEADDEADDEADDEADp;
    7269        #endif
    73         free(clh_node);
    7470        unregister(curr_cluster, this);
    7571        ^self_cor{};
  • libcfa/src/concurrency/thread.hfa

    r34b4268 r24d6572  
    1010// Created On       : Tue Jan 17 12:27:26 2017
    1111// Last Modified By : Peter A. Buhr
    12 // Last Modified On : Tue Nov 22 22:18:34 2022
    13 // Update Count     : 35
     12// Last Modified On : Thu Feb  2 11:27:59 2023
     13// Update Count     : 37
    1414//
    1515
     
    2727//-----------------------------------------------------------------------------
    2828// thread trait
    29 trait is_thread(T &) {
     29forall( T & )
     30trait is_thread {
    3031        void ^?{}(T& mutex this);
    3132        void main(T& this);