Changeset 65c9208 for benchmark/readyQ
- Timestamp:
- May 10, 2022, 12:28:54 PM (3 years ago)
- Branches:
- ADT, ast-experimental, master, pthread-emulation, qualifiedEnum
- Children:
- e07187d
- Parents:
- 3613e25
- Location:
- benchmark/readyQ
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
benchmark/readyQ/transfer.cfa
r3613e25 r65c9208 14 14 15 15 bool exhaust = false; 16 volatile bool estop = false; 17 16 18 17 19 thread$ * the_main; … … 35 37 static void waitgroup() { 36 38 Time start = timeHiRes(); 37 for(i; nthreads) {39 OUTER: for(i; nthreads) { 38 40 PRINT( sout | "Waiting for :" | i | "(" | threads[i]->idx | ")"; ) 39 41 while( threads[i]->idx != lead_idx ) { … … 42 44 print_stats_now( bench_cluster, CFA_STATS_READY_Q | CFA_STATS_IO ); 43 45 serr | "Programs has been blocked for more than 5 secs"; 44 exit(1); 46 estop = true; 47 unpark( the_main ); 48 break OUTER; 45 49 } 46 50 } … … 59 63 static void lead(MyThread & this) { 60 64 this.idx = ++lead_idx; 61 if(lead_idx > stop_count ) {65 if(lead_idx > stop_count || estop) { 62 66 PRINT( sout | "Leader" | this.id | "done"; ) 63 67 unpark( the_main ); … … 100 104 wait( this ); 101 105 } 102 if(lead_idx > stop_count ) break;106 if(lead_idx > stop_count || estop) break; 103 107 } 104 108 } … … 172 176 sout | "Number of processors : " | nprocs; 173 177 sout | "Number of threads : " | nthreads; 174 sout | "Total Operations(ops) : " | stop_count;178 sout | "Total Operations(ops) : " | lead_idx - 1; 175 179 sout | "Threads parking on wait : " | (exhaust ? "yes" : "no"); 176 180 sout | "Rechecking : " | rechecks; 181 sout | "ns per transfer : " | (end - start)`dms / lead_idx; 177 182 178 183 -
benchmark/readyQ/transfer.cpp
r3613e25 r65c9208 12 12 13 13 bool exhaust = false; 14 volatile bool estop = false; 14 15 15 16 bench_sem the_main; … … 42 43 if( to_miliseconds(timeHiRes() - start) > 5'000 ) { 43 44 std::cerr << "Programs has been blocked for more than 5 secs" << std::endl; 44 std::exit(1); 45 estop = true; 46 the_main.post(); 47 goto END; 45 48 } 46 49 } 47 50 } 51 END:; 48 52 PRINT( std::cout | "Waiting done"; ) 49 53 } … … 59 63 void lead() { 60 64 this->idx = ++lead_idx; 61 if(lead_idx > stop_count ) {65 if(lead_idx > stop_count || estop) { 62 66 PRINT( std::cout << "Leader " << this->id << " done" << std::endl; ) 63 67 the_main.post(); … … 88 92 } 89 93 90 static void main( void * arg) {91 MyThread & self = * reinterpret_cast<MyThread*>(arg);94 static void main(MyThread * arg) { 95 MyThread & self = *arg; 92 96 self.park(); 93 97 … … 101 105 self.wait(); 102 106 } 103 if(lead_idx > stop_count ) break;107 if(lead_idx > stop_count || estop) break; 104 108 } 105 109 } … … 144 148 for(size_t i = 0; i < nthreads; i++) { 145 149 threads[i] = new MyThread( i ); 146 handles[i] = new Fibre( MyThread::main, threads[i] ); 150 handles[i] = new Fibre(); 151 handles[i]->run( MyThread::main, threads[i] ); 147 152 } 148 153 … … 164 169 PRINT( std::cout << i << " joined" << std::endl; ) 165 170 rechecks += thrd.rechecks; 166 // delete( handles[i] );167 171 delete( threads[i] ); 168 172 } … … 176 180 std::cout << "Number of processors : " << nprocs << std::endl; 177 181 std::cout << "Number of threads : " << nthreads << std::endl; 178 std::cout << "Total Operations(ops) : " << stop_count<< std::endl;182 std::cout << "Total Operations(ops) : " << (lead_idx - 1) << std::endl; 179 183 std::cout << "Threads parking on wait : " << (exhaust ? "yes" : "no") << std::endl; 180 184 std::cout << "Rechecking : " << rechecks << std::endl; 185 std::cout << "ns per transfer : " << std::fixed << (((double)(end - start)) / (lead_idx)) << std::endl; 181 186 182 187 -
benchmark/readyQ/transfer.go
r3613e25 r65c9208 6 6 "math/rand" 7 7 "os" 8 "regexp" 8 9 "runtime" 9 10 "sync/atomic" … … 16 17 id uint64 17 18 idx uint64 19 estop uint64 18 20 seed uint64 19 21 } … … 34 36 35 37 func NewLeader(size uint64) (*LeaderInfo) { 36 this := &LeaderInfo{0, 0, uint64(os.Getpid())}38 this := &LeaderInfo{0, 0, 0, uint64(os.Getpid())} 37 39 38 40 r := rand.Intn(10) … … 51 53 } 52 54 53 func waitgroup( idx uint64, threads [] MyThread) {55 func waitgroup(leader * LeaderInfo, idx uint64, threads [] MyThread, main_sem chan struct {}) { 54 56 start := time.Now() 57 Outer: 55 58 for i := 0; i < len(threads); i++ { 56 59 // fmt.Fprintf(os.Stderr, "Waiting for :%d (%d)\n", threads[i].id, atomic.LoadUint64(&threads[i].idx) ); … … 61 64 if delta.Seconds() > 5 { 62 65 fmt.Fprintf(os.Stderr, "Programs has been blocked for more than 5 secs") 63 os.Exit(1) 66 atomic.StoreUint64(&leader.estop, 1); 67 main_sem <- (struct {}{}) 68 break Outer 64 69 } 65 70 } … … 74 79 if i != me { 75 80 // debug!( "Leader waking {}", i); 81 defer func() { 82 if err := recover(); err != nil { 83 fmt.Fprintf(os.Stderr, "Panic occurred: %s\n", err) 84 } 85 }() 76 86 threads[i].sem <- (struct {}{}) 77 87 } … … 84 94 atomic.StoreUint64(&leader.idx, nidx); 85 95 86 if nidx > stop_count {96 if nidx > stop_count || atomic.LoadUint64(&leader.estop) != 0 { 87 97 // debug!( "Leader {} done", this.id); 88 98 main_sem <- (struct {}{}) … … 92 102 // debug!( "====================\nLeader no {} : {}", nidx, this.id); 93 103 94 waitgroup( nidx, threads);104 waitgroup(leader, nidx, threads, main_sem); 95 105 96 106 leader.next( uint64(len(threads)) ); … … 146 156 waitleader( exhaust, leader, &threads[me], &r ) 147 157 } 148 if atomic.LoadUint64(&leader.idx) > stop_count { break; }158 if atomic.LoadUint64(&leader.idx) > stop_count || atomic.LoadUint64(&leader.estop) != 0 { break; } 149 159 } 150 160 … … 155 165 func main() { 156 166 // Benchmark specific command line arguments 157 exhaustOpt := flag. Bool("e", false, "Whether or not threads that have seen the new epoch should park instead of yielding.")167 exhaustOpt := flag.String("e", "no", "Whether or not threads that have seen the new epoch should park instead of yielding.") 158 168 159 169 // General benchmark initialization and deinitialization 160 defer bench_init()() 161 162 exhaust := *exhaustOpt; 170 bench_init() 171 172 exhaustVal := *exhaustOpt; 173 174 var exhaust bool 175 re_yes := regexp.MustCompile("[Yy]|[Yy][Ee][Ss]") 176 re_no := regexp.MustCompile("[Nn]|[Nn][Oo]") 177 if re_yes.Match([]byte(exhaustVal)) { 178 exhaust = true 179 } else if re_no.Match([]byte(exhaustVal)) { 180 exhaust = false 181 } else { 182 fmt.Fprintf(os.Stderr, "Unrecognized exhaust(-e) option '%s'\n", exhaustVal) 183 os.Exit(1) 184 } 185 163 186 if clock_mode { 164 fmt.Fprintf(os.Stderr, "Programs does not support fixed duration mode ")187 fmt.Fprintf(os.Stderr, "Programs does not support fixed duration mode\n") 165 188 os.Exit(1) 166 189 } … … 215 238 ws = "no" 216 239 } 217 p.Printf("Duration (ms) : % f\n", delta.Milliseconds() )240 p.Printf("Duration (ms) : %d\n", delta.Milliseconds() ) 218 241 p.Printf("Number of processors : %d\n", nprocs ) 219 242 p.Printf("Number of threads : %d\n", nthreads ) 220 p.Printf("Total Operations(ops) : %15d\n", stop_count)243 p.Printf("Total Operations(ops) : %15d\n", (leader.idx - 1) ) 221 244 p.Printf("Threads parking on wait : %s\n", ws) 222 245 p.Printf("Rechecking : %d\n", rechecks ) 223 } 246 p.Printf("ns per transfer : %f\n", float64(delta.Nanoseconds()) / float64(leader.idx) ) 247 } -
benchmark/readyQ/transfer.rs
r3613e25 r65c9208 6 6 use std::hint; 7 7 use std::sync::Arc; 8 use std::sync::atomic::{Atomic Usize, Ordering};8 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; 9 9 use std::time::{Instant,Duration}; 10 10 … … 44 44 match val { 45 45 "yes" => true, 46 "Y" => true, 47 "y" => true, 46 48 "no" => false, 49 "N" => false, 50 "n" => false, 47 51 "maybe" | "I don't know" | "Can you repeat the question?" => { 48 52 eprintln!("Lines for 'Malcolm in the Middle' are not acceptable values of parameter 'exhaust'"); … … 64 68 id: AtomicUsize, 65 69 idx: AtomicUsize, 70 estop: AtomicBool, 66 71 seed: u128, 67 72 } … … 72 77 id: AtomicUsize::new(nthreads), 73 78 idx: AtomicUsize::new(0), 79 estop: AtomicBool::new(false), 74 80 seed: process::id() as u128 75 81 }; … … 100 106 } 101 107 102 fn waitgroup( idx: usize, threads: &Vec<Arc<MyThread>>) {108 fn waitgroup(leader: &LeaderInfo, idx: usize, threads: &Vec<Arc<MyThread>>, main_sem: &sync::Semaphore) { 103 109 let start = Instant::now(); 104 for t in threads {110 'outer: for t in threads { 105 111 debug!( "Waiting for :{} ({})", t.id, t.idx.load(Ordering::Relaxed) ); 106 112 while t.idx.load(Ordering::Relaxed) != idx { … … 108 114 if start.elapsed() > Duration::from_secs(5) { 109 115 eprintln!("Programs has been blocked for more than 5 secs"); 110 std::process::exit(1); 116 leader.estop.store(true, Ordering::Relaxed); 117 main_sem.add_permits(1); 118 break 'outer; 111 119 } 112 120 } … … 131 139 leader.idx.store(nidx, Ordering::Relaxed); 132 140 133 if nidx as u64 > exp.stop_count {141 if nidx as u64 > exp.stop_count || leader.estop.load(Ordering::Relaxed) { 134 142 debug!( "Leader {} done", this.id); 135 143 main_sem.add_permits(1); … … 139 147 debug!( "====================\nLeader no {} : {}", nidx, this.id); 140 148 141 waitgroup( nidx, threads);149 waitgroup(leader, nidx, threads, main_sem); 142 150 143 151 leader.next( threads.len() ); … … 192 200 wait( exhaust, &leader, &threads[me], &mut rechecks ).await; 193 201 } 194 if leader.idx.load(Ordering::Relaxed) as u64 > exp.stop_count { break; }202 if leader.idx.load(Ordering::Relaxed) as u64 > exp.stop_count || leader.estop.load(Ordering::Relaxed) { break; } 195 203 } 196 204 … … 273 281 println!("Number of processors : {}", (nprocs).to_formatted_string(&Locale::en)); 274 282 println!("Number of threads : {}", (nthreads).to_formatted_string(&Locale::en)); 275 println!("Total Operations(ops) : {:>15}", ( exp.stop_count).to_formatted_string(&Locale::en));283 println!("Total Operations(ops) : {:>15}", (leader.idx.load(Ordering::Relaxed) - 1).to_formatted_string(&Locale::en)); 276 284 println!("Threads parking on wait : {}", if exhaust { "yes" } else { "no" }); 277 285 println!("Rechecking : {}", rechecks ); 278 } 286 println!("ns per transfer : {}", ((duration.as_nanos() as f64) / leader.idx.load(Ordering::Relaxed) as f64)); 287 288 }
Note: See TracChangeset
for help on using the changeset viewer.