Index: benchmark/Cargo.toml.in
===================================================================
--- benchmark/Cargo.toml.in	(revision d0b9247e0319aa788954f0367c5a245623826293)
+++ benchmark/Cargo.toml.in	(revision 821c534aa3eaeb777a33fb9c44f8effeb9f54b85)
@@ -12,4 +12,8 @@
 name = "rdq-locality-tokio"
 path = "@abs_srcdir@/readyQ/locality.rs"
+
+[[bin]]
+name = "rdq-transfer-tokio"
+path = "@abs_srcdir@/readyQ/transfer.rs"
 
 [[bin]]
Index: benchmark/Makefile.am
===================================================================
--- benchmark/Makefile.am	(revision d0b9247e0319aa788954f0367c5a245623826293)
+++ benchmark/Makefile.am	(revision 821c534aa3eaeb777a33fb9c44f8effeb9f54b85)
@@ -600,5 +600,6 @@
 	rdq-locality-go \
 	rdq-locality-fibre \
-	rdq-transfer-cfa
+	rdq-transfer-cfa \
+	rdq-transfer-tokio
 
 rdq-benches:
Index: benchmark/bench.rs
===================================================================
--- benchmark/bench.rs	(revision d0b9247e0319aa788954f0367c5a245623826293)
+++ benchmark/bench.rs	(revision 821c534aa3eaeb777a33fb9c44f8effeb9f54b85)
@@ -1,5 +1,7 @@
 use std::io::{self, Write};
+use std::option;
 use std::sync::atomic::{AtomicU64, AtomicBool, Ordering};
 use std::time::{Instant,Duration};
+use std::u128;
 
 use clap::{Arg, ArgMatches};
