source: libcfa/src/concurrency/io.cfa@ 20ab637

ADT arm-eh ast-experimental enum forall-pointer-decay jacob/cs343-translation new-ast new-ast-unique-expr pthread-emulation qualifiedEnum
Last change on this file since 20ab637 was 20ab637, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

Added quick and dirty support for fixed files reads.
Added support for kernel side polling.

  • Property mode set to 100644
File size: 29.8 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// io.cfa --
8//
9// Author : Thierry Delisle
10// Created On : Thu Apr 23 17:31:00 2020
11// Last Modified By :
12// Last Modified On :
13// Update Count :
14//
15
16#if defined(__CFA_DEBUG__)
17 // #define __CFA_DEBUG_PRINT_IO__
18 #define __CFA_DEBUG_PRINT_IO_CORE__
19#endif
20
21#include "kernel.hfa"
22#include "bitmanip.hfa"
23
24#if !defined(HAVE_LINUX_IO_URING_H)
// Fallback entry points used when <linux/io_uring.h> is not available:
// the I/O subsystem interface still exists but every hook is a no-op.

// Cluster start-up hook: there is no ring to create.
void __kernel_io_startup( cluster &, unsigned, bool ) {}

// Poller start hook: there is no poller to launch.
void __kernel_io_finish_start( cluster & ) {}

// Poller stop hook: there is no poller to stop.
void __kernel_io_prepare_stop( cluster & ) {}

// Cluster shutdown hook: there is no ring to release.
void __kernel_io_shutdown( cluster &, bool ) {}
40
41#else
42 #define _GNU_SOURCE /* See feature_test_macros(7) */
43 #include <errno.h>
44 #include <stdint.h>
45 #include <string.h>
46 #include <unistd.h>
47 #include <sys/mman.h>
48
49 extern "C" {
50 #include <sys/syscall.h>
51
52 #include <linux/io_uring.h>
53 }
54
55 #include "bits/signal.hfa"
56 #include "kernel_private.hfa"
57 #include "thread.hfa"
58
// Number of io_uring entries requested for the ring of each cluster.
uint32_t entries_per_cluster() {
	const uint32_t nentries = 256;
	return nentries;
}
62
63 static void * __io_poller_slow( void * arg );
64
// Some systems support io_uring without their headers defining the system
// call numbers, so each number is supplied here when it is missing.
// alpha is the only architecture whose numbers differ; every other
// architecture shares the common numbers for new system calls.
#ifndef __NR_io_uring_setup
	#ifdef __alpha__
		#define __NR_io_uring_setup 535
	#else
		#define __NR_io_uring_setup 425
	#endif
#endif

#ifndef __NR_io_uring_enter
	#ifdef __alpha__
		#define __NR_io_uring_enter 536
	#else
		#define __NR_io_uring_enter 426
	#endif
#endif

#ifndef __NR_io_uring_register
	#ifdef __alpha__
		#define __NR_io_uring_register 537
	#else
		#define __NR_io_uring_register 427
	#endif
#endif
91
// User-level thread that polls the ring (the "fast" poller).
// It is built by hand around a $thread instead of using the "thread"
// keyword so that starting and stopping it stays under explicit control.
struct __io_poller_fast {
	// Ring drained by this poller
	struct __io_data * ring;

	// Thread descriptor running the poller
	$thread thrd;
};
99
// Constructor: attach the poller to the ring of `cltr` and build its
// thread, named "Fast I/O Poller", on that same cluster.
void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
	this.ring = cltr.io;

	(this.thrd){ "Fast I/O Poller", cltr };
}
// Forward declarations of the destructor and of the thread main.
void ^?{}( __io_poller_fast & mutex this );
void main( __io_poller_fast & this );

// Accessor for the thread descriptor embedded in the poller.
static inline $thread * get_thread( __io_poller_fast & this ) {
	return &this.thrd;
}

// Destructor: nothing to release.
void ^?{}( __io_poller_fast & mutex this ) {
}
108
// Submission side of one io_uring instance.
// Pointer fields other than `ready` point into the memory mapped from the
// ring file descriptor by __kernel_io_startup.
struct __submition_data {
	// Head and tail indexes of the submission ring (walk `array`)
	volatile uint32_t * head;
	volatile uint32_t * tail;

	// Value of *head recorded the last time consumed entries were released
	volatile uint32_t prev_head;

	// The ring proper: each slot holds an index into `sqes`
	uint32_t * array;

	// Number of ring entries and the index mask that goes with it
	const uint32_t * num;
	const uint32_t * mask;

