source: libcfa/src/concurrency/ready_queue.cfa@ 1f3d212

Last change on this file since 1f3d212 was 07b4970, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Fix incorrect change that disabled all lists

1//
2// Cforall Version 1.0.0 Copyright (C) 2019 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// ready_queue.cfa --
8//
9// Author : Thierry Delisle
10// Created On : Mon Nov dd 16:29:18 2019
11// Last Modified By :
12// Last Modified On :
13// Update Count :
14//
15
16#define __cforall_thread__
17#define _GNU_SOURCE
18
19// #define __CFA_DEBUG_PRINT_READY_QUEUE__
20
21
22#define USE_RELAXED_FIFO
23// #define USE_WORK_STEALING
24
25#include "bits/defs.hfa"
26#include "device/cpu.hfa"
27#include "kernel_private.hfa"
28
29#include "stdlib.hfa"
30#include "math.hfa"
31
32#include <unistd.h>
33
34#include "ready_subqueue.hfa"
35
36static const size_t cache_line_size = 64;
37
38#if !defined(__CFA_NO_STATISTICS__)
39 #define __STATS(...) __VA_ARGS__
40#else
41 #define __STATS(...)
42#endif
43
44// No overridden function, no environment variable, no define:
45// fall back to a magic number
46#ifndef __CFA_MAX_PROCESSORS__
47 #define __CFA_MAX_PROCESSORS__ 1024
48#endif
49
50#if defined(USE_CPU_WORK_STEALING)
51 #define READYQ_SHARD_FACTOR 2
52#elif defined(USE_RELAXED_FIFO)
53 #define BIAS 4
54 #define READYQ_SHARD_FACTOR 4
55 #define SEQUENTIAL_SHARD 1
56#elif defined(USE_WORK_STEALING)
57 #define READYQ_SHARD_FACTOR 2
58 #define SEQUENTIAL_SHARD 2
59#else
60 #error no scheduling strategy selected
61#endif
62
63static inline struct $thread * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats));
64static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats));
65static inline struct $thread * search(struct cluster * cltr);
66static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred);
67
68
69// returns the maximum number of processors the RWLock supports
70__attribute__((weak)) unsigned __max_processors() {
71 const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
72 if(!max_cores_s) {
73 __cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
74 return __CFA_MAX_PROCESSORS__;
75 }
76
77 char * endptr = 0p;
78 long int max_cores_l = strtol(max_cores_s, &endptr, 10);
79 if(max_cores_l < 1 || max_cores_l > 65535) {
80 __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", max_cores_l);
81 return __CFA_MAX_PROCESSORS__;
82 }
83 if('\0' != *endptr) {
84 __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", max_cores_s);
85 return __CFA_MAX_PROCESSORS__;
86 }
87
88 return max_cores_l;
89}
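// Usage note (illustrative sketch): running a program with, e.g.,
//   CFA_MAX_PROCESSORS=256 ./a.out
// caps the RW-lock at 256 processor slots; an unset, non-numeric, or
// out-of-range value (outside 1..65535) falls back to __CFA_MAX_PROCESSORS__,
// which defaults to 1024 above.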
90
91//=======================================================================
92// Cluster wide reader-writer lock
93//=======================================================================
94void ?{}(__scheduler_RWLock_t & this) {
95 this.max = __max_processors();
96 this.alloc = 0;
97 this.ready = 0;
98 this.data = alloc(this.max);
99 this.write_lock = false;
100
101 /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
102 /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));
103
104}
105void ^?{}(__scheduler_RWLock_t & this) {
106 free(this.data);
107}
108
109
110//=======================================================================
111// Lock-Free registering/unregistering of threads
112unsigned register_proc_id( void ) with(*__scheduler_lock) {
113 __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p for RW-Lock\n", proc);
114 bool * handle = (bool *)&kernelTLS().sched_lock;
115
116 // Step - 1 : check if there is already space in the data
117 uint_fast32_t s = ready;
118
119 		// Check among all the already-published slots (indices below 'ready')
120 for(uint_fast32_t i = 0; i < s; i++) {
121 			bool * volatile * cell = (bool * volatile *)&data[i]; // Cforall is bugged and the double volatiles cause problems
122 /* paranoid */ verify( handle != *cell );
123
124 			bool * null = 0p; // Re-written every iteration since the compare-exchange clobbers it
125 if( __atomic_load_n(cell, (int)__ATOMIC_RELAXED) == null
126 && __atomic_compare_exchange_n( cell, &null, handle, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
127 /* paranoid */ verify(i < ready);
128 /* paranoid */ verify( (kernelTLS().sched_id = i, true) );
129 return i;
130 }
131 }
132
133 	if(max <= alloc) abort("Trying to create more than %u processors", __scheduler_lock->max);
134
135 // Step - 2 : F&A to get a new spot in the array.
136 uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
137 	if(max <= n) abort("Trying to create more than %u processors", __scheduler_lock->max);
138
139 // Step - 3 : Mark space as used and then publish it.
140 data[n] = handle;
141 while() {
142 unsigned copy = n;
143 if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
144 && __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
145 break;
146 Pause();
147 }
148
149 __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p done, id %lu\n", proc, n);
150
151 // Return new spot.
152 /* paranoid */ verify(n < ready);
153 /* paranoid */ verify( (kernelTLS().sched_id = n, true) );
154 return n;
155}
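// Sketch of the registration protocol above:
//   Step 1 scans the already-published slots [0, ready) for a null cell and
//          claims it with a compare-exchange.
//   Step 2 fetch-and-adds 'alloc' to reserve a brand-new slot n.
//   Step 3 stores the handle in data[n], then spins until it can advance
//          'ready' from n to n + 1, so slots are published strictly in order.
// For example, if two processors reserve n == 5 and n == 6 concurrently, the
// one holding 6 waits in the Pause() loop until 'ready' reaches 6, keeping
// [0, ready) fully initialised for readers at all times.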
156
157void unregister_proc_id( unsigned id ) with(*__scheduler_lock) {
158 /* paranoid */ verify(id < ready);
159 /* paranoid */ verify(id == kernelTLS().sched_id);
160 /* paranoid */ verify(data[id] == &kernelTLS().sched_lock);
161
162 bool * volatile * cell = (bool * volatile *)&data[id]; // Cforall is bugged and the double volatiles causes problems
163
164 __atomic_store_n(cell, 0p, __ATOMIC_RELEASE);
165
166 __cfadbg_print_safe(ready_queue, "Kernel : Unregister proc %p\n", proc);
167}
168
169//-----------------------------------------------------------------------
170// Writer side : acquire when changing the ready queue, e.g. adding more
171// queues or removing them.
172uint_fast32_t ready_mutate_lock( void ) with(*__scheduler_lock) {
173 /* paranoid */ verify( ! __preemption_enabled() );
174 /* paranoid */ verify( ! kernelTLS().sched_lock );
175
176 // Step 1 : lock global lock
177 	// It is needed to prevent processors that register mid critical-section
178 	// from simply locking their own lock and entering.
179 __atomic_acquire( &write_lock );
180
181 // Step 2 : lock per-proc lock
182 // Processors that are currently being registered aren't counted
183 // but can't be in read_lock or in the critical section.
184 // All other processors are counted
185 uint_fast32_t s = ready;
186 for(uint_fast32_t i = 0; i < s; i++) {
187 volatile bool * llock = data[i];
188 if(llock) __atomic_acquire( llock );
189 }
190
191 /* paranoid */ verify( ! __preemption_enabled() );
192 return s;
193}
194
195void ready_mutate_unlock( uint_fast32_t last_s ) with(*__scheduler_lock) {
196 /* paranoid */ verify( ! __preemption_enabled() );
197
198 // Step 1 : release local locks
199 	// This must be done while the global lock is held to prevent
200 	// threads that were created mid critical-section
201 	// from racing to lock their local locks and having the writer
202 	// immediately unlock them
203 // Alternative solution : return s in write_lock and pass it to write_unlock
204 for(uint_fast32_t i = 0; i < last_s; i++) {
205 volatile bool * llock = data[i];
206 if(llock) __atomic_store_n(llock, (bool)false, __ATOMIC_RELEASE);
207 }
208
209 // Step 2 : release global lock
210 /*paranoid*/ assert(true == write_lock);
211 __atomic_store_n(&write_lock, (bool)false, __ATOMIC_RELEASE);
212
213 /* paranoid */ verify( ! __preemption_enabled() );
214}
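// Typical usage of the writer side (minimal sketch): the count returned by
// ready_mutate_lock() must be handed back to ready_mutate_unlock() so that
// only the per-processor locks actually taken are released, e.g.
//   uint_fast32_t last_s = ready_mutate_lock();
//   // ... mutate the ready queue, e.g. grow or shrink the lanes ...
//   ready_mutate_unlock( last_s );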
215
216//=======================================================================
217// Cforall Ready Queue used for scheduling
218//=======================================================================
219void ?{}(__ready_queue_t & this) with (this) {
220 #if defined(USE_CPU_WORK_STEALING)
221 lanes.count = cpu_info.hthrd_count * READYQ_SHARD_FACTOR;
222 lanes.data = alloc( lanes.count );
223 lanes.tscs = alloc( lanes.count );
224
225 for( idx; (size_t)lanes.count ) {
226 (lanes.data[idx]){};
227 lanes.tscs[idx].tv = rdtscl();
228 }
229 #else
230 lanes.data = 0p;
231 lanes.tscs = 0p;
232 lanes.count = 0;
233 #endif
234}
235
236void ^?{}(__ready_queue_t & this) with (this) {
237 #if !defined(USE_CPU_WORK_STEALING)
238 verify( SEQUENTIAL_SHARD == lanes.count );
239 #endif
240
241 free(lanes.data);
242 free(lanes.tscs);
243}
244
245//-----------------------------------------------------------------------
246#if defined(USE_CPU_WORK_STEALING)
247 __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {
248 __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
249
250 processor * const proc = kernelTLS().this_processor;
251 const bool external = !push_local || (!proc) || (cltr != proc->cltr);
252
253 const int cpu = __kernel_getcpu();
254 /* paranoid */ verify(cpu >= 0);
255 /* paranoid */ verify(cpu < cpu_info.hthrd_count);
256 /* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count);
257
258 const cpu_map_entry_t & map = cpu_info.llc_map[cpu];
259 /* paranoid */ verify(map.start * READYQ_SHARD_FACTOR < lanes.count);
260 /* paranoid */ verify(map.self * READYQ_SHARD_FACTOR < lanes.count);
261 /* paranoid */ verifyf((map.start + map.count) * READYQ_SHARD_FACTOR <= lanes.count, "have %u lanes but map can go up to %u", lanes.count, (map.start + map.count) * READYQ_SHARD_FACTOR);
262
263 const int start = map.self * READYQ_SHARD_FACTOR;
264 unsigned i;
265 do {
266 unsigned r;
267 if(unlikely(external)) { r = __tls_rand(); }
268 else { r = proc->rdq.its++; }
269 i = start + (r % READYQ_SHARD_FACTOR);
270 // If we can't lock it retry
271 } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
272
273 // Actually push it
274 push(lanes.data[i], thrd);
275
276 // Unlock and return
277 __atomic_unlock( &lanes.data[i].lock );
278
279 #if !defined(__CFA_NO_STATISTICS__)
280 if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
281 else __tls_stats()->ready.push.local.success++;
282 #endif
283
284 __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
285
286 }
287
288 // Pop from the ready queue from a given cluster
289 __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
290 /* paranoid */ verify( lanes.count > 0 );
291 /* paranoid */ verify( kernelTLS().this_processor );
292
293 const int cpu = __kernel_getcpu();
294 /* paranoid */ verify(cpu >= 0);
295 /* paranoid */ verify(cpu < cpu_info.hthrd_count);
296 /* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count);
297
298 const cpu_map_entry_t & map = cpu_info.llc_map[cpu];
299 /* paranoid */ verify(map.start * READYQ_SHARD_FACTOR < lanes.count);
300 /* paranoid */ verify(map.self * READYQ_SHARD_FACTOR < lanes.count);
301 /* paranoid */ verifyf((map.start + map.count) * READYQ_SHARD_FACTOR <= lanes.count, "have %u lanes but map can go up to %u", lanes.count, (map.start + map.count) * READYQ_SHARD_FACTOR);
302
303 processor * const proc = kernelTLS().this_processor;
304 const int start = map.self * READYQ_SHARD_FACTOR;
305
306 		// Did we already have a help target?
307 		if(proc->rdq.target == -1u) {
308 			// If we don't have one, find the oldest timestamp on our own shard and pick a random lane in the LLC group to help
309 unsigned long long min = ts(lanes.data[start]);
310 for(i; READYQ_SHARD_FACTOR) {
311 unsigned long long tsc = ts(lanes.data[start + i]);
312 if(tsc < min) min = tsc;
313 }
314 proc->rdq.cutoff = min;
315 proc->rdq.target = (map.start * READYQ_SHARD_FACTOR) + (__tls_rand() % (map.count* READYQ_SHARD_FACTOR));
316 }
317 else {
318 const unsigned long long bias = 0; //2_500_000_000;
319 const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff;
320 {
321 unsigned target = proc->rdq.target;
322 proc->rdq.target = -1u;
323 if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) {
324 $thread * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
325 proc->rdq.last = target;
326 if(t) return t;
327 }
328 }
329
330 unsigned last = proc->rdq.last;
331 if(last != -1u && lanes.tscs[last].tv < cutoff && ts(lanes.data[last]) < cutoff) {
332 $thread * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.help));
333 if(t) return t;
334 }
335 else {
336 proc->rdq.last = -1u;
337 }
338 }
339
340 for(READYQ_SHARD_FACTOR) {
341 unsigned i = start + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
342 if($thread * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
343 }
344
345 		// All lanes were empty, return 0p
346 return 0p;
347 }
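	// Note on the helping logic in pop_fast above (descriptive sketch):
	// proc->rdq.cutoff remembers the oldest timestamp seen on this processor's
	// own shard; a lane elsewhere in the LLC group is only "helped" (popped
	// from) when its head is older than that cutoff, i.e. when it appears to
	// be starving relative to the local work.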
348
349 __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
350 processor * const proc = kernelTLS().this_processor;
351 unsigned last = proc->rdq.last;
352
353 unsigned i = __tls_rand() % lanes.count;
354 return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
355 }
356 __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) {
357 return search(cltr);
358 }
359#endif
360#if defined(USE_RELAXED_FIFO)
361 //-----------------------------------------------------------------------
362 // get index from random number with or without bias towards queues
363 static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred) {
364 unsigned i;
365 bool local;
366 unsigned rlow = r % BIAS;
367 unsigned rhigh = r / BIAS;
368 		if((0 != rlow) && preferred != -1u) {
369 			// (BIAS - 1) out of BIAS chances
370 			// Use preferred queues
371 i = preferred + (rhigh % READYQ_SHARD_FACTOR);
372 local = true;
373 }
374 else {
375 // 1 out of BIAS chances
376 // Use all queues
377 i = rhigh;
378 local = false;
379 }
380 return [i, local];
381 }
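	// Worked example of the bias (with BIAS == 4 and READYQ_SHARD_FACTOR == 4):
	// r == 13 gives rlow == 1 and rhigh == 3, so the choice is "local" and
	// lands on lane preferred + (3 % 4) == preferred + 3; r == 12 gives
	// rlow == 0, so the choice is "shared" and uses lane rhigh == 3 (callers
	// then reduce the index modulo lanes.count).  On average (BIAS - 1) out of
	// BIAS draws, i.e. 3 out of 4, stay on the preferred shard.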
382
383 __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {
384 __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
385
386 const bool external = !push_local || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
387 /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
388
389 bool local;
390 int preferred = external ? -1 : kernelTLS().this_processor->rdq.id;
391
392 // Try to pick a lane and lock it
393 unsigned i;
394 do {
395 // Pick the index of a lane
396 unsigned r = __tls_rand_fwd();
397 [i, local] = idx_from_r(r, preferred);
398
399 i %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
400
401 #if !defined(__CFA_NO_STATISTICS__)
402 if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.attempt, 1, __ATOMIC_RELAXED);
403 else if(local) __tls_stats()->ready.push.local.attempt++;
404 else __tls_stats()->ready.push.share.attempt++;
405 #endif
406
407 // If we can't lock it retry
408 } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
409
410 // Actually push it
411 push(lanes.data[i], thrd);
412
413 // Unlock and return
414 __atomic_unlock( &lanes.data[i].lock );
415
416 // Mark the current index in the tls rng instance as having an item
417 __tls_rand_advance_bck();
418
419 __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
420
421 // Update statistics
422 #if !defined(__CFA_NO_STATISTICS__)
423 if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
424 else if(local) __tls_stats()->ready.push.local.success++;
425 else __tls_stats()->ready.push.share.success++;
426 #endif
427 }
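	// Note on the random numbers (descriptive sketch): push draws its lane from
	// the forward direction of the per-thread RNG (__tls_rand_fwd) and then
	// calls __tls_rand_advance_bck() to record that draw; pop_fast below
	// replays the sequence in reverse with __tls_rand_bck(), so pops tend to
	// revisit the lanes that recent pushes used.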
428
429 // Pop from the ready queue from a given cluster
430 __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
431 /* paranoid */ verify( lanes.count > 0 );
432 /* paranoid */ verify( kernelTLS().this_processor );
433 /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );
434
435 unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
436 int preferred = kernelTLS().this_processor->rdq.id;
437
438
439 // As long as the list is not empty, try finding a lane that isn't empty and pop from it
440 for(25) {
441 // Pick two lists at random
442 unsigned ri = __tls_rand_bck();
443 unsigned rj = __tls_rand_bck();
444
445 unsigned i, j;
446 __attribute__((unused)) bool locali, localj;
447 [i, locali] = idx_from_r(ri, preferred);
448 [j, localj] = idx_from_r(rj, preferred);
449
450 i %= count;
451 j %= count;
452
453 // try popping from the 2 picked lists
454 struct $thread * thrd = try_pop(cltr, i, j __STATS(, *(locali || localj ? &__tls_stats()->ready.pop.local : &__tls_stats()->ready.pop.help)));
455 if(thrd) {
456 return thrd;
457 }
458 }
459
460 		// All lanes were empty, return 0p
461 return 0p;
462 }
463
464 __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) { return pop_fast(cltr); }
465 __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) {
466 return search(cltr);
467 }
468#endif
469#if defined(USE_WORK_STEALING)
470 __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {
471 __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
472
473 // #define USE_PREFERRED
474 #if !defined(USE_PREFERRED)
475 const bool external = !push_local || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
476 /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
477 #else
478 unsigned preferred = thrd->preferred;
479 const bool external = push_local || (!kernelTLS().this_processor) || preferred == -1u || thrd->curr_cluster != cltr;
480 /* paranoid */ verifyf(external || preferred < lanes.count, "Invalid preferred queue %u for %u lanes", preferred, lanes.count );
481
482 unsigned r = preferred % READYQ_SHARD_FACTOR;
483 const unsigned start = preferred - r;
484 #endif
485
486 // Try to pick a lane and lock it
487 unsigned i;
488 do {
489 #if !defined(__CFA_NO_STATISTICS__)
490 if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.attempt, 1, __ATOMIC_RELAXED);
491 else __tls_stats()->ready.push.local.attempt++;
492 #endif
493
494 if(unlikely(external)) {
495 i = __tls_rand() % lanes.count;
496 }
497 else {
498 #if !defined(USE_PREFERRED)
499 processor * proc = kernelTLS().this_processor;
500 unsigned r = proc->rdq.its++;
501 i = proc->rdq.id + (r % READYQ_SHARD_FACTOR);
502 #else
503 i = start + (r++ % READYQ_SHARD_FACTOR);
504 #endif
505 }
506 // If we can't lock it retry
507 } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
508
509 // Actually push it
510 push(lanes.data[i], thrd);
511
512 // Unlock and return
513 __atomic_unlock( &lanes.data[i].lock );
514
515 #if !defined(__CFA_NO_STATISTICS__)
516 if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
517 else __tls_stats()->ready.push.local.success++;
518 #endif
519
520 __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
521 }
522
523 // Pop from the ready queue from a given cluster
524 __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
525 /* paranoid */ verify( lanes.count > 0 );
526 /* paranoid */ verify( kernelTLS().this_processor );
527 /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );
528
529 processor * proc = kernelTLS().this_processor;
530
531 if(proc->rdq.target == -1u) {
532 unsigned long long min = ts(lanes.data[proc->rdq.id]);
533 for(int i = 0; i < READYQ_SHARD_FACTOR; i++) {
534 unsigned long long tsc = ts(lanes.data[proc->rdq.id + i]);
535 if(tsc < min) min = tsc;
536 }
537 proc->rdq.cutoff = min;
538 proc->rdq.target = __tls_rand() % lanes.count;
539 }
540 else {
541 unsigned target = proc->rdq.target;
542 proc->rdq.target = -1u;
543 const unsigned long long bias = 0; //2_500_000_000;
544 const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff;
545 if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) {
546 $thread * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
547 if(t) return t;
548 }
549 }
550
551 for(READYQ_SHARD_FACTOR) {
552 unsigned i = proc->rdq.id + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
553 if($thread * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
554 }
555 return 0p;
556 }
557
558 __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
559 unsigned i = __tls_rand() % lanes.count;
560 return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
561 }
562
563 __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) with (cltr->ready_queue) {
564 return search(cltr);
565 }
566#endif
567
568//=======================================================================
569// Various Ready Queue utilities
570//=======================================================================
571// these functions work the same, or almost the same,
572// whether work-stealing or relaxed-FIFO scheduling is used
573
574//-----------------------------------------------------------------------
575// try to pop from a lane given by index w
576static inline struct $thread * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
577 __STATS( stats.attempt++; )
578
579 // Get relevant elements locally
580 __intrusive_lane_t & lane = lanes.data[w];
581
582 // If list looks empty retry
583 if( is_empty(lane) ) {
584 return 0p;
585 }
586
587 // If we can't get the lock retry
588 if( !__atomic_try_acquire(&lane.lock) ) {
589 return 0p;
590 }
591
592 // If list is empty, unlock and retry
593 if( is_empty(lane) ) {
594 __atomic_unlock(&lane.lock);
595 return 0p;
596 }
597
598 // Actually pop the list
599 struct $thread * thrd;
600 unsigned long long tsv;
601 [thrd, tsv] = pop(lane);
602
603 /* paranoid */ verify(thrd);
604 /* paranoid */ verify(tsv);
605 /* paranoid */ verify(lane.lock);
606
607 // Unlock and return
608 __atomic_unlock(&lane.lock);
609
610 // Update statistics
611 __STATS( stats.success++; )
612
613 #if defined(USE_WORK_STEALING)
614 lanes.tscs[w].tv = tsv;
615 #endif
616
617 thrd->preferred = w;
618
619 // return the popped thread
620 return thrd;
621}
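// Note on the double emptiness check above: the first is_empty() test is done
// without the lock purely to avoid contending on lanes that look empty; the
// second test, taken after the lock is acquired, catches the race where
// another processor drained the lane between the check and the acquire.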
622
623//-----------------------------------------------------------------------
624// try to pop from any lane, making sure not to miss any thread pushed
625// before the start of the function
626static inline struct $thread * search(struct cluster * cltr) with (cltr->ready_queue) {
627 /* paranoid */ verify( lanes.count > 0 );
628 unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
629 unsigned offset = __tls_rand();
630 for(i; count) {
631 unsigned idx = (offset + i) % count;
632 struct $thread * thrd = try_pop(cltr, idx __STATS(, __tls_stats()->ready.pop.search));
633 if(thrd) {
634 return thrd;
635 }
636 }
637
638 	// All lanes were empty, return 0p
639 return 0p;
640}
641
642//-----------------------------------------------------------------------
643// Check that all the intrusive queues in the data structure are still consistent
644static void check( __ready_queue_t & q ) with (q) {
645 #if defined(__CFA_WITH_VERIFY__)
646 {
647 for( idx ; lanes.count ) {
648 __intrusive_lane_t & sl = lanes.data[idx];
649 assert(!lanes.data[idx].lock);
650
651 if(is_empty(sl)) {
652 assert( sl.anchor.next == 0p );
653 assert( sl.anchor.ts == 0 );
654 assert( mock_head(sl) == sl.prev );
655 } else {
656 assert( sl.anchor.next != 0p );
657 assert( sl.anchor.ts != 0 );
658 assert( mock_head(sl) != sl.prev );
659 }
660 }
661 }
662 #endif
663}
664
665//-----------------------------------------------------------------------
666// Given 2 indexes, pick the list with the oldest push and try to pop from it
667static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
668 	// Pick the best list
669 int w = i;
670 if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
671 w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
672 }
673
674 return try_pop(cltr, w __STATS(, stats));
675}
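// Example: with i == 2 and j == 7, if lane 7 is non-empty and its head was
// pushed earlier (smaller ts) than lane 2's head, the pop is attempted on
// lane 7, otherwise on lane 2; this is essentially a "power of two choices"
// pick based on the oldest-push timestamp.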
676
677// Call this function if the intrusive list was moved using memcpy;
678// it fixes the list so that the pointers back to the anchors aren't left dangling
679static inline void fix(__intrusive_lane_t & ll) {
680 if(is_empty(ll)) {
681 verify(ll.anchor.next == 0p);
682 ll.prev = mock_head(ll);
683 }
684}
685
686static void assign_list(unsigned & value, dlist(processor) & list, unsigned count) {
687 processor * it = &list`first;
688 for(unsigned i = 0; i < count; i++) {
689 /* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count);
690 it->rdq.id = value;
691 it->rdq.target = -1u;
692 value += READYQ_SHARD_FACTOR;
693 it = &(*it)`next;
694 }
695}
696
697static void reassign_cltr_id(struct cluster * cltr) {
698 unsigned preferred = 0;
699 assign_list(preferred, cltr->procs.actives, cltr->procs.total - cltr->procs.idle);
700 assign_list(preferred, cltr->procs.idles , cltr->procs.idle );
701}
702
703static void fix_times( struct cluster * cltr ) with( cltr->ready_queue ) {
704 #if defined(USE_WORK_STEALING)
705 lanes.tscs = alloc(lanes.count, lanes.tscs`realloc);
706 for(i; lanes.count) {
707 unsigned long long tsc = ts(lanes.data[i]);
708 lanes.tscs[i].tv = tsc != 0 ? tsc : rdtscl();
709 }
710 #endif
711}
712
713#if defined(USE_CPU_WORK_STEALING)
714 // ready_queue size is fixed in this case
715 void ready_queue_grow(struct cluster * cltr) {}
716 void ready_queue_shrink(struct cluster * cltr) {}
717#else
718 // Grow the ready queue
719 void ready_queue_grow(struct cluster * cltr) {
720 size_t ncount;
721 int target = cltr->procs.total;
722
723 /* paranoid */ verify( ready_mutate_islocked() );
724 __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");
725
726 // Make sure that everything is consistent
727 /* paranoid */ check( cltr->ready_queue );
728
729 // grow the ready queue
730 with( cltr->ready_queue ) {
731 // Find new count
732 			// Make sure we always have at least 1 list
733 if(target >= 2) {
734 ncount = target * READYQ_SHARD_FACTOR;
735 } else {
736 ncount = SEQUENTIAL_SHARD;
737 }
738
739 // Allocate new array (uses realloc and memcpies the data)
740 lanes.data = alloc( ncount, lanes.data`realloc );
741
742 // Fix the moved data
743 for( idx; (size_t)lanes.count ) {
744 fix(lanes.data[idx]);
745 }
746
747 // Construct new data
748 for( idx; (size_t)lanes.count ~ ncount) {
749 (lanes.data[idx]){};
750 }
751
752 // Update original
753 lanes.count = ncount;
754 }
755
756 fix_times(cltr);
757
758 reassign_cltr_id(cltr);
759
760 // Make sure that everything is consistent
761 /* paranoid */ check( cltr->ready_queue );
762
763 __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");
764
765 /* paranoid */ verify( ready_mutate_islocked() );
766 }
767
768 // Shrink the ready queue
769 void ready_queue_shrink(struct cluster * cltr) {
770 /* paranoid */ verify( ready_mutate_islocked() );
771 __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");
772
773 // Make sure that everything is consistent
774 /* paranoid */ check( cltr->ready_queue );
775
776 int target = cltr->procs.total;
777
778 with( cltr->ready_queue ) {
779 // Remember old count
780 size_t ocount = lanes.count;
781
782 // Find new count
783 			// Make sure we always have at least 1 list
784 lanes.count = target >= 2 ? target * READYQ_SHARD_FACTOR: SEQUENTIAL_SHARD;
785 /* paranoid */ verify( ocount >= lanes.count );
786 /* paranoid */ verify( lanes.count == target * READYQ_SHARD_FACTOR || target < 2 );
787
788 			// for printing, count the number of displaced threads
789 #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
790 __attribute__((unused)) size_t displaced = 0;
791 #endif
792
793 // redistribute old data
794 for( idx; (size_t)lanes.count ~ ocount) {
795 // Lock is not strictly needed but makes checking invariants much easier
796 __attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
797 verify(locked);
798
799 				// As long as we can pop from this lane, push the threads somewhere else in the queue
800 while(!is_empty(lanes.data[idx])) {
801 struct $thread * thrd;
802 unsigned long long _;
803 [thrd, _] = pop(lanes.data[idx]);
804
805 push(cltr, thrd, true);
806
807 					// for printing, count the number of displaced threads
808 #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
809 displaced++;
810 #endif
811 }
812
813 // Unlock the lane
814 __atomic_unlock(&lanes.data[idx].lock);
815
816 // TODO print the queue statistics here
817
818 ^(lanes.data[idx]){};
819 }
820
821 __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);
822
823 // Allocate new array (uses realloc and memcpies the data)
824 lanes.data = alloc( lanes.count, lanes.data`realloc );
825
826 // Fix the moved data
827 for( idx; (size_t)lanes.count ) {
828 fix(lanes.data[idx]);
829 }
830 }
831
832 fix_times(cltr);
833
834 reassign_cltr_id(cltr);
835
836 // Make sure that everything is consistent
837 /* paranoid */ check( cltr->ready_queue );
838
839 __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
840 /* paranoid */ verify( ready_mutate_islocked() );
841 }
842#endif
843
844#if !defined(__CFA_NO_STATISTICS__)
845 unsigned cnt(const __ready_queue_t & this, unsigned idx) {
846 /* paranoid */ verify(this.lanes.count > idx);
847 return this.lanes.data[idx].cnt;
848 }
849#endif