 Timestamp:
 Apr 12, 2021, 5:22:02 PM (7 months ago)
 Branches:
 armeh, jacob/cs343translation, master, newastuniqueexpr
 Children:
 7f5683e
 Parents:
 fe63ae6
 File:

 1 edited
Legend:
 Unmodified
 Added
 Removed

doc/theses/thierry_delisle_PhD/code/readyQ_proto/work_stealing.hpp
rfe63ae6 ra1b9bc3 6 6 #include <memory> 7 7 #include <mutex> 8 #include <thread> 8 9 #include <type_traits> 9 10 … … 11 12 #include "utils.hpp" 12 13 #include "links.hpp" 14 #include "links2.hpp" 13 15 #include "snzi.hpp" 14 16 17 #include <x86intrin.h> 18 15 19 using namespace std; 20 21 static const long long lim = 2000; 22 static const unsigned nqueues = 2; 23 24 struct __attribute__((aligned(128))) timestamp_t { 25 volatile unsigned long long val = 0; 26 }; 27 28 template<typename node_t> 29 struct __attribute__((aligned(128))) localQ_t { 30 mpsc_queue<node_t> queue = {}; 31 spinlock_t lock = {}; 32 bool needs_help = true; 33 }; 16 34 17 35 template<typename node_t> … … 25 43 26 44 work_stealing(unsigned _numThreads, unsigned) 27 : numThreads(_numThreads )45 : numThreads(_numThreads * nqueues) 28 46 , lists(new intrusive_queue_t<node_t>[numThreads]) 29 , snzi( std::log2( numThreads / 2 ), 2 ) 47 , times(new timestamp_t[numThreads]) 48 // , snzi( std::log2( numThreads / 2 ), 2 ) 30 49 31 50 { … … 39 58 40 59 __attribute__((noinline, hot)) void push(node_t * node) { 41 node>_links.ts = rdtscl(); 42 if( node>_links.hint > numThreads ) { 43 node>_links.hint = tls.rng.next() % numThreads; 44 tls.stat.push.nhint++; 45 } 46 47 unsigned i = node>_links.hint; 60 // node>_links.ts = rdtscl(); 61 node>_links.ts = 1; 62 63 auto & list = *({ 64 unsigned i; 65 do { 66 tls.stats.push.attempt++; 67 // unsigned r = tls.rng1.next(); 68 unsigned r = tls.it++; 69 if(tls.my_queue == outside) { 70 i = r % numThreads; 71 } else { 72 i = tls.my_queue + (r % nqueues); 73 } 74 } while(!lists[i].lock.try_lock()); 75 &lists[i]; 76 }); 77 78 list.push( node ); 79 list.lock.unlock(); 80 // tls.rng2.set_raw_state( tls.rng1.get_raw_state()); 81 // count++; 82 tls.stats.push.success++; 83 } 84 85 __attribute__((noinline, hot)) node_t * pop() { 86 if( tls.myfriend == outside ) { 87 auto r = tls.rng1.next(); 88 tls.myfriend = r % numThreads; 89 times[tls.myfriend].val = 0; 90 } 91 else if(times[tls.myfriend].val == 0) { 92 node_t * n = try_pop(tls.myfriend, tls.stats.pop.help); 93 tls.stats.help++; 94 tls.myfriend = outside; 95 if(n) return n; 96 } 97 98 if(tls.my_queue != outside) { 99 node_t * n = local(); 100 if(n) return n; 101 } 102 103 // try steal 104 for(int i = 0; i < 25; i++) { 105 node_t * n = steal(); 106 if(n) return n; 107 } 108 109 return search(); 110 } 111 112 private: 113 inline node_t * local() { 114 // unsigned i = (tls.rng2.prev() % 4) + tls.my_queue; 115 unsigned i = (tls.it % nqueues) + tls.my_queue; 116 return try_pop(i, tls.stats.pop.local); 117 } 118 119 inline node_t * steal() { 120 unsigned i = tls.rng2.prev() % numThreads; 121 return try_pop(i, tls.stats.pop.steal); 122 } 123 124 inline node_t * search() { 125 unsigned offset = tls.rng2.prev(); 126 for(unsigned i = 0; i < numThreads; i++) { 127 unsigned idx = (offset + i) % numThreads; 128 node_t * thrd = try_pop(idx, tls.stats.pop.search); 129 if(thrd) { 130 return thrd; 131 } 132 } 133 134 return nullptr; 135 } 136 137 private: 138 struct attempt_stat_t { 139 std::size_t attempt = { 0 }; 140 std::size_t elock = { 0 }; 141 std::size_t eempty = { 0 }; 142 std::size_t espec = { 0 }; 143 std::size_t success = { 0 }; 144 }; 145 146 node_t * try_pop(unsigned i, attempt_stat_t & stat) { 147 assert(i < numThreads); 48 148 auto & list = lists[i]; 49 list.lock.lock(); 50 51 if(list.push( node )) { 52 snzi.arrive(i); 53 } 54 55 list.lock.unlock(); 56 } 57 58 __attribute__((noinline, hot)) node_t * pop() { 59 node_t * node; 60 while(true) { 61 if(!snzi.query()) { 62 return nullptr; 63 } 64 65 { 66 unsigned i = tls.my_queue; 67 auto & list = lists[i]; 68 if( list.ts() != 0 ) { 69 list.lock.lock(); 70 if((node = try_pop(i))) { 71 tls.stat.pop.local.success++; 72 break; 73 } 74 else { 75 tls.stat.pop.local.elock++; 76 } 77 } 78 else { 79 tls.stat.pop.local.espec++; 80 } 81 } 82 83 tls.stat.pop.steal.tried++; 84 85 int i = tls.rng.next() % numThreads; 86 auto & list = lists[i]; 87 if( list.ts() == 0 ) { 88 tls.stat.pop.steal.empty++; 89 continue; 90 } 91 92 if( !list.lock.try_lock() ) { 93 tls.stat.pop.steal.locked++; 94 continue; 95 } 96 97 if((node = try_pop(i))) { 98 tls.stat.pop.steal.success++; 99 break; 100 } 101 } 102 103 #if defined(READ) 104 const unsigned f = READ; 105 if(0 == (tls.it % f)) { 106 unsigned i = tls.it / f; 107 lists[i % numThreads].ts(); 108 } 109 // lists[tls.it].ts(); 110 tls.it++; 111 #endif 112 113 114 return node; 115 } 116 117 private: 118 node_t * try_pop(unsigned i) { 119 auto & list = lists[i]; 149 stat.attempt++; 150 151 // If the list is empty, don't try 152 if(list.ts() == 0) { stat.espec++; return nullptr; } 153 154 // If we can't get the lock, move on 155 if( !list.lock.try_lock() ) { stat.elock++; return nullptr; } 156 120 157 121 158 // If list is empty, unlock and retry 122 159 if( list.ts() == 0 ) { 123 160 list.lock.unlock(); 161 stat.eempty++; 124 162 return nullptr; 125 163 } 126 164 127 // Actually pop the list 128 node_t * node; 129 bool emptied; 130 std::tie(node, emptied) = list.pop(); 131 assert(node); 132 133 if(emptied) { 134 snzi.depart(i); 135 } 136 137 // Unlock and return 165 auto node = list.pop(); 138 166 list.lock.unlock(); 139 return node; 167 stat.success++; 168 times[i].val = 1; //node.first>_links.ts; 169 // count; 170 // _mm_stream_si64((long long int*)×[i].val, node.first>_links.ts); 171 return node.first; 140 172 } 141 173 … … 144 176 145 177 static std::atomic_uint32_t ticket; 178 static const unsigned outside = 0xFFFFFFFF; 179 180 static inline unsigned calc_preferred() { 181 unsigned t = ticket++; 182 if(t == 0) return outside; 183 unsigned i = (t  1) * nqueues; 184 return i; 185 } 186 146 187 static __attribute__((aligned(128))) thread_local struct TLS { 147 Random rng = { int(rdtscl()) }; 148 unsigned my_queue = ticket++; 188 Random rng1 = { unsigned(std::hash<std::thread::id>{}(std::this_thread::get_id()) ^ rdtscl()) }; 189 Random rng2 = { unsigned(std::hash<std::thread::id>{}(std::this_thread::get_id()) ^ rdtscl()) }; 190 unsigned it = 0; 191 unsigned my_queue = calc_preferred(); 192 unsigned myfriend = outside; 149 193 #if defined(READ) 150 194 unsigned it = 0; … … 152 196 struct { 153 197 struct { 154 std::size_t nhint = { 0 }; 198 std::size_t attempt = { 0 }; 199 std::size_t success = { 0 }; 155 200 } push; 156 201 struct { 157 struct { 158 std::size_t success = { 0 }; 159 std::size_t espec = { 0 }; 160 std::size_t elock = { 0 }; 161 } local; 162 struct { 163 std::size_t tried = { 0 }; 164 std::size_t locked = { 0 }; 165 std::size_t empty = { 0 }; 166 std::size_t success = { 0 }; 167 } steal; 202 attempt_stat_t help; 203 attempt_stat_t local; 204 attempt_stat_t steal; 205 attempt_stat_t search; 168 206 } pop; 169 } stat; 207 std::size_t help = { 0 }; 208 } stats; 170 209 } tls; 171 210 … … 173 212 const unsigned numThreads; 174 213 std::unique_ptr<intrusive_queue_t<node_t> []> lists; 175 __attribute__((aligned(64))) snzi_t snzi; 214 std::unique_ptr<timestamp_t []> times; 215 __attribute__((aligned(128))) std::atomic_size_t count; 176 216 177 217 #ifndef NO_STATS … … 179 219 static struct GlobalStats { 180 220 struct { 181 std::atomic_size_t nhint = { 0 }; 221 std::atomic_size_t attempt = { 0 }; 222 std::atomic_size_t success = { 0 }; 182 223 } push; 183 224 struct { 184 225 struct { 226 std::atomic_size_t attempt = { 0 }; 227 std::atomic_size_t elock = { 0 }; 228 std::atomic_size_t eempty = { 0 }; 229 std::atomic_size_t espec = { 0 }; 185 230 std::atomic_size_t success = { 0 }; 186 std::atomic_size_t espec = { 0 }; 187 std::atomic_size_t elock = { 0 }; 231 } help; 232 struct { 233 std::atomic_size_t attempt = { 0 }; 234 std::atomic_size_t elock = { 0 }; 235 std::atomic_size_t eempty = { 0 }; 236 std::atomic_size_t espec = { 0 }; 237 std::atomic_size_t success = { 0 }; 188 238 } local; 189 239 struct { 190 std::atomic_size_t tried = { 0 }; 191 std::atomic_size_t locked = { 0 }; 192 std::atomic_size_t empty = { 0 }; 240 std::atomic_size_t attempt = { 0 }; 241 std::atomic_size_t elock = { 0 }; 242 std::atomic_size_t eempty = { 0 }; 243 std::atomic_size_t espec = { 0 }; 193 244 std::atomic_size_t success = { 0 }; 194 245 } steal; 246 struct { 247 std::atomic_size_t attempt = { 0 }; 248 std::atomic_size_t elock = { 0 }; 249 std::atomic_size_t eempty = { 0 }; 250 std::atomic_size_t espec = { 0 }; 251 std::atomic_size_t success = { 0 }; 252 } search; 195 253 } pop; 254 std::atomic_size_t help = { 0 }; 196 255 } global_stats; 197 256 198 257 public: 199 258 static void stats_tls_tally() { 200 global_stats.push.nhint += tls.stat.push.nhint; 201 global_stats.pop.local.success += tls.stat.pop.local.success; 202 global_stats.pop.local.espec += tls.stat.pop.local.espec ; 203 global_stats.pop.local.elock += tls.stat.pop.local.elock ; 204 global_stats.pop.steal.tried += tls.stat.pop.steal.tried ; 205 global_stats.pop.steal.locked += tls.stat.pop.steal.locked ; 206 global_stats.pop.steal.empty += tls.stat.pop.steal.empty ; 207 global_stats.pop.steal.success += tls.stat.pop.steal.success; 208 } 209 210 static void stats_print(std::ostream & os ) { 259 global_stats.push.attempt += tls.stats.push.attempt; 260 global_stats.push.success += tls.stats.push.success; 261 global_stats.pop.help .attempt += tls.stats.pop.help .attempt; 262 global_stats.pop.help .elock += tls.stats.pop.help .elock ; 263 global_stats.pop.help .eempty += tls.stats.pop.help .eempty ; 264 global_stats.pop.help .espec += tls.stats.pop.help .espec ; 265 global_stats.pop.help .success += tls.stats.pop.help .success; 266 global_stats.pop.local .attempt += tls.stats.pop.local .attempt; 267 global_stats.pop.local .elock += tls.stats.pop.local .elock ; 268 global_stats.pop.local .eempty += tls.stats.pop.local .eempty ; 269 global_stats.pop.local .espec += tls.stats.pop.local .espec ; 270 global_stats.pop.local .success += tls.stats.pop.local .success; 271 global_stats.pop.steal .attempt += tls.stats.pop.steal .attempt; 272 global_stats.pop.steal .elock += tls.stats.pop.steal .elock ; 273 global_stats.pop.steal .eempty += tls.stats.pop.steal .eempty ; 274 global_stats.pop.steal .espec += tls.stats.pop.steal .espec ; 275 global_stats.pop.steal .success += tls.stats.pop.steal .success; 276 global_stats.pop.search.attempt += tls.stats.pop.search.attempt; 277 global_stats.pop.search.elock += tls.stats.pop.search.elock ; 278 global_stats.pop.search.eempty += tls.stats.pop.search.eempty ; 279 global_stats.pop.search.espec += tls.stats.pop.search.espec ; 280 global_stats.pop.search.success += tls.stats.pop.search.success; 281 global_stats.help += tls.stats.help; 282 } 283 284 static void stats_print(std::ostream & os, double duration ) { 211 285 std::cout << " Work Stealing Stats " << std::endl; 212 286 213 double stealSucc = double(global_stats.pop.steal.success) / global_stats.pop.steal.tried; 214 os << "Push to new Q : " << std::setw(15) << global_stats.push.nhint << "\n"; 215 os << "Local Pop : " << std::setw(15) << global_stats.pop.local.success << "\n"; 216 os << "Steal Pop : " << std::setw(15) << global_stats.pop.steal.success << "(" << global_stats.pop.local.espec << "s, " << global_stats.pop.local.elock << "l)\n"; 217 os << "Steal Success : " << std::setw(15) << stealSucc << "(" << global_stats.pop.steal.tried << " tries)\n"; 218 os << "Steal Fails : " << std::setw(15) << global_stats.pop.steal.empty << "e, " << global_stats.pop.steal.locked << "l\n"; 287 double push_suc = (100.0 * double(global_stats.push.success) / global_stats.push.attempt); 288 double push_len = double(global_stats.push.attempt ) / global_stats.push.success; 289 os << "Push Pick : " << push_suc << " %, len " << push_len << " (" << global_stats.push.attempt << " / " << global_stats.push.success << ")\n"; 290 291 double hlp_suc = (100.0 * double(global_stats.pop.help.success) / global_stats.pop.help.attempt); 292 double hlp_len = double(global_stats.pop.help.attempt ) / global_stats.pop.help.success; 293 os << "Help : " << hlp_suc << " %, len " << hlp_len << " (" << global_stats.pop.help.attempt << " / " << global_stats.pop.help.success << ")\n"; 294 os << "Help Fail : " << global_stats.pop.help.espec << "s, " << global_stats.pop.help.eempty << "e, " << global_stats.pop.help.elock << "l\n"; 295 296 double pop_suc = (100.0 * double(global_stats.pop.local.success) / global_stats.pop.local.attempt); 297 double pop_len = double(global_stats.pop.local.attempt ) / global_stats.pop.local.success; 298 os << "Local : " << pop_suc << " %, len " << pop_len << " (" << global_stats.pop.local.attempt << " / " << global_stats.pop.local.success << ")\n"; 299 os << "Local Fail : " << global_stats.pop.local.espec << "s, " << global_stats.pop.local.eempty << "e, " << global_stats.pop.local.elock << "l\n"; 300 301 double stl_suc = (100.0 * double(global_stats.pop.steal.success) / global_stats.pop.steal.attempt); 302 double stl_len = double(global_stats.pop.steal.attempt ) / global_stats.pop.steal.success; 303 os << "Steal : " << stl_suc << " %, len " << stl_len << " (" << global_stats.pop.steal.attempt << " / " << global_stats.pop.steal.success << ")\n"; 304 os << "Steal Fail : " << global_stats.pop.steal.espec << "s, " << global_stats.pop.steal.eempty << "e, " << global_stats.pop.steal.elock << "l\n"; 305 306 double srh_suc = (100.0 * double(global_stats.pop.search.success) / global_stats.pop.search.attempt); 307 double srh_len = double(global_stats.pop.search.attempt ) / global_stats.pop.search.success; 308 os << "Search : " << srh_suc << " %, len " << srh_len << " (" << global_stats.pop.search.attempt << " / " << global_stats.pop.search.success << ")\n"; 309 os << "Search Fail : " << global_stats.pop.search.espec << "s, " << global_stats.pop.search.eempty << "e, " << global_stats.pop.search.elock << "l\n"; 310 os << "Helps : " << std::setw(15) << std::scientific << global_stats.help / duration << "/sec (" << global_stats.help << ")\n"; 219 311 } 220 312 private:
Note: See TracChangeset
for help on using the changeset viewer.