	// Ring flags word; tested for IORING_SQ_NEED_WAKEUP before entering the kernel
	uint32_t * flags;

	// Counter located at sq_off.dropped
	// NOTE(review): presumably the number of invalid sqes dropped by the kernel - confirm against the io_uring documentation
	uint32_t * dropped;

	// Staging array never seen by the kernel: each of the `ready_cnt` slots
	// holds the index of an sqe waiting to be published, or -1 when empty
	volatile uint32_t * ready;
	uint32_t ready_cnt;

	// `lock` guards publishing entries to the ring,
	// `release_lock` guards the update of `prev_head`
	__spinlock_t lock;
	__spinlock_t release_lock;

	// Buffer holding the sqes themselves (not the ring)
	struct io_uring_sqe * sqes;

	// Address and size of the mmaped ring area
	void * ring_ptr;
	size_t ring_sz;
};
143
// Completion side of one io_uring instance.
// Pointer fields point into the memory mapped from the ring file descriptor
// by __kernel_io_startup.
struct __completion_data {
	// Head and tail indexes of the completion ring
	volatile uint32_t * head;
	volatile uint32_t * tail;

	// Index mask and number of ring entries
	const uint32_t * mask;
	const uint32_t * num;

	// Counter located at cq_off.overflow
	// NOTE(review): presumably the number of completions lost because the ring was full - confirm against the io_uring documentation
	uint32_t * overflow;

	// The completion ring itself, filled by the kernel
	struct io_uring_cqe * cqes;

	// Address and size of the mmaped ring area
	void * ring_ptr;
	size_t ring_sz;
};
163
// Everything a cluster needs to drive its io_uring instance.
struct __io_data {
	// Both halves of the ring
	struct __submition_data submit_q;
	struct __completion_data completion_q;

	// Flags the ring was set up with (copy of io_uring_params.flags)
	uint32_t ring_flags;

	// CFA_CLUSTER_IO_* options the cluster was created with
	int cltr_flags;

	// File descriptor returned by io_uring_setup
	int fd;

	// Semaphore initialised to the smaller of the two ring sizes
	semaphore submit;

	// Set to true to ask the pollers to terminate
	volatile bool done;

	struct {
		// Kernel-thread ("slow") poller
		struct {
			__processor_id_t id;       // id registered by the poller thread
			void * stack;              // stack returned by __create_pthread
			pthread_t kthrd;           // pthread handle of the poller
			volatile bool blocked;     // true while the poller is inside __drain_io
		} slow;

		// User-thread ("fast") poller
		__io_poller_fast fast;

