- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/kernel/fwd.hfa
rfe9468e2 r454f478 5 5 // file "LICENCE" distributed with Cforall. 6 6 // 7 // kernel/fwd.hfa -- 7 // kernel/fwd.hfa -- PUBLIC 8 // Fundamental code needed to implement threading M.E.S. algorithms. 8 9 // 9 10 // Author : Thierry Delisle … … 134 135 extern uint64_t thread_rand(); 135 136 137 // Semaphore which only supports a single thread 138 struct single_sem { 139 struct $thread * volatile ptr; 140 }; 141 142 static inline { 143 void ?{}(single_sem & this) { 144 this.ptr = 0p; 145 } 146 147 void ^?{}(single_sem &) {} 148 149 bool wait(single_sem & this) { 150 for() { 151 struct $thread * expected = this.ptr; 152 if(expected == 1p) { 153 if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 154 return false; 155 } 156 } 157 else { 158 /* paranoid */ verify( expected == 0p ); 159 if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 160 park(); 161 return true; 162 } 163 } 164 165 } 166 } 167 168 bool post(single_sem & this) { 169 for() { 170 struct $thread * expected = this.ptr; 171 if(expected == 1p) return false; 172 if(expected == 0p) { 173 if(__atomic_compare_exchange_n(&this.ptr, &expected, 1p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 174 return false; 175 } 176 } 177 else { 178 if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 179 unpark( expected ); 180 return true; 181 } 182 } 183 } 184 } 185 } 186 187 // Synchronozation primitive which only supports a single thread and one post 188 // Similar to a binary semaphore with a 'one shot' semantic 189 // is expected to be discarded after each party call their side 190 struct oneshot { 191 // Internal state : 192 // 0p : is initial state (wait will block) 193 // 1p : fulfilled (wait won't block) 194 // any thread : a thread is currently waiting 195 struct $thread * volatile ptr; 196 }; 197 198 static inline { 199 void ?{}(oneshot & this) { 200 this.ptr = 0p; 201 } 202 203 void ^?{}(oneshot &) {} 204 205 // Wait for the post, return immidiately if it already happened. 206 // return true if the thread was parked 207 bool wait(oneshot & this) { 208 for() { 209 struct $thread * expected = this.ptr; 210 if(expected == 1p) return false; 211 /* paranoid */ verify( expected == 0p ); 212 if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 213 park(); 214 /* paranoid */ verify( this.ptr == 1p ); 215 return true; 216 } 217 } 218 } 219 220 // Mark as fulfilled, wake thread if needed 221 // return true if a thread was unparked 222 bool post(oneshot & this) { 223 struct $thread * got = __atomic_exchange_n( &this.ptr, 1p, __ATOMIC_SEQ_CST); 224 if( got == 0p ) return false; 225 unpark( got ); 226 return true; 227 } 228 } 229 230 // base types for future to build upon 231 // It is based on the 'oneshot' type to allow multiple futures 232 // to block on the same instance, permitting users to block a single 233 // thread on "any of" [a given set of] futures. 234 // does not support multiple threads waiting on the same future 235 struct future_t { 236 // Internal state : 237 // 0p : is initial state (wait will block) 238 // 1p : fulfilled (wait won't block) 239 // 2p : in progress () 240 // 3p : abandoned, server should delete 241 // any oneshot : a context has been setup to wait, a thread could wait on it 242 struct oneshot * volatile ptr; 243 }; 244 245 static inline { 246 void ?{}(future_t & this) { 247 this.ptr = 0p; 248 } 249 250 void ^?{}(future_t &) {} 251 252 void reset(future_t & this) { 253 // needs to be in 0p or 1p 254 __atomic_exchange_n( &this.ptr, 0p, __ATOMIC_SEQ_CST); 255 } 256 257 // check if the future is available 258 bool available( future_t & this ) { 259 return this.ptr == 1p; 260 } 261 262 // Prepare the future to be waited on 263 // intented to be use by wait, wait_any, waitfor, etc. rather than used directly 264 bool setup( future_t & this, oneshot & wait_ctx ) { 265 /* paranoid */ verify( wait_ctx.ptr == 0p ); 266 // The future needs to set the wait context 267 for() { 268 struct oneshot * expected = this.ptr; 269 // Is the future already fulfilled? 270 if(expected == 1p) return false; // Yes, just return false (didn't block) 271 272 // The future is not fulfilled, try to setup the wait context 273 /* paranoid */ verify( expected == 0p ); 274 if(__atomic_compare_exchange_n(&this.ptr, &expected, &wait_ctx, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 275 return true; 276 } 277 } 278 } 279 280 // Stop waiting on a future 281 // When multiple futures are waited for together in "any of" pattern 282 // futures that weren't fulfilled before the thread woke up 283 // should retract the wait ctx 284 // intented to be use by wait, wait_any, waitfor, etc. rather than used directly 285 void retract( future_t & this, oneshot & wait_ctx ) { 286 // Remove the wait context 287 struct oneshot * got = __atomic_exchange_n( &this.ptr, 0p, __ATOMIC_SEQ_CST); 288 289 // got == 0p: future was never actually setup, just return 290 if( got == 0p ) return; 291 292 // got == wait_ctx: since fulfil does an atomic_swap, 293 // if we got back the original then no one else saw context 294 // It is safe to delete (which could happen after the return) 295 if( got == &wait_ctx ) return; 296 297 // got == 1p: the future is ready and the context was fully consumed 298 // the server won't use the pointer again 299 // It is safe to delete (which could happen after the return) 300 if( got == 1p ) return; 301 302 // got == 2p: the future is ready but the context hasn't fully been consumed 303 // spin until it is safe to move on 304 if( got == 2p ) { 305 while( this.ptr != 1p ) Pause(); 306 return; 307 } 308 309 // got == any thing else, something wen't wrong here, abort 310 abort("Future in unexpected state"); 311 } 312 313 // Mark the future as abandoned, meaning it will be deleted by the server 314 bool abandon( future_t & this ) { 315 /* paranoid */ verify( this.ptr != 3p ); 316 317 // Mark the future as abandonned 318 struct oneshot * got = __atomic_exchange_n( &this.ptr, 3p, __ATOMIC_SEQ_CST); 319 320 // If the future isn't already fulfilled, let the server delete it 321 if( got == 0p ) return false; 322 323 // got == 2p: the future is ready but the context hasn't fully been consumed 324 // spin until it is safe to move on 325 if( got == 2p ) { 326 while( this.ptr != 1p ) Pause(); 327 got = 1p; 328 } 329 330 // The future is completed delete it now 331 /* paranoid */ verify( this.ptr != 1p ); 332 free( &this ); 333 return true; 334 } 335 336 // from the server side, mark the future as fulfilled 337 // delete it if needed 338 bool fulfil( future_t & this ) { 339 for() { 340 struct oneshot * expected = this.ptr; 341 // was this abandoned? 342 #if defined(__GNUC__) && __GNUC__ >= 7 343 #pragma GCC diagnostic push 344 #pragma GCC diagnostic ignored "-Wfree-nonheap-object" 345 #endif 346 if( expected == 3p ) { free( &this ); return false; } 347 #if defined(__GNUC__) && __GNUC__ >= 7 348 #pragma GCC diagnostic pop 349 #endif 350 351 /* paranoid */ verify( expected != 1p ); // Future is already fulfilled, should not happen 352 /* paranoid */ verify( expected != 2p ); // Future is bein fulfilled by someone else, this is even less supported then the previous case. 353 354 // If there is a wait context, we need to consume it and mark it as consumed after 355 // If there is no context then we can skip the in progress phase 356 struct oneshot * want = expected == 0p ? 1p : 2p; 357 if(__atomic_compare_exchange_n(&this.ptr, &expected, want, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 358 if( expected == 0p ) { /* paranoid */ verify( this.ptr == 1p); return false; } 359 bool ret = post( *expected ); 360 __atomic_store_n( &this.ptr, 1p, __ATOMIC_SEQ_CST); 361 return ret; 362 } 363 } 364 365 } 366 367 // Wait for the future to be fulfilled 368 bool wait( future_t & this ) { 369 oneshot temp; 370 if( !setup(this, temp) ) return false; 371 372 // Wait context is setup, just wait on it 373 bool ret = wait( temp ); 374 375 // Wait for the future to tru 376 while( this.ptr == 2p ) Pause(); 377 // Make sure the state makes sense 378 // Should be fulfilled, could be in progress but it's out of date if so 379 // since if that is the case, the oneshot was fulfilled (unparking this thread) 380 // and the oneshot should not be needed any more 381 __attribute__((unused)) struct oneshot * was = this.ptr; 382 /* paranoid */ verifyf( was == 1p, "Expected this.ptr to be 1p, was %p\n", was ); 383 384 // Mark the future as fulfilled, to be consistent 385 // with potential calls to avail 386 // this.ptr = 1p; 387 return ret; 388 } 389 } 390 136 391 //----------------------------------------------------------------------- 137 392 // Statics call at the end of each thread to register statistics
Note: See TracChangeset
for help on using the changeset viewer.