source: libcfa/src/concurrency/io/setup.cfa @ 150d21a

ADTarm-ehast-experimentalenumforall-pointer-decayjacob/cs343-translationnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since 150d21a was 4c4d854, checked in by Thierry Delisle <tdelisle@…>, 3 years ago

Fixed broken initialization and add verify.

  • Property mode set to 100644
File size: 15.7 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// io/setup.cfa --
8//
9// Author           : Thierry Delisle
10// Created On       : Fri Jul 31 16:25:51 2020
11// Last Modified By :
12// Last Modified On :
13// Update Count     :
14//
15
16#define __cforall_thread__
17#define _GNU_SOURCE         /* See feature_test_macros(7) */
18
19#if defined(__CFA_DEBUG__)
20        // #define __CFA_DEBUG_PRINT_IO__
21        // #define __CFA_DEBUG_PRINT_IO_CORE__
22#endif
23
24#include "io/types.hfa"
25#include "kernel.hfa"
26
27#if !defined(CFA_HAVE_LINUX_IO_URING_H)
28        void __kernel_io_startup() {
29                // Nothing to do without io_uring
30        }
31
32        void __kernel_io_shutdown() {
33                // Nothing to do without io_uring
34        }
35
36        void ?{}(io_context_params & this) {}
37
38        void  ?{}($io_context & this, struct cluster & cl) {}
39        void ^?{}($io_context & this) {}
40
41        $io_arbiter * create(void) { return 0p; }
42        void destroy($io_arbiter *) {}
43
44#else
45        #include <errno.h>
46        #include <stdint.h>
47        #include <string.h>
48        #include <signal.h>
49        #include <unistd.h>
50
51        extern "C" {
52                #include <pthread.h>
53                #include <sys/epoll.h>
54                #include <sys/eventfd.h>
55                #include <sys/mman.h>
56                #include <sys/syscall.h>
57
58                #include <linux/io_uring.h>
59        }
60
61        #include "bitmanip.hfa"
62        #include "kernel_private.hfa"
63        #include "thread.hfa"
64
65        void ?{}(io_context_params & this) {
66                this.num_entries = 256;
67        }
68
69        static void * __io_poller_slow( void * arg );
70
71        // Weirdly, some systems that do support io_uring don't actually define these
72        #ifdef __alpha__
73                /*
74                * alpha is the only exception, all other architectures
75                * have common numbers for new system calls.
76                */
77                #ifndef __NR_io_uring_setup
78                        #define __NR_io_uring_setup           535
79                #endif
80                #ifndef __NR_io_uring_enter
81                        #define __NR_io_uring_enter           536
82                #endif
83                #ifndef __NR_io_uring_register
84                        #define __NR_io_uring_register        537
85                #endif
86        #else /* !__alpha__ */
87                #ifndef __NR_io_uring_setup
88                        #define __NR_io_uring_setup           425
89                #endif
90                #ifndef __NR_io_uring_enter
91                        #define __NR_io_uring_enter           426
92                #endif
93                #ifndef __NR_io_uring_register
94                        #define __NR_io_uring_register        427
95                #endif
96        #endif
97
98//=============================================================================================
99// I/O Startup / Shutdown logic + Master Poller
100//=============================================================================================
101
102        // IO Master poller loop forward
103        static void * iopoll_loop( __attribute__((unused)) void * args );
104
105        static struct {
106                      pthread_t  thrd;    // pthread handle to io poller thread
107                      void *     stack;   // pthread stack for io poller thread
108                      int        epollfd; // file descriptor to the epoll instance
109                volatile     bool run;     // Whether or not to continue
110                volatile     bool stopped; // Whether the poller has finished running
111                volatile uint64_t epoch;   // Epoch used for memory reclamation
112        } iopoll;
113
114        void __kernel_io_startup(void) {
115                __cfadbg_print_safe(io_core, "Kernel : Creating EPOLL instance\n" );
116
117                iopoll.epollfd = epoll_create1(0);
118                if (iopoll.epollfd == -1) {
119                        abort( "internal error, epoll_create1\n");
120                }
121
122                __cfadbg_print_safe(io_core, "Kernel : Starting io poller thread\n" );
123
124                iopoll.stack   = __create_pthread( &iopoll.thrd, iopoll_loop, 0p );
125                iopoll.run     = true;
126                iopoll.stopped = false;
127                iopoll.epoch   = 0;
128        }
129
130        void __kernel_io_shutdown(void) {
131                // Notify the io poller thread of the shutdown
132                iopoll.run = false;
133                sigval val = { 1 };
134                pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
135
136                // Wait for the io poller thread to finish
137
138                __destroy_pthread( iopoll.thrd, iopoll.stack, 0p );
139
140                int ret = close(iopoll.epollfd);
141                if (ret == -1) {
142                        abort( "internal error, close epoll\n");
143                }
144
145                // Io polling is now fully stopped
146
147                __cfadbg_print_safe(io_core, "Kernel : IO poller stopped\n" );
148        }
149
150        static void * iopoll_loop( __attribute__((unused)) void * args ) {
151                __processor_id_t id;
152                id.full_proc = false;
153                id.id = doregister(&id);
154                __cfaabi_tls.this_proc_id = &id;
155                __cfadbg_print_safe(io_core, "Kernel : IO poller thread starting\n" );
156
157                // Block signals to control when they arrive
158                sigset_t mask;
159                sigfillset(&mask);
160                if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
161                abort( "internal error, pthread_sigmask" );
162                }
163
164                sigdelset( &mask, SIGUSR1 );
165
166                // Create sufficient events
167                struct epoll_event events[10];
168                // Main loop
169                while( iopoll.run ) {
170                        __cfadbg_print_safe(io_core, "Kernel I/O - epoll : waiting on io_uring contexts\n");
171
172                        // increment the epoch to notify any deleters we are starting a new cycle
173                        __atomic_fetch_add(&iopoll.epoch, 1, __ATOMIC_SEQ_CST);
174
175                        // Wait for events
176                        int nfds = epoll_pwait( iopoll.epollfd, events, 10, -1, &mask );
177
178                        __cfadbg_print_safe(io_core, "Kernel I/O - epoll : %d io contexts events, waking up\n", nfds);
179
180                        // Check if an error occured
181                        if (nfds == -1) {
182                                if( errno == EINTR ) continue;
183                                abort( "internal error, pthread_sigmask" );
184                        }
185
186                        for(i; nfds) {
187                                $io_context * io_ctx = ($io_context *)(uintptr_t)events[i].data.u64;
188                                /* paranoid */ verify( io_ctx );
189                                __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Unparking io poller %d (%p)\n", io_ctx->fd, io_ctx);
190                                #if !defined( __CFA_NO_STATISTICS__ )
191                                        __cfaabi_tls.this_stats = io_ctx->self.curr_cluster->stats;
192                                #endif
193
194                                eventfd_t v;
195                                eventfd_read(io_ctx->efd, &v);
196
197                                post( io_ctx->sem );
198                        }
199                }
200
201                __atomic_store_n(&iopoll.stopped, true, __ATOMIC_SEQ_CST);
202
203                __cfadbg_print_safe(io_core, "Kernel : IO poller thread stopping\n" );
204                unregister(&id);
205                return 0p;
206        }
207
208//=============================================================================================
209// I/O Context Constrution/Destruction
210//=============================================================================================
211
212        static void __io_uring_setup ( $io_context & this, const io_context_params & params_in );
213        static void __io_uring_teardown( $io_context & this );
214        static void __epoll_register($io_context & ctx);
215        static void __epoll_unregister($io_context & ctx);
216        void __ioarbiter_register( $io_arbiter & mutex, $io_context & ctx );
217        void __ioarbiter_unregister( $io_arbiter & mutex, $io_context & ctx );
218
219        void ?{}($io_context & this, struct cluster & cl) {
220                (this.self){ "IO Poller", cl };
221                this.ext_sq.empty = true;
222                this.revoked = true;
223                __io_uring_setup( this, cl.io.params );
224                __cfadbg_print_safe(io_core, "Kernel I/O : Created ring for io_context %u (%p)\n", this.fd, &this);
225
226                __epoll_register(this);
227
228                __ioarbiter_register(*cl.io.arbiter, this);
229
230                __thrd_start( this, main );
231                __cfadbg_print_safe(io_core, "Kernel I/O : Started poller thread for io_context %u\n", this.fd);
232        }
233
234        void ^?{}($io_context & mutex this) {
235                __cfadbg_print_safe(io_core, "Kernel I/O : tearing down io_context %u\n", this.fd);
236
237                ^(this.self){};
238                __cfadbg_print_safe(io_core, "Kernel I/O : Stopped poller thread for io_context %u\n", this.fd);
239
240                __ioarbiter_unregister(*this.arbiter, this);
241
242                __epoll_unregister(this);
243
244                __io_uring_teardown( this );
245                __cfadbg_print_safe(io_core, "Kernel I/O : Destroyed ring for io_context %u\n", this.fd);
246        }
247
248        void ?{}(io_context & this, struct cluster & cl) {
249                // this.ctx = new(cl);
250                this.ctx = alloc();
251                (*this.ctx){ cl };
252
253                __cfadbg_print_safe(io_core, "Kernel I/O : io_context %u ready\n", this.ctx->fd);
254        }
255
256        void ^?{}(io_context & this) {
257                post( this.ctx->sem );
258
259                delete(this.ctx);
260        }
261
262        extern void __disable_interrupts_hard();
263        extern void __enable_interrupts_hard();
264
265        static void __io_uring_setup( $io_context & this, const io_context_params & params_in ) {
266                // Step 1 : call to setup
267                struct io_uring_params params;
268                memset(&params, 0, sizeof(params));
269                // if( params_in.poll_submit   ) params.flags |= IORING_SETUP_SQPOLL;
270                // if( params_in.poll_complete ) params.flags |= IORING_SETUP_IOPOLL;
271
272                __u32 nentries = params_in.num_entries != 0 ? params_in.num_entries : 256;
273                if( !is_pow2(nentries) ) {
274                        abort("ERROR: I/O setup 'num_entries' must be a power of 2\n");
275                }
276
277                int fd = syscall(__NR_io_uring_setup, nentries, &params );
278                if(fd < 0) {
279                        abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
280                }
281
282                // Step 2 : mmap result
283                struct __sub_ring_t & sq = this.sq;
284                struct __cmp_ring_t & cq = this.cq;
285
286                // calculate the right ring size
287                sq.ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned)           );
288                cq.ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));
289
290                // Requires features
291                #if defined(IORING_FEAT_SINGLE_MMAP)
292                        // adjust the size according to the parameters
293                        if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
294                                cq.ring_sz = sq.ring_sz = max(cq.ring_sz, sq.ring_sz);
295                        }
296                #endif
297
298                // mmap the Submit Queue into existence
299                sq.ring_ptr = mmap(0, sq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
300                if (sq.ring_ptr == (void*)MAP_FAILED) {
301                        abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
302                }
303
304                // Requires features
305                #if defined(IORING_FEAT_SINGLE_MMAP)
306                        // mmap the Completion Queue into existence (may or may not be needed)
307                        if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
308                                cq.ring_ptr = sq.ring_ptr;
309                        }
310                        else
311                #endif
312                {
313                        // We need multiple call to MMAP
314                        cq.ring_ptr = mmap(0, cq.ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
315                        if (cq.ring_ptr == (void*)MAP_FAILED) {
316                                munmap(sq.ring_ptr, sq.ring_sz);
317                                abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
318                        }
319                }
320
321                // mmap the submit queue entries
322                size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
323                sq.sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
324                if (sq.sqes == (struct io_uring_sqe *)MAP_FAILED) {
325                        munmap(sq.ring_ptr, sq.ring_sz);
326                        if (cq.ring_ptr != sq.ring_ptr) munmap(cq.ring_ptr, cq.ring_sz);
327                        abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
328                }
329
330                // Step 3 : Initialize the data structure
331                // Get the pointers from the kernel to fill the structure
332                // submit queue
333                sq.kring.head  = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.head);
334                sq.kring.tail  = (volatile __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.tail);
335                sq.kring.array = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
336                sq.mask        = (   const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_mask);
337                sq.num         = (   const __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.ring_entries);
338                sq.flags       = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.flags);
339                sq.dropped     = (         __u32 *)(((intptr_t)sq.ring_ptr) + params.sq_off.dropped);
340
341                sq.kring.ready = 0;
342                sq.kring.released = 0;
343
344                sq.free_ring.head = 0;
345                sq.free_ring.tail = *sq.num;
346                sq.free_ring.array = alloc( *sq.num, 128`align );
347                for(i; (__u32)*sq.num) {
348                        sq.free_ring.array[i] = i;
349                }
350
351                sq.to_submit = 0;
352
353                // completion queue
354                cq.head      = (volatile __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.head);
355                cq.tail      = (volatile __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.tail);
356                cq.mask      = (   const __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_mask);
357                cq.num       = (   const __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.ring_entries);
358                cq.overflow  = (         __u32 *)(((intptr_t)cq.ring_ptr) + params.cq_off.overflow);
359                cq.cqes = (struct io_uring_cqe *)(((intptr_t)cq.ring_ptr) + params.cq_off.cqes);
360
361                // Step 4 : eventfd
362                // io_uring_register is so f*cking slow on some machine that it
363                // will never succeed if preemption isn't hard blocked
364                __disable_interrupts_hard();
365
366                int efd = eventfd(0, 0);
367                if (efd < 0) {
368                        abort("KERNEL ERROR: IO_URING EVENTFD - %s\n", strerror(errno));
369                }
370
371                int ret = syscall( __NR_io_uring_register, fd, IORING_REGISTER_EVENTFD, &efd, 1);
372                if (ret < 0) {
373                        abort("KERNEL ERROR: IO_URING EVENTFD REGISTER - %s\n", strerror(errno));
374                }
375
376                __enable_interrupts_hard();
377
378                // some paranoid checks
379                /* paranoid */ verifyf( (*cq.mask) == ((*cq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*cq.num) - 1ul32, *cq.num, *cq.mask  );
380                /* paranoid */ verifyf( (*cq.num)  >= nentries, "IO_URING Expected %u entries, got %u", nentries, *cq.num );
381                /* paranoid */ verifyf( (*cq.head) == 0, "IO_URING Expected head to be 0, got %u", *cq.head );
382                /* paranoid */ verifyf( (*cq.tail) == 0, "IO_URING Expected tail to be 0, got %u", *cq.tail );
383
384                /* paranoid */ verifyf( (*sq.mask) == ((*sq.num) - 1ul32), "IO_URING Expected mask to be %u (%u entries), was %u", (*sq.num) - 1ul32, *sq.num, *sq.mask );
385                /* paranoid */ verifyf( (*sq.num) >= nentries, "IO_URING Expected %u entries, got %u", nentries, *sq.num );
386                /* paranoid */ verifyf( (*sq.kring.head) == 0, "IO_URING Expected head to be 0, got %u", *sq.kring.head );
387                /* paranoid */ verifyf( (*sq.kring.tail) == 0, "IO_URING Expected tail to be 0, got %u", *sq.kring.tail );
388
389                // Update the global ring info
390                this.ring_flags = 0;
391                this.fd         = fd;
392                this.efd        = efd;
393        }
394
395        static void __io_uring_teardown( $io_context & this ) {
396                // Shutdown the io rings
397                struct __sub_ring_t & sq = this.sq;
398                struct __cmp_ring_t & cq = this.cq;
399
400                // unmap the submit queue entries
401                munmap(sq.sqes, (*sq.num) * sizeof(struct io_uring_sqe));
402
403                // unmap the Submit Queue ring
404                munmap(sq.ring_ptr, sq.ring_sz);
405
406                // unmap the Completion Queue ring, if it is different
407                if (cq.ring_ptr != sq.ring_ptr) {
408                        munmap(cq.ring_ptr, cq.ring_sz);
409                }
410
411                // close the file descriptor
412                close(this.fd);
413                close(this.efd);
414
415                free( this.sq.free_ring.array ); // Maybe null, doesn't matter
416        }
417
418//=============================================================================================
419// I/O Context Sleep
420//=============================================================================================
421        static inline void __epoll_ctl($io_context & ctx, int op, const char * error) {
422                struct epoll_event ev;
423                ev.events = EPOLLIN | EPOLLONESHOT;
424                ev.data.u64 = (__u64)&ctx;
425                int ret = epoll_ctl(iopoll.epollfd, op, ctx.efd, &ev);
426                if (ret < 0) {
427                        abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) );
428                }
429        }
430
431        static void __epoll_register($io_context & ctx) {
432                __epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");
433        }
434
435        static void __epoll_unregister($io_context & ctx) {
436                // Read the current epoch so we know when to stop
437                size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST);
438
439                // Remove the fd from the iopoller
440                __epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE");
441
442                // Notify the io poller thread of the shutdown
443                iopoll.run = false;
444                sigval val = { 1 };
445                pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
446
447                // Make sure all this is done
448                __atomic_thread_fence(__ATOMIC_SEQ_CST);
449
450                // Wait for the next epoch
451                while(curr == iopoll.epoch && !iopoll.stopped) Pause();
452        }
453
454        void __ioctx_prepare_block($io_context & ctx) {
455                __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.fd, &ctx);
456                __epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");
457        }
458
459
460//=============================================================================================
461// I/O Context Misc Setup
462//=============================================================================================
463        void ?{}( $io_arbiter & this ) {
464                this.pending.flag = false;
465        }
466
467        void ^?{}( $io_arbiter & mutex this ) {
468                /* paranoid */ verify( empty(this.assigned) );
469                /* paranoid */ verify( empty(this.available) );
470                /* paranoid */ verify( is_empty(this.pending.blocked) );
471        }
472
473        $io_arbiter * create(void) {
474                return new();
475        }
476        void destroy($io_arbiter * arbiter) {
477                delete(arbiter);
478        }
479
480//=============================================================================================
481// I/O Context Misc Setup
482//=============================================================================================
483
484#endif
Note: See TracBrowser for help on using the repository browser.