source: libcfa/src/concurrency/ready_queue.cfa@ cf85f96

Last change on this file since cf85f96 was 12daa43, checked in by Thierry Delisle <tdelisle@…>, 4 years ago

Added a define switch for using CPU work stealing.
Not fully implemented.

1//
2// Cforall Version 1.0.0 Copyright (C) 2019 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// ready_queue.cfa --
8//
9// Author : Thierry Delisle
10// Created On : Mon Nov dd 16:29:18 2019
11// Last Modified By :
12// Last Modified On :
13// Update Count :
14//
15
16#define __cforall_thread__
17#define _GNU_SOURCE
18
19// #define __CFA_DEBUG_PRINT_READY_QUEUE__
20
21
22// #define USE_RELAXED_FIFO
23// #define USE_WORK_STEALING
24
25#include "bits/defs.hfa"
26#include "device/cpu.hfa"
27#include "kernel_private.hfa"
28
29#include "stdlib.hfa"
30#include "math.hfa"
31
32#include <unistd.h>
33
34#include "ready_subqueue.hfa"
35
36static const size_t cache_line_size = 64;
37
38#if !defined(__CFA_NO_STATISTICS__)
39 #define __STATS(...) __VA_ARGS__
40#else
41 #define __STATS(...)
42#endif
43
44// No overriding function, no environment variable, and no define:
45// fall back to a magic number
46#ifndef __CFA_MAX_PROCESSORS__
47 #define __CFA_MAX_PROCESSORS__ 1024
48#endif
49
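// Each strategy shards the cluster's ready queue into multiple lanes:
// READYQ_SHARD_FACTOR lanes per processor (per hardware thread for CPU
// work-stealing), while SEQUENTIAL_SHARD is the lane count used when a
// cluster has fewer than two processors.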
50#if defined(USE_CPU_WORK_STEALING)
51 #define READYQ_SHARD_FACTOR 2
52#elif defined(USE_RELAXED_FIFO)
53 #define BIAS 4
54 #define READYQ_SHARD_FACTOR 4
55 #define SEQUENTIAL_SHARD 1
56#elif defined(USE_WORK_STEALING)
57 #define READYQ_SHARD_FACTOR 2
58 #define SEQUENTIAL_SHARD 2
59#else
60 #error no scheduling strategy selected
61#endif
62
63static inline struct $thread * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats));
64static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats));
65static inline struct $thread * search(struct cluster * cltr);
66static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred);
67
68
69// returns the maximum number of processors the RWLock supports
70__attribute__((weak)) unsigned __max_processors() {
71 const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
72 if(!max_cores_s) {
73 __cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
74 return __CFA_MAX_PROCESSORS__;
75 }
76
77 char * endptr = 0p;
78 long int max_cores_l = strtol(max_cores_s, &endptr, 10);
79 if(max_cores_l < 1 || max_cores_l > 65535) {
80 __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", max_cores_l);
81 return __CFA_MAX_PROCESSORS__;
82 }
83 if('\0' != *endptr) {
84 __cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", max_cores_s);
85 return __CFA_MAX_PROCESSORS__;
86 }
87
88 return max_cores_l;
89}
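// Note: the limit can thus be raised for a single run without recompiling,
// e.g. CFA_MAX_PROCESSORS=4096 ./a.out; non-numeric values or values outside
// 1..65535 fall back to __CFA_MAX_PROCESSORS__.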
90
91//=======================================================================
92// Cluster wide reader-writer lock
93//=======================================================================
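// Each processor registers a slot in 'data' holding a pointer to its thread-local
// reader flag (kernelTLS().sched_lock). A reader only acquires its own flag, while a
// writer acquires 'write_lock' and then every registered flag, so reads stay cheap
// and writes cost O(number of registered processors).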
94void ?{}(__scheduler_RWLock_t & this) {
95 this.max = __max_processors();
96 this.alloc = 0;
97 this.ready = 0;
98 this.data = alloc(this.max);
99 this.write_lock = false;
100
101 /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
102 /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));
103
104}
105void ^?{}(__scheduler_RWLock_t & this) {
106 free(this.data);
107}
108
109
110//=======================================================================
111// Lock-Free registering/unregistering of threads
112unsigned register_proc_id( void ) with(*__scheduler_lock) {
113 __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p for RW-Lock\n", proc);
114 bool * handle = (bool *)&kernelTLS().sched_lock;
115
116 // Step - 1 : check if there is already space in the data
117 uint_fast32_t s = ready;
118
119 // Check among the already-published (ready) slots for a free cell to reuse
120 for(uint_fast32_t i = 0; i < s; i++) {
121 bool * volatile * cell = (bool * volatile *)&data[i]; // Cforall is bugged and the double volatile causes problems
122 /* paranoid */ verify( handle != *cell );
123
124 bool * null = 0p; // Re-written every iteration since the compare-exchange clobbers it
125 if( __atomic_load_n(cell, (int)__ATOMIC_RELAXED) == null
126 && __atomic_compare_exchange_n( cell, &null, handle, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
127 /* paranoid */ verify(i < ready);
128 /* paranoid */ verify( (kernelTLS().sched_id = i, true) );
129 return i;
130 }
131 }
132
133 if(max <= alloc) abort("Trying to create more than %u processors", __scheduler_lock->max);
134
135 // Step - 2 : F&A to get a new spot in the array.
136 uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
137 if(max <= n) abort("Trying to create more than %u processors", __scheduler_lock->max);
138
139 // Step - 3 : Mark space as used and then publish it.
140 data[n] = handle;
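 // Publish slots in allocation order: 'ready' only advances from n to n + 1,
 // so slot n becomes visible to readers only after all earlier slots are visible.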
141 while() {
142 unsigned copy = n;
143 if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
144 && __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
145 break;
146 Pause();
147 }
148
149 __cfadbg_print_safe(ready_queue, "Kernel : Registering proc %p done, id %lu\n", proc, n);
150
151 // Return new spot.
152 /* paranoid */ verify(n < ready);
153 /* paranoid */ verify( (kernelTLS().sched_id = n, true) );
154 return n;
155}
156
157void unregister_proc_id( unsigned id ) with(*__scheduler_lock) {
158 /* paranoid */ verify(id < ready);
159 /* paranoid */ verify(id == kernelTLS().sched_id);
160 /* paranoid */ verify(data[id] == &kernelTLS().sched_lock);
161
162 bool * volatile * cell = (bool * volatile *)&data[id]; // Cforall is bugged and the double volatile causes problems
163
164 __atomic_store_n(cell, 0p, __ATOMIC_RELEASE);
165
166 __cfadbg_print_safe(ready_queue, "Kernel : Unregister proc %p\n", proc);
167}
168
169//-----------------------------------------------------------------------
170// Writer side : acquire when changing the ready queue, e.g. adding more
171// queues or removing them.
172uint_fast32_t ready_mutate_lock( void ) with(*__scheduler_lock) {
173 /* paranoid */ verify( ! __preemption_enabled() );
174 /* paranoid */ verify( ! kernelTLS().sched_lock );
175
176 // Step 1 : lock global lock
177 // It is needed to prevent processors that register mid critical-section
178 // from simply locking their own lock and entering.
179 __atomic_acquire( &write_lock );
180
181 // Step 2 : lock per-proc lock
182 // Processors that are currently being registered aren't counted
183 // but can't be in read_lock or in the critical section.
184 // All other processors are counted
185 uint_fast32_t s = ready;
186 for(uint_fast32_t i = 0; i < s; i++) {
187 volatile bool * llock = data[i];
188 if(llock) __atomic_acquire( llock );
189 }
190
191 /* paranoid */ verify( ! __preemption_enabled() );
192 return s;
193}
194
195void ready_mutate_unlock( uint_fast32_t last_s ) with(*__scheduler_lock) {
196 /* paranoid */ verify( ! __preemption_enabled() );
197
198 // Step 1 : release local locks
199 // This must be done while the global lock is held to prevent
200 // threads that were created mid critical-section
201 // from racing to lock their local locks and having the writer
202 // immediately unlock them.
203 // Alternative solution : return s in write_lock and pass it to write_unlock
204 for(uint_fast32_t i = 0; i < last_s; i++) {
205 volatile bool * llock = data[i];
206 if(llock) __atomic_store_n(llock, (bool)false, __ATOMIC_RELEASE);
207 }
208
209 // Step 2 : release global lock
210 /*paranoid*/ assert(true == write_lock);
211 __atomic_store_n(&write_lock, (bool)false, __ATOMIC_RELEASE);
212
213 /* paranoid */ verify( ! __preemption_enabled() );
214}
215
216//=======================================================================
217// Cforall Ready Queue used for scheduling
218//=======================================================================
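// The ready queue is an array of intrusive sub-queues ('lanes'), each protected by its
// own spinlock. lanes.tscs caches a timestamp per lane (refreshed on pops), which the
// work-stealing strategies use to decide which lane is worth helping.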
219void ?{}(__ready_queue_t & this) with (this) {
220 #if defined(USE_CPU_WORK_STEALING)
221 lanes.count = cpu_info.hthrd_count * READYQ_SHARD_FACTOR;
222 lanes.data = alloc( lanes.count );
223 lanes.tscs = alloc( lanes.count );
224
225 for( idx; (size_t)lanes.count ) {
226 (lanes.data[idx]){};
227 lanes.tscs[idx].tv = rdtscl();
228 }
229 #else
230 lanes.data = 0p;
231 lanes.tscs = 0p;
232 lanes.count = 0;
233 #endif
234}
235
236void ^?{}(__ready_queue_t & this) with (this) {
237 #if !defined(USE_CPU_WORK_STEALING)
238 verify( SEQUENTIAL_SHARD == lanes.count );
239 #endif
240
241 free(lanes.data);
242 free(lanes.tscs);
243}
244
245//-----------------------------------------------------------------------
246#if defined(USE_CPU_WORK_STEALING)
247 __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {
248 __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
249
250 processor * const proc = kernelTLS().this_processor;
251 const bool external = !push_local || (!proc) || (cltr != proc->cltr);
252
253 const int cpu = __kernel_getcpu();
254 /* paranoid */ verify(cpu >= 0);
255 /* paranoid */ verify(cpu < cpu_info.hthrd_count);
256 /* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count);
257
258 const int start = cpu * READYQ_SHARD_FACTOR;
259 unsigned i;
260 do {
261 unsigned r;
262 if(unlikely(external)) { r = __tls_rand(); }
263 else { r = proc->rdq.its++; }
264 i = start + (r % READYQ_SHARD_FACTOR);
265 // If we can't lock it retry
266 } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
267
268 // Actually push it
269 push(lanes.data[i], thrd);
270
271 // Unlock and return
272 __atomic_unlock( &lanes.data[i].lock );
273
274 #if !defined(__CFA_NO_STATISTICS__)
275 if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
276 else __tls_stats()->ready.push.local.success++;
277 #endif
278
279 __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
280
281 }
282
283 // Pop from the ready queue from a given cluster
284 __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
285 /* paranoid */ verify( lanes.count > 0 );
286 /* paranoid */ verify( kernelTLS().this_processor );
287
288 const int cpu = __kernel_getcpu();
289 /* paranoid */ verify(cpu >= 0);
290 /* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count);
291 /* paranoid */ verify(cpu < cpu_info.hthrd_count);
292
293 processor * const proc = kernelTLS().this_processor;
294 const int start = cpu * READYQ_SHARD_FACTOR;
295
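 // Help heuristic: one call computes a cutoff from the oldest timestamp in this
 // processor's own lanes and picks a random target lane; the next call helps that
 // target (and the last helped lane) only if it appears older than the cutoff.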
296 // Do we already have a help target?
297 if(proc->rdq.target == -1u) {
298 // No target yet: compute the cutoff from our local lanes and pick a random target
299 unsigned long long min = ts(lanes.data[start]);
300 for(i; READYQ_SHARD_FACTOR) {
301 unsigned long long tsc = ts(lanes.data[start + i]);
302 if(tsc < min) min = tsc;
303 }
304 proc->rdq.cutoff = min;
305 proc->rdq.target = __tls_rand() % lanes.count;
306 }
307 else {
308 const unsigned long long bias = 0; //2_500_000_000;
309 const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff;
310 {
311 unsigned target = proc->rdq.target;
312 proc->rdq.target = -1u;
313 if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) {
314 $thread * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
315 proc->rdq.last = target;
316 if(t) return t;
317 }
318 }
319
320 unsigned last = proc->rdq.last;
321 if(last != -1u && lanes.tscs[last].tv < cutoff && ts(lanes.data[last]) < cutoff) {
322 $thread * t = try_pop(cltr, last __STATS(, __tls_stats()->ready.pop.help));
323 if(t) return t;
324 }
325 else {
326 proc->rdq.last = -1u;
327 }
328 }
329
330 for(READYQ_SHARD_FACTOR) {
331 unsigned i = start + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
332 if($thread * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
333 }
334
335 // All lanes were empty, return 0p
336 return 0p;
337 }
338
339 __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
340 processor * const proc = kernelTLS().this_processor;
341 unsigned last = proc->rdq.last;
342
343 unsigned i = __tls_rand() % lanes.count;
344 return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
345 }
346 __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) {
347 return search(cltr);
348 }
349#endif
350#if defined(USE_RELAXED_FIFO)
351 //-----------------------------------------------------------------------
352 // get index from random number with or without bias towards queues
353 static inline [unsigned, bool] idx_from_r(unsigned r, unsigned preferred) {
354 unsigned i;
355 bool local;
356 unsigned rlow = r % BIAS;
357 unsigned rhigh = r / BIAS;
358 if((0 != rlow) && preferred >= 0) {
359 // (BIAS - 1) out of BIAS chances
360 // Use preferred queues
361 i = preferred + (rhigh % READYQ_SHARD_FACTOR);
362 local = true;
363 }
364 else {
365 // 1 out of BIAS chances
366 // Use all queues
367 i = rhigh;
368 local = false;
369 }
370 return [i, local];
371 }
372
373 __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {
374 __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
375
376 const bool external = !push_local || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
377 /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
378
379 bool local;
380 int preferred = external ? -1 : kernelTLS().this_processor->rdq.id;
381
382 // Try to pick a lane and lock it
383 unsigned i;
384 do {
385 // Pick the index of a lane
386 unsigned r = __tls_rand_fwd();
387 [i, local] = idx_from_r(r, preferred);
388
389 i %= __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
390
391 #if !defined(__CFA_NO_STATISTICS__)
392 if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.attempt, 1, __ATOMIC_RELAXED);
393 else if(local) __tls_stats()->ready.push.local.attempt++;
394 else __tls_stats()->ready.push.share.attempt++;
395 #endif
396
397 // If we can't lock it retry
398 } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
399
400 // Actually push it
401 push(lanes.data[i], thrd);
402
403 // Unlock and return
404 __atomic_unlock( &lanes.data[i].lock );
405
406 // Mark the current index in the tls rng instance as having an item
407 __tls_rand_advance_bck();
408
409 __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
410
411 // Update statistics
412 #if !defined(__CFA_NO_STATISTICS__)
413 if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
414 else if(local) __tls_stats()->ready.push.local.success++;
415 else __tls_stats()->ready.push.share.success++;
416 #endif
417 }
418
419 // Pop from the ready queue from a given cluster
420 __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
421 /* paranoid */ verify( lanes.count > 0 );
422 /* paranoid */ verify( kernelTLS().this_processor );
423 /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );
424
425 unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
426 int preferred = kernelTLS().this_processor->rdq.id;
427
428
429 // Try up to 25 times to find a lane that isn't empty and pop from it
430 for(25) {
431 // Pick two lists at random
432 unsigned ri = __tls_rand_bck();
433 unsigned rj = __tls_rand_bck();
434
435 unsigned i, j;
436 __attribute__((unused)) bool locali, localj;
437 [i, locali] = idx_from_r(ri, preferred);
438 [j, localj] = idx_from_r(rj, preferred);
439
440 i %= count;
441 j %= count;
442
443 // try popping from the 2 picked lists
444 struct $thread * thrd = try_pop(cltr, i, j __STATS(, *(locali || localj ? &__tls_stats()->ready.pop.local : &__tls_stats()->ready.pop.help)));
445 if(thrd) {
446 return thrd;
447 }
448 }
449
450 // All lanes were empty, return 0p
451 return 0p;
452 }
453
454 __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) { return pop_fast(cltr); }
455 __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) {
456 return search(cltr);
457 }
458#endif
459#if defined(USE_WORK_STEALING)
460 __attribute__((hot)) void push(struct cluster * cltr, struct $thread * thrd, bool push_local) with (cltr->ready_queue) {
461 __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr);
462
463 // #define USE_PREFERRED
464 #if !defined(USE_PREFERRED)
465 const bool external = !push_local || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);
466 /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count );
467 #else
468 unsigned preferred = thrd->preferred;
469 const bool external = !push_local || (!kernelTLS().this_processor) || preferred == -1u || thrd->curr_cluster != cltr;
470 /* paranoid */ verifyf(external || preferred < lanes.count, "Invalid preferred queue %u for %u lanes", preferred, lanes.count );
471
472 unsigned r = preferred % READYQ_SHARD_FACTOR;
473 const unsigned start = preferred - r;
474 #endif
475
476 // Try to pick a lane and lock it
477 unsigned i;
478 do {
479 #if !defined(__CFA_NO_STATISTICS__)
480 if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.attempt, 1, __ATOMIC_RELAXED);
481 else __tls_stats()->ready.push.local.attempt++;
482 #endif
483
484 if(unlikely(external)) {
485 i = __tls_rand() % lanes.count;
486 }
487 else {
488 #if !defined(USE_PREFERRED)
489 processor * proc = kernelTLS().this_processor;
490 unsigned r = proc->rdq.its++;
491 i = proc->rdq.id + (r % READYQ_SHARD_FACTOR);
492 #else
493 i = start + (r++ % READYQ_SHARD_FACTOR);
494 #endif
495 }
496 // If we can't lock it retry
497 } while( !__atomic_try_acquire( &lanes.data[i].lock ) );
498
499 // Actually push it
500 push(lanes.data[i], thrd);
501
502 // Unlock and return
503 __atomic_unlock( &lanes.data[i].lock );
504
505 #if !defined(__CFA_NO_STATISTICS__)
506 if(unlikely(external)) __atomic_fetch_add(&cltr->stats->ready.push.extrn.success, 1, __ATOMIC_RELAXED);
507 else __tls_stats()->ready.push.local.success++;
508 #endif
509
510 __cfadbg_print_safe(ready_queue, "Kernel : Pushed %p on cluster %p (idx: %u, mask %llu, first %d)\n", thrd, cltr, i, used.mask[0], lane_first);
511 }
512
513 // Pop from the ready queue from a given cluster
514 __attribute__((hot)) $thread * pop_fast(struct cluster * cltr) with (cltr->ready_queue) {
515 /* paranoid */ verify( lanes.count > 0 );
516 /* paranoid */ verify( kernelTLS().this_processor );
517 /* paranoid */ verify( kernelTLS().this_processor->rdq.id < lanes.count );
518
519 processor * proc = kernelTLS().this_processor;
520
521 if(proc->rdq.target == -1u) {
522 unsigned long long min = ts(lanes.data[proc->rdq.id]);
523 for(int i = 0; i < READYQ_SHARD_FACTOR; i++) {
524 unsigned long long tsc = ts(lanes.data[proc->rdq.id + i]);
525 if(tsc < min) min = tsc;
526 }
527 proc->rdq.cutoff = min;
528 proc->rdq.target = __tls_rand() % lanes.count;
529 }
530 else {
531 unsigned target = proc->rdq.target;
532 proc->rdq.target = -1u;
533 const unsigned long long bias = 0; //2_500_000_000;
534 const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff;
535 if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) {
536 $thread * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help));
537 if(t) return t;
538 }
539 }
540
541 for(READYQ_SHARD_FACTOR) {
542 unsigned i = proc->rdq.id + (proc->rdq.itr++ % READYQ_SHARD_FACTOR);
543 if($thread * t = try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.local))) return t;
544 }
545 return 0p;
546 }
547
548 __attribute__((hot)) struct $thread * pop_slow(struct cluster * cltr) with (cltr->ready_queue) {
549 unsigned i = __tls_rand() % lanes.count;
550 return try_pop(cltr, i __STATS(, __tls_stats()->ready.pop.steal));
551 }
552
553 __attribute__((hot)) struct $thread * pop_search(struct cluster * cltr) with (cltr->ready_queue) {
554 return search(cltr);
555 }
556#endif
557
558//=======================================================================
559// Various Ready Queue utilities
560//=======================================================================
561// these functions work the same, or almost the same,
562// whether work-stealing or relaxed-FIFO scheduling is used
563
564//-----------------------------------------------------------------------
565// try to pop from a lane given by index w
566static inline struct $thread * try_pop(struct cluster * cltr, unsigned w __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
567 __STATS( stats.attempt++; )
568
569 // Get relevant elements locally
570 __intrusive_lane_t & lane = lanes.data[w];
571
572 // If list looks empty retry
573 if( is_empty(lane) ) {
574 return 0p;
575 }
576
577 // If we can't get the lock retry
578 if( !__atomic_try_acquire(&lane.lock) ) {
579 return 0p;
580 }
581
582 // If list is empty, unlock and retry
583 if( is_empty(lane) ) {
584 __atomic_unlock(&lane.lock);
585 return 0p;
586 }
587
588 // Actually pop the list
589 struct $thread * thrd;
590 unsigned long long tsv;
591 [thrd, tsv] = pop(lane);
592
593 /* paranoid */ verify(thrd);
594 /* paranoid */ verify(tsv);
595 /* paranoid */ verify(lane.lock);
596
597 // Unlock and return
598 __atomic_unlock(&lane.lock);
599
600 // Update statistics
601 __STATS( stats.success++; )
602
603 #if defined(USE_WORK_STEALING)
604 lanes.tscs[w].tv = tsv;
605 #endif
606
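 // remember which lane this thread came from so a later push can prefer it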
607 thrd->preferred = w;
608
609 // return the popped thread
610 return thrd;
611}
612
613//-----------------------------------------------------------------------
614// try to pop from any lane, making sure not to miss any thread pushed
615// before the start of the function
616static inline struct $thread * search(struct cluster * cltr) with (cltr->ready_queue) {
617 /* paranoid */ verify( lanes.count > 0 );
618 unsigned count = __atomic_load_n( &lanes.count, __ATOMIC_RELAXED );
619 unsigned offset = __tls_rand();
620 for(i; count) {
621 unsigned idx = (offset + i) % count;
622 struct $thread * thrd = try_pop(cltr, idx __STATS(, __tls_stats()->ready.pop.search));
623 if(thrd) {
624 return thrd;
625 }
626 }
627
628 // All lanes were empty, return 0p
629 return 0p;
630}
631
632//-----------------------------------------------------------------------
633// Check that all the intrusive queues in the data structure are still consistent
634static void check( __ready_queue_t & q ) with (q) {
635 #if defined(__CFA_WITH_VERIFY__)
636 {
637 for( idx ; lanes.count ) {
638 __intrusive_lane_t & sl = lanes.data[idx];
639 assert(!lanes.data[idx].lock);
640
641 if(is_empty(sl)) {
642 assert( sl.anchor.next == 0p );
643 assert( sl.anchor.ts == 0 );
644 assert( mock_head(sl) == sl.prev );
645 } else {
646 assert( sl.anchor.next != 0p );
647 assert( sl.anchor.ts != 0 );
648 assert( mock_head(sl) != sl.prev );
649 }
650 }
651 }
652 #endif
653}
654
655//-----------------------------------------------------------------------
656// Given 2 indexes, pick the list with the oldest push and try to pop from it
657static inline struct $thread * try_pop(struct cluster * cltr, unsigned i, unsigned j __STATS(, __stats_readyQ_pop_t & stats)) with (cltr->ready_queue) {
658 // Pick the best list
659 int w = i;
660 if( __builtin_expect(!is_empty(lanes.data[j]), true) ) {
661 w = (ts(lanes.data[i]) < ts(lanes.data[j])) ? i : j;
662 }
663
664 return try_pop(cltr, w __STATS(, stats));
665}
666
667// Call this function if the intrusive list was moved using memcpy;
668// it fixes the list so that the pointers back to the anchor aren't left dangling
669static inline void fix(__intrusive_lane_t & ll) {
670 if(is_empty(ll)) {
671 verify(ll.anchor.next == 0p);
672 ll.prev = mock_head(ll);
673 }
674}
675
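// Walk a list of processors and give each one a contiguous block of READYQ_SHARD_FACTOR
// lanes (starting at its rdq.id), resetting its help target.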
676static void assign_list(unsigned & value, dlist(processor) & list, unsigned count) {
677 processor * it = &list`first;
678 for(unsigned i = 0; i < count; i++) {
679 /* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count);
680 it->rdq.id = value;
681 it->rdq.target = -1u;
682 value += READYQ_SHARD_FACTOR;
683 it = &(*it)`next;
684 }
685}
686
687static void reassign_cltr_id(struct cluster * cltr) {
688 unsigned preferred = 0;
689 assign_list(preferred, cltr->procs.actives, cltr->procs.total - cltr->procs.idle);
690 assign_list(preferred, cltr->procs.idles , cltr->procs.idle );
691}
692
693static void fix_times( struct cluster * cltr ) with( cltr->ready_queue ) {
694 #if defined(USE_WORK_STEALING)
695 lanes.tscs = alloc(lanes.count, lanes.tscs`realloc);
696 for(i; lanes.count) {
697 unsigned long long tsc = ts(lanes.data[i]);
698 lanes.tscs[i].tv = tsc != 0 ? tsc : rdtscl();
699 }
700 #endif
701}
702
703#if defined(USE_CPU_WORK_STEALING)
704 // ready_queue size is fixed in this case
705 void ready_queue_grow(struct cluster * cltr) {}
706 void ready_queue_shrink(struct cluster * cltr) {}
707#else
708 // Grow the ready queue
709 void ready_queue_grow(struct cluster * cltr) {
710 size_t ncount;
711 int target = cltr->procs.total;
712
713 /* paranoid */ verify( ready_mutate_islocked() );
714 __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");
715
716 // Make sure that everything is consistent
717 /* paranoid */ check( cltr->ready_queue );
718
719 // grow the ready queue
720 with( cltr->ready_queue ) {
721 // Find new count
722 // Make sure we always have at least 1 list
723 if(target >= 2) {
724 ncount = target * READYQ_SHARD_FACTOR;
725 } else {
726 ncount = SEQUENTIAL_SHARD;
727 }
728
729 // Allocate new array (uses realloc and memcpies the data)
730 lanes.data = alloc( ncount, lanes.data`realloc );
731
732 // Fix the moved data
733 for( idx; (size_t)lanes.count ) {
734 fix(lanes.data[idx]);
735 }
736
737 // Construct new data
738 for( idx; (size_t)lanes.count ~ ncount) {
739 (lanes.data[idx]){};
740 }
741
742 // Update original
743 lanes.count = ncount;
744 }
745
746 fix_times(cltr);
747
748 reassign_cltr_id(cltr);
749
750 // Make sure that everything is consistent
751 /* paranoid */ check( cltr->ready_queue );
752
753 __cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");
754
755 /* paranoid */ verify( ready_mutate_islocked() );
756 }
757
758 // Shrink the ready queue
759 void ready_queue_shrink(struct cluster * cltr) {
760 /* paranoid */ verify( ready_mutate_islocked() );
761 __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");
762
763 // Make sure that everything is consistent
764 /* paranoid */ check( cltr->ready_queue );
765
766 int target = cltr->procs.total;
767
768 with( cltr->ready_queue ) {
769 // Remember old count
770 size_t ocount = lanes.count;
771
772 // Find new count
773 // Make sure we always have at least 1 list
774 lanes.count = target >= 2 ? target * READYQ_SHARD_FACTOR: SEQUENTIAL_SHARD;
775 /* paranoid */ verify( ocount >= lanes.count );
776 /* paranoid */ verify( lanes.count == target * READYQ_SHARD_FACTOR || target < 2 );
777
778 // for printing, count the number of displaced threads
779 #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
780 __attribute__((unused)) size_t displaced = 0;
781 #endif
782
783 // redistribute old data
784 for( idx; (size_t)lanes.count ~ ocount) {
785 // Lock is not strictly needed but makes checking invariants much easier
786 __attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
787 verify(locked);
788
789 // As long as this lane is not empty, pop its threads and push them somewhere else in the queue
790 while(!is_empty(lanes.data[idx])) {
791 struct $thread * thrd;
792 unsigned long long _;
793 [thrd, _] = pop(lanes.data[idx]);
794
795 push(cltr, thrd, true);
796
797 // for printing, count the number of displaced threads
798 #if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
799 displaced++;
800 #endif
801 }
802
803 // Unlock the lane
804 __atomic_unlock(&lanes.data[idx].lock);
805
806 // TODO print the queue statistics here
807
808 ^(lanes.data[idx]){};
809 }
810
811 __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);
812
813 // Allocate new array (uses realloc and memcpies the data)
814 lanes.data = alloc( lanes.count, lanes.data`realloc );
815
816 // Fix the moved data
817 for( idx; (size_t)lanes.count ) {
818 fix(lanes.data[idx]);
819 }
820 }
821
822 fix_times(cltr);
823
824 reassign_cltr_id(cltr);
825
826 // Make sure that everything is consistent
827 /* paranoid */ check( cltr->ready_queue );
828
829 __cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
830 /* paranoid */ verify( ready_mutate_islocked() );
831 }
832#endif
833
834#if !defined(__CFA_NO_STATISTICS__)
835 unsigned cnt(const __ready_queue_t & this, unsigned idx) {
836 /* paranoid */ verify(this.lanes.count > idx);
837 return this.lanes.data[idx].cnt;
838 }
839#endif