Ignore:
Timestamp:
Mar 24, 2023, 4:51:11 PM (21 months ago)
Author:
caparsons <caparson@…>
Branches:
ADT, ast-experimental, master
Children:
512d937c
Parents:
0e16a2d (diff), 1633e04 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

Location:
libcfa/src/concurrency
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/channel.hfa

    r0e16a2d r75d874a  
    22
    33#include <locks.hfa>
    4 
    5 struct no_reacq_lock {
    6     inline exp_backoff_then_block_lock;
    7 };
    8 
    9 // have to override these by hand to get around plan 9 inheritance bug where resolver can't find the appropriate routine to call
    10 static inline void   ?{}( no_reacq_lock & this ) { ((exp_backoff_then_block_lock &)this){}; }
    11 static inline bool   try_lock(no_reacq_lock & this) { return try_lock(((exp_backoff_then_block_lock &)this)); }
    12 static inline void   lock(no_reacq_lock & this) { lock(((exp_backoff_then_block_lock &)this)); }
    13 static inline void   unlock(no_reacq_lock & this) { unlock(((exp_backoff_then_block_lock &)this)); }
    14 static inline void   on_notify(no_reacq_lock & this, struct thread$ * t ) { on_notify(((exp_backoff_then_block_lock &)this), t); }
    15 static inline size_t on_wait(no_reacq_lock & this) { return on_wait(((exp_backoff_then_block_lock &)this)); }
    16 // override wakeup so that we don't reacquire the lock if using a condvar
    17 static inline void   on_wakeup( no_reacq_lock & this, size_t recursion ) {}
    18 
    19 #define __PREVENTION_CHANNEL
     4#include <list.hfa>
     5
     6#define __COOP_CHANNEL
    207#ifdef __PREVENTION_CHANNEL
    218forall( T ) {
    229struct channel {
    23     size_t size;
    24     size_t front, back, count;
     10    size_t size, count, front, back;
    2511    T * buffer;
    2612    thread$ * chair;
     
    8773        return;
    8874    }
    89     else insert_( chan, elem );
     75    insert_( chan, elem );
    9076
    9177    unlock( mutex_lock );
     
    11096
    11197    // wait if buffer is empty, work will be completed by someone else
    112     if ( count == 0 ) { 
     98    if ( count == 0 ) {
    11399        chair = active_thread();
    114100        chair_elem = &retval;
     
    121107    memcpy((void *)&retval, (void *)&buffer[front], sizeof(T));
    122108    count -= 1;
    123     front = (front + 1) % size;
     109    front++;
     110    if ( front == size ) front = 0;
    124111
    125112    if ( chair != 0p ) {
     
    142129
    143130#ifdef __COOP_CHANNEL
     131
     132// link field used for threads waiting on channel
     133struct wait_link {
     134    // used to put wait_link on a dl queue
     135    inline dlink(wait_link);
     136
     137    // waiting thread
     138    struct thread$ * t;
     139
     140    // shadow field
     141    void * elem;
     142};
     143P9_EMBEDDED( wait_link, dlink(wait_link) )
     144
     145static inline void ?{}( wait_link & this, thread$ * t, void * elem ) {
     146    this.t = t;
     147    this.elem = elem;
     148}
     149
    144150forall( T ) {
     151
    145152struct channel {
    146153    size_t size;
    147154    size_t front, back, count;
    148155    T * buffer;
    149     fast_cond_var( no_reacq_lock ) prods, cons;
    150     no_reacq_lock mutex_lock;
     156    dlist( wait_link ) prods, cons;
     157    exp_backoff_then_block_lock mutex_lock;
    151158};
    152159
     
    164171static inline size_t get_count( channel(T) & chan ) with(chan) { return count; }
    165172static inline size_t get_size( channel(T) & chan ) with(chan) { return size; }
    166 static inline bool has_waiters( channel(T) & chan ) with(chan) { return !empty( cons ) || !empty( prods ); }
    167 static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !empty( cons ); }
    168 static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !empty( prods ); }
     173static inline bool has_waiters( channel(T) & chan ) with(chan) { return !cons`isEmpty || !prods`isEmpty; }
     174static inline bool has_waiting_consumers( channel(T) & chan ) with(chan) { return !cons`isEmpty; }
     175static inline bool has_waiting_producers( channel(T) & chan ) with(chan) { return !prods`isEmpty; }
    169176
    170177static inline void insert_( channel(T) & chan, T & elem ) with(chan) {
     
    175182}
    176183
     184static inline void wake_one( dlist( wait_link ) & queue ) {
     185    wait_link & popped = try_pop_front( queue );
     186    unpark( popped.t );
     187}
     188
     189static inline void block( dlist( wait_link ) & queue, void * elem_ptr, exp_backoff_then_block_lock & lock ) {
     190    wait_link w{ active_thread(), elem_ptr };
     191    insert_last( queue, w );
     192    unlock( lock );
     193    park();
     194}
    177195
    178196static inline void insert( channel(T) & chan, T elem ) with(chan) {
     
    180198
    181199    // have to check for the zero size channel case
    182     if ( size == 0 && !empty( cons ) ) {
    183         memcpy((void *)front( cons ), (void *)&elem, sizeof(T));
    184         notify_one( cons );
     200    if ( size == 0 && !cons`isEmpty ) {
     201        memcpy(cons`first.elem, (void *)&elem, sizeof(T));
     202        wake_one( cons );
    185203        unlock( mutex_lock );
    186204        return;
     
    188206
    189207    // wait if buffer is full, work will be completed by someone else
    190     if ( count == size ) { 
    191         wait( prods, mutex_lock, (uintptr_t)&elem );
     208    if ( count == size ) {
     209        block( prods, &elem, mutex_lock );
    192210        return;
    193211    } // if
    194212
    195     if ( count == 0 && !empty( cons ) )
    196         // do waiting consumer work
    197         memcpy((void *)front( cons ), (void *)&elem, sizeof(T));
    198     else insert_( chan, elem );
     213    if ( count == 0 && !cons`isEmpty ) {
     214        memcpy(cons`first.elem, (void *)&elem, sizeof(T)); // do waiting consumer work
     215        wake_one( cons );
     216    } else insert_( chan, elem );
    199217   
    200     notify_one( cons );
    201218    unlock( mutex_lock );
    202219}
     
    207224
    208225    // have to check for the zero size channel case
    209     if ( size == 0 && !empty( prods ) ) {
    210         memcpy((void *)&retval, (void *)front( prods ), sizeof(T));
    211         notify_one( prods );
     226    if ( size == 0 && !prods`isEmpty ) {
     227        memcpy((void *)&retval, (void *)prods`first.elem, sizeof(T));
     228        wake_one( prods );
    212229        unlock( mutex_lock );
    213230        return retval;
     
    215232
    216233    // wait if buffer is empty, work will be completed by someone else
    217     if (count == 0) { 
    218         wait( cons, mutex_lock, (uintptr_t)&retval );
     234    if (count == 0) {
     235        block( cons, &retval, mutex_lock );
    219236        return retval;
    220237    }
     
    225242    front = (front + 1) % size;
    226243
    227     if (count == size - 1 && !empty( prods ) )
    228         insert_( chan, *((T *)front( prods )) );  // do waiting producer work
    229 
    230     notify_one( prods );
     244    if (count == size - 1 && !prods`isEmpty ) {
     245        insert_( chan, *(T *)prods`first.elem );  // do waiting producer work
     246        wake_one( prods );
     247    }
     248
    231249    unlock( mutex_lock );
    232250    return retval;
    233251}
    234 
    235252} // forall( T )
    236253#endif
  • libcfa/src/concurrency/io.cfa

    r0e16a2d r75d874a  
    295295                                // make sure the target hasn't stopped existing since last time
    296296                                HELP: if(target < ctxs_count) {
    297                                         // calculate it's age and how young it could be before we give ip on helping
     297                                        // calculate it's age and how young it could be before we give up on helping
    298298                                        const __readyQ_avg_t cutoff = calc_cutoff(ctsc, ctx->cq.id, ctxs_count, io.data, io.tscs, __shard_factor.io, false);
    299299                                        const __readyQ_avg_t age = moving_average(ctsc, io.tscs[target].t.tv, io.tscs[target].t.ma, false);
  • libcfa/src/concurrency/io/call.cfa.in

    r0e16a2d r75d874a  
    8989#if defined(CFA_HAVE_PREADV2)
    9090        struct iovec;
    91         extern ssize_t preadv2 (int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
     91        extern ssize_t preadv2 (int fd, const struct iovec * iov, int iovcnt, off_t offset, int flags);
    9292#endif
    9393#if defined(CFA_HAVE_PWRITEV2)
    9494        struct iovec;
    95         extern ssize_t pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags);
     95        extern ssize_t pwritev2(int fd, const struct iovec * iov, int iovcnt, off_t offset, int flags);
    9696#endif
    9797
     
    108108        struct msghdr;
    109109        struct sockaddr;
    110         extern ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);
    111         extern ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);
    112         extern ssize_t send(int sockfd, const void *buf, size_t len, int flags);
    113         extern ssize_t recv(int sockfd, void *buf, size_t len, int flags);
     110        extern ssize_t sendmsg(int sockfd, const struct msghdr * msg, int flags);
     111        extern ssize_t recvmsg(int sockfd, struct msghdr * msg, int flags);
     112        extern ssize_t send(int sockfd, const void * buf, size_t len, int flags);
     113        extern ssize_t recv(int sockfd, void * buf, size_t len, int flags);
    114114
    115115        extern int fallocate(int fd, int mode, off_t offset, off_t len);
    116116        extern int posix_fadvise(int fd, off_t offset, off_t len, int advice);
    117         extern int madvise(void *addr, size_t length, int advice);
    118 
    119         extern int openat(int dirfd, const char *pathname, int flags, mode_t mode);
     117        extern int madvise(void * addr, size_t length, int advice);
     118
     119        extern int openat(int dirfd, const char * pathname, int flags, mode_t mode);
    120120        extern int close(int fd);
    121121
    122         extern ssize_t read (int fd, void *buf, size_t count);
     122        extern ssize_t read (int fd, void * buf, size_t count);
    123123
    124124        struct epoll_event;
    125         extern int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
    126 
    127         extern ssize_t splice(int fd_in, __off64_t *off_in, int fd_out, __off64_t *off_out, size_t len, unsigned int flags);
     125        extern int epoll_ctl(int epfd, int op, int fd, struct epoll_event * event);
     126
     127        extern ssize_t splice(int fd_in, __off64_t * off_in, int fd_out, __off64_t * off_out, size_t len, unsigned int flags);
    128128        extern ssize_t tee(int fd_in, int fd_out, size_t len, unsigned int flags);
    129129}
     
    224224calls = [
    225225        # CFA_HAVE_IORING_OP_READV
    226         Call('READV', 'ssize_t preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags)', {
     226        Call('READV', 'ssize_t preadv2(int fd, const struct iovec * iov, int iovcnt, off_t offset, int flags)', {
    227227                'fd'  : 'fd',
     228                'addr': '(typeof(sqe->addr))iov',
     229                'len' : 'iovcnt',
    228230                'off' : 'offset',
    229                 'addr': '(uintptr_t)iov',
    230                 'len' : 'iovcnt',
     231                'rw_flags' : 'flags'
    231232        }, define = 'CFA_HAVE_PREADV2'),
    232233        # CFA_HAVE_IORING_OP_WRITEV
    233         Call('WRITEV', 'ssize_t pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags)', {
     234        Call('WRITEV', 'ssize_t pwritev2(int fd, const struct iovec * iov, int iovcnt, off_t offset, int flags)', {
    234235                'fd'  : 'fd',
     236                'addr': '(typeof(sqe->addr))iov',
     237                'len' : 'iovcnt',
    235238                'off' : 'offset',
    236                 'addr': '(uintptr_t)iov',
    237                 'len' : 'iovcnt'
     239                'rw_flags' : 'flags'
    238240        }, define = 'CFA_HAVE_PWRITEV2'),
    239241        # CFA_HAVE_IORING_OP_FSYNC
     
    242244        }),
    243245        # CFA_HAVE_IORING_OP_EPOLL_CTL
    244         Call('EPOLL_CTL', 'int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)', {
     246        Call('EPOLL_CTL', 'int epoll_ctl(int epfd, int op, int fd, struct epoll_event * event)', {
    245247                'fd': 'epfd',
     248                'len': 'op',
    246249                'addr': 'fd',
    247                 'len': 'op',
    248                 'off': '(uintptr_t)event'
     250                'off': '(typeof(sqe->off))event'
    249251        }),
    250252        # CFA_HAVE_IORING_OP_SYNC_FILE_RANGE
     
    256258        }),
    257259        # CFA_HAVE_IORING_OP_SENDMSG
    258         Call('SENDMSG', 'ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags)', {
    259                 'fd': 'sockfd',
    260                 'addr': '(uintptr_t)(struct msghdr *)msg',
     260        Call('SENDMSG', 'ssize_t sendmsg(int sockfd, const struct msghdr * msg, int flags)', {
     261                'fd': 'sockfd',
     262                'addr': '(typeof(sqe->addr))(struct msghdr *)msg',
    261263                'len': '1',
    262264                'msg_flags': 'flags'
    263265        }),
    264266        # CFA_HAVE_IORING_OP_RECVMSG
    265         Call('RECVMSG', 'ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags)', {
    266                 'fd': 'sockfd',
    267                 'addr': '(uintptr_t)(struct msghdr *)msg',
     267        Call('RECVMSG', 'ssize_t recvmsg(int sockfd, struct msghdr * msg, int flags)', {
     268                'fd': 'sockfd',
     269                'addr': '(typeof(sqe->addr))(struct msghdr *)msg',
    268270                'len': '1',
    269271                'msg_flags': 'flags'
    270272        }),
    271273        # CFA_HAVE_IORING_OP_SEND
    272         Call('SEND', 'ssize_t send(int sockfd, const void *buf, size_t len, int flags)', {
    273                 'fd': 'sockfd',
    274                 'addr': '(uintptr_t)buf',
     274        Call('SEND', 'ssize_t send(int sockfd, const void * buf, size_t len, int flags)', {
     275                'fd': 'sockfd',
     276                'addr': '(typeof(sqe->addr))buf',
    275277                'len': 'len',
    276278                'msg_flags': 'flags'
    277279        }),
    278280        # CFA_HAVE_IORING_OP_RECV
    279         Call('RECV', 'ssize_t recv(int sockfd, void *buf, size_t len, int flags)', {
    280                 'fd': 'sockfd',
    281                 'addr': '(uintptr_t)buf',
     281        Call('RECV', 'ssize_t recv(int sockfd, void * buf, size_t len, int flags)', {
     282                'fd': 'sockfd',
     283                'addr': '(typeof(sqe->addr))buf',
    282284                'len': 'len',
    283285                'msg_flags': 'flags'
     
    286288        Call('ACCEPT', 'int accept4(int sockfd, __SOCKADDR_ARG addr, socklen_t * restrict addrlen, int flags)', {
    287289                'fd': 'sockfd',
    288                 'addr': '(uintptr_t)&addr',
    289                 'addr2': '(uintptr_t)addrlen',
     290                'addr': '(typeof(sqe->addr))&addr',
     291                'addr2': '(typeof(sqe->addr2))addrlen',
    290292                'accept_flags': 'flags'
    291293        }),
     
    293295        Call('CONNECT', 'int connect(int sockfd, __CONST_SOCKADDR_ARG addr, socklen_t addrlen)', {
    294296                'fd': 'sockfd',
    295                 'addr': '(uintptr_t)&addr',
     297                'addr': '(typeof(sqe->addr))&addr',
    296298                'off': 'addrlen'
    297299        }),
     
    299301        Call('FALLOCATE', 'int fallocate(int fd, int mode, off_t offset, off_t len)', {
    300302                'fd': 'fd',
    301                 'addr': '(uintptr_t)len',
    302303                'len': 'mode',
    303                 'off': 'offset'
     304                'off': 'offset',
     305                'addr': 'len'
    304306        }),
    305307        # CFA_HAVE_IORING_OP_FADVISE
     
    311313        }),
    312314        # CFA_HAVE_IORING_OP_MADVISE
    313         Call('MADVISE', 'int madvise(void *addr, size_t length, int advice)', {
    314                 'addr': '(uintptr_t)addr',
     315        Call('MADVISE', 'int madvise(void * addr, size_t length, int advice)', {
     316                'addr': '(typeof(sqe->addr))addr',
    315317                'len': 'length',
    316318                'fadvise_advice': 'advice'
    317319        }),
    318320        # CFA_HAVE_IORING_OP_OPENAT
    319         Call('OPENAT', 'int openat(int dirfd, const char *pathname, int flags, mode_t mode)', {
     321        Call('OPENAT', 'int openat(int dirfd, const char * pathname, int flags, mode_t mode)', {
    320322                'fd': 'dirfd',
    321                 'addr': '(uintptr_t)pathname',
    322                 'len': 'mode',
    323                 'open_flags': 'flags;'
     323                'addr': '(typeof(sqe->addr))pathname',
     324                'open_flags': 'flags;',
     325                'len': 'mode'
    324326        }),
    325327        # CFA_HAVE_IORING_OP_OPENAT2
    326         Call('OPENAT2', 'int openat2(int dirfd, const char *pathname, struct open_how * how, size_t size)', {
     328        Call('OPENAT2', 'int openat2(int dirfd, const char * pathname, struct open_how * how, size_t size)', {
    327329                'fd': 'dirfd',
    328                 'addr': 'pathname',
    329                 'len': 'sizeof(*how)',
    330                 'off': '(uintptr_t)how',
     330                'addr': '(typeof(sqe->addr))pathname',
     331                'off': '(typeof(sqe->off))how',
     332                'len': 'sizeof(*how)'
    331333        }, define = 'CFA_HAVE_OPENAT2'),
    332334        # CFA_HAVE_IORING_OP_CLOSE
     
    335337        }),
    336338        # CFA_HAVE_IORING_OP_STATX
    337         Call('STATX', 'int statx(int dirfd, const char *pathname, int flags, unsigned int mask, struct statx *statxbuf)', {
     339        Call('STATX', 'int statx(int dirfd, const char * pathname, int flags, unsigned int mask, struct statx * statxbuf)', {
    338340                'fd': 'dirfd',
    339                 'off': '(uintptr_t)statxbuf',
    340                 'addr': 'pathname',
     341                'addr': '(typeof(sqe->addr))pathname',
     342                'statx_flags': 'flags',
    341343                'len': 'mask',
    342                 'statx_flags': 'flags'
     344                'off': '(typeof(sqe->off))statxbuf'
    343345        }, define = 'CFA_HAVE_STATX'),
    344346        # CFA_HAVE_IORING_OP_READ
    345347        Call('READ', 'ssize_t read(int fd, void * buf, size_t count)', {
    346348                'fd': 'fd',
    347                 'addr': '(uintptr_t)buf',
     349                'addr': '(typeof(sqe->addr))buf',
    348350                'len': 'count'
    349351        }),
     
    351353        Call('WRITE', 'ssize_t write(int fd, void * buf, size_t count)', {
    352354                'fd': 'fd',
    353                 'addr': '(uintptr_t)buf',
     355                'addr': '(typeof(sqe->addr))buf',
    354356                'len': 'count'
    355357        }),
    356358        # CFA_HAVE_IORING_OP_SPLICE
    357         Call('SPLICE', 'ssize_t splice(int fd_in, __off64_t *off_in, int fd_out, __off64_t *off_out, size_t len, unsigned int flags)', {
     359        Call('SPLICE', 'ssize_t splice(int fd_in, __off64_t * off_in, int fd_out, __off64_t * off_out, size_t len, unsigned int flags)', {
    358360                'splice_fd_in': 'fd_in',
    359                 'splice_off_in': 'off_in ? (__u64)*off_in : (__u64)-1',
     361                'splice_off_in': 'off_in ? (typeof(sqe->splice_off_in))*off_in : (typeof(sqe->splice_off_in))-1',
    360362                'fd': 'fd_out',
    361                 'off': 'off_out ? (__u64)*off_out : (__u64)-1',
     363                'off': 'off_out ? (typeof(sqe->off))*off_out : (typeof(sqe->off))-1',
    362364                'len': 'len',
    363365                'splice_flags': 'flags'
  • libcfa/src/concurrency/locks.hfa

    r0e16a2d r75d874a  
    253253static inline void on_wakeup(clh_lock & this, size_t recursion ) { lock(this); }
    254254
    255 
    256255//-----------------------------------------------------------------------------
    257256// Exponential backoff then block lock
     
    272271        this.lock_value = 0;
    273272}
    274 static inline void ^?{}( exp_backoff_then_block_lock & this ) {}
    275 // static inline void ?{}( exp_backoff_then_block_lock & this, exp_backoff_then_block_lock this2 ) = void;
    276 // static inline void ?=?( exp_backoff_then_block_lock & this, exp_backoff_then_block_lock this2 ) = void;
    277273
    278274static inline bool internal_try_lock(exp_backoff_then_block_lock & this, size_t & compare_val) with(this) {
    279         if (__atomic_compare_exchange_n(&lock_value, &compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
    280                 return true;
    281         }
    282         return false;
     275        return __atomic_compare_exchange_n(&lock_value, &compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
    283276}
    284277
     
    286279
    287280static inline bool try_lock_contention(exp_backoff_then_block_lock & this) with(this) {
    288         if (__atomic_exchange_n(&lock_value, 2, __ATOMIC_ACQUIRE) == 0) {
    289                 return true;
    290         }
    291         return false;
     281        return !__atomic_exchange_n(&lock_value, 2, __ATOMIC_ACQUIRE);
    292282}
    293283
    294284static inline bool block(exp_backoff_then_block_lock & this) with(this) {
    295         lock( spinlock __cfaabi_dbg_ctx2 ); // TODO change to lockfree queue (MPSC)
    296         if (lock_value != 2) {
    297                 unlock( spinlock );
    298                 return true;
    299         }
    300         insert_last( blocked_threads, *active_thread() );
    301         unlock( spinlock );
     285    lock( spinlock __cfaabi_dbg_ctx2 );
     286    if (__atomic_load_n( &lock_value, __ATOMIC_SEQ_CST) != 2) {
     287        unlock( spinlock );
     288        return true;
     289    }
     290    insert_last( blocked_threads, *active_thread() );
     291    unlock( spinlock );
    302292        park( );
    303293        return true;
     
    307297        size_t compare_val = 0;
    308298        int spin = 4;
     299
    309300        // linear backoff
    310301        for( ;; ) {
     
    324315static inline void unlock(exp_backoff_then_block_lock & this) with(this) {
    325316    if (__atomic_exchange_n(&lock_value, 0, __ATOMIC_RELEASE) == 1) return;
    326         lock( spinlock __cfaabi_dbg_ctx2 );
    327         thread$ * t = &try_pop_front( blocked_threads );
    328         unlock( spinlock );
    329         unpark( t );
     317    lock( spinlock __cfaabi_dbg_ctx2 );
     318    thread$ * t = &try_pop_front( blocked_threads );
     319    unlock( spinlock );
     320    unpark( t );
    330321}
    331322
Note: See TracChangeset for help on using the changeset viewer.