		// Semaphore the slow poller waits on after handing over to the fast poller
		__bin_sem_t sem;
	} poller;
};
183
184//=============================================================================================
185// I/O Startup / Shutdown logic
186//=============================================================================================
// Create the io_uring instance used by cluster `this` and map its rings.
//   this         : cluster whose `io` field receives the newly allocated ring data
//   io_flags     : CFA_CLUSTER_IO_* option bits; the bits above
//                  CFA_CLUSTER_IO_BUFFLEN_OFFSET give the length of the ready array
//   main_cluster : when false the pollers are started right away through
//                  __kernel_io_finish_start; when true they are not started here
//                  (presumably the caller does it later - confirm against caller)
// Any failure of io_uring_setup or mmap aborts the program.
void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) {
	if( (io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS) && (io_flags & CFA_CLUSTER_IO_EAGER_SUBMITS) ) {
		abort("CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS and CFA_CLUSTER_IO_EAGER_SUBMITS cannot be mixed\n");
	}

	this.io = malloc();

	// Step 1 : call to setup
	struct io_uring_params params;
	memset(&params, 0, sizeof(params));
	if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS   ) params.flags |= IORING_SETUP_SQPOLL;
	if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES ) params.flags |= IORING_SETUP_IOPOLL;

	uint32_t nentries = entries_per_cluster();

	int fd = syscall(__NR_io_uring_setup, nentries, &params );
	if(fd < 0) {
		abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
	}

	// Step 2 : mmap result
	memset( this.io, 0, sizeof(struct __io_data) );
	struct __submition_data  & sq = this.io->submit_q;
	struct __completion_data & cq = this.io->completion_q;

	// calculate the right ring size
	sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned)           );
	cq.ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));

	// Requires features
	#if defined(IORING_FEAT_SINGLE_MMAP)
		// With a single mapping both rings share one area, so size it for the larger one.
		// sq and cq are references: members are reached with '.', not '->'.
		if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
			cq.ring_sz = sq.ring_sz = max(cq.ring_sz, sq.ring_sz);
		}
	#endif

	// mmap the Submit Queue into existence
	sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
	if (sq.ring_ptr == (void*)MAP_FAILED) {
		abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
	}

	// Requires features
	#if defined(IORING_FEAT_SINGLE_MMAP)
		// mmap the Completion Queue into existence (may or may not be needed)
		if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
			// The completion ring lives in the mapping made for the submit ring
			cq.ring_ptr = sq.ring_ptr;
		}
		else
	#endif
	{
		// We need multiple call to MMAP
		cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
		if (cq.ring_ptr == (void*)MAP_FAILED) {
			munmap(sq.ring_ptr, sq.ring_sz);
			abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
		}
	}

	// mmap the submit queue entries
	size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
	sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
	if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
		munmap(sq.ring_ptr, sq.ring_sz);
		if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
		abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
	}

	// Get the pointers from the kernel to fill the structure
	// submit queue
	sq.head    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
	sq.tail    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
	sq.mask    = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
	sq.num     = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
	sq.flags   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
	sq.dropped = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
	sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
	sq.prev_head = *sq.head;

	// A user_data of 0 marks an sqe as free for __submit_alloc
	{
		const uint32_t num = *sq.num;
		for( i; num ) {
			sq.sqes[i].user_data = 0ul64;
		}
	}

	(sq.lock){};
	(sq.release_lock){};

	// The ready array is only needed by the schemes that batch submissions
	if( io_flags & ( CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS | CFA_CLUSTER_IO_EAGER_SUBMITS ) ) {
		/* paranoid */ verify( is_pow2( io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET ) || ((io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET) < 8)  );
		sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8);
		sq.ready = alloc_align( 64, sq.ready_cnt );
		// -1 is the "empty slot" sentinel
		for(i; sq.ready_cnt) {
			sq.ready[i] = -1ul32;
		}
	}
	else {
		sq.ready_cnt = 0;
		sq.ready = 0p;
	}

	// completion queue
	cq.head     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
	cq.tail     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
	cq.mask     = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
	cq.num      = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
	cq.overflow = (         uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
	cq.cqes   = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);

	// some paranoid checks
	/* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask  );
	/* paranoid */ verifyf( (*cq.num)  >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num );
	/* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head );
	/* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail );

	/* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
	/* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
	/* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );
	/* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );

	// Update the global ring info
	this.io->ring_flags = params.flags;
	this.io->cltr_flags = io_flags;
	this.io->fd         = fd;
	this.io->done       = false;
	(this.io->submit){ min(*sq.num, *cq.num) };

	if(!main_cluster) {
		__kernel_io_finish_start( this );
	}
}
320
// Launch the pollers of cluster `this`: the user-thread (fast) poller when
// CFA_CLUSTER_IO_POLLER_USER_THREAD is set, then always the kernel-thread
// (slow) poller.
void __kernel_io_finish_start( cluster & this ) {
	struct __io_data & ring = *this.io;

	// Optional fast poller, constructed in place and started on its main
	if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
		__cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluter %p\n", &this);
		(ring.poller.fast){ this };
		__thrd_start( ring.poller.fast, main );
	}

	// Slow poller: a pthread running __io_poller_slow with the cluster as argument;
	// the stack it returns is kept so it can be freed when the poller is stopped
	__cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluster %p\n", &this);
	ring.poller.slow.blocked = false;
	ring.poller.slow.stack = __create_pthread( &ring.poller.slow.kthrd, __io_poller_slow, &this );
}
333
// Stop the pollers of cluster `this`.
// The slow poller is told to finish (done flag, SIGUSR1 and a post on the
// poller semaphore), joined, and its stack freed. When the cluster uses a
// user-thread poller, that thread is then forced into a state from which it
// can be unparked on the active cluster and is destroyed.
// Aborts if the fast poller is found in a state that cannot be cleaned up.
void __kernel_io_prepare_stop( cluster & this ) {
	__cfadbg_print_safe(io_core, "Kernel I/O : Stopping pollers for cluster %p\n", &this);
	// Notify the poller thread of the shutdown
	__atomic_store_n(&this.io->done, true, __ATOMIC_SEQ_CST);

	// Stop the IO Poller
	// The signal interrupts a poller blocked in the kernel,
	// the post releases one waiting on the semaphore
	sigval val = { 1 };
	pthread_sigqueue( this.io->poller.slow.kthrd, SIGUSR1, val );
	post( this.io->poller.sem );

	// Wait for the poller thread to finish
	pthread_join( this.io->poller.slow.kthrd, 0p );
	free( this.io->poller.slow.stack );

	__cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster %p\n", &this);

	if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
		with( this.io->poller.fast ) {
			/* paranoid */ verify( this.nprocessors == 0 || &this == mainCluster );
			/* paranoid */ verify( !ready_mutate_islocked() );

			// We need to adjust the clean-up based on where the thread is
			if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {

				ready_schedule_lock( (struct __processor_id_t *)active_processor() );

					// This is the tricky case
					// The thread was preempted and now it is on the ready queue
					// The thread should be the last on the list
					/* paranoid */ verify( thrd.link.next != 0p );

					// Remove the thread from the ready queue of this cluster
					__attribute__((unused)) bool removed = remove_head( &this, &thrd );
					/* paranoid */ verify( removed );
					thrd.link.next = 0p;
					thrd.link.prev = 0p;
					__cfaabi_dbg_debug_do( thrd.unpark_stale = true );

					// Fixup the thread state
					thrd.state = Blocked;
					thrd.ticket = 0;
					thrd.preempted = __NO_PREEMPTION;

				ready_schedule_unlock( (struct __processor_id_t *)active_processor() );

				// Pretend like the thread was blocked all along
			}
			// !!! This is not an else if !!!
			// The fix-up above leaves the thread Blocked, so it must fall through to here
			if( thrd.state == Blocked ) {

				// This is the "easy case"
				// The thread is parked and can easily be moved to active cluster
				verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster );
				thrd.curr_cluster = active_cluster();

				// unpark the fast io_poller
				unpark( &thrd __cfaabi_dbg_ctx2 );
			}
			else {

				// The thread is in a weird state
				// I don't know what to do here
				abort("Fast poller thread is in unexpected state, cannot clean-up correctly\n");
			}

		}

		^(this.io->poller.fast){};

		__cfadbg_print_safe(io_core, "Kernel I/O : Fast poller stopped for cluster %p\n", &this);
	}
}
406
// Release every resource held by the ring of cluster `this`.
//   main_cluster : when false the pollers are stopped here first through
//                  __kernel_io_prepare_stop; when true they are not stopped here
void __kernel_io_shutdown( cluster & this, bool main_cluster ) {
	if( !main_cluster ) __kernel_io_prepare_stop( this );

	struct __io_data         & ring = *this.io;
	struct __submition_data  & sq   = ring.submit_q;
	struct __completion_data & cq   = ring.completion_q;

	// Unmap the sqe buffer first: its size is read through sq.num, which
	// points into the submit ring mapping released just below
	munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe));

	// Unmap the submit ring, then the completion ring unless both share one mapping
	munmap(sq.ring_ptr, sq.ring_sz);
	if (cq.ring_ptr != sq.ring_ptr) {
		munmap(cq.ring_ptr, cq.ring_sz);
	}

	// Close the ring file descriptor
	close(ring.fd);

	// The ready array may be null, which free accepts
	free( sq.ready );
	free( this.io );
}
433
// Wrapper around the io_uring_enter system call which skips the call when
// the way the ring was set up makes it unnecessary.
//   ring      : ring to operate on
//   to_submit : number of entries published on the submit ring
//   get       : whether the caller wants completions
//   mask      : signal mask handed to the kernel; when it is non-null and
//               completions are wanted, the call asks for at least one completion
// Returns the result of the system call, 0 when no call was made, or -1 when
// the call failed with EAGAIN or EINTR. Any other failure aborts.
int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get, sigset_t * mask ) {
	bool need_syscall = false;
	unsigned min_complete = 0;
	unsigned flags = 0;

	// Submission side: without kernel-side polling every submission needs the
	// call, with it the call is only needed to wake up the kernel thread
	if( to_submit > 0 ) {
		if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
			need_syscall = true;
		}
		else if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) {
			need_syscall = true;
			flags |= IORING_ENTER_SQ_WAKEUP;
		}
	}

	// Completion side
	if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
		flags |= IORING_ENTER_GETEVENTS;
		if( mask ) {
			need_syscall = true;
			min_complete = 1;
		}
		else if( ring.ring_flags & IORING_SETUP_IOPOLL ) {
			need_syscall = true;
		}
	}

	int ret = 0;
	if( need_syscall ) {
		ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, min_complete, flags, mask, _NSIG / 8);
		if( ret < 0 ) {
			const int err = (int)errno;
			if( err == EAGAIN || err == EINTR ) {
				// Benign failures are reported to the caller
				ret = -1;
			}
			else {
				abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", err, strerror(err) );
			}
		}
	}

	// Memory barrier
	__atomic_thread_fence( __ATOMIC_SEQ_CST );
	return ret;
}
485
486//=============================================================================================
487// I/O Polling
488//=============================================================================================
489 static unsigned __collect_submitions( struct __io_data & ring );
490 static uint32_t __release_consumed_submission( struct __io_data & ring );
491
// Publish pending submissions (when the poller thread submits), enter the
// kernel if needed, then process every completion currently on the ring:
// store its result and wake the thread waiting for it.
// This is NOT thread-safe and expects preemption to be disabled.
//   ring : ring to drain
//   mask : signal mask forwarded to __io_uring_enter; the slow poller passes
//          its mask, the fast poller passes 0p
// Returns [count, again]: the number of completions processed and whether
// the caller should consider polling again (the kernel call was interrupted,
// something was submitted, or something was completed).
static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask ) {
	/* paranoid */ verify( !kernelTLS.preemption_state.enabled );

	unsigned to_submit = 0;
	if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
		// If the poller thread also submits, then we need to aggregate the submissions which are ready
		to_submit = __collect_submitions( ring );
	}

	int ret = __io_uring_enter(ring, to_submit, true, mask);
	if( ret < 0 ) {
		// EAGAIN or EINTR: nothing was drained
		return [0, true];
	}

	// update statistics
	__STATS__( true,
		if( to_submit > 0 ) {
			io.submit_q.submit_avg.rdy += to_submit;
			io.submit_q.submit_avg.csm += ret;
			io.submit_q.submit_avg.cnt += 1;
		}
	)

	// Release the consumed SQEs
	__release_consumed_submission( ring );

	// Drain the queue
	unsigned head = *ring.completion_q.head;
	unsigned tail = *ring.completion_q.tail;
	// Named cmask so it cannot be confused with the `mask` parameter tested below
	const uint32_t cmask = *ring.completion_q.mask;

	// Nothing was new return 0
	if (head == tail) {
		return [0, to_submit > 0];
	}

	uint32_t count = tail - head;
	/* paranoid */ verify( count != 0 );
	for(i; count) {
		unsigned idx = (head + i) & cmask;
		struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];

		/* paranoid */ verify(&cqe);

		// user_data carries the address of the request descriptor
		struct __io_user_data_t * data = (struct __io_user_data_t *)(uintptr_t)cqe.user_data;
		__cfadbg_print_safe( io, "Kernel I/O : Performed reading io cqe %p, result %d for %p\n", data, cqe.res, data->thrd );

		data->result = cqe.res;
		// No signal mask means the caller is the fast poller, a regular user thread;
		// otherwise the caller is the slow poller, which unparks with its own processor id
		if(!mask) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }
		else      { __unpark( &ring.poller.slow.id, data->thrd __cfaabi_dbg_ctx2 ); }
	}

	// Allow new submissions to happen
	// V(ring.submit, count);

	// Mark to the kernel that the cqe has been seen
	// Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
	__atomic_thread_fence( __ATOMIC_SEQ_CST );
	__atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );

	return [count, count > 0 || to_submit > 0];
}
559
// Body of the kernel-thread ("slow") poller.
//   arg : the cluster whose ring is polled, passed as void *
// The thread blocks every signal, then repeatedly drains the ring with a
// mask that leaves only SIGUSR1 deliverable during the kernel call, until
// the ring's `done` flag is set. With CFA_CLUSTER_IO_POLLER_USER_THREAD it
// hands over to the fast poller whenever something happened and waits on the
// poller semaphore until handed back.
// Always returns 0p.
static void * __io_poller_slow( void * arg ) {
	#if !defined( __CFA_NO_STATISTICS__ )
		__stats_t local_stats;
		__init_stats( &local_stats );
		kernelTLS.this_stats = &local_stats;
	#endif

	cluster * cltr = (cluster *)arg;
	struct __io_data & ring = *cltr->io;

	ring.poller.slow.id.id = doregister( &ring.poller.slow.id );

	sigset_t mask;
	sigfillset(&mask);
	// pthread_sigmask returns 0 on success and an error number on failure, never -1
	if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) != 0 ) {
		abort( "KERNEL ERROR: IO_URING - pthread_sigmask" );
	}

	// The mask handed to the kernel leaves SIGUSR1 deliverable
	sigdelset( &mask, SIGUSR1 );

	verify( (*ring.submit_q.head) == (*ring.submit_q.tail) );
	verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );

	__cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p ready\n", &ring);

	if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
		while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {

			// Advertise that a signal is needed to get this thread out of the kernel
			__atomic_store_n( &ring.poller.slow.blocked, true, __ATOMIC_SEQ_CST );

			// In the user-thread approach drain and if anything was drained,
			// baton pass to the user-thread
			int count;
			bool again;
			[count, again] = __drain_io( ring, &mask );

			__atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST );

			// Update statistics
			__STATS__( true,
				io.complete_q.completed_avg.val += count;
				io.complete_q.completed_avg.slow_cnt += 1;
			)

			if(again) {
				__cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring);
				__unpark( &ring.poller.slow.id, &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
				// Sleep until the fast poller (or the shutdown code) posts the semaphore
				wait( ring.poller.sem );
			}
		}
	}
	else {
		while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
			//In the naive approach, just poll the io completion queue directly
			int count;
			bool again;
			[count, again] = __drain_io( ring, &mask );

			// Update statistics
			__STATS__( true,
				io.complete_q.completed_avg.val += count;
				io.complete_q.completed_avg.slow_cnt += 1;
			)
		}
	}

	__cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p stopping\n", &ring);

	unregister( &ring.poller.slow.id );

	#if !defined(__CFA_NO_STATISTICS__)
		__tally_stats(cltr->stats, &local_stats);
	#endif

	return 0p;
}
636
// Body of the user-thread ("fast") poller.
// The thread starts parked. Once unparked it drains the ring with interrupts
// disabled and yields between rounds; after 5 rounds in which __drain_io
// reported nothing to do, it wakes the slow poller through the poller
// semaphore and parks again. The loop ends when the ring's `done` flag is set.
void main( __io_poller_fast & this ) {
	verify( this.ring->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD );

	// Start parked
	park( __cfaabi_dbg_ctx );

	// this.ring is already a pointer: print the ring address, not the address of the field
	__cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", this.ring);

	// Number of idle rounds since the last hand-over to the slow poller
	int reset = 0;

	// Then loop until we need to stop
	while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {

		// Drain the io
		int count;
		bool again;
		disable_interrupts();
			[count, again] = __drain_io( *this.ring, 0p );

			if(!again) reset++;

			// Update statistics
			__STATS__( true,
				io.complete_q.completed_avg.val += count;
				io.complete_q.completed_avg.fast_cnt += 1;
			)
		enable_interrupts( __cfaabi_dbg_ctx );

		// Not enough idle rounds yet, just yield and check again
		if(reset < 5) {
			yield();
		}
		// Too many idle rounds, baton pass to the slow poller
		else {
			__cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", this.ring);
			reset = 0;

			// wake up the slow poller
			post( this.ring->poller.sem );

			// park this thread
			park( __cfaabi_dbg_ctx );
		}
	}

	__cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", this.ring);
}
684
// Interrupt the slow poller with SIGUSR1, but only when it has flagged
// itself as blocked; otherwise do nothing.
static inline void __wake_poller( struct __io_data & ring ) __attribute__((artificial));
static inline void __wake_poller( struct __io_data & ring ) {
	if( __atomic_load_n( &ring.poller.slow.blocked, __ATOMIC_SEQ_CST) ) {
		sigval val = { 1 };
		pthread_sigqueue( ring.poller.slow.kthrd, SIGUSR1, val );
	}
}
692
693//=============================================================================================
694// I/O Submissions
695//=============================================================================================
696
// Submission steps :
// 1 - Allocate a queue entry. The ring already has memory for all entries but only the ones
//     listed in sq.array are visible by the kernel. For those not listed, the kernel does not
//     offer any assurance that an entry is not being filled by multiple threads. Therefore, we
//     need to write an allocator that allows allocating concurrently.
//
// 2 - Actually fill the submit entry, this is the only simple and straightforward step.
//
// 3 - Append the entry index to the array and adjust the tail accordingly. This operation
//     needs to reach two consensuses at the same time:
//     A - The order in which entries are listed in the array: no two threads must pick the
//         same index for their entries
//     B - When the tail can be updated for the kernel. EVERY entry in the array between
//         head and tail must be fully filled and shouldn't ever be touched again.
//
712
// Claim a free submission entry for the request identified by `data`.
// An entry is free while its user_data is 0; it is claimed by atomically
// replacing that 0 with `data`, which therefore must not be 0.
// The scan starts at a random offset and yields after every full pass that
// finds no free entry, so the call only returns once an entry was claimed.
// Returns [pointer to the claimed sqe, index of that sqe].
[* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ) {
	/* paranoid */ verify( data != 0 );

	// Statistics only: entries inspected and number of full passes
	__attribute((unused)) int len   = 0;
	__attribute((unused)) int block = 0;

	uint32_t nsqes = *ring.submit_q.num;
	uint32_t smask = *ring.submit_q.mask;

	// The thread-local random generator is used with interrupts disabled
	disable_interrupts();
		uint32_t start = __tls_rand();
	enable_interrupts( __cfaabi_dbg_ctx );

	for() {
		// One pass over every entry, beginning at the random offset
		for(i; nsqes) {
			uint32_t idx = (i + start) & smask;
			struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
			volatile uint64_t * udata = &sqe->user_data;
			uint64_t expected = 0;

			// Cheap read first, then the actual claim
			if( *udata == expected &&
				__atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
			{
				// update statistics
				__STATS__( false,
					io.submit_q.alloc_avg.val   += len;
					io.submit_q.alloc_avg.block += block;
					io.submit_q.alloc_avg.cnt   += 1;
				)

				// Claimed: hand the entry back
				return [sqe, idx];
			}
			verify(expected != data);

			len ++;
		}

		// Nothing free on this pass, let other threads run before retrying
		block++;
		yield();
	}
}
758
// Store sqe index `idx` in a free slot of the ready array.
// A slot is free while it holds the -1 sentinel. The scan starts at a random
// offset; after a full pass without a free slot the caller either recycles
// consumed submissions (when it can take the submit lock) or yields, then
// scans again.
//   mask : index mask of the submit ring, only used for sanity checking
// Returns the position of the slot that now holds `idx`.
static inline uint32_t __submit_to_ready_array( struct __io_data & ring, uint32_t idx, const uint32_t mask ) {
	/* paranoid */ verify( idx <= mask   );
	/* paranoid */ verify( idx != -1ul32 );

	// Statistics only: slots inspected and number of full passes
	__attribute((unused)) int len   = 0;
	__attribute((unused)) int block = 0;

	uint32_t ready_mask = ring.submit_q.ready_cnt - 1;

	// The thread-local random generator is used with interrupts disabled
	disable_interrupts();
		uint32_t start = __tls_rand();
	enable_interrupts( __cfaabi_dbg_ctx );

	uint32_t slot;
	SEARCH: for() {
		for(i; ring.submit_q.ready_cnt) {
			slot = (i + start) & ready_mask;

			// Try to replace the sentinel with our index
			uint32_t expected = -1ul32;
			if( __atomic_compare_exchange_n( &ring.submit_q.ready[slot], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
				break SEARCH;
			}
			verify(expected != idx);

			len ++;
		}

		// Full pass without success
		block++;
		if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
			__release_consumed_submission( ring );
			unlock( ring.submit_q.lock );
		}
		else {
			yield();
		}
	}

	// update statistics
	__STATS__( false,
		io.submit_q.look_avg.val   += len;
		io.submit_q.look_avg.block += block;
		io.submit_q.look_avg.cnt   += 1;
	)

	return slot;
}
804
// Hand the filled sqe number `idx` over to the kernel, using the scheme
// selected by the cluster flags:
//   - CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS : stage the index in the ready
//     array and wake the poller, which performs the actual submission;
//   - CFA_CLUSTER_IO_EAGER_SUBMITS : stage the index, then either notice that
//     another thread submitted it or take the submit lock and submit
//     everything that is staged;
//   - otherwise : append the index to the ring under the submit lock and
//     enter the kernel for that single entry, aborting on failure.
void __submit( struct __io_data & ring, uint32_t idx ) {
	// Get now the data we definitely need
	// The tail is shared with the kernel, keep its volatile qualifier
	volatile uint32_t * const tail = ring.submit_q.tail;
	const uint32_t mask = *ring.submit_q.mask;

	// There are 2 submission schemes, check which one we are using
	if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
		// If the poller thread submits, then we just need to add this to the ready array
		__submit_to_ready_array( ring, idx, mask );

		__wake_poller( ring );

		__cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
	}
	else if( ring.cltr_flags & CFA_CLUSTER_IO_EAGER_SUBMITS ) {
		uint32_t picked = __submit_to_ready_array( ring, idx, mask );

		for() {
			yield();

			// If some one else collected our index, we are done
			#warning ABA problem
			if( ring.submit_q.ready[picked] != idx ) {
				__STATS__( false,
					io.submit_q.helped += 1;
				)
				return;
			}

			if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
				__STATS__( false,
					io.submit_q.leader += 1;
				)
				break;
			}

			__STATS__( false,
				io.submit_q.busy += 1;
			)
		}

		// We got the lock
		unsigned to_submit = __collect_submitions( ring );
		int ret = __io_uring_enter( ring, to_submit, false, 0p );
		if( ret < 0 ) {
			// EAGAIN or EINTR: give up for now
			unlock(ring.submit_q.lock);
			return;
		}

		/* paranoid */ verify( ret > 0 || (ring.ring_flags & IORING_SETUP_SQPOLL) );

		// Release the consumed SQEs
		__release_consumed_submission( ring );

		// update statistics
		__STATS__( true,
			io.submit_q.submit_avg.rdy += to_submit;
			io.submit_q.submit_avg.csm += ret;
			io.submit_q.submit_avg.cnt += 1;
		)

		unlock(ring.submit_q.lock);
	}
	else {
		// get mutual exclusion
		lock(ring.submit_q.lock __cfaabi_dbg_ctx2);

		/* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,
		/* paranoid */ 	"index %u already reclaimed\n"
		/* paranoid */ 	"head %u, prev %u, tail %u\n"
		/* paranoid */ 	"[-0: %u,-1: %u,-2: %u,-3: %u]\n",
		/* paranoid */ 	idx,
		/* paranoid */ 	*ring.submit_q.head, ring.submit_q.prev_head, *tail
		/* paranoid */ 	,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ]
		/* paranoid */ 	,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ]
		/* paranoid */ 	,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ]
		/* paranoid */ 	,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ]
		/* paranoid */ );

		// Append to the list of ready entries

		/* paranoid */ verify( idx <= mask );
		ring.submit_q.array[ (*tail) & mask ] = idx;
		__atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);

		// Submit however, many entries need to be submitted
		int ret = __io_uring_enter( ring, 1, false, 0p );
		if( ret < 0 ) {
			switch((int)errno) {
			default:
				abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
			}
		}

		// update statistics
		__STATS__( false,
			io.submit_q.submit_avg.csm += 1;
			io.submit_q.submit_avg.cnt += 1;
		)

		// Release the consumed SQEs
		__release_consumed_submission( ring );

		unlock(ring.submit_q.lock);

		__cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
	}
}
913
// Move every index staged in the ready array onto the submit ring and
// advance the ring tail by the number of indexes moved.
// Each slot is emptied by atomically swapping in the -1 sentinel; slots that
// already held the sentinel are skipped.
// Returns the number of entries appended to the ring.
static unsigned __collect_submitions( struct __io_data & ring ) {
	/* paranoid */ verify( ring.submit_q.ready != 0p );
	/* paranoid */ verify( ring.submit_q.ready_cnt > 0 );

	const uint32_t smask = *ring.submit_q.mask;
	uint32_t tail = *ring.submit_q.tail;
	unsigned collected = 0;

	for( slot; ring.submit_q.ready_cnt ) {
		// Consume the slot, whatever it holds
		uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[slot], -1ul32, __ATOMIC_RELAXED);

		// Only a real index is appended after the entries collected so far
		if( idx != -1ul32 ) {
			ring.submit_q.array[ (tail + collected) & smask ] = idx & smask;
			collected++;
		}
	}

	// Publish the new entries by moving the tail
	__atomic_fetch_add(ring.submit_q.tail, collected, __ATOMIC_SEQ_CST);

	return collected;
}
940
// Mark as free every sqe the kernel has consumed since the previous call.
// The consumed range goes from the recorded previous head to the current
// ring head; the record is updated under the release lock. When that lock is
// already taken the call does nothing.
// Returns the number of sqes released, 0 when the lock could not be taken.
static uint32_t __release_consumed_submission( struct __io_data & ring ) {
	const uint32_t smask = *ring.submit_q.mask;

	if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
	uint32_t curr = *ring.submit_q.head;
	uint32_t prev = ring.submit_q.prev_head;
	ring.submit_q.prev_head = curr;
	unlock(ring.submit_q.release_lock);

	// A user_data of 0 makes the entry available to __submit_alloc again
	uint32_t consumed = curr - prev;
	for( i; consumed ) {
		uint32_t idx = ring.submit_q.array[ (prev + i) & smask ];
		ring.submit_q.sqes[ idx ].user_data = 0;
	}
	return consumed;
}
957
958//=============================================================================================
// I/O Registration
960//=============================================================================================
961
// Register `count` file descriptors from `files` with the ring of cluster
// `cl` through io_uring_register( IORING_REGISTER_FILES ).
// Aborts when the system call fails.
void register_fixed_files( cluster & cl, int * files, unsigned count ) {
	const int ring_fd = cl.io->fd;

	int ret = syscall( __NR_io_uring_register, ring_fd, IORING_REGISTER_FILES, files, count );
	if( ret < 0 ) {
		abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
	}

	__cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret );
}
970#endif
Note: See TracBrowser for help on using the repository browser.