Changeset 454f478
- Timestamp:
- Jan 20, 2021, 5:35:39 PM (4 years ago)
- Branches:
- ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children:
- 9db2c92
- Parents:
- dafbde8
- Files:
-
- 10 edited
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/bits/defs.hfa
rdafbde8 r454f478 5 5 // file "LICENCE" distributed with Cforall. 6 6 // 7 // defs.hfa -- 7 // defs.hfa -- Commen macros, functions and typedefs 8 // Most files depend on them and they are always useful to have. 9 // 10 // *** Must not contain code specific to libcfathread *** 8 11 // 9 12 // Author : Thierry Delisle … … 62 65 #endif 63 66 } 67 68 // pause to prevent excess processor bus usage 69 #if defined( __i386 ) || defined( __x86_64 ) 70 #define Pause() __asm__ __volatile__ ( "pause" : : : ) 71 #elif defined( __ARM_ARCH ) 72 #define Pause() __asm__ __volatile__ ( "YIELD" : : : ) 73 #else 74 #error unsupported architecture 75 #endif -
libcfa/src/bits/locks.hfa
rdafbde8 r454f478 5 5 // file "LICENCE" distributed with Cforall. 6 6 // 7 // bits/locks.hfa -- Fast internal locks. 7 // bits/locks.hfa -- Basic spinlocks that are reused in the system. 8 // Used for locks that aren't specific to cforall threads and can be used anywhere 9 // 10 // *** Must not contain code specific to libcfathread *** 8 11 // 9 12 // Author : Thierry Delisle … … 19 22 #include "bits/defs.hfa" 20 23 #include <assert.h> 21 22 #ifdef __cforall23 extern "C" {24 #include <pthread.h>25 }26 #endif27 28 // pause to prevent excess processor bus usage29 #if defined( __i386 ) || defined( __x86_64 )30 #define Pause() __asm__ __volatile__ ( "pause" : : : )31 #elif defined( __ARM_ARCH )32 #define Pause() __asm__ __volatile__ ( "YIELD" : : : )33 #else34 #error unsupported architecture35 #endif36 24 37 25 struct __spinlock_t { … … 104 92 enable_interrupts_noPoll(); 105 93 } 106 107 108 #ifdef __CFA_WITH_VERIFY__109 extern bool __cfaabi_dbg_in_kernel();110 #endif111 112 extern "C" {113 char * strerror(int);114 }115 #define CHECKED(x) { int err = x; if( err != 0 ) abort("KERNEL ERROR: Operation \"" #x "\" return error %d - %s\n", err, strerror(err)); }116 117 struct __bin_sem_t {118 pthread_mutex_t lock;119 pthread_cond_t cond;120 int val;121 };122 123 static inline void ?{}(__bin_sem_t & this) with( this ) {124 // Create the mutex with error checking125 pthread_mutexattr_t mattr;126 pthread_mutexattr_init( &mattr );127 pthread_mutexattr_settype( &mattr, PTHREAD_MUTEX_ERRORCHECK_NP);128 pthread_mutex_init(&lock, &mattr);129 130 pthread_cond_init (&cond, (const pthread_condattr_t *)0p); // workaround trac#208: cast should not be required131 val = 0;132 }133 134 static inline void ^?{}(__bin_sem_t & this) with( this ) {135 CHECKED( pthread_mutex_destroy(&lock) );136 CHECKED( pthread_cond_destroy (&cond) );137 }138 139 static inline void wait(__bin_sem_t & this) with( this ) {140 verify(__cfaabi_dbg_in_kernel());141 CHECKED( pthread_mutex_lock(&lock) );142 while(val < 1) {143 pthread_cond_wait(&cond, &lock);144 }145 val -= 1;146 CHECKED( pthread_mutex_unlock(&lock) );147 }148 149 static inline bool post(__bin_sem_t & this) with( this ) {150 bool needs_signal = false;151 152 CHECKED( pthread_mutex_lock(&lock) );153 if(val < 1) {154 val += 1;155 pthread_cond_signal(&cond);156 needs_signal = true;157 }158 CHECKED( pthread_mutex_unlock(&lock) );159 160 return needs_signal;161 }162 163 #undef CHECKED164 165 struct $thread;166 extern void park( void );167 extern void unpark( struct $thread * this );168 static inline struct $thread * active_thread ();169 170 // Semaphore which only supports a single thread171 struct single_sem {172 struct $thread * volatile ptr;173 };174 175 static inline {176 void ?{}(single_sem & this) {177 this.ptr = 0p;178 }179 180 void ^?{}(single_sem &) {}181 182 bool wait(single_sem & this) {183 for() {184 struct $thread * expected = this.ptr;185 if(expected == 1p) {186 if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {187 return false;188 }189 }190 else {191 /* paranoid */ verify( expected == 0p );192 if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {193 park();194 return true;195 }196 }197 198 }199 }200 201 bool post(single_sem & this) {202 for() {203 struct $thread * expected = this.ptr;204 if(expected == 1p) return false;205 if(expected == 0p) {206 if(__atomic_compare_exchange_n(&this.ptr, &expected, 1p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {207 return false;208 }209 }210 else {211 if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {212 unpark( expected );213 return true;214 }215 }216 }217 }218 }219 220 // Synchronozation primitive which only supports a single thread and one post221 // Similar to a binary semaphore with a 'one shot' semantic222 // is expected to be discarded after each party call their side223 struct oneshot {224 // Internal state :225 // 0p : is initial state (wait will block)226 // 1p : fulfilled (wait won't block)227 // any thread : a thread is currently waiting228 struct $thread * volatile ptr;229 };230 231 static inline {232 void ?{}(oneshot & this) {233 this.ptr = 0p;234 }235 236 void ^?{}(oneshot &) {}237 238 // Wait for the post, return immidiately if it already happened.239 // return true if the thread was parked240 bool wait(oneshot & this) {241 for() {242 struct $thread * expected = this.ptr;243 if(expected == 1p) return false;244 /* paranoid */ verify( expected == 0p );245 if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {246 park();247 /* paranoid */ verify( this.ptr == 1p );248 return true;249 }250 }251 }252 253 // Mark as fulfilled, wake thread if needed254 // return true if a thread was unparked255 bool post(oneshot & this) {256 struct $thread * got = __atomic_exchange_n( &this.ptr, 1p, __ATOMIC_SEQ_CST);257 if( got == 0p ) return false;258 unpark( got );259 return true;260 }261 }262 263 // base types for future to build upon264 // It is based on the 'oneshot' type to allow multiple futures265 // to block on the same instance, permitting users to block a single266 // thread on "any of" [a given set of] futures.267 // does not support multiple threads waiting on the same future268 struct future_t {269 // Internal state :270 // 0p : is initial state (wait will block)271 // 1p : fulfilled (wait won't block)272 // 2p : in progress ()273 // 3p : abandoned, server should delete274 // any oneshot : a context has been setup to wait, a thread could wait on it275 struct oneshot * volatile ptr;276 };277 278 static inline {279 void ?{}(future_t & this) {280 this.ptr = 0p;281 }282 283 void ^?{}(future_t &) {}284 285 void reset(future_t & this) {286 // needs to be in 0p or 1p287 __atomic_exchange_n( &this.ptr, 0p, __ATOMIC_SEQ_CST);288 }289 290 // check if the future is available291 bool available( future_t & this ) {292 return this.ptr == 1p;293 }294 295 // Prepare the future to be waited on296 // intented to be use by wait, wait_any, waitfor, etc. rather than used directly297 bool setup( future_t & this, oneshot & wait_ctx ) {298 /* paranoid */ verify( wait_ctx.ptr == 0p );299 // The future needs to set the wait context300 for() {301 struct oneshot * expected = this.ptr;302 // Is the future already fulfilled?303 if(expected == 1p) return false; // Yes, just return false (didn't block)304 305 // The future is not fulfilled, try to setup the wait context306 /* paranoid */ verify( expected == 0p );307 if(__atomic_compare_exchange_n(&this.ptr, &expected, &wait_ctx, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {308 return true;309 }310 }311 }312 313 // Stop waiting on a future314 // When multiple futures are waited for together in "any of" pattern315 // futures that weren't fulfilled before the thread woke up316 // should retract the wait ctx317 // intented to be use by wait, wait_any, waitfor, etc. rather than used directly318 void retract( future_t & this, oneshot & wait_ctx ) {319 // Remove the wait context320 struct oneshot * got = __atomic_exchange_n( &this.ptr, 0p, __ATOMIC_SEQ_CST);321 322 // got == 0p: future was never actually setup, just return323 if( got == 0p ) return;324 325 // got == wait_ctx: since fulfil does an atomic_swap,326 // if we got back the original then no one else saw context327 // It is safe to delete (which could happen after the return)328 if( got == &wait_ctx ) return;329 330 // got == 1p: the future is ready and the context was fully consumed331 // the server won't use the pointer again332 // It is safe to delete (which could happen after the return)333 if( got == 1p ) return;334 335 // got == 2p: the future is ready but the context hasn't fully been consumed336 // spin until it is safe to move on337 if( got == 2p ) {338 while( this.ptr != 1p ) Pause();339 return;340 }341 342 // got == any thing else, something wen't wrong here, abort343 abort("Future in unexpected state");344 }345 346 // Mark the future as abandoned, meaning it will be deleted by the server347 bool abandon( future_t & this ) {348 /* paranoid */ verify( this.ptr != 3p );349 350 // Mark the future as abandonned351 struct oneshot * got = __atomic_exchange_n( &this.ptr, 3p, __ATOMIC_SEQ_CST);352 353 // If the future isn't already fulfilled, let the server delete it354 if( got == 0p ) return false;355 356 // got == 2p: the future is ready but the context hasn't fully been consumed357 // spin until it is safe to move on358 if( got == 2p ) {359 while( this.ptr != 1p ) Pause();360 got = 1p;361 }362 363 // The future is completed delete it now364 /* paranoid */ verify( this.ptr != 1p );365 free( &this );366 return true;367 }368 369 // from the server side, mark the future as fulfilled370 // delete it if needed371 bool fulfil( future_t & this ) {372 for() {373 struct oneshot * expected = this.ptr;374 // was this abandoned?375 #if defined(__GNUC__) && __GNUC__ >= 7376 #pragma GCC diagnostic push377 #pragma GCC diagnostic ignored "-Wfree-nonheap-object"378 #endif379 if( expected == 3p ) { free( &this ); return false; }380 #if defined(__GNUC__) && __GNUC__ >= 7381 #pragma GCC diagnostic pop382 #endif383 384 /* paranoid */ verify( expected != 1p ); // Future is already fulfilled, should not happen385 /* paranoid */ verify( expected != 2p ); // Future is bein fulfilled by someone else, this is even less supported then the previous case.386 387 // If there is a wait context, we need to consume it and mark it as consumed after388 // If there is no context then we can skip the in progress phase389 struct oneshot * want = expected == 0p ? 1p : 2p;390 if(__atomic_compare_exchange_n(&this.ptr, &expected, want, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) {391 if( expected == 0p ) { /* paranoid */ verify( this.ptr == 1p); return false; }392 bool ret = post( *expected );393 __atomic_store_n( &this.ptr, 1p, __ATOMIC_SEQ_CST);394 return ret;395 }396 }397 398 }399 400 // Wait for the future to be fulfilled401 bool wait( future_t & this ) {402 oneshot temp;403 if( !setup(this, temp) ) return false;404 405 // Wait context is setup, just wait on it406 bool ret = wait( temp );407 408 // Wait for the future to tru409 while( this.ptr == 2p ) Pause();410 // Make sure the state makes sense411 // Should be fulfilled, could be in progress but it's out of date if so412 // since if that is the case, the oneshot was fulfilled (unparking this thread)413 // and the oneshot should not be needed any more414 __attribute__((unused)) struct oneshot * was = this.ptr;415 /* paranoid */ verifyf( was == 1p, "Expected this.ptr to be 1p, was %p\n", was );416 417 // Mark the future as fulfilled, to be consistent418 // with potential calls to avail419 // this.ptr = 1p;420 return ret;421 }422 }423 94 #endif -
libcfa/src/concurrency/io/types.hfa
rdafbde8 r454f478 5 5 // file "LICENCE" distributed with Cforall. 6 6 // 7 // io/types.hfa -- 7 // io/types.hfa -- PRIVATE 8 // Types used by the I/O subsystem 8 9 // 9 10 // Author : Thierry Delisle … … 20 21 } 21 22 22 #include " bits/locks.hfa"23 #include "kernel/fwd.hfa" 23 24 24 25 #if defined(CFA_HAVE_LINUX_IO_URING_H) -
libcfa/src/concurrency/kernel.cfa
rdafbde8 r454f478 224 224 } 225 225 226 V( this->terminated );226 post( this->terminated ); 227 227 228 228 if(this == mainProcessor) { … … 665 665 //============================================================================================= 666 666 //----------------------------------------------------------------------------- 667 // Locks668 void ?{}( semaphore & this, int count = 1 ) {669 (this.lock){};670 this.count = count;671 (this.waiting){};672 }673 void ^?{}(semaphore & this) {}674 675 bool P(semaphore & this) with( this ){676 lock( lock __cfaabi_dbg_ctx2 );677 count -= 1;678 if ( count < 0 ) {679 // queue current task680 append( waiting, active_thread() );681 682 // atomically release spin lock and block683 unlock( lock );684 park();685 return true;686 }687 else {688 unlock( lock );689 return false;690 }691 }692 693 bool V(semaphore & this) with( this ) {694 $thread * thrd = 0p;695 lock( lock __cfaabi_dbg_ctx2 );696 count += 1;697 if ( count <= 0 ) {698 // remove task at head of waiting list699 thrd = pop_head( waiting );700 }701 702 unlock( lock );703 704 // make new owner705 unpark( thrd );706 707 return thrd != 0p;708 }709 710 bool V(semaphore & this, unsigned diff) with( this ) {711 $thread * thrd = 0p;712 lock( lock __cfaabi_dbg_ctx2 );713 int release = max(-count, (int)diff);714 count += diff;715 for(release) {716 unpark( pop_head( waiting ) );717 }718 719 unlock( lock );720 721 return thrd != 0p;722 }723 724 //-----------------------------------------------------------------------------725 667 // Debug 726 668 __cfaabi_dbg_debug_do( -
libcfa/src/concurrency/kernel.hfa
rdafbde8 r454f478 5 5 // file "LICENCE" distributed with Cforall. 6 6 // 7 // kernel -- 7 // kernel -- Header containing the core of the kernel API 8 8 // 9 9 // Author : Thierry Delisle … … 24 24 extern "C" { 25 25 #include <bits/pthreadtypes.h> 26 #include <pthread.h> 26 27 #include <linux/types.h> 27 28 } 28 29 29 30 //----------------------------------------------------------------------------- 30 // Locks 31 struct semaphore { 32 __spinlock_t lock; 33 int count; 34 __queue_t($thread) waiting; 35 }; 36 37 void ?{}(semaphore & this, int count = 1); 38 void ^?{}(semaphore & this); 39 bool P (semaphore & this); 40 bool V (semaphore & this); 41 bool V (semaphore & this, unsigned count); 31 // Underlying Locks 32 #ifdef __CFA_WITH_VERIFY__ 33 extern bool __cfaabi_dbg_in_kernel(); 34 #endif 35 36 extern "C" { 37 char * strerror(int); 38 } 39 #define CHECKED(x) { int err = x; if( err != 0 ) abort("KERNEL ERROR: Operation \"" #x "\" return error %d - %s\n", err, strerror(err)); } 40 41 struct __bin_sem_t { 42 pthread_mutex_t lock; 43 pthread_cond_t cond; 44 int val; 45 }; 46 47 static inline void ?{}(__bin_sem_t & this) with( this ) { 48 // Create the mutex with error checking 49 pthread_mutexattr_t mattr; 50 pthread_mutexattr_init( &mattr ); 51 pthread_mutexattr_settype( &mattr, PTHREAD_MUTEX_ERRORCHECK_NP); 52 pthread_mutex_init(&lock, &mattr); 53 54 pthread_cond_init (&cond, (const pthread_condattr_t *)0p); // workaround trac#208: cast should not be required 55 val = 0; 56 } 57 58 static inline void ^?{}(__bin_sem_t & this) with( this ) { 59 CHECKED( pthread_mutex_destroy(&lock) ); 60 CHECKED( pthread_cond_destroy (&cond) ); 61 } 62 63 static inline void wait(__bin_sem_t & this) with( this ) { 64 verify(__cfaabi_dbg_in_kernel()); 65 CHECKED( pthread_mutex_lock(&lock) ); 66 while(val < 1) { 67 pthread_cond_wait(&cond, &lock); 68 } 69 val -= 1; 70 CHECKED( pthread_mutex_unlock(&lock) ); 71 } 72 73 static inline bool post(__bin_sem_t & this) with( this ) { 74 bool needs_signal = false; 75 76 CHECKED( pthread_mutex_lock(&lock) ); 77 if(val < 1) { 78 val += 1; 79 pthread_cond_signal(&cond); 80 needs_signal = true; 81 } 82 CHECKED( pthread_mutex_unlock(&lock) ); 83 84 return needs_signal; 85 } 86 87 #undef CHECKED 42 88 43 89 … … 91 137 92 138 // Termination synchronisation (user semaphore) 93 semaphoreterminated;139 oneshot terminated; 94 140 95 141 // pthread Stack -
libcfa/src/concurrency/kernel/fwd.hfa
rdafbde8 r454f478 5 5 // file "LICENCE" distributed with Cforall. 6 6 // 7 // kernel/fwd.hfa -- 7 // kernel/fwd.hfa -- PUBLIC 8 // Fundamental code needed to implement threading M.E.S. algorithms. 8 9 // 9 10 // Author : Thierry Delisle … … 134 135 extern uint64_t thread_rand(); 135 136 137 // Semaphore which only supports a single thread 138 struct single_sem { 139 struct $thread * volatile ptr; 140 }; 141 142 static inline { 143 void ?{}(single_sem & this) { 144 this.ptr = 0p; 145 } 146 147 void ^?{}(single_sem &) {} 148 149 bool wait(single_sem & this) { 150 for() { 151 struct $thread * expected = this.ptr; 152 if(expected == 1p) { 153 if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 154 return false; 155 } 156 } 157 else { 158 /* paranoid */ verify( expected == 0p ); 159 if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 160 park(); 161 return true; 162 } 163 } 164 165 } 166 } 167 168 bool post(single_sem & this) { 169 for() { 170 struct $thread * expected = this.ptr; 171 if(expected == 1p) return false; 172 if(expected == 0p) { 173 if(__atomic_compare_exchange_n(&this.ptr, &expected, 1p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 174 return false; 175 } 176 } 177 else { 178 if(__atomic_compare_exchange_n(&this.ptr, &expected, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 179 unpark( expected ); 180 return true; 181 } 182 } 183 } 184 } 185 } 186 187 // Synchronozation primitive which only supports a single thread and one post 188 // Similar to a binary semaphore with a 'one shot' semantic 189 // is expected to be discarded after each party call their side 190 struct oneshot { 191 // Internal state : 192 // 0p : is initial state (wait will block) 193 // 1p : fulfilled (wait won't block) 194 // any thread : a thread is currently waiting 195 struct $thread * volatile ptr; 196 }; 197 198 static inline { 199 void ?{}(oneshot & this) { 200 this.ptr = 0p; 201 } 202 203 void ^?{}(oneshot &) {} 204 205 // Wait for the post, return immidiately if it already happened. 206 // return true if the thread was parked 207 bool wait(oneshot & this) { 208 for() { 209 struct $thread * expected = this.ptr; 210 if(expected == 1p) return false; 211 /* paranoid */ verify( expected == 0p ); 212 if(__atomic_compare_exchange_n(&this.ptr, &expected, active_thread(), false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 213 park(); 214 /* paranoid */ verify( this.ptr == 1p ); 215 return true; 216 } 217 } 218 } 219 220 // Mark as fulfilled, wake thread if needed 221 // return true if a thread was unparked 222 bool post(oneshot & this) { 223 struct $thread * got = __atomic_exchange_n( &this.ptr, 1p, __ATOMIC_SEQ_CST); 224 if( got == 0p ) return false; 225 unpark( got ); 226 return true; 227 } 228 } 229 230 // base types for future to build upon 231 // It is based on the 'oneshot' type to allow multiple futures 232 // to block on the same instance, permitting users to block a single 233 // thread on "any of" [a given set of] futures. 234 // does not support multiple threads waiting on the same future 235 struct future_t { 236 // Internal state : 237 // 0p : is initial state (wait will block) 238 // 1p : fulfilled (wait won't block) 239 // 2p : in progress () 240 // 3p : abandoned, server should delete 241 // any oneshot : a context has been setup to wait, a thread could wait on it 242 struct oneshot * volatile ptr; 243 }; 244 245 static inline { 246 void ?{}(future_t & this) { 247 this.ptr = 0p; 248 } 249 250 void ^?{}(future_t &) {} 251 252 void reset(future_t & this) { 253 // needs to be in 0p or 1p 254 __atomic_exchange_n( &this.ptr, 0p, __ATOMIC_SEQ_CST); 255 } 256 257 // check if the future is available 258 bool available( future_t & this ) { 259 return this.ptr == 1p; 260 } 261 262 // Prepare the future to be waited on 263 // intented to be use by wait, wait_any, waitfor, etc. rather than used directly 264 bool setup( future_t & this, oneshot & wait_ctx ) { 265 /* paranoid */ verify( wait_ctx.ptr == 0p ); 266 // The future needs to set the wait context 267 for() { 268 struct oneshot * expected = this.ptr; 269 // Is the future already fulfilled? 270 if(expected == 1p) return false; // Yes, just return false (didn't block) 271 272 // The future is not fulfilled, try to setup the wait context 273 /* paranoid */ verify( expected == 0p ); 274 if(__atomic_compare_exchange_n(&this.ptr, &expected, &wait_ctx, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 275 return true; 276 } 277 } 278 } 279 280 // Stop waiting on a future 281 // When multiple futures are waited for together in "any of" pattern 282 // futures that weren't fulfilled before the thread woke up 283 // should retract the wait ctx 284 // intented to be use by wait, wait_any, waitfor, etc. rather than used directly 285 void retract( future_t & this, oneshot & wait_ctx ) { 286 // Remove the wait context 287 struct oneshot * got = __atomic_exchange_n( &this.ptr, 0p, __ATOMIC_SEQ_CST); 288 289 // got == 0p: future was never actually setup, just return 290 if( got == 0p ) return; 291 292 // got == wait_ctx: since fulfil does an atomic_swap, 293 // if we got back the original then no one else saw context 294 // It is safe to delete (which could happen after the return) 295 if( got == &wait_ctx ) return; 296 297 // got == 1p: the future is ready and the context was fully consumed 298 // the server won't use the pointer again 299 // It is safe to delete (which could happen after the return) 300 if( got == 1p ) return; 301 302 // got == 2p: the future is ready but the context hasn't fully been consumed 303 // spin until it is safe to move on 304 if( got == 2p ) { 305 while( this.ptr != 1p ) Pause(); 306 return; 307 } 308 309 // got == any thing else, something wen't wrong here, abort 310 abort("Future in unexpected state"); 311 } 312 313 // Mark the future as abandoned, meaning it will be deleted by the server 314 bool abandon( future_t & this ) { 315 /* paranoid */ verify( this.ptr != 3p ); 316 317 // Mark the future as abandonned 318 struct oneshot * got = __atomic_exchange_n( &this.ptr, 3p, __ATOMIC_SEQ_CST); 319 320 // If the future isn't already fulfilled, let the server delete it 321 if( got == 0p ) return false; 322 323 // got == 2p: the future is ready but the context hasn't fully been consumed 324 // spin until it is safe to move on 325 if( got == 2p ) { 326 while( this.ptr != 1p ) Pause(); 327 got = 1p; 328 } 329 330 // The future is completed delete it now 331 /* paranoid */ verify( this.ptr != 1p ); 332 free( &this ); 333 return true; 334 } 335 336 // from the server side, mark the future as fulfilled 337 // delete it if needed 338 bool fulfil( future_t & this ) { 339 for() { 340 struct oneshot * expected = this.ptr; 341 // was this abandoned? 342 #if defined(__GNUC__) && __GNUC__ >= 7 343 #pragma GCC diagnostic push 344 #pragma GCC diagnostic ignored "-Wfree-nonheap-object" 345 #endif 346 if( expected == 3p ) { free( &this ); return false; } 347 #if defined(__GNUC__) && __GNUC__ >= 7 348 #pragma GCC diagnostic pop 349 #endif 350 351 /* paranoid */ verify( expected != 1p ); // Future is already fulfilled, should not happen 352 /* paranoid */ verify( expected != 2p ); // Future is bein fulfilled by someone else, this is even less supported then the previous case. 353 354 // If there is a wait context, we need to consume it and mark it as consumed after 355 // If there is no context then we can skip the in progress phase 356 struct oneshot * want = expected == 0p ? 1p : 2p; 357 if(__atomic_compare_exchange_n(&this.ptr, &expected, want, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) { 358 if( expected == 0p ) { /* paranoid */ verify( this.ptr == 1p); return false; } 359 bool ret = post( *expected ); 360 __atomic_store_n( &this.ptr, 1p, __ATOMIC_SEQ_CST); 361 return ret; 362 } 363 } 364 365 } 366 367 // Wait for the future to be fulfilled 368 bool wait( future_t & this ) { 369 oneshot temp; 370 if( !setup(this, temp) ) return false; 371 372 // Wait context is setup, just wait on it 373 bool ret = wait( temp ); 374 375 // Wait for the future to tru 376 while( this.ptr == 2p ) Pause(); 377 // Make sure the state makes sense 378 // Should be fulfilled, could be in progress but it's out of date if so 379 // since if that is the case, the oneshot was fulfilled (unparking this thread) 380 // and the oneshot should not be needed any more 381 __attribute__((unused)) struct oneshot * was = this.ptr; 382 /* paranoid */ verifyf( was == 1p, "Expected this.ptr to be 1p, was %p\n", was ); 383 384 // Mark the future as fulfilled, to be consistent 385 // with potential calls to avail 386 // this.ptr = 1p; 387 return ret; 388 } 389 } 390 136 391 //----------------------------------------------------------------------- 137 392 // Statics call at the end of each thread to register statistics -
libcfa/src/concurrency/kernel/startup.cfa
rdafbde8 r454f478 199 199 void ?{}(processor & this) with( this ) { 200 200 ( this.idle ){}; 201 ( this.terminated ){ 0};201 ( this.terminated ){}; 202 202 ( this.runner ){}; 203 203 init( this, "Main Processor", *mainCluster ); … … 528 528 void ?{}(processor & this, const char name[], cluster & _cltr) { 529 529 ( this.idle ){}; 530 ( this.terminated ){ 0};530 ( this.terminated ){}; 531 531 ( this.runner ){}; 532 532 … … 549 549 __wake_proc( &this ); 550 550 551 P( terminated );551 wait( terminated ); 552 552 /* paranoid */ verify( active_processor() != &this); 553 553 } -
libcfa/src/concurrency/locks.cfa
rdafbde8 r454f478 356 356 bool wait( condition_variable(L) & this, L & l, uintptr_t info, Time time ) with(this) { WAIT_TIME( info, &l , time ) } 357 357 } 358 359 //----------------------------------------------------------------------------- 360 // Semaphore 361 void ?{}( semaphore & this, int count = 1 ) { 362 (this.lock){}; 363 this.count = count; 364 (this.waiting){}; 365 } 366 void ^?{}(semaphore & this) {} 367 368 bool P(semaphore & this) with( this ){ 369 lock( lock __cfaabi_dbg_ctx2 ); 370 count -= 1; 371 if ( count < 0 ) { 372 // queue current task 373 append( waiting, active_thread() ); 374 375 // atomically release spin lock and block 376 unlock( lock ); 377 park(); 378 return true; 379 } 380 else { 381 unlock( lock ); 382 return false; 383 } 384 } 385 386 bool V(semaphore & this) with( this ) { 387 $thread * thrd = 0p; 388 lock( lock __cfaabi_dbg_ctx2 ); 389 count += 1; 390 if ( count <= 0 ) { 391 // remove task at head of waiting list 392 thrd = pop_head( waiting ); 393 } 394 395 unlock( lock ); 396 397 // make new owner 398 unpark( thrd ); 399 400 return thrd != 0p; 401 } 402 403 bool V(semaphore & this, unsigned diff) with( this ) { 404 $thread * thrd = 0p; 405 lock( lock __cfaabi_dbg_ctx2 ); 406 int release = max(-count, (int)diff); 407 count += diff; 408 for(release) { 409 unpark( pop_head( waiting ) ); 410 } 411 412 unlock( lock ); 413 414 return thrd != 0p; 415 } -
libcfa/src/concurrency/locks.hfa
rdafbde8 r454f478 157 157 bool wait( condition_variable(L) & this, L & l, uintptr_t info, Time time ); 158 158 } 159 160 //----------------------------------------------------------------------------- 161 // Semaphore 162 struct semaphore { 163 __spinlock_t lock; 164 int count; 165 __queue_t($thread) waiting; 166 }; 167 168 void ?{}(semaphore & this, int count = 1); 169 void ^?{}(semaphore & this); 170 bool P (semaphore & this); 171 bool V (semaphore & this); 172 bool V (semaphore & this, unsigned count); -
tests/concurrent/thread.cfa
rdafbde8 r454f478 1 1 #include <fstream.hfa> 2 2 #include <kernel.hfa> 3 #include <locks.hfa> 3 4 #include <stdlib.hfa> 4 5 #include <thread.hfa>
Note: See TracChangeset
for help on using the changeset viewer.