source: libcfa/src/concurrency/io.cfa@ 5715d43

ADT arm-eh ast-experimental enum forall-pointer-decay jacob/cs343-translation new-ast new-ast-unique-expr pthread-emulation qualifiedEnum
Last change on this file since 5715d43 was 309d814, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

Added lock around system call in eager mode, since system call has it's own, worst, lock

  • Property mode set to 100644
File size: 13.5 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// io.cfa --
8//
9// Author : Thierry Delisle
10// Created On : Thu Apr 23 17:31:00 2020
11// Last Modified By :
12// Last Modified On :
13// Update Count :
14//
15
16#define __cforall_thread__
17
18#if defined(__CFA_DEBUG__)
19 // #define __CFA_DEBUG_PRINT_IO__
20 // #define __CFA_DEBUG_PRINT_IO_CORE__
21#endif
22
23
24#if defined(CFA_HAVE_LINUX_IO_URING_H)
25 #define _GNU_SOURCE /* See feature_test_macros(7) */
26 #include <errno.h>
27 #include <signal.h>
28 #include <stdint.h>
29 #include <string.h>
30 #include <unistd.h>
31
32 extern "C" {
33 #include <sys/epoll.h>
34 #include <sys/syscall.h>
35
36 #include <linux/io_uring.h>
37 }
38
39 #include "stats.hfa"
40 #include "kernel.hfa"
41 #include "kernel/fwd.hfa"
42 #include "io/types.hfa"
43
44//=============================================================================================
45// I/O Syscall
46//=============================================================================================
47 static int __io_uring_enter( struct __io_data & ring, unsigned to_submit, bool get ) {
48 bool need_sys_to_submit = false;
49 bool need_sys_to_complete = false;
50 unsigned flags = 0;
51
52 TO_SUBMIT:
53 if( to_submit > 0 ) {
54 if( !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
55 need_sys_to_submit = true;
56 break TO_SUBMIT;
57 }
58 if( (*ring.submit_q.flags) & IORING_SQ_NEED_WAKEUP ) {
59 need_sys_to_submit = true;
60 flags |= IORING_ENTER_SQ_WAKEUP;
61 }
62 }
63
64 if( get && !(ring.ring_flags & IORING_SETUP_SQPOLL) ) {
65 flags |= IORING_ENTER_GETEVENTS;
66 if( (ring.ring_flags & IORING_SETUP_IOPOLL) ) {
67 need_sys_to_complete = true;
68 }
69 }
70
71 int ret = 0;
72 if( need_sys_to_submit || need_sys_to_complete ) {
73 ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, 0, flags, 0p, _NSIG / 8);
74 if( ret < 0 ) {
75 switch((int)errno) {
76 case EAGAIN:
77 case EINTR:
78 ret = -1;
79 break;
80 default:
81 abort( "KERNEL ERROR: IO_URING SYSCALL - (%d) %s\n", (int)errno, strerror(errno) );
82 }
83 }
84 }
85
86 // Memory barrier
87 __atomic_thread_fence( __ATOMIC_SEQ_CST );
88 return ret;
89 }
90
91//=============================================================================================
92// I/O Polling
93//=============================================================================================
94 static unsigned __collect_submitions( struct __io_data & ring );
95 static uint32_t __release_consumed_submission( struct __io_data & ring );
96
97 static inline void process(struct io_uring_cqe & cqe ) {
98 struct __io_user_data_t * data = (struct __io_user_data_t *)(uintptr_t)cqe.user_data;
99 __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", data, cqe.res, data->thrd );
100
101 data->result = cqe.res;
102 unpark( data->thrd __cfaabi_dbg_ctx2 );
103 }
104
105 // Process a single completion message from the io_uring
106 // This is NOT thread-safe
107 static [int, bool] __drain_io( & struct __io_data ring ) {
108 /* paranoid */ verify( !kernelTLS.preemption_state.enabled );
109
110 unsigned to_submit = 0;
111 if( ring.poller_submits ) {
112 // If the poller thread also submits, then we need to aggregate the submissions which are ready
113 to_submit = __collect_submitions( ring );
114 }
115
116 int ret = __io_uring_enter(ring, to_submit, true);
117 if( ret < 0 ) {
118 return [0, true];
119 }
120
121 // update statistics
122 if (to_submit > 0) {
123 __STATS__( true,
124 if( to_submit > 0 ) {
125 io.submit_q.submit_avg.rdy += to_submit;
126 io.submit_q.submit_avg.csm += ret;
127 io.submit_q.submit_avg.cnt += 1;
128 }
129 )
130 }
131
132 // Release the consumed SQEs
133 __release_consumed_submission( ring );
134
135 // Drain the queue
136 unsigned head = *ring.completion_q.head;
137 unsigned tail = *ring.completion_q.tail;
138 const uint32_t mask = *ring.completion_q.mask;
139
140 // Nothing was new return 0
141 if (head == tail) {
142 return [0, to_submit > 0];
143 }
144
145 uint32_t count = tail - head;
146 /* paranoid */ verify( count != 0 );
147 for(i; count) {
148 unsigned idx = (head + i) & mask;
149 struct io_uring_cqe & cqe = ring.completion_q.cqes[idx];
150
151 /* paranoid */ verify(&cqe);
152
153 process( cqe );
154 }
155
156 // Mark to the kernel that the cqe has been seen
157 // Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
158 __atomic_thread_fence( __ATOMIC_SEQ_CST );
159 __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
160
161 return [count, count > 0 || to_submit > 0];
162 }
163
164 void main( $io_ctx_thread & this ) {
165 epoll_event ev;
166 __ioctx_register( this, ev );
167
168 __cfadbg_print_safe(io_core, "Kernel I/O : IO poller %p for ring %p ready\n", &this, &this.ring);
169
170 int reset = 0;
171 // Then loop until we need to start
172 while(!__atomic_load_n(&this.done, __ATOMIC_SEQ_CST)) {
173 // Drain the io
174 int count;
175 bool again;
176 disable_interrupts();
177 [count, again] = __drain_io( *this.ring );
178
179 if(!again) reset++;
180
181 // Update statistics
182 __STATS__( true,
183 io.complete_q.completed_avg.val += count;
184 io.complete_q.completed_avg.fast_cnt += 1;
185 )
186 enable_interrupts( __cfaabi_dbg_ctx );
187
188 // If we got something, just yield and check again
189 if(reset < 5) {
190 yield();
191 }
192 // We didn't get anything baton pass to the slow poller
193 else {
194 __cfadbg_print_safe(io_core, "Kernel I/O : Parking io poller %p\n", &this.self);
195 reset = 0;
196
197 // block this thread
198 __ioctx_prepare_block( this, ev );
199 wait( this.sem );
200 }
201 }
202
203 __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
204 }
205
206//=============================================================================================
207// I/O Submissions
208//=============================================================================================
209
210// Submition steps :
211// 1 - Allocate a queue entry. The ring already has memory for all entries but only the ones
212// listed in sq.array are visible by the kernel. For those not listed, the kernel does not
213// offer any assurance that an entry is not being filled by multiple flags. Therefore, we
214// need to write an allocator that allows allocating concurrently.
215//
216// 2 - Actually fill the submit entry, this is the only simple and straightforward step.
217//
218// 3 - Append the entry index to the array and adjust the tail accordingly. This operation
219// needs to arrive to two concensus at the same time:
220// A - The order in which entries are listed in the array: no two threads must pick the
221// same index for their entries
222// B - When can the tail be update for the kernel. EVERY entries in the array between
223// head and tail must be fully filled and shouldn't ever be touched again.
224//
225
226 [* struct io_uring_sqe, uint32_t] __submit_alloc( struct __io_data & ring, uint64_t data ) {
227 /* paranoid */ verify( data != 0 );
228
229 // Prepare the data we need
230 __attribute((unused)) int len = 0;
231 __attribute((unused)) int block = 0;
232 uint32_t cnt = *ring.submit_q.num;
233 uint32_t mask = *ring.submit_q.mask;
234
235 disable_interrupts();
236 uint32_t off = __tls_rand();
237 enable_interrupts( __cfaabi_dbg_ctx );
238
239 // Loop around looking for an available spot
240 for() {
241 // Look through the list starting at some offset
242 for(i; cnt) {
243 uint64_t expected = 0;
244 uint32_t idx = (i + off) & mask;
245 struct io_uring_sqe * sqe = &ring.submit_q.sqes[idx];
246 volatile uint64_t * udata = (volatile uint64_t *)&sqe->user_data;
247
248 if( *udata == expected &&
249 __atomic_compare_exchange_n( udata, &expected, data, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) )
250 {
251 // update statistics
252 __STATS__( false,
253 io.submit_q.alloc_avg.val += len;
254 io.submit_q.alloc_avg.block += block;
255 io.submit_q.alloc_avg.cnt += 1;
256 )
257
258
259 // Success return the data
260 return [sqe, idx];
261 }
262 verify(expected != data);
263
264 len ++;
265 }
266
267 block++;
268 yield();
269 }
270 }
271
272 static inline uint32_t __submit_to_ready_array( struct __io_data & ring, uint32_t idx, const uint32_t mask ) {
273 /* paranoid */ verify( idx <= mask );
274 /* paranoid */ verify( idx != -1ul32 );
275
276 // We need to find a spot in the ready array
277 __attribute((unused)) int len = 0;
278 __attribute((unused)) int block = 0;
279 uint32_t ready_mask = ring.submit_q.ready_cnt - 1;
280
281 disable_interrupts();
282 uint32_t off = __tls_rand();
283 enable_interrupts( __cfaabi_dbg_ctx );
284
285 uint32_t picked;
286 LOOKING: for() {
287 for(i; ring.submit_q.ready_cnt) {
288 picked = (i + off) & ready_mask;
289 uint32_t expected = -1ul32;
290 if( __atomic_compare_exchange_n( &ring.submit_q.ready[picked], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
291 break LOOKING;
292 }
293 verify(expected != idx);
294
295 len ++;
296 }
297
298 block++;
299 if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
300 __release_consumed_submission( ring );
301 unlock( ring.submit_q.lock );
302 }
303 else {
304 yield();
305 }
306 }
307
308 // update statistics
309 __STATS__( false,
310 io.submit_q.look_avg.val += len;
311 io.submit_q.look_avg.block += block;
312 io.submit_q.look_avg.cnt += 1;
313 )
314
315 return picked;
316 }
317
318 void __submit( struct io_context * ctx, uint32_t idx ) __attribute__((nonnull (1))) {
319 __io_data & ring = *ctx->thrd.ring;
320 // Get now the data we definetely need
321 volatile uint32_t * const tail = ring.submit_q.tail;
322 const uint32_t mask = *ring.submit_q.mask;
323
324 // There are 2 submission schemes, check which one we are using
325 if( ring.poller_submits ) {
326 // If the poller thread submits, then we just need to add this to the ready array
327 __submit_to_ready_array( ring, idx, mask );
328
329 post( ctx->thrd.sem );
330
331 __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
332 }
333 else if( ring.eager_submits ) {
334 uint32_t picked = __submit_to_ready_array( ring, idx, mask );
335
336 for() {
337 yield();
338
339 // If some one else collected our index, we are done
340 #warning ABA problem
341 if( ring.submit_q.ready[picked] != idx ) {
342 __STATS__( false,
343 io.submit_q.helped += 1;
344 )
345 return;
346 }
347
348 if( try_lock(ring.submit_q.lock __cfaabi_dbg_ctx2) ) {
349 __STATS__( false,
350 io.submit_q.leader += 1;
351 )
352 break;
353 }
354
355 __STATS__( false,
356 io.submit_q.busy += 1;
357 )
358 }
359
360 // We got the lock
361 // Collect the submissions
362 unsigned to_submit = __collect_submitions( ring );
363
364 // Actually submit
365 int ret = __io_uring_enter( ring, to_submit, false );
366
367 unlock(ring.submit_q.lock);
368 if( ret < 0 ) return;
369
370 // Release the consumed SQEs
371 __release_consumed_submission( ring );
372
373 // update statistics
374 __STATS__( false,
375 io.submit_q.submit_avg.rdy += to_submit;
376 io.submit_q.submit_avg.csm += ret;
377 io.submit_q.submit_avg.cnt += 1;
378 )
379 }
380 else {
381 // get mutual exclusion
382 lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
383
384 /* paranoid */ verifyf( ring.submit_q.sqes[ idx ].user_data != 0,
385 /* paranoid */ "index %u already reclaimed\n"
386 /* paranoid */ "head %u, prev %u, tail %u\n"
387 /* paranoid */ "[-0: %u,-1: %u,-2: %u,-3: %u]\n",
388 /* paranoid */ idx,
389 /* paranoid */ *ring.submit_q.head, ring.submit_q.prev_head, *tail
390 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 0) & (*ring.submit_q.mask) ]
391 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 1) & (*ring.submit_q.mask) ]
392 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 2) & (*ring.submit_q.mask) ]
393 /* paranoid */ ,ring.submit_q.array[ ((*ring.submit_q.head) - 3) & (*ring.submit_q.mask) ]
394 /* paranoid */ );
395
396 // Append to the list of ready entries
397
398 /* paranoid */ verify( idx <= mask );
399 ring.submit_q.array[ (*tail) & mask ] = idx;
400 __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
401
402 // Submit however, many entries need to be submitted
403 int ret = __io_uring_enter( ring, 1, false );
404 if( ret < 0 ) {
405 switch((int)errno) {
406 default:
407 abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
408 }
409 }
410
411 // update statistics
412 __STATS__( false,
413 io.submit_q.submit_avg.csm += 1;
414 io.submit_q.submit_avg.cnt += 1;
415 )
416
417 // Release the consumed SQEs
418 __release_consumed_submission( ring );
419
420 unlock(ring.submit_q.lock);
421
422 __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
423 }
424 }
425
426 static unsigned __collect_submitions( struct __io_data & ring ) {
427 /* paranoid */ verify( ring.submit_q.ready != 0p );
428 /* paranoid */ verify( ring.submit_q.ready_cnt > 0 );
429
430 unsigned to_submit = 0;
431 uint32_t tail = *ring.submit_q.tail;
432 const uint32_t mask = *ring.submit_q.mask;
433
434 // Go through the list of ready submissions
435 for( i; ring.submit_q.ready_cnt ) {
436 // replace any submission with the sentinel, to consume it.
437 uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
438
439 // If it was already the sentinel, then we are done
440 if( idx == -1ul32 ) continue;
441
442 // If we got a real submission, append it to the list
443 ring.submit_q.array[ (tail + to_submit) & mask ] = idx & mask;
444 to_submit++;
445 }
446
447 // Increment the tail based on how many we are ready to submit
448 __atomic_fetch_add(ring.submit_q.tail, to_submit, __ATOMIC_SEQ_CST);
449
450 return to_submit;
451 }
452
453 static uint32_t __release_consumed_submission( struct __io_data & ring ) {
454 const uint32_t smask = *ring.submit_q.mask;
455
456 if( !try_lock(ring.submit_q.release_lock __cfaabi_dbg_ctx2) ) return 0;
457 uint32_t chead = *ring.submit_q.head;
458 uint32_t phead = ring.submit_q.prev_head;
459 ring.submit_q.prev_head = chead;
460 unlock(ring.submit_q.release_lock);
461
462 uint32_t count = chead - phead;
463 for( i; count ) {
464 uint32_t idx = ring.submit_q.array[ (phead + i) & smask ];
465 ring.submit_q.sqes[ idx ].user_data = 0;
466 }
467 return count;
468 }
469#endif
Note: See TracBrowser for help on using the repository browser.