Changeset 4998155
- Timestamp: Aug 14, 2020, 12:33:26 PM
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: 7fdae38
- Parents: b353a49
- Location: libcfa/src/concurrency
- Files: 4 edited
Legend: lines prefixed with "-" were removed, lines prefixed with "+" were added, and unprefixed lines are unmodified context; "…" marks elided regions.
libcfa/src/concurrency/io.cfa
rb353a49 → r4998155

  //=============================================================================================
  static unsigned __collect_submitions( struct __io_data & ring );
- static uint32_t __release_consumed_submission( struct __io_data & ring );
+ static __u32 __release_consumed_submission( struct __io_data & ring );

  static inline void process(struct io_uring_cqe & cqe ) {
…
  	data->result = cqe.res;
- 	unpark( data->thrd __cfaabi_dbg_ctx2 );
+ 	post( data->sem );
  }
…
  	unsigned head = *ring.completion_q.head;
  	unsigned tail = *ring.completion_q.tail;
- 	const uint32_t mask = *ring.completion_q.mask;
+ 	const __u32 mask = *ring.completion_q.mask;

  	// Nothing was new return 0
…
  	}

- 	uint32_t count = tail - head;
+ 	__u32 count = tail - head;
  	/* paranoid */ verify( count != 0 );
  	for(i; count) {
…
  //

- [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ) {
+ [* struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data ) {
  	/* paranoid */ verify( data != 0 );
…
  	__attribute((unused)) int len   = 0;
  	__attribute((unused)) int block = 0;
- 	uint32_t cnt  = *ring.submit_q.num;
- 	uint32_t mask = *ring.submit_q.mask;
+ 	__u32 cnt  = *ring.submit_q.num;
+ 	__u32 mask = *ring.submit_q.mask;

  	disable_interrupts();
- 	uint32_t off = __tls_rand();
+ 	__u32 off = __tls_rand();
  	enable_interrupts( __cfaabi_dbg_ctx );
…
  	// Look through the list starting at some offset
  	for(i; cnt) {
- 		uint64_t expected = 0;
- 		uint32_t idx = (i + off) & mask;
+ 		__u64 expected = 0;
+ 		__u32 idx = (i + off) & mask;
  		struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
- 		volatile uint64_t * udata = (volatile uint64_t *)&sqe->user_data;
+ 		volatile __u64 * udata = &sqe->user_data;

  		if( *udata == expected &&
…
  }

- static inline uint32_t __submit_to_ready_array( struct __io_data & ring, uint32_t idx, const uint32_t mask ) {
+ static inline __u32 __submit_to_ready_array( struct __io_data & ring, __u32 idx, const __u32 mask ) {
  	/* paranoid */ verify( idx <= mask );
  	/* paranoid */ verify( idx != -1ul32 );
…
  	__attribute((unused)) int len   = 0;
  	__attribute((unused)) int block = 0;
- 	uint32_t ready_mask = ring.submit_q.ready_cnt - 1;
+ 	__u32 ready_mask = ring.submit_q.ready_cnt - 1;

  	disable_interrupts();
- 	uint32_t off = __tls_rand();
+ 	__u32 off = __tls_rand();
  	enable_interrupts( __cfaabi_dbg_ctx );

- 	uint32_t picked;
+ 	__u32 picked;
  	LOOKING: for() {
  		for(i; ring.submit_q.ready_cnt) {
  			picked = (i + off) & ready_mask;
- 			uint32_t expected = -1ul32;
+ 			__u32 expected = -1ul32;
  			if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
  				break LOOKING;
…
  }

- void __submit( struct io_context * ctx, uint32_t idx ) __attribute__((nonnull (1))) {
+ void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1))) {
  	__io_data & ring = *ctx->thrd.ring;
  	// Get now the data we definetely need
- 	volatile uint32_t * const tail = ring.submit_q.tail;
- 	const uint32_t mask = *ring.submit_q.mask;
+ 	volatile __u32 * const tail = ring.submit_q.tail;
+ 	const __u32 mask = *ring.submit_q.mask;

  	// There are 2 submission schemes, check which one we are using
…
  	}
  	else if( ring.eager_submits ) {
- 		uint32_t picked = __submit_to_ready_array( ring, idx, mask );
+ 		__u32 picked = __submit_to_ready_array( ring, idx, mask );

  		for() {
…
  	unsigned to_submit = 0;
- 	uint32_t tail = *ring.submit_q.tail;
- 	const uint32_t mask = *ring.submit_q.mask;
+ 	__u32 tail = *ring.submit_q.tail;
+ 	const __u32 mask = *ring.submit_q.mask;

  	// Go through the list of ready submissions
  	for( i; ring.submit_q.ready_cnt ) {
  		// replace any submission with the sentinel, to consume it.
- 		uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
+ 		__u32 idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);

  		// If it was already the sentinel, then we are done
…
  }

- static uint32_t __release_consumed_submission( struct __io_data & ring ) {
- 	const uint32_t smask = *ring.submit_q.mask;
+ static __u32 __release_consumed_submission( struct __io_data & ring ) {
+ 	const __u32 smask = *ring.submit_q.mask;

  	if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
- 	uint32_t chead = *ring.submit_q.head;
- 	uint32_t phead = ring.submit_q.prev_head;
+ 	__u32 chead = *ring.submit_q.head;
+ 	__u32 phead = ring.submit_q.prev_head;
  	ring.submit_q.prev_head = chead;
  	unlock(ring.submit_q.release_lock);

- 	uint32_t count = chead - phead;
+ 	__u32 count = chead - phead;
  	for( i; count ) {
- 		uint32_t idx = ring.submit_q.array[ (phead + i) & smask ];
+ 		__u32 idx = ring.submit_q.array[ (phead + i) & smask ];
  		ring.submit_q.sqes[ idx ].user_data = 0;
  	}
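Two changes recur through the io.cfa hunks above: the fixed-width <stdint.h> types become the kernel's __u32/__u64 aliases, and a completed request now wakes its submitter by posting a oneshot semaphore instead of unparking a stored $thread. Below is a minimal sketch of the completion side after this changeset; the first statement, which recovers the per-request record from cqe.user_data, is an assumption about surrounding code the hunks do not show, while the last two statements appear verbatim above.

    static inline void process( struct io_uring_cqe & cqe ) {
    	// user_data was set to the address of the submitter's __io_user_data_t
    	// when the sqe was allocated (assumed, not shown in this changeset).
    	struct __io_user_data_t * data = (struct __io_user_data_t *)(uintptr_t)cqe.user_data;

    	data->result = cqe.res;   // publish the kernel's result for this request
    	post( data->sem );        // release the thread blocked in wait( data.sem )
    }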
libcfa/src/concurrency/io/setup.cfa
rb353a49 → r4998155

  	if( params_in.poll_complete ) params.flags |= IORING_SETUP_IOPOLL;

- 	uint32_t nentries = params_in.num_entries != 0 ? params_in.num_entries : 256;
+ 	__u32 nentries = params_in.num_entries != 0 ? params_in.num_entries : 256;
  	if( !is_pow2(nentries) ) {
  		abort("ERROR: I/O setup 'num_entries' must be a power of 2\n");
…
  	// Get the pointers from the kernel to fill the structure
  	// submit queue
- 	sq.head    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
- 	sq.tail    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
- 	sq.mask    = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
- 	sq.num     = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
- 	sq.flags   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
- 	sq.dropped = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
- 	sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
+ 	sq.head    = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
+ 	sq.tail    = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
+ 	sq.mask    = (   const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
+ 	sq.num     = (   const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
+ 	sq.flags   = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
+ 	sq.dropped = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
+ 	sq.array   = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
  	sq.prev_head = *sq.head;

  	{
- 		const uint32_t num = *sq.num;
+ 		const __u32 num = *sq.num;
  		for( i; num ) {
  			sq.sqes[i].user_data = 0ul64;
…
  	// completion queue
- 	cq.head     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
- 	cq.tail     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
- 	cq.mask     = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
- 	cq.num      = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
- 	cq.overflow = (         uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
- 	cq.cqes     = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
+ 	cq.head     = (volatile __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
+ 	cq.tail     = (volatile __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
+ 	cq.mask     = (   const __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
+ 	cq.num      = (   const __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
+ 	cq.overflow = (         __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
+ 	cq.cqes     = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);

  	// some paranoid checks
…
  void __ioctx_register($io_ctx_thread & ctx, struct epoll_event & ev) {
  	ev.events = EPOLLIN | EPOLLONESHOT;
- 	ev.data.u64 = (uint64_t)&ctx;
+ 	ev.data.u64 = (__u64)&ctx;
  	int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_ADD, ctx.ring->fd, &ev);
  	if (ret < 0) {
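The setup.cfa hunks are the same type substitution applied to the pointers that map the kernel-shared submission and completion rings. As a side note (not part of the changeset), the aliases come from <linux/types.h>, so a hypothetical compile-time check that they agree with the fixed-width types they replace could look like this:

    #include <stdint.h>
    #include <linux/types.h>

    /* Hypothetical checks, not in the changeset: the kernel aliases used by the
     * io_uring structures must match the widths of the types they replace here. */
    _Static_assert( sizeof(__u32) == sizeof(uint32_t), "__u32 and uint32_t differ" );
    _Static_assert( sizeof(__u64) == sizeof(uint64_t), "__u64 and uint64_t differ" );
    _Static_assert( sizeof(__s32) == sizeof(int32_t),  "__s32 and int32_t differ" );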
libcfa/src/concurrency/io/types.hfa
rb353a49 → r4998155

  #if defined(CFA_HAVE_LINUX_IO_URING_H)
+ extern "C" {
+ 	#include <linux/types.h>
+ }
+
  #include "bits/locks.hfa"
…
  struct __submition_data {
  	// Head and tail of the ring (associated with array)
- 	volatile uint32_t * head;
- 	volatile uint32_t * tail;
- 	volatile uint32_t prev_head;
+ 	volatile __u32 * head;
+ 	volatile __u32 * tail;
+ 	volatile __u32 prev_head;

  	// The actual kernel ring which uses head/tail
  	// indexes into the sqes arrays
- 	uint32_t * array;
+ 	__u32 * array;

  	// number of entries and mask to go with it
- 	const uint32_t * num;
- 	const uint32_t * mask;
+ 	const __u32 * num;
+ 	const __u32 * mask;

  	// Submission flags (Not sure what for)
- 	uint32_t * flags;
+ 	__u32 * flags;

  	// number of sqes not submitted (whatever that means)
- 	uint32_t * dropped;
+ 	__u32 * dropped;

  	// Like head/tail but not seen by the kernel
- 	volatile uint32_t * ready;
- 	uint32_t ready_cnt;
+ 	volatile __u32 * ready;
+ 	__u32 ready_cnt;

  	__spinlock_t lock;
…
  struct __completion_data {
  	// Head and tail of the ring
- 	volatile uint32_t * head;
- 	volatile uint32_t * tail;
+ 	volatile __u32 * head;
+ 	volatile __u32 * tail;

  	// number of entries and mask to go with it
- 	const uint32_t * mask;
- 	const uint32_t * num;
+ 	const __u32 * mask;
+ 	const __u32 * num;

  	// number of cqes not submitted (whatever that means)
- 	uint32_t * overflow;
+ 	__u32 * overflow;

  	// the kernel ring
…
  	struct __submition_data submit_q;
  	struct __completion_data completion_q;
- 	uint32_t ring_flags;
+ 	__u32 ring_flags;
  	int fd;
  	bool eager_submits:1;
…
  // IO user data
  struct __io_user_data_t {
- 	int32_t result;
- 	$thread * thrd;
+ 	__s32 result;
+ 	oneshot sem;
  };
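Besides pulling in <linux/types.h> under extern "C" so the __u32/__u64/__s32 aliases are visible to the Cforall headers, types.hfa changes what a pending request carries: the submitting $thread pointer is replaced by an embedded oneshot, the single-use synchronization object that the post()/wait() calls in the other two files operate on. The record after this changeset, reconstructed from the hunk above with added comments:

    // IO user data
    struct __io_user_data_t {
    	__s32 result;    // set from cqe.res by the completion poller before it posts
    	oneshot sem;     // posted once per request by the poller, waited on once by the submitter
    };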
libcfa/src/concurrency/iocall.cfa
rb353a49 → r4998155

  #include "io/types.hfa"

- extern [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data );
- extern void __submit( struct io_context * ctx, uint32_t idx ) __attribute__((nonnull (1)));
+ extern [* struct io_uring_sqe, __u32] __submit_alloc( struct __io_data & ring, __u64 data );
+ extern void __submit( struct io_context * ctx, __u32 idx ) __attribute__((nonnull (1)));

- static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd) {
+ static inline void ?{}(struct io_uring_sqe & this, __u8 opcode, int fd) {
  	this.opcode = opcode;
  	#if !defined(IOSQE_ASYNC)
…
  }

- static inline void ?{}(struct io_uring_sqe & this, uint8_t opcode, int fd, void * addr, uint32_t len, uint64_t off ) {
+ static inline void ?{}(struct io_uring_sqe & this, __u8 opcode, int fd, void * addr, __u32 len, __u64 off ) {
  	(this){ opcode, fd };
  	this.off = off;
- 	this.addr = (uint64_t)(uintptr_t)addr;
+ 	this.addr = (__u64)(uintptr_t)addr;
  	this.len = len;
  }
…
  	(void)timeout; (void)cancellation; \
  	if( !context ) context = __get_io_context(); \
- 	__io_user_data_t data = { 0, active_thread() }; \
+ 	__io_user_data_t data = { 0 }; \
  	struct __io_data & ring = *context->thrd.ring; \
  	struct io_uring_sqe * sqe; \
- 	uint32_t idx; \
- 	uint8_t sflags = REGULAR_FLAGS & submit_flags; \
- 	[sqe, idx] = __submit_alloc( ring, (uint64_t)(uintptr_t)&data ); \
+ 	__u32 idx; \
+ 	__u8 sflags = REGULAR_FLAGS & submit_flags; \
+ 	[sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&data ); \
  	sqe->flags = sflags;

  #define __submit_wait \
  	/*__cfaabi_bits_print_safe( STDERR_FILENO, "Preparing user data %p for %p\n", &data, data.thrd );*/ \
- 	verify( sqe->user_data == (uint64_t)(uintptr_t)&data ); \
+ 	verify( sqe->user_data == (__u64)(uintptr_t)&data ); \
  	__submit( context, idx ); \
- 	park( __cfaabi_dbg_ctx ); \
+ 	wait( data.sem ); \
  	if( data.result < 0 ) { \
  		errno = -data.result; \
…
  extern int fsync(int fd);
- extern int sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags);
+
+ typedef __off64_t off_t;
+ typedef __off64_t off64_t;
+ extern int sync_file_range(int fd, off64_t offset, off64_t nbytes, unsigned int flags);

  struct msghdr;
…
  extern int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);

- extern int fallocate(int fd, int mode, uint64_t offset, uint64_t len);
- extern int posix_fadvise(int fd, uint64_t offset, uint64_t len, int advice);
+ extern int fallocate(int fd, int mode, off_t offset, off_t len);
+ extern int posix_fadvise(int fd, off_t offset, off_t len, int advice);
  extern int madvise(void *addr, size_t length, int advice);
…
  	sqe->fd = fd;
  	sqe->off = offset;
- 	sqe->addr = (uint64_t)(uintptr_t)iov;
+ 	sqe->addr = (__u64)iov;
  	sqe->len = iovcnt;
  	sqe->rw_flags = 0;
…
  	__submit_prelude

- 	(*sqe){ IORING_OP_WRITEV, fd, iov, iovcnt, offset };
+ 	sqe->opcode = IORING_OP_WRITEV;
+ 	sqe->ioprio = 0;
+ 	sqe->fd = fd;
+ 	sqe->off = offset;
+ 	sqe->addr = (__u64)iov;
+ 	sqe->len = iovcnt;
+ 	sqe->rw_flags = 0;
+ 	sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;

  	__submit_wait
…
  	__submit_prelude

- 	(*sqe){ IORING_OP_FSYNC, fd };
-
- 	__submit_wait
- #endif
- }
-
- int cfa_sync_file_range(int fd, int64_t offset, int64_t nbytes, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
+ 	sqe->opcode = IORING_OP_FSYNC;
+ 	sqe->ioprio = 0;
+ 	sqe->fd = fd;
+ 	sqe->off = 0;
+ 	sqe->addr = 0;
+ 	sqe->len = 0;
+ 	sqe->rw_flags = 0;
+ 	sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
+
+ 	__submit_wait
+ #endif
+ }
+
+ int cfa_sync_file_range(int fd, off64_t offset, off64_t nbytes, unsigned int flags, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
  #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_SYNC_FILE_RANGE)
  	return sync_file_range(fd, offset, nbytes, flags);
…
  	(*sqe){ IORING_OP_SEND, sockfd };
- 	sqe->addr = (uint64_t)buf;
+ 	sqe->addr = (__u64)buf;
  	sqe->len = len;
  	sqe->msg_flags = flags;
…
  	(*sqe){ IORING_OP_RECV, sockfd };
- 	sqe->addr = (uint64_t)buf;
+ 	sqe->addr = (__u64)buf;
  	sqe->len = len;
  	sqe->msg_flags = flags;
…
  	(*sqe){ IORING_OP_ACCEPT, sockfd };
- 	sqe->addr = (uint64_t)(uintptr_t)addr;
- 	sqe->addr2 = (uint64_t)(uintptr_t)addrlen;
+ 	sqe->addr = (__u64)addr;
+ 	sqe->addr2 = (__u64)addrlen;
  	sqe->accept_flags = flags;
…
  	(*sqe){ IORING_OP_CONNECT, sockfd };
- 	sqe->addr = (uint64_t)(uintptr_t)addr;
- 	sqe->off = (uint64_t)(uintptr_t)addrlen;
+ 	sqe->addr = (__u64)addr;
+ 	sqe->off = (__u64)addrlen;

  	__submit_wait
  #endif
  }

- int cfa_fallocate(int fd, int mode, uint64_t offset, uint64_t len, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
+ int cfa_fallocate(int fd, int mode, off_t offset, off_t len, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
  #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_FALLOCATE)
  	return fallocate( fd, mode, offset, len );
…
  }

- int cfa_fadvise(int fd, uint64_t offset, uint64_t len, int advice, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
+ int cfa_fadvise(int fd, off_t offset, off_t len, int advice, int submit_flags, Duration timeout, io_cancellation * cancellation, io_context * context) {
  #if !defined(CFA_HAVE_LINUX_IO_URING_H) || !defined(CFA_HAVE_IORING_OP_FADVISE)
  	return posix_fadvise( fd, offset, len, advice );
…
  	(*sqe){ IORING_OP_FADVISE, fd };
- 	sqe->off = (uint64_t)offset;
+ 	sqe->off = (__u64)offset;
  	sqe->len = len;
  	sqe->fadvise_advice = advice;
…
  	(*sqe){ IORING_OP_MADVISE, 0 };
- 	sqe->addr = (uint64_t)addr;
+ 	sqe->addr = (__u64)addr;
  	sqe->len = length;
  	sqe->fadvise_advice = advice;
…
  	(*sqe){ IORING_OP_OPENAT, dirfd };
- 	sqe->addr = (uint64_t)pathname;
+ 	sqe->addr = (__u64)pathname;
  	sqe->open_flags = flags;
  	sqe->len = mode;
…
  	__submit_prelude

- 	(*sqe){ IORING_OP_STATX, dirfd, pathname, mask, (uint64_t)statxbuf };
+ 	(*sqe){ IORING_OP_STATX, dirfd, pathname, mask, (__u64)statxbuf };
  	sqe->statx_flags = flags;
…
  	}
  	else {
- 		sqe->off = (uint64_t)-1;
+ 		sqe->off = (__u64)-1;
  	}
  	sqe->len = len;
…
  	}
  	else {
- 		sqe->splice_off_in = (uint64_t)-1;
+ 		sqe->splice_off_in = (__u64)-1;
  	}
  	sqe->splice_flags = flags | (SPLICE_FLAGS & submit_flags);
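Putting the iocall.cfa hunks together, every cfa_* wrapper now synchronizes with the completion poller through the oneshot embedded in its stack-allocated __io_user_data_t. The following is a condensed, hand-expanded view of the __submit_prelude / __submit_wait pair after this changeset, not literal code from the file: the declarations of context, ring, sqe and idx come from the macros shown above, the middle comment stands in for the opcode-specific sqe fields, and error handling is elided.

    __io_user_data_t data = { 0 };                                  // result + oneshot, zero-initialized
    [sqe, idx] = __submit_alloc( ring, (__u64)(uintptr_t)&data );   // sqe->user_data points at data
    // ... fill in the opcode-specific sqe fields, e.g. for IORING_OP_WRITEV ...
    __submit( context, idx );                                       // hand the index to the submission ring
    wait( data.sem );                                               // blocks until the poller posts data.sem
    if( data.result < 0 ) { errno = -data.result; }                 // a negative cqe.res carries the errno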