source: libcfa/src/concurrency/io/setup.cfa @ 78da4ab

ADTarm-ehast-experimentalenumforall-pointer-decayjacob/cs343-translationnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since 78da4ab was 78da4ab, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

New implementation of io based on instance burrowing.
Trying to avoid the unbounded growth of the previous flat combining approach.

  • Property mode set to 100644
File size: 15.7 KB
RevLine 
[3e2b9c9]1//
2// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// io/setup.cfa --
8//
9// Author           : Thierry Delisle
10// Created On       : Fri Jul 31 16:25:51 2020
11// Last Modified By :
12// Last Modified On :
13// Update Count     :
14//
15
16#define __cforall_thread__
17#define _GNU_SOURCE         /* See feature_test_macros(7) */
18
[80444bb]19#if defined(__CFA_DEBUG__)
20        // #define __CFA_DEBUG_PRINT_IO__
21        // #define __CFA_DEBUG_PRINT_IO_CORE__
22#endif
23
[3e2b9c9]24#include "io/types.hfa"
[c44d652]25#include "kernel.hfa"
[3e2b9c9]26
27#if !defined(CFA_HAVE_LINUX_IO_URING_H)
28        void __kernel_io_startup() {
29                // Nothing to do without io_uring
30        }
31
32        void __kernel_io_shutdown() {
33                // Nothing to do without io_uring
34        }
35
[f277633e]36        void ?{}(io_context_params & this) {}
37
[78da4ab]38        void  ?{}($io_context & this, struct cluster & cl) {}
39        void ^?{}($io_context & this) {}
[3e2b9c9]40
[78da4ab]41        $io_arbiter * create(void) { return 0p; }
42        void destroy($io_arbiter *) {}
[b7664a0]43
[3e2b9c9]44#else
45        #include <errno.h>
46        #include <stdint.h>
47        #include <string.h>
48        #include <signal.h>
49        #include <unistd.h>
50
51        extern "C" {
52                #include <pthread.h>
53                #include <sys/epoll.h>
[426f60c]54                #include <sys/eventfd.h>
[3e2b9c9]55                #include <sys/mman.h>
56                #include <sys/syscall.h>
57
58                #include <linux/io_uring.h>
59        }
60
61        #include "bitmanip.hfa"
62        #include "kernel_private.hfa"
63        #include "thread.hfa"
64
65        void ?{}(io_context_params & this) {
66                this.num_entries = 256;
67        }
68
69        static void * __io_poller_slow( void * arg );
70
71        // Weirdly, some systems that do support io_uring don't actually define these
72        #ifdef __alpha__
73                /*
74                * alpha is the only exception, all other architectures
75                * have common numbers for new system calls.
76                */
77                #ifndef __NR_io_uring_setup
78                        #define __NR_io_uring_setup           535
79                #endif
80                #ifndef __NR_io_uring_enter
81                        #define __NR_io_uring_enter           536
82                #endif
83                #ifndef __NR_io_uring_register
84                        #define __NR_io_uring_register        537
85                #endif
86        #else /* !__alpha__ */
87                #ifndef __NR_io_uring_setup
88                        #define __NR_io_uring_setup           425
89                #endif
90                #ifndef __NR_io_uring_enter
91                        #define __NR_io_uring_enter           426
92                #endif
93                #ifndef __NR_io_uring_register
94                        #define __NR_io_uring_register        427
95                #endif
96        #endif
97
98//=============================================================================================
99// I/O Startup / Shutdown logic + Master Poller
100//=============================================================================================
101
102        // IO Master poller loop forward
103        static void * iopoll_loop( __attribute__((unused)) void * args );
104
105        static struct {
[4fc3343]106                      pthread_t  thrd;    // pthread handle to io poller thread
107                      void *     stack;   // pthread stack for io poller thread
108                      int        epollfd; // file descriptor to the epoll instance
109                volatile     bool run;     // Whether or not to continue
110                volatile     bool stopped; // Whether the poller has finished running
111                volatile uint64_t epoch;   // Epoch used for memory reclamation
[3e2b9c9]112        } iopoll;
113
114        void __kernel_io_startup(void) {
[80444bb]115                __cfadbg_print_safe(io_core, "Kernel : Creating EPOLL instance\n" );
[3e2b9c9]116
117                iopoll.epollfd = epoll_create1(0);
118                if (iopoll.epollfd == -1) {
119                        abort( "internal error, epoll_create1\n");
120                }
121
[80444bb]122                __cfadbg_print_safe(io_core, "Kernel : Starting io poller thread\n" );
[3e2b9c9]123
[4fc3343]124                iopoll.stack   = __create_pthread( &iopoll.thrd, iopoll_loop, 0p );
125                iopoll.run     = true;
126                iopoll.stopped = false;
127                iopoll.epoch   = 0;
[3e2b9c9]128        }
129
130        void __kernel_io_shutdown(void) {
131                // Notify the io poller thread of the shutdown
132                iopoll.run = false;
133                sigval val = { 1 };
134                pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
135
136                // Wait for the io poller thread to finish
137
[bfcf6b9]138                __destroy_pthread( iopoll.thrd, iopoll.stack, 0p );
[3e2b9c9]139
140                int ret = close(iopoll.epollfd);
141                if (ret == -1) {
142                        abort( "internal error, close epoll\n");
143                }
144
145                // Io polling is now fully stopped
146
[80444bb]147                __cfadbg_print_safe(io_core, "Kernel : IO poller stopped\n" );
[3e2b9c9]148        }
149
150        static void * iopoll_loop( __attribute__((unused)) void * args ) {
151                __processor_id_t id;
[58d64a4]152                id.full_proc = false;
[3e2b9c9]153                id.id = doregister(&id);
[8fc652e0]154                __cfaabi_tls.this_proc_id = &id;
[80444bb]155                __cfadbg_print_safe(io_core, "Kernel : IO poller thread starting\n" );
[3e2b9c9]156
157                // Block signals to control when they arrive
158                sigset_t mask;
159                sigfillset(&mask);
160                if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
161                abort( "internal error, pthread_sigmask" );
162                }
163
164                sigdelset( &mask, SIGUSR1 );
165
166                // Create sufficient events
167                struct epoll_event events[10];
168                // Main loop
169                while( iopoll.run ) {
[ece0e80]170                        __cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n");
171
[d611995]172                        // increment the epoch to notify any deleters we are starting a new cycle
173                        __atomic_fetch_add(&iopoll.epoch, 1, __ATOMIC_SEQ_CST);
174
[3e2b9c9]175                        // Wait for events
176                        int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask );
177
[ece0e80]178                        __cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io contexts events, waking up\n", nfds);
179
[3e2b9c9]180                        // Check if an error occured
181                        if (nfds == -1) {
182                                if( errno == EINTR ) continue;
183                                abort( "internal error, pthread_sigmask" );
184                        }
185
186                        for(i; nfds) {
[78da4ab]187                                $io_context * io_ctx = ($io_context *)(uintptr_t)events[i].data.u64;
[3e2b9c9]188                                /* paranoid */ verify( io_ctx );
[78da4ab]189                                __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->fd, io_ctx);
[3e2b9c9]190                                #if !defined( __CFA_NO_STATISTICS__ )
[8fc652e0]191                                        __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats;
[3e2b9c9]192                                #endif
[d48b174]193
194                                eventfd_t v;
[78da4ab]195                                eventfd_read(io_ctx->efd, &v);
[d48b174]196
[e873838]197                                post( io_ctx->sem );
[3e2b9c9]198                        }
199                }
200
[4fc3343]201                __atomic_store_n(&iopoll.stopped, true, __ATOMIC_SEQ_CST);
202
[80444bb]203                __cfadbg_print_safe(io_core, "Kernel : IO poller thread stopping\n" );
[3e2b9c9]204                unregister(&id);
205                return 0p;
206        }
207
208//=============================================================================================
209// I/O Context Constrution/Destruction
210//=============================================================================================
211
[78da4ab]212        static void __io_uring_setup ( $io_context & this, const io_context_params & params_in );
213        static void __io_uring_teardown( $io_context & this );
214        static void __epoll_register($io_context & ctx);
215        static void __epoll_unregister($io_context & ctx);
216        void __ioarbiter_register( $io_arbiter & mutex, $io_context & ctx );
217        void __ioarbiter_unregister( $io_arbiter & mutex, $io_context & ctx );
[3e2b9c9]218
[78da4ab]219        void ?{}($io_context & this, struct cluster & cl) {
220                (this.self){ "IO Poller", cl };
221                this.ext_sq.empty = true;
222                __io_uring_setup( this, cl.io.params );
223                __cfadbg_print_safe(io_core, "Kernel I/O : Created ring for io_context %u (%p)\n", this.fd, &this);
[3e2b9c9]224
[78da4ab]225                __epoll_register(this);
[3e2b9c9]226
[78da4ab]227                __ioarbiter_register(*cl.io.arbiter, this);
[3e2b9c9]228
[78da4ab]229                __thrd_start( this, main );
230                __cfadbg_print_safe(io_core, "Kernel I/O : Started poller thread for io_context %u\n", this.fd);
[3e2b9c9]231        }
232
[78da4ab]233        void ^?{}($io_context & mutex this) {
234                __cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %u\n", this.fd);
[3e2b9c9]235
[78da4ab]236                ^(this.self){};
237                __cfadbg_print_safe(io_core, "Kernel I/O : Stopped poller thread for io_context %u\n", this.fd);
[ece0e80]238
[78da4ab]239                __ioarbiter_unregister(*this.arbiter, this);
[3e2b9c9]240
[78da4ab]241                __epoll_unregister(this);
[3e2b9c9]242
[78da4ab]243                __io_uring_teardown( this );
244                __cfadbg_print_safe(io_core, "Kernel I/O : Destroyed ring for io_context %u\n", this.fd);
245        }
[3e2b9c9]246
[78da4ab]247        void ?{}(io_context & this, struct cluster & cl) {
248                // this.ctx = new(cl);
249                this.ctx = alloc();
250                (*this.ctx){ cl };
251
252                __cfadbg_print_safe(io_core, "Kernel I/O : io_context %u ready\n", this.ctx->fd);
[3e2b9c9]253        }
254
255        void ^?{}(io_context & this) {
[78da4ab]256                post( this.ctx->sem );
257
258                delete(this.ctx);
[3e2b9c9]259        }
260
[7222630]261        extern void __disable_interrupts_hard();
262        extern void __enable_interrupts_hard();
[bb58825]263
[78da4ab]264        static void __io_uring_setup( $io_context & this, const io_context_params & params_in ) {
[3e2b9c9]265                // Step 1 : call to setup
266                struct io_uring_params params;
267                memset(&params, 0, sizeof(params));
[78da4ab]268                // if( params_in.poll_submit   ) params.flags |= IORING_SETUP_SQPOLL;
269                // if( params_in.poll_complete ) params.flags |= IORING_SETUP_IOPOLL;
[3e2b9c9]270
[4998155]271                __u32 nentries = params_in.num_entries != 0 ? params_in.num_entries : 256;
[63fe427c]272                if( !is_pow2(nentries) ) {
273                        abort("ERROR: I/O setup 'num_entries' must be a power of 2\n");
274                }
[3e2b9c9]275
276                int fd = syscall(__NR_io_uring_setup, nentries, &params );
277                if(fd < 0) {
278                        abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
279                }
280
281                // Step 2 : mmap result
[78da4ab]282                struct __sub_ring_t & sq = this.sq;
283                struct __cmp_ring_t & cq = this.cq;
[3e2b9c9]284
285                // calculate the right ring size
286                sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned)           );
287                cq.ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));
288
289                // Requires features
290                #if defined(IORING_FEAT_SINGLE_MMAP)
291                        // adjust the size according to the parameters
292                        if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
293                                cq.ring_sz = sq.ring_sz = max(cq.ring_sz, sq.ring_sz);
294                        }
295                #endif
296
297                // mmap the Submit Queue into existence
298                sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
299                if (sq.ring_ptr == (void*)MAP_FAILED) {
300                        abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
301                }
302
303                // Requires features
304                #if defined(IORING_FEAT_SINGLE_MMAP)
305                        // mmap the Completion Queue into existence (may or may not be needed)
306                        if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
307                                cq.ring_ptr = sq.ring_ptr;
308                        }
309                        else
310                #endif
311                {
312                        // We need multiple call to MMAP
313                        cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
314                        if (cq.ring_ptr == (void*)MAP_FAILED) {
315                                munmap(sq.ring_ptr, sq.ring_sz);
316                                abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
317                        }
318                }
319
320                // mmap the submit queue entries
321                size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
322                sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
323                if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
324                        munmap(sq.ring_ptr, sq.ring_sz);
325                        if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
326                        abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
327                }
328
[426f60c]329                // Step 3 : Initialize the data structure
[3e2b9c9]330                // Get the pointers from the kernel to fill the structure
331                // submit queue
[78da4ab]332                sq.kring.head  = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
333                sq.kring.tail  = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
334                sq.kring.array = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
335                sq.mask        = (   const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
336                sq.num         = (   const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
337                sq.flags       = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
338                sq.dropped     = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
339
340                sq.kring.ready = 0;
341                sq.kring.released = 0;
342
343                sq.free_ring.head = 0;
344                sq.free_ring.tail = *sq.num;
345                sq.free_ring.array = alloc( *sq.num, 128`align );
346                for(i; (__u32)*sq.num) {
347                        sq.free_ring.array[i] = i;
[3e2b9c9]348                }
349
[78da4ab]350                sq.to_submit = 0;
[3e2b9c9]351
352                // completion queue
[4998155]353                cq.head      = (volatile __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
354                cq.tail      = (volatile __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
355                cq.mask      = (   const __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
356                cq.num       = (   const __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
357                cq.overflow  = (         __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
358                cq.cqes = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
[3e2b9c9]359
[426f60c]360                // Step 4 : eventfd
[bb58825]361                // io_uring_register is so f*cking slow on some machine that it
362                // will never succeed if preemption isn't hard blocked
[7222630]363                __disable_interrupts_hard();
[bb58825]364
365                int efd = eventfd(0, 0);
366                if (efd < 0) {
367                        abort("KERNEL ERROR: IO_URING EVENTFD - %s\n", strerror(errno));
[426f60c]368                }
369
[bb58825]370                int ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &efd, 1);
371                if (ret < 0) {
372                        abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno));
[426f60c]373                }
374
[7222630]375                __enable_interrupts_hard();
[bb58825]376
[3e2b9c9]377                // some paranoid checks
378                /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask  );
379                /* paranoid */ verifyf( (*cq.num)  >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num );
380                /* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head );
381                /* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail );
382
383                /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
384                /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
[78da4ab]385                /* paranoid */ verifyf( (*sq.kring.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.kring.head );
386                /* paranoid */ verifyf( (*sq.kring.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.kring.tail );
[3e2b9c9]387
388                // Update the global ring info
[78da4ab]389                this.ring_flags = 0;
[3e2b9c9]390                this.fd         = fd;
[426f60c]391                this.efd        = efd;
[3e2b9c9]392        }
393
[78da4ab]394        static void __io_uring_teardown( $io_context & this ) {
[3e2b9c9]395                // Shutdown the io rings
[78da4ab]396                struct __sub_ring_t & sq = this.sq;
397                struct __cmp_ring_t & cq = this.cq;
[3e2b9c9]398
399                // unmap the submit queue entries
400                munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe));
401
402                // unmap the Submit Queue ring
403                munmap(sq.ring_ptr, sq.ring_sz);
404
405                // unmap the Completion Queue ring, if it is different
406                if (cq.ring_ptr != sq.ring_ptr) {
407                        munmap(cq.ring_ptr, cq.ring_sz);
408                }
409
410                // close the file descriptor
411                close(this.fd);
[426f60c]412                close(this.efd);
[3e2b9c9]413
[78da4ab]414                free( this.sq.free_ring.array ); // Maybe null, doesn't matter
[3e2b9c9]415        }
416
417//=============================================================================================
418// I/O Context Sleep
419//=============================================================================================
[78da4ab]420        static inline void __epoll_ctl($io_context & ctx, int op, const char * error) {
[d48b174]421                struct epoll_event ev;
[d611995]422                ev.events = EPOLLIN | EPOLLONESHOT;
[4998155]423                ev.data.u64 = (__u64)&ctx;
[78da4ab]424                int ret = epoll_ctl(iopoll.epollfd, op, ctx.efd, &ev);
[3e2b9c9]425                if (ret < 0) {
[d48b174]426                        abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) );
[3e2b9c9]427                }
428        }
429
[78da4ab]430        static void __epoll_register($io_context & ctx) {
431                __epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");
[3e2b9c9]432        }
433
[78da4ab]434        static void __epoll_unregister($io_context & ctx) {
[d611995]435                // Read the current epoch so we know when to stop
436                size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST);
437
438                // Remove the fd from the iopoller
[78da4ab]439                __epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE");
[d611995]440
441                // Notify the io poller thread of the shutdown
442                iopoll.run = false;
443                sigval val = { 1 };
444                pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
445
446                // Make sure all this is done
447                __atomic_thread_fence(__ATOMIC_SEQ_CST);
448
449                // Wait for the next epoch
[4fc3343]450                while(curr == iopoll.epoch && !iopoll.stopped) Pause();
[d611995]451        }
452
[78da4ab]453        void __ioctx_prepare_block($io_context & ctx) {
454                __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.fd, &ctx);
455                __epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");
456        }
457
458
[3e2b9c9]459//=============================================================================================
460// I/O Context Misc Setup
461//=============================================================================================
[78da4ab]462        void ?{}( $io_arbiter & this ) {
463                this.pending.flag = false;
464        }
[3e2b9c9]465
[78da4ab]466        void ^?{}( $io_arbiter & mutex this ) {
467                /* paranoid */ verify( empty(this.assigned) );
468                /* paranoid */ verify( empty(this.available) );
469                /* paranoid */ verify( is_empty(this.pending.blocked) );
[3e2b9c9]470        }
471
[78da4ab]472        $io_arbiter * create(void) {
473                return new();
[3e2b9c9]474        }
[78da4ab]475        void destroy($io_arbiter * arbiter) {
476                delete(arbiter);
477        }
478
479//=============================================================================================
480// I/O Context Misc Setup
481//=============================================================================================
482
[c44d652]483#endif
Note: See TracBrowser for help on using the repository browser.