source: libcfa/src/concurrency/io.cfa@ 527acfc

Last change: 59f74a2 by Thierry Delisle <tdelisle@…> (Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc)
//
// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
//
// The contents of this file are covered under the licence agreement in the
// file "LICENCE" distributed with Cforall.
//
// io.cfa --
//
// Author           : Thierry Delisle
// Created On       : Thu Apr 23 17:31:00 2020
// Last Modified By :
// Last Modified On :
// Update Count     :
//

#if defined(__CFA_DEBUG__)
    // #define __CFA_DEBUG_PRINT_IO__
    #define __CFA_DEBUG_PRINT_IO_CORE__
#endif

#include "kernel.hfa"
#include "bitmanip.hfa"

#if !defined(HAVE_LINUX_IO_URING_H)
    void __kernel_io_startup( cluster &, unsigned, bool ) {
        // Nothing to do without io_uring
    }

    void __kernel_io_finish_start( cluster & ) {
        // Nothing to do without io_uring
    }

    void __kernel_io_prepare_stop( cluster & ) {
        // Nothing to do without io_uring
    }

    void __kernel_io_shutdown( cluster &, bool ) {
        // Nothing to do without io_uring
    }

#else
    #define _GNU_SOURCE         /* See feature_test_macros(7) */
    #include <errno.h>
    #include <stdint.h>
    #include <string.h>
    #include <unistd.h>
    #include <sys/mman.h>

    extern "C" {
        #include <sys/syscall.h>

        #include <linux/io_uring.h>
    }

    #include "bits/signal.hfa"
    #include "kernel_private.hfa"
    #include "thread.hfa"

    uint32_t entries_per_cluster() {
        return 256;
    }

    static void * __io_poller_slow( void * arg );

    // Weirdly, some systems that do support io_uring don't actually define these
    #ifdef __alpha__
        /*
         * alpha is the only exception, all other architectures
         * have common numbers for new system calls.
         */
        #ifndef __NR_io_uring_setup
            #define __NR_io_uring_setup           535
        #endif
        #ifndef __NR_io_uring_enter
            #define __NR_io_uring_enter           536
        #endif
        #ifndef __NR_io_uring_register
            #define __NR_io_uring_register        537
        #endif
    #else /* !__alpha__ */
        #ifndef __NR_io_uring_setup
            #define __NR_io_uring_setup           425
        #endif
        #ifndef __NR_io_uring_enter
            #define __NR_io_uring_enter           426
        #endif
        #ifndef __NR_io_uring_register
            #define __NR_io_uring_register        427
        #endif
    #endif
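
    // Illustrative sketch (added annotation, not part of the original file): the __NR_io_uring_*
    // numbers above are only ever used through raw syscall(2) calls later in this file. Hypothetical
    // wrappers, shown purely to document the calling conventions used below, would look like:
    //
    //     static inline int sys_io_uring_setup( unsigned entries, struct io_uring_params * params ) {
    //         return syscall( __NR_io_uring_setup, entries, params );
    //     }
    //     static inline int sys_io_uring_enter( int fd, unsigned to_submit, unsigned min_complete,
    //                                           unsigned flags, sigset_t * sig ) {
    //         return syscall( __NR_io_uring_enter, fd, to_submit, min_complete, flags, sig, _NSIG / 8 );
    //     }
    //     static inline int sys_io_uring_register( int fd, unsigned opcode, void * arg, unsigned nr_args ) {
    //         return syscall( __NR_io_uring_register, fd, opcode, arg, nr_args );
    //     }
    //
    // The wrapper names are invented; the file itself calls syscall() directly at each site.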

    // Fast poller user-thread
    // Not using the "thread" keyword because we want to control
    // more carefully when to start/stop it
    struct __io_poller_fast {
        struct __io_data * ring;
        $thread thrd;
    };

    void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
        this.ring = cltr.io;
        (this.thrd){ "Fast I/O Poller", cltr };
    }
    void ^?{}( __io_poller_fast & mutex this );
    void main( __io_poller_fast & this );
    static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
    void ^?{}( __io_poller_fast & mutex this ) {}

    struct __submition_data {
        // Head and tail of the ring (associated with array)
        volatile uint32_t * head;
        volatile uint32_t * tail;
        volatile uint32_t prev_head;

        // The actual kernel ring which uses head/tail
        // indexes into the sqes arrays
        uint32_t * array;

        // number of entries and mask to go with it
        const uint32_t * num;
        const uint32_t * mask;

        // Submission flags (the kernel sets IORING_SQ_NEED_WAKEUP here when its SQPOLL thread is idle)
        uint32_t * flags;

        // number of invalid sqes the kernel dropped instead of submitting
        uint32_t * dropped;

        // Like head/tail but not seen by the kernel
        volatile uint32_t * ready;
        uint32_t ready_cnt;

        __spinlock_t lock;
        __spinlock_t release_lock;

        // A buffer of sqes (not the actual ring)
        struct io_uring_sqe * sqes;

        // The location and size of the mmaped area
        void * ring_ptr;
        size_t ring_sz;
    };

    struct __completion_data {
        // Head and tail of the ring
        volatile uint32_t * head;
        volatile uint32_t * tail;

        // number of entries and mask to go with it
        const uint32_t * mask;
        const uint32_t * num;

        // number of cqes the kernel dropped because the completion queue was full
        uint32_t * overflow;

        // the kernel ring
        struct io_uring_cqe * cqes;

        // The location and size of the mmaped area
        void * ring_ptr;
        size_t ring_sz;
    };

    struct __io_data {
        struct __submition_data submit_q;
        struct __completion_data completion_q;
        uint32_t ring_flags;
        int cltr_flags;
        int fd;
        semaphore submit;
        volatile bool done;
        struct {
            struct {
                __processor_id_t id;
                void * stack;
                pthread_t kthrd;
                volatile bool blocked;
            } slow;
            __io_poller_fast fast;
            __bin_sem_t sem;
        } poller;
    };
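
    // Added annotation (not original source): one __io_data instance hangs off each cluster as
    // cltr.io. submit_q and completion_q mirror the kernel's mmapped SQ/CQ rings set up in
    // __kernel_io_startup below, while `poller` groups the two cooperating pollers: `slow`, a
    // dedicated pthread that can block inside io_uring_enter, and `fast`, a $thread scheduled on
    // the cluster itself; they hand the ring back and forth using `sem` and SIGUSR1.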

//=============================================================================================
// I/O Startup / Shutdown logic
//=============================================================================================
    void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) {
        if( (io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS) && (io_flags & CFA_CLUSTER_IO_EAGER_SUBMITS) ) {
            abort("CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS and CFA_CLUSTER_IO_EAGER_SUBMITS cannot be mixed\n");
        }

        this.io = malloc();

        // Step 1 : call to setup
        struct io_uring_params params;
        memset(&params, 0, sizeof(params));
        if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS   ) params.flags |= IORING_SETUP_SQPOLL;
        if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES ) params.flags |= IORING_SETUP_IOPOLL;

        uint32_t nentries = entries_per_cluster();

        int fd = syscall(__NR_io_uring_setup, nentries, &params );
        if(fd < 0) {
            abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
        }

        // Step 2 : mmap result
        memset( this.io, 0, sizeof(struct __io_data) );
        struct __submition_data  & sq = this.io->submit_q;
        struct __completion_data & cq = this.io->completion_q;

        // calculate the right ring size
        sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned)           );
        cq.ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));

        // Requires features
        #if defined(IORING_FEAT_SINGLE_MMAP)
            // adjust the size according to the parameters
            if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
                cq.ring_sz = sq.ring_sz = max(cq.ring_sz, sq.ring_sz);
            }
        #endif

        // mmap the Submit Queue into existence
        sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
        if (sq.ring_ptr == (void*)MAP_FAILED) {
            abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
        }

        // Requires features
        #if defined(IORING_FEAT_SINGLE_MMAP)
            // mmap the Completion Queue into existence (may or may not be needed)
            if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
                cq.ring_ptr = sq.ring_ptr;
            }
            else
        #endif
        {
            // We need multiple calls to mmap
            cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
            if (cq.ring_ptr == (void*)MAP_FAILED) {
                munmap(sq.ring_ptr, sq.ring_sz);
                abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
            }
        }

        // mmap the submit queue entries
        size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
        sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
        if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
            munmap(sq.ring_ptr, sq.ring_sz);
            if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
            abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
        }

        // Get the pointers from the kernel to fill the structure
        // submit queue
        sq.head    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
        sq.tail    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
        sq.mask    = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
        sq.num     = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
        sq.flags   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
        sq.dropped = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
        sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
        sq.prev_head = *sq.head;

        {
            const uint32_t num = *sq.num;
            for( i; num ) {
                sq.sqes[i].user_data = 0ul64;
            }
        }

        (sq.lock){};
        (sq.release_lock){};

        if( io_flags & ( CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS | CFA_CLUSTER_IO_EAGER_SUBMITS ) ) {
            /* paranoid */ verify( is_pow2( io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET ) || ((io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET) < 8) );
            sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8);
            sq.ready = alloc_align( 64, sq.ready_cnt );
            for(i; sq.ready_cnt) {
                sq.ready[i] = -1ul32;
            }
        }
        else {
            sq.ready_cnt = 0;
            sq.ready = 0p;
        }

        // completion queue
        cq.head     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
        cq.tail     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
        cq.mask     = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
        cq.num      = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
        cq.overflow = (         uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
        cq.cqes     = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);

        // some paranoid checks
        /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask );
        /* paranoid */ verifyf( (*cq.num)  >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num );
        /* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head );
        /* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail );

        /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
        /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
        /* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );
        /* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );

        // Update the global ring info
        this.io->ring_flags = params.flags;
        this.io->cltr_flags = io_flags;
        this.io->fd         = fd;
        this.io->done       = false;
        (this.io->submit){ min(*sq.num, *cq.num) };

        if(!main_cluster) {
            __kernel_io_finish_start( this );
        }
    }

    void __kernel_io_finish_start( cluster & this ) {
        if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
            __cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluster %p\n", &this);
            (this.io->poller.fast){ this };
            __thrd_start( this.io->poller.fast, main );
        }

        // Create the poller thread
        __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluster %p\n", &this);
        this.io->poller.slow.blocked = false;
        this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
    }

    void __kernel_io_prepare_stop( cluster & this ) {
        __cfadbg_print_safe(io_core, "Kernel I/O : Stopping pollers for cluster\n", &this);
        // Notify the poller thread of the shutdown
        __atomic_store_n(&this.io->done, true, __ATOMIC_SEQ_CST);

        // Stop the IO Poller
        sigval val = { 1 };
        pthread_sigqueue( this.io->poller.slow.kthrd, SIGUSR1, val );
        post( this.io->poller.sem );

        // Wait for the poller thread to finish
        pthread_join( this.io->poller.slow.kthrd, 0p );
        free( this.io->poller.slow.stack );

        __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster\n", &this);

        if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
            with( this.io->poller.fast ) {
                /* paranoid */ verify( this.nprocessors == 0 || &this == mainCluster );
                /* paranoid */ verify( !ready_mutate_islocked() );

                // We need to adjust the clean-up based on where the thread is
                if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {

                    ready_schedule_lock( (struct __processor_id_t *)active_processor() );

                    // This is the tricky case
                    // The thread was preempted and now it is on the ready queue
                    // The thread should be the last on the list
                    /* paranoid */ verify( thrd.link.next != 0p );

                    // Remove the thread from the ready queue of this cluster
                    __attribute__((unused)) bool removed = remove_head( &this, &thrd );
                    /* paranoid */ verify( removed );
                    thrd.link.next = 0p;
                    thrd.link.prev = 0p;
                    __cfaabi_dbg_debug_do( thrd.unpark_stale = true );

                    // Fixup the thread state
                    thrd.state = Blocked;
                    thrd.ticket = 0;
                    thrd.preempted = __NO_PREEMPTION;

                    ready_schedule_unlock( (struct __processor_id_t *)active_processor() );

                    // Pretend like the thread was blocked all along
                }
                // !!! This is not an else if !!!
                if( thrd.state == Blocked ) {

                    // This is the "easy case"
                    // The thread is parked and can easily be moved to active cluster
                    verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster );
                    thrd.curr_cluster = active_cluster();

                    // unpark the fast io_poller
                    unpark( &thrd __cfaabi_dbg_ctx2 );
                }
                else {

                    // The thread is in a weird state
                    // I don't know what to do here
                    abort("Fast poller thread is in unexpected state, cannot clean-up correctly\n");
                }

            }

            ^(this.io->poller.fast){};

            __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller stopped for cluster\n", &this);
        }
    }

    void __kernel_io_shutdown( cluster & this, bool main_cluster ) {
        if(!main_cluster) {
            __kernel_io_prepare_stop( this );
        }

        // Shutdown the io rings
        struct __submition_data  & sq = this.io->submit_q;
        struct __completion_data & cq = this.io->completion_q;

        // unmap the submit queue entries
        munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe));

        // unmap the Submit Queue ring
        munmap(sq.ring_ptr, sq.ring_sz);

        // unmap the Completion Queue ring, if it is different
        if (cq.ring_ptr != sq.ring_ptr) {
            munmap(cq.ring_ptr, cq.ring_sz);
        }

        // close the file descriptor
        close(this.io->fd);

        free( this.io->submit_q.ready ); // Maybe null, doesn't matter
        free( this.io );
    }

    int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get, sigset_t * mask ) {
        bool need_sys_to_submit = false;
        bool need_sys_to_complete = false;
        unsigned min_complete = 0;
        unsigned flags = 0;

        TO_SUBMIT:
        if( to_submit > 0 ) {
            if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
                need_sys_to_submit = true;
                break TO_SUBMIT;
            }
            if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) {
                need_sys_to_submit = true;
                flags |= IORING_ENTER_SQ_WAKEUP;
            }
        }

        TO_COMPLETE:
        if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
            flags |= IORING_ENTER_GETEVENTS;
            if( mask ) {
                need_sys_to_complete = true;
                min_complete = 1;
                break TO_COMPLETE;
            }
            if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {
                need_sys_to_complete = true;
            }
        }

        int ret = 0;
        if( need_sys_to_submit || need_sys_to_complete ) {
            ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, min_complete, flags, mask, _NSIG / 8);
            if( ret < 0 ) {
                switch((int)errno) {
                case EAGAIN:
                case EINTR:
                    ret = -1;
                    break;
                default:
                    abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
                }
            }
        }

        // Memory barrier
        __atomic_thread_fence( __ATOMIC_SEQ_CST );
        return ret;
    }
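
    // Added annotation (not original source): the call sites below use three combinations:
    //   __io_uring_enter( ring, n, false, 0p    )  - __submit: submit-only; with IORING_SETUP_SQPOLL
    //                                                the syscall is skipped unless the kernel poller
    //                                                needs an IORING_ENTER_SQ_WAKEUP kick.
    //   __io_uring_enter( ring, n, true,  0p    )  - fast poller via __drain_io: may request
    //                                                IORING_ENTER_GETEVENTS but leaves min_complete
    //                                                at 0, so it never blocks.
    //   __io_uring_enter( ring, n, true,  &mask )  - slow poller via __drain_io: on a non-SQPOLL ring
    //                                                this sets min_complete = 1 and installs `mask`
    //                                                as the signal mask, so the pthread sleeps until
    //                                                a completion (or SIGUSR1) arrives.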

//=============================================================================================
// I/O Polling
//=============================================================================================
    static unsigned __collect_submitions( struct __io_data & ring );
    static uint32_t __release_consumed_submission( struct __io_data & ring );

    // Process a single completion message from the io_uring
    // This is NOT thread-safe
    static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask ) {
        /* paranoid */ verify( !kernelTLS.preemption_state.enabled );

        unsigned to_submit = 0;
        if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
            // If the poller thread also submits, then we need to aggregate the submissions which are ready
            to_submit = __collect_submitions( ring );
        }

        int ret = __io_uring_enter(ring, to_submit, true, mask);
        if( ret < 0 ) {
            return [0, true];
        }

        // update statistics
        if (to_submit > 0) {
            __STATS__( true,
                if( to_submit > 0 ) {
                    io.submit_q.submit_avg.rdy += to_submit;
                    io.submit_q.submit_avg.csm += ret;
                    io.submit_q.submit_avg.cnt += 1;
                }
            )
        }

        // Release the consumed SQEs
        __release_consumed_submission( ring );

        // Drain the queue
        unsigned head = *ring.completion_q.head;
        unsigned tail = *ring.completion_q.tail;
        const uint32_t mask = *ring.completion_q.mask;

        // Nothing new, return 0
        if (head == tail) {
            return [0, to_submit > 0];
        }

        uint32_t count = tail - head;
        /* paranoid */ verify( count != 0 );
        for(i; count) {
            unsigned idx = (head + i) & mask;
            struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];

            /* paranoid */ verify(&cqe);

            struct __io_user_data_t * data = (struct __io_user_data_t *)(uintptr_t)cqe.user_data;
            __cfadbg_print_safe( io, "Kernel I/O : Performed reading io cqe %p, result %d for %p\n", data, cqe.res, data->thrd );

            data->result = cqe.res;
            if(!mask) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }
            else      { __unpark( &ring.poller.slow.id, data->thrd __cfaabi_dbg_ctx2 ); }
        }

        // Allow new submissions to happen
        // V(ring.submit, count);

        // Mark to the kernel that the cqe has been seen
        // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
        __atomic_thread_fence( __ATOMIC_SEQ_CST );
        __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );

        return [count, count > 0 || to_submit > 0];
    }

    static void * __io_poller_slow( void * arg ) {
        #if !defined( __CFA_NO_STATISTICS__ )
            __stats_t local_stats;
            __init_stats( &local_stats );
            kernelTLS.this_stats = &local_stats;
        #endif

        cluster * cltr = (cluster *)arg;
        struct __io_data & ring = *cltr->io;

        ring.poller.slow.id.id = doregister( &ring.poller.slow.id );

        sigset_t mask;
        sigfillset(&mask);
        if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
            abort( "KERNEL ERROR: IO_URING - pthread_sigmask" );
        }

        sigdelset( &mask, SIGUSR1 );

        verify( (*ring.submit_q.head) == (*ring.submit_q.tail) );
        verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );

        __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p ready\n", &ring);

        if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
            while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {

                __atomic_store_n( &ring.poller.slow.blocked, true, __ATOMIC_SEQ_CST );

                // In the user-thread approach drain and if anything was drained,
                // baton pass to the user-thread
                int count;
                bool again;
                [count, again] = __drain_io( ring, &mask );

                __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST );

                // Update statistics
                __STATS__( true,
                    io.complete_q.completed_avg.val += count;
                    io.complete_q.completed_avg.slow_cnt += 1;
                )

                if(again) {
                    __cfadbg_print_safe(io_core, "Kernel I/O : Moving ring %p to fast poller\n", &ring);
                    __unpark( &ring.poller.slow.id, &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
                    wait( ring.poller.sem );
                }
            }
        }
        else {
            while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
                // In the naive approach, just poll the io completion queue directly
                int count;
                bool again;
                [count, again] = __drain_io( ring, &mask );

                // Update statistics
                __STATS__( true,
                    io.complete_q.completed_avg.val += count;
                    io.complete_q.completed_avg.slow_cnt += 1;
                )
            }
        }

        __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p stopping\n", &ring);

        unregister( &ring.poller.slow.id );

        #if !defined(__CFA_NO_STATISTICS__)
            __tally_stats(cltr->stats, &local_stats);
        #endif

        return 0p;
    }

    void main( __io_poller_fast & this ) {
        verify( this.ring->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD );

        // Start parked
        park( __cfaabi_dbg_ctx );

        __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", &this.ring);

        int reset = 0;

        // Then loop until we need to stop
        while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {

            // Drain the io
            int count;
            bool again;
            disable_interrupts();
                [count, again] = __drain_io( *this.ring, 0p );

                if(!again) reset++;

                // Update statistics
                __STATS__( true,
                    io.complete_q.completed_avg.val += count;
                    io.complete_q.completed_avg.fast_cnt += 1;
                )
            enable_interrupts( __cfaabi_dbg_ctx );

            // If we got something, just yield and check again
            if(reset < 5) {
                yield();
            }
            // We didn't get anything, baton pass to the slow poller
            else {
                __cfadbg_print_safe(io_core, "Kernel I/O : Moving ring %p to slow poller\n", &this.ring);
                reset = 0;

                // wake up the slow poller
                post( this.ring->poller.sem );

                // park this thread
                park( __cfaabi_dbg_ctx );
            }
        }

        __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
    }

    static inline void __wake_poller( struct __io_data & ring ) __attribute__((artificial));
    static inline void __wake_poller( struct __io_data & ring ) {
        if(!__atomic_load_n( &ring.poller.slow.blocked, __ATOMIC_SEQ_CST)) return;

        sigval val = { 1 };
        pthread_sigqueue( ring.poller.slow.kthrd, SIGUSR1, val );
    }
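
    // Added annotation (not original source): this SIGUSR1 targets a slow poller that may be
    // sleeping inside io_uring_enter. __io_poller_slow blocks all signals on its pthread but
    // removes SIGUSR1 from the mask it passes down to __drain_io, so while the syscall sleeps
    // SIGUSR1 is deliverable and interrupts it with EINTR; __io_uring_enter maps that to -1 and
    // __drain_io returns [0, true] right away, letting the slow poller loop around and collect
    // the submission that was just added to the ready array.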

//=============================================================================================
// I/O Submissions
//=============================================================================================

// Submission steps:
// 1 - Allocate a queue entry. The ring already has memory for all entries but only the ones
//     listed in sq.array are visible by the kernel. For those not listed, the kernel does not
//     offer any assurance that an entry is not being filled by multiple threads. Therefore, we
//     need to write an allocator that allows allocating concurrently.
//
// 2 - Actually fill the submit entry, this is the only simple and straightforward step.
//
// 3 - Append the entry index to the array and adjust the tail accordingly. This operation
//     needs to reach two points of consensus at the same time:
//     A - The order in which entries are listed in the array: no two threads must pick the
//         same index for their entries
//     B - When the tail can be updated for the kernel. EVERY entry in the array between
//         head and tail must be fully filled and shouldn't ever be touched again.
//

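// Illustrative sketch (added annotation, not part of the original file): an asynchronous operation
// drives these three steps roughly as follows. The locals fd/iov/iovcnt/offset are hypothetical,
// and the { result, thrd } layout of __io_user_data_t is inferred from its use in __drain_io above.
//
//     __io_user_data_t data = { 0, active_thread() };
//     struct __io_data & ring = *active_cluster()->io;
//
//     // Step 1 : claim an sqe by publishing our user_data into it
//     struct io_uring_sqe * sqe;
//     uint32_t idx;
//     [sqe, idx] = __submit_alloc( ring, (uint64_t)(uintptr_t)&data );
//
//     // Step 2 : fill the entry
//     sqe->opcode = IORING_OP_READV;
//     sqe->fd     = fd;
//     sqe->addr   = (uint64_t)iov;
//     sqe->len    = iovcnt;
//     sqe->off    = offset;
//
//     // Step 3 : publish the index; __submit picks the scheme from cltr_flags
//     __submit( ring, idx );
//
//     park( __cfaabi_dbg_ctx );        // __drain_io fills data.result and unparks us
//     int result = data.result;
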
    [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ) {
        /* paranoid */ verify( data != 0 );

        // Prepare the data we need
        __attribute((unused)) int len   = 0;
        __attribute((unused)) int block = 0;
        uint32_t cnt = *ring.submit_q.num;
        uint32_t mask = *ring.submit_q.mask;

        disable_interrupts();
            uint32_t off = __tls_rand();
        enable_interrupts( __cfaabi_dbg_ctx );

        // Loop around looking for an available spot
        for() {
            // Look through the list starting at some offset
            for(i; cnt) {
                uint64_t expected = 0;
                uint32_t idx = (i + off) & mask;
                struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
                volatile uint64_t * udata = &sqe->user_data;

                if( *udata == expected &&
                    __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
                {
                    // update statistics
                    __STATS__( false,
                        io.submit_q.alloc_avg.val   += len;
                        io.submit_q.alloc_avg.block += block;
                        io.submit_q.alloc_avg.cnt   += 1;
                    )

                    // Success, return the data
                    return [sqe, idx];
                }
                verify(expected != data);

                len ++;
            }

            block++;
            yield();
        }
    }

    static inline uint32_t __submit_to_ready_array( struct __io_data & ring, uint32_t idx, const uint32_t mask ) {
        /* paranoid */ verify( idx <= mask   );
        /* paranoid */ verify( idx != -1ul32 );

        // We need to find a spot in the ready array
        __attribute((unused)) int len   = 0;
        __attribute((unused)) int block = 0;
        uint32_t ready_mask = ring.submit_q.ready_cnt - 1;

        disable_interrupts();
            uint32_t off = __tls_rand();
        enable_interrupts( __cfaabi_dbg_ctx );

        uint32_t picked;
        LOOKING: for() {
            for(i; ring.submit_q.ready_cnt) {
                picked = (i + off) & ready_mask;
                uint32_t expected = -1ul32;
                if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
                    break LOOKING;
                }
                verify(expected != idx);

                len ++;
            }

            block++;
            if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
                __release_consumed_submission( ring );
                unlock( ring.submit_q.lock );
            }
            else {
                yield();
            }
        }

        // update statistics
        __STATS__( false,
            io.submit_q.look_avg.val   += len;
            io.submit_q.look_avg.block += block;
            io.submit_q.look_avg.cnt   += 1;
        )

        return picked;
    }

    void __submit( struct __io_data & ring, uint32_t idx ) {
        // Get the data we definitely need now
        uint32_t * const tail = ring.submit_q.tail;
        const uint32_t mask = *ring.submit_q.mask;

        // There are 2 submission schemes, check which one we are using
        if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
            // If the poller thread submits, then we just need to add this to the ready array
            __submit_to_ready_array( ring, idx, mask );

            __wake_poller( ring );

            __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
        }
        else if( ring.cltr_flags & CFA_CLUSTER_IO_EAGER_SUBMITS ) {
            uint32_t picked = __submit_to_ready_array( ring, idx, mask );

            for() {
                yield();

                // If someone else collected our index, we are done
                #warning ABA problem
                if( ring.submit_q.ready[picked] != idx ) {
                    __STATS__( false,
                        io.submit_q.helped += 1;
                    )
                    return;
                }

                if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
                    __STATS__( false,
                        io.submit_q.leader += 1;
                    )
                    break;
                }

                __STATS__( false,
                    io.submit_q.busy += 1;
                )
            }

            // We got the lock
            unsigned to_submit = __collect_submitions( ring );
            int ret = __io_uring_enter( ring, to_submit, false, 0p );
            if( ret < 0 ) {
                unlock(ring.submit_q.lock);
                return;
            }

            /* paranoid */ verify( ret > 0 || (ring.ring_flags & IORING_SETUP_SQPOLL) );

            // Release the consumed SQEs
            __release_consumed_submission( ring );

            // update statistics
            __STATS__( true,
                io.submit_q.submit_avg.rdy += to_submit;
                io.submit_q.submit_avg.csm += ret;
                io.submit_q.submit_avg.cnt += 1;
            )

            unlock(ring.submit_q.lock);
        }
        else {
            // get mutual exclusion
            lock(ring.submit_q.lock __cfaabi_dbg_ctx2);

            /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,
            /* paranoid */          "index %u already reclaimed\n"
            /* paranoid */          "head %u, prev %u, tail %u\n"
            /* paranoid */          "[-0: %u,-1: %u,-2: %u,-3: %u]\n",
            /* paranoid */          idx,
            /* paranoid */          *ring.submit_q.head, ring.submit_q.prev_head, *tail
            /* paranoid */          ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ]
            /* paranoid */          ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ]
            /* paranoid */          ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ]
            /* paranoid */          ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ]
            /* paranoid */ );

            // Append to the list of ready entries

            /* paranoid */ verify( idx <= mask );
            ring.submit_q.array[ (*tail) & mask ] = idx;
            __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);

            // Submit however many entries need to be submitted
            int ret = __io_uring_enter( ring, 1, false, 0p );
            if( ret < 0 ) {
                switch((int)errno) {
                default:
                    abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
                }
            }

            // update statistics
            __STATS__( false,
                io.submit_q.submit_avg.csm += 1;
                io.submit_q.submit_avg.cnt += 1;
            )

            // Release the consumed SQEs
            __release_consumed_submission( ring );

            unlock(ring.submit_q.lock);

            __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
        }
    }

    static unsigned __collect_submitions( struct __io_data & ring ) {
        /* paranoid */ verify( ring.submit_q.ready != 0p );
        /* paranoid */ verify( ring.submit_q.ready_cnt > 0 );

        unsigned to_submit = 0;
        uint32_t tail = *ring.submit_q.tail;
        const uint32_t mask = *ring.submit_q.mask;

        // Go through the list of ready submissions
        for( i; ring.submit_q.ready_cnt ) {
            // replace any submission with the sentinel, to consume it.
            uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);

            // If it was already the sentinel, this slot is empty; skip it
            if( idx == -1ul32 ) continue;

            // If we got a real submission, append it to the list
            ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
            to_submit++;
        }

        // Increment the tail based on how many we are ready to submit
        __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);

        return to_submit;
    }
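
    // Worked example (added annotation, not original source): with ready_cnt == 8 and a ready array
    // of [ -1, 3, -1, 7, -1, -1, 12, -1 ], the loop above atomically swaps every slot with the
    // sentinel -1ul32, keeps the real indices {3, 7, 12}, writes them at array[(tail+0..2) & mask],
    // and finally publishes all three at once with a single atomic add of 3 on the kernel-visible
    // tail, satisfying consensus point B from the comment block above.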

    static uint32_t __release_consumed_submission( struct __io_data & ring ) {
        const uint32_t smask = *ring.submit_q.mask;

        if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
        uint32_t chead = *ring.submit_q.head;
        uint32_t phead = ring.submit_q.prev_head;
        ring.submit_q.prev_head = chead;
        unlock(ring.submit_q.release_lock);

        uint32_t count = chead - phead;
        for( i; count ) {
            uint32_t idx = ring.submit_q.array[ (phead + i) & smask ];
            ring.submit_q.sqes[ idx ].user_data = 0;
        }
        return count;
    }

//=============================================================================================
// I/O Registration
//=============================================================================================

    void register_fixed_files( cluster & cl, int * files, unsigned count ) {
        int ret = syscall( __NR_io_uring_register, cl.io->fd, IORING_REGISTER_FILES, files, count );
        if( ret < 0 ) {
            abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
        }

        __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret );
    }
#endif