//
// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
//
// The contents of this file are covered under the licence agreement in the
// file "LICENCE" distributed with Cforall.
//
// io.cfa --
//
// Author           : Thierry Delisle
// Created On       : Thu Apr 23 17:31:00 2020
// Last Modified By :
// Last Modified On :
// Update Count     :
//

// #define __CFA_DEBUG_PRINT_IO__
// #define __CFA_DEBUG_PRINT_IO_CORE__

#include "kernel.hfa"
#include "bitmanip.hfa"

#if !defined(HAVE_LINUX_IO_URING_H)
	void __kernel_io_startup( cluster &, unsigned, bool ) {
		// Nothing to do without io_uring
	}

	void __kernel_io_finish_start( cluster & ) {
		// Nothing to do without io_uring
	}

	void __kernel_io_prepare_stop( cluster & ) {
		// Nothing to do without io_uring
	}

	void __kernel_io_shutdown( cluster &, bool ) {
		// Nothing to do without io_uring
	}

#else
	#define _GNU_SOURCE         /* See feature_test_macros(7) */
	#include <errno.h>
	#include <stdint.h>
	#include <string.h>
	#include <unistd.h>
	#include <sys/mman.h>

	extern "C" {
		#include <sys/syscall.h>

		#include <linux/io_uring.h>
	}

	#include "bits/signal.hfa"
	#include "kernel_private.hfa"
	#include "thread.hfa"

	uint32_t entries_per_cluster() {
		return 256;
	}

	static void * __io_poller_slow( void * arg );

	// Weirdly, some systems that do support io_uring don't actually define these
	#ifdef __alpha__
		/*
		* alpha is the only exception, all other architectures
		* have common numbers for new system calls.
		*/
		#ifndef __NR_io_uring_setup
			#define __NR_io_uring_setup           535
		#endif
		#ifndef __NR_io_uring_enter
			#define __NR_io_uring_enter           536
		#endif
		#ifndef __NR_io_uring_register
			#define __NR_io_uring_register        537
		#endif
	#else /* !__alpha__ */
		#ifndef __NR_io_uring_setup
			#define __NR_io_uring_setup           425
		#endif
		#ifndef __NR_io_uring_enter
			#define __NR_io_uring_enter           426
		#endif
		#ifndef __NR_io_uring_register
			#define __NR_io_uring_register        427
		#endif
	#endif

	// Fast poller user-thread
	// Not using the "thread" keyword because we want to control
	// more carefully when to start/stop it
	struct __io_poller_fast {
		struct __io_data * ring;
		$thread thrd;
	};

	void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
		this.ring = cltr.io;
		(this.thrd){ "Fast I/O Poller", cltr };
	}
	void ^?{}( __io_poller_fast & mutex this );
	void main( __io_poller_fast & this );
	static inline $thread * get_thread( __io_poller_fast & this ) { return &this.thrd; }
	void ^?{}( __io_poller_fast & mutex this ) {}

	struct __submition_data {
		// Head and tail of the ring (associated with array)
		volatile uint32_t * head;
		volatile uint32_t * tail;

		// The actual kernel ring which uses head/tail
		// indexes into the sqes arrays
		uint32_t * array;

		// number of entries and mask to go with it
		const uint32_t * num;
		const uint32_t * mask;

		// Submission ring flags, updated by the kernel (e.g. IORING_SQ_NEED_WAKEUP)
		uint32_t * flags;

		// number of invalid sqes dropped by the kernel
		uint32_t * dropped;

		// Like head/tail but not seen by the kernel
		volatile uint32_t * ready;
		uint32_t ready_cnt;

		__spinlock_t lock;

		// A buffer of sqes (not the actual ring)
		struct io_uring_sqe * sqes;

		// The location and size of the mmaped area
		void * ring_ptr;
		size_t ring_sz;
	};

	struct __completion_data {
		// Head and tail of the ring
		volatile uint32_t * head;
		volatile uint32_t * tail;

		// number of entries and mask to go with it
		const uint32_t * mask;
		const uint32_t * num;

		// number of cqes the kernel dropped because the completion queue was full
		uint32_t * overflow;

		// the kernel ring
		struct io_uring_cqe * cqes;

		// The location and size of the mmaped area
		void * ring_ptr;
		size_t ring_sz;
	};
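	// Note: both rings are power-of-two sized, so a logical index maps to a slot with a
	// simple mask, e.g. cqes[ head & (*mask) ]; head and tail only ever increase and are
	// reduced modulo the ring size on access.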

	struct __io_data {
		struct __submition_data submit_q;
		struct __completion_data completion_q;
		uint32_t ring_flags;
		int cltr_flags;
		int fd;
		semaphore submit;
		volatile bool done;
		struct {
			struct {
				__processor_id_t id;
				void * stack;
				pthread_t kthrd;
				volatile bool blocked;
			} slow;
			__io_poller_fast fast;
			__bin_sem_t sem;
		} poller;
	};
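	// Poller architecture: each cluster owns one slow poller (a dedicated pthread that can
	// block inside io_uring_enter) and, when CFA_CLUSTER_IO_POLLER_USER_THREAD is set, one
	// fast poller (a user-level $thread that polls the completion queue without blocking).
	// The two hand the ring back and forth: after several empty drains the fast poller posts
	// poller.sem and parks, and the slow poller unparks it again as soon as it drains something.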

//=============================================================================================
// I/O Startup / Shutdown logic
//=============================================================================================
	void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) {
		this.io = malloc();

		// Step 1 : call to setup
		struct io_uring_params params;
		memset(&params, 0, sizeof(params));

		uint32_t nentries = entries_per_cluster();

		int fd = syscall(__NR_io_uring_setup, nentries, &params );
		if(fd < 0) {
			abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
		}

		// Step 2 : mmap result
		memset( this.io, 0, sizeof(struct __io_data) );
		struct __submition_data  & sq = this.io->submit_q;
		struct __completion_data & cq = this.io->completion_q;

		// calculate the right ring size
		sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned)           );
		cq.ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));

		// Requires features
		#if defined(IORING_FEAT_SINGLE_MMAP)
			// adjust the size according to the parameters
			if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
				cq.ring_sz = sq.ring_sz = max(cq.ring_sz, sq.ring_sz);
			}
		#endif

		// mmap the Submit Queue into existence
		sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
		if (sq.ring_ptr == (void*)MAP_FAILED) {
			abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
		}

		// Requires features
		#if defined(IORING_FEAT_SINGLE_MMAP)
			// mmap the Completion Queue into existence (may or may not be needed)
			if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
				cq.ring_ptr = sq.ring_ptr;
			}
			else
		#endif
		{
			// We need multiple calls to mmap
			cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
			if (cq.ring_ptr == (void*)MAP_FAILED) {
				munmap(sq.ring_ptr, sq.ring_sz);
				abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
			}
		}

		// mmap the submit queue entries
		size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
		sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
		if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
			munmap(sq.ring_ptr, sq.ring_sz);
			if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
			abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
		}

		// Get the pointers from the kernel to fill the structure
		// submit queue
		sq.head    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
		sq.tail    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
		sq.mask    = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
		sq.num     = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
		sq.flags   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
		sq.dropped = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
		sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);

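		// Mark all sqes as free: a user_data of 0 is the sentinel that __submit_alloc()
		// below looks for when handing out entries.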
		{
			const uint32_t num = *sq.num;
			for( i; num ) {
				sq.sqes[i].user_data = 0ul64;
			}
		}

		if( io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
			/* paranoid */ verify( is_pow2( io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET ) || ((io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET) < 8)  );
			sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8);
			sq.ready = alloc_align( 64, sq.ready_cnt );
			for(i; sq.ready_cnt) {
				sq.ready[i] = -1ul32;
			}
		}
		else {
			sq.ready_cnt = 0;
			sq.ready = 0p;
		}
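		// Note: ready_cnt ends up a power of two (at least 8), so __submit_to_ready_array()
		// can index the ready array with a simple mask.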

		// completion queue
		cq.head     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
		cq.tail     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
		cq.mask     = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
		cq.num      = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
		cq.overflow = (         uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
		cq.cqes   = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);

		// some paranoid checks
		/* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask  );
		/* paranoid */ verifyf( (*cq.num)  >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num );
		/* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head );
		/* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail );

		/* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
		/* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
		/* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );
		/* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );

		// Update the global ring info
		this.io->ring_flags = params.flags;
		this.io->cltr_flags = io_flags;
		this.io->fd         = fd;
		this.io->done       = false;
		(this.io->submit){ min(*sq.num, *cq.num) };

		if(!main_cluster) {
			__kernel_io_finish_start( this );
		}
	}

	void __kernel_io_finish_start( cluster & this ) {
		if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
			__cfadbg_print_safe(io_core, "Kernel I/O : Creating fast poller for cluster %p\n", &this);
			(this.io->poller.fast){ this };
			__thrd_start( this.io->poller.fast, main );
		}

		// Create the poller thread
		__cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluster %p\n", &this);
		this.io->poller.slow.blocked = false;
		this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
	}

	void __kernel_io_prepare_stop( cluster & this ) {
		__cfadbg_print_safe(io_core, "Kernel I/O : Stopping pollers for cluster %p\n", &this);
		// Notify the poller thread of the shutdown
		__atomic_store_n(&this.io->done, true, __ATOMIC_SEQ_CST);

		// Stop the IO Poller
		sigval val = { 1 };
		pthread_sigqueue( this.io->poller.slow.kthrd, SIGUSR1, val );
		post( this.io->poller.sem );

		// Wait for the poller thread to finish
		pthread_join( this.io->poller.slow.kthrd, 0p );
		free( this.io->poller.slow.stack );

		__cfadbg_print_safe(io_core, "Kernel I/O : Slow poller stopped for cluster %p\n", &this);

		if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
			with( this.io->poller.fast ) {
				/* paranoid */ verify( this.nprocessors == 0 || &this == mainCluster );
				/* paranoid */ verify( !ready_mutate_islocked() );

				// We need to adjust the clean-up based on where the thread is
				if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {

					ready_schedule_lock( (struct __processor_id_t *)active_processor() );

						// This is the tricky case
						// The thread was preempted and now it is on the ready queue
						// The thread should be the last on the list
						/* paranoid */ verify( thrd.link.next != 0p );

						// Remove the thread from the ready queue of this cluster
						__attribute__((unused)) bool removed = remove_head( &this, &thrd );
						/* paranoid */ verify( removed );
						thrd.link.next = 0p;
						thrd.link.prev = 0p;
						__cfaabi_dbg_debug_do( thrd.unpark_stale = true );

						// Fixup the thread state
						thrd.state = Blocked;
						thrd.ticket = 0;
						thrd.preempted = __NO_PREEMPTION;

					ready_schedule_unlock( (struct __processor_id_t *)active_processor() );

					// Pretend like the thread was blocked all along
				}
				// !!! This is not an else if !!!
				if( thrd.state == Blocked ) {

					// This is the "easy case"
					// The thread is parked and can easily be moved to active cluster
					verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster );
					thrd.curr_cluster = active_cluster();

					// unpark the fast io_poller
					unpark( &thrd __cfaabi_dbg_ctx2 );
				}
				else {

					// The thread is in a weird state
					// I don't know what to do here
					abort("Fast poller thread is in unexpected state, cannot clean-up correctly\n");
				}

			}

			^(this.io->poller.fast){};

			__cfadbg_print_safe(io_core, "Kernel I/O : Fast poller stopped for cluster %p\n", &this);
		}
	}

	void __kernel_io_shutdown( cluster & this, bool main_cluster ) {
		if(!main_cluster) {
			__kernel_io_prepare_stop( this );
		}

		// Shutdown the io rings
		struct __submition_data  & sq = this.io->submit_q;
		struct __completion_data & cq = this.io->completion_q;

		// unmap the submit queue entries
		munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe));

		// unmap the Submit Queue ring
		munmap(sq.ring_ptr, sq.ring_sz);

		// unmap the Completion Queue ring, if it is different
		if (cq.ring_ptr != sq.ring_ptr) {
			munmap(cq.ring_ptr, cq.ring_sz);
		}

		// close the file descriptor
		close(this.io->fd);

		free( this.io->submit_q.ready ); // Maybe null, doesn't matter
		free( this.io );
	}

//=============================================================================================
// I/O Polling
//=============================================================================================
	// Process the pending completion messages from the io_uring instance
	// This is NOT thread-safe
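	// Returns [ number of completions drained,
	//           whether any work was found (completions drained or submissions pushed) ]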
	static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask, int waitcnt, bool in_kernel ) {
		unsigned to_submit = 0;
		if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {

			// If the poller thread also submits, then we need to aggregate the submissions which are ready
			uint32_t tail = *ring.submit_q.tail;
			const uint32_t mask = *ring.submit_q.mask;

			// Go through the list of ready submissions
			for( i; ring.submit_q.ready_cnt ) {
				// replace any submission with the sentinel, to consume it.
				uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);

				// If it was already the sentinel, this slot is empty, skip it
				if( idx == -1ul32 ) continue;

				// If we got a real submission, append it to the list
				ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
				to_submit++;
			}

			// Increment the tail based on how many we are ready to submit
			__atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);
		}

		const uint32_t smask = *ring.submit_q.mask;
		uint32_t shead = *ring.submit_q.head;
		int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
		if( ret < 0 ) {
			switch((int)errno) {
			case EAGAIN:
			case EINTR:
				return -EAGAIN;
			default:
				abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
			}
		}

		// Release the consumed SQEs
		for( i; ret ) {
			uint32_t idx = ring.submit_q.array[ (i + shead) & smask ];
			ring.submit_q.sqes[ idx ].user_data = 0;
		}

		uint32_t avail = 0;
		uint32_t sqe_num = *ring.submit_q.num;
		for(i; sqe_num) {
			if( ring.submit_q.sqes[ i ].user_data == 0 ) avail++;
		}

		// update statistics
		#if !defined(__CFA_NO_STATISTICS__)
			__tls_stats()->io.submit_q.submit_avg.rdy += to_submit;
			__tls_stats()->io.submit_q.submit_avg.csm += ret;
			__tls_stats()->io.submit_q.submit_avg.avl += avail;
			__tls_stats()->io.submit_q.submit_avg.cnt += 1;
		#endif

		// Drain the queue
		unsigned head = *ring.completion_q.head;
		unsigned tail = *ring.completion_q.tail;
		const uint32_t mask = *ring.completion_q.mask;

		// Memory barrier
		__atomic_thread_fence( __ATOMIC_SEQ_CST );

		// Nothing new, return 0
		if (head == tail) {
			return 0;
		}

		uint32_t count = tail - head;
		for(i; count) {
			unsigned idx = (head + i) & mask;
			struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];

			/* paranoid */ verify(&cqe);

			struct __io_user_data_t * data = (struct __io_user_data_t *)(uintptr_t)cqe.user_data;
			__cfadbg_print_safe( io, "Kernel I/O : Performed reading io cqe %p, result %d for %p\n", data, cqe.res, data->thrd );

			data->result = cqe.res;
			if(!in_kernel) { unpark( data->thrd __cfaabi_dbg_ctx2 ); }
			else         { __unpark( &ring.poller.slow.id, data->thrd __cfaabi_dbg_ctx2 ); }
		}

		// Allow new submissions to happen
		// V(ring.submit, count);

		// Mark to the kernel that the cqe has been seen
		// Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
		__atomic_thread_fence( __ATOMIC_SEQ_CST );
		__atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );

		return [count, count > 0 || to_submit > 0];
	}

	static void * __io_poller_slow( void * arg ) {
		#if !defined( __CFA_NO_STATISTICS__ )
			__stats_t local_stats;
			__init_stats( &local_stats );
			kernelTLS.this_stats = &local_stats;
		#endif

		cluster * cltr = (cluster *)arg;
		struct __io_data & ring = *cltr->io;

		ring.poller.slow.id.id = doregister( &ring.poller.slow.id );

		sigset_t mask;
		sigfillset(&mask);
		if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
			abort( "KERNEL ERROR: IO_URING - pthread_sigmask" );
		}

		sigdelset( &mask, SIGUSR1 );

		verify( (*ring.submit_q.head) == (*ring.submit_q.tail) );
		verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );

		__cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p ready\n", &ring);

		if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
			while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {

				__atomic_store_n( &ring.poller.slow.blocked, true, __ATOMIC_SEQ_CST );

				// In the user-thread approach, drain and, if anything was drained,
				// baton pass to the user-thread
				int count;
				bool again;
				[count, again] = __drain_io( ring, &mask, 1, true );

				__atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST );

				// Update statistics
				#if !defined(__CFA_NO_STATISTICS__)
					__tls_stats()->io.complete_q.completed_avg.val += count;
					__tls_stats()->io.complete_q.completed_avg.slow_cnt += 1;
				#endif

				if(again) {
					__cfadbg_print_safe(io_core, "Kernel I/O : Moving ring %p to fast poller\n", &ring);
					__unpark( &ring.poller.slow.id, &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
					wait( ring.poller.sem );
				}
			}
		}
		else {
			while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
				// In the naive approach, just poll the io completion queue directly
				int count;
				bool again;
				[count, again] = __drain_io( ring, &mask, 0, true );

				// Update statistics
				#if !defined(__CFA_NO_STATISTICS__)
					__tls_stats()->io.complete_q.completed_avg.val += count;
					__tls_stats()->io.complete_q.completed_avg.slow_cnt += 1;
				#endif
			}
		}

		__cfadbg_print_safe(io_core, "Kernel I/O : Slow poller for ring %p stopping\n", &ring);

		unregister( &ring.poller.slow.id );

		#if !defined(__CFA_NO_STATISTICS__)
			__tally_stats(cltr->stats, &local_stats);
		#endif

		return 0p;
	}

	void main( __io_poller_fast & this ) {
		verify( this.ring->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD );

		// Start parked
		park( __cfaabi_dbg_ctx );

		__cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p ready\n", &this.ring);

		int reset = 0;

		// Then loop until we need to stop
		while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {

			// Drain the io
			int count;
			bool again;
			disable_interrupts();
				[count, again] = __drain_io( *this.ring, 0p, 0, false );

				if(!again) reset++;

				// Update statistics
				#if !defined(__CFA_NO_STATISTICS__)
					__tls_stats()->io.complete_q.completed_avg.val += count;
					__tls_stats()->io.complete_q.completed_avg.fast_cnt += 1;
				#endif
			enable_interrupts( __cfaabi_dbg_ctx );

			// If we have seen fewer than 5 empty drains, just yield and check again
			if(reset < 5) {
				yield();
			}
			// We haven't found anything in a while, baton pass to the slow poller
			else {
				__cfadbg_print_safe(io_core, "Kernel I/O : Moving ring %p to slow poller\n", &this.ring);
				reset = 0;

				// wake up the slow poller
				post( this.ring->poller.sem );

				// park this thread
				park( __cfaabi_dbg_ctx );
			}
		}

		__cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
	}

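	// Wake the slow poller if it is blocked inside io_uring_enter. SIGUSR1 is the one signal
	// left out of the mask passed to io_uring_enter, so queueing it interrupts the blocked
	// syscall (EINTR, which __drain_io reports as -EAGAIN), letting the poller loop around and
	// pick up the newly published submissions.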
	static inline void __wake_poller( struct __io_data & ring ) __attribute__((artificial));
	static inline void __wake_poller( struct __io_data & ring ) {
		if(!__atomic_load_n( &ring.poller.slow.blocked, __ATOMIC_SEQ_CST)) return;

		sigval val = { 1 };
		pthread_sigqueue( ring.poller.slow.kthrd, SIGUSR1, val );
	}

//=============================================================================================
// I/O Submissions
//=============================================================================================

// Submission steps:
// 1 - We need to make sure we don't overflow any of the buffers. P(ring.submit) makes sure
//     entries are available. The semaphore ensures that there are no more operations in
//     progress than there are entries in the buffer. This probably limits concurrency
//     more than necessary since submitted but not completed operations don't need any
//     entries in user space. However, I don't know what happens if we overflow the buffers
//     because too many requests completed at once. This is a safe approach in all cases.
//     Furthermore, with hundreds of entries, this may be okay.
//
// 2 - Allocate a queue entry. The ring already has memory for all entries but only the ones
//     listed in sq.array are visible to the kernel. For those not listed, the kernel does not
//     offer any assurance that an entry is not being filled by multiple threads. Therefore, we
//     need to write an allocator that allows allocating concurrently.
//
// 3 - Actually fill the submit entry, this is the only simple and straightforward step.
//
// 4 - Append the entry index to the array and adjust the tail accordingly. This operation
//     needs to reach two points of consensus at the same time:
//     A - The order in which entries are listed in the array: no two threads must pick the
//         same index for their entries
//     B - When the tail can be updated for the kernel. EVERY entry in the array between
//         head and tail must be fully filled and should never be touched again.
//
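// Illustrative sketch only (the variable `op` and the exact fill of the sqe are hypothetical,
// not code in this file): a caller submitting an operation roughly does
//
//         struct __io_user_data_t op;              // op.thrd set to active_thread()
//         struct io_uring_sqe * sqe;
//         uint32_t idx;
//         [sqe, idx] = __submit_alloc( ring, (uint64_t)(uintptr_t)&op );  // steps 1-2
//         // fill *sqe : opcode, fd, buffer, ...                             step 3
//         __submit( ring, idx );                                          // step 4
//         park( __cfaabi_dbg_ctx ); // the poller stores cqe.res in op.result and unparks op.thrd
//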

	[* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ) {
		verify( data != 0 );


		// Prepare the data we need
		__attribute((unused)) int len   = 0;
		__attribute((unused)) int block = 0;
		uint32_t cnt = *ring.submit_q.num;
		uint32_t mask = *ring.submit_q.mask;

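		// Pick a random starting offset so concurrent allocators are unlikely to probe (and
		// contend on) the same slots.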
		disable_interrupts();
			uint32_t off = __tls_rand();
		enable_interrupts( __cfaabi_dbg_ctx );

		// Loop around looking for an available spot
		for() {
			// Look through the list starting at some offset
			for(i; cnt) {
				uint64_t expected = 0;
				uint32_t idx = (i + off) & mask;
				struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
				volatile uint64_t * udata = &sqe->user_data;

				if( *udata == expected &&
					__atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
				{
					// update statistics
					#if !defined(__CFA_NO_STATISTICS__)
						disable_interrupts();
							__tls_stats()->io.submit_q.alloc_avg.val   += len;
							__tls_stats()->io.submit_q.alloc_avg.block += block;
							__tls_stats()->io.submit_q.alloc_avg.cnt   += 1;
						enable_interrupts( __cfaabi_dbg_ctx );
					#endif


					// Success, return the data
					return [sqe, idx];
				}
				verify(expected != data);

				len ++;
			}

			block++;
			yield();
		}
	}

	static inline uint32_t __submit_to_ready_array( struct __io_data & ring, uint32_t idx, const uint32_t mask ) {
		/* paranoid */ verify( idx <= mask   );
		/* paranoid */ verify( idx != -1ul32 );

		// We need to find a spot in the ready array
		__attribute((unused)) int len   = 0;
		__attribute((unused)) int block = 0;
		uint32_t ready_mask = ring.submit_q.ready_cnt - 1;

		disable_interrupts();
			uint32_t off = __tls_rand();
		enable_interrupts( __cfaabi_dbg_ctx );

		uint32_t picked;
		LOOKING: for() {
			for(i; ring.submit_q.ready_cnt) {
				picked = (i + off) & ready_mask;
				uint32_t expected = -1ul32;
				if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
					break LOOKING;
				}
				verify(expected != idx);

				len ++;
			}

			block++;
			yield();
		}

		// update statistics
		#if !defined(__CFA_NO_STATISTICS__)
		disable_interrupts();
			__tls_stats()->io.submit_q.look_avg.val   += len;
			__tls_stats()->io.submit_q.look_avg.block += block;
			__tls_stats()->io.submit_q.look_avg.cnt   += 1;
		enable_interrupts( __cfaabi_dbg_ctx );
		#endif

		return picked;
	}

	void __submit( struct __io_data & ring, uint32_t idx ) {
		// Get the data we definitely need now
		uint32_t * const tail = ring.submit_q.tail;
		const uint32_t mask = *ring.submit_q.mask;

		// There are 2 submission schemes, check which one we are using
		if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
			// If the poller thread submits, then we just need to add this to the ready array

			__submit_to_ready_array( ring, idx, mask );

			__wake_poller( ring );

			__cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
		}
		else {
			// get mutual exclusion
			lock(ring.submit_q.lock __cfaabi_dbg_ctx2);

			// Append to the list of ready entries

			/* paranoid */ verify( idx <= mask );

			ring.submit_q.array[ (*tail) & mask ] = idx & mask;
			__atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);

			// Submit however many entries need to be submitted
			int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
			if( ret < 0 ) {
				switch((int)errno) {
				default:
					abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
				}
			}

			// update statistics
			#if !defined(__CFA_NO_STATISTICS__)
				__tls_stats()->io.submit_q.submit_avg.csm += 1;
				__tls_stats()->io.submit_q.submit_avg.cnt += 1;
			#endif

			ring.submit_q.sqes[ idx & mask ].user_data = 0;

			unlock(ring.submit_q.lock);

			__cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
		}
	}
#endif