source: libcfa/src/concurrency/io/setup.cfa@ c44d652

Last change on this file since c44d652 was c44d652, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

Fixed broken merge

//
// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
//
// The contents of this file are covered under the licence agreement in the
// file "LICENCE" distributed with Cforall.
//
// io/setup.cfa --
//
// Author           : Thierry Delisle
// Created On       : Fri Jul 31 16:25:51 2020
// Last Modified By :
// Last Modified On :
// Update Count     :
//

#define __cforall_thread__
#define _GNU_SOURCE /* See feature_test_macros(7) */

#include "io/types.hfa"
#include "kernel.hfa"

#if !defined(CFA_HAVE_LINUX_IO_URING_H)
	void __kernel_io_startup() {
		// Nothing to do without io_uring
	}

	void __kernel_io_shutdown() {
		// Nothing to do without io_uring
	}

	void ?{}(io_context & this, struct cluster & cl) {}
	void ?{}(io_context & this, struct cluster & cl, const io_context_params & params) {}

	void ^?{}(io_context & this) {}
	void ^?{}(io_context & this, bool cluster_context) {}

#else
	#include <errno.h>
	#include <stdint.h>
	#include <string.h>
	#include <signal.h>
	#include <unistd.h>

	extern "C" {
		#include <pthread.h>
		#include <sys/epoll.h>
		#include <sys/mman.h>
		#include <sys/syscall.h>

		#include <linux/io_uring.h>
	}

	#include "bitmanip.hfa"
	#include "kernel_private.hfa"
	#include "thread.hfa"

	void ?{}(io_context_params & this) {
		this.num_entries = 256;
		this.num_ready = 256;
		this.submit_aff = -1;
		this.eager_submits = false;
		this.poller_submits = false;
		this.poll_submit = false;
		this.poll_complete = false;
	}
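
	// Illustrative sketch only (not part of the original file): how a user might override
	// these defaults when creating an io_context for a cluster. `myCluster` is a
	// hypothetical cluster object; the fields shown are the ones initialized above.
	//
	//     io_context_params params;
	//     params.num_entries    = 1024;   // larger submit/completion rings
	//     params.poller_submits = true;   // let the poller thread batch submissions
	//     io_context ioc = { myCluster, params };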

	static void * __io_poller_slow( void * arg );

	// Weirdly, some systems that do support io_uring don't actually define these
	#ifdef __alpha__
		/*
		 * alpha is the only exception, all other architectures
		 * have common numbers for new system calls.
		 */
		#ifndef __NR_io_uring_setup
			#define __NR_io_uring_setup 535
		#endif
		#ifndef __NR_io_uring_enter
			#define __NR_io_uring_enter 536
		#endif
		#ifndef __NR_io_uring_register
			#define __NR_io_uring_register 537
		#endif
	#else /* !__alpha__ */
		#ifndef __NR_io_uring_setup
			#define __NR_io_uring_setup 425
		#endif
		#ifndef __NR_io_uring_enter
			#define __NR_io_uring_enter 426
		#endif
		#ifndef __NR_io_uring_register
			#define __NR_io_uring_register 427
		#endif
	#endif

//=============================================================================================
// I/O Startup / Shutdown logic + Master Poller
//=============================================================================================

	// IO Master poller loop forward
	static void * iopoll_loop( __attribute__((unused)) void * args );

	static struct {
		pthread_t     thrd;    // pthread handle to io poller thread
		void        * stack;   // pthread stack for io poller thread
		int           epollfd; // file descriptor to the epoll instance
		volatile bool run;     // Whether or not to continue
	} iopoll;

	void __kernel_io_startup(void) {
		__cfaabi_dbg_print_safe( "Kernel : Creating EPOLL instance\n" );

		iopoll.epollfd = epoll_create1(0);
		if (iopoll.epollfd == -1) {
			abort( "internal error, epoll_create1\n");
		}

		__cfaabi_dbg_print_safe( "Kernel : Starting io poller thread\n" );

		iopoll.run = true;
		iopoll.stack = __create_pthread( &iopoll.thrd, iopoll_loop, 0p );
	}

	void __kernel_io_shutdown(void) {
		// Notify the io poller thread of the shutdown
		iopoll.run = false;
		sigval val = { 1 };
		pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
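		// SIGUSR1 is only deliverable while the poller sits in epoll_pwait (see iopoll_loop),
		// so this signal interrupts the wait and lets the loop observe run == false.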

		// Wait for the io poller thread to finish

		pthread_join( iopoll.thrd, 0p );
		free( iopoll.stack );

		int ret = close(iopoll.epollfd);
		if (ret == -1) {
			abort( "internal error, close epoll\n");
		}

		// IO polling is now fully stopped

		__cfaabi_dbg_print_safe( "Kernel : IO poller stopped\n" );
	}

	static void * iopoll_loop( __attribute__((unused)) void * args ) {
		__processor_id_t id;
		id.id = doregister(&id);
		__cfaabi_dbg_print_safe( "Kernel : IO poller thread starting\n" );

		// Block signals to control when they arrive
		sigset_t mask;
		sigfillset(&mask);
		if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
			abort( "internal error, pthread_sigmask" );
		}

		sigdelset( &mask, SIGUSR1 );
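		// The modified mask is installed only for the duration of epoll_pwait below,
		// so SIGUSR1 can interrupt the wait but remains blocked everywhere else.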

		// Create sufficient events
		struct epoll_event events[10];
		// Main loop
		while( iopoll.run ) {
			// Wait for events
			int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask );

			// Check if an error occurred
			if (nfds == -1) {
				if( errno == EINTR ) continue;
				abort( "internal error, epoll_pwait" );
			}

			for(i; nfds) {
				$io_ctx_thread * io_ctx = ($io_ctx_thread *)(uintptr_t)events[i].data.u64;
				/* paranoid */ verify( io_ctx );
				__cfadbg_print_safe(io_core, "Kernel I/O : Unparking io poller %p\n", io_ctx);
				#if !defined( __CFA_NO_STATISTICS__ )
					kernelTLS.this_stats = io_ctx->self.curr_cluster->stats;
				#endif
				__post( io_ctx->sem, &id );
			}
		}

		__cfaabi_dbg_print_safe( "Kernel : IO poller thread stopping\n" );
		unregister(&id);
		return 0p;
	}

//=============================================================================================
// I/O Context Construction/Destruction
//=============================================================================================

	void ?{}($io_ctx_thread & this, struct cluster & cl) { (this.self){ "IO Poller", cl }; }
	void main( $io_ctx_thread & this );
	static inline $thread * get_thread( $io_ctx_thread & this ) { return &this.self; }
	void ^?{}( $io_ctx_thread & mutex this ) {}

	static void __io_create ( __io_data & this, const io_context_params & params_in );
	static void __io_destroy( __io_data & this );

	void ?{}(io_context & this, struct cluster & cl, const io_context_params & params) {
		(this.thrd){ cl };
		this.thrd.ring = malloc();
		__cfadbg_print_safe(io_core, "Kernel I/O : Creating ring for io_context %p\n", &this);
		__io_create( *this.thrd.ring, params );

		__cfadbg_print_safe(io_core, "Kernel I/O : Starting poller thread for io_context %p\n", &this);
		this.thrd.done = false;
		__thrd_start( this.thrd, main );

		__cfadbg_print_safe(io_core, "Kernel I/O : io_context %p ready\n", &this);
	}

	void ?{}(io_context & this, struct cluster & cl) {
		io_context_params params;
		(this){ cl, params };
	}

	void ^?{}(io_context & this, bool cluster_context) {
		__cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %p\n", &this);

		// Notify the thread of the shutdown
		__atomic_store_n(&this.thrd.done, true, __ATOMIC_SEQ_CST);

		// If this is an io_context within a cluster, things get trickier
		$thread & thrd = this.thrd.self;
		if( cluster_context ) {
			cluster & cltr = *thrd.curr_cluster;
			/* paranoid */ verify( cltr.nprocessors == 0 || &cltr == mainCluster );
			/* paranoid */ verify( !ready_mutate_islocked() );

			// We need to adjust the clean-up based on where the thread is
			if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {

				ready_schedule_lock( (struct __processor_id_t *)active_processor() );

				// This is the tricky case
				// The thread was preempted and now it is on the ready queue
				// The thread should be the last on the list
				/* paranoid */ verify( thrd.link.next != 0p );

				// Remove the thread from the ready queue of this cluster
				__attribute__((unused)) bool removed = remove_head( &cltr, &thrd );
				/* paranoid */ verify( removed );
				thrd.link.next = 0p;
				thrd.link.prev = 0p;
				__cfaabi_dbg_debug_do( thrd.unpark_stale = true );

				// Fixup the thread state
				thrd.state = Blocked;
				thrd.ticket = 0;
				thrd.preempted = __NO_PREEMPTION;

				ready_schedule_unlock( (struct __processor_id_t *)active_processor() );

				// Pretend like the thread was blocked all along
			}
			// !!! This is not an else if !!!
			if( thrd.state == Blocked ) {

				// This is the "easy case"
				// The thread is parked and can easily be moved to the active cluster
				verify( thrd.curr_cluster != active_cluster() || thrd.curr_cluster == mainCluster );
				thrd.curr_cluster = active_cluster();

				// unpark the fast io_poller
				unpark( &thrd __cfaabi_dbg_ctx2 );
			}
			else {

				// The thread is in a weird state
				// I don't know what to do here
				abort("io_context poller thread is in unexpected state, cannot clean-up correctly\n");
			}
		} else {
			unpark( &thrd __cfaabi_dbg_ctx2 );
		}

		^(this.thrd){};
		__cfadbg_print_safe(io_core, "Kernel I/O : Stopped poller thread for io_context %p\n", &this);

		__io_destroy( *this.thrd.ring );
		__cfadbg_print_safe(io_core, "Kernel I/O : Destroyed ring for io_context %p\n", &this);

		free(this.thrd.ring);
	}

	void ^?{}(io_context & this) {
		^(this){ false };
	}

	static void __io_create( __io_data & this, const io_context_params & params_in ) {
		// Step 1 : call to setup
		struct io_uring_params params;
		memset(&params, 0, sizeof(params));
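		// Map the CFA-level polling options onto io_uring setup flags:
		// SQPOLL offloads submission to a kernel thread, IOPOLL busy-polls for completions.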
		if( params_in.poll_submit   ) params.flags |= IORING_SETUP_SQPOLL;
		if( params_in.poll_complete ) params.flags |= IORING_SETUP_IOPOLL;

		uint32_t nentries = params_in.num_entries;

		int fd = syscall(__NR_io_uring_setup, nentries, &params );
		if(fd < 0) {
			abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
		}

		// Step 2 : mmap result
		memset( &this, 0, sizeof(struct __io_data) );
		struct __submition_data  & sq = this.submit_q;
		struct __completion_data & cq = this.completion_q;

		// calculate the right ring size
		sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned));
		cq.ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));

		// Requires features
		#if defined(IORING_FEAT_SINGLE_MMAP)
			// adjust the size according to the parameters
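			// With IORING_FEAT_SINGLE_MMAP the kernel serves both rings from a single mapping,
			// so the size used for both must cover the larger of the two.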
			if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
				cq.ring_sz = sq.ring_sz = max(cq.ring_sz, sq.ring_sz);
			}
		#endif

		// mmap the Submit Queue into existence
		sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
		if (sq.ring_ptr == (void*)MAP_FAILED) {
			abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
		}

		// Requires features
		#if defined(IORING_FEAT_SINGLE_MMAP)
			// mmap the Completion Queue into existence (may or may not be needed)
			if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
				cq.ring_ptr = sq.ring_ptr;
			}
			else
		#endif
		{
			// We need multiple calls to mmap
			cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
			if (cq.ring_ptr == (void*)MAP_FAILED) {
				munmap(sq.ring_ptr, sq.ring_sz);
				abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
			}
		}

		// mmap the submit queue entries
		size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
		sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
		if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
			munmap(sq.ring_ptr, sq.ring_sz);
			if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
			abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
		}

		// Get the pointers from the kernel to fill the structure
		// submit queue
		sq.head    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
		sq.tail    = (volatile uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
		sq.mask    = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
		sq.num     = (   const uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
		sq.flags   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
		sq.dropped = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
		sq.array   = (         uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
		sq.prev_head = *sq.head;

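		// Mark every submit queue entry as unused by zeroing its user_data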
		{
			const uint32_t num = *sq.num;
			for( i; num ) {
				sq.sqes[i].user_data = 0ul64;
			}
		}

		(sq.lock){};
		(sq.release_lock){};

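		// The ready array is only needed when submissions are batched (poller or eager
		// submits); it has at least 8 slots, each initialized to the empty marker -1ul32.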
		if( params_in.poller_submits || params_in.eager_submits ) {
			/* paranoid */ verify( is_pow2( params_in.num_ready ) || (params_in.num_ready < 8) );
			sq.ready_cnt = max( params_in.num_ready, 8 );
			sq.ready = alloc_align( 64, sq.ready_cnt );
			for(i; sq.ready_cnt) {
				sq.ready[i] = -1ul32;
			}
		}
		else {
			sq.ready_cnt = 0;
			sq.ready = 0p;
		}

		// completion queue
		cq.head     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
		cq.tail     = (volatile uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
		cq.mask     = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
		cq.num      = (   const uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
		cq.overflow = (         uint32_t *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
		cq.cqes     = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);

		// some paranoid checks
		/* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask );
		/* paranoid */ verifyf( (*cq.num)  >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num );
		/* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head );
		/* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail );

		/* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
		/* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
		/* paranoid */ verifyf( (*sq.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.head );
		/* paranoid */ verifyf( (*sq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.tail );

		// Update the global ring info
		this.ring_flags = params.flags;
		this.fd = fd;
		this.eager_submits = params_in.eager_submits;
		this.poller_submits = params_in.poller_submits;
	}

	static void __io_destroy( __io_data & this ) {
		// Shutdown the io rings
		struct __submition_data  & sq = this.submit_q;
		struct __completion_data & cq = this.completion_q;

		// unmap the submit queue entries
		munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe));

		// unmap the Submit Queue ring
		munmap(sq.ring_ptr, sq.ring_sz);

		// unmap the Completion Queue ring, if it is different
		if (cq.ring_ptr != sq.ring_ptr) {
			munmap(cq.ring_ptr, cq.ring_sz);
		}

		// close the file descriptor
		close(this.fd);

		free( this.submit_q.ready ); // Maybe null, doesn't matter
	}

//=============================================================================================
// I/O Context Sleep
//=============================================================================================

	void __ioctx_register($io_ctx_thread & ctx, struct epoll_event & ev) {
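		// EPOLLONESHOT disarms the fd after each wake-up; __ioctx_prepare_block re-arms it
		// with EPOLL_CTL_MOD before the io poller blocks again.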
		ev.events = EPOLLIN | EPOLLONESHOT;
		ev.data.u64 = (uint64_t)&ctx;
		int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_ADD, ctx.ring->fd, &ev);
		if (ret < 0) {
			abort( "KERNEL ERROR: EPOLL ADD - (%d) %s\n", (int)errno, strerror(errno) );
		}
	}

	void __ioctx_prepare_block($io_ctx_thread & ctx, struct epoll_event & ev) {
		int ret = epoll_ctl(iopoll.epollfd, EPOLL_CTL_MOD, ctx.ring->fd, &ev);
		if (ret < 0) {
			abort( "KERNEL ERROR: EPOLL REARM - (%d) %s\n", (int)errno, strerror(errno) );
		}
	}

//=============================================================================================
// I/O Context Misc Setup
//=============================================================================================
	void register_fixed_files( io_context & ctx, int * files, unsigned count ) {
		int ret = syscall( __NR_io_uring_register, ctx.thrd.ring->fd, IORING_REGISTER_FILES, files, count );
		if( ret < 0 ) {
			abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
		}

		__cfadbg_print_safe( io_core, "Kernel I/O : Performed io_register for %p, returned %d\n", active_thread(), ret );
	}

	void register_fixed_files( cluster & cltr, int * files, unsigned count ) {
		for(i; cltr.io.cnt) {
			register_fixed_files( cltr.io.ctxs[i], files, count );
		}
	}
#endif