source: libcfa/src/concurrency/io/setup.cfa@ 150d21a

Last change on this file since 150d21a was 4c4d854, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

Fixed broken initialization and add verify.

//
// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
//
// The contents of this file are covered under the licence agreement in the
// file "LICENCE" distributed with Cforall.
//
// io/setup.cfa --
//
// Author : Thierry Delisle
// Created On : Fri Jul 31 16:25:51 2020
// Last Modified By :
// Last Modified On :
// Update Count :
//

#define __cforall_thread__
#define _GNU_SOURCE /* See feature_test_macros(7) */

#if defined(__CFA_DEBUG__)
	// #define __CFA_DEBUG_PRINT_IO__
	// #define __CFA_DEBUG_PRINT_IO_CORE__
#endif

#include "io/types.hfa"
#include "kernel.hfa"

#if !defined(CFA_HAVE_LINUX_IO_URING_H)
	void __kernel_io_startup() {
		// Nothing to do without io_uring
	}

	void __kernel_io_shutdown() {
		// Nothing to do without io_uring
	}

	void ?{}(io_context_params & this) {}

	void ?{}($io_context & this, struct cluster & cl) {}
	void ^?{}($io_context & this) {}

	$io_arbiter * create(void) { return 0p; }
	void destroy($io_arbiter *) {}

#else
	#include <errno.h>
	#include <stdint.h>
	#include <string.h>
	#include <signal.h>
	#include <unistd.h>

	extern "C" {
		#include <pthread.h>
		#include <sys/epoll.h>
		#include <sys/eventfd.h>
		#include <sys/mman.h>
		#include <sys/syscall.h>

		#include <linux/io_uring.h>
	}

	#include "bitmanip.hfa"
	#include "kernel_private.hfa"
	#include "thread.hfa"

	void ?{}(io_context_params & this) {
		this.num_entries = 256;
	}
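	// Note: num_entries is the requested io_uring ring size; __io_uring_setup() below aborts
	// if the value is not a power of 2 and falls back to 256 when a cluster passes 0, so the
	// default here matches that fallback.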

	static void * __io_poller_slow( void * arg );

	// Weirdly, some systems that do support io_uring don't actually define these system call numbers
	#ifdef __alpha__
		/*
		 * alpha is the only exception, all other architectures
		 * have common numbers for new system calls.
		 */
		#ifndef __NR_io_uring_setup
			#define __NR_io_uring_setup 535
		#endif
		#ifndef __NR_io_uring_enter
			#define __NR_io_uring_enter 536
		#endif
		#ifndef __NR_io_uring_register
			#define __NR_io_uring_register 537
		#endif
	#else /* !__alpha__ */
		#ifndef __NR_io_uring_setup
			#define __NR_io_uring_setup 425
		#endif
		#ifndef __NR_io_uring_enter
			#define __NR_io_uring_enter 426
		#endif
		#ifndef __NR_io_uring_register
			#define __NR_io_uring_register 427
		#endif
	#endif
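
	// These are the upstream Linux syscall numbers for io_uring (425-427 on every architecture
	// except alpha). Defining them here lets the setup code below invoke the kernel directly,
	// e.g. syscall(__NR_io_uring_setup, nentries, &params), even when the installed libc
	// headers predate io_uring.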

//=============================================================================================
// I/O Startup / Shutdown logic + Master Poller
//=============================================================================================

	// IO Master poller loop forward
	static void * iopoll_loop( __attribute__((unused)) void * args );

	static struct {
		pthread_t thrd;          // pthread handle to io poller thread
		void * stack;            // pthread stack for io poller thread
		int epollfd;             // file descriptor to the epoll instance
		volatile bool run;       // Whether or not to continue
		volatile bool stopped;   // Whether the poller has finished running
		volatile uint64_t epoch; // Epoch used for memory reclamation
	} iopoll;
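
	// Note: there is a single, global iopoll instance shared by all io_contexts. The poller
	// thread bumps 'epoch' at the top of every epoll_pwait cycle; __epoll_unregister() below
	// reads the epoch before removing a context and then spins until it changes (or the poller
	// stops), which guarantees the poller is no longer touching that context's memory.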

	void __kernel_io_startup(void) {
		__cfadbg_print_safe(io_core, "Kernel : Creating EPOLL instance\n" );

		iopoll.epollfd = epoll_create1(0);
		if (iopoll.epollfd == -1) {
			abort( "internal error, epoll_create1\n");
		}

		__cfadbg_print_safe(io_core, "Kernel : Starting io poller thread\n" );

		iopoll.stack = __create_pthread( &iopoll.thrd, iopoll_loop, 0p );
		iopoll.run = true;
		iopoll.stopped = false;
		iopoll.epoch = 0;
	}

	void __kernel_io_shutdown(void) {
		// Notify the io poller thread of the shutdown
		iopoll.run = false;
		sigval val = { 1 };
		pthread_sigqueue( iopoll.thrd, SIGUSR1, val );

		// Wait for the io poller thread to finish

		__destroy_pthread( iopoll.thrd, iopoll.stack, 0p );

		int ret = close(iopoll.epollfd);
		if (ret == -1) {
			abort( "internal error, close epoll\n");
		}

		// Io polling is now fully stopped

		__cfadbg_print_safe(io_core, "Kernel : IO poller stopped\n" );
	}
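
	// Note: SIGUSR1 is what knocks the poller out of epoll_pwait(): the poller blocks every
	// signal except SIGUSR1 in the mask it passes to epoll_pwait, so the pthread_sigqueue()
	// above interrupts the wait and lets the loop observe iopoll.run == false.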

	static void * iopoll_loop( __attribute__((unused)) void * args ) {
		__processor_id_t id;
		id.full_proc = false;
		id.id = doregister(&id);
		__cfaabi_tls.this_proc_id = &id;
		__cfadbg_print_safe(io_core, "Kernel : IO poller thread starting\n" );

		// Block signals to control when they arrive
		sigset_t mask;
		sigfillset(&mask);
		if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) != 0 ) { // pthread_sigmask returns an error number, not -1
			abort( "internal error, pthread_sigmask" );
		}

		sigdelset( &mask, SIGUSR1 );

		// Create sufficient events
		struct epoll_event events[10];
		// Main loop
		while( iopoll.run ) {
			__cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n");

			// increment the epoch to notify any deleters we are starting a new cycle
			__atomic_fetch_add(&iopoll.epoch, 1, __ATOMIC_SEQ_CST);

			// Wait for events
			int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask );

			__cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io contexts events, waking up\n", nfds);

			// Check if an error occurred
			if (nfds == -1) {
				if( errno == EINTR ) continue;
				abort( "internal error, epoll_pwait" );
			}

			for(i; nfds) {
				$io_context * io_ctx = ($io_context *)(uintptr_t)events[i].data.u64;
				/* paranoid */ verify( io_ctx );
				__cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->fd, io_ctx);
				#if !defined( __CFA_NO_STATISTICS__ )
					__cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats;
				#endif

				eventfd_t v;
				eventfd_read(io_ctx->efd, &v);

				post( io_ctx->sem );
			}
		}

		__atomic_store_n(&iopoll.stopped, true, __ATOMIC_SEQ_CST);

		__cfadbg_print_safe(io_core, "Kernel : IO poller thread stopping\n" );
		unregister(&id);
		return 0p;
	}
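
	// In short: each ready epoll event carries the $io_context pointer in data.u64, the poller
	// drains that context's eventfd, and post()ing the context's semaphore unparks that
	// io_context's own poller thread so it can process the new completions.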

//=============================================================================================
// I/O Context Construction/Destruction
//=============================================================================================

	static void __io_uring_setup ( $io_context & this, const io_context_params & params_in );
	static void __io_uring_teardown( $io_context & this );
	static void __epoll_register($io_context & ctx);
	static void __epoll_unregister($io_context & ctx);
	void __ioarbiter_register( $io_arbiter & mutex, $io_context & ctx );
	void __ioarbiter_unregister( $io_arbiter & mutex, $io_context & ctx );
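
	// Construction below proceeds in four steps: create the io_uring instance, add its eventfd
	// to the global epoll instance, hand the context to the cluster's arbiter, and finally start
	// the per-context poller thread. Destruction undoes the same steps in reverse order.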

	void ?{}($io_context & this, struct cluster & cl) {
		(this.self){ "IO Poller", cl };
		this.ext_sq.empty = true;
		this.revoked = true;
		__io_uring_setup( this, cl.io.params );
		__cfadbg_print_safe(io_core, "Kernel I/O : Created ring for io_context %u (%p)\n", this.fd, &this);

		__epoll_register(this);

		__ioarbiter_register(*cl.io.arbiter, this);

		__thrd_start( this, main );
		__cfadbg_print_safe(io_core, "Kernel I/O : Started poller thread for io_context %u\n", this.fd);
	}

	void ^?{}($io_context & mutex this) {
		__cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %u\n", this.fd);

		^(this.self){};
		__cfadbg_print_safe(io_core, "Kernel I/O : Stopped poller thread for io_context %u\n", this.fd);

		__ioarbiter_unregister(*this.arbiter, this);

		__epoll_unregister(this);

		__io_uring_teardown( this );
		__cfadbg_print_safe(io_core, "Kernel I/O : Destroyed ring for io_context %u\n", this.fd);
	}

	void ?{}(io_context & this, struct cluster & cl) {
		// this.ctx = new(cl);
		this.ctx = alloc();
		(*this.ctx){ cl };

		__cfadbg_print_safe(io_core, "Kernel I/O : io_context %u ready\n", this.ctx->fd);
	}

	void ^?{}(io_context & this) {
		post( this.ctx->sem );

		delete(this.ctx);
	}

	extern void __disable_interrupts_hard();
	extern void __enable_interrupts_hard();

	static void __io_uring_setup( $io_context & this, const io_context_params & params_in ) {
		// Step 1 : call to setup
		struct io_uring_params params;
		memset(&params, 0, sizeof(params));
		// if( params_in.poll_submit   ) params.flags |= IORING_SETUP_SQPOLL;
		// if( params_in.poll_complete ) params.flags |= IORING_SETUP_IOPOLL;

		__u32 nentries = params_in.num_entries != 0 ? params_in.num_entries : 256;
		if( !is_pow2(nentries) ) {
			abort("ERROR: I/O setup 'num_entries' must be a power of 2\n");
		}

		int fd = syscall(__NR_io_uring_setup, nentries, &params );
		if(fd < 0) {
			abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
		}
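
		// On success the syscall returns the ring file descriptor and fills 'params' with the
		// offsets (sq_off/cq_off) and feature flags needed to map the rings in Step 2.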

		// Step 2 : mmap result
		struct __sub_ring_t & sq = this.sq;
		struct __cmp_ring_t & cq = this.cq;

		// calculate the right ring size
		sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned) );
		cq.ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));

		// Requires features
		#if defined(IORING_FEAT_SINGLE_MMAP)
			// adjust the size according to the parameters
			if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
				cq.ring_sz = sq.ring_sz = max(cq.ring_sz, sq.ring_sz);
			}
		#endif
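
		// Note: when the kernel advertises IORING_FEAT_SINGLE_MMAP, the submit and completion
		// rings live in one shared mapping, so sizing both to the larger of the two lets the
		// single mmap below cover them and the second mmap can be skipped.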

		// mmap the Submit Queue into existence
		sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
		if (sq.ring_ptr == (void*)MAP_FAILED) {
			abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
		}

		// Requires features
		#if defined(IORING_FEAT_SINGLE_MMAP)
			// mmap the Completion Queue into existence (may or may not be needed)
			if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
				cq.ring_ptr = sq.ring_ptr;
			}
			else
		#endif
		{
			// We need multiple calls to mmap
			cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
			if (cq.ring_ptr == (void*)MAP_FAILED) {
				munmap(sq.ring_ptr, sq.ring_sz);
				abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
			}
		}

		// mmap the submit queue entries
		size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
		sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
		if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
			munmap(sq.ring_ptr, sq.ring_sz);
			if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
			abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
		}
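
		// Note: the SQE array is its own mapping (IORING_OFF_SQES); the SQ ring mapped above only
		// holds indices into this array, which is why the free list built below tracks indices
		// rather than SQE pointers.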

		// Step 3 : Initialize the data structure
		// Get the pointers from the kernel to fill the structure
		// submit queue
		sq.kring.head  = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
		sq.kring.tail  = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
		sq.kring.array = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
		sq.mask        = (   const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
		sq.num         = (   const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
		sq.flags       = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
		sq.dropped     = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);

		sq.kring.ready = 0;
		sq.kring.released = 0;

		sq.free_ring.head = 0;
		sq.free_ring.tail = *sq.num;
		sq.free_ring.array = alloc( *sq.num, 128`align );
		for(i; (__u32)*sq.num) {
			sq.free_ring.array[i] = i;
		}
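
		// The free ring starts full: every SQE index from 0 to *sq.num - 1 is available, and the
		// 128-byte alignment of the backing array is presumably there to avoid false sharing with
		// neighbouring allocations.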

		sq.to_submit = 0;

		// completion queue
		cq.head     = (volatile __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
		cq.tail     = (volatile __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
		cq.mask     = (   const __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
		cq.num      = (   const __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
		cq.overflow = (         __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
		cq.cqes     = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);

		// Step 4 : eventfd
		// io_uring_register is so f*cking slow on some machines that it
		// will never succeed if preemption isn't hard blocked
		__disable_interrupts_hard();

		int efd = eventfd(0, 0);
		if (efd < 0) {
			abort("KERNEL ERROR: IO_URING EVENTFD - %s\n", strerror(errno));
		}

		int ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &efd, 1);
		if (ret < 0) {
			abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno));
		}

		__enable_interrupts_hard();
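
		// With IORING_REGISTER_EVENTFD in place the kernel signals 'efd' whenever completions are
		// posted to this ring; __epoll_register() then adds that same eventfd to the global epoll
		// instance, which is how the master poller above learns that this context needs waking.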

		// some paranoid checks
		/* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask );
		/* paranoid */ verifyf( (*cq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num );
		/* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head );
		/* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail );

		/* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
		/* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
		/* paranoid */ verifyf( (*sq.kring.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.kring.head );
		/* paranoid */ verifyf( (*sq.kring.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.kring.tail );

		// Update the global ring info
		this.ring_flags = 0;
		this.fd  = fd;
		this.efd = efd;
	}

	static void __io_uring_teardown( $io_context & this ) {
		// Shutdown the io rings
		struct __sub_ring_t & sq = this.sq;
		struct __cmp_ring_t & cq = this.cq;

		// unmap the submit queue entries
		munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe));

		// unmap the Submit Queue ring
		munmap(sq.ring_ptr, sq.ring_sz);

		// unmap the Completion Queue ring, if it is different
		if (cq.ring_ptr != sq.ring_ptr) {
			munmap(cq.ring_ptr, cq.ring_sz);
		}

		// close the file descriptors
		close(this.fd);
		close(this.efd);

		free( this.sq.free_ring.array ); // Maybe null, doesn't matter
	}

//=============================================================================================
// I/O Context Sleep
//=============================================================================================
	static inline void __epoll_ctl($io_context & ctx, int op, const char * error) {
		struct epoll_event ev;
		ev.events = EPOLLIN | EPOLLONESHOT;
		ev.data.u64 = (__u64)&ctx;
		int ret = epoll_ctl(iopoll.epollfd, op, ctx.efd, &ev);
		if (ret < 0) {
			abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) );
		}
	}
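
	// Note: EPOLLONESHOT means the eventfd is disarmed after each wakeup, so the master poller
	// fires at most once per __ioctx_prepare_block()/EPOLL_CTL_MOD re-arm below, rather than
	// spinning on a level-triggered fd that still has a count in it.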

	static void __epoll_register($io_context & ctx) {
		__epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");
	}

	static void __epoll_unregister($io_context & ctx) {
		// Read the current epoch so we know when to stop
		size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST);

		// Remove the fd from the iopoller
		__epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE");

		// Notify the io poller thread of the shutdown
		iopoll.run = false;
		sigval val = { 1 };
		pthread_sigqueue( iopoll.thrd, SIGUSR1, val );

		// Make sure all this is done
		__atomic_thread_fence(__ATOMIC_SEQ_CST);

		// Wait for the next epoch
		while(curr == iopoll.epoch && !iopoll.stopped) Pause();
	}
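
	// The wait above is the other half of the epoch scheme: once the epoch has advanced past the
	// value read before EPOLL_CTL_DEL (or the poller has stopped entirely), the master poller can
	// no longer hold a stale pointer to this context, so it is safe for the caller to destroy it.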

	void __ioctx_prepare_block($io_context & ctx) {
		__cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.fd, &ctx);
		__epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");
	}


//=============================================================================================
// I/O Context Misc Setup
//=============================================================================================
	void ?{}( $io_arbiter & this ) {
		this.pending.flag = false;
	}

	void ^?{}( $io_arbiter & mutex this ) {
		/* paranoid */ verify( empty(this.assigned) );
		/* paranoid */ verify( empty(this.available) );
		/* paranoid */ verify( is_empty(this.pending.blocked) );
	}

	$io_arbiter * create(void) {
		return new();
	}
	void destroy($io_arbiter * arbiter) {
		delete(arbiter);
	}

//=============================================================================================
// I/O Context Misc Setup
//=============================================================================================

#endif