Changeset 7f6a7c9 for libcfa/src/concurrency/kernel
- Timestamp: Sep 21, 2022, 11:02:15 AM
- Branches: ADT, ast-experimental, master, pthread-emulation
- Children: 95dab9e
- Parents: 428adbc (diff), 0bd46fd (diff)
Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Use the (diff) links above to see all the changes relative to each parent.
- Location: libcfa/src/concurrency/kernel
- Files: 5 edited
Legend: unchanged context lines carry no prefix; lines removed by the changeset are prefixed with '-', lines added with '+'. Hunks within a file are separated by '…'.
libcfa/src/concurrency/kernel/cluster.cfa (r428adbc → r7f6a7c9)

  //=======================================================================
  void ?{}(__scheduler_RWLock_t & this) {
-     this.max = __max_processors();
-     this.alloc = 0;
-     this.ready = 0;
-     this.data = alloc(this.max);
-     this.write_lock = false;
-
-     /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.alloc), &this.alloc));
-     /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.ready), &this.ready));
+     this.lock.max = __max_processors();
+     this.lock.alloc = 0;
+     this.lock.ready = 0;
+     this.lock.data = alloc(this.lock.max);
+     this.lock.write_lock = false;
+
+     /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.lock.alloc), &this.lock.alloc));
+     /*paranoid*/ verify(__atomic_is_lock_free(sizeof(this.lock.ready), &this.lock.ready));

  }
  void ^?{}(__scheduler_RWLock_t & this) {
-     free(this.data);
+     free(this.lock.data);
  }
…
  //=======================================================================
  // Lock-Free registering/unregistering of threads
- unsigned register_proc_id( void ) with(*__scheduler_lock) {
+ unsigned register_proc_id( void ) with(__scheduler_lock.lock) {
      __kernel_rseq_register();

…
  }

- if(max <= alloc) abort("Trying to create more than %ud processors", __scheduler_lock->max);
+ if(max <= alloc) abort("Trying to create more than %ud processors", __scheduler_lock.lock.max);

  // Step - 2 : F&A to get a new spot in the array.
  uint_fast32_t n = __atomic_fetch_add(&alloc, 1, __ATOMIC_SEQ_CST);
- if(max <= n) abort("Trying to create more than %ud processors", __scheduler_lock->max);
+ if(max <= n) abort("Trying to create more than %ud processors", __scheduler_lock.lock.max);

  // Step - 3 : Mark space as used and then publish it.
…
  }

- void unregister_proc_id( unsigned id ) with(*__scheduler_lock) {
+ void unregister_proc_id( unsigned id ) with(__scheduler_lock.lock) {
      /* paranoid */ verify(id < ready);
      /* paranoid */ verify(id == kernelTLS().sched_id);
…
  // Writer side : acquire when changing the ready queue, e.g. adding more
  // queues or removing them.
- uint_fast32_t ready_mutate_lock( void ) with(*__scheduler_lock) {
+ uint_fast32_t ready_mutate_lock( void ) with(__scheduler_lock.lock) {
      /* paranoid */ verify( ! __preemption_enabled() );
…
  }

- void ready_mutate_unlock( uint_fast32_t last_s ) with(*__scheduler_lock) {
+ void ready_mutate_unlock( uint_fast32_t last_s ) with(__scheduler_lock.lock) {
      /* paranoid */ verify( ! __preemption_enabled() );
…
  #if defined(CFA_HAVE_LINUX_IO_URING_H)
- static void assign_io($io_context ** data, size_t count, dlist(processor) & list) {
+ static void assign_io(io_context$ ** data, size_t count, dlist(processor) & list) {
      processor * it = &list`first;
      while(it) {
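Most of these hunks are mechanical: every with(*__scheduler_lock) clause becomes with(__scheduler_lock.lock), opening the nested lock member of the new union instead of dereferencing a global pointer, so the function bodies stay untouched. For readers new to Cforall, here is a minimal plain-C sketch of what the with-clause saves; the names are hypothetical stand-ins, not from the changeset:

#include <stdio.h>

// Hypothetical stand-ins for __scheduler_RWLock_t and __scheduler_lock.
struct sched_fields { unsigned max; unsigned alloc; };
union sched_lock_t { struct sched_fields lock; char pad[192]; };
union sched_lock_t sched_lock = { .lock = { .max = 4, .alloc = 0 } };

// In plain C every access spells the full path; Cforall's
// with(sched_lock.lock) brings `max` and `alloc` directly into scope,
// which is why only the with-clauses changed in cluster.cfa.
unsigned register_id(void) {
    return sched_lock.lock.alloc++;
}

int main(void) {
    unsigned id = register_id();
    printf("first id: %u of %u\n", id, sched_lock.lock.max);
    return 0;
}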
libcfa/src/concurrency/kernel/cluster.hfa (r428adbc → r7f6a7c9)

  // Calc moving average based on existing average, before and current time.
  static inline unsigned long long moving_average(unsigned long long currtsc, unsigned long long instsc, unsigned long long old_avg) {
-     /* paranoid */ verifyf( currtsc < 45000000000000000, "Suspiciously large current time: %'llu (%llx)\n", currtsc, currtsc );
-     /* paranoid */ verifyf( instsc < 45000000000000000, "Suspiciously large insert time: %'llu (%llx)\n", instsc, instsc );
      /* paranoid */ verifyf( old_avg < 15000000000000, "Suspiciously large previous average: %'llu (%llx)\n", old_avg, old_avg );
…
      }
  }
- return (max + 2 * max) / 2;
+ return 8 * max;
  }
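One behavioural change hides at the bottom of this file: the number of ready queues allocated per cluster jumps from 1.5x to 8x the processor count, since the old expression (max + 2 * max) / 2 is just an obfuscated 1.5 multiplier. A quick sanity check of the arithmetic, in illustrative plain C:

#include <assert.h>

unsigned old_count(unsigned max) { return (max + 2 * max) / 2; } // 1.5x max
unsigned new_count(unsigned max) { return 8 * max; }             // 8x max

int main(void) {
    assert(old_count(4) == 6);   // 4 processors -> 6 ready queues before
    assert(new_count(4) == 32);  // 4 processors -> 32 ready queues after
    return 0;
}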
libcfa/src/concurrency/kernel/fwd.hfa (r428adbc → r7f6a7c9)

  extern "C" {
  extern "Cforall" {
- extern __attribute__((aligned(64))) thread_local struct KernelThreadData {
+ extern __attribute__((aligned(64))) __thread struct KernelThreadData {
      struct thread$ * volatile this_thread;
      struct processor * volatile this_processor;
…
  // Similar to a binary semaphore with a 'one shot' semantic
  // is expected to be discarded after each party call their side
+ enum(struct thread$ *) { oneshot_ARMED = 0p, oneshot_FULFILLED = 1p };
  struct oneshot {
      // Internal state :
-     // 0p : is initial state (wait will block)
-     // 1p : fulfilled (wait won't block)
+     // armed : initial state, wait will block
+     // fulfilled : wait won't block
      // any thread : a thread is currently waiting
      struct thread$ * volatile ptr;
…
  static inline {
      void ?{}(oneshot & this) {
-         this.ptr = 0p;
+         this.ptr = oneshot_ARMED;
      }
…
      for() {
          struct thread$ * expected = this.ptr;
-         if(expected == 1p) return false;
+         if(expected == oneshot_FULFILLED) return false;
          if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
              park();
-             /* paranoid */ verify( this.ptr == 1p );
+             /* paranoid */ verify( this.ptr == oneshot_FULFILLED );
              return true;
          }
…
      // return true if a thread was unparked
      thread$ * post(oneshot & this, bool do_unpark = true) {
-         struct thread$ * got = __atomic_exchange_n( &this.ptr, 1p, __ATOMIC_SEQ_CST);
-         if( got == 0p || got == 1p ) return 0p;
+         struct thread$ * got = __atomic_exchange_n( &this.ptr, oneshot_FULFILLED, __ATOMIC_SEQ_CST);
+         if( got == oneshot_ARMED || got == oneshot_FULFILLED ) return 0p;
          if(do_unpark) unpark( got );
          return got;
…
  // thread on "any of" [a given set of] futures.
  // does not support multiple threads waiting on the same future
+ enum(struct oneshot *) { future_ARMED = 0p, future_FULFILLED = 1p, future_PROGRESS = 2p, future_ABANDONED = 3p };
  struct future_t {
      // Internal state :
-     // 0p : is initial state (wait will block)
-     // 1p : fulfilled (wait won't block)
-     // 2p : in progress ()
-     // 3p : abandoned, server should delete
+     // armed : initial state, wait will block
+     // fulfilled : result is ready, wait won't block
+     // progress : someone else is in the process of fulfilling this
+     // abandoned : client no longer cares, server should delete
      // any oneshot : a context has been setup to wait, a thread could wait on it
      struct oneshot * volatile ptr;
…
  static inline {
      void ?{}(future_t & this) {
-         this.ptr = 0p;
+         this.ptr = future_ARMED;
      }
…
      void reset(future_t & this) {
          // needs to be in 0p or 1p
-         __atomic_exchange_n( &this.ptr, 0p, __ATOMIC_SEQ_CST);
+         __atomic_exchange_n( &this.ptr, future_ARMED, __ATOMIC_SEQ_CST);
      }

      // check if the future is available
      bool available( future_t & this ) {
-         while( this.ptr == 2p ) Pause();
-         return this.ptr == 1p;
+         while( this.ptr == future_PROGRESS ) Pause();
+         return this.ptr == future_FULFILLED;
      }
…
      // intented to be use by wait, wait_any, waitfor, etc. rather than used directly
      bool setup( future_t & this, oneshot & wait_ctx ) {
-         /* paranoid */ verify( wait_ctx.ptr == 0p || wait_ctx.ptr == 1p );
+         /* paranoid */ verify( wait_ctx.ptr == oneshot_ARMED || wait_ctx.ptr == oneshot_FULFILLED );
          // The future needs to set the wait context
          for() {
              struct oneshot * expected = this.ptr;
              // Is the future already fulfilled?
-             if(expected == 1p) return false; // Yes, just return false (didn't block)
+             if(expected == future_FULFILLED) return false; // Yes, just return false (didn't block)

              // The future is not fulfilled, try to setup the wait context
…
          // attempt to remove the context so it doesn't get consumed.
-         if(__atomic_compare_exchange_n( &this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
+         if(__atomic_compare_exchange_n( &this.ptr, &expected, future_ARMED, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
              // we still have the original context, then no one else saw it
              return false;
          }

-         // expected == 0p: future was never actually setup, just return
-         if( expected == 0p ) return false;
-
-         // expected == 1p: the future is ready and the context was fully consumed
+         // expected == ARMED: future was never actually setup, just return
+         if( expected == future_ARMED ) return false;
+
+         // expected == FULFILLED: the future is ready and the context was fully consumed
          // the server won't use the pointer again
          // It is safe to delete (which could happen after the return)
-         if( expected == 1p ) return true;
-
-         // expected == 2p: the future is ready but the context hasn't fully been consumed
+         if( expected == future_FULFILLED ) return true;
+
+         // expected == PROGRESS: the future is ready but the context hasn't fully been consumed
          // spin until it is safe to move on
-         if( expected == 2p ) {
-             while( this.ptr != 1p ) Pause();
-             /* paranoid */ verify( this.ptr == 1p );
+         if( expected == future_PROGRESS ) {
+             while( this.ptr != future_FULFILLED ) Pause();
+             /* paranoid */ verify( this.ptr == future_FULFILLED );
              return true;
          }
…
      // Mark the future as abandoned, meaning it will be deleted by the server
      bool abandon( future_t & this ) {
-         /* paranoid */ verify( this.ptr != 3p );
+         /* paranoid */ verify( this.ptr != future_ABANDONED );

          // Mark the future as abandonned
-         struct oneshot * got = __atomic_exchange_n( &this.ptr, 3p, __ATOMIC_SEQ_CST);
+         struct oneshot * got = __atomic_exchange_n( &this.ptr, future_ABANDONED, __ATOMIC_SEQ_CST);

          // If the future isn't already fulfilled, let the server delete it
-         if( got == 0p ) return false;
-
-         // got == 2p: the future is ready but the context hasn't fully been consumed
+         if( got == future_ARMED ) return false;
+
+         // got == PROGRESS: the future is ready but the context hasn't fully been consumed
          // spin until it is safe to move on
-         if( got == 2p ) {
-             while( this.ptr != 1p ) Pause();
-             got = 1p;
+         if( got == future_PROGRESS ) {
+             while( this.ptr != future_FULFILLED ) Pause();
+             got = future_FULFILLED;
          }

          // The future is completed delete it now
-         /* paranoid */ verify( this.ptr != 1p );
+         /* paranoid */ verify( this.ptr != future_FULFILLED );
          free( &this );
          return true;
…
  #pragma GCC diagnostic ignored "-Wfree-nonheap-object"
  #endif
-         if( expected == 3p ) { free( &this ); return 0p; }
+         if( expected == future_ABANDONED ) { free( &this ); return 0p; }
  #if defined(__GNUC__) && __GNUC__ >= 7
  #pragma GCC diagnostic pop
  #endif

-         /* paranoid */ verify( expected != 1p ); // Future is already fulfilled, should not happen
-         /* paranoid */ verify( expected != 2p ); // Future is bein fulfilled by someone else, this is even less supported then the previous case.
+         /* paranoid */ verify( expected != future_FULFILLED ); // Future is already fulfilled, should not happen
+         /* paranoid */ verify( expected != future_PROGRESS ); // Future is bein fulfilled by someone else, this is even less supported then the previous case.

          // If there is a wait context, we need to consume it and mark it as consumed after
          // If there is no context then we can skip the in progress phase
-         struct oneshot * want = expected == 0p ? 1p : 2p;
+         struct oneshot * want = expected == future_ARMED ? future_FULFILLED : future_PROGRESS;
          if(__atomic_compare_exchange_n(&this.ptr, &expected, want, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {
-             if( expected == 0p ) { return 0p; }
+             if( expected == future_ARMED ) { return 0p; }
              thread$ * ret = post( *expected, do_unpark );
-             __atomic_store_n( &this.ptr, 1p, __ATOMIC_SEQ_CST);
+             __atomic_store_n( &this.ptr, future_FULFILLED, __ATOMIC_SEQ_CST);
              return ret;
          }
…
          // Wait for the future to tru
-         while( this.ptr == 2p ) Pause();
+         while( this.ptr == future_PROGRESS ) Pause();
          // Make sure the state makes sense
          // Should be fulfilled, could be in progress but it's out of date if so
          // and the oneshot should not be needed any more
          __attribute__((unused)) struct oneshot * was = this.ptr;
-         /* paranoid */ verifyf( was == 1p, "Expected this.ptr to be 1p, was %p\n", was );
+         /* paranoid */ verifyf( was == future_FULFILLED, "Expected this.ptr to be 1p, was %p\n", was );

          // Mark the future as fulfilled, to be consistent
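These hunks replace the bare sentinel pointers 0p/1p/2p/3p with named constants of Cforall's pointer-typed enums; the lock-free protocol itself is unchanged. As a rough illustration of that protocol — not the library's actual API — here is the oneshot state machine re-sketched in portable C11 atomics, where two reserved addresses that can never collide with a real waiter stand in for the armed and fulfilled states:

#include <stdatomic.h>

// Sketch of the sentinel-pointer state machine behind `oneshot`, in plain C11.
// The real Cforall code stores thread$ pointers and parks/unparks threads;
// here a waiter is just an opaque pointer. Names are illustrative only.
#define ONESHOT_ARMED     ((void *)0)   // initial state: wait will block
#define ONESHOT_FULFILLED ((void *)1)   // posted: wait won't block

struct oneshot { _Atomic(void *) ptr; };

// Returns 1 if the caller published itself as the waiter and should block.
static int oneshot_wait(struct oneshot *o, void *self) {
    void *expected = atomic_load(&o->ptr);
    for (;;) {
        if (expected == ONESHOT_FULFILLED) return 0;  // already posted
        // On failure, `expected` is refreshed and the loop retries.
        if (atomic_compare_exchange_weak(&o->ptr, &expected, self))
            return 1;                                 // park() here in the real code
    }
}

// Returns the waiter to unpark, or NULL if no thread was blocked.
static void *oneshot_post(struct oneshot *o) {
    void *got = atomic_exchange(&o->ptr, ONESHOT_FULFILLED);
    return (got == ONESHOT_ARMED || got == ONESHOT_FULFILLED) ? 0 : got;
}

The future_t states extend the same idea with two more sentinels: PROGRESS marks a fulfilment in flight so a concurrent waiter spins instead of freeing the context early, and ABANDONED hands ownership of the allocation to the server side.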
libcfa/src/concurrency/kernel/private.hfa (r428adbc → r7f6a7c9)

  #elif defined(CFA_HAVE_LINUX_RSEQ_H)
  extern "Cforall" {
- extern __attribute__((aligned(64))) thread_local volatile struct rseq __cfaabi_rseq;
+ extern __attribute__((aligned(64))) __thread volatile struct rseq __cfaabi_rseq;
  }
  #else
…
  //-----------------------------------------------------------------------------
  // I/O
- $io_arbiter * create(void);
- void destroy($io_arbiter *);
+ io_arbiter$ * create(void);
+ void destroy(io_arbiter$ *);

  //=======================================================================
…
  // Blocking acquire
  static inline void __atomic_acquire(volatile bool * ll) {
+     /* paranoid */ verify( ! __preemption_enabled() );
+     /* paranoid */ verify(ll);
+
      while( __builtin_expect(__atomic_exchange_n(ll, (bool)true, __ATOMIC_SEQ_CST), false) ) {
          while(__atomic_load_n(ll, (int)__ATOMIC_RELAXED))
…
      }
      /* paranoid */ verify(*ll);
+     /* paranoid */ verify( ! __preemption_enabled() );
  }

  // Non-Blocking acquire
  static inline bool __atomic_try_acquire(volatile bool * ll) {
+     /* paranoid */ verify( ! __preemption_enabled() );
+     /* paranoid */ verify(ll);
+
      return !__atomic_exchange_n(ll, (bool)true, __ATOMIC_SEQ_CST);
  }
…
  // Release
  static inline void __atomic_unlock(volatile bool * ll) {
+     /* paranoid */ verify( ! __preemption_enabled() );
+     /* paranoid */ verify(ll);
      /* paranoid */ verify(*ll);
      __atomic_store_n(ll, (bool)false, __ATOMIC_RELEASE);
…
  // have been hard-coded to for the ready-queue for
  // simplicity and performance
- struct __scheduler_RWLock_t {
-     // total cachelines allocated
-     unsigned int max;
-
-     // cachelines currently in use
-     volatile unsigned int alloc;
-
-     // cachelines ready to itereate over
-     // (!= to alloc when thread is in second half of doregister)
-     volatile unsigned int ready;
-
-     // writer lock
-     volatile bool write_lock;
-
-     // data pointer
-     volatile bool * volatile * data;
+ union __attribute__((aligned(64))) __scheduler_RWLock_t {
+     struct {
+         __attribute__((aligned(64))) char padding;
+
+         // total cachelines allocated
+         __attribute__((aligned(64))) unsigned int max;
+
+         // cachelines currently in use
+         volatile unsigned int alloc;
+
+         // cachelines ready to itereate over
+         // (!= to alloc when thread is in second half of doregister)
+         volatile unsigned int ready;
+
+         // writer lock
+         volatile bool write_lock;
+
+         // data pointer
+         volatile bool * volatile * data;
+     } lock;
+     char pad[192];
  };

  void ?{}(__scheduler_RWLock_t & this);
  void ^?{}(__scheduler_RWLock_t & this);

- extern __scheduler_RWLock_t * __scheduler_lock;
+ extern __scheduler_RWLock_t __scheduler_lock;

  //-----------------------------------------------------------------------
  // Reader side : acquire when using the ready queue to schedule but not
  // creating/destroying queues
- static inline void ready_schedule_lock(void) with(*__scheduler_lock) {
+ static inline void ready_schedule_lock(void) with(__scheduler_lock.lock) {
      /* paranoid */ verify( ! __preemption_enabled() );
      /* paranoid */ verify( ! kernelTLS().in_sched_lock );
…
  }

- static inline void ready_schedule_unlock(void) with(*__scheduler_lock) {
+ static inline void ready_schedule_unlock(void) with(__scheduler_lock.lock) {
      /* paranoid */ verify( ! __preemption_enabled() );
      /* paranoid */ verify( data[kernelTLS().sched_id] == &kernelTLS().sched_lock );
…
  static inline bool ready_mutate_islocked() {
-     return __scheduler_lock->write_lock;
+     return __scheduler_lock.lock.write_lock;
  }
  #endif
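The union rewrite is the heart of the changeset: overlaying the live fields with a fixed char array pins the lock's footprint at exactly three 64-byte cache lines, so the hot reader/writer fields never share a line with adjacent globals (false sharing). A minimal C sketch of the sizing trick, with the field set abbreviated and the interior per-field alignment omitted:

// Overlaying the fields with pad[192] fixes sizeof() at three cache lines.
union __attribute__((aligned(64))) padded_rwlock {
    struct {
        unsigned int max;               // total cachelines allocated
        volatile unsigned int alloc;    // cachelines currently in use
        volatile unsigned int ready;    // cachelines ready to iterate over
        volatile _Bool write_lock;      // writer lock
    } lock;
    char pad[192];                      // 3 * 64 bytes
};

_Static_assert(sizeof(union padded_rwlock) == 192, "exactly three cache lines");
_Static_assert(_Alignof(union padded_rwlock) == 64, "cache-line aligned");

In the actual hunk the aligned(64) attributes on the leading padding byte and on max additionally push the counters onto their own line within the union, keeping them away from whatever precedes the object in memory.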
libcfa/src/concurrency/kernel/startup.cfa (r428adbc → r7f6a7c9)

  KERNEL_STORAGE(thread$, mainThread);
  KERNEL_STORAGE(__stack_t, mainThreadCtx);
- KERNEL_STORAGE(__scheduler_RWLock_t, __scheduler_lock);
+ // KERNEL_STORAGE(__scheduler_RWLock_t, __scheduler_lock);
  KERNEL_STORAGE(eventfd_t, mainIdleEventFd);
  KERNEL_STORAGE(io_future_t, mainIdleFuture);
…
  processor * mainProcessor;
  thread$ * mainThread;
- __scheduler_RWLock_t * __scheduler_lock;

  extern "C" {
…
  //-----------------------------------------------------------------------------
  // Global state
- thread_local struct KernelThreadData __cfaabi_tls __attribute__ ((tls_model ( "initial-exec" ))) @= {
+ __thread struct KernelThreadData __cfaabi_tls __attribute__ ((tls_model ( "initial-exec" ))) @= {
      NULL, // cannot use 0p
      NULL,
…
  };

+ __scheduler_RWLock_t __scheduler_lock @= { 0 };
+
  #if defined(CFA_HAVE_LINUX_LIBRSEQ)
  // No data needed
  #elif defined(CFA_HAVE_LINUX_RSEQ_H)
  extern "Cforall" {
-     __attribute__((aligned(64))) thread_local volatile struct rseq __cfaabi_rseq @= {
+     __attribute__((aligned(64))) __thread volatile struct rseq __cfaabi_rseq @= {
          .cpu_id : RSEQ_CPU_ID_UNINITIALIZED,
      };
…
      // Initialize the global scheduler lock
-     __scheduler_lock = (__scheduler_RWLock_t*)&storage___scheduler_lock;
-     (*__scheduler_lock){};
+     // __scheduler_lock = (__scheduler_RWLock_t*)&storage___scheduler_lock;
+     (__scheduler_lock){};

      // Initialize the main cluster
…
      ^(*mainCluster){};

-     ^(*__scheduler_lock){};
+     ^(__scheduler_lock){};

      ^(__cfa_dbg_global_clusters.list){};
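The startup hunks finish the job: the KERNEL_STORAGE backing buffer and the global pointer are retired in favour of a statically allocated, zero-initialized object (CFA's @= { 0 } suppresses the implicit constructor), so every lock access saves one pointer indirection. A hypothetical before/after shape in plain C, with lock_t standing in for __scheduler_RWLock_t:

// Hypothetical stand-in for __scheduler_RWLock_t.
typedef struct { volatile int write_lock; } lock_t;

// Old shape: raw storage plus a global pointer bound during startup.
// Every ready_schedule_lock()/unlock() paid a pointer load to find the lock.
static char storage_lock[sizeof(lock_t)] __attribute__((aligned(64)));
lock_t * lock_ptr;

// New shape: one zero-initialized global object. Accesses compile to direct
// loads from a link-time constant address; no startup binding step needed.
lock_t lock_obj;

void startup_old(void) { lock_ptr = (lock_t *)storage_lock; lock_ptr->write_lock = 0; }
void startup_new(void) { lock_obj.write_lock = 0; }  // or rely on static zero-init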