@@ -27,8 +29,12 @@
 
 impl BenchData {
-	pub fn new(options: ArgMatches, nthreads: usize) -> BenchData {
+	pub fn new(options: ArgMatches, nthreads: usize, default_it: option::Option<u64>) -> BenchData {
 		let (clock_mode, stop_count, duration) = if options.is_present("iterations") {
 			(false,
 			options.value_of("iterations").unwrap().parse::<u64>().unwrap(),
+			-1.0)
+		} else if !default_it.is_none() {
+			(false,
+			default_it.unwrap(),
 			-1.0)
 		} else {
@@ -48,4 +54,5 @@
 	}
 
+	#[allow(dead_code)]
 	pub async fn wait(&self, start: &Instant) -> Duration{
 		loop {
@@ -69,2 +76,7 @@
 }
 
+// ==================================================
+pub fn _lehmer64( state: &mut u128 ) -> u64 {
+	*state = state.wrapping_mul(0xda942042e4dd58b5);
+	return (*state >> 64) as u64;
+}
Index: benchmark/readyQ/cycle.rs
===================================================================
--- benchmark/readyQ/cycle.rs	(revision d0b9247e0319aa788954f0367c5a245623826293)
+++ benchmark/readyQ/cycle.rs	(revision 821c534aa3eaeb777a33fb9c44f8effeb9f54b85)
@@ -46,5 +46,5 @@
 
 	let tthreads = nthreads * ring_size;
-	let exp = Arc::new(bench::BenchData::new(options, tthreads));
+	let exp = Arc::new(bench::BenchData::new(options, tthreads, None));
 
 	let s = (1000000 as u64).to_formatted_string(&Locale::en);
Index: benchmark/readyQ/locality.rs
===================================================================
--- benchmark/readyQ/locality.rs	(revision d0b9247e0319aa788954f0367c5a245623826293)
+++ benchmark/readyQ/locality.rs	(revision 821c534aa3eaeb777a33fb9c44f8effeb9f54b85)
@@ -285,5 +285,5 @@
 	assert_eq!(&s, "1,000,000");
 
-	let exp = Arc::new(bench::BenchData::new(options, nprocs));
+	let exp = Arc::new(bench::BenchData::new(options, nprocs, None));
 	let mut results = Result::new();
 
Index: benchmark/readyQ/transfer.rs
===================================================================
--- benchmark/readyQ/transfer.rs	(revision 821c534aa3eaeb777a33fb9c44f8effeb9f54b85)
+++ benchmark/readyQ/transfer.rs	(revision 821c534aa3eaeb777a33fb9c44f8effeb9f54b85)
@@ -0,0 +1,277 @@
+#[cfg(debug_assertions)]
+use std::io::{self, Write};
+
+use std::process;
+use std::option;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::time::{Instant,Duration};
+
+use tokio::runtime::Builder;
+use tokio::sync;
+use tokio::task;
+
+use rand::Rng;
+
+use clap::{Arg, App};
+use num_format::{Locale, ToFormattedString};
+
+#[path = "../bench.rs"]
+mod bench;
+
+#[cfg(debug_assertions)]
+macro_rules! debug {
+	($x:expr) => {
+		println!( $x );
+		io::stdout().flush().unwrap();
+	};
+	($x:expr, $($more:expr),+) => {
+		println!( $x, $($more), * );
+		io::stdout().flush().unwrap();
+	};
+}
+
+#[cfg(not(debug_assertions))]
+macro_rules! debug {
+    ($x:expr   ) => { () };
+    ($x:expr, $($more:expr),+) => { () };
+}
+
+fn parse_yes_no(opt: option::Option<&str>, default: bool) -> bool {
+	match opt {
+		Some(val) => {
+			match val {
+				"yes" => true,
+				"no"  => false,
+				"maybe" | "I don't know" | "Can you repeat the question?" => {
+					eprintln!("Lines for 'Malcolm in the Middle' are not acceptable values of parameter 'exhaust'");
+					std::process::exit(1);
+				},
+				_ => {
+					eprintln!("parameter 'exhaust' must have value 'yes' or 'no', was {}", val);
+					std::process::exit(1);
+				},
+			}
+		},
+		_ => {
+			default
+		},
+	}
+}
+
+struct LeaderInfo {
+	id: AtomicUsize,
+	idx: AtomicUsize,
+	seed: u128,
+}
+
+impl LeaderInfo {
+	pub fn new(nthreads: usize) -> LeaderInfo {
+		let this = LeaderInfo{
+			id: AtomicUsize::new(nthreads),
+			idx: AtomicUsize::new(0),
+			seed: process::id() as u128
+		};
+
+		let mut rng = rand::thread_rng();
+
+		for _ in 0..rng.gen_range(0..10) {
+			this.next( nthreads );
+		}
+
+		this
+	}
+
+	pub fn next(&self, len: usize) {
+		let n = bench::_lehmer64( unsafe {
+			let r1 = &self.seed as *const u128;
+			let r2 = r1 as *mut u128;
+			&mut *r2
+		} ) as usize;
+		self.id.store( n % len , Ordering::SeqCst );
+	}
+}
+
+struct MyThread {
+	id: usize,
+	idx: AtomicUsize,
+	sem: sync::Semaphore,
+}
+
+fn waitgroup(idx: usize, threads: &Vec<Arc<MyThread>>) {
+	let start = Instant::now();
+	for t in threads {
+		debug!( "Waiting for :{} ({})", t.id, t.idx.load(Ordering::Relaxed) );
+		while t.idx.load(Ordering::Relaxed) != idx {
+			std::sync::atomic::spin_loop_hint();
+			if start.elapsed() > Duration::from_secs(5) {
+				eprintln!("Programs has been blocked for more than 5 secs");
+				std::process::exit(1);
+			}
+		}
+	}
+	debug!( "Waiting done" );
+}
+
+fn wakegroup(exhaust: bool, me: usize, threads: &Vec<Arc<MyThread>>) {
+	if !exhaust { return; }
+
+	for i in 0..threads.len() {
+		if i != me {
+			debug!( "Leader waking {}", i);
+			threads[i].sem.add_permits(1);
+		}
+	}
+}
+
+fn lead(exhaust: bool, leader: &LeaderInfo, this: & MyThread, threads: &Vec<Arc<MyThread>>, main_sem: &sync::Semaphore, exp: &bench::BenchData) {
+	let nidx = leader.idx.load(Ordering::Relaxed) + 1;
+	this.idx.store(nidx, Ordering::Relaxed);
+	leader.idx.store(nidx, Ordering::Relaxed);
+
+	if nidx as u64 > exp.stop_count {
+		debug!( "Leader {} done", this.id);
+		main_sem.add_permits(1);
+		return;
+	}
+
+	debug!( "====================\nLeader no {} : {}", nidx, this.id);
+
+	waitgroup(nidx, threads);
+
+	leader.next( threads.len() );
+
+	wakegroup(exhaust, this.id, threads);
+
+	debug!( "Leader no {} : {} done\n====================", nidx, this.id);
+}
+
+async fn wait(exhaust: bool, leader: &LeaderInfo, this: &MyThread, rechecks: &mut usize) {
+	task::yield_now().await;
+
+	if leader.idx.load(Ordering::Relaxed) == this.idx.load(Ordering::Relaxed) {
+		debug!("Waiting {} recheck", this.id);
+		*rechecks += 1;
+		return;
+	}
+
+	debug!("Waiting {}", this.id);
+
+	debug_assert!( (leader.idx.load(Ordering::Relaxed) - 1) == this.idx.load(Ordering::Relaxed) );
+	this.idx.fetch_add(1, Ordering::SeqCst);
+	if exhaust {
+		debug!("Waiting {} sem", this.id);
+		this.sem.acquire().await.forget();
+	}
+	else {
+		debug!("Waiting {} yield", this.id);
+		task::yield_now().await;
+	}
+
+	debug!("Waiting {} done", this.id);
+}
+
+async fn transfer_main( me: usize, leader: Arc<LeaderInfo>, threads: Arc<Vec<Arc<MyThread>>>, exhaust: bool, start: Arc<sync::Barrier>, main_sem: Arc<sync::Semaphore>, exp: Arc<bench::BenchData>) -> usize{
+	assert!( me == threads[me].id );
+
+	debug!("Ready {}: {:p}", me, &threads[me].sem as *const sync::Semaphore);
+
+	start.wait().await;
+
+	debug!( "Start {}", me );
+
+	let mut rechecks: usize = 0;
+
+	loop {
+		if leader.id.load(Ordering::Relaxed) == me {
+			lead( exhaust, &leader, &threads[me], &threads, &main_sem, &exp );
+			task::yield_now().await;
+		}
+		else {
+			wait( exhaust, &leader, &threads[me], &mut rechecks ).await;
+		}
+		if leader.idx.load(Ordering::Relaxed) as u64 > exp.stop_count { break; }
+	}
+
+	rechecks
+}
+
+fn main() {
+	let options = App::new("Transfer Tokio")
+		.args(&bench::args())
+		.arg(Arg::with_name("exhaust")  .short("e").long("exhaust")  .takes_value(true).default_value("no").help("Whether or not threads that have seen the new epoch should park instead of yielding."))
+		.get_matches();
+
+	let exhaust  = parse_yes_no( options.value_of("exhaust"), false );
+	let nthreads = options.value_of("nthreads").unwrap().parse::<usize>().unwrap();
+	let nprocs   = options.value_of("nprocs").unwrap().parse::<usize>().unwrap();
+
+
+	let exp = Arc::new(bench::BenchData::new(options, nthreads, Some(100)));
+	if exp.clock_mode {
+		eprintln!("Programs does not support fixed duration mode");
+		std::process::exit(1);
+	}
+
+	println!("Running {} threads on {} processors, doing {} iterations, {} exhaustion", nthreads, nprocs, exp.stop_count, if exhaust { "with" } else { "without" });
+
+	let s = (1000000 as u64).to_formatted_string(&Locale::en);
+	assert_eq!(&s, "1,000,000");
+
+	let main_sem = Arc::new(sync::Semaphore::new(0));
+	let leader = Arc::new(LeaderInfo::new(nthreads));
+	let barr = Arc::new(sync::Barrier::new(nthreads + 1));
+	let thddata : Arc<Vec<Arc<MyThread>>> = Arc::new(
+		(0..nthreads).map(|i| {
+			Arc::new(MyThread{
+				id: i,
+				idx: AtomicUsize::new(0),
+				sem: sync::Semaphore::new(0),
+			})
+		}).collect()
+	);
+
+	let mut rechecks: usize = 0;
+	let mut duration : std::time::Duration = std::time::Duration::from_secs(0);
+
+	let runtime = Builder::new_multi_thread()
+		.worker_threads(nprocs)
+		.enable_all()
+		.build()
+		.unwrap();
+
+	runtime.block_on(async {
+		let threads: Vec<_> = (0..nthreads).map(|i| {
+			tokio::spawn(transfer_main(i, leader.clone(), thddata.clone(), exhaust, barr.clone(), main_sem.clone(), exp.clone()))
+		}).collect();
+		println!("Starting");
+
+		let start = Instant::now();
+
+		barr.wait().await;
+		debug!("Unlocked all");
+
+
+		main_sem.acquire().await.forget();
+
+		duration = start.elapsed();
+
+		println!("\nDone");
+
+
+		for i in 0..nthreads {
+			thddata[i].sem.add_permits(1);
+		}
+
+		for t in threads {
+			rechecks += t.await.unwrap();
+		}
+	});
+
+	println!("Duration (ms)           : {}", (duration.as_millis()).to_formatted_string(&Locale::en));
+	println!("Number of processors    : {}", (nprocs).to_formatted_string(&Locale::en));
+	println!("Number of threads       : {}", (nthreads).to_formatted_string(&Locale::en));
+	println!("Total Operations(ops)   : {:>15}", (exp.stop_count).to_formatted_string(&Locale::en));
+	println!("Threads parking on wait : {}", if exhaust { "yes" } else { "no" });
+	println!("Rechecking              : {}", rechecks );
+}
Index: benchmark/readyQ/yield.rs
===================================================================
--- benchmark/readyQ/yield.rs	(revision d0b9247e0319aa788954f0367c5a245623826293)
+++ benchmark/readyQ/yield.rs	(revision 821c534aa3eaeb777a33fb9c44f8effeb9f54b85)
@@ -44,5 +44,5 @@
 	let nprocs    = options.value_of("nprocs").unwrap().parse::<usize>().unwrap();
 
-	let exp = Arc::new(bench::BenchData::new(options, nthreads));
+	let exp = Arc::new(bench::BenchData::new(options, nthreads, None));
 
 	let s = (1000000 as u64).to_formatted_string(&Locale::en);
@@ -50,6 +50,5 @@
 
 	let thddata : Arc<Vec<Arc<Yielder>>> = Arc::new(
-		(0..nthreads).map(|i| {
-			let pi = (i + nthreads) % nthreads;
+		(0..nthreads).map(|_i| {
 			Arc::new(Yielder{
 				sem: sync::Semaphore::new(0),
