source: libcfa/src/concurrency/ready_queue.cfa@ cf85f96

ADT ast-experimental enum forall-pointer-decay jacob/cs343-translation new-ast-unique-expr pthread-emulation qualifiedEnum
Last change on this file since cf85f96 was 12daa43, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Added a define switch for using cpu workstealing.
Not Fully implemented.

  • Property mode set to 100644
File size: 26.5 KB
RevLine 
[7768b8d]1//
2// Cforall Version 1.0.0 Copyright (C) 2019 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// ready_queue.cfa --
8//
9// Author : Thierry Delisle
10// Created On : Mon Nov dd 16:29:18 2019
11// Last Modified By :
12// Last Modified On :
13// Update Count :
14//
15
16#define __cforall_thread__
[43784ac]17#define _GNU_SOURCE
18
[1b143de]19// #define __CFA_DEBUG_PRINT_READY_QUEUE__
[7768b8d]20
[1eb239e4]21
[12daa43]22// #define USE_RELAXED_FIFO
[9cc3a18]23// #define USE_WORK_STEALING
24
[7768b8d]25#include "bits/defs.hfa"
[12daa43]26#include "device/cpu.hfa"
[7768b8d]27#include "kernel_private.hfa"
28
29#include "stdlib.hfa"
[61d7bec]30#include "math.hfa"
[7768b8d]31
[04b5cef]32#include <unistd.h>
33
[13c5e19]34#include "ready_subqueue.hfa"
35
[7768b8d]36static const size_t cache_line_size = 64;
37
[d2fadeb]38#if !defined(__CFA_NO_STATISTICS__)
39 #define __STATS(...) __VA_ARGS__
40#else
41 #define __STATS(...)
42#endif
43
[dca5802]44// No overridden function, no environment variable, no define
45// fall back to a magic number
46#ifndef __CFA_MAX_PROCESSORS__
[b388ee81]47 #define __CFA_MAX_PROCESSORS__ 1024
[dca5802]48#endif
[7768b8d]49
[12daa43]50#if defined(USE_CPU_WORK_STEALING)
51 #define READYQ_SHARD_FACTOR 2
52#elif defined(USE_RELAXED_FIFO)
[9cc3a18]53 #define BIAS 4
54 #define READYQ_SHARD_FACTOR 4
[5f6a172]55 #define SEQUENTIAL_SHARD 1
[9cc3a18]56#elif defined(USE_WORK_STEALING)
57 #define READYQ_SHARD_FACTOR 2
[5f6a172]58 #define SEQUENTIAL_SHARD 2
[9cc3a18]59#else
60 #error no scheduling strategy selected
61#endif
62
[d2fadeb]63static inline struct $thread * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats));
64static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats));
[431cd4f]65static inline struct $thread * search(struct cluster * cltr);
[d2fadeb]66static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred);
[9cc3a18]67
[04b5cef]68
// Returns the maximum number of processors the scheduler RW-lock supports.
// The value comes from the CFA_MAX_PROCESSORS environment variable; the
// compile-time default __CFA_MAX_PROCESSORS__ is used when the variable is
// unset, outside [1, 65535], or followed by non-decimal characters.
// The symbol is weak, so another definition can replace it at link time.
__attribute__((weak)) unsigned __max_processors() {
	const char * env_value = getenv("CFA_MAX_PROCESSORS");
	if(!env_value) {
		__cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
		return __CFA_MAX_PROCESSORS__;
	}

	char * parse_end = 0p;
	long int requested = strtol(env_value, &parse_end, 10);

	// Reject values outside the supported range first ...
	const bool in_range = 1 <= requested && requested <= 65535;
	if(!in_range) {
		__cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", requested);
		return __CFA_MAX_PROCESSORS__;
	}

	// ... then reject strings with trailing garbage after the digits
	if(*parse_end != '\0') {
		__cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", env_value);
		return __CFA_MAX_PROCESSORS__;
	}

	return requested;
}
90
91//=======================================================================
92// Cluster wide reader-writer lock
93//=======================================================================
// Constructor: sizes the slot array from __max_processors() and starts with
// no slot reserved (alloc), no slot published (ready) and the writer lock free.
void ?{}(__scheduler_RWLock_t & this) {
	this.max        = __max_processors();
	this.data       = alloc(this.max);
	this.alloc      = 0;
	this.ready      = 0;
	this.write_lock = false;

	// The registration protocol relies on lock-free atomics on both counters
	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));
}
// Destructor: releases the slot array allocated by the constructor.
void ^?{}(__scheduler_RWLock_t & this) {
	free( this.data );
}
108
109
110//=======================================================================
111// Lock-Free registering/unregistering of threads
// Registers the calling kernel thread with the scheduler RW-lock.
// Stores a pointer to the thread-local `sched_lock` in a slot of `data` and
// returns that slot's index. Vacated (null) slots among the already published
// ones are reused first; otherwise a new slot is reserved with fetch-and-add
// and then published by advancing `ready`. Aborts if all `max` slots are used.
unsigned register_proc_id( void ) with(*__scheduler_lock) {
	// NOTE(review): `proc` is not declared in this function; confirm it resolves when ready_queue debug printing is enabled
	__cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p for RW-Lock\n", proc);
	bool * handle = (bool *)&kernelTLS().sched_lock;

	// Step - 1 : check if there is already space in the data
	uint_fast32_t s = ready;

	// Check among all the ready
	for(uint_fast32_t i = 0; i < s; i++) {
		bool * volatile * cell = (bool * volatile *)&data[i]; // Cforall is bugged and the double volatiles causes problems
		/* paranoid */ verify( handle != *cell );

		bool * null = 0p; // Re-write every loop since compare thrashes it
		// Cheap relaxed load first, only attempt the CAS when the slot looks free
		if( __atomic_load_n(cell, (int)__ATOMIC_RELAXED) == null
			&& __atomic_compare_exchange_n( cell, &null, handle, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
			/* paranoid */ verify(i < ready);
			/* paranoid */ verify( (kernelTLS().sched_id = i, true) );
			return i;
		}
	}

	// "%u" (not "%ud"): the latter prints the count followed by a stray 'd'
	if(max <= alloc) abort("Trying to create more than %u processors", __scheduler_lock->max);

	// Step - 2 : F&A to get a new spot in the array.
	uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
	if(max <= n) abort("Trying to create more than %u processors", __scheduler_lock->max);

	// Step - 3 : Mark space as used and then publish it.
	// Slots are published in order: wait until `ready` reaches n, then move it to n + 1.
	data[n] = handle;
	while() {
		unsigned copy = n;
		if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
			&& __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
			break;
		Pause();
	}

	__cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p done, id %lu\n", proc, n);

	// Return new spot.
	/* paranoid */ verify(n < ready);
	/* paranoid */ verify( (kernelTLS().sched_id = n, true) );
	return n;
}
156
// Releases slot `id` of the scheduler RW-lock by storing a null pointer in it,
// which makes the slot reusable by a later register_proc_id().
void unregister_proc_id( unsigned id ) with(*__scheduler_lock) {
	/* paranoid */ verify(id < ready);
	/* paranoid */ verify(id == kernelTLS().sched_id);
	/* paranoid */ verify(data[id] == &kernelTLS().sched_lock);

	// Cforall is bugged and the double volatiles causes problems, hence the cast
	__atomic_store_n( (bool * volatile *)&data[id], 0p, __ATOMIC_RELEASE );

	__cfadbg_print_safe(ready_queue, "Kernel : Unregister proc %p\n", proc);
}
168
169//-----------------------------------------------------------------------
// Writer side : acquire when changing the ready queue, e.g. adding more
// queues or removing them.
// Takes the global write lock, then every per-processor lock that is published
// at that moment. Returns the number of published slots that were scanned;
// pass that value to ready_mutate_unlock().
uint_fast32_t ready_mutate_lock( void ) with(*__scheduler_lock) {
	/* paranoid */ verify( ! __preemption_enabled() );
	/* paranoid */ verify( ! kernelTLS().sched_lock );

	// Global lock first: a processor that registers in the middle of the
	// critical section must not be able to lock its own lock and walk in.
	__atomic_acquire( &write_lock );

	// Then each per-processor lock.
	// Processors still in the middle of registering are not counted, but they
	// cannot be in read_lock or in the critical section either.
	const uint_fast32_t published = ready;
	uint_fast32_t slot = 0;
	while(slot < published) {
		volatile bool * proc_lock = data[slot];
		if(proc_lock) __atomic_acquire( proc_lock );   // null means the slot was vacated
		slot++;
	}

	/* paranoid */ verify( ! __preemption_enabled() );
	return published;
}
194
// Writer side release: undoes ready_mutate_lock().
// `last_s` is the slot count returned by the matching ready_mutate_lock() call.
void ready_mutate_unlock( uint_fast32_t last_s ) with(*__scheduler_lock) {
	/* paranoid */ verify( ! __preemption_enabled() );

	// Release the per-processor locks while the global lock is still held.
	// Otherwise a processor created during the critical section could race to
	// take its own lock and have the writer immediately release it.
	// Only the first `last_s` slots were locked, so only those are released.
	uint_fast32_t slot = 0;
	while(slot < last_s) {
		volatile bool * proc_lock = data[slot];
		if(proc_lock) __atomic_store_n(proc_lock, (bool)false, __ATOMIC_RELEASE);
		slot++;
	}

	// Finally release the global lock
	/*paranoid*/ assert(true == write_lock);
	__atomic_store_n(&write_lock, (bool)false, __ATOMIC_RELEASE);

	/* paranoid */ verify( ! __preemption_enabled() );
}
215
216//=======================================================================
[9cc3a18]217// Cforall Ready Queue used for scheduling
[b798713]218//=======================================================================
// Constructor of the ready queue.
// With USE_CPU_WORK_STEALING the lane array has a fixed size of
// READYQ_SHARD_FACTOR lanes per hardware thread and is built here.
// Otherwise the queue starts with no lanes at all.
void ?{}(__ready_queue_t & this) with (this) {
	#if !defined(USE_CPU_WORK_STEALING)
		lanes.count = 0;
		lanes.data  = 0p;
		lanes.tscs  = 0p;
	#else
		lanes.count = cpu_info.hthrd_count * READYQ_SHARD_FACTOR;
		lanes.data  = alloc( lanes.count );
		lanes.tscs  = alloc( lanes.count );

		// Construct every lane and stamp it with the current time-stamp counter
		for( idx; (size_t)lanes.count ) {
			(lanes.data[idx]){};
			lanes.tscs[idx].tv = rdtscl();
		}
	#endif
}
235
// Destructor of the ready queue: frees the lane array and the timestamp array.
// Outside CPU work-stealing mode the queue is expected to be back at
// SEQUENTIAL_SHARD lanes when it is destroyed.
void ^?{}(__ready_queue_t & this) with (this) {
	#ifndef USE_CPU_WORK_STEALING
		verify( SEQUENTIAL_SHARD == lanes.count );
	#endif

	free( lanes.data );
	free( lanes.tscs );
}
244
[64a7146]245//-----------------------------------------------------------------------
[12daa43]246#if defined(USE_CPU_WORK_STEALING)
// Push `thrd` on the ready queue of `cltr` (CPU work-stealing variant).
// The lane is one of the READYQ_SHARD_FACTOR lanes that belong to the cpu the
// caller is currently running on. The push counts as "external" when
// `push_local` is false, there is no current processor, or the current
// processor belongs to another cluster.
__attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {
	__cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);

	processor * const proc = kernelTLS().this_processor;
	const bool external = !(push_local && proc && (cltr == proc->cltr));

	const int cpu = __kernel_getcpu();
	/* paranoid */ verify(cpu >= 0);
	/* paranoid */ verify(cpu < cpu_info.hthrd_count);
	/* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count);

	// First lane owned by this cpu
	const int start = cpu * READYQ_SHARD_FACTOR;

	// Keep picking one of this cpu's lanes until its lock is obtained
	unsigned i;
	for(;;) {
		// External pushes pick randomly, local ones rotate through the lanes
		unsigned r = unlikely(external) ? __tls_rand() : proc->rdq.its++;
		i = start + (r % READYQ_SHARD_FACTOR);
		if( __atomic_try_acquire( &lanes.data[i].lock ) ) break;
	}

	// Lane is locked: enqueue, then release
	push(lanes.data[i], thrd);
	__atomic_unlock( &lanes.data[i].lock );

	#if !defined(__CFA_NO_STATISTICS__)
		if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
		else __tls_stats()->ready.push.local.success++;
	#endif

	__cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);

}
282
// Pop from the ready queue from a given cluster (CPU work-stealing variant).
// Calls alternate between two modes, driven by proc->rdq.target:
//  - no target (-1u): record the smallest ts() among this cpu's lanes as the
//    cutoff and pick a random lane as the next help target;
//  - target set: try to help the target lane, then the last helped lane,
//    but only when their timestamps are below the cutoff.
// In both cases the cpu's own lanes are tried afterwards.
// Returns 0p when nothing could be popped.
__attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
	/* paranoid */ verify( lanes.count > 0 );
	/* paranoid */ verify( kernelTLS().this_processor );

	const int cpu = __kernel_getcpu();
	/* paranoid */ verify(cpu >= 0);
	/* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count);
	/* paranoid */ verify(cpu < cpu_info.hthrd_count);

	processor * const proc = kernelTLS().this_processor;
	const int start = cpu * READYQ_SHARD_FACTOR;

	if(proc->rdq.target != -1u) {
		// A help target was selected by the previous call
		const unsigned long long bias = 0; //2_500_000_000;
		const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff;

		// Consume the target, it is re-selected on the next call
		{
			unsigned target = proc->rdq.target;
			proc->rdq.target = -1u;
			if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) {
				$thread * helped = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
				proc->rdq.last = target;
				if(helped) return helped;
			}
		}

		// Retry the lane that was helped last, forget it once it is no longer behind
		unsigned last = proc->rdq.last;
		if(last != -1u && lanes.tscs[last].tv < cutoff && ts(lanes.data[last]) < cutoff) {
			$thread * helped = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.help));
			if(helped) return helped;
		}
		else {
			proc->rdq.last = -1u;
		}
	}
	else {
		// No target yet: compute the cutoff from the local lanes and pick one
		unsigned long long oldest = ts(lanes.data[start]);
		for(i; READYQ_SHARD_FACTOR) {
			unsigned long long lane_ts = ts(lanes.data[start + i]);
			if(lane_ts < oldest) oldest = lane_ts;
		}
		proc->rdq.cutoff = oldest;
		proc->rdq.target = __tls_rand() % lanes.count;
	}

	// Local lanes, visited in rotation
	for(READYQ_SHARD_FACTOR) {
		unsigned i = start + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
		if($thread * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
	}

	// All lanes were empty, return 0p
	return 0p;
}
338
// Steal attempt (CPU work-stealing variant): try to pop from one lane chosen
// uniformly at random among all lanes of the cluster.
// Returns the popped thread, or 0p if that single attempt failed.
__attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
	// The previous version also read this_processor->rdq.last into an unused local; dropped.
	unsigned i = __tls_rand() % lanes.count;
	return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
}
// Last resort pop (CPU work-stealing variant): delegates to search(), which
// tries every lane of the cluster once.
__attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) {
	struct $thread * found = search(cltr);
	return found;
}
349#endif
[431cd4f]350#if defined(USE_RELAXED_FIFO)
//-----------------------------------------------------------------------
// get index from random number with or without bias towards queues
// r         : random number; r % BIAS selects the mode, r / BIAS the lane
// preferred : first lane of the caller's preferred lanes, or -1u for none
//             (callers pass an int -1, which converts to -1u)
// returns [lane index, whether a preferred lane was used]; the index is not
// reduced modulo the lane count, callers do that.
static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred) {
	unsigned i;
	bool local;
	unsigned rlow  = r % BIAS;
	unsigned rhigh = r / BIAS;
	// `preferred` is unsigned, so the "no preference" sentinel must be compared
	// against -1u; a `>= 0` test is always true and would send callers without
	// a preferred lane down the biased path with a wrapped-around index.
	if((0 != rlow) && preferred != -1u) {
		// (BIAS - 1) out of BIAS chances
		// Use preferred queues
		i = preferred + (rhigh % READYQ_SHARD_FACTOR);
		local = true;
	}
	else {
		// 1 out of BIAS chances
		// Use all queues
		i = rhigh;
		local = false;
	}
	return [i, local];
}
372
// Push `thrd` on the ready queue of `cltr` (relaxed-FIFO variant).
// A lane is drawn with idx_from_r(), biased towards the current processor's
// lanes unless the push is external (not local, no current processor, or a
// processor of another cluster). Drawing is repeated until a lane lock is
// obtained.
__attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {
	__cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);

	const bool external = !(push_local && kernelTLS().this_processor && (cltr == kernelTLS().this_processor->cltr));
	/* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );

	bool local;
	int preferred = external ? -1 : kernelTLS().this_processor->rdq.id;

	// Draw lanes until one can be locked
	unsigned i;
	for(;;) {
		// Draw a lane index from the forward random stream
		unsigned r = __tls_rand_fwd();
		[i, local] = idx_from_r(r, preferred);

		i %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );

		#if !defined(__CFA_NO_STATISTICS__)
			if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.attempt, 1, __ATOMIC_RELAXED);
			else if(local) __tls_stats()->ready.push.local.attempt++;
			else __tls_stats()->ready.push.share.attempt++;
		#endif

		// Stop at the first lane whose lock is free
		if( __atomic_try_acquire( &lanes.data[i].lock ) ) break;
	}

	// Lane is locked: enqueue, then release
	push(lanes.data[i], thrd);
	__atomic_unlock( &lanes.data[i].lock );

	// Mark the current index in the tls rng instance as having an item
	__tls_rand_advance_bck();

	__cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);

	// Account for the successful push
	#if !defined(__CFA_NO_STATISTICS__)
		if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
		else if(local) __tls_stats()->ready.push.local.success++;
		else __tls_stats()->ready.push.share.success++;
	#endif
}
[b798713]418
// Pop from the ready queue from a given cluster (relaxed-FIFO variant).
// Makes up to 25 attempts; each attempt draws two lanes from the backward
// random stream and tries to pop via the two-index try_pop().
// Returns 0p when all attempts fail.
__attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
	/* paranoid */ verify( lanes.count > 0 );
	/* paranoid */ verify( kernelTLS().this_processor );
	/* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );

	unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
	int preferred = kernelTLS().this_processor->rdq.id;

	for(25) {
		// Two random draws, in this order
		unsigned first_draw  = __tls_rand_bck();
		unsigned second_draw = __tls_rand_bck();

		// Turn the draws into lane indexes
		unsigned i, j;
		__attribute__((unused)) bool locali, localj;
		[i, locali] = idx_from_r(first_draw , preferred);
		[j, localj] = idx_from_r(second_draw, preferred);

		i %= count;
		j %= count;

		// Counted as a local pop if either lane was a preferred one, as help otherwise
		struct $thread * popped = try_pop(cltr, i, j __STATS(, *(locali || localj ? &__tls_stats()->ready.pop.local : &__tls_stats()->ready.pop.help)));
		if(popped) return popped;
	}

	// Every attempt failed, return 0p
	return 0p;
}
[772411a]453
// Relaxed-FIFO variant: the slow path is the same as the fast path.
__attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) {
	return pop_fast(cltr);
}
// Last resort pop (relaxed-FIFO variant): delegates to search(), which tries
// every lane of the cluster once.
__attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) {
	struct $thread * found = search(cltr);
	return found;
}
458#endif
459#if defined(USE_WORK_STEALING)
// Push `thrd` on the ready queue of `cltr` (work-stealing variant).
// External pushes go to a random lane; local pushes rotate through the lanes
// of the current processor (or, with USE_PREFERRED, through the lanes around
// the thread's preferred lane). Selection repeats until a lane lock is obtained.
__attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {
	__cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);

	// #define USE_PREFERRED
	#if !defined(USE_PREFERRED)
		const bool external = !(push_local && kernelTLS().this_processor && (cltr == kernelTLS().this_processor->cltr));
		/* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
	#else
		unsigned preferred = thrd->preferred;
		const bool external = push_local || (!kernelTLS().this_processor) || preferred == -1u || thrd->curr_cluster != cltr;
		/* paranoid */ verifyf(external || preferred < lanes.count, "Invalid preferred queue %u for %u lanes", preferred, lanes.count );

		// Rotation counter and first lane of the preferred shard
		unsigned r = preferred % READYQ_SHARD_FACTOR;
		const unsigned start = preferred - r;
	#endif

	// Select lanes until one can be locked
	unsigned i;
	for(;;) {
		#if !defined(__CFA_NO_STATISTICS__)
			if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.attempt, 1, __ATOMIC_RELAXED);
			else __tls_stats()->ready.push.local.attempt++;
		#endif

		if(unlikely(external)) {
			i = __tls_rand() % lanes.count;
		}
		else {
			#if !defined(USE_PREFERRED)
				processor * proc = kernelTLS().this_processor;
				unsigned r = proc->rdq.its++;
				i = proc->rdq.id + (r % READYQ_SHARD_FACTOR);
			#else
				i = start + (r++ % READYQ_SHARD_FACTOR);
			#endif
		}

		// Stop at the first lane whose lock is free
		if( __atomic_try_acquire( &lanes.data[i].lock ) ) break;
	}

	// Lane is locked: enqueue, then release
	push(lanes.data[i], thrd);
	__atomic_unlock( &lanes.data[i].lock );

	#if !defined(__CFA_NO_STATISTICS__)
		if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
		else __tls_stats()->ready.push.local.success++;
	#endif

	__cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
}
512
// Pop from the ready queue from a given cluster (work-stealing variant).
// Calls alternate between two modes, driven by proc->rdq.target:
//  - no target (-1u): record the smallest ts() among the processor's own
//    lanes as the cutoff and pick a random lane as the next help target;
//  - target set: try to help the target lane if its timestamps are below the
//    cutoff, and clear the target.
// In both cases the processor's own lanes are tried afterwards.
// Returns 0p when nothing could be popped.
__attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
	/* paranoid */ verify( lanes.count > 0 );
	/* paranoid */ verify( kernelTLS().this_processor );
	/* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );

	processor * proc = kernelTLS().this_processor;

	if(proc->rdq.target != -1u) {
		// A help target was selected by the previous call: consume it
		unsigned target = proc->rdq.target;
		proc->rdq.target = -1u;
		const unsigned long long bias = 0; //2_500_000_000;
		const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff;
		if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) {
			$thread * helped = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
			if(helped) return helped;
		}
	}
	else {
		// No target yet: compute the cutoff from the local lanes and pick one
		unsigned long long oldest = ts(lanes.data[proc->rdq.id]);
		for(int k = 0; k < READYQ_SHARD_FACTOR; k++) {
			unsigned long long lane_ts = ts(lanes.data[proc->rdq.id + k]);
			if(lane_ts < oldest) oldest = lane_ts;
		}
		proc->rdq.cutoff = oldest;
		proc->rdq.target = __tls_rand() % lanes.count;
	}

	// Local lanes, visited in rotation
	for(READYQ_SHARD_FACTOR) {
		unsigned i = proc->rdq.id + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
		if($thread * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
	}
	return 0p;
}
547
// Steal attempt (work-stealing variant): try to pop from one lane chosen
// uniformly at random among all lanes of the cluster.
__attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
	const unsigned victim = __tls_rand() % lanes.count;
	return try_pop(cltr, victim __STATS(, __tls_stats()->ready.pop.steal));
}
[431cd4f]552
// Last resort pop (work-stealing variant): delegates to search(), which tries
// every lane of the cluster once.
__attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) with (cltr->ready_queue) {
	struct $thread * found = search(cltr);
	return found;
}
556#endif
[1eb239e4]557
[9cc3a18]558//=======================================================================
559// Various Ready Queue utilities
560//=======================================================================
561 // these functions work the same or almost the same
562// whether they are using work-stealing or relaxed fifo scheduling
[1eb239e4]563
[9cc3a18]564//-----------------------------------------------------------------------
// try to pop from a lane given by index w
// Gives up (returns 0p) without blocking when the lane looks empty or its
// lock is already taken. On success the popped thread's `preferred` field is
// set to w and, in the work-stealing modes, the lane's cached timestamp
// lanes.tscs[w] is refreshed with the value returned by pop().
// `stats` (statistics builds only) counts one attempt per call and one
// success per popped thread.
static inline struct $thread * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
	__STATS( stats.attempt++; )

	// Get relevant elements locally
	__intrusive_lane_t & lane = lanes.data[w];

	// Unlocked peek: do not bother locking a lane that looks empty
	if( is_empty(lane) ) {
		return 0p;
	}

	// Never wait on a lane lock, the caller simply tries elsewhere
	if( !__atomic_try_acquire(&lane.lock) ) {
		return 0p;
	}

	// Re-check under the lock, the lane may have been drained in between
	if( is_empty(lane) ) {
		__atomic_unlock(&lane.lock);
		return 0p;
	}

	// Actually pop the list
	struct $thread * thrd;
	unsigned long long tsv;
	[thrd, tsv] = pop(lane);

	/* paranoid */ verify(thrd);
	/* paranoid */ verify(tsv);
	/* paranoid */ verify(lane.lock);

	// Unlock and return
	__atomic_unlock(&lane.lock);

	// Update statistics
	__STATS( stats.success++; )

	// Both work-stealing flavours compare lanes.tscs[..].tv against a cutoff in
	// pop_fast, so both need the cached timestamp kept up to date.
	#if defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING)
		lanes.tscs[w].tv = tsv;
	#endif

	thrd->preferred = w;

	// return the popped thread
	return thrd;
}
[04b5cef]612
[9cc3a18]613//-----------------------------------------------------------------------
// Try to pop from every lane once, so that no thread pushed before the start
// of the function is missed. The scan starts at a random lane and wraps around.
// Returns 0p if every lane failed to yield a thread.
static inline struct $thread * search(struct cluster * cltr) with (cltr->ready_queue) {
	/* paranoid */ verify( lanes.count > 0 );
	unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
	unsigned offset = __tls_rand();
	for(i; count) {
		struct $thread * popped = try_pop(cltr, (offset + i) % count __STATS(, __tls_stats()->ready.pop.search));
		if(popped) return popped;
	}

	// All lanes were empty, return 0p
	return 0p;
}
631
632//-----------------------------------------------------------------------
// Check that all the intrusive queues in the data structure are still consistent.
// Only active when __CFA_WITH_VERIFY__ is defined: every lane must be unlocked,
// and its anchor/prev fields must agree with is_empty().
static void check( __ready_queue_t & q ) with (q) {
	#if defined(__CFA_WITH_VERIFY__)
		for( idx ; lanes.count ) {
			__intrusive_lane_t & sl = lanes.data[idx];
			assert(!lanes.data[idx].lock);

			if(!is_empty(sl)) {
				// non-empty lane: anchor links to a node and carries a timestamp
				assert( sl.anchor.next != 0p );
				assert( sl.anchor.ts   != 0  );
				assert( mock_head(sl)  != sl.prev );
			} else {
				// empty lane: anchor is cleared and prev points back at the mock head
				assert( sl.anchor.next == 0p );
				assert( sl.anchor.ts   == 0  );
				assert( mock_head(sl)  == sl.prev );
			}
		}
	#endif
}
654
[9cc3a18]655//-----------------------------------------------------------------------
// Given 2 indexes, pick the list with the oldest push and try to pop from it.
// Lane i is used unless lane j is non-empty and ts(i) is not smaller than ts(j).
static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
	// Pick the best list
	int w;
	if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
		if( ts(lanes.data[i]) < ts(lanes.data[j]) ) w = i;
		else w = j;
	}
	else {
		w = i;
	}

	return try_pop(cltr, w __STATS(, stats));
}
666
// Call this function if the intrusive list was moved using memcpy.
// Fixes the list so that the pointers back to anchors aren't left dangling:
// an empty lane's `prev` is pointed at the lane's own mock head again.
// Non-empty lanes are left untouched.
static inline void fix(__intrusive_lane_t & ll) {
	if(!is_empty(ll)) return;

	verify(ll.anchor.next == 0p);
	ll.prev = mock_head(ll);
}
675
// Gives the first `count` processors of `list` their lane index.
// Each processor gets rdq.id = value and has its help target reset to -1u;
// `value` (passed by reference) advances by READYQ_SHARD_FACTOR per processor,
// so the caller can continue numbering with another list.
static void assign_list(unsigned & value, dlist(processor) & list, unsigned count) {
	processor * it = &list`first;
	unsigned i = 0;
	while(i < count) {
		/* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count);
		it->rdq.id = value;
		it->rdq.target = -1u;
		value += READYQ_SHARD_FACTOR;
		it = &(*it)`next;
		i++;
	}
}
686
// Renumbers the lanes of every processor of the cluster, starting from 0:
// active processors first, idle processors after them.
static void reassign_cltr_id(struct cluster * cltr) {
	unsigned next_id = 0;
	const unsigned active_count = cltr->procs.total - cltr->procs.idle;
	assign_list(next_id, cltr->procs.actives, active_count);
	assign_list(next_id, cltr->procs.idles  , cltr->procs.idle);
}
692
// Resizes the cached timestamp array to the current lane count and refills it
// (USE_WORK_STEALING only; a no-op in the other modes).
// Each entry takes the lane's ts(), or the current time-stamp counter when
// ts() is 0.
static void fix_times( struct cluster * cltr ) with( cltr->ready_queue ) {
	#if defined(USE_WORK_STEALING)
		lanes.tscs = alloc(lanes.count, lanes.tscs`realloc);
		for(i; lanes.count) {
			unsigned long long lane_ts = ts(lanes.data[i]);
			if(lane_ts != 0) lanes.tscs[i].tv = lane_ts;
			else lanes.tscs[i].tv = rdtscl();
		}
	#endif
}
702
[12daa43]703#if defined(USE_CPU_WORK_STEALING)
// With CPU work-stealing the ready queue is sized once, in the constructor,
// so growing and shrinking are intentionally empty.
void ready_queue_grow(struct cluster * cltr) {
}
void ready_queue_shrink(struct cluster * cltr) {
}
707#else
// Grow the ready queue to match the cluster's current processor count.
// The lane array is resized to READYQ_SHARD_FACTOR lanes per processor
// (SEQUENTIAL_SHARD lanes when there are fewer than 2 processors), moved lanes
// are repaired, new lanes are constructed, and processors are renumbered.
// The verify() calls expect ready_mutate_islocked() to hold on entry and exit.
void ready_queue_grow(struct cluster * cltr) {
	size_t ncount;
	int target = cltr->procs.total;

	/* paranoid */ verify( ready_mutate_islocked() );
	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");

	// Sanity check before touching anything
	/* paranoid */ check( cltr->ready_queue );

	with( cltr->ready_queue ) {
		// New lane count, never less than SEQUENTIAL_SHARD
		ncount = target >= 2 ? target * READYQ_SHARD_FACTOR : SEQUENTIAL_SHARD;

		// Resize in place (uses realloc and memcpies the data)
		lanes.data = alloc( ncount, lanes.data`realloc );

		// The old lanes may have moved, repair them
		for( idx; (size_t)lanes.count ) {
			fix(lanes.data[idx]);
		}

		// The tail of the array is raw memory, construct the new lanes
		for( idx; (size_t)lanes.count ~ ncount) {
			(lanes.data[idx]){};
		}

		// Publish the new count
		lanes.count = ncount;
	}

	fix_times(cltr);

	reassign_cltr_id(cltr);

	// Sanity check after the change
	/* paranoid */ check( cltr->ready_queue );

	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");

	/* paranoid */ verify( ready_mutate_islocked() );
}
[b798713]757
// Shrink the ready queue to match the cluster's current processor count.
// Threads still sitting in the lanes being removed are popped and pushed back
// on the cluster, the removed lanes are destroyed, the lane array is resized
// and processors are renumbered.
// The verify() calls expect ready_mutate_islocked() to hold on entry and exit.
void ready_queue_shrink(struct cluster * cltr) {
	/* paranoid */ verify( ready_mutate_islocked() );
	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");

	// Sanity check before touching anything
	/* paranoid */ check( cltr->ready_queue );

	int target = cltr->procs.total;

	with( cltr->ready_queue ) {
		// Lane count before the shrink
		size_t ocount = lanes.count;

		// New lane count, never less than SEQUENTIAL_SHARD.
		// It is set before redistributing, so the pushes below see the reduced count.
		if(target >= 2) lanes.count = target * READYQ_SHARD_FACTOR;
		else lanes.count = SEQUENTIAL_SHARD;
		/* paranoid */ verify( ocount >= lanes.count );
		/* paranoid */ verify( lanes.count == target * READYQ_SHARD_FACTOR || target < 2 );

		// Number of displaced threads, only tracked for debug printing
		#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
			__attribute__((unused)) size_t displaced = 0;
		#endif

		// Empty and destroy every lane past the new count
		for( idx; (size_t)lanes.count ~ ocount) {
			// Lock is not strictly needed but makes checking invariants much easier
			__attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
			verify(locked);

			// Move every thread of this lane somewhere else in the queue
			while(!is_empty(lanes.data[idx])) {
				struct $thread * thrd;
				unsigned long long _;
				[thrd, _] = pop(lanes.data[idx]);

				push(cltr, thrd, true);

				#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
					displaced++;
				#endif
			}

			// Release the lane before destroying it
			__atomic_unlock(&lanes.data[idx].lock);

			// TODO print the queue statistics here

			^(lanes.data[idx]){};
		}

		__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);

		// Resize in place (uses realloc and memcpies the data)
		lanes.data = alloc( lanes.count, lanes.data`realloc );

		// The remaining lanes may have moved, repair them
		for( idx; (size_t)lanes.count ) {
			fix(lanes.data[idx]);
		}
	}

	fix_times(cltr);

	reassign_cltr_id(cltr);

	// Sanity check after the change
	/* paranoid */ check( cltr->ready_queue );

	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
	/* paranoid */ verify( ready_mutate_islocked() );
}
832#endif
[8cd5434]833
834#if !defined(__CFA_NO_STATISTICS__)
// Statistics helper: returns the `cnt` field of lane `idx` of the ready queue.
unsigned cnt(const __ready_queue_t & this, unsigned idx) {
	/* paranoid */ verify(this.lanes.count > idx);
	const unsigned lane_cnt = this.lanes.data[idx].cnt;
	return lane_cnt;
}
839#endif
Note: See TracBrowser for help on using the repository browser.