source: libcfa/src/concurrency/io/setup.cfa@ d60d30e

Last change on this file since d60d30e was 78da4ab, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

New implementation of io based on instance borrowing.
Trying to avoid the unbounded growth of the previous flat combining approach.

  • Property mode set to 100644
File size: 15.7 KB
//
// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
//
// The contents of this file are covered under the licence agreement in the
// file "LICENCE" distributed with Cforall.
//
// io/setup.cfa --
//
// Author           : Thierry Delisle
// Created On       : Fri Jul 31 16:25:51 2020
// Last Modified By :
// Last Modified On :
// Update Count     :
//

#define __cforall_thread__
#define _GNU_SOURCE         /* See feature_test_macros(7) */

#if defined(__CFA_DEBUG__)
	// #define __CFA_DEBUG_PRINT_IO__
	// #define __CFA_DEBUG_PRINT_IO_CORE__
#endif

#include "io/types.hfa"
#include "kernel.hfa"

#if !defined(CFA_HAVE_LINUX_IO_URING_H)
	void __kernel_io_startup() {
		// Nothing to do without io_uring
	}

	void __kernel_io_shutdown() {
		// Nothing to do without io_uring
	}

	void ?{}(io_context_params & this) {}

	void ?{}($io_context & this, struct cluster & cl) {}
	void ^?{}($io_context & this) {}

	$io_arbiter * create(void) { return 0p; }
	void destroy($io_arbiter *) {}

#else
	#include <errno.h>
	#include <stdint.h>
	#include <string.h>
	#include <signal.h>
	#include <unistd.h>

	extern "C" {
		#include <pthread.h>
		#include <sys/epoll.h>
		#include <sys/eventfd.h>
		#include <sys/mman.h>
		#include <sys/syscall.h>

		#include <linux/io_uring.h>
	}

	#include "bitmanip.hfa"
	#include "kernel_private.hfa"
	#include "thread.hfa"

	void ?{}(io_context_params & this) {
		this.num_entries = 256;
	}

	static void * __io_poller_slow( void * arg );

	// Weirdly, some systems that do support io_uring don't actually define these
	#ifdef __alpha__
		/*
		 * alpha is the only exception, all other architectures
		 * have common numbers for new system calls.
		 */
		#ifndef __NR_io_uring_setup
			#define __NR_io_uring_setup           535
		#endif
		#ifndef __NR_io_uring_enter
			#define __NR_io_uring_enter           536
		#endif
		#ifndef __NR_io_uring_register
			#define __NR_io_uring_register        537
		#endif
	#else /* !__alpha__ */
		#ifndef __NR_io_uring_setup
			#define __NR_io_uring_setup           425
		#endif
		#ifndef __NR_io_uring_enter
			#define __NR_io_uring_enter           426
		#endif
		#ifndef __NR_io_uring_register
			#define __NR_io_uring_register        427
		#endif
	#endif
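
	// The runtime invokes io_uring through raw syscall(2) calls (see __io_uring_setup below)
	// rather than linking against liburing, so only the __NR_* numbers and the structures from
	// <linux/io_uring.h> are needed; the fallbacks above only matter on libcs whose
	// <sys/syscall.h> predates io_uring. As an illustrative sketch (not part of this file),
	// a bare-bones setup call looks like:
	//
	//     struct io_uring_params p;
	//     memset( &p, 0, sizeof(p) );
	//     int ringfd = syscall( __NR_io_uring_setup, 256, &p );   // 256 submit entries, default flags
	//     if( ringfd < 0 ) { /* errno describes the failure */ }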

//=============================================================================================
// I/O Startup / Shutdown logic + Master Poller
//=============================================================================================

	// IO Master poller loop forward
	static void * iopoll_loop( __attribute__((unused)) void * args );

	static struct {
		pthread_t thrd;           // pthread handle to io poller thread
		void * stack;             // pthread stack for io poller thread
		int epollfd;              // file descriptor to the epoll instance
		volatile bool run;        // Whether or not to continue
		volatile bool stopped;    // Whether the poller has finished running
		volatile uint64_t epoch;  // Epoch used for memory reclamation
	} iopoll;
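
	// Design note: a single dedicated pthread (iopoll_loop below) acts as the master poller.
	// Each $io_context registers an eventfd with its io_uring instance and adds that eventfd to
	// iopoll.epollfd; the master poller epoll-waits on all of them and posts the matching
	// context's semaphore when completions arrive. The epoch counter is only consulted by
	// __epoll_unregister, which uses it as a grace period before a context may be torn down.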

	void __kernel_io_startup(void) {
		__cfadbg_print_safe(io_core, "Kernel : Creating EPOLL instance\n" );

		iopoll.epollfd = epoll_create1(0);
		if (iopoll.epollfd == -1) {
			abort( "internal error, epoll_create1\n");
		}

		__cfadbg_print_safe(io_core, "Kernel : Starting io poller thread\n" );

		// Initialize the shared state before the poller thread can observe it
		iopoll.run     = true;
		iopoll.stopped = false;
		iopoll.epoch   = 0;

		iopoll.stack = __create_pthread( &iopoll.thrd, iopoll_loop, 0p );
	}

	void __kernel_io_shutdown(void) {
		// Notify the io poller thread of the shutdown
		iopoll.run = false;
		sigval val = { 1 };
		pthread_sigqueue( iopoll.thrd, SIGUSR1, val );

		// Wait for the io poller thread to finish

		__destroy_pthread( iopoll.thrd, iopoll.stack, 0p );

		int ret = close(iopoll.epollfd);
		if (ret == -1) {
			abort( "internal error, close epoll\n");
		}

		// IO polling is now fully stopped

		__cfadbg_print_safe(io_core, "Kernel : IO poller stopped\n" );
	}

	static void * iopoll_loop( __attribute__((unused)) void * args ) {
		__processor_id_t id;
		id.full_proc = false;
		id.id = doregister(&id);
		__cfaabi_tls.this_proc_id = &id;
		__cfadbg_print_safe(io_core, "Kernel : IO poller thread starting\n" );

		// Block signals to control when they arrive
		sigset_t mask;
		sigfillset(&mask);
		if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
			abort( "internal error, pthread_sigmask" );
		}

		// ... but let SIGUSR1 through while waiting in epoll_pwait
		sigdelset( &mask, SIGUSR1 );

		// Space for a batch of epoll events
		struct epoll_event events[10];
		// Main loop
		while( iopoll.run ) {
			__cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n");

			// increment the epoch to notify any deleters we are starting a new cycle
			__atomic_fetch_add(&iopoll.epoch, 1, __ATOMIC_SEQ_CST);

			// Wait for events
			int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask );

			__cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io context events, waking up\n", nfds);

			// Check if an error occurred
			if (nfds == -1) {
				if( errno == EINTR ) continue;
				abort( "internal error, epoll_pwait" );
			}

			for(i; nfds) {
				$io_context * io_ctx = ($io_context *)(uintptr_t)events[i].data.u64;
				/* paranoid */ verify( io_ctx );
				__cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->fd, io_ctx);
				#if !defined( __CFA_NO_STATISTICS__ )
					__cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats;
				#endif

				eventfd_t v;
				eventfd_read(io_ctx->efd, &v);

				post( io_ctx->sem );
			}
		}

		__atomic_store_n(&iopoll.stopped, true, __ATOMIC_SEQ_CST);

		__cfadbg_print_safe(io_core, "Kernel : IO poller thread stopping\n" );
		unregister(&id);
		return 0p;
	}
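
	// Note on signals: the poller thread blocks every signal and only unblocks SIGUSR1 for the
	// duration of epoll_pwait (the mask passed to it has SIGUSR1 removed). A
	// pthread_sigqueue( iopoll.thrd, SIGUSR1, ... ) from __kernel_io_shutdown or
	// __epoll_unregister therefore interrupts the wait with EINTR, which the loop treats as a
	// plain wake-up: it re-checks iopoll.run and publishes a new epoch on the next iteration.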

//=============================================================================================
// I/O Context Construction/Destruction
//=============================================================================================

	static void __io_uring_setup ( $io_context & this, const io_context_params & params_in );
	static void __io_uring_teardown( $io_context & this );
	static void __epoll_register($io_context & ctx);
	static void __epoll_unregister($io_context & ctx);
	void __ioarbiter_register( $io_arbiter & mutex, $io_context & ctx );
	void __ioarbiter_unregister( $io_arbiter & mutex, $io_context & ctx );

	void ?{}($io_context & this, struct cluster & cl) {
		(this.self){ "IO Poller", cl };
		this.ext_sq.empty = true;
		__io_uring_setup( this, cl.io.params );
		__cfadbg_print_safe(io_core, "Kernel I/O : Created ring for io_context %u (%p)\n", this.fd, &this);

		__epoll_register(this);

		__ioarbiter_register(*cl.io.arbiter, this);

		__thrd_start( this, main );
		__cfadbg_print_safe(io_core, "Kernel I/O : Started poller thread for io_context %u\n", this.fd);
	}

	void ^?{}($io_context & mutex this) {
		__cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %u\n", this.fd);

		^(this.self){};
		__cfadbg_print_safe(io_core, "Kernel I/O : Stopped poller thread for io_context %u\n", this.fd);

		__ioarbiter_unregister(*this.arbiter, this);

		__epoll_unregister(this);

		__io_uring_teardown( this );
		__cfadbg_print_safe(io_core, "Kernel I/O : Destroyed ring for io_context %u\n", this.fd);
	}

	void ?{}(io_context & this, struct cluster & cl) {
		// this.ctx = new(cl);
		this.ctx = alloc();
		(*this.ctx){ cl };

		__cfadbg_print_safe(io_core, "Kernel I/O : io_context %u ready\n", this.ctx->fd);
	}

	void ^?{}(io_context & this) {
		post( this.ctx->sem );

		delete(this.ctx);
	}

	extern void __disable_interrupts_hard();
	extern void __enable_interrupts_hard();

	static void __io_uring_setup( $io_context & this, const io_context_params & params_in ) {
		// Step 1 : call to setup
		struct io_uring_params params;
		memset(&params, 0, sizeof(params));
		// if( params_in.poll_submit   ) params.flags |= IORING_SETUP_SQPOLL;
		// if( params_in.poll_complete ) params.flags |= IORING_SETUP_IOPOLL;

		__u32 nentries = params_in.num_entries != 0 ? params_in.num_entries : 256;
		if( !is_pow2(nentries) ) {
			abort("ERROR: I/O setup 'num_entries' must be a power of 2\n");
		}

		int fd = syscall(__NR_io_uring_setup, nentries, &params );
		if(fd < 0) {
			abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
		}

		// Step 2 : mmap result
		struct __sub_ring_t & sq = this.sq;
		struct __cmp_ring_t & cq = this.cq;

		// calculate the right ring size
		sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned) );
		cq.ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));

		// Requires features
		#if defined(IORING_FEAT_SINGLE_MMAP)
			// adjust the size according to the parameters
			if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
				cq.ring_sz = sq.ring_sz = max(cq.ring_sz, sq.ring_sz);
			}
		#endif
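
		// Sizing note: the kernel returns the ring geometry in params; each ring size is the
		// offset of its last array plus room for the entries themselves. With nentries == 256
		// the completion ring typically holds 2 * 256 cqes, since the kernel doubles cq_entries
		// by default. On kernels that report IORING_FEAT_SINGLE_MMAP (5.4+), both rings share a
		// single mapping, hence taking max(cq.ring_sz, sq.ring_sz) above and mapping it once below.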

		// mmap the Submit Queue into existence
		sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
		if (sq.ring_ptr == (void*)MAP_FAILED) {
			abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
		}

		// Requires features
		#if defined(IORING_FEAT_SINGLE_MMAP)
			// mmap the Completion Queue into existence (may or may not be needed)
			if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
				cq.ring_ptr = sq.ring_ptr;
			}
			else
		#endif
		{
			// We need multiple calls to mmap
			cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
			if (cq.ring_ptr == (void*)MAP_FAILED) {
				munmap(sq.ring_ptr, sq.ring_sz);
				abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
			}
		}

		// mmap the submit queue entries
		size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
		sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
		if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
			munmap(sq.ring_ptr, sq.ring_sz);
			if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
			abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
		}

		// Step 3 : Initialize the data structure
		// Get the pointers from the kernel to fill the structure
		// submit queue
		sq.kring.head  = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
		sq.kring.tail  = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
		sq.kring.array = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
		sq.mask        = (   const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
		sq.num         = (   const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
		sq.flags       = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
		sq.dropped     = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);

		sq.kring.ready    = 0;
		sq.kring.released = 0;

		sq.free_ring.head = 0;
		sq.free_ring.tail = *sq.num;
		sq.free_ring.array = alloc( *sq.num, 128`align );
		for(i; (__u32)*sq.num) {
			sq.free_ring.array[i] = i;
		}

		sq.to_submit = 0;

		// completion queue
		cq.head     = (volatile __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
		cq.tail     = (volatile __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
		cq.mask     = (   const __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
		cq.num      = (   const __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
		cq.overflow = (         __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
		cq.cqes     = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
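
		// Usage note: these pointers alias memory shared with the kernel, so all indexing is done
		// modulo the ring size through the mask (the entry counts are powers of two). As a rough,
		// illustrative sketch of how a completion is consumed (the actual consumer lives elsewhere
		// in the runtime):
		//
		//     __u32 head = *cq.head;                       // private copy of the consumer index
		//     if( head != *cq.tail ) {                     // kernel has published at least one cqe
		//         struct io_uring_cqe & cqe = cq.cqes[ head & (*cq.mask) ];
		//         /* ... use cqe.user_data / cqe.res ... */
		//         __atomic_store_n( cq.head, head + 1, __ATOMIC_RELEASE );  // return the slot
		//     }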

		// Step 4 : eventfd
		// io_uring_register is so slow on some machines that it will never
		// succeed unless preemption is hard blocked
		__disable_interrupts_hard();

		int efd = eventfd(0, 0);
		if (efd < 0) {
			abort("KERNEL ERROR: IO_URING EVENTFD - %s\n", strerror(errno));
		}

		int ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &efd, 1);
		if (ret < 0) {
			abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno));
		}

		__enable_interrupts_hard();
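
		// Note: IORING_REGISTER_EVENTFD asks the kernel to signal efd every time a completion is
		// posted to this ring. That eventfd is what the master poller waits on through epoll, and
		// what iopoll_loop drains with eventfd_read before posting the context's semaphore.
		// Interrupts are hard-disabled around these two syscalls because the registration can take
		// long enough that preemption signals would otherwise keep interrupting it.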

		// some paranoid checks
		/* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask );
		/* paranoid */ verifyf( (*cq.num)  >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num );
		/* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head );
		/* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail );

		/* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
		/* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
		/* paranoid */ verifyf( (*sq.kring.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.kring.head );
		/* paranoid */ verifyf( (*sq.kring.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.kring.tail );

		// Update the global ring info
		this.ring_flags = 0;
		this.fd         = fd;
		this.efd        = efd;
	}

	static void __io_uring_teardown( $io_context & this ) {
		// Shutdown the io rings
		struct __sub_ring_t & sq = this.sq;
		struct __cmp_ring_t & cq = this.cq;

		// unmap the submit queue entries
		munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe));

		// unmap the Submit Queue ring
		munmap(sq.ring_ptr, sq.ring_sz);

		// unmap the Completion Queue ring, if it is different
		if (cq.ring_ptr != sq.ring_ptr) {
			munmap(cq.ring_ptr, cq.ring_sz);
		}

		// close the file descriptors
		close(this.fd);
		close(this.efd);

		free( this.sq.free_ring.array ); // May be null, doesn't matter
	}

//=============================================================================================
// I/O Context Sleep
//=============================================================================================
	static inline void __epoll_ctl($io_context & ctx, int op, const char * error) {
		struct epoll_event ev;
		ev.events = EPOLLIN | EPOLLONESHOT;
		ev.data.u64 = (__u64)&ctx;
		int ret = epoll_ctl(iopoll.epollfd, op, ctx.efd, &ev);
		if (ret < 0) {
			abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) );
		}
	}
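
	// Note: EPOLLONESHOT means an eventfd is disarmed after it fires once; the master poller
	// wakes up, posts the context's semaphore, and the fd then stays silent until the context
	// calls __ioctx_prepare_block (below), which re-arms it with EPOLL_CTL_MOD just before the
	// context blocks again. This yields one wake-up per block/unblock cycle rather than one per
	// completion.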

	static void __epoll_register($io_context & ctx) {
		__epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");
	}

	static void __epoll_unregister($io_context & ctx) {
		// Read the current epoch so we know when to stop
		size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST);

		// Remove the fd from the iopoller
		__epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE");

		// Notify the io poller thread of the shutdown
		iopoll.run = false;
		sigval val = { 1 };
		pthread_sigqueue( iopoll.thrd, SIGUSR1, val );

		// Make sure all this is done
		__atomic_thread_fence(__ATOMIC_SEQ_CST);

		// Wait for the next epoch
		while(curr == iopoll.epoch && !iopoll.stopped) Pause();
	}
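
	// Note: the epoch exchange above is a small grace-period scheme. The poller publishes a new
	// epoch at the top of every iteration, before it dereferences any $io_context taken out of
	// the epoll events. A deleter that has already removed its fd therefore only needs to wait
	// until the epoch moves on (or the poller stops entirely) to know the poller cannot still be
	// holding a stale pointer to the context being destroyed.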

	void __ioctx_prepare_block($io_context & ctx) {
		__cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.fd, &ctx);
		__epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");
	}


//=============================================================================================
// I/O Context Misc Setup
//=============================================================================================
	void ?{}( $io_arbiter & this ) {
		this.pending.flag = false;
	}

	void ^?{}( $io_arbiter & mutex this ) {
		/* paranoid */ verify( empty(this.assigned) );
		/* paranoid */ verify( empty(this.available) );
		/* paranoid */ verify( is_empty(this.pending.blocked) );
	}

	$io_arbiter * create(void) {
		return new();
	}
	void destroy($io_arbiter * arbiter) {
		delete(arbiter);
	}

//=============================================================================================
// I/O Context Misc Setup
//=============================================================================================

#endif