- File:
-
- 1 edited
-
libcfa/src/concurrency/io/call.cfa.in (modified) (10 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/io/call.cfa.in
r44f09ea rec19b21 54 54 | IOSQE_IO_DRAIN 55 55 #endif 56 #if defined(CFA_HAVE_IOSQE_ASYNC) 57 | IOSQE_ASYNC 58 #endif 59 ; 60 61 static const __u32 LINK_FLAGS = 0 56 62 #if defined(CFA_HAVE_IOSQE_IO_LINK) 57 63 | IOSQE_IO_LINK … … 60 66 | IOSQE_IO_HARDLINK 61 67 #endif 62 #if defined(CFA_HAVE_IOSQE_ASYNC)63 | IOSQE_ASYNC64 #endif65 #if defined(CFA_HAVE_IOSQE_BUFFER_SELECTED)66 | IOSQE_BUFFER_SELECTED67 #endif68 68 ; 69 69 … … 74 74 ; 75 75 76 extern struct $io_context * cfa_io_allocate(struct io_uring_sqe * out_sqes[], __u32 out_idxs[], __u32 want) __attribute__((nonnull (1,2))); 77 extern void cfa_io_submit( struct $io_context * in_ctx, __u32 in_idxs[], __u32 have, bool lazy ) __attribute__((nonnull (1,2))); 76 extern [* volatile struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ); 77 extern void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))); 78 79 static inline io_context * __get_io_context( void ) { 80 cluster * cltr = active_cluster(); 81 82 /* paranoid */ verifyf( cltr, "No active cluster for io operation\\n"); 83 assertf( cltr->io.cnt > 0, "Cluster %p has no default io contexts and no context was specified\\n", cltr ); 84 85 /* paranoid */ verifyf( cltr->io.ctxs, "default io contexts for cluster %p are missing\\n", cltr); 86 return &cltr->io.ctxs[ thread_rand() % cltr->io.cnt ]; 87 } 78 88 #endif 79 89 … … 88 98 89 99 extern "C" { 90 #include < asm/types.h>100 #include <sys/types.h> 91 101 #include <sys/socket.h> 92 102 #include <sys/syscall.h> … … 185 195 return ', '.join(args_a) 186 196 187 AsyncTemplate = """inline void async_{name}(io_future_t & future, {params}, __u64 submit_flags) {{197 AsyncTemplate = """inline void async_{name}(io_future_t & future, {params}, int submit_flags, io_cancellation * cancellation, io_context * context) {{ 188 198 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_{op}) 189 199 ssize_t res = {name}({args}); … … 195 205 }} 196 206 #else 207 // we don't support LINK yet 208 if( 0 != (submit_flags & LINK_FLAGS) ) {{ 209 errno = ENOTSUP; return -1; 210 }} 211 212 if( !context ) {{ 213 context = __get_io_context(); 214 }} 215 if(cancellation) {{ 216 cancellation->target = (__u64)(uintptr_t)&future; 217 }} 218 197 219 __u8 sflags = REGULAR_FLAGS & submit_flags; 220 struct __io_data & ring = *context->thrd.ring; 221 198 222 __u32 idx; 199 223 struct io_uring_sqe * sqe; 200 struct $io_context * ctx = cfa_io_allocate( &sqe, &idx, 1);224 [(volatile struct io_uring_sqe *) sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future ); 201 225 202 226 sqe->opcode = IORING_OP_{op}; 203 sqe->user_data = (__u64)(uintptr_t)&future;204 227 sqe->flags = sflags; 205 228 sqe->ioprio = 0; … … 216 239 217 240 verify( sqe->user_data == (__u64)(uintptr_t)&future ); 218 cfa_io_submit( ctx, &idx, 1, 0 != (submit_flags & CFA_IO_LAZY));241 __submit( context, idx ); 219 242 #endif 220 243 }}""" 221 244 222 SyncTemplate = """{ret} cfa_{name}({params}, __u64 submit_flags) {{ 245 SyncTemplate = """{ret} cfa_{name}({params}, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {{ 246 if( timeout >= 0 ) {{ 247 errno = ENOTSUP; 248 return -1; 249 }} 223 250 io_future_t future; 224 251 225 async_{name}( future, {args}, submit_flags );252 async_{name}( future, {args}, submit_flags, cancellation, context ); 226 253 227 254 wait( future ); … … 388 415 if c.define: 389 416 print("""#if defined({define}) 390 {ret} cfa_{name}({params}, __u64 submit_flags);417 {ret} cfa_{name}({params}, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context); 391 418 #endif""".format(define=c.define,ret=c.ret, name=c.name, params=c.params)) 392 419 else: 393 print("{ret} cfa_{name}({params}, __u64 submit_flags);"420 print("{ret} cfa_{name}({params}, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context);" 394 421 .format(ret=c.ret, name=c.name, params=c.params)) 395 422 … … 399 426 if c.define: 400 427 print("""#if defined({define}) 401 void async_{name}(io_future_t & future, {params}, __u64 submit_flags);428 void async_{name}(io_future_t & future, {params}, int submit_flags, io_cancellation * cancellation, io_context * context); 402 429 #endif""".format(define=c.define,name=c.name, params=c.params)) 403 430 else: 404 print("void async_{name}(io_future_t & future, {params}, __u64 submit_flags);"431 print("void async_{name}(io_future_t & future, {params}, int submit_flags, io_cancellation * cancellation, io_context * context);" 405 432 .format(name=c.name, params=c.params)) 406 433 print("\n") … … 447 474 448 475 print(""" 476 //----------------------------------------------------------------------------- 477 bool cancel(io_cancellation & this) { 478 #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_ASYNC_CANCEL) 479 return false; 480 #else 481 io_future_t future; 482 483 io_context * context = __get_io_context(); 484 485 __u8 sflags = 0; 486 struct __io_data & ring = *context->thrd.ring; 487 488 __u32 idx; 489 volatile struct io_uring_sqe * sqe; 490 [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&future ); 491 492 sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0; 493 sqe->opcode = IORING_OP_ASYNC_CANCEL; 494 sqe->flags = sflags; 495 sqe->addr = this.target; 496 497 verify( sqe->user_data == (__u64)(uintptr_t)&future ); 498 __submit( context, idx ); 499 500 wait(future); 501 502 if( future.result == 0 ) return true; // Entry found 503 if( future.result == -EALREADY) return true; // Entry found but in progress 504 if( future.result == -ENOENT ) return false; // Entry not found 505 return false; 506 #endif 507 } 508 449 509 //----------------------------------------------------------------------------- 450 510 // Check if a function is has asynchronous
Note:
See TracChangeset
for help on using the changeset viewer.