- Timestamp:
- Sep 20, 2017, 2:07:57 PM (8 years ago)
- Branches:
- ADT, aaron-thesis, arm-eh, ast-experimental, cleanup-dtors, deferred_resn, demangler, enum, forall-pointer-decay, jacob/cs343-translation, jenkins-sandbox, master, new-ast, new-ast-unique-expr, new-env, no_list, persistent-indexer, pthread-emulation, qualifiedEnum, resolv-new, with_gc
- Children:
- 0895cba
- Parents:
- f980549
- Location:
- src
- Files:
- 
      - 7 edited
 
 - 
          
  Concurrency/Waitfor.cc (modified) (2 diffs)
- 
          
  libcfa/concurrency/invoke.h (modified) (1 diff)
- 
          
  libcfa/concurrency/kernel.c (modified) (6 diffs)
- 
          
  libcfa/concurrency/monitor (modified) (1 diff)
- 
          
  libcfa/concurrency/monitor.c (modified) (16 diffs)
- 
          
  libcfa/concurrency/thread (modified) (1 diff)
- 
          
  libcfa/concurrency/thread.c (modified) (1 diff)
 
Legend:
- Unmodified
- Added
- Removed
- 
      src/Concurrency/Waitfor.ccrf980549 rb18830e 484 484 new CaseStmt( 485 485 noLabels, 486 new ConstantExpr( Constant::from_ ulong( i++) ),486 new ConstantExpr( Constant::from_int( -2 ) ), 487 487 { 488 488 waitfor->timeout.statement, … … 501 501 new CaseStmt( 502 502 noLabels, 503 new ConstantExpr( Constant::from_ ulong( i++) ),503 new ConstantExpr( Constant::from_int( -1 ) ), 504 504 { 505 505 waitfor->orelse.statement, 
- 
      src/libcfa/concurrency/invoke.hrf980549 rb18830e 94 94 unsigned short acceptable_count; // number of acceptable functions 95 95 short accepted_index; // the index of the accepted function, -1 if none 96 }; 96 }; 97 98 struct __monitor_group { 99 struct monitor_desc ** list; // currently held monitors 100 short size; // number of currently held monitors 101 fptr_t func; // last function that acquired monitors 102 }; 97 103 98 104 struct thread_desc { 99 105 // Core threading fields 100 struct coroutine_desc cor; // coroutine body used to store context 101 struct monitor_desc mon; // monitor body used for mutual exclusion 106 struct coroutine_desc self_cor; // coroutine body used to store context 107 struct monitor_desc self_mon; // monitor body used for mutual exclusion 108 struct monitor_desc * self_mon_p; // pointer to monitor with sufficient lifetime for current monitors 109 struct __monitor_group monitors; // monitors currently held by this thread 102 110 103 111 // Link lists fields 104 112 struct thread_desc * next; // instrusive link field for threads 105 113 106 // Current status related to monitors 107 struct monitor_desc ** current_monitors; // currently held monitors 108 unsigned short current_monitor_count; // number of currently held monitors 109 fptr_t current_monitor_func; // last function that acquired monitors 114 110 115 }; 116 117 #ifdef __CFORALL__ 118 extern "Cforall" { 119 static inline monitor_desc * ?[?]( const __monitor_group & this, ptrdiff_t index ) { 120 return this.list[index]; 121 } 122 123 static inline bool ?==?( const __monitor_group & lhs, const __monitor_group & rhs ) { 124 if( lhs.size != rhs.size ) return false; 125 if( lhs.func != rhs.func ) return false; 126 127 // Check that all the monitors match 128 for( int i = 0; i < lhs.size; i++ ) { 129 // If not a match, check next function 130 if( lhs[i] != rhs[i] ) return false; 131 } 132 133 return true; 134 } 135 } 136 #endif 111 137 112 138 #endif //_INVOKE_H_ 
- 
      src/libcfa/concurrency/kernel.crf980549 rb18830e 106 106 107 107 void ?{}( thread_desc & this, current_stack_info_t * info) { 108 (this. cor){ info };108 (this.self_cor){ info }; 109 109 } 110 110 … … 115 115 void ?{}(processorCtx_t & this, processor * proc) { 116 116 (this.__cor){ "Processor" }; 117 this.__cor.starter = &mainThread-> cor;117 this.__cor.starter = &mainThread->self_cor; 118 118 this.proc = proc; 119 119 proc->runner = &this; … … 328 328 // if( !thrd ) return; 329 329 verify( thrd ); 330 verify( thrd-> cor.state != Halted );330 verify( thrd->self_cor.state != Halted ); 331 331 332 332 verify( disable_preempt_count > 0 ); … … 373 373 assert(thrd); 374 374 disable_interrupts(); 375 assert( thrd-> cor.state != Halted );375 assert( thrd->self_cor.state != Halted ); 376 376 this_processor->finish.action_code = Schedule; 377 377 this_processor->finish.thrd = thrd; … … 466 466 this_processor = mainProcessor; 467 467 this_thread = mainThread; 468 this_coroutine = &mainThread-> cor;468 this_coroutine = &mainThread->self_cor; 469 469 470 470 // Enable preemption … … 547 547 thread_desc * thrd = kernel_data; 548 548 549 int len = snprintf( abort_text, abort_text_size, "Error occurred while executing task %.256s (%p)", thrd-> cor.name, thrd );549 int len = snprintf( abort_text, abort_text_size, "Error occurred while executing task %.256s (%p)", thrd->self_cor.name, thrd ); 550 550 __lib_debug_write( STDERR_FILENO, abort_text, len ); 551 551 
- 
      src/libcfa/concurrency/monitorrf980549 rb18830e 105 105 106 106 struct __acceptable_t { 107 fptr_t func; 108 unsigned short count; 109 monitor_desc ** monitors; 107 __monitor_group monitors; 110 108 bool is_dtor; 111 109 }; 
- 
      src/libcfa/concurrency/monitor.crf980549 rb18830e 25 25 static inline void set_owner( monitor_desc * this, thread_desc * owner ); 26 26 static inline thread_desc * next_thread( monitor_desc * this ); 27 static inline int is_accepted( thread_desc * owner, monitor_desc * this, monitor_desc ** group, int group_cnt, void (*func)());27 static inline int is_accepted( thread_desc * owner, monitor_desc * this, const __monitor_group & monitors ); 28 28 29 29 static inline void lock_all( spinlock ** locks, unsigned short count ); … … 42 42 static inline unsigned short insert_unique( thread_desc ** thrds, unsigned short end, thread_desc * val ); 43 43 44 static inline thread_desc * search_entry_queue( __acceptable_t * acceptables, int acc_count, monitor_desc ** monitors, int count ); 44 static inline [thread_desc *, int] search_entry_queue( __acceptable_t * acceptables, int acc_count, monitor_desc ** monitors, int count ); 45 46 static inline short count_max( short acc_count, __acceptable_t * acceptables ); 47 static inline short aggregate( monitor_desc ** storage, short count, __acceptable_t * acceptables ); 48 static inline void set_mask ( monitor_desc ** storage, short count, __acceptable_t * acceptables, short acc_count ); 45 49 46 50 //----------------------------------------------------------------------------- … … 68 72 extern "C" { 69 73 // Enter single monitor 70 static void __enter_monitor_desc( monitor_desc * this, monitor_desc ** group, int group_cnt, void (*func)() ) { 74 static void __enter_monitor_desc( const __monitor_group & group ) { 75 monitor_desc * this = group.list[0]; 76 71 77 // Lock the monitor spinlock, lock_yield to reduce contention 72 78 lock_yield( &this->lock DEBUG_CTX2 ); … … 89 95 LIB_DEBUG_PRINT_SAFE("Kernel : mon already owned \n"); 90 96 } 91 else if( (this->accepted_index = is_accepted( thrd, this, group , group_cnt, func)) >= 0 ) {97 else if( (this->accepted_index = is_accepted( thrd, this, group)) >= 0 ) { 92 98 // Some one was waiting for us, enter 93 99 set_owner( this, thrd ); … … 146 152 // Should never return 147 153 void __leave_thread_monitor( thread_desc * thrd ) { 148 monitor_desc * this = &thrd-> mon;154 monitor_desc * this = &thrd->self_mon; 149 155 150 156 // Lock the monitor now … … 153 159 disable_interrupts(); 154 160 155 thrd-> cor.state = Halted;161 thrd->self_cor.state = Halted; 156 162 157 163 verifyf( thrd == this->owner, "Expected owner to be %p, got %p (r: %i)", thrd, this->owner, this->recursion ); … … 178 184 // Enter multiple monitor 179 185 // relies on the monitor array being sorted 180 static inline void enter( monitor_desc ** monitors, int count, void (*func)()) {181 for(int i = 0; i < count; i++) {182 __enter_monitor_desc( monitors [i], monitors, count, func);186 static inline void enter( __monitor_group monitors ) { 187 for(int i = 0; i < monitors.size; i++) { 188 __enter_monitor_desc( monitors ); 183 189 } 184 190 } … … 203 209 204 210 // Save previous thread context 205 this.prev_mntrs = this_thread-> current_monitors;206 this.prev_count = this_thread-> current_monitor_count;207 this.prev_func = this_thread-> current_monitor_func;211 this.prev_mntrs = this_thread->monitors.list; 212 this.prev_count = this_thread->monitors.size; 213 this.prev_func = this_thread->monitors.func; 208 214 209 215 // Update thread context (needed for conditions) 210 this_thread-> current_monitors= m;211 this_thread-> current_monitor_count= count;212 this_thread-> current_monitor_func= func;216 this_thread->monitors.list = m; 217 this_thread->monitors.size = count; 218 this_thread->monitors.func = func; 213 219 214 220 // Enter the monitors in order 215 enter( this.m, this.count, func ); 221 __monitor_group group = {this.m, this.count, func}; 222 enter( group ); 216 223 } 217 224 … … 223 230 224 231 // Restore thread context 225 this_thread-> current_monitors= this.prev_mntrs;226 this_thread-> current_monitor_count= this.prev_count;227 this_thread-> current_monitor_func= this.prev_func;232 this_thread->monitors.list = this.prev_mntrs; 233 this_thread->monitors.size = this.prev_count; 234 this_thread->monitors.func = this.prev_func; 228 235 } 229 236 … … 315 322 LIB_DEBUG_DO( 316 323 thread_desc * this_thrd = this_thread; 317 if ( this->monitor_count != this_thrd-> current_monitor_count) {318 abortf( "Signal on condition %p made with different number of monitor(s), expected %i got %i", this, this->monitor_count, this_thrd-> current_monitor_count);324 if ( this->monitor_count != this_thrd->monitors.size ) { 325 abortf( "Signal on condition %p made with different number of monitor(s), expected %i got %i", this, this->monitor_count, this_thrd->monitors.size ); 319 326 } 320 327 321 328 for(int i = 0; i < this->monitor_count; i++) { 322 if ( this->monitors[i] != this_thrd-> current_monitors[i] ) {323 abortf( "Signal on condition %p made with different monitor, expected %p got %i", this, this->monitors[i], this_thrd-> current_monitors[i] );329 if ( this->monitors[i] != this_thrd->monitors.list[i] ) { 330 abortf( "Signal on condition %p made with different monitor, expected %p got %i", this, this->monitors[i], this_thrd->monitors.list[i] ); 324 331 } 325 332 } … … 397 404 398 405 //----------------------------------------------------------------------------- 399 // Internal scheduling 400 int __waitfor_internal( unsigned short acc_count, __acceptable_t * acceptables ) { 401 thread_desc * thrd = this_thread; 406 // External scheduling 407 // cases to handle : 408 // - target already there : 409 // block and wake 410 // - dtor already there 411 // put thread on signaller stack 412 // - non-blocking 413 // return else 414 // - timeout 415 // return timeout 416 // - block 417 // setup mask 418 // block 419 int __waitfor_internal( unsigned short acc_count, __acceptable_t * acceptables, int duration ) { 420 // This statment doesn't have a contiguous list of monitors... 421 // Create one! 422 short max = count_max( acc_count, acceptables ); 423 monitor_desc * mon_storage[max]; 424 short actual_count = aggregate( mon_storage, acc_count, acceptables ); 402 425 403 426 // Create storage for monitor context 404 monitor_ctx( acceptables->monitors, acceptables->count );427 monitor_ctx( mon_storage, actual_count ); 405 428 406 429 // Lock all monitors (aggregates the lock them as well) 407 430 lock_all( monitors, locks, count ); 408 431 409 // Create the node specific to this wait operation 410 wait_ctx_primed( thrd, 0 ); 411 412 // Check if the entry queue 413 thread_desc * next = search_entry_queue( acceptables, acc_count, monitors, count ); 414 415 LIB_DEBUG_PRINT_SAFE("Owner(s) :"); 416 for(int i = 0; i < count; i++) { 417 LIB_DEBUG_PRINT_SAFE(" %p", monitors[i]->owner ); 418 } 419 LIB_DEBUG_PRINT_SAFE("\n"); 420 421 LIB_DEBUG_PRINT_SAFE("Passing mon to %p\n", next); 422 423 if( !next ) { 424 // Update acceptables on the current monitors 425 for(int i = 0; i < count; i++) { 426 monitors[i]->acceptables = acceptables; 427 monitors[i]->acceptable_count = acc_count; 428 } 429 } 430 else { 431 for(int i = 0; i < count; i++) { 432 set_owner( monitors[i], next ); 433 } 434 } 432 { 433 // Check if the entry queue 434 thread_desc * next; 435 int index; 436 [next, index] = search_entry_queue( acceptables, acc_count, monitors, count ); 437 438 if( next ) { 439 if( acceptables[index].is_dtor ) { 440 #warning case not implemented 441 } 442 else { 443 save_recursion( monitors, recursions, count ); 444 445 // Everything is ready to go to sleep 446 BlockInternal( locks, count, &next, 1 ); 447 448 449 //WE WOKE UP 450 451 452 //We are back, restore the owners and recursions 453 lock_all( locks, count ); 454 restore_recursion( monitors, recursions, count ); 455 unlock_all( locks, count ); 456 } 457 458 return index; 459 } 460 } 461 462 463 if( duration == 0 ) return -1; 464 465 set_mask( monitors, count, acceptables, acc_count ); 466 467 verifyf( duration < 0, "Timeout on waitfor statments not supported yet."); 435 468 436 469 … … 439 472 440 473 // Everything is ready to go to sleep 441 BlockInternal( locks, count , &next, next ? 1 : 0);474 BlockInternal( locks, count ); 442 475 443 476 … … 485 518 } 486 519 487 static inline int is_accepted( thread_desc * owner, monitor_desc * this, monitor_desc ** group, int group_cnt, void (*func)() ) {488 __acceptable_t* accs = this->acceptables; // Optim489 int acc_cnt = this->acceptable_count;490 491 // Check if there are any acceptable functions492 if( !accs ) return -1;493 494 // If this isn't the first monitor to test this, there is no reason to repeat the test.495 if( this != group[0] ) return group[0]->accepted_index;496 497 // For all acceptable functions check if this is the current function.498 OUT_LOOP:499 for( int i = 0; i < acc_cnt; i++ ) {500 __acceptable_t * acc = &accs[i];501 502 // if function matches, check the monitors503 if( acc->func == func ) {504 505 // If the group count is different then it can't be a match506 if( acc->count != group_cnt ) return -1;507 508 // Check that all the monitors match509 for( int j = 0; j < group_cnt; j++ ) {510 // If not a match, check next function511 if( acc->monitors[j] != group[j] ) continue OUT_LOOP;512 }513 514 // It's a complete match, accept the call515 return i;516 }517 }518 519 // No function matched520 return -1;521 }522 523 520 static inline void init( int count, monitor_desc ** monitors, __condition_node_t * waiter, __condition_criterion_t * criteria ) { 524 521 for(int i = 0; i < count; i++) { … … 607 604 if( !this->monitors ) { 608 605 // LIB_DEBUG_PRINT_SAFE("Branding\n"); 609 assertf( thrd-> current_monitors != NULL, "No current monitor to brand condition %p", thrd->current_monitors);610 this->monitor_count = thrd-> current_monitor_count;606 assertf( thrd->monitors.list != NULL, "No current monitor to brand condition %p", thrd->monitors.list ); 607 this->monitor_count = thrd->monitors.size; 611 608 612 609 this->monitors = malloc( this->monitor_count * sizeof( *this->monitors ) ); 613 610 for( int i = 0; i < this->monitor_count; i++ ) { 614 this->monitors[i] = thrd-> current_monitors[i];611 this->monitors[i] = thrd->monitors.list[i]; 615 612 } 616 613 } … … 628 625 } 629 626 630 631 static inline bool match( __acceptable_t * acc, thread_desc * thrd ) { 632 verify( thrd ); 633 verify( acc ); 634 if( acc->func != thrd->current_monitor_func ) return false; 635 636 return true; 637 } 638 639 static inline thread_desc * search_entry_queue( __acceptable_t * acceptables, int acc_count, monitor_desc ** monitors, int count ) { 627 static inline int is_accepted( thread_desc * owner, monitor_desc * this, const __monitor_group & group ) { 628 __acceptable_t* accs = this->acceptables; // Optim 629 int acc_cnt = this->acceptable_count; 630 631 // Check if there are any acceptable functions 632 if( !accs ) return -1; 633 634 // If this isn't the first monitor to test this, there is no reason to repeat the test. 635 if( this != group[0] ) return group[0]->accepted_index; 636 637 // For all acceptable functions check if this is the current function. 638 for( int i = 0; i < acc_cnt; i++ ) { 639 __acceptable_t * acc = &accs[i]; 640 641 if( acc->monitors == group ) return i; 642 } 643 644 // No function matched 645 return -1; 646 } 647 648 static inline [thread_desc *, int] search_entry_queue( __acceptable_t * acceptables, int acc_count, monitor_desc ** monitors, int count ) { 640 649 641 650 __thread_queue_t * entry_queue = &monitors[0]->entry_queue; … … 647 656 { 648 657 // For each acceptable check if it matches 658 int i; 649 659 __acceptable_t * acc_end = acceptables + acc_count; 650 for( __acceptable_t * acc_it = acceptables; acc_it != acc_end; acc_it++ ) {660 for( __acceptable_t * acc_it = acceptables; acc_it != acc_end; acc_it++, i++ ) { 651 661 // Check if we have a match 652 if( match( acc_it, *thrd_it )) {662 if( acc_it->monitors == (*thrd_it)->monitors ) { 653 663 654 664 // If we have a match return it 655 665 // after removeing it from the entry queue 656 return remove( entry_queue, thrd_it );666 return [remove( entry_queue, thrd_it ), i]; 657 667 } 658 668 } 659 669 } 660 670 661 return NULL; 662 } 671 return [0, -1]; 672 } 673 674 static inline short count_max( short acc_count, __acceptable_t * acceptables ) { 675 short max = 0; 676 for( int i = 0; i < acc_count; i++ ) { 677 max += acceptables[i].monitors.size; 678 } 679 return max; 680 } 681 682 static inline short aggregate( monitor_desc ** storage, short count, __acceptable_t * acceptables ) { 683 #warning function not implemented 684 return 0; 685 } 686 687 static inline void set_mask( monitor_desc ** storage, short count, __acceptable_t * acceptables, short acc_count ) { 688 for(int i = 0; i < count; i++) { 689 storage[i]->acceptables = acceptables; 690 storage[i]->acceptable_count = acc_count; 691 storage[i]->accepted_index = -1; 692 } 693 } 694 695 663 696 void ?{}( __condition_blocked_queue_t & this ) { 664 697 this.head = NULL; 
- 
      src/libcfa/concurrency/threadrf980549 rb18830e 36 36 forall( dtype T | is_thread(T) ) 37 37 static inline coroutine_desc* get_coroutine(T & this) { 38 return &get_thread(this)-> cor;38 return &get_thread(this)->self_cor; 39 39 } 40 40 41 41 forall( dtype T | is_thread(T) ) 42 42 static inline monitor_desc* get_monitor(T & this) { 43 return &get_thread(this)-> mon;43 return &get_thread(this)->self_mon; 44 44 } 45 45 46 46 static inline coroutine_desc* get_coroutine(thread_desc * this) { 47 return &this-> cor;47 return &this->self_cor; 48 48 } 49 49 50 50 static inline monitor_desc* get_monitor(thread_desc * this) { 51 return &this-> mon;51 return &this->self_mon; 52 52 } 53 53 
- 
      src/libcfa/concurrency/thread.crf980549 rb18830e 33 33 34 34 void ?{}(thread_desc& this) { 35 (this.cor){}; 36 this.cor.name = "Anonymous Coroutine"; 37 this.mon.owner = &this; 38 this.mon.recursion = 1; 35 (this.self_cor){}; 36 this.self_cor.name = "Anonymous Coroutine"; 37 this.self_mon.owner = &this; 38 this.self_mon.recursion = 1; 39 this.self_mon_p = &this.self_mon; 39 40 this.next = NULL; 40 41 41 this.current_monitors = &this.mon; 42 this.current_monitor_count = 1; 42 (this.monitors){ &this.self_mon_p, 1, (fptr_t)0 }; 43 43 } 44 44 45 45 void ^?{}(thread_desc& this) { 46 ^(this. cor){};46 ^(this.self_cor){}; 47 47 } 48 48 
  Note:
 See   TracChangeset
 for help on using the changeset viewer.
  