source: libcfa/src/concurrency/io.cfa@ a86b1b85

Last change on this file since a86b1b85 was 5751a56, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

Changed handling of io_uring support to handle op codes in configure.
Kernel probing for op codes, not supported yet.

1//
2// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// io.cfa --
8//
9// Author : Thierry Delisle
10// Created On : Thu Apr 23 17:31:00 2020
11// Last Modified By :
12// Last Modified On :
13// Update Count :
14//
15
16#if defined(__CFA_DEBUG__)
17 // #define __CFA_DEBUG_PRINT_IO__
18 // #define __CFA_DEBUG_PRINT_IO_CORE__
19#endif
20
21#include "kernel.hfa"
22#include "bitmanip.hfa"
23
24#if !defined(CFA_HAVE_LINUX_IO_URING_H)
25 void __kernel_io_startup( cluster &, unsigned, bool ) {
26 // Nothing to do without io_uring
27 }
28
29 void __kernel_io_finish_start( cluster & ) {
30 // Nothing to do without io_uring
31 }
32
33 void __kernel_io_prepare_stop( cluster & ) {
34 // Nothing to do without io_uring
35 }
36
37 void __kernel_io_shutdown( cluster &, bool ) {
38 // Nothing to do without io_uring
39 }
40
41#else
42 #define _GNU_SOURCE /* See feature_test_macros(7) */
43 #include <errno.h>
44 #include <stdint.h>
45 #include <string.h>
46 #include <unistd.h>
47 #include <sys/mman.h>
48
49 extern "C" {
50 #include <sys/syscall.h>
51
52 #include <linux/io_uring.h>
53 }
54
55 #include "bits/signal.hfa"
56 #include "kernel_private.hfa"
57 #include "thread.hfa"
58
59 uint32_t entries_per_cluster() {
60 return 256;
61 }
62
63 static void * __io_poller_slow( void * arg );
64
65 // Weirdly, some systems that do support io_uring don't actually define these
66 #ifdef __alpha__
67 /*
68 * alpha is the only exception, all other architectures
69 * have common numbers for new system calls.
70 */
71 #ifndef __NR_io_uring_setup
72 #define __NR_io_uring_setup 535
73 #endif
74 #ifndef __NR_io_uring_enter
75 #define __NR_io_uring_enter 536
76 #endif
77 #ifndef __NR_io_uring_register
78 #define __NR_io_uring_register 537
79 #endif
80 #else /* !__alpha__ */
81 #ifndef __NR_io_uring_setup
82 #define __NR_io_uring_setup 425
83 #endif
84 #ifndef __NR_io_uring_enter
85 #define __NR_io_uring_enter 426
86 #endif
87 #ifndef __NR_io_uring_register
88 #define __NR_io_uring_register 427
89 #endif
90 #endif
91
92 // Fast poller user-thread
93 // Not using the "thread" keyword because we want to control
94 // more carefully when to start/stop it
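 // For comparison, a rough sketch (hypothetical, not part of this file) of what the
 // declarative form would look like:
 //
 //     thread __io_poller_fast_decl {       // hypothetical name
 //         struct __io_data * ring;
 //     };
 //     void main( __io_poller_fast_decl & );
 //
 // A "thread" type starts running as soon as its construction completes, which is too early
 // here; keeping a plain struct with an embedded $thread lets __kernel_io_finish_start decide
 // exactly when __thrd_start is called and the shutdown code decide when it stops.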
95 struct __io_poller_fast {
96 struct __io_data * ring;
97 $thread thrd;
98 };
99
100 void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
101 this.ring = cltr.io;
102 (this.thrd){ "Fast I/O Poller", cltr };
103 }
104 void ^?{}( __io_poller_fast & mutex this );
105 void main( __io_poller_fast & this );
106 static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
107 void ^?{}( __io_poller_fast & mutex this ) {}
108
109 struct __submition_data {
110 // Head and tail of the ring (associated with array)
111 volatile uint32_t * head;
112 volatile uint32_t * tail;
113 volatile uint32_t prev_head;
114
115 // The actual kernel ring array, which head/tail index into;
116 // its entries are indexes into the sqes array
117 uint32_t * array;
118
119 // number of entries and mask to go with it
120 const uint32_t * num;
121 const uint32_t * mask;
122
124 // Submission flags, set by the kernel (e.g. IORING_SQ_NEED_WAKEUP)
124 uint32_t * flags;
125
127 // number of invalid sqes the kernel dropped instead of submitting
127 uint32_t * dropped;
128
129 // Like head/tail but not seen by the kernel
130 volatile uint32_t * ready;
131 uint32_t ready_cnt;
132
133 __spinlock_t lock;
134 __spinlock_t release_lock;
135
136 // A buffer of sqes (not the actual ring)
137 struct io_uring_sqe * sqes;
138
139 // The location and size of the mmaped area
140 void * ring_ptr;
141 size_t ring_sz;
142 };
143
144 struct __completion_data {
145 // Head and tail of the ring
146 volatile uint32_t * head;
147 volatile uint32_t * tail;
148
149 // number of entries and mask to go with it
150 const uint32_t * mask;
151 const uint32_t * num;
152
154 // number of cqes the kernel dropped because the completion queue was full
154 uint32_t * overflow;
155
156 // the kernel ring
157 struct io_uring_cqe * cqes;
158
159 // The location and size of the mmaped area
160 void * ring_ptr;
161 size_t ring_sz;
162 };
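 // Illustrative note (added for clarity): both rings share the same indexing scheme. head and
 // tail are free-running 32-bit counters and *mask == *num - 1 with *num a power of two, so
 // the live entries occupy indexes head, head+1, ..., tail-1 reduced modulo the ring size.
 // For example, with *num == 256 (mask 0xff), head == 510 and tail == 513, the three pending
 // slots are 510 & 0xff == 254, 511 & 0xff == 255 and 512 & 0xff == 0, and unsigned
 // wrap-around keeps (tail - head) == 3 even across the 2^32 boundary.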
163
164 struct __io_data {
165 struct __submition_data submit_q;
166 struct __completion_data completion_q;
167 uint32_t ring_flags;
168 int cltr_flags;
169 int fd;
170 semaphore submit;
171 volatile bool done;
172 struct {
173 struct {
174 __processor_id_t id;
175 void * stack;
176 pthread_t kthrd;
177 volatile bool blocked;
178 } slow;
179 __io_poller_fast fast;
180 __bin_sem_t sem;
181 } poller;
182 };
183
184//=============================================================================================
185// I/O Startup / Shutdown logic
186//=============================================================================================
187 void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) {
188 if( (io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS) && (io_flags & CFA_CLUSTER_IO_EAGER_SUBMITS) ) {
189 abort("CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS and CFA_CLUSTER_IO_EAGER_SUBMITS cannot be mixed\n");
190 }
191
192 this.io = malloc();
193
194 // Step 1 : call to setup
195 struct io_uring_params params;
196 memset(&params, 0, sizeof(params));
197 if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_SUBMITS ) params.flags |= IORING_SETUP_SQPOLL;
198 if( io_flags & CFA_CLUSTER_IO_KERNEL_POLL_COMPLETES ) params.flags |= IORING_SETUP_IOPOLL;
199
200 uint32_t nentries = entries_per_cluster();
201
202 int fd = syscall(__NR_io_uring_setup, nentries, &params );
203 if(fd < 0) {
204 abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
205 }
206
207 // Step 2 : mmap result
208 memset( this.io, 0, sizeof(struct __io_data) );
209 struct __submition_data & sq = this.io->submit_q;
210 struct __completion_data & cq = this.io->completion_q;
211
212 // calculate the right ring size
213 sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned) );
214 cq.ring_sz = params.cq_off.cqes + (params.cq_entries * sizeof(struct io_uring_cqe));
215
216 // Requires features
217 #if defined(IORING_FEAT_SINGLE_MMAP)
218 // adjust the size according to the parameters
219 if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
220 cq.ring_sz = sq.ring_sz = max(cq.ring_sz, sq.ring_sz);
221 }
222 #endif
223
224 // mmap the Submit Queue into existence
225 sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
226 if (sq.ring_ptr == (void*)MAP_FAILED) {
227 abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
228 }
229
230 // Requires features
231 #if defined(IORING_FEAT_SINGLE_MMAP)
232 // mmap the Completion Queue into existence (may or may not be needed)
233 if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
234 cq.ring_ptr = sq.ring_ptr;
235 }
236 else
237 #endif
238 {
239 // We need a separate call to mmap for the completion ring
240 cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
241 if (cq.ring_ptr == (void*)MAP_FAILED) {
242 munmap(sq.ring_ptr, sq.ring_sz);
243 abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
244 }
245 }
246
247 // mmap the submit queue entries
248 size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
249 sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
250 if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
251 munmap(sq.ring_ptr, sq.ring_sz);
252 if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
253 abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
254 }
255
256 // Get the pointers from the kernel to fill the structure
257 // submit queue
258 sq.head = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
259 sq.tail = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
260 sq.mask = ( const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
261 sq.num = ( const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
262 sq.flags = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
263 sq.dropped = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
264 sq.array = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
265 sq.prev_head = *sq.head;
266
267 {
268 const uint32_t num = *sq.num;
269 for( i; num ) {
270 sq.sqes[i].user_data = 0ul64;
271 }
272 }
273
274 (sq.lock){};
275 (sq.release_lock){};
276
277 if( io_flags & ( CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS | CFA_CLUSTER_IO_EAGER_SUBMITS ) ) {
278 /* paranoid */ verify( is_pow2( io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET ) || ((io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET) < 8) );
279 sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8);
280 sq.ready = alloc_align( 64, sq.ready_cnt );
281 for(i; sq.ready_cnt) {
282 sq.ready[i] = -1ul32;
283 }
284 }
285 else {
286 sq.ready_cnt = 0;
287 sq.ready = 0p;
288 }
289
290 // completion queue
291 cq.head = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
292 cq.tail = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
293 cq.mask = ( const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
294 cq.num = ( const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
295 cq.overflow = ( uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
296 cq.cqes = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
297
298 // some paranoid checks
299 /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask );
300 /* paranoid */ verifyf( (*cq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num );
301 /* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head );
302 /* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail );
303
304 /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
305 /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
306 /* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );
307 /* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );
308
309 // Update the global ring info
310 this.io->ring_flags = params.flags;
311 this.io->cltr_flags = io_flags;
312 this.io->fd = fd;
313 this.io->done = false;
314 (this.io->submit){ min(*sq.num, *cq.num) };
315
316 if(!main_cluster) {
317 __kernel_io_finish_start( this );
318 }
319 }
320
321 void __kernel_io_finish_start( cluster & this ) {
322 if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
323 __cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluster %p\n", &this);
324 (this.io->poller.fast){ this };
325 __thrd_start( this.io->poller.fast, main );
326 }
327
328 // Create the poller thread
329 __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluster %p\n", &this);
330 this.io->poller.slow.blocked = false;
331 this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
332 }
333
334 void __kernel_io_prepare_stop( cluster & this ) {
335 __cfadbg_print_safe(io_core, "Kernel I/O : Stopping pollers for cluster %p\n", &this);
336 // Notify the poller thread of the shutdown
337 __atomic_store_n(&this.io->done, true, __ATOMIC_SEQ_CST);
338
339 // Stop the IO Poller
340 sigval val = { 1 };
341 pthread_sigqueue( this.io->poller.slow.kthrd, SIGUSR1, val );
342 post( this.io->poller.sem );
343
344 // Wait for the poller thread to finish
345 pthread_join( this.io->poller.slow.kthrd, 0p );
346 free( this.io->poller.slow.stack );
347
348 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster %p\n", &this);
349
350 if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
351 with( this.io->poller.fast ) {
352 /* paranoid */ verify( this.nprocessors == 0 || &this == mainCluster );
353 /* paranoid */ verify( !ready_mutate_islocked() );
354
355 // We need to adjust the clean-up based on where the thread is
356 if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
357
358 ready_schedule_lock( (struct __processor_id_t *)active_processor() );
359
360 // This is the tricky case
361 // The thread was preempted and now it is on the ready queue
362 // The thread should be the last on the list
363 /* paranoid */ verify( thrd.link.next != 0p );
364
365 // Remove the thread from the ready queue of this cluster
366 __attribute__((unused)) bool removed = remove_head( &this, &thrd );
367 /* paranoid */ verify( removed );
368 thrd.link.next = 0p;
369 thrd.link.prev = 0p;
370 __cfaabi_dbg_debug_do( thrd.unpark_stale = true );
371
372 // Fixup the thread state
373 thrd.state = Blocked;
374 thrd.ticket = 0;
375 thrd.preempted = __NO_PREEMPTION;
376
377 ready_schedule_unlock( (struct __processor_id_t *)active_processor() );
378
379 // Pretend like the thread was blocked all along
380 }
381 // !!! This is not an else if !!!
382 if( thrd.state == Blocked ) {
383
384 // This is the "easy case"
385 // The thread is parked and can easily be moved to active cluster
386 verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster );
387 thrd.curr_cluster = active_cluster();
388
389 // unpark the fast io_poller
390 unpark( &thrd __cfaabi_dbg_ctx2 );
391 }
392 else {
393
394 // The thread is in a weird state
395 // I don't know what to do here
396 abort("Fast poller thread is in unexpected state, cannot clean-up correctly\n");
397 }
398
399 }
400
401 ^(this.io->poller.fast){};
402
403 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller stopped for cluster %p\n", &this);
404 }
405 }
406
407 void __kernel_io_shutdown( cluster & this, bool main_cluster ) {
408 if(!main_cluster) {
409 __kernel_io_prepare_stop( this );
410 }
411
412 // Shutdown the io rings
413 struct __submition_data & sq = this.io->submit_q;
414 struct __completion_data & cq = this.io->completion_q;
415
416 // unmap the submit queue entries
417 munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe));
418
419 // unmap the Submit Queue ring
420 munmap(sq.ring_ptr, sq.ring_sz);
421
422 // unmap the Completion Queue ring, if it is different
423 if (cq.ring_ptr != sq.ring_ptr) {
424 munmap(cq.ring_ptr, cq.ring_sz);
425 }
426
427 // close the file descriptor
428 close(this.io->fd);
429
430 free( this.io->submit_q.ready ); // Maybe null, doesn't matter
431 free( this.io );
432 }
433
434 int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get, sigset_t * mask ) {
435 bool need_sys_to_submit = false;
436 bool need_sys_to_complete = false;
437 unsigned min_complete = 0;
438 unsigned flags = 0;
439
440
441 TO_SUBMIT:
442 if( to_submit > 0 ) {
443 if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
444 need_sys_to_submit = true;
445 break TO_SUBMIT;
446 }
447 if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) {
448 need_sys_to_submit = true;
449 flags |= IORING_ENTER_SQ_WAKEUP;
450 }
451 }
452
453 TO_COMPLETE:
454 if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
455 flags |= IORING_ENTER_GETEVENTS;
456 if( mask ) {
457 need_sys_to_complete = true;
458 min_complete = 1;
459 break TO_COMPLETE;
460 }
461 if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {
462 need_sys_to_complete = true;
463 }
464 }
465
466 int ret = 0;
467 if( need_sys_to_submit || need_sys_to_complete ) {
468 ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, min_complete, flags, mask, _NSIG / 8);
469 if( ret < 0 ) {
470 switch((int)errno) {
471 case EAGAIN:
472 case EINTR:
473 ret = -1;
474 break;
475 default:
476 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
477 }
478 }
479 }
480
481 // Memory barrier
482 __atomic_thread_fence( __ATOMIC_SEQ_CST );
483 return ret;
484 }
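 // Example of the resulting decision (illustrative only): the slow poller draining with a
 // signal mask on a ring created without IORING_SETUP_SQPOLL, e.g.
 //
 //     int ret = __io_uring_enter( ring, 4, true, &mask );
 //
 // takes both paths above and issues
 // syscall( __NR_io_uring_enter, ring.fd, 4, 1, IORING_ENTER_GETEVENTS, &mask, _NSIG / 8 ).
 // With IORING_SETUP_SQPOLL and the kernel thread awake, the same call returns 0 without
 // entering the kernel at all.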
485
486//=============================================================================================
487// I/O Polling
488//=============================================================================================
489 static unsigned __collect_submitions( struct __io_data & ring );
490 static uint32_t __release_consumed_submission( struct __io_data & ring );
491
492 static inline void process(struct io_uring_cqe & cqe, struct __processor_id_t * id ) {
493 struct __io_user_data_t * data = (struct __io_user_data_t *)(uintptr_t)cqe.user_data;
494 __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", data, cqe.res, data->thrd );
495
496 data->result = cqe.res;
497 if(!id) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }
498 else { __unpark( id, data->thrd __cfaabi_dbg_ctx2 ); }
499 }
500
501 // Process a single completion message from the io_uring
502 // This is NOT thread-safe
503 static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask ) {
504 /* paranoid */ verify( !kernelTLS.preemption_state.enabled );
505
506 unsigned to_submit = 0;
507 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
508 // If the poller thread also submits, then we need to aggregate the submissions which are ready
509 to_submit = __collect_submitions( ring );
510 }
511
512 int ret = __io_uring_enter(ring, to_submit, true, mask);
513 if( ret < 0 ) {
514 return [0, true];
515 }
516
517 // update statistics
518 if (to_submit > 0) {
519 __STATS__( true,
520 if( to_submit > 0 ) {
521 io.submit_q.submit_avg.rdy += to_submit;
522 io.submit_q.submit_avg.csm += ret;
523 io.submit_q.submit_avg.cnt += 1;
524 }
525 )
526 }
527
528 // Release the consumed SQEs
529 __release_consumed_submission( ring );
530
531 // Drain the queue
532 unsigned head = *ring.completion_q.head;
533 unsigned tail = *ring.completion_q.tail;
534 const uint32_t cmask = *ring.completion_q.mask; // distinct name from the sigset_t * mask parameter
535
536 // Nothing new, return 0
537 if (head == tail) {
538 return [0, to_submit > 0];
539 }
540
541 uint32_t count = tail - head;
542 /* paranoid */ verify( count != 0 );
543 for(i; count) {
544 unsigned idx = (head + i) & cmask;
545 struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
546
547 /* paranoid */ verify(&cqe);
548
549 process( cqe, !mask ? (struct __processor_id_t *)0p : &ring.poller.slow.id ); // mask == 0p means the fast (user-thread) poller is draining
550 }
551
552 // Allow new submissions to happen
553 // V(ring.submit, count);
554
555 // Mark to the kernel that the cqe has been seen
556 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
557 __atomic_thread_fence( __ATOMIC_SEQ_CST );
558 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
559
560 return [count, count > 0 || to_submit > 0];
561 }
562
563 static void * __io_poller_slow( void * arg ) {
564 #if !defined( __CFA_NO_STATISTICS__ )
565 __stats_t local_stats;
566 __init_stats( &local_stats );
567 kernelTLS.this_stats = &local_stats;
568 #endif
569
570 cluster * cltr = (cluster *)arg;
571 struct __io_data & ring = *cltr->io;
572
573 ring.poller.slow.id.id = doregister( &ring.poller.slow.id );
574
575 sigset_t mask;
576 sigfillset(&mask);
577 if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
578 abort( "KERNEL ERROR: IO_URING - pthread_sigmask" );
579 }
580
581 sigdelset( &mask, SIGUSR1 );
582
583 verify( (*ring.submit_q.head) == (*ring.submit_q.tail) );
584 verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );
585
586 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p ready\n", &ring);
587
588 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
589 while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
590
591 __atomic_store_n( &ring.poller.slow.blocked, true, __ATOMIC_SEQ_CST );
592
593 // In the user-thread approach, drain and, if anything was drained,
594 // baton-pass to the user-thread
595 int count;
596 bool again;
597 [count, again] = __drain_io( ring, &mask );
598
599 __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST );
600
601 // Update statistics
602 __STATS__( true,
603 io.complete_q.completed_avg.val += count;
604 io.complete_q.completed_avg.slow_cnt += 1;
605 )
606
607 if(again) {
608 __cfadbg_print_safe(io_core, "Kernel I/O : Handing ring %p to the fast poller\n", &ring);
609 __unpark( &ring.poller.slow.id, &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
610 wait( ring.poller.sem );
611 }
612 }
613 }
614 else {
615 while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
616 // In the naive approach, just poll the io completion queue directly
617 int count;
618 bool again;
619 [count, again] = __drain_io( ring, &mask );
620
621 // Update statistics
622 __STATS__( true,
623 io.complete_q.completed_avg.val += count;
624 io.complete_q.completed_avg.slow_cnt += 1;
625 )
626 }
627 }
628
629 __cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p stopping\n", &ring);
630
631 unregister( &ring.poller.slow.id );
632
633 #if !defined(__CFA_NO_STATISTICS__)
634 __tally_stats(cltr->stats, &local_stats);
635 #endif
636
637 return 0p;
638 }
639
640 void main( __io_poller_fast & this ) {
641 verify( this.ring->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD );
642
643 // Start parked
644 park( __cfaabi_dbg_ctx );
645
646 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", this.ring);
647
648 int reset = 0;
649
650 // Then loop until we need to stop
651 while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
652
653 // Drain the io
654 int count;
655 bool again;
656 disable_interrupts();
657 [count, again] = __drain_io( *this.ring, 0p );
658
659 if(!again) reset++;
660
661 // Update statistics
662 __STATS__( true,
663 io.complete_q.completed_avg.val += count;
664 io.complete_q.completed_avg.fast_cnt += 1;
665 )
666 enable_interrupts( __cfaabi_dbg_ctx );
667
668 // If we have drained something recently, just yield and check again
669 if(reset < 5) {
670 yield();
671 }
672 // We have not drained anything for a while, baton-pass to the slow poller
673 else {
674 __cfadbg_print_safe(io_core, "Kernel I/O : Handing ring %p to the slow poller\n", this.ring);
675 reset = 0;
676
677 // wake up the slow poller
678 post( this.ring->poller.sem );
679
680 // park this thread
681 park( __cfaabi_dbg_ctx );
682 }
683 }
684
685 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", this.ring);
686 }
687
688 static inline void __wake_poller( struct __io_data & ring ) __attribute__((artificial));
689 static inline void __wake_poller( struct __io_data & ring ) {
690 if(!__atomic_load_n( &ring.poller.slow.blocked, __ATOMIC_SEQ_CST)) return;
691
692 sigval val = { 1 };
693 pthread_sigqueue( ring.poller.slow.kthrd, SIGUSR1, val );
694 }
695
696//=============================================================================================
697// I/O Submissions
698//=============================================================================================
699
700// Submission steps :
701// 1 - Allocate a queue entry. The ring already has memory for all entries, but only the ones
702// listed in sq.array are visible to the kernel. For those not listed, the kernel offers no
703// assurance that an entry is not being filled by multiple threads. Therefore, we
704// need to write an allocator that allows allocating concurrently.
705//
706// 2 - Actually fill the submit entry; this is the only simple and straightforward step.
707//
708// 3 - Append the entry index to the array and adjust the tail accordingly. This operation
709// needs to reach two points of consensus at the same time:
710// A - The order in which entries are listed in the array: no two threads may pick the
711// same index for their entries.
712// B - When the tail can be updated for the kernel: every entry in the array between
713// head and tail must be fully filled and should never be touched again.
714//
715
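// A minimal usage sketch of these three steps from a caller's point of view (illustrative
// only; the initializer order of __io_user_data_t and the exact sqe fields filled in step 2
// are assumptions, not taken from this file):
//
//     __io_user_data_t data = { 0, active_thread() };                     // result, thrd
//     struct io_uring_sqe * sqe;
//     uint32_t idx;
//     [sqe, idx] = __submit_alloc( ring, (uint64_t)(uintptr_t)&data );    // step 1
//     sqe->opcode = IORING_OP_READV;                                      // step 2
//     sqe->fd     = fd;
//     sqe->addr   = (uint64_t)(uintptr_t)&iov;
//     sqe->len    = 1;
//     __submit( ring, idx );                                              // step 3
//     park( __cfaabi_dbg_ctx );            // the poller unparks us when the cqe is processed
//     int result = data.result;
//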
716 [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ) {
717 /* paranoid */ verify( data != 0 );
718
719 // Prepare the data we need
720 __attribute((unused)) int len = 0;
721 __attribute((unused)) int block = 0;
722 uint32_t cnt = *ring.submit_q.num;
723 uint32_t mask = *ring.submit_q.mask;
724
725 disable_interrupts();
726 uint32_t off = __tls_rand();
727 enable_interrupts( __cfaabi_dbg_ctx );
728
729 // Loop around looking for an available spot
730 for() {
731 // Look through the list starting at some offset
732 for(i; cnt) {
733 uint64_t expected = 0;
734 uint32_t idx = (i + off) & mask;
735 struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
736 volatile uint64_t * udata = &sqe->user_data;
737
738 if( *udata == expected &&
739 __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
740 {
741 // update statistics
742 __STATS__( false,
743 io.submit_q.alloc_avg.val += len;
744 io.submit_q.alloc_avg.block += block;
745 io.submit_q.alloc_avg.cnt += 1;
746 )
747
748
749 // Success return the data
750 return [sqe, idx];
751 }
752 verify(expected != data);
753
754 len ++;
755 }
756
757 block++;
758 yield();
759 }
760 }
761
762 static inline uint32_t __submit_to_ready_array( struct __io_data & ring, uint32_t idx, const uint32_t mask ) {
763 /* paranoid */ verify( idx <= mask );
764 /* paranoid */ verify( idx != -1ul32 );
765
766 // We need to find a spot in the ready array
767 __attribute((unused)) int len = 0;
768 __attribute((unused)) int block = 0;
769 uint32_t ready_mask = ring.submit_q.ready_cnt - 1;
770
771 disable_interrupts();
772 uint32_t off = __tls_rand();
773 enable_interrupts( __cfaabi_dbg_ctx );
774
775 uint32_t picked;
776 LOOKING: for() {
777 for(i; ring.submit_q.ready_cnt) {
778 picked = (i + off) & ready_mask;
779 uint32_t expected = -1ul32;
780 if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
781 break LOOKING;
782 }
783 verify(expected != idx);
784
785 len ++;
786 }
787
788 block++;
789 if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
790 __release_consumed_submission( ring );
791 unlock( ring.submit_q.lock );
792 }
793 else {
794 yield();
795 }
796 }
797
798 // update statistics
799 __STATS__( false,
800 io.submit_q.look_avg.val += len;
801 io.submit_q.look_avg.block += block;
802 io.submit_q.look_avg.cnt += 1;
803 )
804
805 return picked;
806 }
807
808 void __submit( struct __io_data & ring, uint32_t idx ) {
809 // Get the data we definitely need now
810 uint32_t * const tail = ring.submit_q.tail;
811 const uint32_t mask = *ring.submit_q.mask;
812
813 // There are 3 submission schemes, check which one we are using
814 if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
815 // If the poller thread submits, then we just need to add this to the ready array
816 __submit_to_ready_array( ring, idx, mask );
817
818 __wake_poller( ring );
819
820 __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
821 }
822 else if( ring.cltr_flags & CFA_CLUSTER_IO_EAGER_SUBMITS ) {
823 uint32_t picked = __submit_to_ready_array( ring, idx, mask );
824
825 for() {
826 yield();
827
828 // If someone else collected our index, we are done
829 #warning ABA problem
830 if( ring.submit_q.ready[picked] != idx ) {
831 __STATS__( false,
832 io.submit_q.helped += 1;
833 )
834 return;
835 }
836
837 if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
838 __STATS__( false,
839 io.submit_q.leader += 1;
840 )
841 break;
842 }
843
844 __STATS__( false,
845 io.submit_q.busy += 1;
846 )
847 }
848
849 // We got the lock
850 unsigned to_submit = __collect_submitions( ring );
851 int ret = __io_uring_enter( ring, to_submit, false, 0p );
852 if( ret < 0 ) {
853 unlock(ring.submit_q.lock);
854 return;
855 }
856
857 /* paranoid */ verify( ret > 0 || (ring.ring_flags & IORING_SETUP_SQPOLL) );
858
859 // Release the consumed SQEs
860 __release_consumed_submission( ring );
861
862 // update statistics
863 __STATS__( true,
864 io.submit_q.submit_avg.rdy += to_submit;
865 io.submit_q.submit_avg.csm += ret;
866 io.submit_q.submit_avg.cnt += 1;
867 )
868
869 unlock(ring.submit_q.lock);
870 }
871 else {
872 // get mutual exclusion
873 lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
874
875 /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,
876 /* paranoid */ "index %u already reclaimed\n"
877 /* paranoid */ "head %u, prev %u, tail %u\n"
878 /* paranoid */ "[-0: %u,-1: %u,-2: %u,-3: %u]\n",
879 /* paranoid */ idx,
880 /* paranoid */ *ring.submit_q.head, ring.submit_q.prev_head, *tail
881 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ]
882 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ]
883 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ]
884 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ]
885 /* paranoid */ );
886
887 // Append the entry index to the kernel's submit array
888
889 /* paranoid */ verify( idx <= mask );
890 ring.submit_q.array[ (*tail) & mask ] = idx;
891 __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
892
893 // Submit however many entries need to be submitted
894 int ret = __io_uring_enter( ring, 1, false, 0p );
895 if( ret < 0 ) {
896 switch((int)errno) {
897 default:
898 abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
899 }
900 }
901
902 // update statistics
903 __STATS__( false,
904 io.submit_q.submit_avg.csm += 1;
905 io.submit_q.submit_avg.cnt += 1;
906 )
907
908 // Release the consumed SQEs
909 __release_consumed_submission( ring );
910
911 unlock(ring.submit_q.lock);
912
913 __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
914 }
915 }
916
917 static unsigned __collect_submitions( struct __io_data & ring ) {
918 /* paranoid */ verify( ring.submit_q.ready != 0p );
919 /* paranoid */ verify( ring.submit_q.ready_cnt > 0 );
920
921 unsigned to_submit = 0;
922 uint32_t tail = *ring.submit_q.tail;
923 const uint32_t mask = *ring.submit_q.mask;
924
925 // Go through the list of ready submissions
926 for( i; ring.submit_q.ready_cnt ) {
927 // replace any submission with the sentinel, to consume it.
928 uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
929
930 // If it was already the sentinel, then we are done
931 if( idx == -1ul32 ) continue;
932
933 // If we got a real submission, append it to the list
934 ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
935 to_submit++;
936 }
937
938 // Increment the tail based on how many we are ready to submit
939 __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);
940
941 return to_submit;
942 }
943
944 static uint32_t __release_consumed_submission( struct __io_data & ring ) {
945 const uint32_t smask = *ring.submit_q.mask;
946
947 if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
948 uint32_t chead = *ring.submit_q.head;
949 uint32_t phead = ring.submit_q.prev_head;
950 ring.submit_q.prev_head = chead;
951 unlock(ring.submit_q.release_lock);
952
953 uint32_t count = chead - phead;
954 for( i; count ) {
955 uint32_t idx = ring.submit_q.array[ (phead + i) & smask ];
956 ring.submit_q.sqes[ idx ].user_data = 0;
957 }
958 return count;
959 }
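 // Worked example (added for clarity): if prev_head was 10 and the kernel has advanced the
 // shared head to 13, then count == 3 and the sqe indexes stored at array[10 & smask],
 // array[11 & smask] and array[12 & smask] have been consumed by the kernel; resetting their
 // user_data to 0 marks those sqes as free for __submit_alloc to hand out again.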
960
961//=============================================================================================
962// I/O Registration
963//=============================================================================================
964
965 void register_fixed_files( cluster & cl, int * files, unsigned count ) {
966 int ret = syscall( __NR_io_uring_register, cl.io->fd, IORING_REGISTER_FILES, files, count );
967 if( ret < 0 ) {
968 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
969 }
970
971 __cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret );
972 }
973#endif