Index: libcfa/src/concurrency/ready_queue.cfa
===================================================================
--- libcfa/src/concurrency/ready_queue.cfa	(revision 7768b8df58110d321b6431c9d8728f39b7d92f86)
+++ libcfa/src/concurrency/ready_queue.cfa	(revision 7768b8df58110d321b6431c9d8728f39b7d92f86)
@@ -0,0 +1,296 @@
+//
+// Cforall Version 1.0.0 Copyright (C) 2019 University of Waterloo
+//
+// The contents of this file are covered under the licence agreement in the
+// file "LICENCE" distributed with Cforall.
+//
+// ready_queue.cfa --
+//
+// Author           : Thierry Delisle
+// Created On       : Mon Nov dd 16:29:18 2019
+// Last Modified By :
+// Last Modified On :
+// Update Count     :
+//
+
+#define __cforall_thread__
+
+#include "bits/defs.hfa"
+#include "kernel_private.hfa"
+
+#define _GNU_SOURCE
+#include "stdlib.hfa"
+
+static const size_t cache_line_size = 64;
+
+static inline unsigned __max_processors_fallback() {
+	#ifdef __CFA_MAX_PROCESSORS__
+		return __CFA_MAX_PROCESSORS__;
+	#else
+		// No overriden function, no environment variable, no define
+		// fall back to a magic number
+		return 128;
+	#endif
+}
+
+__attribute__((weak)) unsigned __max_processors() {
+	const char * max_cores_s = getenv("CFA_MAX_PROCESSORS");
+	if(!max_cores_s) {
+		__cfaabi_dbg_print_nolock("No CFA_MAX_PROCESSORS in ENV");
+		return __max_processors_fallback();
+	}
+
+	char * endptr = 0p;
+	long int max_cores_l = strtol(max_cores_s, &endptr, 10);
+	if(max_cores_l < 1 || max_cores_l > 65535) {
+		__cfaabi_dbg_print_nolock("CFA_MAX_PROCESSORS out of range : %ld", max_cores_l);
+		return __max_processors_fallback();
+	}
+	if('\0' != *endptr) {
+		__cfaabi_dbg_print_nolock("CFA_MAX_PROCESSORS not a decimal number : %s", max_cores_s);
+		return __max_processors_fallback();
+	}
+
+	return max_cores_l;
+}
+
+//=======================================================================
+// Cluster wide reader-writer lock
+//=======================================================================
+void  ?{}(__clusterRWLock_t & this) {
+	this.max   = __max_processors();
+	this.alloc = 0;
+	this.ready = 0;
+	this.lock  = false;
+	this.data  = alloc(this.max);
+
+	/*paranoid*/ verify( 0 == (((uintptr_t)(this.data    )) % 64) );
+	/*paranoid*/ verify( 0 == (((uintptr_t)(this.data + 1)) % 64) );
+	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
+	/*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));
+
+}
+void ^?{}(__clusterRWLock_t & this) {
+	free(this.data);
+}
+
+void ?{}( __processor_id & this, struct processor * proc ) {
+	this.handle = proc;
+	this.lock   = false;
+}
+
+//=======================================================================
+// Lock-Free registering/unregistering of threads
+unsigned doregister( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
+	// Step - 1 : check if there is already space in the data
+	uint_fast32_t s = ready;
+
+	// Check among all the ready
+	for(uint_fast32_t i = 0; i < s; i++) {
+		processor * null = 0p; // Re-write every loop since compare thrashes it
+		if( __atomic_load_n(&data[i].handle, (int)__ATOMIC_RELAXED) == null
+			&& __atomic_compare_exchange_n( &data[i].handle, &null, proc, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
+			/*paranoid*/ verify(i < ready);
+			/*paranoid*/ verify(__alignof__(data[i]) == cache_line_size);
+			/*paranoid*/ verify((((uintptr_t)&data[i]) % cache_line_size) == 0);
+			return i;
+		}
+	}
+
+	if(max <= alloc) abort("Trying to create more than %ud processors", cltr->ready_lock.max);
+
+	// Step - 2 : F&A to get a new spot in the array.
+	uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
+	if(max <= n) abort("Trying to create more than %ud processors", cltr->ready_lock.max);
+
+	// Step - 3 : Mark space as used and then publish it.
+	__processor_id * storage = (__processor_id *)&data[n];
+	(*storage){ proc };
+	while(true) {
+		unsigned copy = n;
+		if( __atomic_load_n(&ready, __ATOMIC_RELAXED) == n
+			&& __atomic_compare_exchange_n(&ready, &copy, n + 1, true, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
+			break;
+		asm volatile("pause");
+	}
+
+	// Return new spot.
+	/*paranoid*/ verify(n < ready);
+	/*paranoid*/ verify(__alignof__(data[n]) == cache_line_size);
+	/*paranoid*/ verify((((uintptr_t)&data[n]) % cache_line_size) == 0);
+	return n;
+}
+
+void unregister( struct cluster * cltr, struct processor * proc ) with(cltr->ready_lock) {
+	unsigned id = proc->id;
+	/*paranoid*/ verify(id < ready);
+	/*paranoid*/ verify(proc == __atomic_load_n(&data[id].handle, __ATOMIC_RELAXED));
+	__atomic_store_n(&data[id].handle, 0p, __ATOMIC_RELEASE);
+}
+
+//-----------------------------------------------------------------------
+// Writer side : acquire when changing the ready queue, e.g. adding more
+//  queues or removing them.
+uint_fast32_t ready_mutate_lock( struct cluster & cltr ) with(cltr.ready_lock) {
+	// Step 1 : lock global lock
+	// It is needed to avoid processors that register mid Critical-Section
+	//   to simply lock their own lock and enter.
+	__atomic_acquire( &lock );
+
+	// Step 2 : lock per-proc lock
+	// Processors that are currently being registered aren't counted
+	//   but can't be in read_lock or in the critical section.
+	// All other processors are counted
+	uint_fast32_t s = ready;
+	for(uint_fast32_t i = 0; i < s; i++) {
+		__atomic_acquire( &data[i].lock );
+	}
+
+	return s;
+}
+
+void ready_mutate_unlock( struct cluster & cltr, uint_fast32_t last_s ) with(cltr.ready_lock) {
+	// Step 1 : release local locks
+	// This must be done while the global lock is held to avoid
+	//   threads that where created mid critical section
+	//   to race to lock their local locks and have the writer
+	//   immidiately unlock them
+	// Alternative solution : return s in write_lock and pass it to write_unlock
+	for(uint_fast32_t i = 0; i < last_s; i++) {
+		verify(data[i].lock);
+		__atomic_store_n(&data[i].lock, (bool)false, __ATOMIC_RELEASE);
+	}
+
+	// Step 2 : release global lock
+	/*paranoid*/ assert(true == lock);
+	__atomic_store_n(&lock, (bool)false, __ATOMIC_RELEASE);
+}
+
+//=======================================================================
+// Intrusive Queue used by ready queue
+//=======================================================================
+static const size_t fields_offset = offsetof( thread_desc, next );
+
+// Get the head pointer (one before the first element) from the anchor
+static inline thread_desc * head(const __intrusive_ready_queue_t & this) {
+	thread_desc * rhead = (thread_desc *)(
+		(uintptr_t)( &this.before ) - fields_offset
+	);
+	/* paranoid */ verify(rhead);
+	return rhead;
+}
+
+// Get the tail pointer (one after the last element) from the anchor
+static inline thread_desc * tail(const __intrusive_ready_queue_t & this) {
+	thread_desc * rtail = (thread_desc *)(
+		(uintptr_t)( &this.after ) - fields_offset
+	);
+	/* paranoid */ verify(rtail);
+	return rtail;
+}
+
+// Ctor
+void ?{}( __intrusive_ready_queue_t & this ) {
+	this.before.prev = 0p;
+	this.before.next = tail(this);
+
+	this.after .prev = head(this);
+	this.after .next = 0p;
+
+	// We add a boat-load of assertions here because the anchor code is very fragile
+	/* paranoid */ verify(((uintptr_t)( head(this) ) + fields_offset) == (uintptr_t)(&this.before));
+	/* paranoid */ verify(((uintptr_t)( tail(this) ) + fields_offset) == (uintptr_t)(&this.after ));
+	/* paranoid */ verify(head(this)->prev == 0p );
+	/* paranoid */ verify(head(this)->next == tail(this) );
+	/* paranoid */ verify(tail(this)->next == 0p );
+	/* paranoid */ verify(tail(this)->prev == head(this) );
+	/* paranoid */ verify(&head(this)->prev == &this.before.prev );
+	/* paranoid */ verify(&head(this)->next == &this.before.next );
+	/* paranoid */ verify(&tail(this)->prev == &this.after .prev );
+	/* paranoid */ verify(&tail(this)->next == &this.after .next );
+	/* paranoid */ verify(sizeof(__intrusive_ready_queue_t) == 128);
+	/* paranoid */ verify(sizeof(this) == 128);
+	/* paranoid */ verify(__alignof__(__intrusive_ready_queue_t) == 128);
+	/* paranoid */ verify(__alignof__(this) == 128);
+	/* paranoid */ verifyf(((intptr_t)(&this) % 128) == 0, "Expected address to be aligned %p %% 128 == %zd", &this, ((intptr_t)(&this) % 128));
+}
+
+// Dtor is trivial
+void ^?{}( __intrusive_ready_queue_t & this ) {
+	// Make sure the list is empty
+	/* paranoid */ verify(head(this)->prev == 0p );
+	/* paranoid */ verify(head(this)->next == tail(this) );
+	/* paranoid */ verify(tail(this)->next == 0p );
+	/* paranoid */ verify(tail(this)->prev == head(this) );
+}
+
+
+
+bool push(__intrusive_ready_queue_t & this, thread_desc * node) {
+	verify(this.lock);
+	verify(node->ts != 0);
+	verify(node->next == 0p);
+	verify(node->prev == 0p);
+
+
+	// Get the relevant nodes locally
+	thread_desc * tail = tail(this);
+	thread_desc * prev = tail->prev;
+
+	// Do the push
+	node->next = tail;
+	node->prev = prev;
+	prev->next = node;
+	tail->prev = node;
+
+	// Update stats
+	#ifndef __CFA_NO_SCHED_STATS__
+		this.stat.diff++;
+		this.stat.push++;
+	#endif
+
+	// Check if the queue used to be empty
+	if(this.before.ts == 0l) {
+		this.before.ts = node->ts;
+		verify(node->prev == head(this));
+		return true;
+	}
+	return false;
+}
+
+[thread_desc *, bool] pop(__intrusive_ready_queue_t & this) {
+	verify(this.lock);
+	thread_desc * head = head(this);
+	thread_desc * tail = tail(this);
+
+	thread_desc * node = head->next;
+	thread_desc * next = node->next;
+	if(node == tail) return [0p, false];
+
+	/* paranoid */ verify(node);
+
+	head->next = next;
+	next->prev = head;
+
+	#ifndef __CFA_NO_SCHED_STATS__
+		this.stat.diff--;
+		this.stat.pop ++;
+	#endif
+
+	if(next == tail) {
+		this.before.ts = 0ul;
+		node->[next, prev] = 0p;
+		return [node, true];
+	}
+	else {
+		verify(next->ts != 0);
+		this.before.ts = next->ts;
+		verify(this.before.ts != 0);
+		node->[next, prev] = 0p;
+		return [node, false];
+	}
+}
+
+static inline unsigned long long ts(__intrusive_ready_queue_t & this) {
+	return this.before.ts;
+}
