- File:
-
- 1 edited
-
libcfa/src/concurrency/locks.hfa (modified) (10 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/locks.hfa
rbd72c284 r8a97248 32 32 #include <fstream.hfa> 33 33 34 34 35 // futex headers 35 36 #include <linux/futex.h> /* Definition of FUTEX_* constants */ … … 154 155 // futex_mutex 155 156 157 // - No cond var support 156 158 // - Kernel thd blocking alternative to the spinlock 157 159 // - No ownership (will deadlock on reacq) … … 183 185 int state; 184 186 185 for( int spin = 4; spin < 1024; spin += spin) { 186 state = 0; 187 // if unlocked, lock and return 188 if (internal_try_lock(this, state)) return; 189 if (2 == state) break; 190 for (int i = 0; i < spin; i++) Pause(); 191 } 192 193 // // no contention try to acquire 194 // if (internal_try_lock(this, state)) return; 187 188 // // linear backoff omitted for now 189 // for( int spin = 4; spin < 1024; spin += spin) { 190 // state = 0; 191 // // if unlocked, lock and return 192 // if (internal_try_lock(this, state)) return; 193 // if (2 == state) break; 194 // for (int i = 0; i < spin; i++) Pause(); 195 // } 196 197 // no contention try to acquire 198 if (internal_try_lock(this, state)) return; 195 199 196 200 // if not in contended state, set to be in contended state … … 205 209 206 210 static inline void unlock(futex_mutex & this) with(this) { 207 // if uncontended do atomic unlock and then return208 if (__atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1) return; 211 // if uncontended do atomice unlock and then return 212 if (__atomic_fetch_sub(&val, 1, __ATOMIC_RELEASE) == 1) return; // TODO: try acq/rel 209 213 210 214 // otherwise threads are blocked so we must wake one 215 __atomic_store_n((int *)&val, 0, __ATOMIC_RELEASE); 211 216 futex((int *)&val, FUTEX_WAKE, 1); 212 217 } … … 217 222 // to set recursion count after getting signalled; 218 223 static inline void on_wakeup( futex_mutex & f, size_t recursion ) {} 219 220 //-----------------------------------------------------------------------------221 // go_mutex222 223 // - Kernel thd blocking alternative to the spinlock224 // - No ownership (will deadlock on reacq)225 // - Golang's flavour of mutex226 // - Impl taken from Golang: src/runtime/lock_futex.go227 struct go_mutex {228 // lock state any state other than UNLOCKED is locked229 // enum LockState { UNLOCKED = 0, LOCKED = 1, SLEEPING = 2 };230 231 // stores a lock state232 int val;233 };234 235 static inline void ?{}( go_mutex & this ) with(this) { val = 0; }236 237 static inline bool internal_try_lock(go_mutex & this, int & compare_val, int new_val ) with(this) {238 return __atomic_compare_exchange_n((int*)&val, (int*)&compare_val, new_val, false, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE);239 }240 241 static inline int internal_exchange(go_mutex & this, int swap ) with(this) {242 return __atomic_exchange_n((int*)&val, swap, __ATOMIC_ACQUIRE);243 }244 245 // if this is called recursively IT WILL DEADLOCK!!!!!246 static inline void lock(go_mutex & this) with(this) {247 int state, init_state;248 249 // speculative grab250 state = internal_exchange(this, 1);251 if ( !state ) return; // state == 0252 init_state = state;253 for (;;) {254 for( int i = 0; i < 4; i++ ) {255 while( !val ) { // lock unlocked256 state = 0;257 if (internal_try_lock(this, state, init_state)) return;258 }259 for (int i = 0; i < 30; i++) Pause();260 }261 262 while( !val ) { // lock unlocked263 state = 0;264 if (internal_try_lock(this, state, init_state)) return;265 }266 sched_yield();267 268 // if not in contended state, set to be in contended state269 state = internal_exchange(this, 2);270 if ( !state ) return; // state == 0271 init_state = 2;272 futex((int*)&val, FUTEX_WAIT, 2); // if val is not 2 this returns with EWOULDBLOCK273 }274 }275 276 static inline void unlock( go_mutex & this ) with(this) {277 // if uncontended do atomic unlock and then return278 if (__atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1) return;279 280 // otherwise threads are blocked so we must wake one281 futex((int *)&val, FUTEX_WAKE, 1);282 }283 284 static inline void on_notify( go_mutex & f, thread$ * t){ unpark(t); }285 static inline size_t on_wait( go_mutex & f ) {unlock(f); return 0;}286 static inline void on_wakeup( go_mutex & f, size_t recursion ) {}287 224 288 225 //----------------------------------------------------------------------------- … … 316 253 static inline void on_wakeup(clh_lock & this, size_t recursion ) { lock(this); } 317 254 255 318 256 //----------------------------------------------------------------------------- 319 257 // Exponential backoff then block lock … … 334 272 this.lock_value = 0; 335 273 } 336 337 static inline void ^?{}( exp_backoff_then_block_lock & this ){} 274 static inline void ^?{}( exp_backoff_then_block_lock & this ) {} 275 // static inline void ?{}( exp_backoff_then_block_lock & this, exp_backoff_then_block_lock this2 ) = void; 276 // static inline void ?=?( exp_backoff_then_block_lock & this, exp_backoff_then_block_lock this2 ) = void; 338 277 339 278 static inline bool internal_try_lock(exp_backoff_then_block_lock & this, size_t & compare_val) with(this) { 340 return __atomic_compare_exchange_n(&lock_value, &compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED); 279 if (__atomic_compare_exchange_n(&lock_value, &compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) { 280 return true; 281 } 282 return false; 341 283 } 342 284 … … 344 286 345 287 static inline bool try_lock_contention(exp_backoff_then_block_lock & this) with(this) { 346 return !__atomic_exchange_n(&lock_value, 2, __ATOMIC_ACQUIRE); 288 if (__atomic_exchange_n(&lock_value, 2, __ATOMIC_ACQUIRE) == 0) { 289 return true; 290 } 291 return false; 347 292 } 348 293 349 294 static inline bool block(exp_backoff_then_block_lock & this) with(this) { 350 lock( spinlock __cfaabi_dbg_ctx2 ); 351 if (__atomic_load_n( &lock_value, __ATOMIC_SEQ_CST)!= 2) {352 unlock( spinlock );353 return true;354 }355 insert_last( blocked_threads, *active_thread() );356 unlock( spinlock );295 lock( spinlock __cfaabi_dbg_ctx2 ); // TODO change to lockfree queue (MPSC) 296 if (lock_value != 2) { 297 unlock( spinlock ); 298 return true; 299 } 300 insert_last( blocked_threads, *active_thread() ); 301 unlock( spinlock ); 357 302 park( ); 358 303 return true; … … 362 307 size_t compare_val = 0; 363 308 int spin = 4; 364 365 309 // linear backoff 366 310 for( ;; ) { … … 380 324 static inline void unlock(exp_backoff_then_block_lock & this) with(this) { 381 325 if (__atomic_exchange_n(&lock_value, 0, __ATOMIC_RELEASE) == 1) return; 382 lock( spinlock __cfaabi_dbg_ctx2 );383 thread$ * t = &try_pop_front( blocked_threads );384 unlock( spinlock );385 unpark( t );326 lock( spinlock __cfaabi_dbg_ctx2 ); 327 thread$ * t = &try_pop_front( blocked_threads ); 328 unlock( spinlock ); 329 unpark( t ); 386 330 } 387 331
Note:
See TracChangeset
for help on using the changeset viewer.