Changeset 7922158 for libcfa/src


Ignore:
Timestamp:
Jul 10, 2020, 4:07:23 PM (4 years ago)
Author:
Thierry Delisle <tdelisle@…>
Branches:
ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
Children:
d34575b
Parents:
ab44413 (diff), 365e423 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

Location:
libcfa/src/concurrency
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io.cfa

    rab44413 r7922158  
    1414//
    1515
    16 // #define __CFA_DEBUG_PRINT_IO__
    17 // #define __CFA_DEBUG_PRINT_IO_CORE__
     16#if defined(__CFA_DEBUG__)
     17        // #define __CFA_DEBUG_PRINT_IO__
     18        #define __CFA_DEBUG_PRINT_IO_CORE__
     19#endif
    1820
    1921#include "kernel.hfa"
     
    325327
    326328                // Create the poller thread
    327                 __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluter %p\n", &this);
     329                __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluster %p\n", &this);
    328330                this.io->poller.slow.blocked = false;
    329331                this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
     
    430432        }
    431433
     434        int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get, sigset_t * mask ) {
     435                bool need_sys_to_submit = false;
     436                bool need_sys_to_complete = false;
     437                unsigned min_complete = 0;
     438                unsigned flags = 0;
     439
     440
     441                TO_SUBMIT:
     442                if( to_submit > 0 ) {
     443                        if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
     444                                need_sys_to_submit = true;
     445                                break TO_SUBMIT;
     446                        }
     447                        if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) {
     448                                need_sys_to_submit = true;
     449                                flags |= IORING_ENTER_SQ_WAKEUP;
     450                        }
     451                }
     452
     453                TO_COMPLETE:
     454                if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
     455                        flags |= IORING_ENTER_GETEVENTS;
     456                        if( mask ) {
     457                                need_sys_to_complete = true;
     458                                min_complete = 1;
     459                                break TO_COMPLETE;
     460                        }
     461                        if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {
     462                                need_sys_to_complete = true;
     463                        }
     464                }
     465
     466                int ret = 0;
     467                if( need_sys_to_submit || need_sys_to_complete ) {
     468                        ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, min_complete, flags, mask, _NSIG / 8);
     469                        if( ret < 0 ) {
     470                                switch((int)errno) {
     471                                case EAGAIN:
     472                                case EINTR:
     473                                        ret = -1;
     474                                        break;
     475                                default:
     476                                        abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
     477                                }
     478                        }
     479                }
     480
     481                // Memory barrier
     482                __atomic_thread_fence( __ATOMIC_SEQ_CST );
     483                return ret;
     484        }
     485
    432486//=============================================================================================
    433487// I/O Polling
     
    438492        // Process a single completion message from the io_uring
    439493        // This is NOT thread-safe
    440         static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask, int waitcnt, bool in_kernel ) {
     494        static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask ) {
    441495                /* paranoid */ verify( !kernelTLS.preemption_state.enabled );
    442496
     
    447501                }
    448502
    449                 if (to_submit > 0 || waitcnt > 0) {
    450                         int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
    451                         if( ret < 0 ) {
    452                                 switch((int)errno) {
    453                                 case EAGAIN:
    454                                 case EINTR:
    455                                         return [0, true];
    456                                 default:
    457                                         abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
    458                                 }
    459                         }
    460 
    461                         // Release the consumed SQEs
    462                         __release_consumed_submission( ring );
    463 
    464                         // update statistics
     503                int ret = __io_uring_enter(ring, to_submit, true, mask);
     504                if( ret < 0 ) {
     505                        return [0, true];
     506                }
     507
     508                // update statistics
     509                if (to_submit > 0) {
    465510                        __STATS__( true,
    466511                                if( to_submit > 0 ) {
     
    472517                }
    473518
    474                 // Memory barrier
    475                 __atomic_thread_fence( __ATOMIC_SEQ_CST );
     519                // Release the consumed SQEs
     520                __release_consumed_submission( ring );
    476521
    477522                // Drain the queue
     
    497542
    498543                        data->result = cqe.res;
    499                         if(!in_kernel) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }
    500                         else         { __unpark( &ring.poller.slow.id, data->thrd __cfaabi_dbg_ctx2 ); }
     544                        if(!mask) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }
     545                        else      { __unpark( &ring.poller.slow.id, data->thrd __cfaabi_dbg_ctx2 ); }
    501546                }
    502547
     
    546591                                int count;
    547592                                bool again;
    548                                 [count, again] = __drain_io( ring, &mask, 1, true );
     593                                [count, again] = __drain_io( ring, &mask );
    549594
    550595                                __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST );
     
    568613                                int count;
    569614                                bool again;
    570                                 [count, again] = __drain_io( ring, &mask, 1, true );
     615                                [count, again] = __drain_io( ring, &mask );
    571616
    572617                                // Update statistics
     
    606651                        bool again;
    607652                        disable_interrupts();
    608                                 [count, again] = __drain_io( *this.ring, 0p, 0, false );
     653                                [count, again] = __drain_io( *this.ring, 0p );
    609654
    610655                                if(!again) reset++;
     
    800845                        // We got the lock
    801846                        unsigned to_submit = __collect_submitions( ring );
    802                         int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, 0, 0p, _NSIG / 8);
     847                        int ret = __io_uring_enter( ring, to_submit, false, 0p );
    803848                        if( ret < 0 ) {
    804                                 switch((int)errno) {
    805                                 case EAGAIN:
    806                                 case EINTR:
    807                                         unlock(ring.submit_q.lock);
    808                                         return;
    809                                 default:
    810                                         abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
    811                                 }
    812                         }
    813 
    814                         /* paranoid */ verify( ret > 0 );
     849                                unlock(ring.submit_q.lock);
     850                                return;
     851                        }
     852
     853                        /* paranoid */ verify( ret > 0 || (ring.ring_flags & IORING_SETUP_SQPOLL) );
    815854
    816855                        // Release the consumed SQEs
     
    830869                        lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
    831870
     871                        /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,
     872                        /* paranoid */  "index %u already reclaimed\n"
     873                        /* paranoid */  "head %u, prev %u, tail %u\n"
     874                        /* paranoid */  "[-0: %u,-1: %u,-2: %u,-3: %u]\n",
     875                        /* paranoid */  idx,
     876                        /* paranoid */  *ring.submit_q.head, ring.submit_q.prev_head, *tail
     877                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ]
     878                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ]
     879                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ]
     880                        /* paranoid */  ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ]
     881                        /* paranoid */ );
     882
    832883                        // Append to the list of ready entries
    833884
    834885                        /* paranoid */ verify( idx <= mask );
    835 
    836                         ring.submit_q.array[ (*tail) & mask ] = idx & mask;
     886                        ring.submit_q.array[ (*tail) & mask ] = idx;
    837887                        __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
    838888
    839                         /* paranoid */ verify( ring.submit_q.sqes[ idx ].user_data != 0 );
    840 
    841889                        // Submit however, many entries need to be submitted
    842                         int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
     890                        int ret = __io_uring_enter( ring, 1, false, 0p );
    843891                        if( ret < 0 ) {
    844892                                switch((int)errno) {
     
    906954                return count;
    907955        }
     956
     957//=============================================================================================
     958// I/O Submissions
     959//=============================================================================================
     960
     961        void register_fixed_files( cluster & cl, int * files, unsigned count ) {
     962                int ret = syscall( __NR_io_uring_register, cl.io->fd, IORING_REGISTER_FILES, files, count );
     963                if( ret < 0 ) {
     964                        abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
     965                }
     966
     967                __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret );
     968        }
    908969#endif
  • libcfa/src/concurrency/iocall.cfa

    rab44413 r7922158  
    108108
    109109        extern ssize_t read (int fd, void *buf, size_t count);
     110
     111        extern ssize_t splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags);
     112        extern ssize_t tee(int fd_in, int fd_out, size_t len, unsigned int flags);
    110113}
    111114
     
    128131                #endif
    129132        }
     133
     134        ssize_t cfa_preadv2_fixed(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
     135                #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READV)
     136                        return preadv2(fd, iov, iovcnt, offset, flags);
     137                #else
     138                        __submit_prelude
     139
     140                        (*sqe){ IORING_OP_READV, fd, iov, iovcnt, offset };
     141                        sqe->flags |= IOSQE_FIXED_FILE;
     142
     143                        __submit_wait
     144                #endif
     145        }
    130146#endif
    131147
     
    329345}
    330346
    331 
    332347ssize_t cfa_read(int fd, void *buf, size_t count) {
    333348        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_READ)
     
    349364
    350365                (*sqe){ IORING_OP_WRITE, fd, buf, count, 0 };
     366
     367                __submit_wait
     368        #endif
     369}
     370
     371ssize_t cfa_splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags) {
     372        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_SPLICE)
     373                return splice( fd_in, off_in, fd_out, off_out, len, flags );
     374        #else
     375                __submit_prelude
     376
     377                (*sqe){ IORING_OP_SPLICE, fd_out, 0p, len, off_out };
     378                sqe->splice_fd_in  = fd_in;
     379                sqe->splice_off_in = off_in;
     380                sqe->splice_flags  = flags;
     381
     382                __submit_wait
     383        #endif
     384}
     385
     386ssize_t cfa_tee(int fd_in, int fd_out, size_t len, unsigned int flags) {
     387        #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_TEE)
     388                return tee( fd_in, fd_out, len, flags );
     389        #else
     390                __submit_prelude
     391
     392                (*sqe){ IORING_OP_TEE, fd_out, 0p, len, 0 };
     393                sqe->splice_fd_in = fd_in;
     394                sqe->splice_flags = flags;
    351395
    352396                __submit_wait
     
    453497                        #define _CFA_IO_FEATURE_IORING_OP_WRITE ,
    454498                        return IS_DEFINED(IORING_OP_WRITE);
     499
     500                if( /*func == (fptr_t)splice || */
     501                        func == (fptr_t)cfa_splice )
     502                        #define _CFA_IO_FEATURE_IORING_OP_SPLICE ,
     503                        return IS_DEFINED(IORING_OP_SPLICE);
     504
     505                if( /*func == (fptr_t)tee || */
     506                        func == (fptr_t)cfa_tee )
     507                        #define _CFA_IO_FEATURE_IORING_OP_TEE ,
     508                        return IS_DEFINED(IORING_OP_TEE);
    455509        #endif
    456510
Note: See TracChangeset for help on using the changeset viewer.