source: libcfa/src/concurrency/io/setup.cfa @ 78da4ab

arm-ehjacob/cs343-translationnew-ast-unique-expr
Last change on this file since 78da4ab was 78da4ab, checked in by Thierry Delisle <tdelisle@…>, 8 months ago

New implementation of io based on instance burrowing.
Trying to avoid the unbounded growth of the previous flat combining approach.

  • Property mode set to 100644
File size: 15.7 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// io/setup.cfa --
8//
9// Author           : Thierry Delisle
10// Created On       : Fri Jul 31 16:25:51 2020
11// Last Modified By :
12// Last Modified On :
13// Update Count     :
14//
15
16#define __cforall_thread__
17#define _GNU_SOURCE         /* See feature_test_macros(7) */
18
19#if defined(__CFA_DEBUG__)
20        // #define __CFA_DEBUG_PRINT_IO__
21        // #define __CFA_DEBUG_PRINT_IO_CORE__
22#endif
23
24#include "io/types.hfa"
25#include "kernel.hfa"
26
27#if !defined(CFA_HAVE_LINUX_IO_URING_H)
28        void __kernel_io_startup() {
29                // Nothing to do without io_uring
30        }
31
32        void __kernel_io_shutdown() {
33                // Nothing to do without io_uring
34        }
35
36        void ?{}(io_context_params & this) {}
37
38        void  ?{}($io_context & this, struct cluster & cl) {}
39        void ^?{}($io_context & this) {}
40
41        $io_arbiter * create(void) { return 0p; }
42        void destroy($io_arbiter *) {}
43
44#else
45        #include <errno.h>
46        #include <stdint.h>
47        #include <string.h>
48        #include <signal.h>
49        #include <unistd.h>
50
51        extern "C" {
52                #include <pthread.h>
53                #include <sys/epoll.h>
54                #include <sys/eventfd.h>
55                #include <sys/mman.h>
56                #include <sys/syscall.h>
57
58                #include <linux/io_uring.h>
59        }
60
61        #include "bitmanip.hfa"
62        #include "kernel_private.hfa"
63        #include "thread.hfa"
64
65        void ?{}(io_context_params & this) {
66                this.num_entries = 256;
67        }
68
69        static void * __io_poller_slow( void * arg );
70
71        // Weirdly, some systems that do support io_uring don't actually define these
72        #ifdef __alpha__
73                /*
74                * alpha is the only exception, all other architectures
75                * have common numbers for new system calls.
76                */
77                #ifndef __NR_io_uring_setup
78                        #define __NR_io_uring_setup           535
79                #endif
80                #ifndef __NR_io_uring_enter
81                        #define __NR_io_uring_enter           536
82                #endif
83                #ifndef __NR_io_uring_register
84                        #define __NR_io_uring_register        537
85                #endif
86        #else /* !__alpha__ */
87                #ifndef __NR_io_uring_setup
88                        #define __NR_io_uring_setup           425
89                #endif
90                #ifndef __NR_io_uring_enter
91                        #define __NR_io_uring_enter           426
92                #endif
93                #ifndef __NR_io_uring_register
94                        #define __NR_io_uring_register        427
95                #endif
96        #endif
97
98//=============================================================================================
99// I/O Startup / Shutdown logic + Master Poller
100//=============================================================================================
101
102        // IO Master poller loop forward
103        static void * iopoll_loop( __attribute__((unused)) void * args );
104
105        static struct {
106                      pthread_t  thrd;    // pthread handle to io poller thread
107                      void *     stack;   // pthread stack for io poller thread
108                      int        epollfd; // file descriptor to the epoll instance
109                volatile     bool run;     // Whether or not to continue
110                volatile     bool stopped; // Whether the poller has finished running
111                volatile uint64_t epoch;   // Epoch used for memory reclamation
112        } iopoll;
113
114        void __kernel_io_startup(void) {
115                __cfadbg_print_safe(io_core, "Kernel : Creating EPOLL instance\n" );
116
117                iopoll.epollfd = epoll_create1(0);
118                if (iopoll.epollfd == -1) {
119                        abort( "internal error, epoll_create1\n");
120                }
121
122                __cfadbg_print_safe(io_core, "Kernel : Starting io poller thread\n" );
123
124                iopoll.stack   = __create_pthread( &iopoll.thrd, iopoll_loop, 0p );
125                iopoll.run     = true;
126                iopoll.stopped = false;
127                iopoll.epoch   = 0;
128        }
129
130        void __kernel_io_shutdown(void) {
131                // Notify the io poller thread of the shutdown
132                iopoll.run = false;
133                sigval val = { 1 };
134                pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
135
136                // Wait for the io poller thread to finish
137
138                __destroy_pthread( iopoll.thrd, iopoll.stack, 0p );
139
140                int ret = close(iopoll.epollfd);
141                if (ret == -1) {
142                        abort( "internal error, close epoll\n");
143                }
144
145                // Io polling is now fully stopped
146
147                __cfadbg_print_safe(io_core, "Kernel : IO poller stopped\n" );
148        }
149
150        static void * iopoll_loop( __attribute__((unused)) void * args ) {
151                __processor_id_t id;
152                id.full_proc = false;
153                id.id = doregister(&id);
154                __cfaabi_tls.this_proc_id = &id;
155                __cfadbg_print_safe(io_core, "Kernel : IO poller thread starting\n" );
156
157                // Block signals to control when they arrive
158                sigset_t mask;
159                sigfillset(&mask);
160                if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
161                abort( "internal error, pthread_sigmask" );
162                }
163
164                sigdelset( &mask, SIGUSR1 );
165
166                // Create sufficient events
167                struct epoll_event events[10];
168                // Main loop
169                while( iopoll.run ) {
170                        __cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n");
171
172                        // increment the epoch to notify any deleters we are starting a new cycle
173                        __atomic_fetch_add(&iopoll.epoch, 1, __ATOMIC_SEQ_CST);
174
175                        // Wait for events
176                        int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask );
177
178                        __cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io contexts events, waking up\n", nfds);
179
180                        // Check if an error occured
181                        if (nfds == -1) {
182                                if( errno == EINTR ) continue;
183                                abort( "internal error, pthread_sigmask" );
184                        }
185
186                        for(i; nfds) {
187                                $io_context * io_ctx = ($io_context *)(uintptr_t)events[i].data.u64;
188                                /* paranoid */ verify( io_ctx );
189                                __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->fd, io_ctx);
190                                #if !defined( __CFA_NO_STATISTICS__ )
191                                        __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats;
192                                #endif
193
194                                eventfd_t v;
195                                eventfd_read(io_ctx->efd, &v);
196
197                                post( io_ctx->sem );
198                        }
199                }
200
201                __atomic_store_n(&iopoll.stopped, true, __ATOMIC_SEQ_CST);
202
203                __cfadbg_print_safe(io_core, "Kernel : IO poller thread stopping\n" );
204                unregister(&id);
205                return 0p;
206        }
207
208//=============================================================================================
209// I/O Context Constrution/Destruction
210//=============================================================================================
211
212        static void __io_uring_setup ( $io_context & this, const io_context_params & params_in );
213        static void __io_uring_teardown( $io_context & this );
214        static void __epoll_register($io_context & ctx);
215        static void __epoll_unregister($io_context & ctx);
216        void __ioarbiter_register( $io_arbiter & mutex, $io_context & ctx );
217        void __ioarbiter_unregister( $io_arbiter & mutex, $io_context & ctx );
218
219        void ?{}($io_context & this, struct cluster & cl) {
220                (this.self){ "IO Poller", cl };
221                this.ext_sq.empty = true;
222                __io_uring_setup( this, cl.io.params );
223                __cfadbg_print_safe(io_core, "Kernel I/O : Created ring for io_context %u (%p)\n", this.fd, &this);
224
225                __epoll_register(this);
226
227                __ioarbiter_register(*cl.io.arbiter, this);
228
229                __thrd_start( this, main );
230                __cfadbg_print_safe(io_core, "Kernel I/O : Started poller thread for io_context %u\n", this.fd);
231        }
232
233        void ^?{}($io_context & mutex this) {
234                __cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %u\n", this.fd);
235
236                ^(this.self){};
237                __cfadbg_print_safe(io_core, "Kernel I/O : Stopped poller thread for io_context %u\n", this.fd);
238
239                __ioarbiter_unregister(*this.arbiter, this);
240
241                __epoll_unregister(this);
242
243                __io_uring_teardown( this );
244                __cfadbg_print_safe(io_core, "Kernel I/O : Destroyed ring for io_context %u\n", this.fd);
245        }
246
247        void ?{}(io_context & this, struct cluster & cl) {
248                // this.ctx = new(cl);
249                this.ctx = alloc();
250                (*this.ctx){ cl };
251
252                __cfadbg_print_safe(io_core, "Kernel I/O : io_context %u ready\n", this.ctx->fd);
253        }
254
255        void ^?{}(io_context & this) {
256                post( this.ctx->sem );
257
258                delete(this.ctx);
259        }
260
261        extern void __disable_interrupts_hard();
262        extern void __enable_interrupts_hard();
263
264        static void __io_uring_setup( $io_context & this, const io_context_params & params_in ) {
265                // Step 1 : call to setup
266                struct io_uring_params params;
267                memset(&params, 0, sizeof(params));
268                // if( params_in.poll_submit   ) params.flags |= IORING_SETUP_SQPOLL;
269                // if( params_in.poll_complete ) params.flags |= IORING_SETUP_IOPOLL;
270
271                __u32 nentries = params_in.num_entries != 0 ? params_in.num_entries : 256;
272                if( !is_pow2(nentries) ) {
273                        abort("ERROR: I/O setup 'num_entries' must be a power of 2\n");
274                }
275
276                int fd = syscall(__NR_io_uring_setup, nentries, &params );
277                if(fd < 0) {
278                        abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
279                }
280
281                // Step 2 : mmap result
282                struct __sub_ring_t & sq = this.sq;
283                struct __cmp_ring_t & cq = this.cq;
284
285                // calculate the right ring size
286                sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned)           );
287                cq.ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));
288
289                // Requires features
290                #if defined(IORING_FEAT_SINGLE_MMAP)
291                        // adjust the size according to the parameters
292                        if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
293                                cq.ring_sz = sq.ring_sz = max(cq.ring_sz, sq.ring_sz);
294                        }
295                #endif
296
297                // mmap the Submit Queue into existence
298                sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
299                if (sq.ring_ptr == (void*)MAP_FAILED) {
300                        abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
301                }
302
303                // Requires features
304                #if defined(IORING_FEAT_SINGLE_MMAP)
305                        // mmap the Completion Queue into existence (may or may not be needed)
306                        if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
307                                cq.ring_ptr = sq.ring_ptr;
308                        }
309                        else
310                #endif
311                {
312                        // We need multiple call to MMAP
313                        cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
314                        if (cq.ring_ptr == (void*)MAP_FAILED) {
315                                munmap(sq.ring_ptr, sq.ring_sz);
316                                abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
317                        }
318                }
319
320                // mmap the submit queue entries
321                size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
322                sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
323                if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
324                        munmap(sq.ring_ptr, sq.ring_sz);
325                        if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
326                        abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
327                }
328
329                // Step 3 : Initialize the data structure
330                // Get the pointers from the kernel to fill the structure
331                // submit queue
332                sq.kring.head  = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
333                sq.kring.tail  = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
334                sq.kring.array = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
335                sq.mask        = (   const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
336                sq.num         = (   const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
337                sq.flags       = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
338                sq.dropped     = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
339
340                sq.kring.ready = 0;
341                sq.kring.released = 0;
342
343                sq.free_ring.head = 0;
344                sq.free_ring.tail = *sq.num;
345                sq.free_ring.array = alloc( *sq.num, 128`align );
346                for(i; (__u32)*sq.num) {
347                        sq.free_ring.array[i] = i;
348                }
349
350                sq.to_submit = 0;
351
352                // completion queue
353                cq.head      = (volatile __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
354                cq.tail      = (volatile __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
355                cq.mask      = (   const __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
356                cq.num       = (   const __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
357                cq.overflow  = (         __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
358                cq.cqes = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
359
360                // Step 4 : eventfd
361                // io_uring_register is so f*cking slow on some machine that it
362                // will never succeed if preemption isn't hard blocked
363                __disable_interrupts_hard();
364
365                int efd = eventfd(0, 0);
366                if (efd < 0) {
367                        abort("KERNEL ERROR: IO_URING EVENTFD - %s\n", strerror(errno));
368                }
369
370                int ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &efd, 1);
371                if (ret < 0) {
372                        abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno));
373                }
374
375                __enable_interrupts_hard();
376
377                // some paranoid checks
378                /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask  );
379                /* paranoid */ verifyf( (*cq.num)  >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num );
380                /* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head );
381                /* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail );
382
383                /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
384                /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
385                /* paranoid */ verifyf( (*sq.kring.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.kring.head );
386                /* paranoid */ verifyf( (*sq.kring.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.kring.tail );
387
388                // Update the global ring info
389                this.ring_flags = 0;
390                this.fd         = fd;
391                this.efd        = efd;
392        }
393
394        static void __io_uring_teardown( $io_context & this ) {
395                // Shutdown the io rings
396                struct __sub_ring_t & sq = this.sq;
397                struct __cmp_ring_t & cq = this.cq;
398
399                // unmap the submit queue entries
400                munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe));
401
402                // unmap the Submit Queue ring
403                munmap(sq.ring_ptr, sq.ring_sz);
404
405                // unmap the Completion Queue ring, if it is different
406                if (cq.ring_ptr != sq.ring_ptr) {
407                        munmap(cq.ring_ptr, cq.ring_sz);
408                }
409
410                // close the file descriptor
411                close(this.fd);
412                close(this.efd);
413
414                free( this.sq.free_ring.array ); // Maybe null, doesn't matter
415        }
416
417//=============================================================================================
418// I/O Context Sleep
419//=============================================================================================
420        static inline void __epoll_ctl($io_context & ctx, int op, const char * error) {
421                struct epoll_event ev;
422                ev.events = EPOLLIN | EPOLLONESHOT;
423                ev.data.u64 = (__u64)&ctx;
424                int ret = epoll_ctl(iopoll.epollfd, op, ctx.efd, &ev);
425                if (ret < 0) {
426                        abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) );
427                }
428        }
429
430        static void __epoll_register($io_context & ctx) {
431                __epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");
432        }
433
434        static void __epoll_unregister($io_context & ctx) {
435                // Read the current epoch so we know when to stop
436                size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST);
437
438                // Remove the fd from the iopoller
439                __epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE");
440
441                // Notify the io poller thread of the shutdown
442                iopoll.run = false;
443                sigval val = { 1 };
444                pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
445
446                // Make sure all this is done
447                __atomic_thread_fence(__ATOMIC_SEQ_CST);
448
449                // Wait for the next epoch
450                while(curr == iopoll.epoch && !iopoll.stopped) Pause();
451        }
452
453        void __ioctx_prepare_block($io_context & ctx) {
454                __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.fd, &ctx);
455                __epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");
456        }
457
458
459//=============================================================================================
460// I/O Context Misc Setup
461//=============================================================================================
462        void ?{}( $io_arbiter & this ) {
463                this.pending.flag = false;
464        }
465
466        void ^?{}( $io_arbiter & mutex this ) {
467                /* paranoid */ verify( empty(this.assigned) );
468                /* paranoid */ verify( empty(this.available) );
469                /* paranoid */ verify( is_empty(this.pending.blocked) );
470        }
471
472        $io_arbiter * create(void) {
473                return new();
474        }
475        void destroy($io_arbiter * arbiter) {
476                delete(arbiter);
477        }
478
479//=============================================================================================
480// I/O Context Misc Setup
481//=============================================================================================
482
483#endif
Note: See TracBrowser for help on using the repository browser.