Changeset 97f65d5 for src/libcfa/concurrency
- Timestamp:
- Feb 15, 2017, 8:13:49 AM (9 years ago)
- Branches:
- ADT, aaron-thesis, arm-eh, ast-experimental, cleanup-dtors, deferred_resn, demangler, enum, forall-pointer-decay, jacob/cs343-translation, jenkins-sandbox, master, new-ast, new-ast-unique-expr, new-env, no_list, persistent-indexer, pthread-emulation, qualifiedEnum, resolv-new, stuck-waitfor-destruct, with_gc
- Children:
- e6512c8
- Parents:
- aa9ee19 (diff), 3149e7e (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the(diff)links above to see all the changes relative to each parent. - Location:
- src/libcfa/concurrency
- Files:
-
- 1 added
- 7 edited
-
coroutines (modified) (3 diffs)
-
coroutines.c (modified) (3 diffs)
-
invoke.c (modified) (1 diff)
-
invoke.h (modified) (4 diffs)
-
kernel (modified) (5 diffs)
-
kernel.c (modified) (18 diffs)
-
kernel_private.h (added)
-
threads.c (modified) (5 diffs)
Legend:
- Unmodified
- Added
- Removed
-
src/libcfa/concurrency/coroutines
raa9ee19 r97f65d5 77 77 "Possible cause is a suspend executed in a member called by a coroutine user rather than by the coroutine main.", 78 78 src->name, src ); 79 assertf( src->last-> notHalted,79 assertf( src->last->state != Halted, 80 80 "Attempt by coroutine \"%.256s\" (%p) to suspend back to terminated coroutine \"%.256s\" (%p).\n" 81 81 "Possible cause is terminated coroutine's main routine has already returned.", … … 98 98 // not resuming self ? 99 99 if ( src != dst ) { 100 assertf( dst-> notHalted ,100 assertf( dst->state != Halted , 101 101 "Attempt by coroutine %.256s (%p) to resume terminated coroutine %.256s (%p).\n" 102 102 "Possible cause is terminated coroutine's main routine has already returned.", … … 116 116 // not resuming self ? 117 117 if ( src != dst ) { 118 assertf( dst-> notHalted ,118 assertf( dst->state != Halted , 119 119 "Attempt by coroutine %.256s (%p) to resume terminated coroutine %.256s (%p).\n" 120 120 "Possible cause is terminated coroutine's main routine has already returned.", -
src/libcfa/concurrency/coroutines.c
raa9ee19 r97f65d5 61 61 62 62 void ?{}(coroutine* this) { 63 this->name = "Anonymous Coroutine"; 64 this->errno_ = 0; 65 this->state = Start; 66 this->notHalted = true; 67 this->starter = NULL; 68 this->last = NULL; 63 this{ "Anonymous Coroutine" }; 69 64 } 70 65 … … 73 68 this->errno_ = 0; 74 69 this->state = Start; 75 this->notHalted = true;76 70 this->starter = NULL; 77 71 this->last = NULL; … … 169 163 this->context = this->base; 170 164 this->top = (char *)this->context + cxtSize; 171 172 LIB_DEBUG_PRINTF("Coroutine : created stack %p\n", this->base);173 165 } 174 166 -
src/libcfa/concurrency/invoke.c
raa9ee19 r97f65d5 48 48 main( this ); 49 49 50 cor->state = Halt; 51 cor->notHalted = false; 50 cor->state = Halted; 52 51 53 52 //Final suspend, should never return -
src/libcfa/concurrency/invoke.h
raa9ee19 r97f65d5 30 30 #define SCHEDULER_CAPACITY 10 31 31 32 struct spinlock { 33 volatile int lock; 34 }; 35 32 36 struct simple_thread_list { 33 37 struct thread * head; 34 38 struct thread ** tail; 39 }; 40 41 struct signal_once { 42 volatile bool condition; 43 struct spinlock lock; 44 struct simple_thread_list blocked; 35 45 }; 36 46 … … 40 50 void append( struct simple_thread_list *, struct thread * ); 41 51 struct thread * pop_head( struct simple_thread_list * ); 52 53 void ?{}(spinlock * this); 54 void ^?{}(spinlock * this); 55 56 void ?{}(signal_once * this); 57 void ^?{}(signal_once * this); 42 58 } 43 59 #endif … … 53 69 }; 54 70 55 enum coroutine_state { Start, Inactive, Active, Halt, Primed };71 enum coroutine_state { Halted, Start, Inactive, Active, Primed }; 56 72 57 73 struct coroutine { … … 60 76 int errno_; // copy of global UNIX variable errno 61 77 enum coroutine_state state; // current execution status for coroutine 62 bool notHalted; // indicate if execuation state is not halted63 64 78 struct coroutine *starter; // first coroutine to resume this one 65 79 struct coroutine *last; // last coroutine to resume this one 66 80 }; 67 81 68 struct simple_lock {69 struct simple_thread_list blocked;70 };71 72 82 struct thread { 73 struct coroutine c; 74 struct si mple_lock lock;75 struct thread * next; 83 struct coroutine c; // coroutine body used to store context 84 struct signal_once terminated;// indicate if execuation state is not halted 85 struct thread * next; // instrusive link field for threads 76 86 }; 77 87 -
src/libcfa/concurrency/kernel
raa9ee19 r97f65d5 9 9 // 10 10 // Author : Thierry Delisle 11 // Created On : Tue Jan 17 12:27:26 201 611 // Created On : Tue Jan 17 12:27:26 2017 12 12 // Last Modified By : Thierry Delisle 13 13 // Last Modified On : -- … … 27 27 28 28 //----------------------------------------------------------------------------- 29 // Locks 30 void lock( spinlock * ); 31 void unlock( spinlock * ); 32 33 void wait( signal_once * ); 34 void signal( signal_once * ); 35 36 //----------------------------------------------------------------------------- 29 37 // Cluster 30 38 struct cluster { 31 39 simple_thread_list ready_queue; 32 // pthread_spinlock_tlock;40 spinlock lock; 33 41 }; 34 42 … … 38 46 //----------------------------------------------------------------------------- 39 47 // Processor 40 enum ProcessorAction { 41 Reschedule, 42 NoAction 48 enum FinishOpCode { No_Action, Release, Schedule, Release_Schedule }; 49 struct FinishAction { 50 FinishOpCode action_code; 51 thread * thrd; 52 spinlock * lock; 43 53 }; 54 static inline void ?{}(FinishAction * this) { 55 this->action_code = No_Action; 56 this->thrd = NULL; 57 this->lock = NULL; 58 } 59 static inline void ^?{}(FinishAction * this) {} 44 60 45 61 struct processor { … … 49 65 thread * current_thread; 50 66 pthread_t kernel_thread; 51 simple_lock lock; 52 volatile bool terminated; 53 ProcessorAction thread_action; 67 68 signal_once terminated; 69 volatile bool is_terminated; 70 71 struct FinishAction finish; 54 72 }; 55 73 … … 57 75 void ?{}(processor * this, cluster * cltr); 58 76 void ^?{}(processor * this); 59 60 61 //-----------------------------------------------------------------------------62 // Locks63 64 void ?{}(simple_lock * this);65 void ^?{}(simple_lock * this);66 67 void lock( simple_lock * );68 void unlock( simple_lock * );69 77 70 78 #endif //KERNEL_H -
src/libcfa/concurrency/kernel.c
raa9ee19 r97f65d5 9 9 // 10 10 // Author : Thierry Delisle 11 // Created On : Tue Jan 17 12:27:26 201 611 // Created On : Tue Jan 17 12:27:26 2017 12 12 // Last Modified By : Thierry Delisle 13 13 // Last Modified On : -- … … 20 20 21 21 //Header 22 #include "kernel "22 #include "kernel_private.h" 23 23 24 24 //C Includes … … 31 31 //CFA Includes 32 32 #include "libhdr.h" 33 #include "threads"34 33 35 34 //Private includes … … 37 36 #include "invoke.h" 38 37 39 static volatile int lock;40 41 void spin_lock( volatile int *lock ) {42 for ( unsigned int i = 1;; i += 1 ) {43 if ( *lock == 0 && __sync_lock_test_and_set_4( lock, 1 ) == 0 ) break;44 }45 }46 47 void spin_unlock( volatile int *lock ) {48 __sync_lock_release_4( lock );49 }50 51 38 //----------------------------------------------------------------------------- 52 39 // Kernel storage 53 struct processorCtx_t {54 processor * proc;55 coroutine c;56 };57 58 DECL_COROUTINE(processorCtx_t);59 60 40 #define KERNEL_STORAGE(T,X) static char X##_storage[sizeof(T)] 61 41 … … 127 107 this->name = "Main Thread"; 128 108 this->errno_ = 0; 129 this->state = Inactive; 130 this->notHalted = true; 109 this->state = Start; 131 110 } 132 111 … … 149 128 } 150 129 151 void start(processor * this);152 153 130 void ?{}(processor * this) { 154 131 this{ systemCluster }; … … 159 136 this->current_coroutine = NULL; 160 137 this->current_thread = NULL; 161 (&this-> lock){};162 this-> terminated = false;138 (&this->terminated){}; 139 this->is_terminated = false; 163 140 164 141 start( this ); … … 169 146 this->current_coroutine = NULL; 170 147 this->current_thread = NULL; 171 (&this-> lock){};172 this-> terminated = false;148 (&this->terminated){}; 149 this->is_terminated = false; 173 150 174 151 this->runner = runner; … … 178 155 179 156 void ^?{}(processor * this) { 180 if( ! this-> terminated ) {157 if( ! this->is_terminated ) { 181 158 LIB_DEBUG_PRINTF("Kernel : core %p signaling termination\n", this); 182 this-> terminated = true;183 lock( &this->lock);159 this->is_terminated = true; 160 wait( &this->terminated ); 184 161 } 185 162 } … … 187 164 void ?{}(cluster * this) { 188 165 ( &this->ready_queue ){}; 189 lock = 0;166 ( &this->lock ){}; 190 167 } 191 168 … … 194 171 } 195 172 196 //----------------------------------------------------------------------------- 197 // Processor running routines 198 void main(processorCtx_t *); 199 thread * nextThread(cluster * this); 200 void scheduleInternal(processor * this, thread * dst); 201 void spin(processor * this, unsigned int * spin_count); 202 void thread_schedule( thread * thrd ); 203 173 //============================================================================================= 174 // Kernel Scheduling logic 175 //============================================================================================= 204 176 //Main of the processor contexts 205 177 void main(processorCtx_t * runner) { … … 207 179 LIB_DEBUG_PRINTF("Kernel : core %p starting\n", this); 208 180 209 fenv_t envp;210 fegetenv( &envp );211 LIB_DEBUG_PRINTF("Kernel : mxcsr %x\n", envp.__mxcsr);212 213 181 thread * readyThread = NULL; 214 for( unsigned int spin_count = 0; ! this-> terminated; spin_count++ ) {215 182 for( unsigned int spin_count = 0; ! this->is_terminated; spin_count++ ) 183 { 216 184 readyThread = nextThread( this->cltr ); 217 185 218 if(readyThread) { 219 scheduleInternal(this, readyThread); 186 if(readyThread) 187 { 188 runThread(this, readyThread); 189 190 //Some actions need to be taken from the kernel 191 finishRunning(this); 192 220 193 spin_count = 0; 221 } else { 194 } 195 else 196 { 222 197 spin(this, &spin_count); 223 198 } … … 225 200 226 201 LIB_DEBUG_PRINTF("Kernel : core %p unlocking thread\n", this); 227 unlock( &this->lock);202 signal( &this->terminated ); 228 203 LIB_DEBUG_PRINTF("Kernel : core %p terminated\n", this); 229 204 } 230 205 231 //Declarations for scheduleInternal 232 extern void ThreadCtxSwitch(coroutine * src, coroutine * dst); 233 234 // scheduleInternal runs a thread by context switching 206 // runThread runs a thread by context switching 235 207 // from the processor coroutine to the target thread 236 void scheduleInternal(processor * this, thread * dst) { 237 this->thread_action = NoAction; 238 239 // coroutine * proc_ctx = get_coroutine(this->ctx); 240 // coroutine * thrd_ctx = get_coroutine(dst); 241 242 // //Update global state 243 // this->current_thread = dst; 244 245 // // Context Switch to the thread 246 // ThreadCtxSwitch(proc_ctx, thrd_ctx); 247 // // when ThreadCtxSwitch returns we are back in the processor coroutine 248 249 coroutine * proc_ctx = get_coroutine(this->runner); 250 coroutine * thrd_ctx = get_coroutine(dst); 251 thrd_ctx->last = proc_ctx; 252 253 // context switch to specified coroutine 254 // Which is now the current_coroutine 255 // LIB_DEBUG_PRINTF("Kernel : switching to ctx %p (from %p, current %p)\n", thrd_ctx, proc_ctx, this->current_coroutine); 256 this->current_thread = dst; 257 this->current_coroutine = thrd_ctx; 258 CtxSwitch( proc_ctx->stack.context, thrd_ctx->stack.context ); 259 this->current_coroutine = proc_ctx; 260 // LIB_DEBUG_PRINTF("Kernel : returned from ctx %p (to %p, current %p)\n", thrd_ctx, proc_ctx, this->current_coroutine); 261 262 // when CtxSwitch returns we are back in the processor coroutine 263 if(this->thread_action == Reschedule) { 264 thread_schedule( dst ); 208 void runThread(processor * this, thread * dst) { 209 coroutine * proc_cor = get_coroutine(this->runner); 210 coroutine * thrd_cor = get_coroutine(dst); 211 212 //Reset the terminating actions here 213 this->finish.action_code = No_Action; 214 215 //Update global state 216 this->current_thread = dst; 217 218 // Context Switch to the thread 219 ThreadCtxSwitch(proc_cor, thrd_cor); 220 // when ThreadCtxSwitch returns we are back in the processor coroutine 221 } 222 223 // Once a thread has finished running, some of 224 // its final actions must be executed from the kernel 225 void finishRunning(processor * this) { 226 if( this->finish.action_code == Release ) { 227 unlock( this->finish.lock ); 228 } 229 else if( this->finish.action_code == Schedule ) { 230 ScheduleThread( this->finish.thrd ); 231 } 232 else if( this->finish.action_code == Release_Schedule ) { 233 unlock( this->finish.lock ); 234 ScheduleThread( this->finish.thrd ); 235 } 236 else { 237 assert(this->finish.action_code == No_Action); 265 238 } 266 239 } … … 301 274 proc_cor_storage.c.state = Active; 302 275 main( &proc_cor_storage ); 303 proc_cor_storage.c.state = Halt; 304 proc_cor_storage.c.notHalted = false; 276 proc_cor_storage.c.state = Halted; 305 277 306 278 // Main routine of the core returned, the core is now fully terminated … … 325 297 //----------------------------------------------------------------------------- 326 298 // Scheduler routines 327 void thread_schedule( thread * thrd ) {299 void ScheduleThread( thread * thrd ) { 328 300 assertf( thrd->next == NULL, "Expected null got %p", thrd->next ); 329 301 330 spin_lock( &lock );302 lock( &systemProcessor->cltr->lock ); 331 303 append( &systemProcessor->cltr->ready_queue, thrd ); 332 spin_unlock( &lock );304 unlock( &systemProcessor->cltr->lock ); 333 305 } 334 306 335 307 thread * nextThread(cluster * this) { 336 spin_lock( &lock );308 lock( &this->lock ); 337 309 thread * head = pop_head( &this->ready_queue ); 338 spin_unlock( &lock );310 unlock( &this->lock ); 339 311 return head; 312 } 313 314 void ScheduleInternal() { 315 suspend(); 316 } 317 318 void ScheduleInternal( spinlock * lock ) { 319 get_this_processor()->finish.action_code = Release; 320 get_this_processor()->finish.lock = lock; 321 suspend(); 322 } 323 324 void ScheduleInternal( thread * thrd ) { 325 get_this_processor()->finish.action_code = Schedule; 326 get_this_processor()->finish.thrd = thrd; 327 suspend(); 328 } 329 330 void ScheduleInternal( spinlock * lock, thread * thrd ) { 331 get_this_processor()->finish.action_code = Release_Schedule; 332 get_this_processor()->finish.lock = lock; 333 get_this_processor()->finish.thrd = thrd; 334 suspend(); 340 335 } 341 336 … … 363 358 // Add the main thread to the ready queue 364 359 // once resume is called on systemProcessor->ctx the mainThread needs to be scheduled like any normal thread 365 thread_schedule(mainThread);360 ScheduleThread(mainThread); 366 361 367 362 //initialize the global state variables … … 387 382 // When its coroutine terminates, it return control to the mainThread 388 383 // which is currently here 389 systemProcessor-> terminated = true;384 systemProcessor->is_terminated = true; 390 385 suspend(); 391 386 … … 406 401 //----------------------------------------------------------------------------- 407 402 // Locks 408 void ?{}( simple_lock * this ) { 409 ( &this->blocked ){}; 410 } 411 412 void ^?{}( simple_lock * this ) { 413 414 } 415 416 void lock( simple_lock * this ) { 403 void ?{}( spinlock * this ) { 404 this->lock = 0; 405 } 406 void ^?{}( spinlock * this ) { 407 408 } 409 410 void lock( spinlock * this ) { 411 for ( unsigned int i = 1;; i += 1 ) { 412 if ( this->lock == 0 && __sync_lock_test_and_set_4( &this->lock, 1 ) == 0 ) break; 413 } 414 } 415 416 void unlock( spinlock * this ) { 417 __sync_lock_release_4( &this->lock ); 418 } 419 420 void ?{}( signal_once * this ) { 421 this->condition = false; 422 } 423 void ^?{}( signal_once * this ) { 424 425 } 426 427 void wait( signal_once * this ) { 428 lock( &this->lock ); 429 if( !this->condition ) { 430 append( &this->blocked, this_thread() ); 431 ScheduleInternal( &this->lock ); 432 lock( &this->lock ); 433 } 434 unlock( &this->lock ); 435 } 436 437 void signal( signal_once * this ) { 438 lock( &this->lock ); 417 439 { 418 spin_lock( &lock ); 419 append( &this->blocked, this_thread() ); 420 spin_unlock( &lock ); 421 } 422 suspend(); 423 } 424 425 void unlock( simple_lock * this ) { 426 thread * it; 427 while( it = pop_head( &this->blocked) ) { 428 thread_schedule( it ); 429 } 440 this->condition = true; 441 442 thread * it; 443 while( it = pop_head( &this->blocked) ) { 444 ScheduleThread( it ); 445 } 446 } 447 unlock( &this->lock ); 430 448 } 431 449 -
src/libcfa/concurrency/threads.c
raa9ee19 r97f65d5 17 17 #include "threads" 18 18 19 #include "kernel "19 #include "kernel_private.h" 20 20 #include "libhdr.h" 21 21 … … 44 44 (&this->c){}; 45 45 this->c.name = "Anonymous Coroutine"; 46 (&this-> lock){};46 (&this->terminated){}; 47 47 this->next = NULL; 48 48 } … … 72 72 //----------------------------------------------------------------------------- 73 73 // Starting and stopping threads 74 extern "C" {75 forall(dtype T | is_thread(T))76 void CtxInvokeThread(T * this);77 }78 79 extern void thread_schedule( thread * );80 81 74 forall( dtype T | is_thread(T) ) 82 75 void start( T* this ) { … … 92 85 CtxSwitch( thrd_c->last->stack.context, thrd_c->stack.context ); 93 86 94 fenv_t envp; 95 fegetenv( &envp ); 96 LIB_DEBUG_PRINTF("Thread : mxcsr %x\n", envp.__mxcsr); 97 LIB_DEBUG_PRINTF("Thread started : %p (t %p, c %p)\n", this, thrd_c, thrd_h); 98 99 thread_schedule(thrd_h); 87 ScheduleThread(thrd_h); 100 88 } 101 89 102 90 forall( dtype T | is_thread(T) ) 103 91 void stop( T* this ) { 104 thread* thrd = get_thread(this); 105 if( thrd->c.notHalted ) { 106 lock( &thrd->lock ); 107 } 92 wait( & get_thread(this)->terminated ); 108 93 } 109 94 110 95 void yield( void ) { 111 get_this_processor()->thread_action = Reschedule; 112 suspend(); 96 ScheduleInternal( get_this_processor()->current_thread ); 113 97 } 114 98 115 99 void ThreadCtxSwitch(coroutine* src, coroutine* dst) { 100 // set state of current coroutine to inactive 101 src->state = Inactive; 102 dst->state = Active; 103 104 //update the last resumer 116 105 dst->last = src; 117 106 118 // set state of current coroutine to inactive 119 src->state = Inactive; 120 121 // set new coroutine that task is executing 107 // set new coroutine that the processor is executing 108 // and context switch to it 122 109 get_this_processor()->current_coroutine = dst; 123 124 // context switch to specified coroutine125 110 CtxSwitch( src->stack.context, dst->stack.context ); 126 // when CtxSwitch returns we are back in the src coroutine111 get_this_processor()->current_coroutine = src; 127 112 128 113 // set state of new coroutine to active 114 dst->state = Inactive; 129 115 src->state = Active; 130 116 } … … 134 120 extern "C" { 135 121 void __thread_signal_termination( thread * this ) { 136 this->c.state = Halt ;137 this->c.notHalted = false;138 unlock( &this->lock );122 this->c.state = Halted; 123 LIB_DEBUG_PRINTF("Thread end : %p\n", this); 124 signal( &this->terminated ); 139 125 } 140 126 }
Note:
See TracChangeset
for help on using the changeset viewer.