Changeset d384787
- Timestamp:
- Apr 24, 2020, 1:54:29 PM (5 years ago)
- Branches:
- ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children:
- 0ea6c5a
- Parents:
- ecf6b46
- Location:
- libcfa/src/concurrency
- Files:
- 1 added
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/io.cfa
recf6b46 rd384787 101 101 102 102 // Requires features 103 // // adjust the size according to the parameters 104 // if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) { 105 // cq->ring_sz = sq->ring_sz = max(cq->ring_sz, sq->ring_sz); 106 // } 103 #if defined(IORING_FEAT_SINGLE_MMAP) 104 // adjust the size according to the parameters 105 if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) { 106 cq->ring_sz = sq->ring_sz = max(cq->ring_sz, sq->ring_sz); 107 } 108 #endif 107 109 108 110 // mmap the Submit Queue into existence … … 112 114 } 113 115 114 // mmap the Completion Queue into existence (may or may not be needed)115 116 // Requires features 116 // if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) { 117 // cq->ring_ptr = sq->ring_ptr; 118 // } 119 // else { 117 #if defined(IORING_FEAT_SINGLE_MMAP) 118 // mmap the Completion Queue into existence (may or may not be needed) 119 if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) { 120 cq->ring_ptr = sq->ring_ptr; 121 } 122 else 123 #endif 124 { 120 125 // We need multiple call to MMAP 121 126 cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING); … … 124 129 abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno)); 125 130 } 126 //}131 } 127 132 128 133 // mmap the submit queue entries … … 171 176 (this.io.submit){ min(*sq.num, *cq.num) }; 172 177 178 // Initialize statistics 179 this.io.submit_q.stats.submit_avg.val = 0; 180 this.io.submit_q.stats.submit_avg.cnt = 0; 181 this.io.completion_q.stats.completed_avg.val = 0; 182 this.io.completion_q.stats.completed_avg.cnt = 0; 183 173 184 // Create the poller thread 174 185 this.io.stack = __create_pthread( &this.io.poller, __io_poller, &this ); … … 185 196 pthread_join( this.io.poller, 0p ); 186 197 free( this.io.stack ); 198 199 // print statistics 200 __cfaabi_bits_print_safe( STDERR_FILENO, 201 "----- I/O uRing Stats -----\n" 202 "- total submit calls : %llu\n" 203 "- avg submit : %lf\n" 204 
"- total wait calls : %llu\n" 205 "- avg completion/wait : %lf\n", 206 this.io.submit_q.stats.submit_avg.cnt, 207 ((double)this.io.submit_q.stats.submit_avg.val) / this.io.submit_q.stats.submit_avg.cnt, 208 this.io.completion_q.stats.completed_avg.cnt, 209 ((double)this.io.completion_q.stats.completed_avg.val) / this.io.completion_q.stats.completed_avg.cnt 210 ); 187 211 188 212 // Shutdown the io rings … … 215 239 // Process a single completion message from the io_uring 216 240 // This is NOT thread-safe 217 static bool __io_process(struct io_ring & ring) { 241 int __drain_io( struct io_ring & ring, sigset_t & mask, int waitcnt ) { 242 int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, &mask, _NSIG / 8); 243 if( ret < 0 ) { 244 switch((int)errno) { 245 case EAGAIN: 246 case EINTR: 247 return -EAGAIN; 248 default: 249 abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) ); 250 } 251 } 252 253 // Drain the queue 218 254 unsigned head = *ring.completion_q.head; 219 255 unsigned tail = __atomic_load_n(ring.completion_q.tail, __ATOMIC_ACQUIRE); 220 256 221 if (head == tail) return false; 222 223 unsigned idx = head & (*ring.completion_q.mask); 224 struct io_uring_cqe & cqe = ring.completion_q.cqes[idx]; 225 226 /* paranoid */ verify(&cqe); 227 228 struct io_user_data * data = (struct io_user_data *)cqe.user_data; 229 // __cfaabi_bits_print_safe( STDERR_FILENO, "Performed reading io cqe %p, result %d for %p\n", data, cqe.res, data->thrd ); 230 231 data->result = cqe.res; 232 __unpark( data->thrd __cfaabi_dbg_ctx2 ); 257 // Nothing was new return 0 258 if (head == tail) { 259 ring.completion_q.stats.completed_avg.cnt += 1; 260 return 0; 261 } 262 263 uint32_t count = tail - head; 264 for(i; count) { 265 unsigned idx = (head + i) & (*ring.completion_q.mask); 266 struct io_uring_cqe & cqe = ring.completion_q.cqes[idx]; 267 268 /* paranoid */ verify(&cqe); 269 270 struct io_user_data * data = (struct io_user_data *)cqe.user_data; 
271 // __cfaabi_bits_print_safe( STDERR_FILENO, "Performed reading io cqe %p, result %d for %p\n", data, cqe.res, data->thrd ); 272 273 data->result = cqe.res; 274 __unpark( data->thrd __cfaabi_dbg_ctx2 ); 275 } 233 276 234 277 // Allow new submissions to happen 235 V(ring.submit );278 V(ring.submit, count); 236 279 237 280 // Mark to the kernel that the cqe has been seen 238 281 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read. 239 __atomic_fetch_add( ring.completion_q.head, 1, __ATOMIC_RELAXED ); 240 241 return true; 282 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED ); 283 284 ring.completion_q.stats.completed_avg.val += count; 285 ring.completion_q.stats.completed_avg.cnt += 1; 286 287 return count; 242 288 } 243 289 … … 257 303 verify( (*ring.completion_q.head) == (*ring.completion_q.tail) ); 258 304 259 LOOP: while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) { 260 int ret = syscall( __NR_io_uring_enter, ring.fd, 0, 1, IORING_ENTER_GETEVENTS, &mask, _NSIG / 8); 261 if( ret < 0 ) { 262 switch((int)errno) { 263 case EAGAIN: 264 case EINTR: 265 continue LOOP; 266 default: 267 abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) ); 268 } 269 } 270 271 // Drain the queue 272 while(__io_process(ring)) {} 305 while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) { 306 __drain_io( ring, mask, 1 ); 273 307 } 274 308 … … 343 377 } 344 378 379 ring.submit_q.stats.submit_avg.val += 1; 380 ring.submit_q.stats.submit_avg.cnt += 1; 381 345 382 unlock(ring.submit_q.lock); 346 383 // Make sure that idx was submitted … … 512 549 int cfa_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) { 513 550 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_ACCEPT) 514 __SOCKADDR_ARG _addr; 515 _addr.__sockaddr__ = addr; 516 return accept4( sockfd, _addr, addrlen, flags ); 551 #pragma GCC diagnostic push 552 #pragma GCC diagnostic ignored "-Wattributes" 553 __SOCKADDR_ARG _addr; 
554 _addr.__sockaddr__ = addr; 555 return accept4( sockfd, _addr, addrlen, flags ); 556 #pragma GCC diagnostic pop 517 557 #else 518 558 __submit_prelude … … 529 569 int cfa_connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen) { 530 570 #if !defined(HAVE_LINUX_IO_URING_H) || !defined(IORING_OP_CONNECT) 531 __CONST_SOCKADDR_ARG _addr; 532 _addr.__sockaddr__ = addr; 533 return connect( sockfd, _addr, addrlen ); 571 #pragma GCC diagnostic push 572 #pragma GCC diagnostic ignored "-Wattributes" 573 __CONST_SOCKADDR_ARG _addr; 574 _addr.__sockaddr__ = addr; 575 return connect( sockfd, _addr, addrlen ); 576 #pragma GCC diagnostic pop 534 577 #else 535 578 __submit_prelude -
libcfa/src/concurrency/kernel.cfa
recf6b46 rd384787 1004 1004 } 1005 1005 1006 bool V(semaphore & this, unsigned diff) with( this ) { 1007 $thread * thrd = 0p; 1008 lock( lock __cfaabi_dbg_ctx2 ); 1009 int release = max(-count, (int)diff); 1010 count += diff; 1011 for(release) { 1012 unpark( pop_head( waiting ) __cfaabi_dbg_ctx2 ); 1013 } 1014 1015 unlock( lock ); 1016 1017 return thrd != 0p; 1018 } 1019 1006 1020 //----------------------------------------------------------------------------- 1007 1021 // Global Queues -
libcfa/src/concurrency/kernel.hfa
recf6b46 rd384787 40 40 void P (semaphore & this); 41 41 bool V (semaphore & this); 42 bool V (semaphore & this, unsigned count); 42 43 43 44 … … 144 145 void * ring_ptr; 145 146 size_t ring_sz; 147 148 // Statistics 149 struct { 150 struct { 151 unsigned long long int val; 152 unsigned long long int cnt; 153 } submit_avg; 154 } stats; 146 155 }; 147 156 … … 164 173 void * ring_ptr; 165 174 size_t ring_sz; 175 176 // Statistics 177 struct { 178 struct { 179 unsigned long long int val; 180 unsigned long long int cnt; 181 } completed_avg; 182 } stats; 166 183 }; 167 184
Note: See TracChangeset
for help on using the changeset viewer.