source: libcfa/src/concurrency/ready_queue.cfa@ dcbfcbc

ADT ast-experimental enum forall-pointer-decay jacob/cs343-translation new-ast-unique-expr pthread-emulation qualifiedEnum
Last change on this file since dcbfcbc was 07b4970, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Fix incorrect change that disabled all lists

  • Property mode set to 100644
File size: 27.4 KB
RevLine 
[7768b8d]1//
2// Cforall Version 1.0.0 Copyright (C) 2019 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// ready_queue.cfa --
8//
9// Author : Thierry Delisle
10// Created On : Mon Nov dd 16:29:18 2019
11// Last Modified By :
12// Last Modified On :
13// Update Count :
14//
15
16#define __cforall_thread__
[43784ac]17#define _GNU_SOURCE
18
[1b143de]19// #define __CFA_DEBUG_PRINT_READY_QUEUE__
[7768b8d]20
[1eb239e4]21
[07b4970]22#define USE_RELAXED_FIFO
[9cc3a18]23// #define USE_WORK_STEALING
24
[7768b8d]25#include "bits/defs.hfa"
[12daa43]26#include "device/cpu.hfa"
[7768b8d]27#include "kernel_private.hfa"
28
29#include "stdlib.hfa"
[61d7bec]30#include "math.hfa"
[7768b8d]31
[04b5cef]32#include <unistd.h>
33
[13c5e19]34#include "ready_subqueue.hfa"
35
[7768b8d]36static const size_t cache_line_size = 64;
37
[d2fadeb]38#if !defined(__CFA_NO_STATISTICS__)
39 #define __STATS(...) __VA_ARGS__
40#else
41 #define __STATS(...)
42#endif
43
[dca5802]44// No overriden function, no environment variable, no define
45// fall back to a magic number
46#ifndef __CFA_MAX_PROCESSORS__
[b388ee81]47 #define __CFA_MAX_PROCESSORS__ 1024
[dca5802]48#endif
[7768b8d]49
[12daa43]50#if defined(USE_CPU_WORK_STEALING)
51 #define READYQ_SHARD_FACTOR 2
52#elif defined(USE_RELAXED_FIFO)
[9cc3a18]53 #define BIAS 4
54 #define READYQ_SHARD_FACTOR 4
[5f6a172]55 #define SEQUENTIAL_SHARD 1
[9cc3a18]56#elif defined(USE_WORK_STEALING)
57 #define READYQ_SHARD_FACTOR 2
[5f6a172]58 #define SEQUENTIAL_SHARD 2
[9cc3a18]59#else
60 #error no scheduling strategy selected
61#endif
62
[d2fadeb]63static inline struct $thread * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats));
64static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats));
[431cd4f]65static inline struct $thread * search(struct cluster * cltr);
[d2fadeb]66static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred);
[9cc3a18]67
[04b5cef]68
[dca5802]69// returns the maximum number of processors the RWLock support
[7768b8d]70__attribute__((weak)) unsigned __max_processors() {
71 const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
72 if(!max_cores_s) {
[504a7dc]73 __cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
[dca5802]74 return __CFA_MAX_PROCESSORS__;
[7768b8d]75 }
76
77 char * endptr = 0p;
78 long int max_cores_l = strtol(max_cores_s, &endptr, 10);
79 if(max_cores_l < 1 || max_cores_l > 65535) {
[504a7dc]80 __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", max_cores_l);
[dca5802]81 return __CFA_MAX_PROCESSORS__;
[7768b8d]82 }
83 if('\0' != *endptr) {
[504a7dc]84 __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", max_cores_s);
[dca5802]85 return __CFA_MAX_PROCESSORS__;
[7768b8d]86 }
87
88 return max_cores_l;
89}
90
91//=======================================================================
92// Cluster wide reader-writer lock
93//=======================================================================
[b388ee81]94void ?{}(__scheduler_RWLock_t & this) {
[7768b8d]95 this.max = __max_processors();
96 this.alloc = 0;
97 this.ready = 0;
98 this.data = alloc(this.max);
[c993b15]99 this.write_lock = false;
[7768b8d]100
101 /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
102 /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));
103
104}
[b388ee81]105void ^?{}(__scheduler_RWLock_t & this) {
[7768b8d]106 free(this.data);
107}
108
109
110//=======================================================================
111// Lock-Free registering/unregistering of threads
[c993b15]112unsigned register_proc_id( void ) with(*__scheduler_lock) {
[b388ee81]113 __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p for RW-Lock\n", proc);
[c993b15]114 bool * handle = (bool *)&kernelTLS().sched_lock;
[504a7dc]115
[7768b8d]116 // Step - 1 : check if there is already space in the data
117 uint_fast32_t s = ready;
118
119 // Check among all the ready
120 for(uint_fast32_t i = 0; i < s; i++) {
[c993b15]121 bool * volatile * cell = (bool * volatile *)&data[i]; // Cforall is bugged and the double volatiles causes problems
122 /* paranoid */ verify( handle != *cell );
123
124 bool * null = 0p; // Re-write every loop since compare thrashes it
125 if( __atomic_load_n(cell, (int)__ATOMIC_RELAXED) == null
126 && __atomic_compare_exchange_n( cell, &null, handle, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
127 /* paranoid */ verify(i < ready);
128 /* paranoid */ verify( (kernelTLS().sched_id = i, true) );
129 return i;
[7768b8d]130 }
131 }
132
[b388ee81]133 if(max <= alloc) abort("Trying to create more than %ud processors", __scheduler_lock->max);
[7768b8d]134
135 // Step - 2 : F&A to get a new spot in the array.
136 uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
[b388ee81]137 if(max <= n) abort("Trying to create more than %ud processors", __scheduler_lock->max);
[7768b8d]138
139 // Step - 3 : Mark space as used and then publish it.
[c993b15]140 data[n] = handle;
[fd9b524]141 while() {
[7768b8d]142 unsigned copy = n;
143 if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
144 && __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
145 break;
[fd9b524]146 Pause();
[7768b8d]147 }
148
[1b143de]149 __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p done, id %lu\n", proc, n);
[504a7dc]150
[7768b8d]151 // Return new spot.
[c993b15]152 /* paranoid */ verify(n < ready);
153 /* paranoid */ verify( (kernelTLS().sched_id = n, true) );
154 return n;
[7768b8d]155}
156
[c993b15]157void unregister_proc_id( unsigned id ) with(*__scheduler_lock) {
158 /* paranoid */ verify(id < ready);
159 /* paranoid */ verify(id == kernelTLS().sched_id);
160 /* paranoid */ verify(data[id] == &kernelTLS().sched_lock);
161
162 bool * volatile * cell = (bool * volatile *)&data[id]; // Cforall is bugged and the double volatiles causes problems
163
164 __atomic_store_n(cell, 0p, __ATOMIC_RELEASE);
[504a7dc]165
166 __cfadbg_print_safe(ready_queue, "Kernel : Unregister proc %p\n", proc);
[7768b8d]167}
168
169//-----------------------------------------------------------------------
170// Writer side : acquire when changing the ready queue, e.g. adding more
171// queues or removing them.
[b388ee81]172uint_fast32_t ready_mutate_lock( void ) with(*__scheduler_lock) {
[8fc652e0]173 /* paranoid */ verify( ! __preemption_enabled() );
[c993b15]174 /* paranoid */ verify( ! kernelTLS().sched_lock );
[62502cc4]175
[7768b8d]176 // Step 1 : lock global lock
177 // It is needed to avoid processors that register mid Critical-Section
178 // to simply lock their own lock and enter.
[c993b15]179 __atomic_acquire( &write_lock );
[7768b8d]180
181 // Step 2 : lock per-proc lock
182 // Processors that are currently being registered aren't counted
183 // but can't be in read_lock or in the critical section.
184 // All other processors are counted
185 uint_fast32_t s = ready;
186 for(uint_fast32_t i = 0; i < s; i++) {
[c993b15]187 volatile bool * llock = data[i];
188 if(llock) __atomic_acquire( llock );
[7768b8d]189 }
190
[8fc652e0]191 /* paranoid */ verify( ! __preemption_enabled() );
[7768b8d]192 return s;
193}
194
[b388ee81]195void ready_mutate_unlock( uint_fast32_t last_s ) with(*__scheduler_lock) {
[8fc652e0]196 /* paranoid */ verify( ! __preemption_enabled() );
[62502cc4]197
[7768b8d]198 // Step 1 : release local locks
199 // This must be done while the global lock is held to avoid
200 // threads that where created mid critical section
201 // to race to lock their local locks and have the writer
202 // immidiately unlock them
203 // Alternative solution : return s in write_lock and pass it to write_unlock
204 for(uint_fast32_t i = 0; i < last_s; i++) {
[c993b15]205 volatile bool * llock = data[i];
206 if(llock) __atomic_store_n(llock, (bool)false, __ATOMIC_RELEASE);
[7768b8d]207 }
208
209 // Step 2 : release global lock
[c993b15]210 /*paranoid*/ assert(true == write_lock);
211 __atomic_store_n(&write_lock, (bool)false, __ATOMIC_RELEASE);
[62502cc4]212
[8fc652e0]213 /* paranoid */ verify( ! __preemption_enabled() );
[7768b8d]214}
215
216//=======================================================================
[9cc3a18]217// Cforall Ready Queue used for scheduling
[b798713]218//=======================================================================
219void ?{}(__ready_queue_t & this) with (this) {
[12daa43]220 #if defined(USE_CPU_WORK_STEALING)
221 lanes.count = cpu_info.hthrd_count * READYQ_SHARD_FACTOR;
222 lanes.data = alloc( lanes.count );
223 lanes.tscs = alloc( lanes.count );
224
225 for( idx; (size_t)lanes.count ) {
226 (lanes.data[idx]){};
227 lanes.tscs[idx].tv = rdtscl();
228 }
229 #else
230 lanes.data = 0p;
231 lanes.tscs = 0p;
232 lanes.count = 0;
233 #endif
[b798713]234}
235
236void ^?{}(__ready_queue_t & this) with (this) {
[12daa43]237 #if !defined(USE_CPU_WORK_STEALING)
238 verify( SEQUENTIAL_SHARD == lanes.count );
239 #endif
240
[dca5802]241 free(lanes.data);
[9cc3a18]242 free(lanes.tscs);
[dca5802]243}
244
[64a7146]245//-----------------------------------------------------------------------
[12daa43]246#if defined(USE_CPU_WORK_STEALING)
247 __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {
248 __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
249
250 processor * const proc = kernelTLS().this_processor;
251 const bool external = !push_local || (!proc) || (cltr != proc->cltr);
252
253 const int cpu = __kernel_getcpu();
254 /* paranoid */ verify(cpu >= 0);
255 /* paranoid */ verify(cpu < cpu_info.hthrd_count);
256 /* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count);
257
[df7597e0]258 const cpu_map_entry_t & map = cpu_info.llc_map[cpu];
259 /* paranoid */ verify(map.start * READYQ_SHARD_FACTOR < lanes.count);
260 /* paranoid */ verify(map.self * READYQ_SHARD_FACTOR < lanes.count);
261 /* paranoid */ verifyf((map.start + map.count) * READYQ_SHARD_FACTOR <= lanes.count, "have %u lanes but map can go up to %u", lanes.count, (map.start + map.count) * READYQ_SHARD_FACTOR);
262
263 const int start = map.self * READYQ_SHARD_FACTOR;
[12daa43]264 unsigned i;
265 do {
266 unsigned r;
267 if(unlikely(external)) { r = __tls_rand(); }
268 else { r = proc->rdq.its++; }
269 i = start + (r % READYQ_SHARD_FACTOR);
270 // If we can't lock it retry
271 } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
272
273 // Actually push it
274 push(lanes.data[i], thrd);
275
276 // Unlock and return
277 __atomic_unlock( &lanes.data[i].lock );
278
279 #if !defined(__CFA_NO_STATISTICS__)
280 if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
281 else __tls_stats()->ready.push.local.success++;
282 #endif
283
284 __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
285
286 }
287
288 // Pop from the ready queue from a given cluster
289 __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
290 /* paranoid */ verify( lanes.count > 0 );
291 /* paranoid */ verify( kernelTLS().this_processor );
292
293 const int cpu = __kernel_getcpu();
294 /* paranoid */ verify(cpu >= 0);
295 /* paranoid */ verify(cpu < cpu_info.hthrd_count);
[df7597e0]296 /* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count);
297
298 const cpu_map_entry_t & map = cpu_info.llc_map[cpu];
299 /* paranoid */ verify(map.start * READYQ_SHARD_FACTOR < lanes.count);
300 /* paranoid */ verify(map.self * READYQ_SHARD_FACTOR < lanes.count);
301 /* paranoid */ verifyf((map.start + map.count) * READYQ_SHARD_FACTOR <= lanes.count, "have %u lanes but map can go up to %u", lanes.count, (map.start + map.count) * READYQ_SHARD_FACTOR);
[12daa43]302
303 processor * const proc = kernelTLS().this_processor;
[df7597e0]304 const int start = map.self * READYQ_SHARD_FACTOR;
[12daa43]305
306 // Did we already have a help target
307 if(proc->rdq.target == -1u) {
308 // if We don't have a
309 unsigned long long min = ts(lanes.data[start]);
310 for(i; READYQ_SHARD_FACTOR) {
311 unsigned long long tsc = ts(lanes.data[start + i]);
312 if(tsc < min) min = tsc;
313 }
314 proc->rdq.cutoff = min;
[df7597e0]315 proc->rdq.target = (map.start * READYQ_SHARD_FACTOR) + (__tls_rand() % (map.count* READYQ_SHARD_FACTOR));
[12daa43]316 }
317 else {
318 const unsigned long long bias = 0; //2_500_000_000;
319 const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff;
320 {
321 unsigned target = proc->rdq.target;
322 proc->rdq.target = -1u;
323 if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) {
324 $thread * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
325 proc->rdq.last = target;
326 if(t) return t;
327 }
328 }
329
330 unsigned last = proc->rdq.last;
331 if(last != -1u && lanes.tscs[last].tv < cutoff && ts(lanes.data[last]) < cutoff) {
332 $thread * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.help));
333 if(t) return t;
334 }
335 else {
336 proc->rdq.last = -1u;
337 }
338 }
339
340 for(READYQ_SHARD_FACTOR) {
341 unsigned i = start + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
342 if($thread * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
343 }
344
345 // All lanes where empty return 0p
346 return 0p;
347 }
348
349 __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
350 processor * const proc = kernelTLS().this_processor;
351 unsigned last = proc->rdq.last;
352
353 unsigned i = __tls_rand() % lanes.count;
354 return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
355 }
356 __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) {
357 return search(cltr);
358 }
359#endif
[431cd4f]360#if defined(USE_RELAXED_FIFO)
361 //-----------------------------------------------------------------------
362 // get index from random number with or without bias towards queues
363 static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred) {
364 unsigned i;
365 bool local;
366 unsigned rlow = r % BIAS;
367 unsigned rhigh = r / BIAS;
368 if((0 != rlow) && preferred >= 0) {
369 // (BIAS - 1) out of BIAS chances
370 // Use perferred queues
371 i = preferred + (rhigh % READYQ_SHARD_FACTOR);
372 local = true;
373 }
374 else {
375 // 1 out of BIAS chances
376 // Use all queues
377 i = rhigh;
378 local = false;
379 }
380 return [i, local];
381 }
382
[b808625]383 __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {
[431cd4f]384 __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
[1b143de]385
[b808625]386 const bool external = !push_local || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
[431cd4f]387 /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
[fd1f65e]388
[431cd4f]389 bool local;
390 int preferred = external ? -1 : kernelTLS().this_processor->rdq.id;
[52769ba]391
[431cd4f]392 // Try to pick a lane and lock it
393 unsigned i;
394 do {
395 // Pick the index of a lane
396 unsigned r = __tls_rand_fwd();
397 [i, local] = idx_from_r(r, preferred);
[772411a]398
[431cd4f]399 i %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
400
401 #if !defined(__CFA_NO_STATISTICS__)
[d2fadeb]402 if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.attempt, 1, __ATOMIC_RELAXED);
403 else if(local) __tls_stats()->ready.push.local.attempt++;
404 else __tls_stats()->ready.push.share.attempt++;
[431cd4f]405 #endif
[b798713]406
[431cd4f]407 // If we can't lock it retry
408 } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
409
410 // Actually push it
411 push(lanes.data[i], thrd);
412
[b808625]413 // Unlock and return
414 __atomic_unlock( &lanes.data[i].lock );
[431cd4f]415
416 // Mark the current index in the tls rng instance as having an item
417 __tls_rand_advance_bck();
418
419 __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
420
421 // Update statistics
[b798713]422 #if !defined(__CFA_NO_STATISTICS__)
[d2fadeb]423 if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
424 else if(local) __tls_stats()->ready.push.local.success++;
425 else __tls_stats()->ready.push.share.success++;
[b798713]426 #endif
[431cd4f]427 }
[b798713]428
[431cd4f]429 // Pop from the ready queue from a given cluster
430 __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
431 /* paranoid */ verify( lanes.count > 0 );
432 /* paranoid */ verify( kernelTLS().this_processor );
433 /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );
[b798713]434
[431cd4f]435 unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
436 int preferred = kernelTLS().this_processor->rdq.id;
[dca5802]437
438
[431cd4f]439 // As long as the list is not empty, try finding a lane that isn't empty and pop from it
440 for(25) {
441 // Pick two lists at random
442 unsigned ri = __tls_rand_bck();
443 unsigned rj = __tls_rand_bck();
[c426b03]444
[431cd4f]445 unsigned i, j;
446 __attribute__((unused)) bool locali, localj;
447 [i, locali] = idx_from_r(ri, preferred);
448 [j, localj] = idx_from_r(rj, preferred);
[1b143de]449
[431cd4f]450 i %= count;
451 j %= count;
[9cc3a18]452
[431cd4f]453 // try popping from the 2 picked lists
[d2fadeb]454 struct $thread * thrd = try_pop(cltr, i, j __STATS(, *(locali || localj ? &__tls_stats()->ready.pop.local : &__tls_stats()->ready.pop.help)));
[431cd4f]455 if(thrd) {
456 return thrd;
457 }
458 }
[13c5e19]459
[431cd4f]460 // All lanes where empty return 0p
461 return 0p;
462 }
[772411a]463
[fc59df78]464 __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) { return pop_fast(cltr); }
465 __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) {
[431cd4f]466 return search(cltr);
467 }
468#endif
469#if defined(USE_WORK_STEALING)
[b808625]470 __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {
[431cd4f]471 __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
[772411a]472
[d3ba775]473 // #define USE_PREFERRED
474 #if !defined(USE_PREFERRED)
[b808625]475 const bool external = !push_local || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
[431cd4f]476 /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
[d3ba775]477 #else
478 unsigned preferred = thrd->preferred;
[b808625]479 const bool external = push_local || (!kernelTLS().this_processor) || preferred == -1u || thrd->curr_cluster != cltr;
[d3ba775]480 /* paranoid */ verifyf(external || preferred < lanes.count, "Invalid preferred queue %u for %u lanes", preferred, lanes.count );
[772411a]481
[d3ba775]482 unsigned r = preferred % READYQ_SHARD_FACTOR;
483 const unsigned start = preferred - r;
[2b96031]484 #endif
[431cd4f]485
486 // Try to pick a lane and lock it
487 unsigned i;
488 do {
[d2fadeb]489 #if !defined(__CFA_NO_STATISTICS__)
490 if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.attempt, 1, __ATOMIC_RELAXED);
491 else __tls_stats()->ready.push.local.attempt++;
492 #endif
493
[431cd4f]494 if(unlikely(external)) {
495 i = __tls_rand() % lanes.count;
496 }
497 else {
[d3ba775]498 #if !defined(USE_PREFERRED)
[b808625]499 processor * proc = kernelTLS().this_processor;
500 unsigned r = proc->rdq.its++;
501 i = proc->rdq.id + (r % READYQ_SHARD_FACTOR);
502 #else
[d3ba775]503 i = start + (r++ % READYQ_SHARD_FACTOR);
504 #endif
505 }
[431cd4f]506 // If we can't lock it retry
507 } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
[13c5e19]508
[431cd4f]509 // Actually push it
510 push(lanes.data[i], thrd);
[13c5e19]511
[b808625]512 // Unlock and return
513 __atomic_unlock( &lanes.data[i].lock );
[431cd4f]514
[d2fadeb]515 #if !defined(__CFA_NO_STATISTICS__)
516 if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
517 else __tls_stats()->ready.push.local.success++;
518 #endif
519
[431cd4f]520 __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
[13c5e19]521 }
522
[431cd4f]523 // Pop from the ready queue from a given cluster
524 __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
525 /* paranoid */ verify( lanes.count > 0 );
526 /* paranoid */ verify( kernelTLS().this_processor );
527 /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );
528
529 processor * proc = kernelTLS().this_processor;
530
531 if(proc->rdq.target == -1u) {
[1680072]532 unsigned long long min = ts(lanes.data[proc->rdq.id]);
533 for(int i = 0; i < READYQ_SHARD_FACTOR; i++) {
534 unsigned long long tsc = ts(lanes.data[proc->rdq.id + i]);
535 if(tsc < min) min = tsc;
536 }
537 proc->rdq.cutoff = min;
[f55d54d]538 proc->rdq.target = __tls_rand() % lanes.count;
[431cd4f]539 }
[341aa39]540 else {
541 unsigned target = proc->rdq.target;
[431cd4f]542 proc->rdq.target = -1u;
[9cac0da]543 const unsigned long long bias = 0; //2_500_000_000;
544 const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff;
545 if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) {
[341aa39]546 $thread * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
547 if(t) return t;
548 }
[431cd4f]549 }
[13c5e19]550
[431cd4f]551 for(READYQ_SHARD_FACTOR) {
[f55d54d]552 unsigned i = proc->rdq.id + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
[d2fadeb]553 if($thread * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
[431cd4f]554 }
555 return 0p;
[1eb239e4]556 }
557
[431cd4f]558 __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
[fc59df78]559 unsigned i = __tls_rand() % lanes.count;
560 return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
561 }
[431cd4f]562
[fc59df78]563 __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) with (cltr->ready_queue) {
[431cd4f]564 return search(cltr);
565 }
566#endif
[1eb239e4]567
[9cc3a18]568//=======================================================================
569// Various Ready Queue utilities
570//=======================================================================
571// these function work the same or almost the same
572// whether they are using work-stealing or relaxed fifo scheduling
[1eb239e4]573
[9cc3a18]574//-----------------------------------------------------------------------
575// try to pop from a lane given by index w
[d2fadeb]576static inline struct $thread * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
577 __STATS( stats.attempt++; )
578
[dca5802]579 // Get relevant elements locally
580 __intrusive_lane_t & lane = lanes.data[w];
581
[b798713]582 // If list looks empty retry
[d2fadeb]583 if( is_empty(lane) ) {
584 return 0p;
585 }
[b798713]586
587 // If we can't get the lock retry
[d2fadeb]588 if( !__atomic_try_acquire(&lane.lock) ) {
589 return 0p;
590 }
[b798713]591
592 // If list is empty, unlock and retry
[dca5802]593 if( is_empty(lane) ) {
594 __atomic_unlock(&lane.lock);
[b798713]595 return 0p;
596 }
597
598 // Actually pop the list
[504a7dc]599 struct $thread * thrd;
[f302d80]600 unsigned long long tsv;
601 [thrd, tsv] = pop(lane);
[b798713]602
[dca5802]603 /* paranoid */ verify(thrd);
[78ea291]604 /* paranoid */ verify(tsv);
[dca5802]605 /* paranoid */ verify(lane.lock);
[b798713]606
607 // Unlock and return
[dca5802]608 __atomic_unlock(&lane.lock);
[b798713]609
[dca5802]610 // Update statistics
[d2fadeb]611 __STATS( stats.success++; )
[b798713]612
[431cd4f]613 #if defined(USE_WORK_STEALING)
[f302d80]614 lanes.tscs[w].tv = tsv;
[9cc3a18]615 #endif
[d72c074]616
[d3ba775]617 thrd->preferred = w;
618
[dca5802]619 // return the popped thread
[b798713]620 return thrd;
621}
[04b5cef]622
[9cc3a18]623//-----------------------------------------------------------------------
624// try to pop from any lanes making sure you don't miss any threads push
625// before the start of the function
[431cd4f]626static inline struct $thread * search(struct cluster * cltr) with (cltr->ready_queue) {
[9cc3a18]627 /* paranoid */ verify( lanes.count > 0 );
628 unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
629 unsigned offset = __tls_rand();
630 for(i; count) {
631 unsigned idx = (offset + i) % count;
[d2fadeb]632 struct $thread * thrd = try_pop(cltr, idx __STATS(, __tls_stats()->ready.pop.search));
[9cc3a18]633 if(thrd) {
634 return thrd;
635 }
[13c5e19]636 }
[9cc3a18]637
638 // All lanes where empty return 0p
639 return 0p;
[b798713]640}
641
642//-----------------------------------------------------------------------
[9cc3a18]643// Check that all the intrusive queues in the data structure are still consistent
[b798713]644static void check( __ready_queue_t & q ) with (q) {
[d3ba775]645 #if defined(__CFA_WITH_VERIFY__)
[b798713]646 {
[dca5802]647 for( idx ; lanes.count ) {
648 __intrusive_lane_t & sl = lanes.data[idx];
649 assert(!lanes.data[idx].lock);
[b798713]650
[2b96031]651 if(is_empty(sl)) {
652 assert( sl.anchor.next == 0p );
653 assert( sl.anchor.ts == 0 );
654 assert( mock_head(sl) == sl.prev );
655 } else {
656 assert( sl.anchor.next != 0p );
657 assert( sl.anchor.ts != 0 );
658 assert( mock_head(sl) != sl.prev );
659 }
[b798713]660 }
661 }
662 #endif
663}
664
[9cc3a18]665//-----------------------------------------------------------------------
666// Given 2 indexes, pick the list with the oldest push an try to pop from it
[d2fadeb]667static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
[9cc3a18]668 // Pick the bet list
669 int w = i;
670 if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
671 w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
672 }
673
[d2fadeb]674 return try_pop(cltr, w __STATS(, stats));
[9cc3a18]675}
676
[b798713]677// Call this function of the intrusive list was moved using memcpy
[dca5802]678// fixes the list so that the pointers back to anchors aren't left dangling
679static inline void fix(__intrusive_lane_t & ll) {
[2b96031]680 if(is_empty(ll)) {
681 verify(ll.anchor.next == 0p);
682 ll.prev = mock_head(ll);
683 }
[b798713]684}
685
[69914cbc]686static void assign_list(unsigned & value, dlist(processor) & list, unsigned count) {
[a017ee7]687 processor * it = &list`first;
688 for(unsigned i = 0; i < count; i++) {
689 /* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count);
[431cd4f]690 it->rdq.id = value;
691 it->rdq.target = -1u;
[9cc3a18]692 value += READYQ_SHARD_FACTOR;
[a017ee7]693 it = &(*it)`next;
694 }
695}
696
[9cc3a18]697static void reassign_cltr_id(struct cluster * cltr) {
[a017ee7]698 unsigned preferred = 0;
[9cc3a18]699 assign_list(preferred, cltr->procs.actives, cltr->procs.total - cltr->procs.idle);
700 assign_list(preferred, cltr->procs.idles , cltr->procs.idle );
[a017ee7]701}
702
[431cd4f]703static void fix_times( struct cluster * cltr ) with( cltr->ready_queue ) {
704 #if defined(USE_WORK_STEALING)
705 lanes.tscs = alloc(lanes.count, lanes.tscs`realloc);
706 for(i; lanes.count) {
[9cac0da]707 unsigned long long tsc = ts(lanes.data[i]);
708 lanes.tscs[i].tv = tsc != 0 ? tsc : rdtscl();
[431cd4f]709 }
710 #endif
711}
712
[12daa43]713#if defined(USE_CPU_WORK_STEALING)
714 // ready_queue size is fixed in this case
715 void ready_queue_grow(struct cluster * cltr) {}
716 void ready_queue_shrink(struct cluster * cltr) {}
717#else
718 // Grow the ready queue
719 void ready_queue_grow(struct cluster * cltr) {
720 size_t ncount;
721 int target = cltr->procs.total;
722
723 /* paranoid */ verify( ready_mutate_islocked() );
724 __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");
725
726 // Make sure that everything is consistent
727 /* paranoid */ check( cltr->ready_queue );
728
729 // grow the ready queue
730 with( cltr->ready_queue ) {
731 // Find new count
732 // Make sure we always have atleast 1 list
733 if(target >= 2) {
734 ncount = target * READYQ_SHARD_FACTOR;
735 } else {
736 ncount = SEQUENTIAL_SHARD;
737 }
[b798713]738
[12daa43]739 // Allocate new array (uses realloc and memcpies the data)
740 lanes.data = alloc( ncount, lanes.data`realloc );
[b798713]741
[12daa43]742 // Fix the moved data
743 for( idx; (size_t)lanes.count ) {
744 fix(lanes.data[idx]);
745 }
[b798713]746
[12daa43]747 // Construct new data
748 for( idx; (size_t)lanes.count ~ ncount) {
749 (lanes.data[idx]){};
750 }
[b798713]751
[12daa43]752 // Update original
753 lanes.count = ncount;
754 }
[b798713]755
[12daa43]756 fix_times(cltr);
[9cc3a18]757
[12daa43]758 reassign_cltr_id(cltr);
[a017ee7]759
[12daa43]760 // Make sure that everything is consistent
761 /* paranoid */ check( cltr->ready_queue );
[dca5802]762
[12daa43]763 __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");
[dca5802]764
[12daa43]765 /* paranoid */ verify( ready_mutate_islocked() );
766 }
[b798713]767
[12daa43]768 // Shrink the ready queue
769 void ready_queue_shrink(struct cluster * cltr) {
770 /* paranoid */ verify( ready_mutate_islocked() );
771 __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");
[dca5802]772
[12daa43]773 // Make sure that everything is consistent
774 /* paranoid */ check( cltr->ready_queue );
[dca5802]775
[12daa43]776 int target = cltr->procs.total;
[a017ee7]777
[12daa43]778 with( cltr->ready_queue ) {
779 // Remember old count
780 size_t ocount = lanes.count;
[b798713]781
[12daa43]782 // Find new count
783 // Make sure we always have atleast 1 list
784 lanes.count = target >= 2 ? target * READYQ_SHARD_FACTOR: SEQUENTIAL_SHARD;
785 /* paranoid */ verify( ocount >= lanes.count );
786 /* paranoid */ verify( lanes.count == target * READYQ_SHARD_FACTOR || target < 2 );
[dca5802]787
[12daa43]788 // for printing count the number of displaced threads
789 #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
790 __attribute__((unused)) size_t displaced = 0;
791 #endif
[b798713]792
[12daa43]793 // redistribute old data
794 for( idx; (size_t)lanes.count ~ ocount) {
795 // Lock is not strictly needed but makes checking invariants much easier
796 __attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
797 verify(locked);
[dca5802]798
[12daa43]799 // As long as we can pop from this lane to push the threads somewhere else in the queue
800 while(!is_empty(lanes.data[idx])) {
801 struct $thread * thrd;
802 unsigned long long _;
803 [thrd, _] = pop(lanes.data[idx]);
[dca5802]804
[12daa43]805 push(cltr, thrd, true);
[dca5802]806
[12daa43]807 // for printing count the number of displaced threads
808 #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
809 displaced++;
810 #endif
811 }
[b798713]812
[12daa43]813 // Unlock the lane
814 __atomic_unlock(&lanes.data[idx].lock);
[b798713]815
[12daa43]816 // TODO print the queue statistics here
[b798713]817
[12daa43]818 ^(lanes.data[idx]){};
819 }
[b798713]820
[12daa43]821 __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);
[c84b4be]822
[12daa43]823 // Allocate new array (uses realloc and memcpies the data)
824 lanes.data = alloc( lanes.count, lanes.data`realloc );
[b798713]825
[12daa43]826 // Fix the moved data
827 for( idx; (size_t)lanes.count ) {
828 fix(lanes.data[idx]);
829 }
[b798713]830 }
831
[12daa43]832 fix_times(cltr);
[9cc3a18]833
[12daa43]834 reassign_cltr_id(cltr);
[a017ee7]835
[12daa43]836 // Make sure that everything is consistent
837 /* paranoid */ check( cltr->ready_queue );
[dca5802]838
[12daa43]839 __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
840 /* paranoid */ verify( ready_mutate_islocked() );
841 }
842#endif
[8cd5434]843
844#if !defined(__CFA_NO_STATISTICS__)
845 unsigned cnt(const __ready_queue_t & this, unsigned idx) {
846 /* paranoid */ verify(this.lanes.count > idx);
847 return this.lanes.data[idx].cnt;
848 }
849#endif
Note: See TracBrowser for help on using the repository browser.