Changeset b110bcc for libcfa/src/concurrency/locks.hfa
- Timestamp: Apr 21, 2023, 5:36:12 PM (2 years ago)
- Branches: ADT, master
- Children: 28f8f15, 6e4c44d
- Parents: 2ed94a9 (diff), 699a97d (diff)
- File: 1 edited (libcfa/src/concurrency/locks.hfa)

Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Use the (diff) links above to see all the changes relative to each parent.
Legend: unchanged lines have no prefix, added lines are prefixed with "+", removed lines with "-".
libcfa/src/concurrency/locks.hfa (r2ed94a9 → rb110bcc)

 #include <fstream.hfa>

-
 // futex headers
 #include <linux/futex.h>      /* Definition of FUTEX_* constants */
…
 // futex_mutex

-// - No cond var support
 // - Kernel thd blocking alternative to the spinlock
 // - No ownership (will deadlock on reacq)
…
     int state;

-
-    // // linear backoff omitted for now
-    // for( int spin = 4; spin < 1024; spin += spin) {
-    //     state = 0;
-    //     // if unlocked, lock and return
-    //     if (internal_try_lock(this, state)) return;
-    //     if (2 == state) break;
-    //     for (int i = 0; i < spin; i++) Pause();
-    // }
-
-    // no contention try to acquire
-    if (internal_try_lock(this, state)) return;
+    for( int spin = 4; spin < 1024; spin += spin) {
+        state = 0;
+        // if unlocked, lock and return
+        if (internal_try_lock(this, state)) return;
+        if (2 == state) break;
+        for (int i = 0; i < spin; i++) Pause();
+    }
+
+    // // no contention try to acquire
+    // if (internal_try_lock(this, state)) return;

     // if not in contended state, set to be in contended state
…
 }

 static inline void unlock(futex_mutex & this) with(this) {
-    // if uncontended do atomic eunlock and then return
-    if (__atomic_fetch_sub(&val, 1, __ATOMIC_RELEASE) == 1) return; // TODO: try acq/rel
+    // if uncontended do atomic unlock and then return
+    if (__atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1) return;

     // otherwise threads are blocked so we must wake one
-    __atomic_store_n((int *)&val, 0, __ATOMIC_RELEASE);
     futex((int *)&val, FUTEX_WAKE, 1);
 }
…
 // to set recursion count after getting signalled;
 static inline void on_wakeup( futex_mutex & f, size_t recursion ) {}
+
+//-----------------------------------------------------------------------------
+// go_mutex
+
+// - Kernel thd blocking alternative to the spinlock
+// - No ownership (will deadlock on reacq)
+// - Golang's flavour of mutex
+// - Impl taken from Golang: src/runtime/lock_futex.go
+struct go_mutex {
+    // lock state any state other than UNLOCKED is locked
+    // enum LockState { UNLOCKED = 0, LOCKED = 1, SLEEPING = 2 };
+
+    // stores a lock state
+    int val;
+};
+
+static inline void ?{}( go_mutex & this ) with(this) { val = 0; }
+
+static inline bool internal_try_lock(go_mutex & this, int & compare_val, int new_val ) with(this) {
+    return __atomic_compare_exchange_n((int*)&val, (int*)&compare_val, new_val, false, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE);
+}
+
+static inline int internal_exchange(go_mutex & this, int swap ) with(this) {
+    return __atomic_exchange_n((int*)&val, swap, __ATOMIC_ACQUIRE);
+}
+
+// if this is called recursively IT WILL DEADLOCK!!!!!
+static inline void lock(go_mutex & this) with(this) {
+    int state, init_state;
+
+    // speculative grab
+    state = internal_exchange(this, 1);
+    if ( !state ) return; // state == 0
+    init_state = state;
+    for (;;) {
+        for( int i = 0; i < 4; i++ ) {
+            while( !val ) { // lock unlocked
+                state = 0;
+                if (internal_try_lock(this, state, init_state)) return;
+            }
+            for (int i = 0; i < 30; i++) Pause();
+        }
+
+        while( !val ) { // lock unlocked
+            state = 0;
+            if (internal_try_lock(this, state, init_state)) return;
+        }
+        sched_yield();
+
+        // if not in contended state, set to be in contended state
+        state = internal_exchange(this, 2);
+        if ( !state ) return; // state == 0
+        init_state = 2;
+        futex((int*)&val, FUTEX_WAIT, 2); // if val is not 2 this returns with EWOULDBLOCK
+    }
+}
+
+static inline void unlock( go_mutex & this ) with(this) {
+    // if uncontended do atomic unlock and then return
+    if (__atomic_exchange_n(&val, 0, __ATOMIC_RELEASE) == 1) return;
+
+    // otherwise threads are blocked so we must wake one
+    futex((int *)&val, FUTEX_WAKE, 1);
+}
+
+static inline void on_notify( go_mutex & f, thread$ * t){ unpark(t); }
+static inline size_t on_wait( go_mutex & f ) {unlock(f); return 0;}
+static inline void on_wakeup( go_mutex & f, size_t recursion ) {}

 //-----------------------------------------------------------------------------
…
 static inline void on_wakeup(clh_lock & this, size_t recursion ) { lock(this); }

-
 //-----------------------------------------------------------------------------
 // Exponential backoff then block lock
…
     this.lock_value = 0;
 }
-
-static inline void ^?{}( exp_backoff_then_block_lock & this ) {}
-// static inline void ?{}( exp_backoff_then_block_lock & this, exp_backoff_then_block_lock this2 ) = void;
-// static inline void ?=?( exp_backoff_then_block_lock & this, exp_backoff_then_block_lock this2 ) = void;
+
+static inline void ^?{}( exp_backoff_then_block_lock & this ){}

 static inline bool internal_try_lock(exp_backoff_then_block_lock & this, size_t & compare_val) with(this) {
-    if (__atomic_compare_exchange_n(&lock_value, &compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
-        return true;
-    }
-    return false;
+    return __atomic_compare_exchange_n(&lock_value, &compare_val, 1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
 }
…
 static inline bool try_lock_contention(exp_backoff_then_block_lock & this) with(this) {
-    if (__atomic_exchange_n(&lock_value, 2, __ATOMIC_ACQUIRE) == 0) {
-        return true;
-    }
-    return false;
+    return !__atomic_exchange_n(&lock_value, 2, __ATOMIC_ACQUIRE);
 }

 static inline bool block(exp_backoff_then_block_lock & this) with(this) {
-    lock( spinlock __cfaabi_dbg_ctx2 ); // TODO change to lockfree queue (MPSC)
-    if (lock_value != 2) {
-    …
+    lock( spinlock __cfaabi_dbg_ctx2 );
+    if (__atomic_load_n( &lock_value, __ATOMIC_SEQ_CST) != 2) {
+        unlock( spinlock );
+        return true;
+    }
+    insert_last( blocked_threads, *active_thread() );
+    unlock( spinlock );
     park( );
     return true;
…
     size_t compare_val = 0;
     int spin = 4;
+
     // linear backoff
     for( ;; ) {
…
 static inline void unlock(exp_backoff_then_block_lock & this) with(this) {
     if (__atomic_exchange_n(&lock_value, 0, __ATOMIC_RELEASE) == 1) return;
-    …
+    lock( spinlock __cfaabi_dbg_ctx2 );
+    thread$ * t = &try_pop_front( blocked_threads );
+    unlock( spinlock );
+    unpark( t );
 }
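For readers unfamiliar with the futex protocol used by the new go_mutex (and by futex_mutex), the following standalone C sketch restates the algorithm outside of Cforall. It is not part of the changeset: the demo_* names, the syscall wrapper, and the spin counts are illustrative assumptions, and the second spin pass before sched_yield() in the real code is omitted for brevity. The value of val is 0 (unlocked), 1 (locked, no sleepers), or 2 (locked, possibly with sleepers).

    /* Minimal C sketch of the Go-style futex mutex shown above (not part of the changeset). */
    #define _GNU_SOURCE
    #include <linux/futex.h>
    #include <sys/syscall.h>
    #include <unistd.h>
    #include <sched.h>
    #include <stdatomic.h>

    typedef struct { atomic_int val; } demo_go_mutex;   // hypothetical name

    static long demo_futex( atomic_int * uaddr, int op, int expected ) {
        // no glibc wrapper exists for futex(2); go through syscall(2)
        return syscall( SYS_futex, uaddr, op, expected, NULL, NULL, 0 );
    }

    static void demo_lock( demo_go_mutex * m ) {
        // speculative grab: 0 -> 1 means the caller now owns the lock
        int state = atomic_exchange_explicit( &m->val, 1, memory_order_acquire );
        if ( state == 0 ) return;
        int init_state = state;                       // re-acquire with 1 or 2 as appropriate
        for ( ;; ) {
            for ( int i = 0; i < 4; i++ ) {           // bounded spinning phase
                while ( atomic_load_explicit( &m->val, memory_order_relaxed ) == 0 ) {
                    int expected = 0;
                    if ( atomic_compare_exchange_strong_explicit( &m->val, &expected, init_state,
                            memory_order_acquire, memory_order_acquire ) ) return;
                }
                for ( int p = 0; p < 30; p++ ) __builtin_ia32_pause();  // x86 pause hint
            }
            sched_yield();                            // give up the processor once before sleeping

            // mark the lock contended; if it was free we just acquired it
            state = atomic_exchange_explicit( &m->val, 2, memory_order_acquire );
            if ( state == 0 ) return;
            init_state = 2;                           // later acquires must keep the contended mark
            demo_futex( &m->val, FUTEX_WAIT, 2 );     // returns EWOULDBLOCK if val is no longer 2
        }
    }

    static void demo_unlock( demo_go_mutex * m ) {
        // uncontended fast path: 1 -> 0, nobody is sleeping, no syscall needed
        if ( atomic_exchange_explicit( &m->val, 0, memory_order_release ) == 1 ) return;
        demo_futex( &m->val, FUTEX_WAKE, 1 );         // otherwise wake one sleeper
    }

    int main( void ) {
        demo_go_mutex m = { 0 };
        demo_lock( &m );
        /* critical section */
        demo_unlock( &m );
        return 0;
    }

The design point to notice is that both paths stay syscall-free while uncontended: lock() acquires with a single atomic exchange, and unlock() releases with a single exchange and only issues FUTEX_WAKE when the previous value was not 1. The same idea motivates the change to futex_mutex's unlock() in this merge, which collapses the earlier fetch_sub plus store sequence into one exchange.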