Changeset db6f06a for src/libcfa/concurrency
- Timestamp:
- Feb 13, 2017, 5:04:43 PM (8 years ago)
- Branches:
- ADT, aaron-thesis, arm-eh, ast-experimental, cleanup-dtors, deferred_resn, demangler, enum, forall-pointer-decay, jacob/cs343-translation, jenkins-sandbox, master, new-ast, new-ast-unique-expr, new-env, no_list, persistent-indexer, pthread-emulation, qualifiedEnum, resolv-new, with_gc
- Children:
- ee897e4b
- Parents:
- 75f3522
- Location:
- src/libcfa/concurrency
- Files:
-
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
src/libcfa/concurrency/coroutines.c
r75f3522 rdb6f06a 61 61 62 62 void ?{}(coroutine* this) { 63 this->name = "Anonymous Coroutine"; 64 this->errno_ = 0; 65 this->state = Start; 66 this->notHalted = true; 67 this->starter = NULL; 68 this->last = NULL; 63 this{ "Anonymous Coroutine" }; 69 64 } 70 65 … … 73 68 this->errno_ = 0; 74 69 this->state = Start; 75 70 this->notHalted = true; 76 71 this->starter = NULL; 77 72 this->last = NULL; … … 169 164 this->context = this->base; 170 165 this->top = (char *)this->context + cxtSize; 171 172 LIB_DEBUG_PRINTF("Coroutine : created stack %p\n", this->base);173 166 } 174 167 -
src/libcfa/concurrency/invoke.h
r75f3522 rdb6f06a 30 30 #define SCHEDULER_CAPACITY 10 31 31 32 struct spinlock { 33 volatile int lock; 34 }; 35 32 36 struct simple_thread_list { 33 37 struct thread * head; 34 38 struct thread ** tail; 39 }; 40 41 // struct simple_lock { 42 // struct simple_thread_list blocked; 43 // }; 44 45 struct signal_once { 46 volatile bool condition; 47 struct spinlock lock; 48 struct simple_thread_list blocked; 35 49 }; 36 50 … … 40 54 void append( struct simple_thread_list *, struct thread * ); 41 55 struct thread * pop_head( struct simple_thread_list * ); 56 57 // void ?{}(simple_lock * this); 58 // void ^?{}(simple_lock * this); 59 60 void ?{}(spinlock * this); 61 void ^?{}(spinlock * this); 62 63 void ?{}(signal_once * this); 64 void ^?{}(signal_once * this); 42 65 } 43 66 #endif … … 60 83 int errno_; // copy of global UNIX variable errno 61 84 enum coroutine_state state; // current execution status for coroutine 62 bool notHalted; // indicate if execuation state is not halted 63 85 bool notHalted; // indicate if execuation state is not halted 64 86 struct coroutine *starter; // first coroutine to resume this one 65 87 struct coroutine *last; // last coroutine to resume this one 66 88 }; 67 89 68 struct simple_lock {69 struct simple_thread_list blocked;70 };71 72 90 struct thread { 73 91 struct coroutine c; 74 s truct simple_lock lock;92 signal_once terminated; // indicate if execuation state is not halted 75 93 struct thread * next; 76 94 }; -
src/libcfa/concurrency/kernel
r75f3522 rdb6f06a 27 27 28 28 //----------------------------------------------------------------------------- 29 // Locks 30 // void lock( simple_lock * ); 31 // void lock( simple_lock *, spinlock * ); 32 // void unlock( simple_lock * ); 33 34 void lock( spinlock * ); 35 void unlock( spinlock * ); 36 37 void wait( signal_once * ); 38 void signal( signal_once * ); 39 40 //----------------------------------------------------------------------------- 29 41 // Cluster 30 42 struct cluster { 31 43 simple_thread_list ready_queue; 32 // pthread_spinlock_tlock;44 spinlock lock; 33 45 }; 34 46 … … 38 50 //----------------------------------------------------------------------------- 39 51 // Processor 40 enum ProcessorAction { 41 Reschedule, 42 NoAction 52 enum FinishOpCode { No_Action, Release, Schedule, Release_Schedule }; 53 struct FinishAction { 54 FinishOpCode action_code; 55 thread * thrd; 56 spinlock * lock; 43 57 }; 58 static inline void ?{}(FinishAction * this) { 59 this->action_code = No_Action; 60 this->thrd = NULL; 61 this->lock = NULL; 62 } 63 static inline void ^?{}(FinishAction * this) {} 44 64 45 65 struct processor { … … 49 69 thread * current_thread; 50 70 pthread_t kernel_thread; 51 simple_lock lock; 52 volatile bool terminated; 53 ProcessorAction thread_action; 71 72 signal_once terminated; 73 volatile bool is_terminated; 74 75 struct FinishAction finish; 54 76 }; 55 77 … … 57 79 void ?{}(processor * this, cluster * cltr); 58 80 void ^?{}(processor * this); 59 60 //-----------------------------------------------------------------------------61 // Locks62 63 void ?{}(simple_lock * this);64 void ^?{}(simple_lock * this);65 66 void lock( simple_lock * );67 void unlock( simple_lock * );68 81 69 82 #endif //KERNEL_H -
src/libcfa/concurrency/kernel.c
r75f3522 rdb6f06a 141 141 } 142 142 143 void start(processor * this);144 145 143 void ?{}(processor * this) { 146 144 this{ systemCluster }; … … 151 149 this->current_coroutine = NULL; 152 150 this->current_thread = NULL; 153 (&this-> lock){};154 this-> terminated = false;151 (&this->terminated){}; 152 this->is_terminated = false; 155 153 156 154 start( this ); … … 161 159 this->current_coroutine = NULL; 162 160 this->current_thread = NULL; 163 (&this-> lock){};164 this-> terminated = false;161 (&this->terminated){}; 162 this->is_terminated = false; 165 163 166 164 this->runner = runner; … … 170 168 171 169 void ^?{}(processor * this) { 172 if( ! this-> terminated ) {170 if( ! this->is_terminated ) { 173 171 LIB_DEBUG_PRINTF("Kernel : core %p signaling termination\n", this); 174 this-> terminated = true;175 lock( &this->lock);172 this->is_terminated = true; 173 wait( &this->terminated ); 176 174 } 177 175 } … … 194 192 LIB_DEBUG_PRINTF("Kernel : core %p starting\n", this); 195 193 196 fenv_t envp;197 fegetenv( &envp );198 LIB_DEBUG_PRINTF("Kernel : mxcsr %x\n", envp.__mxcsr);199 200 194 thread * readyThread = NULL; 201 for( unsigned int spin_count = 0; ! this-> terminated; spin_count++ )195 for( unsigned int spin_count = 0; ! this->is_terminated; spin_count++ ) 202 196 { 203 197 readyThread = nextThread( this->cltr ); … … 208 202 209 203 //Some actions need to be taken from the kernel 210 finishRunning(this , readyThread);204 finishRunning(this); 211 205 212 206 spin_count = 0; … … 219 213 220 214 LIB_DEBUG_PRINTF("Kernel : core %p unlocking thread\n", this); 221 unlock( &this->lock);215 signal( &this->terminated ); 222 216 LIB_DEBUG_PRINTF("Kernel : core %p terminated\n", this); 223 217 } … … 230 224 231 225 //Reset the terminating actions here 232 this-> thread_action = NoAction;226 this->finish.action_code = No_Action; 233 227 234 228 //Update global state … … 242 236 // Once a thread has finished running, some of 243 237 // its final actions must be executed from the kernel 244 void finishRunning(processor * this, thread * thrd) { 245 if(this->thread_action == Reschedule) { 246 ScheduleThread( thrd ); 238 void finishRunning(processor * this) { 239 if( this->finish.action_code == Release ) { 240 unlock( this->finish.lock ); 241 } 242 else if( this->finish.action_code == Schedule ) { 243 ScheduleThread( this->finish.thrd ); 244 } 245 else if( this->finish.action_code == Release_Schedule ) { 246 unlock( this->finish.lock ); 247 ScheduleThread( this->finish.thrd ); 248 } 249 else { 250 assert(this->finish.action_code == No_Action); 247 251 } 248 252 } … … 310 314 assertf( thrd->next == NULL, "Expected null got %p", thrd->next ); 311 315 312 spin_lock( &lock );316 lock( &systemProcessor->cltr->lock ); 313 317 append( &systemProcessor->cltr->ready_queue, thrd ); 314 spin_unlock( &lock ); 318 unlock( &systemProcessor->cltr->lock ); 319 } 320 321 thread * nextThread(cluster * this) { 322 lock( &this->lock ); 323 thread * head = pop_head( &this->ready_queue ); 324 unlock( &this->lock ); 325 return head; 315 326 } 316 327 317 328 void ScheduleInternal() { 318 get_this_processor()->thread_action = Reschedule;319 329 suspend(); 320 330 } 321 331 322 thread * nextThread(cluster * this) { 323 spin_lock( &lock ); 324 thread * head = pop_head( &this->ready_queue ); 325 spin_unlock( &lock ); 326 return head; 332 void ScheduleInternal( spinlock * lock ) { 333 get_this_processor()->finish.action_code = Release; 334 get_this_processor()->finish.lock = lock; 335 suspend(); 336 } 337 338 void ScheduleInternal( thread * thrd ) { 339 get_this_processor()->finish.action_code = Schedule; 340 get_this_processor()->finish.thrd = thrd; 341 suspend(); 342 } 343 344 void ScheduleInternal( spinlock * lock, thread * thrd ) { 345 get_this_processor()->finish.action_code = Release_Schedule; 346 get_this_processor()->finish.lock = lock; 347 get_this_processor()->finish.thrd = thrd; 348 suspend(); 327 349 } 328 350 … … 374 396 // When its coroutine terminates, it return control to the mainThread 375 397 // which is currently here 376 systemProcessor-> terminated = true;398 systemProcessor->is_terminated = true; 377 399 suspend(); 378 400 … … 393 415 //----------------------------------------------------------------------------- 394 416 // Locks 395 void ?{}( simple_lock * this ) { 396 ( &this->blocked ){}; 397 } 398 399 void ^?{}( simple_lock * this ) { 400 401 } 402 403 void lock( simple_lock * this ) { 417 // void ?{}( simple_lock * this ) { 418 // ( &this->blocked ){}; 419 // } 420 421 // void ^?{}( simple_lock * this ) { 422 423 // } 424 425 // void lock( simple_lock * this ) { 426 // { 427 // spin_lock( &lock ); 428 // append( &this->blocked, this_thread() ); 429 // spin_unlock( &lock ); 430 // } 431 // ScheduleInternal(); 432 // } 433 434 // void lock( simple_lock * this, spinlock * to_release ) { 435 // { 436 // spin_lock( &lock ); 437 // append( &this->blocked, this_thread() ); 438 // spin_unlock( &lock ); 439 // } 440 // ScheduleInternal( to_release ); 441 // lock( to_release ); 442 // } 443 444 // void unlock( simple_lock * this ) { 445 // thread * it; 446 // while( it = pop_head( &this->blocked) ) { 447 // ScheduleThread( it ); 448 // } 449 // } 450 451 void ?{}( spinlock * this ) { 452 this->lock = 0; 453 } 454 void ^?{}( spinlock * this ) { 455 456 } 457 458 void lock( spinlock * this ) { 459 for ( unsigned int i = 1;; i += 1 ) { 460 if ( this->lock == 0 && __sync_lock_test_and_set_4( &this->lock, 1 ) == 0 ) break; 461 } 462 } 463 464 void unlock( spinlock * this ) { 465 __sync_lock_release_4( &this->lock ); 466 } 467 468 void ?{}( signal_once * this ) { 469 this->condition = false; 470 } 471 void ^?{}( signal_once * this ) { 472 473 } 474 475 void wait( signal_once * this ) { 476 lock( &this->lock ); 477 if( !this->condition ) { 478 append( &this->blocked, this_thread() ); 479 ScheduleInternal( &this->lock ); 480 lock( &this->lock ); 481 } 482 unlock( &this->lock ); 483 } 484 485 void signal( signal_once * this ) { 486 lock( &this->lock ); 404 487 { 405 spin_lock( &lock ); 406 append( &this->blocked, this_thread() ); 407 spin_unlock( &lock ); 408 } 409 suspend(); 410 } 411 412 void unlock( simple_lock * this ) { 413 thread * it; 414 while( it = pop_head( &this->blocked) ) { 415 ScheduleThread( it ); 416 } 488 this->condition = true; 489 490 thread * it; 491 while( it = pop_head( &this->blocked) ) { 492 ScheduleThread( it ); 493 } 494 } 495 unlock( &this->lock ); 417 496 } 418 497 -
src/libcfa/concurrency/kernel_private.h
r75f3522 rdb6f06a 24 24 // Scheduler 25 25 void ScheduleThread( thread * ); 26 thread * nextThread(cluster * this); 27 26 28 void ScheduleInternal(); 27 thread * nextThread(cluster * this); 29 void ScheduleInternal(spinlock * lock); 30 void ScheduleInternal(thread * thrd); 31 void ScheduleInternal(spinlock * lock, thread * thrd); 28 32 29 33 //----------------------------------------------------------------------------- … … 37 41 38 42 void main(processorCtx_t *); 43 void start(processor * this); 39 44 void runThread(processor * this, thread * dst); 40 void finishRunning(processor * this , thread * thrd);45 void finishRunning(processor * this); 41 46 void spin(processor * this, unsigned int * spin_count); 42 47 -
src/libcfa/concurrency/threads.c
r75f3522 rdb6f06a 44 44 (&this->c){}; 45 45 this->c.name = "Anonymous Coroutine"; 46 (&this-> lock){};46 (&this->terminated){}; 47 47 this->next = NULL; 48 48 } … … 90 90 forall( dtype T | is_thread(T) ) 91 91 void stop( T* this ) { 92 thread* thrd = get_thread(this); 93 if( thrd->c.notHalted ) { 94 lock( &thrd->lock ); 95 } 92 wait( & get_thread(this)->terminated ); 96 93 } 97 94 98 95 void yield( void ) { 99 ScheduleInternal( );96 ScheduleInternal( get_this_processor()->current_thread ); 100 97 } 101 98 … … 124 121 void __thread_signal_termination( thread * this ) { 125 122 this->c.state = Halt; 126 this->c.notHalted = false;127 unlock( &this->lock );123 LIB_DEBUG_PRINTF("Thread end : %p\n", this); 124 signal( &this->terminated ); 128 125 } 129 126 }
Note: See TracChangeset
for help on using the changeset viewer.