Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/io/call.cfa.in

    r44f09ea rec19b21  
    5454                        | IOSQE_IO_DRAIN
    5555                #endif
     56                #if defined(CFA_HAVE_IOSQE_ASYNC)
     57                        | IOSQE_ASYNC
     58                #endif
     59        ;
     60
     61        static const __u32 LINK_FLAGS = 0
    5662                #if defined(CFA_HAVE_IOSQE_IO_LINK)
    5763                        | IOSQE_IO_LINK
     
    6066                        | IOSQE_IO_HARDLINK
    6167                #endif
    62                 #if defined(CFA_HAVE_IOSQE_ASYNC)
    63                         | IOSQE_ASYNC
    64                 #endif
    65                 #if defined(CFA_HAVE_IOSQE_BUFFER_SELECTED)
    66                         | IOSQE_BUFFER_SELECTED
    67                 #endif
    6868        ;
    6969
     
    7474        ;
    7575
    76         extern struct $io_context * cfa_io_allocate(struct io_uring_sqe * out_sqes[], __u32 out_idxs[], __u32 want)  __attribute__((nonnull (1,2)));
    77         extern void cfa_io_submit( struct $io_context * in_ctx, __u32 in_idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1,2)));
     76        extern [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data );
     77        extern void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1)));
     78
     79        static inline io_context * __get_io_context( void ) {
     80                cluster * cltr = active_cluster();
     81
     82                /* paranoid */ verifyf( cltr, "No active cluster for io operation\\n");
     83                assertf( cltr->io.cnt > 0, "Cluster %p has no default io contexts and no context was specified\\n", cltr );
     84
     85                /* paranoid */ verifyf( cltr->io.ctxs, "default io contexts for cluster %p are missing\\n", cltr);
     86                return &cltr->io.ctxs[ thread_rand() % cltr->io.cnt ];
     87        }
    7888#endif
    7989
     
    8898
    8999extern "C" {
    90         #include <asm/types.h>
     100        #include <sys/types.h>
    91101        #include <sys/socket.h>
    92102        #include <sys/syscall.h>
     
    185195                return ', '.join(args_a)
    186196
    187 AsyncTemplate = """inline void async_{name}(io_future_t & future, {params}, __u64 submit_flags) {{
     197AsyncTemplate = """inline void async_{name}(io_future_t & future, {params}, int submit_flags, io_cancellation * cancellation, io_context * context) {{
    188198        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_{op})
    189199                ssize_t res = {name}({args});
     
    195205                }}
    196206        #else
     207                // we don't support LINK yet
     208                if( 0 != (submit_flags & LINK_FLAGS) ) {{
     209                        errno = ENOTSUP; return -1;
     210                }}
     211
     212                if( !context ) {{
     213                        context = __get_io_context();
     214                }}
     215                if(cancellation) {{
     216                        cancellation->target = (__u64)(uintptr_t)&future;
     217                }}
     218
    197219                __u8 sflags = REGULAR_FLAGS & submit_flags;
     220                struct __io_data & ring = *context->thrd.ring;
     221
    198222                __u32 idx;
    199223                struct io_uring_sqe * sqe;
    200                 struct $io_context * ctx = cfa_io_allocate( &sqe, &idx, 1 );
     224                [(volatile struct io_uring_sqe *) sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
    201225
    202226                sqe->opcode = IORING_OP_{op};
    203                 sqe->user_data = (__u64)(uintptr_t)&future;
    204227                sqe->flags = sflags;
    205228                sqe->ioprio = 0;
     
    216239
    217240                verify( sqe->user_data == (__u64)(uintptr_t)&future );
    218                 cfa_io_submit( ctx, &idx, 1, 0 != (submit_flags & CFA_IO_LAZY) );
     241                __submit( context, idx );
    219242        #endif
    220243}}"""
    221244
    222 SyncTemplate = """{ret} cfa_{name}({params}, __u64 submit_flags) {{
     245SyncTemplate = """{ret} cfa_{name}({params}, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {{
     246        if( timeout >= 0 ) {{
     247                errno = ENOTSUP;
     248                return -1;
     249        }}
    223250        io_future_t future;
    224251
    225         async_{name}( future, {args}, submit_flags );
     252        async_{name}( future, {args}, submit_flags, cancellation, context );
    226253
    227254        wait( future );
     
    388415        if c.define:
    389416                print("""#if defined({define})
    390         {ret} cfa_{name}({params}, __u64 submit_flags);
     417        {ret} cfa_{name}({params}, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);
    391418#endif""".format(define=c.define,ret=c.ret, name=c.name, params=c.params))
    392419        else:
    393                 print("{ret} cfa_{name}({params}, __u64 submit_flags);"
     420                print("{ret} cfa_{name}({params}, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);"
    394421                .format(ret=c.ret, name=c.name, params=c.params))
    395422
     
    399426        if c.define:
    400427                print("""#if defined({define})
    401         void async_{name}(io_future_t & future, {params}, __u64 submit_flags);
     428        void async_{name}(io_future_t & future, {params}, int submit_flags, io_cancellation * cancellation, io_context * context);
    402429#endif""".format(define=c.define,name=c.name, params=c.params))
    403430        else:
    404                 print("void async_{name}(io_future_t & future, {params}, __u64 submit_flags);"
     431                print("void async_{name}(io_future_t & future, {params}, int submit_flags, io_cancellation * cancellation, io_context * context);"
    405432                .format(name=c.name, params=c.params))
    406433print("\n")
     
    447474
    448475print("""
     476//-----------------------------------------------------------------------------
     477bool cancel(io_cancellation & this) {
     478        #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ASYNC_CANCEL)
     479                return false;
     480        #else
     481                io_future_t future;
     482
     483                io_context * context = __get_io_context();
     484
     485                __u8 sflags = 0;
     486                struct __io_data & ring = *context->thrd.ring;
     487
     488                __u32 idx;
     489                volatile struct io_uring_sqe * sqe;
     490                [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future );
     491
     492                sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
     493                sqe->opcode = IORING_OP_ASYNC_CANCEL;
     494                sqe->flags = sflags;
     495                sqe->addr = this.target;
     496
     497                verify( sqe->user_data == (__u64)(uintptr_t)&future );
     498                __submit( context, idx );
     499
     500                wait(future);
     501
     502                if( future.result == 0 ) return true; // Entry found
     503                if( future.result == -EALREADY) return true; // Entry found but in progress
     504                if( future.result == -ENOENT ) return false; // Entry not found
     505                return false;
     506        #endif
     507}
     508
    449509//-----------------------------------------------------------------------------
    450510// Check if a function is has asynchronous
Note: See TracChangeset for help on using the changeset viewer.