source: libcfa/src/concurrency/io.cfa@ d34575b

Last change on this file since d34575b was 59f74a2, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

1//
2// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// io.cfa --
8//
9// Author : Thierry Delisle
10// Created On : Thu Apr 23 17:31:00 2020
11// Last Modified By :
12// Last Modified On :
13// Update Count :
14//
15
16#if defined(__CFA_DEBUG__)
17 // #define __CFA_DEBUG_PRINT_IO__
18 #define __CFA_DEBUG_PRINT_IO_CORE__
19#endif
20
21#include "kernel.hfa"
22#include "bitmanip.hfa"
23
24#if !defined(HAVE_LINUX_IO_URING_H)
25 void __kernel_io_startup( cluster &, unsigned, bool ) {
26 // Nothing to do without io_uring
27 }
28
29 void __kernel_io_finish_start( cluster & ) {
30 // Nothing to do without io_uring
31 }
32
33 void __kernel_io_prepare_stop( cluster & ) {
34 // Nothing to do without io_uring
35 }
36
37 void __kernel_io_shutdown( cluster &, bool ) {
38 // Nothing to do without io_uring
39 }
40
41#else
42 #define _GNU_SOURCE /* See feature_test_macros(7) */
43 #include <errno.h>
44 #include <stdint.h>
45 #include <string.h>
46 #include <unistd.h>
47 #include <sys/mman.h>
48
49 extern "C" {
50 #include <sys/syscall.h>
51
52 #include <linux/io_uring.h>
53 }
54
55 #include "bits/signal.hfa"
56 #include "kernel_private.hfa"
57 #include "thread.hfa"
58
59 uint32_t entries_per_cluster() {
60 return 256;
61 }
62
63 static void * __io_poller_slow( void * arg );
64
65 // Weirdly, some systems that do support io_uring don't actually define these
66 #ifdef __alpha__
67 /*
68 * alpha is the only exception, all other architectures
69 * have common numbers for new system calls.
70 */
71 #ifndef __NR_io_uring_setup
72 #define __NR_io_uring_setup 535
73 #endif
74 #ifndef __NR_io_uring_enter
75 #define __NR_io_uring_enter 536
76 #endif
77 #ifndef __NR_io_uring_register
78 #define __NR_io_uring_register 537
79 #endif
80 #else /* !__alpha__ */
81 #ifndef __NR_io_uring_setup
82 #define __NR_io_uring_setup 425
83 #endif
84 #ifndef __NR_io_uring_enter
85 #define __NR_io_uring_enter 426
86 #endif
87 #ifndef __NR_io_uring_register
88 #define __NR_io_uring_register 427
89 #endif
90 #endif
91
92 // Fast poller user-thread
93 // Not using the "thread" keyword because we want to control
94 // more carefully when to start/stop it
95 struct __io_poller_fast {
96 struct __io_data * ring;
97 $thread thrd;
98 };
99
100 void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
101 this.ring = cltr.io;
102 (this.thrd){ "Fast I/O Poller", cltr };
103 }
104 void ^?{}( __io_poller_fast & mutex this );
105 void main( __io_poller_fast & this );
106 static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
107 void ^?{}( __io_poller_fast & mutex this ) {}
108
109 struct __submition_data {
110 // Head and tail of the ring (associated with array)
111 volatile uint32_t * head;
112 volatile uint32_t * tail;
113 volatile uint32_t prev_head;
114
115 // The actual kernel ring which uses head/tail
116 // indexes into the sqes arrays
117 uint32_t * array;
118
119 // number of entries and mask to go with it
120 const uint32_t * num;
121 const uint32_t * mask;
122
124 // Submission flags, set by the kernel (e.g. IORING_SQ_NEED_WAKEUP)
124 uint32_t * flags;
125
126 // number of invalid sqes dropped by the kernel
127 uint32_t * dropped;
128
129 // Like head/tail but not seen by the kernel
130 volatile uint32_t * ready;
131 uint32_t ready_cnt;
132
133 __spinlock_t lock;
134 __spinlock_t release_lock;
135
136 // A buffer of sqes (not the actual ring)
137 struct io_uring_sqe * sqes;
138
139 // The location and size of the mmaped area
140 void * ring_ptr;
141 size_t ring_sz;
142 };
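	// Indexing convention (standard io_uring layout, as used by __submit and
	// __release_consumed_submission below): the application publishes work with
	//    array[ *tail & *mask ] = sqe_index;   then atomically increments *tail,
	// while the kernel consumes sqes[ array[ *head & *mask ] ] and advances *head.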
143
144 struct __completion_data {
145 // Head and tail of the ring
146 volatile uint32_t * head;
147 volatile uint32_t * tail;
148
149 // number of entries and mask to go with it
150 const uint32_t * mask;
151 const uint32_t * num;
152
154 // number of completions dropped because the cq ring was full (overflow counter)
154 uint32_t * overflow;
155
156 // the kernel ring
157 struct io_uring_cqe * cqes;
158
159 // The location and size of the mmaped area
160 void * ring_ptr;
161 size_t ring_sz;
162 };
163
164 struct __io_data {
165 struct __submition_data submit_q;
166 struct __completion_data completion_q;
167 uint32_t ring_flags;
168 int cltr_flags;
169 int fd;
170 semaphore submit;
171 volatile bool done;
172 struct {
173 struct {
174 __processor_id_t id;
175 void * stack;
176 pthread_t kthrd;
177 volatile bool blocked;
178 } slow;
179 __io_poller_fast fast;
180 __bin_sem_t sem;
181 } poller;
182 };
183
184//=============================================================================================
185// I/O Startup / Shutdown logic
186//=============================================================================================
187 void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) {
188 if( (io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS) && (io_flags & CFA_CLUSTER_IO_EAGER_SUBMITS) ) {
189 abort("CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS and CFA_CLUSTER_IO_EAGER_SUBMITS cannot be mixed\n");
190 }
191
192 this.io = malloc();
193
194 // Step 1 : call to setup
195 struct io_uring_params params;
196 memset(&params, 0, sizeof(params));
197 if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS ) params.flags |= IORING_SETUP_SQPOLL;
198 if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES ) params.flags |= IORING_SETUP_IOPOLL;
199
200 uint32_t nentries = entries_per_cluster();
201
202 int fd = syscall(__NR_io_uring_setup, nentries, &params );
203 if(fd < 0) {
204 abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
205 }
206
207 // Step 2 : mmap result
208 memset( this.io, 0, sizeof(struct __io_data) );
209 struct __submition_data & sq = this.io->submit_q;
210 struct __completion_data & cq = this.io->completion_q;
211
212 // calculate the right ring size
213 sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned) );
214 cq.ring_sz = params.cq_off.cqes + (params.cq_entries * sizeof(struct io_uring_cqe));
215
216 // Requires the IORING_FEAT_SINGLE_MMAP feature
217 #if defined(IORING_FEAT_SINGLE_MMAP)
218 // adjust the size according to the parameters
219 if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
220 cq.ring_sz = sq.ring_sz = max(cq.ring_sz, sq.ring_sz);
221 }
222 #endif
223
224 // mmap the Submit Queue into existence
225 sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
226 if (sq.ring_ptr == (void*)MAP_FAILED) {
227 abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
228 }
229
230 // Requires the IORING_FEAT_SINGLE_MMAP feature
231 #if defined(IORING_FEAT_SINGLE_MMAP)
232 // mmap the Completion Queue into existence (may or may not be needed)
233 if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
234 cq.ring_ptr = sq.ring_ptr;
235 }
236 else
237 #endif
238 {
239 // We need multiple calls to mmap
240 cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
241 if (cq.ring_ptr == (void*)MAP_FAILED) {
242 munmap(sq.ring_ptr, sq.ring_sz);
243 abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
244 }
245 }
246
247 // mmap the submit queue entries
248 size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
249 sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
250 if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
251 munmap(sq.ring_ptr, sq.ring_sz);
252 if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
253 abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
254 }
255
256 // Get the pointers from the kernel to fill the structure
257 // submit queue
258 sq.head = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
259 sq.tail = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
260 sq.mask = ( const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
261 sq.num = ( const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
262 sq.flags = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
263 sq.dropped = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
264 sq.array = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
265 sq.prev_head = *sq.head;
266
267 {
268 const uint32_t num = *sq.num;
269 for( i; num ) {
270 sq.sqes[i].user_data = 0ul64;
271 }
272 }
273
274 (sq.lock){};
275 (sq.release_lock){};
276
277 if( io_flags & ( CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS | CFA_CLUSTER_IO_EAGER_SUBMITS ) ) {
278 /* paranoid */ verify( is_pow2( io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET ) || ((io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET) < 8) );
279 sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8);
280 sq.ready = alloc_align( 64, sq.ready_cnt );
281 for(i; sq.ready_cnt) {
282 sq.ready[i] = -1ul32;
283 }
284 }
285 else {
286 sq.ready_cnt = 0;
287 sq.ready = 0p;
288 }
289
290 // completion queue
291 cq.head = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
292 cq.tail = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
293 cq.mask = ( const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
294 cq.num = ( const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
295 cq.overflow = ( uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
296 cq.cqes = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
297
298 // some paranoid checks
299 /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask );
300 /* paranoid */ verifyf( (*cq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num );
301 /* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head );
302 /* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail );
303
304 /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
305 /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
306 /* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );
307 /* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );
308
309 // Update the global ring info
310 this.io->ring_flags = params.flags;
311 this.io->cltr_flags = io_flags;
312 this.io->fd = fd;
313 this.io->done = false;
314 (this.io->submit){ min(*sq.num, *cq.num) };
315
316 if(!main_cluster) {
317 __kernel_io_finish_start( this );
318 }
319 }
320
321 void __kernel_io_finish_start( cluster & this ) {
322 if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
323 __cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluster %p\n", &this);
324 (this.io->poller.fast){ this };
325 __thrd_start( this.io->poller.fast, main );
326 }
327
328 // Create the poller thread
329 __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluster %p\n", &this);
330 this.io->poller.slow.blocked = false;
331 this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
332 }
333
334 void __kernel_io_prepare_stop( cluster & this ) {
335 __cfadbg_print_safe(io_core, "Kernel I/O : Stopping pollers for cluster %p\n", &this);
336 // Notify the poller thread of the shutdown
337 __atomic_store_n(&this.io->done, true, __ATOMIC_SEQ_CST);
338
339 // Stop the IO Poller
340 sigval val = { 1 };
341 pthread_sigqueue( this.io->poller.slow.kthrd, SIGUSR1, val );
342 post( this.io->poller.sem );
343
344 // Wait for the poller thread to finish
345 pthread_join( this.io->poller.slow.kthrd, 0p );
346 free( this.io->poller.slow.stack );
347
348 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster %p\n", &this);
349
350 if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
351 with( this.io->poller.fast ) {
352 /* paranoid */ verify( this.nprocessors == 0 || &this == mainCluster );
353 /* paranoid */ verify( !ready_mutate_islocked() );
354
355 // We need to adjust the clean-up based on where the thread is
356 if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
357
358 ready_schedule_lock( (struct __processor_id_t *)active_processor() );
359
360 // This is the tricky case
361 // The thread was preempted and now it is on the ready queue
362 // The thread should be the last on the list
363 /* paranoid */ verify( thrd.link.next != 0p );
364
365 // Remove the thread from the ready queue of this cluster
366 __attribute__((unused)) bool removed = remove_head( &this, &thrd );
367 /* paranoid */ verify( removed );
368 thrd.link.next = 0p;
369 thrd.link.prev = 0p;
370 __cfaabi_dbg_debug_do( thrd.unpark_stale = true );
371
372 // Fixup the thread state
373 thrd.state = Blocked;
374 thrd.ticket = 0;
375 thrd.preempted = __NO_PREEMPTION;
376
377 ready_schedule_unlock( (struct __processor_id_t *)active_processor() );
378
379 // Pretend like the thread was blocked all along
380 }
381 // !!! This is not an else if !!!
382 if( thrd.state == Blocked ) {
383
384 // This is the "easy case"
385 // The thread is parked and can easily be moved to active cluster
386 verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster );
387 thrd.curr_cluster = active_cluster();
388
389 // unpark the fast io_poller
390 unpark( &thrd __cfaabi_dbg_ctx2 );
391 }
392 else {
393
394 // The thread is in a weird state
395 // I don't know what to do here
396 abort("Fast poller thread is in unexpected state, cannot clean-up correctly\n");
397 }
398
399 }
400
401 ^(this.io->poller.fast){};
402
403 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller stopped for cluster %p\n", &this);
404 }
405 }
406
407 void __kernel_io_shutdown( cluster & this, bool main_cluster ) {
408 if(!main_cluster) {
409 __kernel_io_prepare_stop( this );
410 }
411
412 // Shutdown the io rings
413 struct __submition_data & sq = this.io->submit_q;
414 struct __completion_data & cq = this.io->completion_q;
415
416 // unmap the submit queue entries
417 munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe));
418
419 // unmap the Submit Queue ring
420 munmap(sq.ring_ptr, sq.ring_sz);
421
422 // unmap the Completion Queue ring, if it is different
423 if (cq.ring_ptr != sq.ring_ptr) {
424 munmap(cq.ring_ptr, cq.ring_sz);
425 }
426
427 // close the file descriptor
428 close(this.io->fd);
429
430 free( this.io->submit_q.ready ); // Maybe null, doesn't matter
431 free( this.io );
432 }
433
434 int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get, sigset_t * mask ) {
435 bool need_sys_to_submit = false;
436 bool need_sys_to_complete = false;
437 unsigned min_complete = 0;
438 unsigned flags = 0;
439
440
441 TO_SUBMIT:
442 if( to_submit > 0 ) {
443 if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
444 need_sys_to_submit = true;
445 break TO_SUBMIT;
446 }
447 if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) {
448 need_sys_to_submit = true;
449 flags |= IORING_ENTER_SQ_WAKEUP;
450 }
451 }
452
453 TO_COMPLETE:
454 if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
455 flags |= IORING_ENTER_GETEVENTS;
456 if( mask ) {
457 need_sys_to_complete = true;
458 min_complete = 1;
459 break TO_COMPLETE;
460 }
461 if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {
462 need_sys_to_complete = true;
463 }
464 }
465
466 int ret = 0;
467 if( need_sys_to_submit || need_sys_to_complete ) {
468 ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, min_complete, flags, mask, _NSIG / 8);
469 if( ret < 0 ) {
470 switch((int)errno) {
471 case EAGAIN:
472 case EINTR:
473 ret = -1;
474 break;
475 default:
476 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
477 }
478 }
479 }
480
481 // Memory barrier
482 __atomic_thread_fence( __ATOMIC_SEQ_CST );
483 return ret;
484 }
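	// Summary of the branches above: the io_uring_enter syscall is only issued when
	//    to_submit > 0 and SQPOLL is off                          (submit the entries ourselves)
	//    to_submit > 0, SQPOLL is on and SQ_NEED_WAKEUP is set    (wake the kernel submission thread)
	//    get, SQPOLL is off and a signal mask was passed          (blocking wait, min_complete = 1)
	//    get, SQPOLL is off and IOPOLL is on                      (actively reap completions)
	// otherwise progress is made purely through the shared memory mappings.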
485
486//=============================================================================================
487// I/O Polling
488//=============================================================================================
489 static unsigned __collect_submitions( struct __io_data & ring );
490 static uint32_t __release_consumed_submission( struct __io_data & ring );
491
492 // Process pending completion events from the io_uring
493 // This is NOT thread-safe
494 static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask ) {
495 /* paranoid */ verify( !kernelTLS.preemption_state.enabled );
496
497 unsigned to_submit = 0;
498 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
499 // If the poller thread also submits, then we need to aggregate the submissions which are ready
500 to_submit = __collect_submitions( ring );
501 }
502
503 int ret = __io_uring_enter(ring, to_submit, true, mask);
504 if( ret < 0 ) {
505 return [0, true];
506 }
507
508 // update statistics
509 if (to_submit > 0) {
510 __STATS__( true,
511 if( to_submit > 0 ) {
512 io.submit_q.submit_avg.rdy += to_submit;
513 io.submit_q.submit_avg.csm += ret;
514 io.submit_q.submit_avg.cnt += 1;
515 }
516 )
517 }
518
519 // Release the consumed SQEs
520 __release_consumed_submission( ring );
521
522 // Drain the queue
523 unsigned head = *ring.completion_q.head;
524 unsigned tail = *ring.completion_q.tail;
525 const uint32_t mask = *ring.completion_q.mask;
526
527 // Nothing new, return 0
528 if (head == tail) {
529 return [0, to_submit > 0];
530 }
531
532 uint32_t count = tail - head;
533 /* paranoid */ verify( count != 0 );
534 for(i; count) {
535 unsigned idx = (head + i) & mask;
536 struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
537
538 /* paranoid */ verify(&cqe);
539
540 struct __io_user_data_t * data = (struct __io_user_data_t *)(uintptr_t)cqe.user_data;
541 __cfadbg_print_safe( io, "Kernel I/O : Performed reading io cqe %p, result %d for %p\n", data, cqe.res, data->thrd );
542
543 data->result = cqe.res;
544 if(!mask) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }
545 else { __unpark( &ring.poller.slow.id, data->thrd __cfaabi_dbg_ctx2 ); }
546 }
547
548 // Allow new submissions to happen
549 // V(ring.submit, count);
550
551 // Mark to the kernel that the cqe has been seen
552 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
553 __atomic_thread_fence( __ATOMIC_SEQ_CST );
554 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
555
556 return [count, count > 0 || to_submit > 0];
557 }
558
559 static void * __io_poller_slow( void * arg ) {
560 #if !defined( __CFA_NO_STATISTICS__ )
561 __stats_t local_stats;
562 __init_stats( &local_stats );
563 kernelTLS.this_stats = &local_stats;
564 #endif
565
566 cluster * cltr = (cluster *)arg;
567 struct __io_data & ring = *cltr->io;
568
569 ring.poller.slow.id.id = doregister( &ring.poller.slow.id );
570
571 sigset_t mask;
572 sigfillset(&mask);
573 if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
574 abort( "KERNEL ERROR: IO_URING - pthread_sigmask" );
575 }
576
577 sigdelset( &mask, SIGUSR1 );
578
579 verify( (*ring.submit_q.head) == (*ring.submit_q.tail) );
580 verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );
581
582 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p ready\n", &ring);
583
584 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
585 while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
586
587 __atomic_store_n( &ring.poller.slow.blocked, true, __ATOMIC_SEQ_CST );
588
589 // In the user-thread approach, drain and, if anything was drained,
590 // baton pass to the user-thread
591 int count;
592 bool again;
593 [count, again] = __drain_io( ring, &mask );
594
595 __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST );
596
597 // Update statistics
598 __STATS__( true,
599 io.complete_q.completed_avg.val += count;
600 io.complete_q.completed_avg.slow_cnt += 1;
601 )
602
603 if(again) {
604 __cfadbg_print_safe(io_core, "Kernel I/O : Moving ring %p to fast poller\n", &ring);
605 __unpark( &ring.poller.slow.id, &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
606 wait( ring.poller.sem );
607 }
608 }
609 }
610 else {
611 while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
612 // In the naive approach, just poll the io completion queue directly
613 int count;
614 bool again;
615 [count, again] = __drain_io( ring, &mask );
616
617 // Update statistics
618 __STATS__( true,
619 io.complete_q.completed_avg.val += count;
620 io.complete_q.completed_avg.slow_cnt += 1;
621 )
622 }
623 }
624
625 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p stopping\n", &ring);
626
627 unregister( &ring.poller.slow.id );
628
629 #if !defined(__CFA_NO_STATISTICS__)
630 __tally_stats(cltr->stats, &local_stats);
631 #endif
632
633 return 0p;
634 }
635
636 void main( __io_poller_fast & this ) {
637 verify( this.ring->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD );
638
639 // Start parked
640 park( __cfaabi_dbg_ctx );
641
642 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", &this.ring);
643
644 int reset = 0;
645
646 // Then loop until we need to stop
647 while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
648
649 // Drain the io
650 int count;
651 bool again;
652 disable_interrupts();
653 [count, again] = __drain_io( *this.ring, 0p );
654
655 if(!again) reset++;
656
657 // Update statistics
658 __STATS__( true,
659 io.complete_q.completed_avg.val += count;
660 io.complete_q.completed_avg.fast_cnt += 1;
661 )
662 enable_interrupts( __cfaabi_dbg_ctx );
663
664 // If we got something, just yield and check again
665 if(reset < 5) {
666 yield();
667 }
668 // We didn't get anything, baton pass to the slow poller
669 else {
670 __cfadbg_print_safe(io_core, "Kernel I/O : Moving ring %p to slow poller\n", &this.ring);
671 reset = 0;
672
673 // wake up the slow poller
674 post( this.ring->poller.sem );
675
676 // park this thread
677 park( __cfaabi_dbg_ctx );
678 }
679 }
680
681 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
682 }
683
684 static inline void __wake_poller( struct __io_data & ring ) __attribute__((artificial));
685 static inline void __wake_poller( struct __io_data & ring ) {
686 if(!__atomic_load_n( &ring.poller.slow.blocked, __ATOMIC_SEQ_CST)) return;
687
688 sigval val = { 1 };
689 pthread_sigqueue( ring.poller.slow.kthrd, SIGUSR1, val );
690 }
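	// Note on the mechanism above: the slow poller blocks in io_uring_enter with a signal
	// mask that leaves only SIGUSR1 unblocked (see the sigdelset in __io_poller_slow), so
	// queueing SIGUSR1 here interrupts that syscall with EINTR and lets the poller re-check
	// 'done' and any newly ready submissions.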
691
692//=============================================================================================
693// I/O Submissions
694//=============================================================================================
695
696// Submission steps :
697// 1 - Allocate a queue entry. The ring already has memory for all entries but only the ones
698// listed in sq.array are visible to the kernel. For those not listed, the kernel offers no
699// assurance that an entry is not being filled by multiple threads. Therefore, we
700// need to write an allocator that allows allocating concurrently.
701//
702// 2 - Actually fill the submit entry, this is the only simple and straightforward step.
703//
704// 3 - Append the entry index to the array and adjust the tail accordingly. This operation
705// needs to reach two points of consensus at the same time:
706// A - The order in which entries are listed in the array: no two threads must pick the
707// same index for their entries.
708// B - When the tail can be updated for the kernel. EVERY entry in the array between
709// head and tail must be fully filled and shouldn't ever be touched again.
710//
711
712 [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ) {
713 /* paranoid */ verify( data != 0 );
714
715 // Prepare the data we need
716 __attribute((unused)) int len = 0;
717 __attribute((unused)) int block = 0;
718 uint32_t cnt = *ring.submit_q.num;
719 uint32_t mask = *ring.submit_q.mask;
720
721 disable_interrupts();
722 uint32_t off = __tls_rand();
723 enable_interrupts( __cfaabi_dbg_ctx );
724
725 // Loop around looking for an available spot
726 for() {
727 // Look through the list starting at some offset
728 for(i; cnt) {
729 uint64_t expected = 0;
730 uint32_t idx = (i + off) & mask;
731 struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
732 volatile uint64_t * udata = &sqe->user_data;
733
734 if( *udata == expected &&
735 __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
736 {
737 // update statistics
738 __STATS__( false,
739 io.submit_q.alloc_avg.val += len;
740 io.submit_q.alloc_avg.block += block;
741 io.submit_q.alloc_avg.cnt += 1;
742 )
743
744
745 // Success, return the data
746 return [sqe, idx];
747 }
748 verify(expected != data);
749
750 len ++;
751 }
752
753 block++;
754 yield();
755 }
756 }
757
758 static inline uint32_t __submit_to_ready_array( struct __io_data & ring, uint32_t idx, const uint32_t mask ) {
759 /* paranoid */ verify( idx <= mask );
760 /* paranoid */ verify( idx != -1ul32 );
761
762 // We need to find a spot in the ready array
763 __attribute((unused)) int len = 0;
764 __attribute((unused)) int block = 0;
765 uint32_t ready_mask = ring.submit_q.ready_cnt - 1;
766
767 disable_interrupts();
768 uint32_t off = __tls_rand();
769 enable_interrupts( __cfaabi_dbg_ctx );
770
771 uint32_t picked;
772 LOOKING: for() {
773 for(i; ring.submit_q.ready_cnt) {
774 picked = (i + off) & ready_mask;
775 uint32_t expected = -1ul32;
776 if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
777 break LOOKING;
778 }
779 verify(expected != idx);
780
781 len ++;
782 }
783
784 block++;
785 if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
786 __release_consumed_submission( ring );
787 unlock( ring.submit_q.lock );
788 }
789 else {
790 yield();
791 }
792 }
793
794 // update statistics
795 __STATS__( false,
796 io.submit_q.look_avg.val += len;
797 io.submit_q.look_avg.block += block;
798 io.submit_q.look_avg.cnt += 1;
799 )
800
801 return picked;
802 }
803
804 void __submit( struct __io_data & ring, uint32_t idx ) {
805 // Get the data we definitely need
806 uint32_t * const tail = ring.submit_q.tail;
807 const uint32_t mask = *ring.submit_q.mask;
808
809 // There are 2 submission schemes, check which one we are using
810 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
811 // If the poller thread submits, then we just need to add this to the ready array
812 __submit_to_ready_array( ring, idx, mask );
813
814 __wake_poller( ring );
815
816 __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
817 }
818 else if( ring.cltr_flags & CFA_CLUSTER_IO_EAGER_SUBMITS ) {
819 uint32_t picked = __submit_to_ready_array( ring, idx, mask );
820
821 for() {
822 yield();
823
824 // If someone else collected our index, we are done
825 #warning ABA problem
826 if( ring.submit_q.ready[picked] != idx ) {
827 __STATS__( false,
828 io.submit_q.helped += 1;
829 )
830 return;
831 }
832
833 if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
834 __STATS__( false,
835 io.submit_q.leader += 1;
836 )
837 break;
838 }
839
840 __STATS__( false,
841 io.submit_q.busy += 1;
842 )
843 }
844
845 // We got the lock
846 unsigned to_submit = __collect_submitions( ring );
847 int ret = __io_uring_enter( ring, to_submit, false, 0p );
848 if( ret < 0 ) {
849 unlock(ring.submit_q.lock);
850 return;
851 }
852
853 /* paranoid */ verify( ret > 0 || (ring.ring_flags & IORING_SETUP_SQPOLL) );
854
855 // Release the consumed SQEs
856 __release_consumed_submission( ring );
857
858 // update statistics
859 __STATS__( true,
860 io.submit_q.submit_avg.rdy += to_submit;
861 io.submit_q.submit_avg.csm += ret;
862 io.submit_q.submit_avg.cnt += 1;
863 )
864
865 unlock(ring.submit_q.lock);
866 }
867 else {
868 // get mutual exclusion
869 lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
870
871 /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,
872 /* paranoid */ "index %u already reclaimed\n"
873 /* paranoid */ "head %u, prev %u, tail %u\n"
874 /* paranoid */ "[-0: %u,-1: %u,-2: %u,-3: %u]\n",
875 /* paranoid */ idx,
876 /* paranoid */ *ring.submit_q.head, ring.submit_q.prev_head, *tail
877 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ]
878 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ]
879 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ]
880 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ]
881 /* paranoid */ );
882
883 // Append the index to the kernel's submit array
884
885 /* paranoid */ verify( idx <= mask );
886 ring.submit_q.array[ (*tail) & mask ] = idx;
887 __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
888
889 // Submit however many entries need to be submitted
890 int ret = __io_uring_enter( ring, 1, false, 0p );
891 if( ret < 0 ) {
892 switch((int)errno) {
893 default:
894 abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
895 }
896 }
897
898 // update statistics
899 __STATS__( false,
900 io.submit_q.submit_avg.csm += 1;
901 io.submit_q.submit_avg.cnt += 1;
902 )
903
904 // Release the consumed SQEs
905 __release_consumed_submission( ring );
906
907 unlock(ring.submit_q.lock);
908
909 __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
910 }
911 }
912
913 static unsigned __collect_submitions( struct __io_data & ring ) {
914 /* paranoid */ verify( ring.submit_q.ready != 0p );
915 /* paranoid */ verify( ring.submit_q.ready_cnt > 0 );
916
917 unsigned to_submit = 0;
918 uint32_t tail = *ring.submit_q.tail;
919 const uint32_t mask = *ring.submit_q.mask;
920
921 // Go through the list of ready submissions
922 for( i; ring.submit_q.ready_cnt ) {
923 // replace any submission with the sentinel, to consume it.
924 uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
925
926 // If it was already the sentinel, then we are done
927 if( idx == -1ul32 ) continue;
928
929 // If we got a real submission, append it to the list
930 ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
931 to_submit++;
932 }
933
934 // Increment the tail based on how many we are ready to submit
935 __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);
936
937 return to_submit;
938 }
939
940 static uint32_t __release_consumed_submission( struct __io_data & ring ) {
941 const uint32_t smask = *ring.submit_q.mask;
942
943 if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
944 uint32_t chead = *ring.submit_q.head;
945 uint32_t phead = ring.submit_q.prev_head;
946 ring.submit_q.prev_head = chead;
947 unlock(ring.submit_q.release_lock);
948
949 uint32_t count = chead - phead;
950 for( i; count ) {
951 uint32_t idx = ring.submit_q.array[ (phead + i) & smask ];
952 ring.submit_q.sqes[ idx ].user_data = 0;
953 }
954 return count;
955 }
956
957//=============================================================================================
958// I/O Registration
959//=============================================================================================
960
961 void register_fixed_files( cluster & cl, int * files, unsigned count ) {
962 int ret = syscall( __NR_io_uring_register, cl.io->fd, IORING_REGISTER_FILES, files, count );
963 if( ret < 0 ) {
964 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
965 }
966
967 __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret );
968 }
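	// A minimal usage sketch (hypothetical descriptors, not part of this file): once files
	// are registered, an sqe can refer to them by index instead of by raw file descriptor.
	//
	//    int fds[2] = { STDIN_FILENO, STDOUT_FILENO };
	//    register_fixed_files( cl, fds, 2 );
	//    // later, when filling an sqe :
	//    //    sqe->flags |= IOSQE_FIXED_FILE;
	//    //    sqe->fd     = 1;      // index into the registered array, not a raw fd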
969#endif