Ignore:
Timestamp:
Mar 4, 2021, 7:40:25 PM (5 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
77d601f
Parents:
342af53 (diff), a5040fe (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

Location:
libcfa/src/concurrency
Files:
25 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/coroutine.cfa

    r342af53 r8e4aa05  
    4646
    4747//-----------------------------------------------------------------------------
    48 FORALL_DATA_INSTANCE(CoroutineCancelled, (dtype coroutine_t), (coroutine_t))
    49 
    50 forall(dtype T)
     48FORALL_DATA_INSTANCE(CoroutineCancelled, (coroutine_t &), (coroutine_t))
     49
     50forall(T &)
    5151void mark_exception(CoroutineCancelled(T) *) {}
    5252
    53 forall(dtype T)
     53forall(T &)
    5454void copy(CoroutineCancelled(T) * dst, CoroutineCancelled(T) * src) {
    5555        dst->virtual_table = src->virtual_table;
     
    5858}
    5959
    60 forall(dtype T)
     60forall(T &)
    6161const char * msg(CoroutineCancelled(T) *) {
    6262        return "CoroutineCancelled(...)";
     
    6464
    6565// This code should not be inlined. It is the error path on resume.
    66 forall(dtype T | is_coroutine(T))
     66forall(T & | is_coroutine(T))
    6767void __cfaehm_cancelled_coroutine( T & cor, $coroutine * desc ) {
    6868        verify( desc->cancellation );
     
    148148// Part of the Public API
    149149// Not inline since only ever called once per coroutine
    150 forall(dtype T | is_coroutine(T))
     150forall(T & | is_coroutine(T))
    151151void prime(T& cor) {
    152152        $coroutine* this = get_coroutine(cor);
     
    196196
    197197void __stack_clean  ( __stack_info_t * this ) {
    198         size_t size = ((intptr_t)this->storage->base) - ((intptr_t)this->storage->limit) + sizeof(__stack_t);
    199198        void * storage = this->storage->limit;
    200199
    201200        #if CFA_COROUTINE_USE_MMAP
     201                size_t size = ((intptr_t)this->storage->base) - ((intptr_t)this->storage->limit) + sizeof(__stack_t);
    202202                storage = (void *)(((intptr_t)storage) - __page_size);
    203203                if(munmap(storage, size + __page_size) == -1) {
  • libcfa/src/concurrency/coroutine.hfa

    r342af53 r8e4aa05  
    2222//-----------------------------------------------------------------------------
    2323// Exception thrown from resume when a coroutine stack is cancelled.
    24 FORALL_DATA_EXCEPTION(CoroutineCancelled, (dtype coroutine_t), (coroutine_t)) (
     24FORALL_DATA_EXCEPTION(CoroutineCancelled, (coroutine_t &), (coroutine_t)) (
    2525        coroutine_t * the_coroutine;
    2626        exception_t * the_exception;
    2727);
    2828
    29 forall(dtype T)
     29forall(T &)
    3030void copy(CoroutineCancelled(T) * dst, CoroutineCancelled(T) * src);
    3131
    32 forall(dtype T)
     32forall(T &)
    3333const char * msg(CoroutineCancelled(T) *);
    3434
     
    3737// Anything that implements this trait can be resumed.
    3838// Anything that is resumed is a coroutine.
    39 trait is_coroutine(dtype T | IS_RESUMPTION_EXCEPTION(CoroutineCancelled, (T))) {
     39trait is_coroutine(T & | IS_RESUMPTION_EXCEPTION(CoroutineCancelled, (T))) {
    4040        void main(T & this);
    4141        $coroutine * get_coroutine(T & this);
     
    6060//-----------------------------------------------------------------------------
    6161// Public coroutine API
    62 forall(dtype T | is_coroutine(T))
     62forall(T & | is_coroutine(T))
    6363void prime(T & cor);
    6464
     
    7272        void __cfactx_invoke_coroutine(void (*main)(void *), void * this);
    7373
    74         forall(dtype T)
     74        forall(T &)
    7575        void __cfactx_start(void (*main)(T &), struct $coroutine * cor, T & this, void (*invoke)(void (*main)(void *), void *));
    7676
     
    129129}
    130130
    131 forall(dtype T | is_coroutine(T))
     131forall(T & | is_coroutine(T))
    132132void __cfaehm_cancelled_coroutine( T & cor, $coroutine * desc );
    133133
    134134// Resume implementation inlined for performance
    135 forall(dtype T | is_coroutine(T))
     135forall(T & | is_coroutine(T))
    136136static inline T & resume(T & cor) {
    137137        // optimization : read TLS once and reuse it
  • libcfa/src/concurrency/future.hfa

    r342af53 r8e4aa05  
    1919#include "monitor.hfa"
    2020
    21 forall( otype T ) {
     21forall( T ) {
    2222        struct future {
    2323                inline future_t;
     
    5858}
    5959
    60 forall( otype T ) {
     60forall( T ) {
    6161        monitor multi_future {
    6262                inline future_t;
  • libcfa/src/concurrency/io.cfa

    r342af53 r8e4aa05  
    3232        extern "C" {
    3333                #include <sys/syscall.h>
     34                #include <sys/eventfd.h>
    3435
    3536                #include <linux/io_uring.h>
     
    4142        #include "io/types.hfa"
    4243
    43         static const char * opcodes[] = {
     44        __attribute__((unused)) static const char * opcodes[] = {
    4445                "OP_NOP",
    4546                "OP_READV",
     
    7980        };
    8081
    81         // returns true of acquired as leader or second leader
    82         static inline bool try_lock( __leaderlock_t & this ) {
    83                 const uintptr_t thrd = 1z | (uintptr_t)active_thread();
    84                 bool block;
    85                 disable_interrupts();
    86                 for() {
    87                         struct $thread * expected = this.value;
    88                         if( 1p != expected && 0p != expected ) {
    89                                 /* paranoid */ verify( thrd != (uintptr_t)expected ); // We better not already be the next leader
    90                                 enable_interrupts( __cfaabi_dbg_ctx );
    91                                 return false;
    92                         }
    93                         struct $thread * desired;
    94                         if( 0p == expected ) {
    95                                 // If the lock isn't locked acquire it, no need to block
    96                                 desired = 1p;
    97                                 block = false;
    98                         }
    99                         else {
    100                                 // If the lock is already locked try becomming the next leader
    101                                 desired = (struct $thread *)thrd;
    102                                 block = true;
    103                         }
    104                         if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break;
    105                 }
    106                 if( block ) {
    107                         enable_interrupts( __cfaabi_dbg_ctx );
    108                         park();
    109                         disable_interrupts();
    110                 }
    111                 return true;
    112         }
    113 
    114         static inline bool next( __leaderlock_t & this ) {
     82        static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor *, __u32 idxs[], __u32 want );
     83        static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * , __u32 idxs[], __u32 have, bool lazy );
     84        static void __ioarbiter_flush ( $io_arbiter & mutex this, $io_context * );
     85        static inline void __ioarbiter_notify( $io_context & ctx );
     86//=============================================================================================
     87// I/O Polling
     88//=============================================================================================
     89        static inline unsigned __flush( struct $io_context & );
     90        static inline __u32 __release_sqes( struct $io_context & );
     91
     92        void __cfa_io_drain( processor * proc ) {
    11593                /* paranoid */ verify( ! __preemption_enabled() );
    116                 struct $thread * nextt;
    117                 for() {
    118                         struct $thread * expected = this.value;
    119                         /* paranoid */ verify( (1 & (uintptr_t)expected) == 1 ); // The lock better be locked
    120 
    121                         struct $thread * desired;
    122                         if( 1p == expected ) {
    123                                 // No next leader, just unlock
    124                                 desired = 0p;
    125                                 nextt   = 0p;
    126                         }
    127                         else {
    128                                 // There is a next leader, remove but keep locked
    129                                 desired = 1p;
    130                                 nextt   = (struct $thread *)(~1z & (uintptr_t)expected);
    131                         }
    132                         if( __atomic_compare_exchange_n(&this.value, &expected, desired, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST) ) break;
    133                 }
    134 
    135                 if(nextt) {
    136                         unpark( nextt );
    137                         enable_interrupts( __cfaabi_dbg_ctx );
    138                         return true;
    139                 }
    140                 enable_interrupts( __cfaabi_dbg_ctx );
    141                 return false;
    142         }
    143 
    144 //=============================================================================================
    145 // I/O Syscall
    146 //=============================================================================================
    147         static int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get ) {
    148                 bool need_sys_to_submit = false;
    149                 bool need_sys_to_complete = false;
    150                 unsigned flags = 0;
    151 
    152                 TO_SUBMIT:
    153                 if( to_submit > 0 ) {
    154                         if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
    155                                 need_sys_to_submit = true;
    156                                 break TO_SUBMIT;
    157                         }
    158                         if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) {
    159                                 need_sys_to_submit = true;
    160                                 flags |= IORING_ENTER_SQ_WAKEUP;
    161                         }
    162                 }
    163 
    164                 if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
    165                         flags |= IORING_ENTER_GETEVENTS;
    166                         if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {
    167                                 need_sys_to_complete = true;
    168                         }
    169                 }
    170 
    171                 int ret = 0;
    172                 if( need_sys_to_submit || need_sys_to_complete ) {
    173                         __cfadbg_print_safe(io_core, "Kernel I/O : IO_URING enter %d %u %u\n", ring.fd, to_submit, flags);
    174                         ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, (sigset_t *)0p, _NSIG / 8);
    175                         if( ret < 0 ) {
    176                                 switch((int)errno) {
    177                                 case EAGAIN:
    178                                 case EINTR:
    179                                         ret = -1;
    180                                         break;
    181                                 default:
    182                                         abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
    183                                 }
    184                         }
    185                 }
    186 
    187                 // Memory barrier
    188                 __atomic_thread_fence( __ATOMIC_SEQ_CST );
    189                 return ret;
    190         }
    191 
    192 //=============================================================================================
    193 // I/O Polling
    194 //=============================================================================================
    195         static unsigned __collect_submitions( struct __io_data & ring );
    196         static __u32 __release_consumed_submission( struct __io_data & ring );
    197         static inline void __clean( volatile struct io_uring_sqe * sqe );
    198 
    199         // Process a single completion message from the io_uring
    200         // This is NOT thread-safe
    201         static inline void process( volatile struct io_uring_cqe & cqe ) {
    202                 struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
    203                 __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
    204 
    205                 fulfil( *future, cqe.res );
    206         }
    207 
    208         static [int, bool] __drain_io( & struct __io_data ring ) {
    209                 /* paranoid */ verify( ! __preemption_enabled() );
    210 
    211                 unsigned to_submit = 0;
    212                 if( ring.poller_submits ) {
    213                         // If the poller thread also submits, then we need to aggregate the submissions which are ready
    214                         to_submit = __collect_submitions( ring );
    215                 }
    216 
    217                 int ret = __io_uring_enter(ring, to_submit, true);
    218                 if( ret < 0 ) {
    219                         return [0, true];
    220                 }
    221 
    222                 // update statistics
    223                 if (to_submit > 0) {
    224                         __STATS__( true,
    225                                 if( to_submit > 0 ) {
    226                                         io.submit_q.submit_avg.rdy += to_submit;
    227                                         io.submit_q.submit_avg.csm += ret;
    228                                         io.submit_q.submit_avg.cnt += 1;
    229                                 }
    230                         )
    231                 }
    232 
    233                 __atomic_thread_fence( __ATOMIC_SEQ_CST );
    234 
    235                 // Release the consumed SQEs
    236                 __release_consumed_submission( ring );
     94                /* paranoid */ verify( proc );
     95                /* paranoid */ verify( proc->io.ctx );
    23796
    23897                // Drain the queue
    239                 unsigned head = *ring.completion_q.head;
    240                 unsigned tail = *ring.completion_q.tail;
    241                 const __u32 mask = *ring.completion_q.mask;
    242 
    243                 // Nothing was new return 0
    244                 if (head == tail) {
    245                         return [0, to_submit > 0];
    246                 }
     98                $io_context * ctx = proc->io.ctx;
     99                unsigned head = *ctx->cq.head;
     100                unsigned tail = *ctx->cq.tail;
     101                const __u32 mask = *ctx->cq.mask;
    247102
    248103                __u32 count = tail - head;
    249                 /* paranoid */ verify( count != 0 );
     104                __STATS__( false, io.calls.drain++; io.calls.completed += count; )
     105
    250106                for(i; count) {
    251107                        unsigned idx = (head + i) & mask;
    252                         volatile struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
     108                        volatile struct io_uring_cqe & cqe = ctx->cq.cqes[idx];
    253109
    254110                        /* paranoid */ verify(&cqe);
    255111
    256                         process( cqe );
    257                 }
     112                        struct io_future_t * future = (struct io_future_t *)(uintptr_t)cqe.user_data;
     113                        __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future );
     114
     115                        fulfil( *future, cqe.res );
     116                }
     117
     118                __cfadbg_print_safe(io, "Kernel I/O : %u completed\n", count);
    258119
    259120                // Mark to the kernel that the cqe has been seen
    260121                // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
    261                 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_SEQ_CST );
    262 
    263                 return [count, count > 0 || to_submit > 0];
    264         }
    265 
    266         void main( $io_ctx_thread & this ) {
    267                 __ioctx_register( this );
    268 
    269                 __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %d (%p) ready\n", this.ring->fd, &this);
    270 
    271                 const int reset_cnt = 5;
    272                 int reset = reset_cnt;
    273                 // Then loop until we need to start
    274                 LOOP:
    275                 while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) {
    276                         // Drain the io
    277                         int count;
    278                         bool again;
    279                         disable_interrupts();
    280                                 [count, again] = __drain_io( *this.ring );
    281 
    282                                 if(!again) reset--;
    283 
     122                __atomic_store_n( ctx->cq.head, head + count, __ATOMIC_SEQ_CST );
     123
     124                /* paranoid */ verify( ! __preemption_enabled() );
     125
     126                return;
     127        }
     128
     129        void __cfa_io_flush( processor * proc ) {
     130                /* paranoid */ verify( ! __preemption_enabled() );
     131                /* paranoid */ verify( proc );
     132                /* paranoid */ verify( proc->io.ctx );
     133
     134                $io_context & ctx = *proc->io.ctx;
     135
     136                if(!ctx.ext_sq.empty) {
     137                        __ioarbiter_flush( *ctx.arbiter, &ctx );
     138                }
     139
     140                __STATS__( true, io.calls.flush++; )
     141                int ret = syscall( __NR_io_uring_enter, ctx.fd, ctx.sq.to_submit, 0, 0, (sigset_t *)0p, _NSIG / 8);
     142                if( ret < 0 ) {
     143                        switch((int)errno) {
     144                        case EAGAIN:
     145                        case EINTR:
     146                        case EBUSY:
    284147                                // Update statistics
    285                                 __STATS__( true,
    286                                         io.complete_q.completed_avg.val += count;
    287                                         io.complete_q.completed_avg.cnt += 1;
    288                                 )
    289                         enable_interrupts( __cfaabi_dbg_ctx );
    290 
    291                         // If we got something, just yield and check again
    292                         if(reset > 1) {
    293                                 yield();
    294                                 continue LOOP;
     148                                __STATS__( false, io.calls.errors.busy ++; )
     149                                return;
     150                        default:
     151                                abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
    295152                        }
    296 
    297                         // We alread failed to find completed entries a few time.
    298                         if(reset == 1) {
    299                                 // Rearm the context so it can block
    300                                 // but don't block right away
    301                                 // we need to retry one last time in case
    302                                 // something completed *just now*
    303                                 __ioctx_prepare_block( this );
    304                                 continue LOOP;
    305                         }
    306 
    307                                 __STATS__( false,
    308                                         io.complete_q.blocks += 1;
    309                                 )
    310                                 __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %d (%p)\n", this.ring->fd, &this);
    311 
    312                                 // block this thread
    313                                 wait( this.sem );
    314 
    315                         // restore counter
    316                         reset = reset_cnt;
    317                 }
    318 
    319                 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller %d (%p) stopping\n", this.ring->fd, &this);
     153                }
     154
     155                __cfadbg_print_safe(io, "Kernel I/O : %u submitted to io_uring %d\n", ret, ctx.fd);
     156                __STATS__( true, io.calls.submitted += ret; )
     157                /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
     158                /* paranoid */ verify( ctx.sq.to_submit >= ret );
     159
     160                ctx.sq.to_submit -= ret;
     161
     162                /* paranoid */ verify( ctx.sq.to_submit <= *ctx.sq.num );
     163
     164                // Release the consumed SQEs
     165                __release_sqes( ctx );
     166
     167                /* paranoid */ verify( ! __preemption_enabled() );
     168
     169                ctx.proc->io.pending = false;
    320170        }
    321171
     
    339189//         head and tail must be fully filled and shouldn't ever be touched again.
    340190//
     191        //=============================================================================================
     192        // Allocation
     193        // for user's convenience fill the sqes from the indexes
     194        static inline void __fill(struct io_uring_sqe * out_sqes[], __u32 want, __u32 idxs[], struct $io_context * ctx)  {
     195                struct io_uring_sqe * sqes = ctx->sq.sqes;
     196                for(i; want) {
     197                        __cfadbg_print_safe(io, "Kernel I/O : filling loop\n");
     198                        out_sqes[i] = &sqes[idxs[i]];
     199                }
     200        }
     201
     202        // Try to directly allocate from the a given context
     203        // Not thread-safe
     204        static inline bool __alloc(struct $io_context * ctx, __u32 idxs[], __u32 want) {
     205                __sub_ring_t & sq = ctx->sq;
     206                const __u32 mask  = *sq.mask;
     207                __u32 fhead = sq.free_ring.head;    // get the current head of the queue
     208                __u32 ftail = sq.free_ring.tail;    // get the current tail of the queue
     209
     210                // If we don't have enough sqes, fail
     211                if((ftail - fhead) < want) { return false; }
     212
     213                // copy all the indexes we want from the available list
     214                for(i; want) {
     215                        __cfadbg_print_safe(io, "Kernel I/O : allocating loop\n");
     216                        idxs[i] = sq.free_ring.array[(fhead + i) & mask];
     217                }
     218
     219                // Advance the head to mark the indexes as consumed
     220                __atomic_store_n(&sq.free_ring.head, fhead + want, __ATOMIC_RELEASE);
     221
     222                // return success
     223                return true;
     224        }
    341225
    342226        // Allocate an submit queue entry.
     
    345229        // for convenience, return both the index and the pointer to the sqe
    346230        // sqe == &sqes[idx]
    347         [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
    348                 /* paranoid */ verify( data != 0 );
    349 
    350                 // Prepare the data we need
    351                 __attribute((unused)) int len   = 0;
    352                 __attribute((unused)) int block = 0;
    353                 __u32 cnt = *ring.submit_q.num;
    354                 __u32 mask = *ring.submit_q.mask;
    355 
    356                 __u32 off = thread_rand();
    357 
    358                 // Loop around looking for an available spot
    359                 for() {
    360                         // Look through the list starting at some offset
    361                         for(i; cnt) {
    362                                 __u64 expected = 3;
    363                                 __u32 idx = (i + off) & mask; // Get an index from a random
    364                                 volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
    365                                 volatile __u64 * udata = &sqe->user_data;
    366 
    367                                 // Allocate the entry by CASing the user_data field from 0 to the future address
    368                                 if( *udata == expected &&
    369                                         __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
    370                                 {
    371                                         // update statistics
    372                                         __STATS__( false,
    373                                                 io.submit_q.alloc_avg.val   += len;
    374                                                 io.submit_q.alloc_avg.block += block;
    375                                                 io.submit_q.alloc_avg.cnt   += 1;
    376                                         )
    377 
    378                                         // debug log
    379                                         __cfadbg_print_safe( io, "Kernel I/O : allocated [%p, %u] for %p (%p)\n", sqe, idx, active_thread(), (void*)data );
    380 
    381                                         // Success return the data
    382                                         return [sqe, idx];
    383                                 }
    384                                 verify(expected != data);
    385 
    386                                 // This one was used
    387                                 len ++;
    388                         }
    389 
    390                         block++;
    391 
    392                         abort( "Kernel I/O : all submit queue entries used, yielding\n" );
    393 
    394                         yield();
    395                 }
    396         }
    397 
    398         static inline __u32 __submit_to_ready_array( struct __io_data & ring, __u32 idx, const __u32 mask ) {
    399                 /* paranoid */ verify( idx <= mask   );
    400                 /* paranoid */ verify( idx != -1ul32 );
    401 
    402                 // We need to find a spot in the ready array
    403                 __attribute((unused)) int len   = 0;
    404                 __attribute((unused)) int block = 0;
    405                 __u32 ready_mask = ring.submit_q.ready_cnt - 1;
    406 
    407                 __u32 off = thread_rand();
    408 
    409                 __u32 picked;
    410                 LOOKING: for() {
    411                         for(i; ring.submit_q.ready_cnt) {
    412                                 picked = (i + off) & ready_mask;
    413                                 __u32 expected = -1ul32;
    414                                 if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
    415                                         break LOOKING;
    416                                 }
    417                                 verify(expected != idx);
    418 
    419                                 len ++;
    420                         }
    421 
    422                         block++;
    423 
    424                         __u32 released = __release_consumed_submission( ring );
    425                         if( released == 0 ) {
    426                                 yield();
    427                         }
    428                 }
    429 
    430                 // update statistics
    431                 __STATS__( false,
    432                         io.submit_q.look_avg.val   += len;
    433                         io.submit_q.look_avg.block += block;
    434                         io.submit_q.look_avg.cnt   += 1;
    435                 )
    436 
    437                 return picked;
    438         }
    439 
    440         void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) {
    441                 __io_data & ring = *ctx->thrd.ring;
    442 
     231        struct $io_context * cfa_io_allocate(struct io_uring_sqe * sqes[], __u32 idxs[], __u32 want) {
     232                __cfadbg_print_safe(io, "Kernel I/O : attempting to allocate %u\n", want);
     233
     234                disable_interrupts();
     235                processor * proc = __cfaabi_tls.this_processor;
     236                $io_context * ctx = proc->io.ctx;
     237                /* paranoid */ verify( __cfaabi_tls.this_processor );
     238                /* paranoid */ verify( ctx );
     239
     240                __cfadbg_print_safe(io, "Kernel I/O : attempting to fast allocation\n");
     241
     242                // We can proceed to the fast path
     243                if( __alloc(ctx, idxs, want) ) {
     244                        // Allocation was successful
     245                        __STATS__( true, io.alloc.fast += 1; )
     246                        enable_interrupts( __cfaabi_dbg_ctx );
     247
     248                        __cfadbg_print_safe(io, "Kernel I/O : fast allocation successful from ring %d\n", ctx->fd);
     249
     250                        __fill( sqes, want, idxs, ctx );
     251                        return ctx;
     252                }
     253                // The fast path failed, fallback
     254                __STATS__( true, io.alloc.fail += 1; )
     255
     256                // Fast path failed, fallback on arbitration
     257                __STATS__( true, io.alloc.slow += 1; )
     258                enable_interrupts( __cfaabi_dbg_ctx );
     259
     260                $io_arbiter * ioarb = proc->cltr->io.arbiter;
     261                /* paranoid */ verify( ioarb );
     262
     263                __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for allocation\n");
     264
     265                struct $io_context * ret = __ioarbiter_allocate(*ioarb, proc, idxs, want);
     266
     267                __cfadbg_print_safe(io, "Kernel I/O : slow allocation completed from ring %d\n", ret->fd);
     268
     269                __fill( sqes, want, idxs,ret );
     270                return ret;
     271        }
     272
     273
     274        //=============================================================================================
     275        // submission
     276        static inline void __submit( struct $io_context * ctx, __u32 idxs[], __u32 have, bool lazy) {
     277                // We can proceed to the fast path
     278                // Get the right objects
     279                __sub_ring_t & sq = ctx->sq;
     280                const __u32 mask  = *sq.mask;
     281                __u32 tail = *sq.kring.tail;
     282
     283                // Add the sqes to the array
     284                for( i; have ) {
     285                        __cfadbg_print_safe(io, "Kernel I/O : __submit loop\n");
     286                        sq.kring.array[ (tail + i) & mask ] = idxs[i];
     287                }
     288
     289                // Make the sqes visible to the submitter
     290                __atomic_store_n(sq.kring.tail, tail + have, __ATOMIC_RELEASE);
     291                sq.to_submit++;
     292
     293                ctx->proc->io.pending = true;
     294                ctx->proc->io.dirty   = true;
     295                if(sq.to_submit > 30 || !lazy) {
     296                        __cfa_io_flush( ctx->proc );
     297                }
     298        }
     299
     300        void cfa_io_submit( struct $io_context * inctx, __u32 idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1))) {
     301                __cfadbg_print_safe(io, "Kernel I/O : attempting to submit %u (%s)\n", have, lazy ? "lazy" : "eager");
     302
     303                disable_interrupts();
     304                processor * proc = __cfaabi_tls.this_processor;
     305                $io_context * ctx = proc->io.ctx;
     306                /* paranoid */ verify( __cfaabi_tls.this_processor );
     307                /* paranoid */ verify( ctx );
     308
     309                // Can we proceed to the fast path
     310                if( ctx == inctx )              // We have the right instance?
    443311                {
    444                         __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
    445                         __cfadbg_print_safe( io,
    446                                 "Kernel I/O : submitting %u (%p) for %p\n"
    447                                 "    data: %p\n"
    448                                 "    opcode: %s\n"
    449                                 "    fd: %d\n"
    450                                 "    flags: %d\n"
    451                                 "    prio: %d\n"
    452                                 "    off: %p\n"
    453                                 "    addr: %p\n"
    454                                 "    len: %d\n"
    455                                 "    other flags: %d\n"
    456                                 "    splice fd: %d\n"
    457                                 "    pad[0]: %llu\n"
    458                                 "    pad[1]: %llu\n"
    459                                 "    pad[2]: %llu\n",
    460                                 idx, sqe,
    461                                 active_thread(),
    462                                 (void*)sqe->user_data,
    463                                 opcodes[sqe->opcode],
    464                                 sqe->fd,
    465                                 sqe->flags,
    466                                 sqe->ioprio,
    467                                 sqe->off,
    468                                 sqe->addr,
    469                                 sqe->len,
    470                                 sqe->accept_flags,
    471                                 sqe->splice_fd_in,
    472                                 sqe->__pad2[0],
    473                                 sqe->__pad2[1],
    474                                 sqe->__pad2[2]
    475                         );
    476                 }
    477 
    478 
    479                 // Get now the data we definetely need
    480                 volatile __u32 * const tail = ring.submit_q.tail;
    481                 const __u32 mask  = *ring.submit_q.mask;
    482 
    483                 // There are 2 submission schemes, check which one we are using
    484                 if( ring.poller_submits ) {
    485                         // If the poller thread submits, then we just need to add this to the ready array
    486                         __submit_to_ready_array( ring, idx, mask );
    487 
    488                         post( ctx->thrd.sem );
    489 
    490                         __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
    491                 }
    492                 else if( ring.eager_submits ) {
    493                         __u32 picked = __submit_to_ready_array( ring, idx, mask );
    494 
    495                         #if defined(LEADER_LOCK)
    496                                 if( !try_lock(ring.submit_q.submit_lock) ) {
    497                                         __STATS__( false,
    498                                                 io.submit_q.helped += 1;
    499                                         )
    500                                         return;
    501                                 }
    502                                 /* paranoid */ verify( ! __preemption_enabled() );
    503                                 __STATS__( true,
    504                                         io.submit_q.leader += 1;
    505                                 )
    506                         #else
    507                                 for() {
    508                                         yield();
    509 
    510                                         if( try_lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2) ) {
    511                                                 __STATS__( false,
    512                                                         io.submit_q.leader += 1;
    513                                                 )
    514                                                 break;
    515                                         }
    516 
    517                                         // If some one else collected our index, we are done
    518                                         #warning ABA problem
    519                                         if( ring.submit_q.ready[picked] != idx ) {
    520                                                 __STATS__( false,
    521                                                         io.submit_q.helped += 1;
    522                                                 )
    523                                                 return;
    524                                         }
    525 
    526                                         __STATS__( false,
    527                                                 io.submit_q.busy += 1;
    528                                         )
    529                                 }
    530                         #endif
    531 
    532                         // We got the lock
    533                         // Collect the submissions
    534                         unsigned to_submit = __collect_submitions( ring );
    535 
    536                         // Actually submit
    537                         int ret = __io_uring_enter( ring, to_submit, false );
    538 
    539                         #if defined(LEADER_LOCK)
    540                                 /* paranoid */ verify( ! __preemption_enabled() );
    541                                 next(ring.submit_q.submit_lock);
    542                         #else
    543                                 unlock(ring.submit_q.submit_lock);
    544                         #endif
    545                         if( ret < 0 ) {
    546                                 return;
    547                         }
    548 
    549                         // Release the consumed SQEs
    550                         __release_consumed_submission( ring );
    551 
    552                         // update statistics
    553                         __STATS__( false,
    554                                 io.submit_q.submit_avg.rdy += to_submit;
    555                                 io.submit_q.submit_avg.csm += ret;
    556                                 io.submit_q.submit_avg.cnt += 1;
    557                         )
    558 
    559                         __cfadbg_print_safe( io, "Kernel I/O : submitted %u (among %u) for %p\n", idx, ret, active_thread() );
    560                 }
    561                 else
    562                 {
    563                         // get mutual exclusion
    564                         #if defined(LEADER_LOCK)
    565                                 while(!try_lock(ring.submit_q.submit_lock));
    566                         #else
    567                                 lock(ring.submit_q.submit_lock __cfaabi_dbg_ctx2);
    568                         #endif
    569 
    570                         /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 3ul64,
    571                         /* paranoid */  "index %u already reclaimed\n"
    572                         /* paranoid */  "head %u, prev %u, tail %u\n"
    573                         /* paranoid */  "[-0: %u,-1: %u,-2: %u,-3: %u]\n",
    574                         /* paranoid */  idx,
    575                         /* paranoid */  *ring.submit_q.head, ring.submit_q.prev_head, *tail
    576                         /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ]
    577                         /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ]
    578                         /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ]
    579                         /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ]
    580                         /* paranoid */ );
    581 
    582                         // Append to the list of ready entries
    583 
    584                         /* paranoid */ verify( idx <= mask );
    585                         ring.submit_q.array[ (*tail) & mask ] = idx;
    586                         __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
    587 
    588                         // Submit however, many entries need to be submitted
    589                         int ret = __io_uring_enter( ring, 1, false );
    590                         if( ret < 0 ) {
    591                                 switch((int)errno) {
    592                                 default:
    593                                         abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
    594                                 }
    595                         }
    596 
    597                         /* paranoid */ verify(ret == 1);
    598 
    599                         // update statistics
    600                         __STATS__( false,
    601                                 io.submit_q.submit_avg.csm += 1;
    602                                 io.submit_q.submit_avg.cnt += 1;
    603                         )
    604 
    605                         {
    606                                 __attribute__((unused)) volatile __u32 * const head = ring.submit_q.head;
    607                                 __attribute__((unused)) __u32 last_idx = ring.submit_q.array[ ((*head) - 1) & mask ];
    608                                 __attribute__((unused)) volatile struct io_uring_sqe * sqe = &ring.submit_q.sqes[last_idx];
    609 
    610                                 __cfadbg_print_safe( io,
    611                                         "Kernel I/O : last submitted is %u (%p)\n"
    612                                         "    data: %p\n"
    613                                         "    opcode: %s\n"
    614                                         "    fd: %d\n"
    615                                         "    flags: %d\n"
    616                                         "    prio: %d\n"
    617                                         "    off: %p\n"
    618                                         "    addr: %p\n"
    619                                         "    len: %d\n"
    620                                         "    other flags: %d\n"
    621                                         "    splice fd: %d\n"
    622                                         "    pad[0]: %llu\n"
    623                                         "    pad[1]: %llu\n"
    624                                         "    pad[2]: %llu\n",
    625                                         last_idx, sqe,
    626                                         (void*)sqe->user_data,
    627                                         opcodes[sqe->opcode],
    628                                         sqe->fd,
    629                                         sqe->flags,
    630                                         sqe->ioprio,
    631                                         sqe->off,
    632                                         sqe->addr,
    633                                         sqe->len,
    634                                         sqe->accept_flags,
    635                                         sqe->splice_fd_in,
    636                                         sqe->__pad2[0],
    637                                         sqe->__pad2[1],
    638                                         sqe->__pad2[2]
    639                                 );
    640                         }
    641 
    642                         __atomic_thread_fence( __ATOMIC_SEQ_CST );
    643                         // Release the consumed SQEs
    644                         __release_consumed_submission( ring );
    645                         // ring.submit_q.sqes[idx].user_data = 3ul64;
    646 
    647                         #if defined(LEADER_LOCK)
    648                                 next(ring.submit_q.submit_lock);
    649                         #else
    650                                 unlock(ring.submit_q.submit_lock);
    651                         #endif
    652 
    653                         __cfadbg_print_safe( io, "Kernel I/O : submitted %u for %p\n", idx, active_thread() );
    654                 }
    655         }
    656 
    657         // #define PARTIAL_SUBMIT 32
    658 
    659         // go through the list of submissions in the ready array and moved them into
    660         // the ring's submit queue
    661         static unsigned __collect_submitions( struct __io_data & ring ) {
    662                 /* paranoid */ verify( ring.submit_q.ready != 0p );
    663                 /* paranoid */ verify( ring.submit_q.ready_cnt > 0 );
    664 
    665                 unsigned to_submit = 0;
    666                 __u32 tail = *ring.submit_q.tail;
    667                 const __u32 mask = *ring.submit_q.mask;
    668                 #if defined(PARTIAL_SUBMIT)
    669                         #if defined(LEADER_LOCK)
    670                                 #error PARTIAL_SUBMIT and LEADER_LOCK cannot co-exist
    671                         #endif
    672                         const __u32 cnt = ring.submit_q.ready_cnt > PARTIAL_SUBMIT ? PARTIAL_SUBMIT : ring.submit_q.ready_cnt;
    673                         const __u32 offset = ring.submit_q.prev_ready;
    674                         ring.submit_q.prev_ready += cnt;
    675                 #else
    676                         const __u32 cnt = ring.submit_q.ready_cnt;
    677                         const __u32 offset = 0;
    678                 #endif
    679 
    680                 // Go through the list of ready submissions
    681                 for( c; cnt ) {
    682                         __u32 i = (offset + c) % ring.submit_q.ready_cnt;
    683 
    684                         // replace any submission with the sentinel, to consume it.
    685                         __u32 idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
    686 
    687                         // If it was already the sentinel, then we are done
    688                         if( idx == -1ul32 ) continue;
    689 
    690                         // If we got a real submission, append it to the list
    691                         ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
    692                         to_submit++;
    693                 }
    694 
    695                 // Increment the tail based on how many we are ready to submit
    696                 __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);
    697 
    698                 return to_submit;
    699         }
    700 
     312                        __submit(ctx, idxs, have, lazy);
     313
     314                        // Mark the instance as no longer in-use, re-enable interrupts and return
     315                        __STATS__( true, io.submit.fast += 1; )
     316                        enable_interrupts( __cfaabi_dbg_ctx );
     317
     318                        __cfadbg_print_safe(io, "Kernel I/O : submitted on fast path\n");
     319                        return;
     320                }
     321
     322                // Fast path failed, fallback on arbitration
     323                __STATS__( true, io.submit.slow += 1; )
     324                enable_interrupts( __cfaabi_dbg_ctx );
     325
     326                __cfadbg_print_safe(io, "Kernel I/O : falling back on arbiter for submission\n");
     327
     328                __ioarbiter_submit(*inctx->arbiter, inctx, idxs, have, lazy);
     329        }
     330
     331        //=============================================================================================
     332        // Flushing
    701333        // Go through the ring's submit queue and release everything that has already been consumed
    702334        // by io_uring
    703         static __u32 __release_consumed_submission( struct __io_data & ring ) {
    704                 const __u32 smask = *ring.submit_q.mask;
    705 
    706                 // We need to get the lock to copy the old head and new head
    707                 if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
     335        // This cannot be done by multiple threads
     336        static __u32 __release_sqes( struct $io_context & ctx ) {
     337                const __u32 mask = *ctx.sq.mask;
     338
    708339                __attribute__((unused))
    709                 __u32 ctail = *ring.submit_q.tail;        // get the current tail of the queue
    710                 __u32 chead = *ring.submit_q.head;              // get the current head of the queue
    711                 __u32 phead = ring.submit_q.prev_head;  // get the head the last time we were here
    712                 ring.submit_q.prev_head = chead;                // note up to were we processed
    713                 unlock(ring.submit_q.release_lock);
     340                __u32 ctail = *ctx.sq.kring.tail;    // get the current tail of the queue
     341                __u32 chead = *ctx.sq.kring.head;        // get the current head of the queue
     342                __u32 phead = ctx.sq.kring.released; // get the head the last time we were here
     343
     344                __u32 ftail = ctx.sq.free_ring.tail;  // get the current tail of the queue
    714345
    715346                // the 3 fields are organized like this diagram
     
    730361                __u32 count = chead - phead;
    731362
     363                if(count == 0) {
     364                        return 0;
     365                }
     366
    732367                // We acquired an previous-head/current-head range
    733368                // go through the range and release the sqes
    734369                for( i; count ) {
    735                         __u32 idx = ring.submit_q.array[ (phead + i) & smask ];
    736 
    737                         /* paranoid */ verify( 0 != ring.submit_q.sqes[ idx ].user_data );
    738                         __clean( &ring.submit_q.sqes[ idx ] );
    739                 }
     370                        __cfadbg_print_safe(io, "Kernel I/O : release loop\n");
     371                        __u32 idx = ctx.sq.kring.array[ (phead + i) & mask ];
     372                        ctx.sq.free_ring.array[ (ftail + i) & mask ] = idx;
     373                }
     374
     375                ctx.sq.kring.released = chead;          // note up to were we processed
     376                __atomic_store_n(&ctx.sq.free_ring.tail, ftail + count, __ATOMIC_SEQ_CST);
     377
     378                __ioarbiter_notify(ctx);
     379
    740380                return count;
    741381        }
    742382
    743         void __sqe_clean( volatile struct io_uring_sqe * sqe ) {
    744                 __clean( sqe );
    745         }
    746 
    747         static inline void __clean( volatile struct io_uring_sqe * sqe ) {
    748                 // If we are in debug mode, thrash the fields to make sure we catch reclamation errors
    749                 __cfaabi_dbg_debug_do(
    750                         memset(sqe, 0xde, sizeof(*sqe));
    751                         sqe->opcode = (sizeof(opcodes) / sizeof(const char *)) - 1;
    752                 );
    753 
    754                 // Mark the entry as unused
    755                 __atomic_store_n(&sqe->user_data, 3ul64, __ATOMIC_SEQ_CST);
     383//=============================================================================================
     384// I/O Arbiter
     385//=============================================================================================
     386        static $io_context * __ioarbiter_allocate( $io_arbiter & mutex this, processor * proc, __u32 idxs[], __u32 want ) {
     387                __cfadbg_print_safe(io, "Kernel I/O : arbiter allocating\n");
     388
     389                __STATS__( false, io.alloc.block += 1; )
     390
     391                // No one has any resources left, wait for something to finish
     392                // Mark as pending
     393                __atomic_store_n( &this.pending.flag, true, __ATOMIC_SEQ_CST );
     394
     395                // Wait for our turn to submit
     396                wait( this.pending.blocked, want );
     397
     398                __attribute((unused)) bool ret =
     399                __alloc( this.pending.ctx, idxs, want);
     400                /* paranoid */ verify( ret );
     401
     402                return this.pending.ctx;
     403
     404        }
     405
     406        static void __ioarbiter_notify( $io_arbiter & mutex this, $io_context * ctx ) {
     407                /* paranoid */ verify( !is_empty(this.pending.blocked) );
     408                this.pending.ctx = ctx;
     409
     410                while( !is_empty(this.pending.blocked) ) {
     411                        __cfadbg_print_safe(io, "Kernel I/O : notifying\n");
     412                        __u32 have = ctx->sq.free_ring.tail - ctx->sq.free_ring.head;
     413                        __u32 want = front( this.pending.blocked );
     414
     415                        if( have > want ) return;
     416
     417                        signal_block( this.pending.blocked );
     418                }
     419
     420                this.pending.flag = false;
     421        }
     422
     423        static void __ioarbiter_notify( $io_context & ctx ) {
     424                if(__atomic_load_n( &ctx.arbiter->pending.flag, __ATOMIC_SEQ_CST)) {
     425                        __ioarbiter_notify( *ctx.arbiter, &ctx );
     426                }
     427        }
     428
     429        // Simply append to the pending
     430        static void __ioarbiter_submit( $io_arbiter & mutex this, $io_context * ctx, __u32 idxs[], __u32 have, bool lazy ) {
     431                __cfadbg_print_safe(io, "Kernel I/O : submitting %u from the arbiter to context %u\n", have, ctx->fd);
     432
     433                /* paranoid */ verify( &this == ctx->arbiter );
     434
     435                // Mark as pending
     436                __atomic_store_n( &ctx->ext_sq.empty, false, __ATOMIC_SEQ_CST );
     437
     438                __cfadbg_print_safe(io, "Kernel I/O : waiting to submit %u\n", have);
     439
     440                // Wait for our turn to submit
     441                wait( ctx->ext_sq.blocked );
     442
     443                // Submit our indexes
     444                __submit(ctx, idxs, have, lazy);
     445
     446                __cfadbg_print_safe(io, "Kernel I/O : %u submitted from arbiter\n", have);
     447        }
     448
     449        static void __ioarbiter_flush( $io_arbiter & mutex this, $io_context * ctx ) {
     450                /* paranoid */ verify( &this == ctx->arbiter );
     451
     452                __STATS__( false, io.flush.external += 1; )
     453
     454                __cfadbg_print_safe(io, "Kernel I/O : arbiter flushing\n");
     455
     456                condition & blcked = ctx->ext_sq.blocked;
     457                /* paranoid */ verify( ctx->ext_sq.empty == is_empty( blcked ) );
     458                while(!is_empty( blcked )) {
     459                        signal_block( blcked );
     460                }
     461
     462                ctx->ext_sq.empty = true;
    756463        }
    757464#endif
  • libcfa/src/concurrency/io/call.cfa.in

    r342af53 r8e4aa05  
    5454                        | IOSQE_IO_DRAIN
    5555                #endif
     56                #if defined(CFA_HAVE_IOSQE_IO_LINK)
     57                        | IOSQE_IO_LINK
     58                #endif
     59                #if defined(CFA_HAVE_IOSQE_IO_HARDLINK)
     60                        | IOSQE_IO_HARDLINK
     61                #endif
    5662                #if defined(CFA_HAVE_IOSQE_ASYNC)
    5763                        | IOSQE_ASYNC
    5864                #endif
    59         ;
    60 
    61         static const __u32 LINK_FLAGS = 0
    62                 #if defined(CFA_HAVE_IOSQE_IO_LINK)
    63                         | IOSQE_IO_LINK
    64                 #endif
    65                 #if defined(CFA_HAVE_IOSQE_IO_HARDLINK)
    66                         | IOSQE_IO_HARDLINK
     65                #if defined(CFA_HAVE_IOSQE_BUFFER_SELECTED)
     66                        | IOSQE_BUFFER_SELECTED
    6767                #endif
    6868        ;
     
    7474        ;
    7575
    76         extern [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data );
    77         extern void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1)));
    78 
    79         static inline io_context * __get_io_context( void ) {
    80                 cluster * cltr = active_cluster();
    81 
    82                 /* paranoid */ verifyf( cltr, "No active cluster for io operation\\n");
    83                 assertf( cltr->io.cnt > 0, "Cluster %p has no default io contexts and no context was specified\\n", cltr );
    84 
    85                 /* paranoid */ verifyf( cltr->io.ctxs, "default io contexts for cluster %p are missing\\n", cltr);
    86                 return &cltr->io.ctxs[ thread_rand() % cltr->io.cnt ];
    87         }
     76        extern struct $io_context * cfa_io_allocate(struct io_uring_sqe * out_sqes[], __u32 out_idxs[], __u32 want)  __attribute__((nonnull (1,2)));
     77        extern void cfa_io_submit( struct $io_context * in_ctx, __u32 in_idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1,2)));
    8878#endif
    8979
     
    9888
    9989extern "C" {
    100         #include <sys/types.h>
     90        #include <asm/types.h>
    10191        #include <sys/socket.h>
    10292        #include <sys/syscall.h>
     
    142132        extern int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    143133
    144         extern ssize_t splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags);
     134        extern ssize_t splice(int fd_in, __off64_t *off_in, int fd_out, __off64_t *off_out, size_t len, unsigned int flags);
    145135        extern ssize_t tee(int fd_in, int fd_out, size_t len, unsigned int flags);
    146136}
     
    195185                return ', '.join(args_a)
    196186
    197 AsyncTemplate = """inline void async_{name}(io_future_t & future, {params}, int submit_flags, io_cancellation * cancellation, io_context * context) {{
     187AsyncTemplate = """inline void async_{name}(io_future_t & future, {params}, __u64 submit_flags) {{
    198188        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_{op})
    199189                ssize_t res = {name}({args});
     
    205195                }}
    206196        #else
    207                 // we don't support LINK yet
    208                 if( 0 != (submit_flags & LINK_FLAGS) ) {{
    209                         errno = ENOTSUP; return -1;
    210                 }}
    211 
    212                 if( !context ) {{
    213                         context = __get_io_context();
    214                 }}
    215                 if(cancellation) {{
    216                         cancellation->target = (__u64)(uintptr_t)&future;
    217                 }}
    218 
    219197                __u8 sflags = REGULAR_FLAGS & submit_flags;
    220                 struct __io_data & ring = *context->thrd.ring;
    221 
    222198                __u32 idx;
    223199                struct io_uring_sqe * sqe;
    224                 [(volatile struct io_uring_sqe *) sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
     200                struct $io_context * ctx = cfa_io_allocate( &sqe, &idx, 1 );
    225201
    226202                sqe->opcode = IORING_OP_{op};
     203                sqe->user_data = (__u64)(uintptr_t)&future;
    227204                sqe->flags = sflags;
    228205                sqe->ioprio = 0;
     
    239216
    240217                verify( sqe->user_data == (__u64)(uintptr_t)&future );
    241                 __submit( context, idx );
     218                cfa_io_submit( ctx, &idx, 1, 0 != (submit_flags & CFA_IO_LAZY) );
    242219        #endif
    243220}}"""
    244221
    245 SyncTemplate = """{ret} cfa_{name}({params}, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {{
    246         if( timeout >= 0 ) {{
    247                 errno = ENOTSUP;
    248                 return -1;
    249         }}
     222SyncTemplate = """{ret} cfa_{name}({params}, __u64 submit_flags) {{
    250223        io_future_t future;
    251224
    252         async_{name}( future, {args}, submit_flags, cancellation, context );
     225        async_{name}( future, {args}, submit_flags );
    253226
    254227        wait( future );
     
    393366        }),
    394367        # CFA_HAVE_IORING_OP_SPLICE
    395         Call('SPLICE', 'ssize_t splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags)', {
     368        Call('SPLICE', 'ssize_t splice(int fd_in, __off64_t *off_in, int fd_out, __off64_t *off_out, size_t len, unsigned int flags)', {
    396369                'splice_fd_in': 'fd_in',
    397370                'splice_off_in': 'off_in ? (__u64)*off_in : (__u64)-1',
     
    415388        if c.define:
    416389                print("""#if defined({define})
    417         {ret} cfa_{name}({params}, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
     390        {ret} cfa_{name}({params}, __u64 submit_flags);
    418391#endif""".format(define=c.define,ret=c.ret, name=c.name, params=c.params))
    419392        else:
    420                 print("{ret} cfa_{name}({params}, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);"
     393                print("{ret} cfa_{name}({params}, __u64 submit_flags);"
    421394                .format(ret=c.ret, name=c.name, params=c.params))
    422395
     
    426399        if c.define:
    427400                print("""#if defined({define})
    428         void async_{name}(io_future_t & future, {params}, int submit_flags, io_cancellation * cancellation, io_context * context);
     401        void async_{name}(io_future_t & future, {params}, __u64 submit_flags);
    429402#endif""".format(define=c.define,name=c.name, params=c.params))
    430403        else:
    431                 print("void async_{name}(io_future_t & future, {params}, int submit_flags, io_cancellation * cancellation, io_context * context);"
     404                print("void async_{name}(io_future_t & future, {params}, __u64 submit_flags);"
    432405                .format(name=c.name, params=c.params))
    433406print("\n")
     
    474447
    475448print("""
    476 //-----------------------------------------------------------------------------
    477 bool cancel(io_cancellation & this) {
    478         #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ASYNC_CANCEL)
    479                 return false;
    480         #else
    481                 io_future_t future;
    482 
    483                 io_context * context = __get_io_context();
    484 
    485                 __u8 sflags = 0;
    486                 struct __io_data & ring = *context->thrd.ring;
    487 
    488                 __u32 idx;
    489                 volatile struct io_uring_sqe * sqe;
    490                 [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
    491 
    492                 sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
    493                 sqe->opcode = IORING_OP_ASYNC_CANCEL;
    494                 sqe->flags = sflags;
    495                 sqe->addr = this.target;
    496 
    497                 verify( sqe->user_data == (__u64)(uintptr_t)&future );
    498                 __submit( context, idx );
    499 
    500                 wait(future);
    501 
    502                 if( future.result == 0 ) return true; // Entry found
    503                 if( future.result == -EALREADY) return true; // Entry found but in progress
    504                 if( future.result == -ENOENT ) return false; // Entry not found
    505                 return false;
    506         #endif
    507 }
    508 
    509449//-----------------------------------------------------------------------------
    510450// Check if a function is has asynchronous
  • libcfa/src/concurrency/io/setup.cfa

    r342af53 r8e4aa05  
    2626
    2727#if !defined(CFA_HAVE_LINUX_IO_URING_H)
    28         void __kernel_io_startup() {
    29                 // Nothing to do without io_uring
    30         }
    31 
    32         void __kernel_io_shutdown() {
    33                 // Nothing to do without io_uring
    34         }
    35 
    3628        void ?{}(io_context_params & this) {}
    3729
    38         void ?{}(io_context & this, struct cluster & cl) {}
    39         void ?{}(io_context & this, struct cluster & cl, const io_context_params & params) {}
    40 
    41         void ^?{}(io_context & this) {}
    42         void ^?{}(io_context & this, bool cluster_context) {}
     30        void  ?{}($io_context & this, struct cluster & cl) {}
     31        void ^?{}($io_context & this) {}
     32
     33        void __cfa_io_start( processor * proc ) {}
     34        void __cfa_io_flush( processor * proc ) {}
     35        void __cfa_io_stop ( processor * proc ) {}
     36
     37        $io_arbiter * create(void) { return 0p; }
     38        void destroy($io_arbiter *) {}
    4339
    4440#else
     
    6561        void ?{}(io_context_params & this) {
    6662                this.num_entries = 256;
    67                 this.num_ready = 256;
    68                 this.submit_aff = -1;
    69                 this.eager_submits = false;
    70                 this.poller_submits = false;
    71                 this.poll_submit = false;
    72                 this.poll_complete = false;
    7363        }
    7464
     
    10393
    10494//=============================================================================================
    105 // I/O Startup / Shutdown logic + Master Poller
    106 //=============================================================================================
    107 
    108         // IO Master poller loop forward
    109         static void * iopoll_loop( __attribute__((unused)) void * args );
    110 
    111         static struct {
    112                 pthread_t     thrd;    // pthread handle to io poller thread
    113                 void *        stack;   // pthread stack for io poller thread
    114                 int           epollfd; // file descriptor to the epoll instance
    115                 volatile bool run;     // Whether or not to continue
    116         } iopoll;
    117 
    118         void __kernel_io_startup(void) {
    119                 __cfadbg_print_safe(io_core, "Kernel : Creating EPOLL instance\n" );
    120 
    121                 iopoll.epollfd = epoll_create1(0);
    122                 if (iopoll.epollfd == -1) {
    123                         abort( "internal error, epoll_create1\n");
    124                 }
    125 
    126                 __cfadbg_print_safe(io_core, "Kernel : Starting io poller thread\n" );
    127 
    128                 iopoll.run = true;
    129                 iopoll.stack = __create_pthread( &iopoll.thrd, iopoll_loop, 0p );
    130         }
    131 
    132         void __kernel_io_shutdown(void) {
    133                 // Notify the io poller thread of the shutdown
    134                 iopoll.run = false;
    135                 sigval val = { 1 };
    136                 pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
    137 
    138                 // Wait for the io poller thread to finish
    139 
    140                 __destroy_pthread( iopoll.thrd, iopoll.stack, 0p );
    141 
    142                 int ret = close(iopoll.epollfd);
    143                 if (ret == -1) {
    144                         abort( "internal error, close epoll\n");
    145                 }
    146 
    147                 // Io polling is now fully stopped
    148 
    149                 __cfadbg_print_safe(io_core, "Kernel : IO poller stopped\n" );
    150         }
    151 
    152         static void * iopoll_loop( __attribute__((unused)) void * args ) {
    153                 __processor_id_t id;
    154                 id.full_proc = false;
    155                 id.id = doregister(&id);
    156                 __cfaabi_tls.this_proc_id = &id;
    157                 __cfadbg_print_safe(io_core, "Kernel : IO poller thread starting\n" );
    158 
    159                 // Block signals to control when they arrive
    160                 sigset_t mask;
    161                 sigfillset(&mask);
    162                 if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
    163                 abort( "internal error, pthread_sigmask" );
    164                 }
    165 
    166                 sigdelset( &mask, SIGUSR1 );
    167 
    168                 // Create sufficient events
    169                 struct epoll_event events[10];
    170                 // Main loop
    171                 while( iopoll.run ) {
    172                         __cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n");
    173 
    174                         // Wait for events
    175                         int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask );
    176 
    177                         __cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io contexts events, waking up\n", nfds);
    178 
    179                         // Check if an error occured
    180                         if (nfds == -1) {
    181                                 if( errno == EINTR ) continue;
    182                                 abort( "internal error, pthread_sigmask" );
    183                         }
    184 
    185                         for(i; nfds) {
    186                                 $io_ctx_thread * io_ctx = ($io_ctx_thread *)(uintptr_t)events[i].data.u64;
    187                                 /* paranoid */ verify( io_ctx );
    188                                 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->ring->fd, io_ctx);
    189                                 #if !defined( __CFA_NO_STATISTICS__ )
    190                                         __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats;
    191                                 #endif
    192 
    193                                 eventfd_t v;
    194                                 eventfd_read(io_ctx->ring->efd, &v);
    195 
    196                                 post( io_ctx->sem );
    197                         }
    198                 }
    199 
    200                 __cfadbg_print_safe(io_core, "Kernel : IO poller thread stopping\n" );
    201                 unregister(&id);
    202                 return 0p;
    203         }
    204 
    205 //=============================================================================================
    20695// I/O Context Constrution/Destruction
    20796//=============================================================================================
    20897
    209         void ?{}($io_ctx_thread & this, struct cluster & cl) { (this.self){ "IO Poller", cl }; }
    210         void main( $io_ctx_thread & this );
    211         static inline $thread * get_thread( $io_ctx_thread & this ) { return &this.self; }
    212         void ^?{}( $io_ctx_thread & mutex this ) {}
    213 
    214         static void __io_create ( __io_data & this, const io_context_params & params_in );
    215         static void __io_destroy( __io_data & this );
    216 
    217         void ?{}(io_context & this, struct cluster & cl, const io_context_params & params) {
    218                 (this.thrd){ cl };
    219                 this.thrd.ring = malloc();
    220                 __cfadbg_print_safe(io_core, "Kernel I/O : Creating ring for io_context %p\n", &this);
    221                 __io_create( *this.thrd.ring, params );
    222 
    223                 __cfadbg_print_safe(io_core, "Kernel I/O : Starting poller thread for io_context %p\n", &this);
    224                 this.thrd.done = false;
    225                 __thrd_start( this.thrd, main );
    226 
    227                 __cfadbg_print_safe(io_core, "Kernel I/O : io_context %p ready\n", &this);
    228         }
    229 
    230         void ?{}(io_context & this, struct cluster & cl) {
    231                 io_context_params params;
    232                 (this){ cl, params };
    233         }
    234 
    235         void ^?{}(io_context & this, bool cluster_context) {
    236                 __cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %p\n", &this);
    237 
    238                 // Notify the thread of the shutdown
    239                 __atomic_store_n(&this.thrd.done, true, __ATOMIC_SEQ_CST);
    240 
    241                 // If this is an io_context within a cluster, things get trickier
    242                 $thread & thrd = this.thrd.self;
    243                 if( cluster_context ) {
    244                         // We are about to do weird things with the threads
    245                         // we don't need interrupts to complicate everything
    246                         disable_interrupts();
    247 
    248                         // Get cluster info
    249                         cluster & cltr = *thrd.curr_cluster;
    250                         /* paranoid */ verify( cltr.idles.total == 0 || &cltr == mainCluster );
    251                         /* paranoid */ verify( !ready_mutate_islocked() );
    252 
    253                         // We need to adjust the clean-up based on where the thread is
    254                         if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
    255                                 // This is the tricky case
    256                                 // The thread was preempted or ready to run and now it is on the ready queue
    257                                 // but the cluster is shutting down, so there aren't any processors to run the ready queue
    258                                 // the solution is to steal the thread from the ready-queue and pretend it was blocked all along
    259 
    260                                 ready_schedule_lock();
    261                                         // The thread should on the list
    262                                         /* paranoid */ verify( thrd.link.next != 0p );
    263 
    264                                         // Remove the thread from the ready queue of this cluster
    265                                         // The thread should be the last on the list
    266                                         __attribute__((unused)) bool removed = remove_head( &cltr, &thrd );
    267                                         /* paranoid */ verify( removed );
    268                                         thrd.link.next = 0p;
    269                                         thrd.link.prev = 0p;
    270 
    271                                         // Fixup the thread state
    272                                         thrd.state = Blocked;
    273                                         thrd.ticket = TICKET_BLOCKED;
    274                                         thrd.preempted = __NO_PREEMPTION;
    275 
    276                                 ready_schedule_unlock();
    277 
    278                                 // Pretend like the thread was blocked all along
    279                         }
    280                         // !!! This is not an else if !!!
    281                         // Ok, now the thread is blocked (whether we cheated to get here or not)
    282                         if( thrd.state == Blocked ) {
    283                                 // This is the "easy case"
    284                                 // The thread is parked and can easily be moved to active cluster
    285                                 verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster );
    286                                 thrd.curr_cluster = active_cluster();
    287 
    288                                 // unpark the fast io_poller
    289                                 unpark( &thrd );
    290                         }
    291                         else {
    292                                 // The thread is in a weird state
    293                                 // I don't know what to do here
    294                                 abort("io_context poller thread is in unexpected state, cannot clean-up correctly\n");
    295                         }
    296 
    297                         // The weird thread kidnapping stuff is over, restore interrupts.
    298                         enable_interrupts( __cfaabi_dbg_ctx );
    299                 } else {
    300                         post( this.thrd.sem );
    301                 }
    302 
    303                 ^(this.thrd){};
    304                 __cfadbg_print_safe(io_core, "Kernel I/O : Stopped poller thread for io_context %p\n", &this);
    305 
    306                 __io_destroy( *this.thrd.ring );
    307                 __cfadbg_print_safe(io_core, "Kernel I/O : Destroyed ring for io_context %p\n", &this);
    308 
    309                 free(this.thrd.ring);
    310         }
    311 
    312         void ^?{}(io_context & this) {
    313                 ^(this){ false };
    314         }
    315 
    316         static void __io_create( __io_data & this, const io_context_params & params_in ) {
     98
     99
     100        static void __io_uring_setup ( $io_context & this, const io_context_params & params_in, int procfd );
     101        static void __io_uring_teardown( $io_context & this );
     102        static void __epoll_register($io_context & ctx);
     103        static void __epoll_unregister($io_context & ctx);
     104        void __ioarbiter_register( $io_arbiter & mutex, $io_context & ctx );
     105        void __ioarbiter_unregister( $io_arbiter & mutex, $io_context & ctx );
     106
     107        void ?{}($io_context & this, processor * proc, struct cluster & cl) {
     108                /* paranoid */ verify( cl.io.arbiter );
     109                this.proc = proc;
     110                this.arbiter = cl.io.arbiter;
     111                this.ext_sq.empty = true;
     112                (this.ext_sq.blocked){};
     113                __io_uring_setup( this, cl.io.params, proc->idle );
     114                __cfadbg_print_safe(io_core, "Kernel I/O : Created ring for io_context %u (%p)\n", this.fd, &this);
     115        }
     116
     117        void ^?{}($io_context & this) {
     118                __cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %u\n", this.fd);
     119
     120                __io_uring_teardown( this );
     121                __cfadbg_print_safe(io_core, "Kernel I/O : Destroyed ring for io_context %u\n", this.fd);
     122        }
     123
     124        extern void __disable_interrupts_hard();
     125        extern void __enable_interrupts_hard();
     126
     127        static void __io_uring_setup( $io_context & this, const io_context_params & params_in, int procfd ) {
    317128                // Step 1 : call to setup
    318129                struct io_uring_params params;
    319130                memset(&params, 0, sizeof(params));
    320                 if( params_in.poll_submit   ) params.flags |= IORING_SETUP_SQPOLL;
    321                 if( params_in.poll_complete ) params.flags |= IORING_SETUP_IOPOLL;
     131                // if( params_in.poll_submit   ) params.flags |= IORING_SETUP_SQPOLL;
     132                // if( params_in.poll_complete ) params.flags |= IORING_SETUP_IOPOLL;
    322133
    323134                __u32 nentries = params_in.num_entries != 0 ? params_in.num_entries : 256;
     
    325136                        abort("ERROR: I/O setup 'num_entries' must be a power of 2\n");
    326137                }
    327                 if( params_in.poller_submits && params_in.eager_submits ) {
    328                         abort("ERROR: I/O setup 'poller_submits' and 'eager_submits' cannot be used together\n");
    329                 }
    330138
    331139                int fd = syscall(__NR_io_uring_setup, nentries, &params );
     
    335143
    336144                // Step 2 : mmap result
    337                 memset( &this, 0, sizeof(struct __io_data) );
    338                 struct __submition_data  & sq = this.submit_q;
    339                 struct __completion_data & cq = this.completion_q;
     145                struct __sub_ring_t & sq = this.sq;
     146                struct __cmp_ring_t & cq = this.cq;
    340147
    341148                // calculate the right ring size
     
    386193                // Get the pointers from the kernel to fill the structure
    387194                // submit queue
    388                 sq.head    = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
    389                 sq.tail    = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
    390                 sq.mask    = (   const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
    391                 sq.num     = (   const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
    392                 sq.flags   = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
    393                 sq.dropped = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
    394                 sq.array   = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
    395                 sq.prev_head = *sq.head;
    396 
    397                 {
    398                         const __u32 num = *sq.num;
    399                         for( i; num ) {
    400                                 __sqe_clean( &sq.sqes[i] );
    401                         }
    402                 }
    403 
    404                 (sq.submit_lock){};
    405                 (sq.release_lock){};
    406 
    407                 if( params_in.poller_submits || params_in.eager_submits ) {
    408                         /* paranoid */ verify( is_pow2( params_in.num_ready ) || (params_in.num_ready < 8) );
    409                         sq.ready_cnt = max( params_in.num_ready, 8 );
    410                         sq.ready = alloc( sq.ready_cnt, 64`align );
    411                         for(i; sq.ready_cnt) {
    412                                 sq.ready[i] = -1ul32;
    413                         }
    414                         sq.prev_ready = 0;
    415                 }
    416                 else {
    417                         sq.ready_cnt = 0;
    418                         sq.ready = 0p;
    419                         sq.prev_ready = 0;
    420                 }
     195                sq.kring.head  = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
     196                sq.kring.tail  = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
     197                sq.kring.array = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
     198                sq.mask        = (   const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
     199                sq.num         = (   const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
     200                sq.flags       = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
     201                sq.dropped     = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
     202
     203                sq.kring.released = 0;
     204
     205                sq.free_ring.head = 0;
     206                sq.free_ring.tail = *sq.num;
     207                sq.free_ring.array = alloc( *sq.num, 128`align );
     208                for(i; (__u32)*sq.num) {
     209                        sq.free_ring.array[i] = i;
     210                }
     211
     212                sq.to_submit = 0;
    421213
    422214                // completion queue
     
    429221
    430222                // Step 4 : eventfd
    431                 int efd;
    432                 for() {
    433                         efd = eventfd(0, 0);
    434                         if (efd < 0) {
    435                                 if (errno == EINTR) continue;
    436                                 abort("KERNEL ERROR: IO_URING EVENTFD - %s\n", strerror(errno));
    437                         }
    438                         break;
    439                 }
    440 
    441                 int ret;
    442                 for() {
    443                         ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &efd, 1);
    444                         if (ret < 0) {
    445                                 if (errno == EINTR) continue;
    446                                 abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno));
    447                         }
    448                         break;
    449                 }
     223                // io_uring_register is so f*cking slow on some machine that it
     224                // will never succeed if preemption isn't hard blocked
     225                __cfadbg_print_safe(io_core, "Kernel I/O : registering %d for completion with ring %d\n", procfd, fd);
     226
     227                __disable_interrupts_hard();
     228
     229                int ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &procfd, 1);
     230                if (ret < 0) {
     231                        abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno));
     232                }
     233
     234                __enable_interrupts_hard();
     235
     236                __cfadbg_print_safe(io_core, "Kernel I/O : registered %d for completion with ring %d\n", procfd, fd);
    450237
    451238                // some paranoid checks
     
    457244                /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
    458245                /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
    459                 /* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );
    460                 /* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );
     246                /* paranoid */ verifyf( (*sq.kring.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.kring.head );
     247                /* paranoid */ verifyf( (*sq.kring.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.kring.tail );
    461248
    462249                // Update the global ring info
    463                 this.ring_flags = params.flags;
     250                this.ring_flags = 0;
    464251                this.fd         = fd;
    465                 this.efd        = efd;
    466                 this.eager_submits  = params_in.eager_submits;
    467                 this.poller_submits = params_in.poller_submits;
    468         }
    469 
    470         static void __io_destroy( __io_data & this ) {
     252        }
     253
     254        static void __io_uring_teardown( $io_context & this ) {
    471255                // Shutdown the io rings
    472                 struct __submition_data  & sq = this.submit_q;
    473                 struct __completion_data & cq = this.completion_q;
     256                struct __sub_ring_t & sq = this.sq;
     257                struct __cmp_ring_t & cq = this.cq;
    474258
    475259                // unmap the submit queue entries
     
    486270                // close the file descriptor
    487271                close(this.fd);
    488                 close(this.efd);
    489 
    490                 free( this.submit_q.ready ); // Maybe null, doesn't matter
     272
     273                free( this.sq.free_ring.array ); // Maybe null, doesn't matter
     274        }
     275
     276        void __cfa_io_start( processor * proc ) {
     277                proc->io.ctx = alloc();
     278                (*proc->io.ctx){proc, *proc->cltr};
     279        }
     280        void __cfa_io_stop ( processor * proc ) {
     281                ^(*proc->io.ctx){};
     282                free(proc->io.ctx);
    491283        }
    492284
     
    494286// I/O Context Sleep
    495287//=============================================================================================
    496         #define IOEVENTS EPOLLIN | EPOLLONESHOT
    497 
    498         static inline void __ioctx_epoll_ctl($io_ctx_thread & ctx, int op, const char * error) {
    499                 struct epoll_event ev;
    500                 ev.events = IOEVENTS;
    501                 ev.data.u64 = (__u64)&ctx;
    502                 int ret = epoll_ctl(iopoll.epollfd, op, ctx.ring->efd, &ev);
    503                 if (ret < 0) {
    504                         abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) );
    505                 }
    506         }
    507 
    508         void __ioctx_register($io_ctx_thread & ctx) {
    509                 __ioctx_epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");
    510         }
    511 
    512         void __ioctx_prepare_block($io_ctx_thread & ctx) {
    513                 __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.ring->fd, &ctx);
    514                 __ioctx_epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");
    515         }
     288        // static inline void __epoll_ctl($io_context & ctx, int op, const char * error) {
     289        //      struct epoll_event ev;
     290        //      ev.events = EPOLLIN | EPOLLONESHOT;
     291        //      ev.data.u64 = (__u64)&ctx;
     292        //      int ret = epoll_ctl(iopoll.epollfd, op, ctx.efd, &ev);
     293        //      if (ret < 0) {
     294        //              abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) );
     295        //      }
     296        // }
     297
     298        // static void __epoll_register($io_context & ctx) {
     299        //      __epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");
     300        // }
     301
     302        // static void __epoll_unregister($io_context & ctx) {
     303        //      // Read the current epoch so we know when to stop
     304        //      size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST);
     305
     306        //      // Remove the fd from the iopoller
     307        //      __epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE");
     308
     309        //      // Notify the io poller thread of the shutdown
     310        //      iopoll.run = false;
     311        //      sigval val = { 1 };
     312        //      pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
     313
     314        //      // Make sure all this is done
     315        //      __atomic_thread_fence(__ATOMIC_SEQ_CST);
     316
     317        //      // Wait for the next epoch
     318        //      while(curr == iopoll.epoch && !iopoll.stopped) Pause();
     319        // }
     320
     321        // void __ioctx_prepare_block($io_context & ctx) {
     322        //      __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.fd, &ctx);
     323        //      __epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");
     324        // }
     325
    516326
    517327//=============================================================================================
    518328// I/O Context Misc Setup
    519329//=============================================================================================
    520         void register_fixed_files( io_context & ctx, int * files, unsigned count ) {
    521                 int ret = syscall( __NR_io_uring_register, ctx.thrd.ring->fd, IORING_REGISTER_FILES, files, count );
    522                 if( ret < 0 ) {
    523                         abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
    524                 }
    525 
    526                 __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret );
    527         }
    528 
    529         void register_fixed_files( cluster & cltr, int * files, unsigned count ) {
    530                 for(i; cltr.io.cnt) {
    531                         register_fixed_files( cltr.io.ctxs[i], files, count );
    532                 }
    533         }
     330        void ?{}( $io_arbiter & this ) {
     331                this.pending.flag = false;
     332        }
     333
     334        void ^?{}( $io_arbiter & mutex this ) {
     335                // /* paranoid */ verify( empty(this.assigned) );
     336                // /* paranoid */ verify( empty(this.available) );
     337                /* paranoid */ verify( is_empty(this.pending.blocked) );
     338        }
     339
     340        $io_arbiter * create(void) {
     341                return new();
     342        }
     343        void destroy($io_arbiter * arbiter) {
     344                delete(arbiter);
     345        }
     346
     347//=============================================================================================
     348// I/O Context Misc Setup
     349//=============================================================================================
     350
    534351#endif
  • libcfa/src/concurrency/io/types.hfa

    r342af53 r8e4aa05  
    55// file "LICENCE" distributed with Cforall.
    66//
    7 // io/types.hfa --
     7// io/types.hfa -- PRIVATE
     8// Types used by the I/O subsystem
    89//
    910// Author           : Thierry Delisle
     
    2122
    2223#include "bits/locks.hfa"
     24#include "kernel/fwd.hfa"
    2325
    2426#if defined(CFA_HAVE_LINUX_IO_URING_H)
    25         #define LEADER_LOCK
    26         struct __leaderlock_t {
    27                 struct $thread * volatile value;        // ($thread) next_leader | (bool:1) is_locked
    28         };
     27        #include "bits/sequence.hfa"
     28        #include "monitor.hfa"
    2929
    30         static inline void ?{}( __leaderlock_t & this ) { this.value = 0p; }
     30        struct processor;
     31        monitor $io_arbiter;
    3132
    3233        //-----------------------------------------------------------------------
    3334        // Ring Data structure
    34       struct __submition_data {
    35                 // Head and tail of the ring (associated with array)
    36                 volatile __u32 * head;
    37                 volatile __u32 * tail;
    38                 volatile __u32 prev_head;
     35      struct __sub_ring_t {
     36                struct {
     37                        // Head and tail of the ring (associated with array)
     38                        volatile __u32 * head;   // one passed last index consumed by the kernel
     39                        volatile __u32 * tail;   // one passed last index visible to the kernel
     40                        volatile __u32 released; // one passed last index released back to the free list
    3941
    40                 // The actual kernel ring which uses head/tail
    41                 // indexes into the sqes arrays
    42                 __u32 * array;
     42                        // The actual kernel ring which uses head/tail
     43                        // indexes into the sqes arrays
     44                        __u32 * array;
     45                } kring;
     46
     47                struct {
     48                        volatile __u32 head;
     49                        volatile __u32 tail;
     50                        // The ring which contains free allocations
     51                        // indexes into the sqes arrays
     52                        __u32 * array;
     53                } free_ring;
     54
     55                // number of sqes to submit on next system call.
     56                __u32 to_submit;
    4357
    4458                // number of entries and mask to go with it
     
    4660                const __u32 * mask;
    4761
    48                 // Submission flags (Not sure what for)
     62                // Submission flags, currently only IORING_SETUP_SQPOLL
    4963                __u32 * flags;
    5064
    51                 // number of sqes not submitted (whatever that means)
     65                // number of sqes not submitted
     66                // From documentation : [dropped] is incremented for each invalid submission queue entry encountered in the ring buffer.
    5267                __u32 * dropped;
    5368
    54                 // Like head/tail but not seen by the kernel
    55                 volatile __u32 * ready;
    56                 __u32 ready_cnt;
    57                 __u32 prev_ready;
    58 
    59                 #if defined(LEADER_LOCK)
    60                         __leaderlock_t submit_lock;
    61                 #else
    62                         __spinlock_t submit_lock;
    63                 #endif
    64                 __spinlock_t  release_lock;
    65 
    6669                // A buffer of sqes (not the actual ring)
    67                 volatile struct io_uring_sqe * sqes;
     70                struct io_uring_sqe * sqes;
    6871
    6972                // The location and size of the mmaped area
     
    7275        };
    7376
    74         struct __completion_data {
     77        struct __cmp_ring_t {
    7578                // Head and tail of the ring
    7679                volatile __u32 * head;
     
    8184                const __u32 * num;
    8285
    83                 // number of cqes not submitted (whatever that means)
     86                // I don't know what this value is for
    8487                __u32 * overflow;
    8588
     
    9295        };
    9396
    94         struct __io_data {
    95                 struct __submition_data submit_q;
    96                 struct __completion_data completion_q;
     97        struct __attribute__((aligned(128))) $io_context {
     98                $io_arbiter * arbiter;
     99                processor * proc;
     100
     101                struct {
     102                        volatile bool empty;
     103                        condition blocked;
     104                } ext_sq;
     105
     106                struct __sub_ring_t sq;
     107                struct __cmp_ring_t cq;
    97108                __u32 ring_flags;
    98109                int fd;
    99                 int efd;
    100                 bool eager_submits:1;
    101                 bool poller_submits:1;
     110        };
     111
     112        monitor __attribute__((aligned(128))) $io_arbiter {
     113                struct {
     114                        condition blocked;
     115                        $io_context * ctx;
     116                        volatile bool flag;
     117                } pending;
    102118        };
    103119
     
    131147        #endif
    132148
    133         struct $io_ctx_thread;
    134         void __ioctx_register($io_ctx_thread & ctx);
    135         void __ioctx_prepare_block($io_ctx_thread & ctx);
    136         void __sqe_clean( volatile struct io_uring_sqe * sqe );
     149        // void __ioctx_prepare_block($io_context & ctx);
    137150#endif
    138151
  • libcfa/src/concurrency/iofwd.hfa

    r342af53 r8e4aa05  
    1818#include <unistd.h>
    1919extern "C" {
    20         #include <sys/types.h>
     20        #include <asm/types.h>
    2121        #if CFA_HAVE_LINUX_IO_URING_H
    2222                #include <linux/io_uring.h>
     
    4848struct cluster;
    4949struct io_future_t;
    50 struct io_context;
    51 struct io_cancellation;
     50struct $io_context;
    5251
    5352struct iovec;
     
    5554struct sockaddr;
    5655struct statx;
     56struct epoll_event;
     57
     58//----------
     59// underlying calls
     60extern struct $io_context * cfa_io_allocate(struct io_uring_sqe * out_sqes[], __u32 out_idxs[], __u32 want)  __attribute__((nonnull (1,2)));
     61extern void cfa_io_submit( struct $io_context * in_ctx, __u32 in_idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1,2)));
    5762
    5863//----------
    5964// synchronous calls
    6065#if defined(CFA_HAVE_PREADV2)
    61         extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
     66        extern ssize_t cfa_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, __u64 submit_flags);
    6267#endif
    6368#if defined(CFA_HAVE_PWRITEV2)
    64         extern ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
     69        extern ssize_t cfa_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, __u64 submit_flags);
    6570#endif
    66 extern int cfa_fsync(int fd, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    67 extern int cfa_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    68 extern int cfa_sync_file_range(int fd, off64_t offset, off64_t nbytes, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    69 extern  ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    70 extern ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    71 extern ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    72 extern ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    73 extern int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    74 extern int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    75 extern int cfa_fallocate(int fd, int mode, off_t offset, off_t len, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    76 extern int cfa_posix_fadvise(int fd, off_t offset, off_t len, int advice, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    77 extern int cfa_madvise(void *addr, size_t length, int advice, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    78 extern int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
     71extern int cfa_fsync(int fd, __u64 submit_flags);
     72extern int cfa_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event, __u64 submit_flags);
     73extern int cfa_sync_file_range(int fd, off64_t offset, off64_t nbytes, unsigned int flags, __u64 submit_flags);
     74extern  ssize_t cfa_sendmsg(int sockfd, const struct msghdr *msg, int flags, __u64 submit_flags);
     75extern ssize_t cfa_recvmsg(int sockfd, struct msghdr *msg, int flags, __u64 submit_flags);
     76extern ssize_t cfa_send(int sockfd, const void *buf, size_t len, int flags, __u64 submit_flags);
     77extern ssize_t cfa_recv(int sockfd, void *buf, size_t len, int flags, __u64 submit_flags);
     78extern int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, __u64 submit_flags);
     79extern int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen, __u64 submit_flags);
     80extern int cfa_fallocate(int fd, int mode, off_t offset, off_t len, __u64 submit_flags);
     81extern int cfa_posix_fadvise(int fd, off_t offset, off_t len, int advice, __u64 submit_flags);
     82extern int cfa_madvise(void *addr, size_t length, int advice, __u64 submit_flags);
     83extern int cfa_openat(int dirfd, const char *pathname, int flags, mode_t mode, __u64 submit_flags);
    7984#if defined(CFA_HAVE_OPENAT2)
    80         extern int cfa_openat2(int dirfd, const char *pathname, struct open_how * how, size_t size, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
     85        extern int cfa_openat2(int dirfd, const char *pathname, struct open_how * how, size_t size, __u64 submit_flags);
    8186#endif
    82 extern int cfa_close(int fd, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
     87extern int cfa_close(int fd, __u64 submit_flags);
    8388#if defined(CFA_HAVE_STATX)
    84         extern int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
     89        extern int cfa_statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, __u64 submit_flags);
    8590#endif
    86 extern ssize_t cfa_read(int fd, void * buf, size_t count, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    87 extern ssize_t cfa_write(int fd, void * buf, size_t count, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    88 extern ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    89 extern ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
     91extern ssize_t cfa_read(int fd, void * buf, size_t count, __u64 submit_flags);
     92extern ssize_t cfa_write(int fd, void * buf, size_t count, __u64 submit_flags);
     93extern ssize_t cfa_splice(int fd_in, __off64_t *off_in, int fd_out, __off64_t *off_out, size_t len, unsigned int flags, __u64 submit_flags);
     94extern ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags, __u64 submit_flags);
    9095
    9196//----------
    9297// asynchronous calls
    9398#if defined(CFA_HAVE_PREADV2)
    94         extern void async_preadv2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
     99        extern void async_preadv2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, __u64 submit_flags);
    95100#endif
    96101#if defined(CFA_HAVE_PWRITEV2)
    97         extern void async_pwritev2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
     102        extern void async_pwritev2(io_future_t & future, int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags, __u64 submit_flags);
    98103#endif
    99 extern void async_fsync(io_future_t & future, int fd, int submit_flags, io_cancellation * cancellation, io_context * context);
    100 extern void async_epoll_ctl(io_future_t & future, int epfd, int op, int fd, struct epoll_event *event, int submit_flags, io_cancellation * cancellation, io_context * context);
    101 extern void async_sync_file_range(io_future_t & future, int fd, off64_t offset, off64_t nbytes, unsigned int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
    102 extern void async_sendmsg(io_future_t & future, int sockfd, const struct msghdr *msg, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
    103 extern void async_recvmsg(io_future_t & future, int sockfd, struct msghdr *msg, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
    104 extern void async_send(io_future_t & future, int sockfd, const void *buf, size_t len, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
    105 extern void async_recv(io_future_t & future, int sockfd, void *buf, size_t len, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
    106 extern void async_accept4(io_future_t & future, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
    107 extern void async_connect(io_future_t & future, int sockfd, const struct sockaddr *addr, socklen_t addrlen, int submit_flags, io_cancellation * cancellation, io_context * context);
    108 extern void async_fallocate(io_future_t & future, int fd, int mode, off_t offset, off_t len, int submit_flags, io_cancellation * cancellation, io_context * context);
    109 extern void async_posix_fadvise(io_future_t & future, int fd, off_t offset, off_t len, int advice, int submit_flags, io_cancellation * cancellation, io_context * context);
    110 extern void async_madvise(io_future_t & future, void *addr, size_t length, int advice, int submit_flags, io_cancellation * cancellation, io_context * context);
    111 extern void async_openat(io_future_t & future, int dirfd, const char *pathname, int flags, mode_t mode, int submit_flags, io_cancellation * cancellation, io_context * context);
     104extern void async_fsync(io_future_t & future, int fd, __u64 submit_flags);
     105extern void async_epoll_ctl(io_future_t & future, int epfd, int op, int fd, struct epoll_event *event, __u64 submit_flags);
     106extern void async_sync_file_range(io_future_t & future, int fd, off64_t offset, off64_t nbytes, unsigned int flags, __u64 submit_flags);
     107extern void async_sendmsg(io_future_t & future, int sockfd, const struct msghdr *msg, int flags, __u64 submit_flags);
     108extern void async_recvmsg(io_future_t & future, int sockfd, struct msghdr *msg, int flags, __u64 submit_flags);
     109extern void async_send(io_future_t & future, int sockfd, const void *buf, size_t len, int flags, __u64 submit_flags);
     110extern void async_recv(io_future_t & future, int sockfd, void *buf, size_t len, int flags, __u64 submit_flags);
     111extern void async_accept4(io_future_t & future, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags, __u64 submit_flags);
     112extern void async_connect(io_future_t & future, int sockfd, const struct sockaddr *addr, socklen_t addrlen, __u64 submit_flags);
     113extern void async_fallocate(io_future_t & future, int fd, int mode, off_t offset, off_t len, __u64 submit_flags);
     114extern void async_posix_fadvise(io_future_t & future, int fd, off_t offset, off_t len, int advice, __u64 submit_flags);
     115extern void async_madvise(io_future_t & future, void *addr, size_t length, int advice, __u64 submit_flags);
     116extern void async_openat(io_future_t & future, int dirfd, const char *pathname, int flags, mode_t mode, __u64 submit_flags);
    112117#if defined(CFA_HAVE_OPENAT2)
    113         extern void async_openat2(io_future_t & future, int dirfd, const char *pathname, struct open_how * how, size_t size, int submit_flags, io_cancellation * cancellation, io_context * context);
     118        extern void async_openat2(io_future_t & future, int dirfd, const char *pathname, struct open_how * how, size_t size, __u64 submit_flags);
    114119#endif
    115 extern void async_close(io_future_t & future, int fd, int submit_flags, io_cancellation * cancellation, io_context * context);
     120extern void async_close(io_future_t & future, int fd, __u64 submit_flags);
    116121#if defined(CFA_HAVE_STATX)
    117         extern void async_statx(io_future_t & future, int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, int submit_flags, io_cancellation * cancellation, io_context * context);
     122        extern void async_statx(io_future_t & future, int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf, __u64 submit_flags);
    118123#endif
    119 void async_read(io_future_t & future, int fd, void * buf, size_t count, int submit_flags, io_cancellation * cancellation, io_context * context);
    120 extern void async_write(io_future_t & future, int fd, void * buf, size_t count, int submit_flags, io_cancellation * cancellation, io_context * context);
    121 extern void async_splice(io_future_t & future, int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
    122 extern void async_tee(io_future_t & future, int fd_in, int fd_out, size_t len, unsigned int flags, int submit_flags, io_cancellation * cancellation, io_context * context);
     124void async_read(io_future_t & future, int fd, void * buf, size_t count, __u64 submit_flags);
     125extern void async_write(io_future_t & future, int fd, void * buf, size_t count, __u64 submit_flags);
     126extern void async_splice(io_future_t & future, int fd_in, __off64_t *off_in, int fd_out, __off64_t *off_out, size_t len, unsigned int flags, __u64 submit_flags);
     127extern void async_tee(io_future_t & future, int fd_in, int fd_out, size_t len, unsigned int flags, __u64 submit_flags);
    123128
    124129
     
    126131// Check if a function is blocks a only the user thread
    127132bool has_user_level_blocking( fptr_t func );
    128 
    129 //-----------------------------------------------------------------------------
    130 void register_fixed_files( io_context & ctx , int * files, unsigned count );
    131 void register_fixed_files( cluster    & cltr, int * files, unsigned count );
  • libcfa/src/concurrency/kernel.cfa

    r342af53 r8e4aa05  
    2222#include <signal.h>
    2323#include <unistd.h>
     24extern "C" {
     25        #include <sys/eventfd.h>
     26}
    2427
    2528//CFA Includes
     
    114117static [unsigned idle, unsigned total, * processor] query( & __cluster_idles idles );
    115118
     119extern void __cfa_io_start( processor * );
     120extern void __cfa_io_drain( processor * );
     121extern void __cfa_io_flush( processor * );
     122extern void __cfa_io_stop ( processor * );
     123static inline void __maybe_io_drain( processor * );
     124
     125extern void __disable_interrupts_hard();
     126extern void __enable_interrupts_hard();
    116127
    117128//=============================================================================================
     
    129140        verify(this);
    130141
     142        __cfa_io_start( this );
     143
    131144        __cfadbg_print_safe(runtime_core, "Kernel : core %p starting\n", this);
    132145        #if !defined(__CFA_NO_STATISTICS__)
     
    140153                preemption_scope scope = { this };
    141154
     155                #if !defined(__CFA_NO_STATISTICS__)
     156                        unsigned long long last_tally = rdtscl();
     157                #endif
     158
     159
    142160                __cfadbg_print_safe(runtime_core, "Kernel : core %p started\n", this);
    143161
     
    145163                MAIN_LOOP:
    146164                for() {
     165                        // Check if there is pending io
     166                        __maybe_io_drain( this );
     167
    147168                        // Try to get the next thread
    148169                        readyThread = __next_thread( this->cltr );
    149170
    150171                        if( !readyThread ) {
     172                                __cfa_io_flush( this );
    151173                                readyThread = __next_thread_slow( this->cltr );
    152174                        }
     
    184206                                #endif
    185207
    186                                 wait( this->idle );
     208                                __cfadbg_print_safe(runtime_core, "Kernel : core %p waiting on eventfd %d\n", this, this->idle);
     209
     210                                __disable_interrupts_hard();
     211                                eventfd_t val;
     212                                eventfd_read( this->idle, &val );
     213                                __enable_interrupts_hard();
    187214
    188215                                #if !defined(__CFA_NO_STATISTICS__)
     
    201228                        /* paranoid */ verify( readyThread );
    202229
     230                        // Reset io dirty bit
     231                        this->io.dirty = false;
     232
    203233                        // We found a thread run it
    204234                        __run_thread(this, readyThread);
     
    206236                        // Are we done?
    207237                        if( __atomic_load_n(&this->do_terminate, __ATOMIC_SEQ_CST) ) break MAIN_LOOP;
     238
     239                        #if !defined(__CFA_NO_STATISTICS__)
     240                                unsigned long long curr = rdtscl();
     241                                if(curr > (last_tally + 500000000)) {
     242                                        __tally_stats(this->cltr->stats, __cfaabi_tls.this_stats);
     243                                        last_tally = curr;
     244                                }
     245                        #endif
     246
     247                        if(this->io.pending && !this->io.dirty) {
     248                                __cfa_io_flush( this );
     249                        }
    208250                }
    209251
     
    211253        }
    212254
    213         V( this->terminated );
     255        __cfa_io_stop( this );
     256
     257        post( this->terminated );
     258
    214259
    215260        if(this == mainProcessor) {
     
    234279        /* paranoid */ verifyf( thrd_dst->link.next == 0p, "Expected null got %p", thrd_dst->link.next );
    235280        __builtin_prefetch( thrd_dst->context.SP );
     281
     282        __cfadbg_print_safe(runtime_core, "Kernel : core %p running thread %p (%s)\n", this, thrd_dst, thrd_dst->self_cor.name);
    236283
    237284        $coroutine * proc_cor = get_coroutine(this->runner);
     
    316363        // Just before returning to the processor, set the processor coroutine to active
    317364        proc_cor->state = Active;
     365
     366        __cfadbg_print_safe(runtime_core, "Kernel : core %p finished running thread %p\n", this, thrd_dst);
    318367
    319368        /* paranoid */ verify( ! __preemption_enabled() );
     
    550599
    551600        // We found a processor, wake it up
    552         post( p->idle );
     601        eventfd_t val;
     602        val = 1;
     603        eventfd_write( p->idle, val );
    553604
    554605        #if !defined(__CFA_NO_STATISTICS__)
     
    568619        disable_interrupts();
    569620                /* paranoid */ verify( ! __preemption_enabled() );
    570                 post( this->idle );
     621                eventfd_t val;
     622                val = 1;
     623                eventfd_write( this->idle, val );
    571624        enable_interrupts( __cfaabi_dbg_ctx );
    572625}
     
    611664// Unexpected Terminating logic
    612665//=============================================================================================
    613 static __spinlock_t kernel_abort_lock;
    614 static bool kernel_abort_called = false;
    615 
    616 void * kernel_abort(void) __attribute__ ((__nothrow__)) {
    617         // abort cannot be recursively entered by the same or different processors because all signal handlers return when
    618         // the globalAbort flag is true.
    619         lock( kernel_abort_lock __cfaabi_dbg_ctx2 );
    620 
    621         // disable interrupts, it no longer makes sense to try to interrupt this processor
    622         disable_interrupts();
    623 
    624         // first task to abort ?
    625         if ( kernel_abort_called ) {                    // not first task to abort ?
    626                 unlock( kernel_abort_lock );
    627 
    628                 sigset_t mask;
    629                 sigemptyset( &mask );
    630                 sigaddset( &mask, SIGALRM );            // block SIGALRM signals
    631                 sigaddset( &mask, SIGUSR1 );            // block SIGALRM signals
    632                 sigsuspend( &mask );                            // block the processor to prevent further damage during abort
    633                 _exit( EXIT_FAILURE );                          // if processor unblocks before it is killed, terminate it
    634         }
    635         else {
    636                 kernel_abort_called = true;
    637                 unlock( kernel_abort_lock );
    638         }
    639 
    640         return __cfaabi_tls.this_thread;
    641 }
    642 
    643 void kernel_abort_msg( void * kernel_data, char * abort_text, int abort_text_size ) {
    644         $thread * thrd = ( $thread * ) kernel_data;
     666void __kernel_abort_msg( char * abort_text, int abort_text_size ) {
     667        $thread * thrd = __cfaabi_tls.this_thread;
    645668
    646669        if(thrd) {
     
    662685}
    663686
    664 int kernel_abort_lastframe( void ) __attribute__ ((__nothrow__)) {
    665         return get_coroutine(kernelTLS().this_thread) == get_coroutine(mainThread) ? 4 : 2;
     687int __kernel_abort_lastframe( void ) __attribute__ ((__nothrow__)) {
     688        return get_coroutine(__cfaabi_tls.this_thread) == get_coroutine(mainThread) ? 4 : 2;
    666689}
    667690
     
    681704// Kernel Utilities
    682705//=============================================================================================
    683 //-----------------------------------------------------------------------------
    684 // Locks
    685 void  ?{}( semaphore & this, int count = 1 ) {
    686         (this.lock){};
    687         this.count = count;
    688         (this.waiting){};
    689 }
    690 void ^?{}(semaphore & this) {}
    691 
    692 bool P(semaphore & this) with( this ){
    693         lock( lock __cfaabi_dbg_ctx2 );
    694         count -= 1;
    695         if ( count < 0 ) {
    696                 // queue current task
    697                 append( waiting, active_thread() );
    698 
    699                 // atomically release spin lock and block
    700                 unlock( lock );
    701                 park();
    702                 return true;
    703         }
    704         else {
    705             unlock( lock );
    706             return false;
    707         }
    708 }
    709 
    710 bool V(semaphore & this) with( this ) {
    711         $thread * thrd = 0p;
    712         lock( lock __cfaabi_dbg_ctx2 );
    713         count += 1;
    714         if ( count <= 0 ) {
    715                 // remove task at head of waiting list
    716                 thrd = pop_head( waiting );
    717         }
    718 
    719         unlock( lock );
    720 
    721         // make new owner
    722         unpark( thrd );
    723 
    724         return thrd != 0p;
    725 }
    726 
    727 bool V(semaphore & this, unsigned diff) with( this ) {
    728         $thread * thrd = 0p;
    729         lock( lock __cfaabi_dbg_ctx2 );
    730         int release = max(-count, (int)diff);
    731         count += diff;
    732         for(release) {
    733                 unpark( pop_head( waiting ) );
    734         }
    735 
    736         unlock( lock );
    737 
    738         return thrd != 0p;
     706#if defined(CFA_HAVE_LINUX_IO_URING_H)
     707#include "io/types.hfa"
     708#endif
     709
     710static inline void __maybe_io_drain( processor * proc ) {
     711        #if defined(CFA_HAVE_LINUX_IO_URING_H)
     712                __cfadbg_print_safe(runtime_core, "Kernel : core %p checking io for ring %d\n", proc, proc->io.ctx->fd);
     713
     714                // Check if we should drain the queue
     715                $io_context * ctx = proc->io.ctx;
     716                unsigned head = *ctx->cq.head;
     717                unsigned tail = *ctx->cq.tail;
     718                if(head != tail) __cfa_io_drain( proc );
     719        #endif
    739720}
    740721
  • libcfa/src/concurrency/kernel.hfa

    r342af53 r8e4aa05  
    55// file "LICENCE" distributed with Cforall.
    66//
    7 // kernel --
     7// kernel -- Header containing the core of the kernel API
    88//
    99// Author           : Thierry Delisle
     
    2424extern "C" {
    2525        #include <bits/pthreadtypes.h>
     26        #include <pthread.h>
    2627        #include <linux/types.h>
    2728}
    2829
    29 //-----------------------------------------------------------------------------
    30 // Locks
    31 struct semaphore {
    32         __spinlock_t lock;
    33         int count;
    34         __queue_t($thread) waiting;
    35 };
    36 
    37 void  ?{}(semaphore & this, int count = 1);
    38 void ^?{}(semaphore & this);
    39 bool   P (semaphore & this);
    40 bool   V (semaphore & this);
    41 bool   V (semaphore & this, unsigned count);
    42 
     30#ifdef __CFA_WITH_VERIFY__
     31        extern bool __cfaabi_dbg_in_kernel();
     32#endif
     33
     34//-----------------------------------------------------------------------------
     35// I/O
     36struct cluster;
     37struct $io_context;
     38struct $io_arbiter;
     39
     40struct io_context_params {
     41        int num_entries;
     42};
     43
     44void  ?{}(io_context_params & this);
    4345
    4446//-----------------------------------------------------------------------------
     
    8082        pthread_t kernel_thread;
    8183
     84        struct {
     85                $io_context * ctx;
     86                bool pending;
     87                bool dirty;
     88        } io;
     89
    8290        // Preemption data
    8391        // Node which is added in the discrete event simulaiton
     
    8896
    8997        // Idle lock (kernel semaphore)
    90         __bin_sem_t idle;
     98        int idle;
    9199
    92100        // Termination synchronisation (user semaphore)
    93         semaphore terminated;
     101        oneshot terminated;
    94102
    95103        // pthread Stack
     
    118126
    119127DLISTED_MGD_IMPL_OUT(processor)
    120 
    121 //-----------------------------------------------------------------------------
    122 // I/O
    123 struct __io_data;
    124 
    125 // IO poller user-thread
    126 // Not using the "thread" keyword because we want to control
    127 // more carefully when to start/stop it
    128 struct $io_ctx_thread {
    129         struct __io_data * ring;
    130         single_sem sem;
    131         volatile bool done;
    132         $thread self;
    133 };
    134 
    135 
    136 struct io_context {
    137         $io_ctx_thread thrd;
    138 };
    139 
    140 struct io_context_params {
    141         int num_entries;
    142         int num_ready;
    143         int submit_aff;
    144         bool eager_submits:1;
    145         bool poller_submits:1;
    146         bool poll_submit:1;
    147         bool poll_complete:1;
    148 };
    149 
    150 void  ?{}(io_context_params & this);
    151 
    152 void  ?{}(io_context & this, struct cluster & cl);
    153 void  ?{}(io_context & this, struct cluster & cl, const io_context_params & params);
    154 void ^?{}(io_context & this);
    155 
    156 struct io_cancellation {
    157         __u64 target;
    158 };
    159 
    160 static inline void  ?{}(io_cancellation & this) { this.target = -1u; }
    161 static inline void ^?{}(io_cancellation &) {}
    162 bool cancel(io_cancellation & this);
    163128
    164129//-----------------------------------------------------------------------------
     
    246211
    247212        struct {
    248                 io_context * ctxs;
    249                 unsigned cnt;
     213                $io_arbiter * arbiter;
     214                io_context_params params;
    250215        } io;
    251216
  • libcfa/src/concurrency/kernel/fwd.hfa

    r342af53 r8e4aa05  
    55// file "LICENCE" distributed with Cforall.
    66//
    7 // kernel/fwd.hfa --
     7// kernel/fwd.hfa -- PUBLIC
     8// Fundamental code needed to implement threading M.E.S. algorithms.
    89//
    910// Author           : Thierry Delisle
     
    134135                extern uint64_t thread_rand();
    135136
     137                // Semaphore which only supports a single thread
     138                struct single_sem {
     139                        struct $thread * volatile ptr;
     140                };
     141
     142                static inline {
     143                        void  ?{}(single_sem & this) {
     144                                this.ptr = 0p;
     145                        }
     146
     147                        void ^?{}(single_sem &) {}
     148
     149                        bool wait(single_sem & this) {
     150                                for() {
     151                                        struct $thread * expected = this.ptr;
     152                                        if(expected == 1p) {
     153                                                if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     154                                                        return false;
     155                                                }
     156                                        }
     157                                        else {
     158                                                /* paranoid */ verify( expected == 0p );
     159                                                if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     160                                                        park();
     161                                                        return true;
     162                                                }
     163                                        }
     164
     165                                }
     166                        }
     167
     168                        bool post(single_sem & this) {
     169                                for() {
     170                                        struct $thread * expected = this.ptr;
     171                                        if(expected == 1p) return false;
     172                                        if(expected == 0p) {
     173                                                if(__atomic_compare_exchange_n(&this.ptr, &expected, 1p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     174                                                        return false;
     175                                                }
     176                                        }
     177                                        else {
     178                                                if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     179                                                        unpark( expected );
     180                                                        return true;
     181                                                }
     182                                        }
     183                                }
     184                        }
     185                }
     186
     187                // Synchronozation primitive which only supports a single thread and one post
     188                // Similar to a binary semaphore with a 'one shot' semantic
     189                // is expected to be discarded after each party call their side
     190                struct oneshot {
     191                        // Internal state :
     192                        //     0p     : is initial state (wait will block)
     193                        //     1p     : fulfilled (wait won't block)
     194                        // any thread : a thread is currently waiting
     195                        struct $thread * volatile ptr;
     196                };
     197
     198                static inline {
     199                        void  ?{}(oneshot & this) {
     200                                this.ptr = 0p;
     201                        }
     202
     203                        void ^?{}(oneshot &) {}
     204
     205                        // Wait for the post, return immidiately if it already happened.
     206                        // return true if the thread was parked
     207                        bool wait(oneshot & this) {
     208                                for() {
     209                                        struct $thread * expected = this.ptr;
     210                                        if(expected == 1p) return false;
     211                                        /* paranoid */ verify( expected == 0p );
     212                                        if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     213                                                park();
     214                                                /* paranoid */ verify( this.ptr == 1p );
     215                                                return true;
     216                                        }
     217                                }
     218                        }
     219
     220                        // Mark as fulfilled, wake thread if needed
     221                        // return true if a thread was unparked
     222                        bool post(oneshot & this) {
     223                                struct $thread * got = __atomic_exchange_n( &this.ptr, 1p, __ATOMIC_SEQ_CST);
     224                                if( got == 0p ) return false;
     225                                unpark( got );
     226                                return true;
     227                        }
     228                }
     229
     230                // base types for future to build upon
     231                // It is based on the 'oneshot' type to allow multiple futures
     232                // to block on the same instance, permitting users to block a single
     233                // thread on "any of" [a given set of] futures.
     234                // does not support multiple threads waiting on the same future
     235                struct future_t {
     236                        // Internal state :
     237                        //     0p      : is initial state (wait will block)
     238                        //     1p      : fulfilled (wait won't block)
     239                        //     2p      : in progress ()
     240                        //     3p      : abandoned, server should delete
     241                        // any oneshot : a context has been setup to wait, a thread could wait on it
     242                        struct oneshot * volatile ptr;
     243                };
     244
     245                static inline {
     246                        void  ?{}(future_t & this) {
     247                                this.ptr = 0p;
     248                        }
     249
     250                        void ^?{}(future_t &) {}
     251
     252                        void reset(future_t & this) {
     253                                // needs to be in 0p or 1p
     254                                __atomic_exchange_n( &this.ptr, 0p, __ATOMIC_SEQ_CST);
     255                        }
     256
     257                        // check if the future is available
     258                        bool available( future_t & this ) {
     259                                return this.ptr == 1p;
     260                        }
     261
     262                        // Prepare the future to be waited on
     263                        // intented to be use by wait, wait_any, waitfor, etc. rather than used directly
     264                        bool setup( future_t & this, oneshot & wait_ctx ) {
     265                                /* paranoid */ verify( wait_ctx.ptr == 0p );
     266                                // The future needs to set the wait context
     267                                for() {
     268                                        struct oneshot * expected = this.ptr;
     269                                        // Is the future already fulfilled?
     270                                        if(expected == 1p) return false; // Yes, just return false (didn't block)
     271
     272                                        // The future is not fulfilled, try to setup the wait context
     273                                        /* paranoid */ verify( expected == 0p );
     274                                        if(__atomic_compare_exchange_n(&this.ptr, &expected, &wait_ctx, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     275                                                return true;
     276                                        }
     277                                }
     278                        }
     279
     280                        // Stop waiting on a future
     281                        // When multiple futures are waited for together in "any of" pattern
     282                        // futures that weren't fulfilled before the thread woke up
     283                        // should retract the wait ctx
     284                        // intented to be use by wait, wait_any, waitfor, etc. rather than used directly
     285                        void retract( future_t & this, oneshot & wait_ctx ) {
     286                                // Remove the wait context
     287                                struct oneshot * got = __atomic_exchange_n( &this.ptr, 0p, __ATOMIC_SEQ_CST);
     288
     289                                // got == 0p: future was never actually setup, just return
     290                                if( got == 0p ) return;
     291
     292                                // got == wait_ctx: since fulfil does an atomic_swap,
     293                                // if we got back the original then no one else saw context
     294                                // It is safe to delete (which could happen after the return)
     295                                if( got == &wait_ctx ) return;
     296
     297                                // got == 1p: the future is ready and the context was fully consumed
     298                                // the server won't use the pointer again
     299                                // It is safe to delete (which could happen after the return)
     300                                if( got == 1p ) return;
     301
     302                                // got == 2p: the future is ready but the context hasn't fully been consumed
     303                                // spin until it is safe to move on
     304                                if( got == 2p ) {
     305                                        while( this.ptr != 1p ) Pause();
     306                                        return;
     307                                }
     308
     309                                // got == any thing else, something wen't wrong here, abort
     310                                abort("Future in unexpected state");
     311                        }
     312
     313                        // Mark the future as abandoned, meaning it will be deleted by the server
     314                        bool abandon( future_t & this ) {
     315                                /* paranoid */ verify( this.ptr != 3p );
     316
     317                                // Mark the future as abandonned
     318                                struct oneshot * got = __atomic_exchange_n( &this.ptr, 3p, __ATOMIC_SEQ_CST);
     319
     320                                // If the future isn't already fulfilled, let the server delete it
     321                                if( got == 0p ) return false;
     322
     323                                // got == 2p: the future is ready but the context hasn't fully been consumed
     324                                // spin until it is safe to move on
     325                                if( got == 2p ) {
     326                                        while( this.ptr != 1p ) Pause();
     327                                        got = 1p;
     328                                }
     329
     330                                // The future is completed delete it now
     331                                /* paranoid */ verify( this.ptr != 1p );
     332                                free( &this );
     333                                return true;
     334                        }
     335
     336                        // from the server side, mark the future as fulfilled
     337                        // delete it if needed
     338                        bool fulfil( future_t & this ) {
     339                                for() {
     340                                        struct oneshot * expected = this.ptr;
     341                                        // was this abandoned?
     342                                        #if defined(__GNUC__) && __GNUC__ >= 7
     343                                                #pragma GCC diagnostic push
     344                                                #pragma GCC diagnostic ignored "-Wfree-nonheap-object"
     345                                        #endif
     346                                                if( expected == 3p ) { free( &this ); return false; }
     347                                        #if defined(__GNUC__) && __GNUC__ >= 7
     348                                                #pragma GCC diagnostic pop
     349                                        #endif
     350
     351                                        /* paranoid */ verify( expected != 1p ); // Future is already fulfilled, should not happen
     352                                        /* paranoid */ verify( expected != 2p ); // Future is bein fulfilled by someone else, this is even less supported then the previous case.
     353
     354                                        // If there is a wait context, we need to consume it and mark it as consumed after
     355                                        // If there is no context then we can skip the in progress phase
     356                                        struct oneshot * want = expected == 0p ? 1p : 2p;
     357                                        if(__atomic_compare_exchange_n(&this.ptr, &expected, want, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
     358                                                if( expected == 0p ) { /* paranoid */ verify( this.ptr == 1p); return false; }
     359                                                bool ret = post( *expected );
     360                                                __atomic_store_n( &this.ptr, 1p, __ATOMIC_SEQ_CST);
     361                                                return ret;
     362                                        }
     363                                }
     364
     365                        }
     366
     367                        // Wait for the future to be fulfilled
     368                        bool wait( future_t & this ) {
     369                                oneshot temp;
     370                                if( !setup(this, temp) ) return false;
     371
     372                                // Wait context is setup, just wait on it
     373                                bool ret = wait( temp );
     374
     375                                // Wait for the future to tru
     376                                while( this.ptr == 2p ) Pause();
     377                                // Make sure the state makes sense
     378                                // Should be fulfilled, could be in progress but it's out of date if so
     379                                // since if that is the case, the oneshot was fulfilled (unparking this thread)
     380                                // and the oneshot should not be needed any more
     381                                __attribute__((unused)) struct oneshot * was = this.ptr;
     382                                /* paranoid */ verifyf( was == 1p, "Expected this.ptr to be 1p, was %p\n", was );
     383
     384                                // Mark the future as fulfilled, to be consistent
     385                                // with potential calls to avail
     386                                // this.ptr = 1p;
     387                                return ret;
     388                        }
     389                }
     390
    136391                //-----------------------------------------------------------------------
    137392                // Statics call at the end of each thread to register statistics
  • libcfa/src/concurrency/kernel/startup.cfa

    r342af53 r8e4aa05  
    2222extern "C" {
    2323      #include <limits.h>       // PTHREAD_STACK_MIN
     24        #include <sys/eventfd.h>  // eventfd
    2425      #include <sys/mman.h>     // mprotect
    2526      #include <sys/resource.h> // getrlimit
     
    8990extern void __kernel_alarm_startup(void);
    9091extern void __kernel_alarm_shutdown(void);
    91 extern void __kernel_io_startup (void);
    92 extern void __kernel_io_shutdown(void);
    9392
    9493//-----------------------------------------------------------------------------
     
    102101KERNEL_STORAGE($thread,              mainThread);
    103102KERNEL_STORAGE(__stack_t,            mainThreadCtx);
    104 KERNEL_STORAGE(io_context,           mainPollerThread);
    105103KERNEL_STORAGE(__scheduler_RWLock_t, __scheduler_lock);
    106104#if !defined(__CFA_NO_STATISTICS__)
     
    198196
    199197        void ?{}(processor & this) with( this ) {
    200                 ( this.idle ){};
    201                 ( this.terminated ){ 0 };
     198                ( this.terminated ){};
    202199                ( this.runner ){};
    203200                init( this, "Main Processor", *mainCluster );
     
    226223        __kernel_alarm_startup();
    227224
    228         // Start IO
    229         __kernel_io_startup();
    230 
    231225        // Add the main thread to the ready queue
    232226        // once resume is called on mainProcessor->runner the mainThread needs to be scheduled like any normal thread
     
    241235        // THE SYSTEM IS NOW COMPLETELY RUNNING
    242236
    243 
    244         // SKULLDUGGERY: The constructor for the mainCluster will call alloc with a dimension of 0
    245         // malloc *can* return a non-null value, we should free it if that is the case
    246         free( mainCluster->io.ctxs );
    247 
    248         // Now that the system is up, finish creating systems that need threading
    249         mainCluster->io.ctxs = (io_context *)&storage_mainPollerThread;
    250         mainCluster->io.cnt  = 1;
    251         (*mainCluster->io.ctxs){ *mainCluster };
    252 
    253237        __cfadbg_print_safe(runtime_core, "Kernel : Started\n--------------------------------------------------\n\n");
    254238
     
    260244
    261245static void __kernel_shutdown(void) {
    262         //Before we start shutting things down, wait for systems that need threading to shutdown
    263         ^(*mainCluster->io.ctxs){};
    264         mainCluster->io.cnt  = 0;
    265         mainCluster->io.ctxs = 0p;
    266 
    267246        /* paranoid */ verify( __preemption_enabled() );
    268247        disable_interrupts();
     
    282261        // Disable preemption
    283262        __kernel_alarm_shutdown();
    284 
    285         // Stop IO
    286         __kernel_io_shutdown();
    287263
    288264        // Destroy the main processor and its context in reverse order of construction
     
    484460        pending_preemption = false;
    485461
     462        this.io.ctx = 0p;
     463        this.io.pending = false;
     464        this.io.dirty   = false;
     465
     466        this.idle = eventfd(0, 0);
     467        if (idle < 0) {
     468                abort("KERNEL ERROR: PROCESSOR EVENTFD - %s\n", strerror(errno));
     469        }
     470
    486471        #if !defined(__CFA_NO_STATISTICS__)
    487472                print_stats = 0;
     
    524509        // Finally we don't need the read_lock any more
    525510        unregister((__processor_id_t*)&this);
     511
     512        close(this.idle);
    526513}
    527514
    528515void ?{}(processor & this, const char name[], cluster & _cltr) {
    529         ( this.idle ){};
    530         ( this.terminated ){ 0 };
     516        ( this.terminated ){};
    531517        ( this.runner ){};
    532518
     
    549535                __wake_proc( &this );
    550536
    551                 P( terminated );
     537                wait( terminated );
    552538                /* paranoid */ verify( active_processor() != &this);
    553539        }
     
    582568        threads{ __get };
    583569
     570        io.arbiter = create();
     571        io.params = io_params;
     572
    584573        doregister(this);
    585574
     
    594583        ready_mutate_unlock( last_size );
    595584        enable_interrupts_noPoll(); // Don't poll, could be in main cluster
    596 
    597 
    598         this.io.cnt  = num_io;
    599         this.io.ctxs = aalloc(num_io);
    600         for(i; this.io.cnt) {
    601                 (this.io.ctxs[i]){ this, io_params };
    602         }
    603585}
    604586
    605587void ^?{}(cluster & this) {
    606         for(i; this.io.cnt) {
    607                 ^(this.io.ctxs[i]){ true };
    608         }
    609         free(this.io.ctxs);
     588        destroy(this.io.arbiter);
    610589
    611590        // Lock the RWlock so no-one pushes/pops while we are changing the queue
     
    736715}
    737716
    738 
    739717#if defined(__CFA_WITH_VERIFY__)
    740718static bool verify_fwd_bck_rng(void) {
  • libcfa/src/concurrency/kernel_private.hfa

    r342af53 r8e4aa05  
    7777//-----------------------------------------------------------------------------
    7878// I/O
    79 void ^?{}(io_context & this, bool );
     79$io_arbiter * create(void);
     80void destroy($io_arbiter *);
    8081
    8182//=======================================================================
  • libcfa/src/concurrency/locks.cfa

    r342af53 r8e4aa05  
     1//
     2// Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
     3//
     4// The contents of this file are covered under the licence agreement in the
     5// file "LICENCE" distributed with Cforall.
     6//
     7// locks.hfa -- LIBCFATHREAD
     8// Runtime locks that used with the runtime thread system.
     9//
     10// Author           : Colby Alexander Parsons
     11// Created On       : Thu Jan 21 19:46:50 2021
     12// Last Modified By :
     13// Last Modified On :
     14// Update Count     :
     15//
     16
     17#define __cforall_thread__
     18
    119#include "locks.hfa"
    220#include "kernel_private.hfa"
     
    725//-----------------------------------------------------------------------------
    826// info_thread
    9 forall(dtype L | is_blocking_lock(L)) {
     27forall(L & | is_blocking_lock(L)) {
    1028        struct info_thread {
    1129                // used to put info_thread on a dl queue (aka sequence)
     
    5674
    5775void ^?{}( blocking_lock & this ) {}
    58 void  ?{}( single_acquisition_lock & this ) {((blocking_lock &)this){ false, false };}
    59 void ^?{}( single_acquisition_lock & this ) {}
    60 void  ?{}( owner_lock & this ) {((blocking_lock &)this){ true, true };}
    61 void ^?{}( owner_lock & this ) {}
    62 void  ?{}( multiple_acquisition_lock & this ) {((blocking_lock &)this){ true, false };}
    63 void ^?{}( multiple_acquisition_lock & this ) {}
     76
    6477
    6578void lock( blocking_lock & this ) with( this ) {
     
    170183
    171184//-----------------------------------------------------------------------------
    172 // Overloaded routines for traits
    173 // These routines are temporary until an inheritance bug is fixed
    174 void   lock      ( single_acquisition_lock & this ) { lock   ( (blocking_lock &)this ); }
    175 void   unlock    ( single_acquisition_lock & this ) { unlock ( (blocking_lock &)this ); }
    176 void   on_wait   ( single_acquisition_lock & this ) { on_wait( (blocking_lock &)this ); }
    177 void   on_notify ( single_acquisition_lock & this, struct $thread * t ) { on_notify( (blocking_lock &)this, t ); }
    178 void   set_recursion_count( single_acquisition_lock & this, size_t recursion ) { set_recursion_count( (blocking_lock &)this, recursion ); }
    179 size_t get_recursion_count( single_acquisition_lock & this ) { return get_recursion_count( (blocking_lock &)this ); }
    180 
    181 void   lock     ( owner_lock & this ) { lock   ( (blocking_lock &)this ); }
    182 void   unlock   ( owner_lock & this ) { unlock ( (blocking_lock &)this ); }
    183 void   on_wait  ( owner_lock & this ) { on_wait( (blocking_lock &)this ); }
    184 void   on_notify( owner_lock & this, struct $thread * t ) { on_notify( (blocking_lock &)this, t ); }
    185 void   set_recursion_count( owner_lock & this, size_t recursion ) { set_recursion_count( (blocking_lock &)this, recursion ); }
    186 size_t get_recursion_count( owner_lock & this ) { return get_recursion_count( (blocking_lock &)this ); }
    187 
    188 void   lock     ( multiple_acquisition_lock & this ) { lock   ( (blocking_lock &)this ); }
    189 void   unlock   ( multiple_acquisition_lock & this ) { unlock ( (blocking_lock &)this ); }
    190 void   on_wait  ( multiple_acquisition_lock & this ) { on_wait( (blocking_lock &)this ); }
    191 void   on_notify( multiple_acquisition_lock & this, struct $thread * t ){ on_notify( (blocking_lock &)this, t ); }
    192 void   set_recursion_count( multiple_acquisition_lock & this, size_t recursion ){ set_recursion_count( (blocking_lock &)this, recursion ); }
    193 size_t get_recursion_count( multiple_acquisition_lock & this ){ return get_recursion_count( (blocking_lock &)this ); }
    194 
    195 //-----------------------------------------------------------------------------
    196185// alarm node wrapper
    197 forall(dtype L | is_blocking_lock(L)) {
     186forall(L & | is_blocking_lock(L)) {
    198187        struct alarm_node_wrap {
    199188                alarm_node_t alarm_node;
     
    239228//-----------------------------------------------------------------------------
    240229// condition variable
    241 forall(dtype L | is_blocking_lock(L)) {
     230forall(L & | is_blocking_lock(L)) {
    242231
    243232        void ?{}( condition_variable(L) & this ){
     
    356345        bool wait( condition_variable(L) & this, L & l, uintptr_t info, Time time         ) with(this) { WAIT_TIME( info, &l , time ) }
    357346}
     347
     348//-----------------------------------------------------------------------------
     349// Semaphore
     350void  ?{}( semaphore & this, int count = 1 ) {
     351        (this.lock){};
     352        this.count = count;
     353        (this.waiting){};
     354}
     355void ^?{}(semaphore & this) {}
     356
     357bool P(semaphore & this) with( this ){
     358        lock( lock __cfaabi_dbg_ctx2 );
     359        count -= 1;
     360        if ( count < 0 ) {
     361                // queue current task
     362                append( waiting, active_thread() );
     363
     364                // atomically release spin lock and block
     365                unlock( lock );
     366                park();
     367                return true;
     368        }
     369        else {
     370            unlock( lock );
     371            return false;
     372        }
     373}
     374
     375bool V(semaphore & this) with( this ) {
     376        $thread * thrd = 0p;
     377        lock( lock __cfaabi_dbg_ctx2 );
     378        count += 1;
     379        if ( count <= 0 ) {
     380                // remove task at head of waiting list
     381                thrd = pop_head( waiting );
     382        }
     383
     384        unlock( lock );
     385
     386        // make new owner
     387        unpark( thrd );
     388
     389        return thrd != 0p;
     390}
     391
     392bool V(semaphore & this, unsigned diff) with( this ) {
     393        $thread * thrd = 0p;
     394        lock( lock __cfaabi_dbg_ctx2 );
     395        int release = max(-count, (int)diff);
     396        count += diff;
     397        for(release) {
     398                unpark( pop_head( waiting ) );
     399        }
     400
     401        unlock( lock );
     402
     403        return thrd != 0p;
     404}
  • libcfa/src/concurrency/locks.hfa

    r342af53 r8e4aa05  
     1//
     2// Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
     3//
     4// The contents of this file are covered under the licence agreement in the
     5// file "LICENCE" distributed with Cforall.
     6//
     7// locks.hfa -- PUBLIC
     8// Runtime locks that used with the runtime thread system.
     9//
     10// Author           : Colby Alexander Parsons
     11// Created On       : Thu Jan 21 19:46:50 2021
     12// Last Modified By :
     13// Last Modified On :
     14// Update Count     :
     15//
     16
    117#pragma once
    218
    319#include <stdbool.h>
    420
    5 #include "bits/locks.hfa"
    6 #include "bits/sequence.hfa"
    7 
    8 #include "invoke.h"
     21#include "bits/weakso_locks.hfa"
    922
    1023#include "time_t.hfa"
    1124#include "time.hfa"
    1225
     26//----------
     27struct single_acquisition_lock {
     28        inline blocking_lock;
     29};
     30
     31static inline void  ?{}( single_acquisition_lock & this ) {((blocking_lock &)this){ false, false };}
     32static inline void ^?{}( single_acquisition_lock & this ) {}
     33static inline void   lock      ( single_acquisition_lock & this ) { lock   ( (blocking_lock &)this ); }
     34static inline void   unlock    ( single_acquisition_lock & this ) { unlock ( (blocking_lock &)this ); }
     35static inline void   on_wait   ( single_acquisition_lock & this ) { on_wait( (blocking_lock &)this ); }
     36static inline void   on_notify ( single_acquisition_lock & this, struct $thread * t ) { on_notify( (blocking_lock &)this, t ); }
     37static inline void   set_recursion_count( single_acquisition_lock & this, size_t recursion ) { set_recursion_count( (blocking_lock &)this, recursion ); }
     38static inline size_t get_recursion_count( single_acquisition_lock & this ) { return get_recursion_count( (blocking_lock &)this ); }
     39
     40//----------
     41struct owner_lock {
     42        inline blocking_lock;
     43};
     44
     45static inline void  ?{}( owner_lock & this ) {((blocking_lock &)this){ true, true };}
     46static inline void ^?{}( owner_lock & this ) {}
     47static inline void   lock     ( owner_lock & this ) { lock   ( (blocking_lock &)this ); }
     48static inline void   unlock   ( owner_lock & this ) { unlock ( (blocking_lock &)this ); }
     49static inline void   on_wait  ( owner_lock & this ) { on_wait( (blocking_lock &)this ); }
     50static inline void   on_notify( owner_lock & this, struct $thread * t ) { on_notify( (blocking_lock &)this, t ); }
     51static inline void   set_recursion_count( owner_lock & this, size_t recursion ) { set_recursion_count( (blocking_lock &)this, recursion ); }
     52static inline size_t get_recursion_count( owner_lock & this ) { return get_recursion_count( (blocking_lock &)this ); }
     53
    1354//-----------------------------------------------------------------------------
    1455// is_blocking_lock
    15 trait is_blocking_lock(dtype L | sized(L)) {
     56trait is_blocking_lock(L & | sized(L)) {
    1657        // For synchronization locks to use when acquiring
    1758        void on_notify( L &, struct $thread * );
     
    3172// the info thread is a wrapper around a thread used
    3273// to store extra data for use in the condition variable
    33 forall(dtype L | is_blocking_lock(L)) {
     74forall(L & | is_blocking_lock(L)) {
    3475        struct info_thread;
    3576
     
    4081
    4182//-----------------------------------------------------------------------------
    42 // Blocking Locks
    43 struct blocking_lock {
    44         // Spin lock used for mutual exclusion
    45         __spinlock_t lock;
    46 
    47         // List of blocked threads
    48         Sequence( $thread ) blocked_threads;
    49 
    50         // Count of current blocked threads
    51         size_t wait_count;
    52 
    53         // Flag if the lock allows multiple acquisition
    54         bool multi_acquisition;
    55 
    56         // Flag if lock can be released by non owner
    57         bool strict_owner;
    58 
    59         // Current thread owning the lock
    60         struct $thread * owner;
    61 
    62         // Number of recursion level
    63         size_t recursion_count;
    64 };
    65 
    66 struct single_acquisition_lock {
    67         inline blocking_lock;
    68 };
    69 
    70 struct owner_lock {
    71         inline blocking_lock;
    72 };
    73 
    74 struct multiple_acquisition_lock {
    75         inline blocking_lock;
    76 };
    77 
    78 void  ?{}( blocking_lock & this, bool multi_acquisition, bool strict_owner );
    79 void ^?{}( blocking_lock & this );
    80 
    81 void  ?{}( single_acquisition_lock & this );
    82 void ^?{}( single_acquisition_lock & this );
    83 
    84 void  ?{}( owner_lock & this );
    85 void ^?{}( owner_lock & this );
    86 
    87 void  ?{}( multiple_acquisition_lock & this );
    88 void ^?{}( multiple_acquisition_lock & this );
    89 
    90 void lock( blocking_lock & this );
    91 bool try_lock( blocking_lock & this );
    92 void unlock( blocking_lock & this );
    93 void on_notify( blocking_lock & this, struct $thread * t );
    94 void on_wait( blocking_lock & this );
    95 size_t wait_count( blocking_lock & this );
    96 void set_recursion_count( blocking_lock & this, size_t recursion );
    97 size_t get_recursion_count( blocking_lock & this );
    98 
    99 void lock( single_acquisition_lock & this );
    100 void unlock( single_acquisition_lock & this );
    101 void on_notify( single_acquisition_lock & this, struct $thread * t );
    102 void on_wait( single_acquisition_lock & this );
    103 void set_recursion_count( single_acquisition_lock & this, size_t recursion );
    104 size_t get_recursion_count( single_acquisition_lock & this );
    105 
    106 void lock( owner_lock & this );
    107 void unlock( owner_lock & this );
    108 void on_notify( owner_lock & this, struct $thread * t );
    109 void on_wait( owner_lock & this );
    110 void set_recursion_count( owner_lock & this, size_t recursion );
    111 size_t get_recursion_count( owner_lock & this );
    112 
    113 void lock( multiple_acquisition_lock & this );
    114 void unlock( multiple_acquisition_lock & this );
    115 void on_notify( multiple_acquisition_lock & this, struct $thread * t );
    116 void on_wait( multiple_acquisition_lock & this );
    117 void set_recursion_count( multiple_acquisition_lock & this, size_t recursion );
    118 size_t get_recursion_count( multiple_acquisition_lock & this );
    119 
    120 //-----------------------------------------------------------------------------
    12183// Synchronization Locks
    122 forall(dtype L | is_blocking_lock(L)) {
     84forall(L & | is_blocking_lock(L)) {
    12385        struct condition_variable {
    12486                // Spin lock used for mutual exclusion
     
    157119        bool wait( condition_variable(L) & this, L & l, uintptr_t info, Time time );
    158120}
     121
     122//-----------------------------------------------------------------------------
     123// Semaphore
     124struct semaphore {
     125        __spinlock_t lock;
     126        int count;
     127        __queue_t($thread) waiting;
     128};
     129
     130void  ?{}(semaphore & this, int count = 1);
     131void ^?{}(semaphore & this);
     132bool   P (semaphore & this);
     133bool   V (semaphore & this);
     134bool   V (semaphore & this, unsigned count);
  • libcfa/src/concurrency/monitor.cfa

    r342af53 r8e4aa05  
    5050static inline [$thread *, int] search_entry_queue( const __waitfor_mask_t &, $monitor * monitors [], __lock_size_t count );
    5151
    52 forall(dtype T | sized( T ))
     52forall(T & | sized( T ))
    5353static inline __lock_size_t insert_unique( T * array [], __lock_size_t & size, T * val );
    5454static inline __lock_size_t count_max    ( const __waitfor_mask_t & mask );
     
    949949}
    950950
    951 forall(dtype T | sized( T ))
     951forall(T & | sized( T ))
    952952static inline __lock_size_t insert_unique( T * array [], __lock_size_t & size, T * val ) {
    953953        if( !val ) return size;
  • libcfa/src/concurrency/monitor.hfa

    r342af53 r8e4aa05  
    2222#include "stdlib.hfa"
    2323
    24 trait is_monitor(dtype T) {
     24trait is_monitor(T &) {
    2525        $monitor * get_monitor( T & );
    2626        void ^?{}( T & mutex );
     
    5959void ^?{}( monitor_dtor_guard_t & this );
    6060
    61 static inline forall( dtype T | sized(T) | { void ^?{}( T & mutex ); } )
     61static inline forall( T & | sized(T) | { void ^?{}( T & mutex ); } )
    6262void delete( T * th ) {
    63         ^(*th){};
     63        if(th) ^(*th){};
    6464        free( th );
    6565}
  • libcfa/src/concurrency/mutex.cfa

    r342af53 r8e4aa05  
    164164}
    165165
    166 forall(dtype L | is_lock(L))
     166forall(L & | is_lock(L))
    167167void wait(condition_variable & this, L & l) {
    168168        lock( this.lock __cfaabi_dbg_ctx2 );
     
    176176//-----------------------------------------------------------------------------
    177177// Scopes
    178 forall(dtype L | is_lock(L))
     178forall(L & | is_lock(L))
    179179void lock_all  ( L * locks[], size_t count) {
    180180        // Sort locks based on addresses
     
    188188}
    189189
    190 forall(dtype L | is_lock(L))
     190forall(L & | is_lock(L))
    191191void unlock_all( L * locks[], size_t count) {
    192192        // Lock all
  • libcfa/src/concurrency/mutex.hfa

    r342af53 r8e4aa05  
    4242};
    4343
    44 void ?{}(mutex_lock & this);
    45 void ^?{}(mutex_lock & this);
    46 void lock(mutex_lock & this);
    47 bool try_lock(mutex_lock & this);
    48 void unlock(mutex_lock & this);
     44void ?{}(mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
     45void ^?{}(mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
     46void lock(mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
     47bool try_lock(mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
     48void unlock(mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
    4949
    5050// Exclusive lock - recursive
     
    6464};
    6565
    66 void ?{}(recursive_mutex_lock & this);
    67 void ^?{}(recursive_mutex_lock & this);
    68 void lock(recursive_mutex_lock & this);
    69 bool try_lock(recursive_mutex_lock & this);
    70 void unlock(recursive_mutex_lock & this);
     66void ?{}(recursive_mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
     67void ^?{}(recursive_mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
     68void lock(recursive_mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
     69bool try_lock(recursive_mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
     70void unlock(recursive_mutex_lock & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
    7171
    72 trait is_lock(dtype L | sized(L)) {
     72trait is_lock(L & | sized(L)) {
    7373        void lock  (L &);
    7474        void unlock(L &);
     
    8686};
    8787
    88 void ?{}(condition_variable & this);
    89 void ^?{}(condition_variable & this);
     88void ?{}(condition_variable & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
     89void ^?{}(condition_variable & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
    9090
    91 void notify_one(condition_variable & this);
    92 void notify_all(condition_variable & this);
     91void notify_one(condition_variable & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
     92void notify_all(condition_variable & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
    9393
    94 void wait(condition_variable & this);
     94void wait(condition_variable & this) __attribute__((deprecated("use concurrency/locks.hfa instead")));
    9595
    96 forall(dtype L | is_lock(L))
    97 void wait(condition_variable & this, L & l);
     96forall(L & | is_lock(L))
     97void wait(condition_variable & this, L & l) __attribute__((deprecated("use concurrency/locks.hfa instead")));
    9898
    9999//-----------------------------------------------------------------------------
    100100// Scopes
    101 forall(dtype L | is_lock(L)) {
     101forall(L & | is_lock(L)) {
    102102        #if !defined( __TUPLE_ARRAYS_EXIST__ )
    103103        void lock  ( L * locks [], size_t count);
  • libcfa/src/concurrency/preemption.cfa

    r342af53 r8e4aa05  
    424424static void timeout( $thread * this ) {
    425425        unpark( this );
     426}
     427
     428void __disable_interrupts_hard() {
     429        sigset_t oldset;
     430        int ret;
     431        ret = pthread_sigmask(0, ( const sigset_t * ) 0p, &oldset);  // workaround trac#208: cast should be unnecessary
     432        if(ret != 0) { abort("ERROR sigprocmask returned %d", ret); }
     433
     434        ret = sigismember(&oldset, SIGUSR1);
     435        if(ret <  0) { abort("ERROR sigismember returned %d", ret); }
     436        if(ret == 1) { abort("ERROR SIGUSR1 is disabled"); }
     437
     438        ret = sigismember(&oldset, SIGALRM);
     439        if(ret <  0) { abort("ERROR sigismember returned %d", ret); }
     440        if(ret == 0) { abort("ERROR SIGALRM is enabled"); }
     441
     442        signal_block( SIGUSR1 );
     443}
     444
     445void __enable_interrupts_hard() {
     446        signal_unblock( SIGUSR1 );
     447
     448        sigset_t oldset;
     449        int ret;
     450        ret = pthread_sigmask(0, ( const sigset_t * ) 0p, &oldset);  // workaround trac#208: cast should be unnecessary
     451        if(ret != 0) { abort("ERROR sigprocmask returned %d", ret); }
     452
     453        ret = sigismember(&oldset, SIGUSR1);
     454        if(ret <  0) { abort("ERROR sigismember returned %d", ret); }
     455        if(ret == 1) { abort("ERROR SIGUSR1 is disabled"); }
     456
     457        ret = sigismember(&oldset, SIGALRM);
     458        if(ret <  0) { abort("ERROR sigismember returned %d", ret); }
     459        if(ret == 0) { abort("ERROR SIGALRM is enabled"); }
    426460}
    427461
     
    551585
    552586        // Setup proper signal handlers
    553         __cfaabi_sigaction( SIGUSR1, sigHandler_ctxSwitch, SA_SIGINFO | SA_RESTART ); // __cfactx_switch handler
    554         __cfaabi_sigaction( SIGALRM, sigHandler_alarm    , SA_SIGINFO | SA_RESTART ); // debug handler
     587        __cfaabi_sigaction( SIGUSR1, sigHandler_ctxSwitch, SA_SIGINFO ); // __cfactx_switch handler
     588        __cfaabi_sigaction( SIGALRM, sigHandler_alarm    , SA_SIGINFO ); // debug handler
    555589
    556590        signal_block( SIGALRM );
     
    580614
    581615        __cfaabi_dbg_print_safe( "Kernel : Preemption stopped\n" );
     616}
     617
     618// Prevent preemption since we are about to start terminating things
     619void __kernel_abort_lock(void) {
     620        signal_block( SIGUSR1 );
    582621}
    583622
  • libcfa/src/concurrency/ready_queue.cfa

    r342af53 r8e4aa05  
    330330        #if defined(BIAS)
    331331                // Don't bother trying locally too much
    332                 int local_tries = 8;
    333332                preferred = kernelTLS().this_processor->id * 4;
    334333        #endif
  • libcfa/src/concurrency/stats.cfa

    r342af53 r8e4aa05  
    2525
    2626                #if defined(CFA_HAVE_LINUX_IO_URING_H)
    27                         stats->io.submit_q.submit_avg.rdy = 0;
    28                         stats->io.submit_q.submit_avg.csm = 0;
    29                         stats->io.submit_q.submit_avg.cnt = 0;
    30                         stats->io.submit_q.look_avg.val   = 0;
    31                         stats->io.submit_q.look_avg.cnt   = 0;
    32                         stats->io.submit_q.look_avg.block = 0;
    33                         stats->io.submit_q.alloc_avg.val   = 0;
    34                         stats->io.submit_q.alloc_avg.cnt   = 0;
    35                         stats->io.submit_q.alloc_avg.block = 0;
    36                         stats->io.submit_q.helped = 0;
    37                         stats->io.submit_q.leader = 0;
    38                         stats->io.submit_q.busy   = 0;
    39                         stats->io.complete_q.completed_avg.val = 0;
    40                         stats->io.complete_q.completed_avg.cnt = 0;
    41                         stats->io.complete_q.blocks = 0;
     27                        stats->io.alloc.fast        = 0;
     28                        stats->io.alloc.slow        = 0;
     29                        stats->io.alloc.fail        = 0;
     30                        stats->io.alloc.revoke      = 0;
     31                        stats->io.alloc.block       = 0;
     32                        stats->io.submit.fast       = 0;
     33                        stats->io.submit.slow       = 0;
     34                        stats->io.flush.external    = 0;
     35                        stats->io.calls.flush       = 0;
     36                        stats->io.calls.submitted   = 0;
     37                        stats->io.calls.drain       = 0;
     38                        stats->io.calls.completed   = 0;
     39                        stats->io.calls.errors.busy = 0;
     40                        stats->io.poller.sleeps     = 0;
    4241                #endif
    4342        }
     
    6059
    6160                #if defined(CFA_HAVE_LINUX_IO_URING_H)
    62                         __atomic_fetch_add( &cltr->io.submit_q.submit_avg.rdy     , proc->io.submit_q.submit_avg.rdy     , __ATOMIC_SEQ_CST ); proc->io.submit_q.submit_avg.rdy      = 0;
    63                         __atomic_fetch_add( &cltr->io.submit_q.submit_avg.csm     , proc->io.submit_q.submit_avg.csm     , __ATOMIC_SEQ_CST ); proc->io.submit_q.submit_avg.csm      = 0;
    64                         __atomic_fetch_add( &cltr->io.submit_q.submit_avg.avl     , proc->io.submit_q.submit_avg.avl     , __ATOMIC_SEQ_CST ); proc->io.submit_q.submit_avg.avl      = 0;
    65                         __atomic_fetch_add( &cltr->io.submit_q.submit_avg.cnt     , proc->io.submit_q.submit_avg.cnt     , __ATOMIC_SEQ_CST ); proc->io.submit_q.submit_avg.cnt      = 0;
    66                         __atomic_fetch_add( &cltr->io.submit_q.look_avg.val       , proc->io.submit_q.look_avg.val       , __ATOMIC_SEQ_CST ); proc->io.submit_q.look_avg.val        = 0;
    67                         __atomic_fetch_add( &cltr->io.submit_q.look_avg.cnt       , proc->io.submit_q.look_avg.cnt       , __ATOMIC_SEQ_CST ); proc->io.submit_q.look_avg.cnt        = 0;
    68                         __atomic_fetch_add( &cltr->io.submit_q.look_avg.block     , proc->io.submit_q.look_avg.block     , __ATOMIC_SEQ_CST ); proc->io.submit_q.look_avg.block      = 0;
    69                         __atomic_fetch_add( &cltr->io.submit_q.alloc_avg.val      , proc->io.submit_q.alloc_avg.val      , __ATOMIC_SEQ_CST ); proc->io.submit_q.alloc_avg.val       = 0;
    70                         __atomic_fetch_add( &cltr->io.submit_q.alloc_avg.cnt      , proc->io.submit_q.alloc_avg.cnt      , __ATOMIC_SEQ_CST ); proc->io.submit_q.alloc_avg.cnt       = 0;
    71                         __atomic_fetch_add( &cltr->io.submit_q.alloc_avg.block    , proc->io.submit_q.alloc_avg.block    , __ATOMIC_SEQ_CST ); proc->io.submit_q.alloc_avg.block     = 0;
    72                         __atomic_fetch_add( &cltr->io.submit_q.helped             , proc->io.submit_q.helped             , __ATOMIC_SEQ_CST ); proc->io.submit_q.helped              = 0;
    73                         __atomic_fetch_add( &cltr->io.submit_q.leader             , proc->io.submit_q.leader             , __ATOMIC_SEQ_CST ); proc->io.submit_q.leader              = 0;
    74                         __atomic_fetch_add( &cltr->io.submit_q.busy               , proc->io.submit_q.busy               , __ATOMIC_SEQ_CST ); proc->io.submit_q.busy                = 0;
    75                         __atomic_fetch_add( &cltr->io.complete_q.completed_avg.val, proc->io.complete_q.completed_avg.val, __ATOMIC_SEQ_CST ); proc->io.complete_q.completed_avg.val = 0;
    76                         __atomic_fetch_add( &cltr->io.complete_q.completed_avg.cnt, proc->io.complete_q.completed_avg.cnt, __ATOMIC_SEQ_CST ); proc->io.complete_q.completed_avg.cnt = 0;
    77                         __atomic_fetch_add( &cltr->io.complete_q.blocks           , proc->io.complete_q.blocks           , __ATOMIC_SEQ_CST ); proc->io.complete_q.blocks            = 0;
     61                        __atomic_fetch_add( &cltr->io.alloc.fast       , proc->io.alloc.fast       , __ATOMIC_SEQ_CST ); proc->io.alloc.fast        = 0;
     62                        __atomic_fetch_add( &cltr->io.alloc.slow       , proc->io.alloc.slow       , __ATOMIC_SEQ_CST ); proc->io.alloc.slow        = 0;
     63                        __atomic_fetch_add( &cltr->io.alloc.fail       , proc->io.alloc.fail       , __ATOMIC_SEQ_CST ); proc->io.alloc.fail        = 0;
     64                        __atomic_fetch_add( &cltr->io.alloc.revoke     , proc->io.alloc.revoke     , __ATOMIC_SEQ_CST ); proc->io.alloc.revoke      = 0;
     65                        __atomic_fetch_add( &cltr->io.alloc.block      , proc->io.alloc.block      , __ATOMIC_SEQ_CST ); proc->io.alloc.block       = 0;
     66                        __atomic_fetch_add( &cltr->io.submit.fast      , proc->io.submit.fast      , __ATOMIC_SEQ_CST ); proc->io.submit.fast       = 0;
     67                        __atomic_fetch_add( &cltr->io.submit.slow      , proc->io.submit.slow      , __ATOMIC_SEQ_CST ); proc->io.submit.slow       = 0;
     68                        __atomic_fetch_add( &cltr->io.flush.external   , proc->io.flush.external   , __ATOMIC_SEQ_CST ); proc->io.flush.external    = 0;
     69                        __atomic_fetch_add( &cltr->io.calls.flush      , proc->io.calls.flush      , __ATOMIC_SEQ_CST ); proc->io.calls.flush       = 0;
     70                        __atomic_fetch_add( &cltr->io.calls.submitted  , proc->io.calls.submitted  , __ATOMIC_SEQ_CST ); proc->io.calls.submitted   = 0;
     71                        __atomic_fetch_add( &cltr->io.calls.drain      , proc->io.calls.drain      , __ATOMIC_SEQ_CST ); proc->io.calls.drain       = 0;
     72                        __atomic_fetch_add( &cltr->io.calls.completed  , proc->io.calls.completed  , __ATOMIC_SEQ_CST ); proc->io.calls.completed   = 0;
     73                        __atomic_fetch_add( &cltr->io.calls.errors.busy, proc->io.calls.errors.busy, __ATOMIC_SEQ_CST ); proc->io.calls.errors.busy = 0;
     74                        __atomic_fetch_add( &cltr->io.poller.sleeps    , proc->io.poller.sleeps    , __ATOMIC_SEQ_CST ); proc->io.poller.sleeps     = 0;
    7875                #endif
    7976        }
     
    8279
    8380                if( flags & CFA_STATS_READY_Q ) {
    84                         double push_sur = (100.0 * ((double)ready.pick.push.success) / ready.pick.push.attempt);
    85                         double pop_sur  = (100.0 * ((double)ready.pick.pop .success) / ready.pick.pop .attempt);
    86 
    8781                        double push_len = ((double)ready.pick.push.attempt) / ready.pick.push.success;
    8882                        double pop_len  = ((double)ready.pick.pop .attempt) / ready.pick.pop .success;
    89 
    90                         double lpush_sur = (100.0 * ((double)ready.pick.push.lsuccess) / ready.pick.push.local);
    91                         double lpop_sur  = (100.0 * ((double)ready.pick.pop .lsuccess) / ready.pick.pop .local);
    9283
    9384                        double lpush_len = ((double)ready.pick.push.local) / ready.pick.push.lsuccess;
     
    9687                        __cfaabi_bits_print_safe( STDOUT_FILENO,
    9788                                "----- %s \"%s\" (%p) - Ready Q Stats -----\n"
    98                                 "- total threads run      : %'15" PRIu64 "\n"
    99                                 "- total threads scheduled: %'15" PRIu64 "\n"
    100                                 "- push average probe len : %'18.2lf, %'18.2lf%% (%'15" PRIu64 " attempts)\n"
    101                                 "- pop  average probe len : %'18.2lf, %'18.2lf%% (%'15" PRIu64 " attempts)\n"
    102                                 "- local push avg prb len : %'18.2lf, %'18.2lf%% (%'15" PRIu64 " attempts)\n"
    103                                 "- local pop  avg prb len : %'18.2lf, %'18.2lf%% (%'15" PRIu64 " attempts)\n"
    104                                 "- thread migrations      : %'15" PRIu64 "\n"
    105                                 "- Idle Sleep -\n"
    106                                 "-- halts                 : %'15" PRIu64 "\n"
    107                                 "-- cancelled halts       : %'15" PRIu64 "\n"
    108                                 "-- schedule wake         : %'15" PRIu64 "\n"
    109                                 "-- wake on exit          : %'15" PRIu64 "\n"
     89                                "- total threads  : %'15" PRIu64 "run, %'15" PRIu64 "schd (%'" PRIu64 "mig )\n"
     90                                "- push avg probe : %'3.2lf, %'3.2lfl (%'15" PRIu64 " attempts, %'15" PRIu64 " locals)\n"
     91                                "- pop  avg probe : %'3.2lf, %'3.2lfl (%'15" PRIu64 " attempts, %'15" PRIu64 " locals)\n"
     92                                "- Idle Sleep     : %'15" PRIu64 "h, %'15" PRIu64 "c, %'15" PRIu64 "w, %'15" PRIu64 "e\n"
    11093                                "\n"
    11194                                , type, name, id
    11295                                , ready.pick.pop.success
    11396                                , ready.pick.push.success
    114                                 , push_len, push_sur, ready.pick.push.attempt
    115                                 , pop_len , pop_sur , ready.pick.pop .attempt
    116                                 , lpush_len, lpush_sur, ready.pick.push.local
    117                                 , lpop_len , lpop_sur , ready.pick.pop .local
    11897                                , ready.threads.migration
     98                                , push_len, lpush_len, ready.pick.push.attempt, ready.pick.push.local
     99                                , pop_len , lpop_len , ready.pick.pop .attempt, ready.pick.pop .local
    119100                                , ready.sleep.halts, ready.sleep.cancels, ready.sleep.wakes, ready.sleep.exits
    120101                        );
     
    123104                #if defined(CFA_HAVE_LINUX_IO_URING_H)
    124105                        if( flags & CFA_STATS_IO ) {
    125                                 double avgrdy = ((double)io.submit_q.submit_avg.rdy) / io.submit_q.submit_avg.cnt;
    126                                 double avgcsm = ((double)io.submit_q.submit_avg.csm) / io.submit_q.submit_avg.cnt;
     106                                uint64_t total_allocs = io.alloc.fast + io.alloc.slow;
     107                                double avgfasta = ((double)io.alloc.fast) / total_allocs;
    127108
    128                                 double lavgv = 0;
    129                                 double lavgb = 0;
    130                                 if(io.submit_q.look_avg.cnt != 0) {
    131                                         lavgv = ((double)io.submit_q.look_avg.val  ) / io.submit_q.look_avg.cnt;
    132                                         lavgb = ((double)io.submit_q.look_avg.block) / io.submit_q.look_avg.cnt;
    133                                 }
     109                                uint64_t total_submits = io.submit.fast + io.submit.slow;
     110                                double avgfasts = ((double)io.submit.fast) / total_submits;
    134111
    135                                 double aavgv = 0;
    136                                 double aavgb = 0;
    137                                 if(io.submit_q.alloc_avg.cnt != 0) {
    138                                         aavgv = ((double)io.submit_q.alloc_avg.val  ) / io.submit_q.alloc_avg.cnt;
    139                                         aavgb = ((double)io.submit_q.alloc_avg.block) / io.submit_q.alloc_avg.cnt;
    140                                 }
     112                                double avgsubs = ((double)io.calls.submitted) / io.calls.flush;
     113                                double avgcomp = ((double)io.calls.completed) / io.calls.drain;
    141114
    142115                                __cfaabi_bits_print_safe( STDOUT_FILENO,
    143116                                        "----- %s \"%s\" (%p) - I/O Stats -----\n"
    144                                         "- total submit calls     : %'15" PRIu64 "\n"
    145                                         "- avg ready entries      : %'18.2lf\n"
    146                                         "- avg submitted entries  : %'18.2lf\n"
    147                                         "- total helped entries   : %'15" PRIu64 "\n"
    148                                         "- total leader entries   : %'15" PRIu64 "\n"
    149                                         "- total busy submit      : %'15" PRIu64 "\n"
    150                                         "- total ready search     : %'15" PRIu64 "\n"
    151                                         "- avg ready search len   : %'18.2lf\n"
    152                                         "- avg ready search block : %'18.2lf\n"
    153                                         "- total alloc search     : %'15" PRIu64 "\n"
    154                                         "- avg alloc search len   : %'18.2lf\n"
    155                                         "- avg alloc search block : %'18.2lf\n"
    156                                         "- total wait calls       : %'15" PRIu64 "\n"
    157                                         "- avg completion/wait    : %'18.2lf\n"
    158                                         "- total completion blocks: %'15" PRIu64 "\n"
     117                                        "- total allocations : %'" PRIu64 "f, %'" PRIu64 "s (%'2.2lff) \n"
     118                                        "-     failures      : %'" PRIu64 "oom, %'" PRIu64 "rvk, %'" PRIu64 "blk\n"
     119                                        "- total submits     : %'" PRIu64 "f, %'" PRIu64 "s (%'2.2lf) \n"
     120                                        "- flush external    : %'" PRIu64 "\n"
     121                                        "- io_uring_enter    : %'" PRIu64 " (%'" PRIu64 ", %'" PRIu64 " EBUSY)\n"
     122                                        "-     submits       : %'" PRIu64 " (%'.2lf) \n"
     123                                        "-     completes     : %'" PRIu64 " (%'.2lf) \n"
     124                                        "- poller sleeping   : %'" PRIu64 "\n"
    159125                                        "\n"
    160126                                        , type,  name, id
    161                                         , io.submit_q.submit_avg.cnt
    162                                         , avgrdy, avgcsm
    163                                         , io.submit_q.helped, io.submit_q.leader, io.submit_q.busy
    164                                         , io.submit_q.look_avg.cnt
    165                                         , lavgv, lavgb
    166                                         , io.submit_q.alloc_avg.cnt
    167                                         , aavgv, aavgb
    168                                         , io.complete_q.completed_avg.cnt
    169                                         , ((double)io.complete_q.completed_avg.val) / io.complete_q.completed_avg.cnt
    170                                         , io.complete_q.blocks
     127                                        , io.alloc.fast, io.alloc.slow, avgfasta
     128                                        , io.alloc.fail, io.alloc.revoke, io.alloc.block
     129                                        , io.submit.fast, io.submit.slow, avgfasts
     130                                        , io.flush.external
     131                                        , io.calls.flush, io.calls.drain, io.calls.errors.busy
     132                                        , io.calls.submitted, avgsubs
     133                                        , io.calls.completed, avgcomp
     134                                        , io.poller.sleeps
    171135                                );
    172136                        }
  • libcfa/src/concurrency/stats.hfa

    r342af53 r8e4aa05  
    22
    33#include <stdint.h>
     4
     5enum {
     6        CFA_STATS_READY_Q  = 0x01,
     7        CFA_STATS_IO = 0x02,
     8};
    49
    510#if defined(__CFA_NO_STATISTICS__)
     
    914        static inline void __print_stats( struct __stats_t *, int, const char *, const char *, void * ) {}
    1015#else
    11         enum {
    12                 CFA_STATS_READY_Q  = 0x01,
    13                 #if defined(CFA_HAVE_LINUX_IO_URING_H)
    14                         CFA_STATS_IO = 0x02,
    15                 #endif
    16         };
    1716
    1817        struct __attribute__((aligned(64))) __stats_readQ_t {
     
    6766                struct __attribute__((aligned(64))) __stats_io_t{
    6867                        struct {
     68                                volatile uint64_t fast;
     69                                volatile uint64_t slow;
     70                                volatile uint64_t fail;
     71                                volatile uint64_t revoke;
     72                                volatile uint64_t block;
     73                        } alloc;
     74                        struct {
     75                                volatile uint64_t fast;
     76                                volatile uint64_t slow;
     77                        } submit;
     78                        struct {
     79                                volatile uint64_t external;
     80                        } flush;
     81                        struct {
     82                                volatile uint64_t drain;
     83                                volatile uint64_t completed;
     84                                volatile uint64_t flush;
     85                                volatile uint64_t submitted;
    6986                                struct {
    70                                         volatile uint64_t rdy;
    71                                         volatile uint64_t csm;
    72                                         volatile uint64_t avl;
    73                                         volatile uint64_t cnt;
    74                                 } submit_avg;
    75                                 struct {
    76                                         volatile uint64_t val;
    77                                         volatile uint64_t cnt;
    78                                         volatile uint64_t block;
    79                                 } look_avg;
    80                                 struct {
    81                                         volatile uint64_t val;
    82                                         volatile uint64_t cnt;
    83                                         volatile uint64_t block;
    84                                 } alloc_avg;
    85                                 volatile uint64_t helped;
    86                                 volatile uint64_t leader;
    87                                 volatile uint64_t busy;
    88                         } submit_q;
     87                                        volatile uint64_t busy;
     88                                } errors;
     89                        } calls;
    8990                        struct {
    90                                 struct {
    91                                         volatile uint64_t val;
    92                                         volatile uint64_t cnt;
    93                                 } completed_avg;
    94                                 volatile uint64_t blocks;
    95                         } complete_q;
     91                                volatile uint64_t sleeps;
     92                        } poller;
    9693                };
    9794        #endif
  • libcfa/src/concurrency/thread.cfa

    r342af53 r8e4aa05  
    6262}
    6363
    64 FORALL_DATA_INSTANCE(ThreadCancelled, (dtype thread_t), (thread_t))
     64FORALL_DATA_INSTANCE(ThreadCancelled, (thread_t &), (thread_t))
    6565
    66 forall(dtype T)
     66forall(T &)
    6767void copy(ThreadCancelled(T) * dst, ThreadCancelled(T) * src) {
    6868        dst->virtual_table = src->virtual_table;
     
    7171}
    7272
    73 forall(dtype T)
     73forall(T &)
    7474const char * msg(ThreadCancelled(T) *) {
    7575        return "ThreadCancelled";
    7676}
    7777
    78 forall(dtype T)
     78forall(T &)
    7979static void default_thread_cancel_handler(ThreadCancelled(T) & ) {
    8080        abort( "Unhandled thread cancellation.\n" );
    8181}
    8282
    83 forall(dtype T | is_thread(T) | IS_EXCEPTION(ThreadCancelled, (T)))
     83forall(T & | is_thread(T) | IS_EXCEPTION(ThreadCancelled, (T)))
    8484void ?{}( thread_dtor_guard_t & this,
    85                 T & thrd, void(*defaultResumptionHandler)(ThreadCancelled(T) &)) {
    86         $monitor * m = get_monitor(thrd);
     85                T & thrd, void(*cancelHandler)(ThreadCancelled(T) &)) {
     86        $monitor * m = get_monitor(thrd);
    8787        $thread * desc = get_thread(thrd);
    8888
    8989        // Setup the monitor guard
    9090        void (*dtor)(T& mutex this) = ^?{};
    91         bool join = defaultResumptionHandler != (void(*)(ThreadCancelled(T)&))0;
     91        bool join = cancelHandler != (void(*)(ThreadCancelled(T)&))0;
    9292        (this.mg){&m, (void(*)())dtor, join};
    9393
     
    103103        }
    104104        desc->state = Cancelled;
    105         if (!join) {
    106                 defaultResumptionHandler = default_thread_cancel_handler;
    107         }
     105        void(*defaultResumptionHandler)(ThreadCancelled(T) &) =
     106                join ? cancelHandler : default_thread_cancel_handler;
    108107
    109108        ThreadCancelled(T) except;
     
    125124//-----------------------------------------------------------------------------
    126125// Starting and stopping threads
    127 forall( dtype T | is_thread(T) )
     126forall( T & | is_thread(T) )
    128127void __thrd_start( T & this, void (*main_p)(T &) ) {
    129128        $thread * this_thrd = get_thread(this);
     
    141140//-----------------------------------------------------------------------------
    142141// Support for threads that don't ues the thread keyword
    143 forall( dtype T | sized(T) | is_thread(T) | { void ?{}(T&); } )
     142forall( T & | sized(T) | is_thread(T) | { void ?{}(T&); } )
    144143void ?{}( scoped(T)& this ) with( this ) {
    145144        handle{};
     
    147146}
    148147
    149 forall( dtype T, ttype P | sized(T) | is_thread(T) | { void ?{}(T&, P); } )
     148forall( T &, P... | sized(T) | is_thread(T) | { void ?{}(T&, P); } )
    150149void ?{}( scoped(T)& this, P params ) with( this ) {
    151150        handle{ params };
     
    153152}
    154153
    155 forall( dtype T | sized(T) | is_thread(T) )
     154forall( T & | sized(T) | is_thread(T) )
    156155void ^?{}( scoped(T)& this ) with( this ) {
    157156        ^handle{};
     
    159158
    160159//-----------------------------------------------------------------------------
    161 forall(dtype T | is_thread(T) | IS_RESUMPTION_EXCEPTION(ThreadCancelled, (T)))
     160forall(T & | is_thread(T) | IS_RESUMPTION_EXCEPTION(ThreadCancelled, (T)))
    162161T & join( T & this ) {
    163162        thread_dtor_guard_t guard = { this, defaultResumptionHandler };
  • libcfa/src/concurrency/thread.hfa

    r342af53 r8e4aa05  
    2626//-----------------------------------------------------------------------------
    2727// thread trait
    28 trait is_thread(dtype T) {
     28trait is_thread(T &) {
    2929        void ^?{}(T& mutex this);
    3030        void main(T& this);
     
    3232};
    3333
    34 FORALL_DATA_EXCEPTION(ThreadCancelled, (dtype thread_t), (thread_t)) (
     34FORALL_DATA_EXCEPTION(ThreadCancelled, (thread_t &), (thread_t)) (
    3535        thread_t * the_thread;
    3636        exception_t * the_exception;
    3737);
    3838
    39 forall(dtype T)
     39forall(T &)
    4040void copy(ThreadCancelled(T) * dst, ThreadCancelled(T) * src);
    4141
    42 forall(dtype T)
     42forall(T &)
    4343const char * msg(ThreadCancelled(T) *);
    4444
     
    4747
    4848// Inline getters for threads/coroutines/monitors
    49 forall( dtype T | is_thread(T) )
     49forall( T & | is_thread(T) )
    5050static inline $coroutine* get_coroutine(T & this) __attribute__((const)) { return &get_thread(this)->self_cor; }
    5151
    52 forall( dtype T | is_thread(T) )
     52forall( T & | is_thread(T) )
    5353static inline $monitor  * get_monitor  (T & this) __attribute__((const)) { return &get_thread(this)->self_mon; }
    5454
     
    6060extern struct cluster * mainCluster;
    6161
    62 forall( dtype T | is_thread(T) )
     62forall( T & | is_thread(T) )
    6363void __thrd_start( T & this, void (*)(T &) );
    6464
     
    8282};
    8383
    84 forall( dtype T | is_thread(T) | IS_EXCEPTION(ThreadCancelled, (T)) )
     84forall( T & | is_thread(T) | IS_EXCEPTION(ThreadCancelled, (T)) )
    8585void ?{}( thread_dtor_guard_t & this, T & thrd, void(*)(ThreadCancelled(T) &) );
    8686void ^?{}( thread_dtor_guard_t & this );
     
    8989// thread runner
    9090// Structure that actually start and stop threads
    91 forall( dtype T | sized(T) | is_thread(T) )
     91forall( T & | sized(T) | is_thread(T) )
    9292struct scoped {
    9393        T handle;
    9494};
    9595
    96 forall( dtype T | sized(T) | is_thread(T) | { void ?{}(T&); } )
     96forall( T & | sized(T) | is_thread(T) | { void ?{}(T&); } )
    9797void ?{}( scoped(T)& this );
    9898
    99 forall( dtype T, ttype P | sized(T) | is_thread(T) | { void ?{}(T&, P); } )
     99forall( T &, P... | sized(T) | is_thread(T) | { void ?{}(T&, P); } )
    100100void ?{}( scoped(T)& this, P params );
    101101
    102 forall( dtype T | sized(T) | is_thread(T) )
     102forall( T & | sized(T) | is_thread(T) )
    103103void ^?{}( scoped(T)& this );
    104104
     
    115115void unpark( $thread * this );
    116116
    117 forall( dtype T | is_thread(T) )
     117forall( T & | is_thread(T) )
    118118static inline void unpark( T & this ) { if(!&this) return; unpark( get_thread( this ) );}
    119119
     
    128128//----------
    129129// join
    130 forall( dtype T | is_thread(T) | IS_RESUMPTION_EXCEPTION(ThreadCancelled, (T)) )
     130forall( T & | is_thread(T) | IS_RESUMPTION_EXCEPTION(ThreadCancelled, (T)) )
    131131T & join( T & this );
    132132
Note: See TracChangeset for help on using the changeset viewer.