source: libcfa/src/concurrency/io.cfa @ 92976d9

ADTarm-ehast-experimentalenumforall-pointer-decayjacob/cs343-translationnew-astnew-ast-unique-exprpthread-emulationqualifiedEnum
Last change on this file since 92976d9 was 92976d9, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Implemented basic io_uring setup and poller

  • Property mode set to 100644
File size: 8.6 KB
Line 
1#include "kernel.hfa"
2
3#if !defined(HAVE_LINUX_IO_URING_H)
4        void __kernel_io_startup( cluster & this ) {
5                // Nothing to do without io_uring
6        }
7
8        void __kernel_io_shutdown( cluster & this ) {
9                // Nothing to do without io_uring
10        }
11
12        bool is_async( void (*)() ) {
13                return false;
14        }
15
16#else
17        extern "C" {
18                #define _GNU_SOURCE         /* See feature_test_macros(7) */
19                #include <errno.h>
20                #include <stdint.h>
21                #include <string.h>
22                #include <unistd.h>
23                #include <sys/mman.h>
24                #include <sys/syscall.h>
25
26                #include <linux/io_uring.h>
27        }
28
29        #include "bits/signal.hfa"
30        #include "kernel_private.hfa"
31        #include "thread.hfa"
32
33        uint32_t entries_per_cluster() {
34                return 256;
35        }
36
37        static void * __io_poller( void * arg );
38
39//=============================================================================================
40// I/O Startup / Shutdown logic
41//=============================================================================================
42        void __kernel_io_startup( cluster & this ) {
43                // Step 1 : call to setup
44                struct io_uring_params params;
45                memset(&params, 0, sizeof(params));
46
47                int fd = syscall(__NR_io_uring_setup, entries_per_cluster(), &params );
48                if(fd < 0) {
49                        abort("KERNEL ERROR: IO_URING SETUP - %s\n", strerror(errno));
50                }
51
52                // Step 2 : mmap result
53                memset(&this.io, 0, sizeof(struct io_ring));
54                struct io_uring_sq * sq = &this.io.submit_q;
55                struct io_uring_cq * cq = &this.io.completion_q;
56
57                // calculate the right ring size
58                sq->ring_sz = params.sq_off.array + (params.sq_entries * sizeof(unsigned)           );
59                cq->ring_sz = params.cq_off.cqes  + (params.cq_entries * sizeof(struct io_uring_cqe));
60
61                // Requires features
62                // // adjust the size according to the parameters
63                // if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
64                //      cq->ring_sz = sq->ring_sz = max(cq->ring_sz, sq->ring_sz);
65                // }
66
67                // mmap the Submit Queue into existence
68                sq->ring_ptr = mmap(0, sq->ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
69                if (sq->ring_ptr == (void*)MAP_FAILED) {
70                        abort("KERNEL ERROR: IO_URING MMAP1 - %s\n", strerror(errno));
71                }
72
73                // mmap the Completion Queue into existence (may or may not be needed)
74                // Requires features
75                // if ((params.features & IORING_FEAT_SINGLE_MMAP) != 0) {
76                //      cq->ring_ptr = sq->ring_ptr;
77                // }
78                // else {
79                        // We need multiple call to MMAP
80                        cq->ring_ptr = mmap(0, cq->ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
81                        if (cq->ring_ptr == (void*)MAP_FAILED) {
82                                munmap(sq->ring_ptr, sq->ring_sz);
83                                abort("KERNEL ERROR: IO_URING MMAP2 - %s\n", strerror(errno));
84                        }
85                // }
86
87                // mmap the submit queue entries
88                size_t size = params.sq_entries * sizeof(struct io_uring_sqe);
89                sq->sqes = (struct io_uring_sqe *)mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
90                if (sq->sqes == (struct io_uring_sqe *)MAP_FAILED) {
91                        munmap(sq->ring_ptr, sq->ring_sz);
92                        if (cq->ring_ptr != sq->ring_ptr) munmap(cq->ring_ptr, cq->ring_sz);
93                        abort("KERNEL ERROR: IO_URING MMAP3 - %s\n", strerror(errno));
94                }
95
96                // Get the pointers from the kernel to fill the structure
97                // submit queue
98                sq->head    = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.head;
99                sq->tail    = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.tail;
100                sq->mask    = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.ring_mask;
101                sq->entries = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.ring_entries;
102                sq->flags   = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.flags;
103                sq->dropped = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.dropped;
104                sq->array   = (uint32_t *)((intptr_t)sq->ring_ptr) + params.sq_off.array;
105
106                // completion queue
107                cq->head     = (uint32_t *)((intptr_t)cq->ring_ptr) + params.cq_off.head;
108                cq->tail     = (uint32_t *)((intptr_t)cq->ring_ptr) + params.cq_off.tail;
109                cq->mask     = (uint32_t *)((intptr_t)cq->ring_ptr) + params.cq_off.ring_mask;
110                cq->entries  = (struct io_uring_cqe *)((intptr_t)cq->ring_ptr) + params.cq_off.ring_entries;
111                cq->overflow = (uint32_t *)((intptr_t)cq->ring_ptr) + params.cq_off.overflow;
112                cq->cqes = (struct io_uring_cqe *)((intptr_t)cq->ring_ptr) + params.cq_off.cqes;
113
114                // Update the global ring info
115                this.io.flags = params.flags;
116                this.io.fd    = fd;
117                this.io.done  = false;
118
119                // Create the poller thread
120                this.io.stack = __create_pthread( &this.io.poller, __io_poller, &this );
121        }
122
123        void __kernel_io_shutdown( cluster & this ) {
124                // Stop the IO Poller
125                // Notify the poller thread of the shutdown
126                __atomic_store_n(&this.io.done, true, __ATOMIC_SEQ_CST);
127                sigval val = { 1 };
128                pthread_sigqueue( this.io.poller, SIGUSR1, val );
129
130                // Wait for the poller thread to finish
131                pthread_join( this.io.poller, 0p );
132                free( this.io.stack );
133
134                // Shutdown the io rings
135                struct io_uring_sq & sq = this.io.submit_q;
136                struct io_uring_cq & cq = this.io.completion_q;
137
138                // unmap the submit queue entries
139                munmap(sq.sqes, *sq.entries * sizeof(struct io_uring_sqe));
140
141                // unmap the Submit Queue ring
142                munmap(sq.ring_ptr, sq.ring_sz);
143
144                // unmap the Completion Queue ring, if it is different
145                if (cq.ring_ptr != sq.ring_ptr) {
146                        munmap(cq.ring_ptr, cq.ring_sz);
147                }
148
149                // close the file descriptor
150                close(this.io.fd);
151        }
152
153//=============================================================================================
154// I/O Polling
155//=============================================================================================
156        struct io_user_data {
157                int32_t result;
158                $thread * thrd;
159        };
160
161        // Process a single completion message from the io_uring
162        // This is NOT thread-safe
163        static bool __io_process(struct io_ring & ring) {
164                unsigned head = *ring.completion_q.head;
165                unsigned tail = __atomic_load_n(ring.completion_q.tail, __ATOMIC_ACQUIRE);
166
167                if (head == tail) return false;
168
169                unsigned idx = head & (*ring.completion_q.mask);
170                struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
171
172                /* paranoid */ verify(&cqe);
173
174                struct io_user_data * data = (struct io_user_data *)cqe.user_data;
175                data->result = cqe.res;
176                unpark( data->thrd __cfaabi_dbg_ctx2 );
177
178                // Mark to the kernel that the cqe has been seen
179                // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
180                __atomic_fetch_add( ring.completion_q.head, 1, __ATOMIC_RELEASE );
181
182                return true;
183        }
184
185        static void * __io_poller( void * arg ) {
186                cluster * cltr = (cluster *)arg;
187                struct io_ring & ring = cltr->io;
188
189                sigset_t mask;
190                sigfillset(&mask);
191                if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
192                        abort( "KERNEL ERROR: IO_URING - pthread_sigmask" );
193                }
194
195                sigdelset( &mask, SIGUSR1 );
196
197                verify( (*ring.submit_q.head) == (*ring.submit_q.tail) );
198                verify( (*ring.completion_q.head) == (*ring.completion_q.tail) );
199
200                LOOP: while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
201                        int ret = syscall( __NR_io_uring_enter, ring.fd, 0, 1, IORING_ENTER_GETEVENTS, &mask, _NSIG / 8);
202                        if( ret < 0 ) {
203                                switch((int)errno) {
204                                case EAGAIN:
205                                case EINTR:
206                                        continue LOOP;
207                                default:
208                                        abort( "KERNEL ERROR: IO_URING WAIT - %s\n", strerror(errno) );
209                                }
210                        }
211
212                        // Drain the queue
213                        while(__io_process(ring)) {}
214                }
215
216                return 0p;
217        }
218
219//=============================================================================================
220// I/O Submissions
221//=============================================================================================
222
223[* struct io_uring_sqe, uint32_t] io_submit_prelude( struct io_ring & ring ) {
224        return [0p, 0];
225}
226
227//=============================================================================================
228// I/O Interface
229//=============================================================================================
230
231/*
232        extern "C" {
233                #define __USE_GNU
234                #include <sys/uio.h>
235        }
236
237        ssize_t async_preadv2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
238                #if !defined(IORING_OP_READV)
239                        return preadv2(fd, iov, iovcnt, offset, flags);
240                #else
241                        sqe->opcode = IORING_OP_READV;
242                        sqe->flags = 0;
243                        sqe->ioprio = 0;
244                        sqe->fd = fd;
245                        sqe->off = offset;
246                        sqe->addr = (unsigned long) iov;
247                        sqe->len = iovcnt;
248                        sqe->rw_flags = 0;
249                        sqe->user_data = 0;
250                        sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
251                #endif
252        }
253
254        ssize_t async_pwritev2(int fd, const struct iovec *iov, int iovcnt, off_t offset, int flags) {
255                #if !defined(IORING_OP_WRITEV)
256                        return pwritev2(fd, iov, iovcnt, offset, flags);
257                #else
258                        #warning not implemented
259                #endif
260        }
261
262        bool is_async( void (*func)() ) {
263                switch((uintptr_t)func) {
264                case (uintptr_t)preadv2:
265                case (uintptr_t)async_preadv2:
266                        #if defined(IORING_OP_READV)
267                                return true;
268                        #else
269                                return false;
270                        #endif
271                default:
272                        return false;
273                }
274        }
275*/
276
277#endif
Note: See TracBrowser for help on using the repository browser.