Index: libcfa/src/concurrency/kernel/cluster.cfa
===================================================================
--- libcfa/src/concurrency/kernel/cluster.cfa	(revision c42b8a1098b5adb1a163766aabb8c1e1b5f99fc9)
+++ libcfa/src/concurrency/kernel/cluster.cfa	(revision c42b8a1098b5adb1a163766aabb8c1e1b5f99fc9)
@@ -0,0 +1,494 @@
+//
+// Cforall Version 1.0.0 Copyright (C) 2022 University of Waterloo
+//
+// The contents of this file are covered under the licence agreement in the
+// file "LICENCE" distributed with Cforall.
+//
+// cluster.cfa.cfa -- file that includes helpers for subsystem that need
+//				cluster wide support
+//
+// Author           : Thierry Delisle
+// Created On       : Fri 03 11 12:39:24 2022
+// Last Modified By :
+// Last Modified On :
+// Update Count     :
+//
+
+#define __cforall_thread__
+
+#include "bits/defs.hfa"
+#include "device/cpu.hfa"
+#include "kernel_private.hfa"
+
+#include "stdlib.hfa"
+#include "limits.hfa"
+#include "math.hfa"
+
+#include "ready_subqueue.hfa"
+
+#include <errno.h>
+#include <unistd.h>
+
+extern "C" {
+	#include <sys/syscall.h>  // __NR_xxx
+}
+
+// No overriden function, no environment variable, no define
+// fall back to a magic number
+#ifndef __CFA_MAX_PROCESSORS__
+	#define __CFA_MAX_PROCESSORS__ 1024
+#endif
+
+#if !defined(__CFA_NO_STATISTICS__)
+	#define __STATS(...) __VA_ARGS__
+#else
+	#define __STATS(...)
+#endif
+
+// returns the maximum number of processors the RWLock support
+__attribute__((weak)) unsigned __max_processors() {
+	const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
+	if(!max_cores_s) {
+		__cfadbg_print_nolock(ready_queue, "No CFA_MAX_PROCESSORS in ENV\n");
+		return __CFA_MAX_PROCESSORS__;
+	}
+
+	char * endptr = 0p;
+	long int max_cores_l = strtol(max_cores_s, &endptr, 10);
+	if(max_cores_l < 1 || max_cores_l > 65535) {
+		__cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS out of range : %ld\n", max_cores_l);
+		return __CFA_MAX_PROCESSORS__;
+	}
+	if('\0' != *endptr) {
+		__cfadbg_print_nolock(ready_queue, "CFA_MAX_PROCESSORS not a decimal number : %s\n", max_cores_s);
+		return __CFA_MAX_PROCESSORS__;
+	}
+
+	return max_cores_l;
+}
+
+#if   defined(CFA_HAVE_LINUX_LIBRSEQ)
+	// No forward declaration needed
+	#define __kernel_rseq_register rseq_register_current_thread
+	#define __kernel_rseq_unregister rseq_unregister_current_thread
+#elif defined(CFA_HAVE_LINUX_RSEQ_H)
+	static void __kernel_raw_rseq_register  (void);
+	static void __kernel_raw_rseq_unregister(void);
+
+	#define __kernel_rseq_register __kernel_raw_rseq_register
+	#define __kernel_rseq_unregister __kernel_raw_rseq_unregister
+#else
+	// No forward declaration needed
+	// No initialization needed
+	static inline void noop(void) {}
+
+	#define __kernel_rseq_register noop
+	#define __kernel_rseq_unregister noop
+#endif
+
+//=======================================================================
+// Cluster wide reader-writer lock
+//=======================================================================
+void  ?{}(__scheduler_RWLock_t & this) {
+	this.max   = __max_processors();
+	this.alloc = 0;
+	this.ready = 0;
+	this.data  = alloc(this.max);
+	this.write_lock  = false;
+
+	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
+	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));
+
+}
+void ^?{}(__scheduler_RWLock_t & this) {
+	free(this.data);
+}
+
+
+//=======================================================================
+// Lock-Free registering/unregistering of threads
+unsigned register_proc_id( void ) with(*__scheduler_lock) {
+	__kernel_rseq_register();
+
+	bool * handle = (bool *)&kernelTLS().sched_lock;
+
+	// Step - 1 : check if there is already space in the data
+	uint_fast32_t s = ready;
+
+	// Check among all the ready
+	for(uint_fast32_t i = 0; i < s; i++) {
+		bool * volatile * cell = (bool * volatile *)&data[i]; // Cforall is bugged and the double volatiles causes problems
+		/* paranoid */ verify( handle != *cell );
+
+		bool * null = 0p; // Re-write every loop since compare thrashes it
+		if( __atomic_load_n(cell, (int)__ATOMIC_RELAXED) == null
+			&& __atomic_compare_exchange_n( cell, &null, handle, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
+			/* paranoid */ verify(i < ready);
+			/* paranoid */ verify( (kernelTLS().sched_id = i, true) );
+			return i;
+		}
+	}
+
+	if(max <= alloc) abort("Trying to create more than %ud processors", __scheduler_lock->max);
+
+	// Step - 2 : F&A to get a new spot in the array.
+	uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
+	if(max <= n) abort("Trying to create more than %ud processors", __scheduler_lock->max);
+
+	// Step - 3 : Mark space as used and then publish it.
+	data[n] = handle;
+	while() {
+		unsigned copy = n;
+		if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
+			&& __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
+			break;
+		Pause();
+	}
+
+	// Return new spot.
+	/* paranoid */ verify(n < ready);
+	/* paranoid */ verify( (kernelTLS().sched_id = n, true) );
+	return n;
+}
+
+void unregister_proc_id( unsigned id ) with(*__scheduler_lock) {
+	/* paranoid */ verify(id < ready);
+	/* paranoid */ verify(id == kernelTLS().sched_id);
+	/* paranoid */ verify(data[id] == &kernelTLS().sched_lock);
+
+	bool * volatile * cell = (bool * volatile *)&data[id]; // Cforall is bugged and the double volatiles causes problems
+
+	__atomic_store_n(cell, 0p, __ATOMIC_RELEASE);
+
+	__kernel_rseq_unregister();
+}
+
+//-----------------------------------------------------------------------
+// Writer side : acquire when changing the ready queue, e.g. adding more
+//  queues or removing them.
+uint_fast32_t ready_mutate_lock( void ) with(*__scheduler_lock) {
+	/* paranoid */ verify( ! __preemption_enabled() );
+
+	// Step 1 : lock global lock
+	// It is needed to avoid processors that register mid Critical-Section
+	//   to simply lock their own lock and enter.
+	__atomic_acquire( &write_lock );
+
+	// Make sure we won't deadlock ourself
+	// Checking before acquiring the writer lock isn't safe
+	// because someone else could have locked us.
+	/* paranoid */ verify( ! kernelTLS().sched_lock );
+
+	// Step 2 : lock per-proc lock
+	// Processors that are currently being registered aren't counted
+	//   but can't be in read_lock or in the critical section.
+	// All other processors are counted
+	uint_fast32_t s = ready;
+	for(uint_fast32_t i = 0; i < s; i++) {
+		volatile bool * llock = data[i];
+		if(llock) __atomic_acquire( llock );
+	}
+
+	/* paranoid */ verify( ! __preemption_enabled() );
+	return s;
+}
+
+void ready_mutate_unlock( uint_fast32_t last_s ) with(*__scheduler_lock) {
+	/* paranoid */ verify( ! __preemption_enabled() );
+
+	// Step 1 : release local locks
+	// This must be done while the global lock is held to avoid
+	//   threads that where created mid critical section
+	//   to race to lock their local locks and have the writer
+	//   immidiately unlock them
+	// Alternative solution : return s in write_lock and pass it to write_unlock
+	for(uint_fast32_t i = 0; i < last_s; i++) {
+		volatile bool * llock = data[i];
+		if(llock) __atomic_store_n(llock, (bool)false, __ATOMIC_RELEASE);
+	}
+
+	// Step 2 : release global lock
+	/*paranoid*/ assert(true == write_lock);
+	__atomic_store_n(&write_lock, (bool)false, __ATOMIC_RELEASE);
+
+	/* paranoid */ verify( ! __preemption_enabled() );
+}
+
+//=======================================================================
+// Cluster growth
+static const unsigned __readyq_single_shard = 2;
+
+//-----------------------------------------------------------------------
+// Check that all the intrusive queues in the data structure are still consistent
+static void check( __ready_queue_t & q ) with (q) {
+	#if defined(__CFA_WITH_VERIFY__)
+		{
+			for( idx ; lanes.count ) {
+				__intrusive_lane_t & sl = lanes.data[idx];
+				assert(!lanes.data[idx].lock);
+
+					if(is_empty(sl)) {
+						assert( sl.anchor.next == 0p );
+						assert( sl.anchor.ts   == -1llu );
+						assert( mock_head(sl)  == sl.prev );
+					} else {
+						assert( sl.anchor.next != 0p );
+						assert( sl.anchor.ts   != -1llu );
+						assert( mock_head(sl)  != sl.prev );
+					}
+			}
+		}
+	#endif
+}
+
+// Call this function of the intrusive list was moved using memcpy
+// fixes the list so that the pointers back to anchors aren't left dangling
+static inline void fix(__intrusive_lane_t & ll) {
+			if(is_empty(ll)) {
+				verify(ll.anchor.next == 0p);
+				ll.prev = mock_head(ll);
+			}
+}
+
+static void assign_list(unsigned & value, dlist(processor) & list, unsigned count) {
+	processor * it = &list`first;
+	for(unsigned i = 0; i < count; i++) {
+		/* paranoid */ verifyf( it, "Unexpected null iterator, at index %u of %u\n", i, count);
+		it->rdq.id = value;
+		it->rdq.target = MAX;
+		value += __readyq_shard_factor;
+		it = &(*it)`next;
+	}
+}
+
+static void reassign_cltr_id(struct cluster * cltr) {
+	unsigned preferred = 0;
+	assign_list(preferred, cltr->procs.actives, cltr->procs.total - cltr->procs.idle);
+	assign_list(preferred, cltr->procs.idles  , cltr->procs.idle );
+}
+
+static void fix_times( struct cluster * cltr ) with( cltr->ready_queue ) {
+	lanes.tscs = alloc(lanes.count, lanes.tscs`realloc);
+	for(i; lanes.count) {
+		lanes.tscs[i].tv = rdtscl();
+		lanes.tscs[i].ma = 0;
+	}
+}
+
+// Grow the ready queue
+void ready_queue_grow(struct cluster * cltr) {
+	size_t ncount;
+	int target = cltr->procs.total;
+
+	/* paranoid */ verify( ready_mutate_islocked() );
+	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue\n");
+
+	// Make sure that everything is consistent
+	/* paranoid */ check( cltr->ready_queue );
+
+	// grow the ready queue
+	with( cltr->ready_queue ) {
+		// Find new count
+		// Make sure we always have atleast 1 list
+		if(target >= 2) {
+			ncount = target * __readyq_shard_factor;
+		} else {
+			ncount = __readyq_single_shard;
+		}
+
+		// Allocate new array (uses realloc and memcpies the data)
+		lanes.data = alloc( ncount, lanes.data`realloc );
+
+		// Fix the moved data
+		for( idx; (size_t)lanes.count ) {
+			fix(lanes.data[idx]);
+		}
+
+		// Construct new data
+		for( idx; (size_t)lanes.count ~ ncount) {
+			(lanes.data[idx]){};
+		}
+
+		// Update original
+		lanes.count = ncount;
+
+		lanes.caches = alloc( target, lanes.caches`realloc );
+	}
+
+	fix_times(cltr);
+
+	reassign_cltr_id(cltr);
+
+	// Make sure that everything is consistent
+	/* paranoid */ check( cltr->ready_queue );
+
+	__cfadbg_print_safe(ready_queue, "Kernel : Growing ready queue done\n");
+
+	/* paranoid */ verify( ready_mutate_islocked() );
+}
+
+// Shrink the ready queue
+void ready_queue_shrink(struct cluster * cltr) {
+	/* paranoid */ verify( ready_mutate_islocked() );
+	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue\n");
+
+	// Make sure that everything is consistent
+	/* paranoid */ check( cltr->ready_queue );
+
+	int target = cltr->procs.total;
+
+	with( cltr->ready_queue ) {
+		// Remember old count
+		size_t ocount = lanes.count;
+
+		// Find new count
+		// Make sure we always have atleast 1 list
+		lanes.count = target >= 2 ? target * __readyq_shard_factor: __readyq_single_shard;
+		/* paranoid */ verify( ocount >= lanes.count );
+		/* paranoid */ verify( lanes.count == target * __readyq_shard_factor || target < 2 );
+
+		// for printing count the number of displaced threads
+		#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
+			__attribute__((unused)) size_t displaced = 0;
+		#endif
+
+		// redistribute old data
+		for( idx; (size_t)lanes.count ~ ocount) {
+			// Lock is not strictly needed but makes checking invariants much easier
+			__attribute__((unused)) bool locked = __atomic_try_acquire(&lanes.data[idx].lock);
+			verify(locked);
+
+			// As long as we can pop from this lane to push the threads somewhere else in the queue
+			while(!is_empty(lanes.data[idx])) {
+				struct thread$ * thrd;
+				unsigned long long _;
+				[thrd, _] = pop(lanes.data[idx]);
+
+				push(cltr, thrd, true);
+
+				// for printing count the number of displaced threads
+				#if defined(__CFA_DEBUG_PRINT__) || defined(__CFA_DEBUG_PRINT_READY_QUEUE__)
+					displaced++;
+				#endif
+			}
+
+			// Unlock the lane
+			__atomic_unlock(&lanes.data[idx].lock);
+
+			// TODO print the queue statistics here
+
+			^(lanes.data[idx]){};
+		}
+
+		__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue displaced %zu threads\n", displaced);
+
+		// Allocate new array (uses realloc and memcpies the data)
+		lanes.data = alloc( lanes.count, lanes.data`realloc );
+
+		// Fix the moved data
+		for( idx; (size_t)lanes.count ) {
+			fix(lanes.data[idx]);
+		}
+
+		lanes.caches = alloc( target, lanes.caches`realloc );
+	}
+
+	fix_times(cltr);
+
+
+	reassign_cltr_id(cltr);
+
+	// Make sure that everything is consistent
+	/* paranoid */ check( cltr->ready_queue );
+
+	__cfadbg_print_safe(ready_queue, "Kernel : Shrinking ready queue done\n");
+	/* paranoid */ verify( ready_mutate_islocked() );
+}
+
+// Ctor
+void ?{}( __intrusive_lane_t & this ) {
+	this.lock = false;
+	this.prev = mock_head(this);
+	this.anchor.next = 0p;
+	this.anchor.ts   = -1llu;
+	#if !defined(__CFA_NO_STATISTICS__)
+		this.cnt  = 0;
+	#endif
+
+	// We add a boat-load of assertions here because the anchor code is very fragile
+	/* paranoid */ _Static_assert( offsetof( thread$, link ) == offsetof(__intrusive_lane_t, anchor) );
+	/* paranoid */ verify( offsetof( thread$, link ) == offsetof(__intrusive_lane_t, anchor) );
+	/* paranoid */ verify( ((uintptr_t)( mock_head(this) ) + offsetof( thread$, link )) == (uintptr_t)(&this.anchor) );
+	/* paranoid */ verify( &mock_head(this)->link.next == &this.anchor.next );
+	/* paranoid */ verify( &mock_head(this)->link.ts   == &this.anchor.ts   );
+	/* paranoid */ verify( mock_head(this)->link.next == 0p );
+	/* paranoid */ verify( mock_head(this)->link.ts   == -1llu  );
+	/* paranoid */ verify( mock_head(this) == this.prev );
+	/* paranoid */ verify( __alignof__(__intrusive_lane_t) == 128 );
+	/* paranoid */ verify( __alignof__(this) == 128 );
+	/* paranoid */ verifyf( ((intptr_t)(&this) % 128) == 0, "Expected address to be aligned %p %% 128 == %zd", &this, ((intptr_t)(&this) % 128) );
+}
+
+// Dtor is trivial
+void ^?{}( __intrusive_lane_t & this ) {
+	// Make sure the list is empty
+	/* paranoid */ verify( this.anchor.next == 0p );
+	/* paranoid */ verify( this.anchor.ts   == -1llu );
+	/* paranoid */ verify( mock_head(this)  == this.prev );
+}
+
+#if   defined(CFA_HAVE_LINUX_LIBRSEQ)
+	// No definition needed
+#elif defined(CFA_HAVE_LINUX_RSEQ_H)
+
+	#if defined( __x86_64 ) || defined( __i386 )
+		#define RSEQ_SIG	0x53053053
+	#elif defined( __ARM_ARCH )
+		#ifdef __ARMEB__
+		#define RSEQ_SIG    0xf3def5e7      /* udf    #24035    ; 0x5de3 (ARMv6+) */
+		#else
+		#define RSEQ_SIG    0xe7f5def3      /* udf    #24035    ; 0x5de3 */
+		#endif
+	#endif
+
+	extern void __disable_interrupts_hard();
+	extern void __enable_interrupts_hard();
+
+	static void __kernel_raw_rseq_register  (void) {
+		/* paranoid */ verify( __cfaabi_rseq.cpu_id == RSEQ_CPU_ID_UNINITIALIZED );
+
+		// int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), 0, (sigset_t *)0p, _NSIG / 8);
+		int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), 0, RSEQ_SIG);
+		if(ret != 0) {
+			int e = errno;
+			switch(e) {
+			case EINVAL: abort("KERNEL ERROR: rseq register invalid argument");
+			case ENOSYS: abort("KERNEL ERROR: rseq register no supported");
+			case EFAULT: abort("KERNEL ERROR: rseq register with invalid argument");
+			case EBUSY : abort("KERNEL ERROR: rseq register already registered");
+			case EPERM : abort("KERNEL ERROR: rseq register sig  argument  on unregistration does not match the signature received on registration");
+			default: abort("KERNEL ERROR: rseq register unexpected return %d", e);
+			}
+		}
+	}
+
+	static void __kernel_raw_rseq_unregister(void) {
+		/* paranoid */ verify( __cfaabi_rseq.cpu_id >= 0 );
+
+		// int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), RSEQ_FLAG_UNREGISTER, (sigset_t *)0p, _NSIG / 8);
+		int ret = syscall(__NR_rseq, &__cfaabi_rseq, sizeof(struct rseq), RSEQ_FLAG_UNREGISTER, RSEQ_SIG);
+		if(ret != 0) {
+			int e = errno;
+			switch(e) {
+			case EINVAL: abort("KERNEL ERROR: rseq unregister invalid argument");
+			case ENOSYS: abort("KERNEL ERROR: rseq unregister no supported");
+			case EFAULT: abort("KERNEL ERROR: rseq unregister with invalid argument");
+			case EBUSY : abort("KERNEL ERROR: rseq unregister already registered");
+			case EPERM : abort("KERNEL ERROR: rseq unregister sig  argument  on unregistration does not match the signature received on registration");
+			default: abort("KERNEL ERROR: rseq unregisteunexpected return %d", e);
+			}
+		}
+	}
+#else
+	// No definition needed
+#endif
