Changes in / [221c2de:115a868]
- Files:
  - 1 deleted
  - 7 edited
Legend (each file is shown as a diff from r221c2de to r115a868):
- Unmodified lines have no prefix
- Added lines (present only in r115a868) are prefixed with +
- Removed lines (present only in r221c2de) are prefixed with -
doc/proposals/concurrency/thePlan.md
r221c2de -> r115a868

@@ -10,13 +10,13 @@
 done - Multi monitors calls,
 done - Monitors as a language feature (not calling enter/leave by hand)
+Internal scheduling
 
-_Phase 3_ : Monitor features
-Internal scheduling
-External scheduling
-
-_Phase 4_ : Kernel features
+_Phase 3_ : Kernel features
 Preemption
 Detach thread
 Cluster migration
+
+_Phase 4_ : Monitor features
+External scheduling
 
 _Phase 5_ : Performance
src/libcfa/concurrency/invoke.h
r221c2de -> r115a868

@@ -33,22 +33,14 @@
 };
 
-struct __thread_queue_t {
+struct simple_thread_list {
     struct thread_desc * head;
     struct thread_desc ** tail;
 };
 
-struct __thread_stack_t {
-    struct thread_desc * top;
-};
-
 #ifdef __CFORALL__
 extern "Cforall" {
-    void ?{}( struct __thread_queue_t * );
-    void append( struct __thread_queue_t *, struct thread_desc * );
-    struct thread_desc * pop_head( struct __thread_queue_t * );
-
-    void ?{}( struct __thread_stack_t * );
-    void push( struct __thread_stack_t *, struct thread_desc * );
-    struct thread_desc * pop( struct __thread_stack_t * );
+    void ?{}( struct simple_thread_list * );
+    void append( struct simple_thread_list *, struct thread_desc * );
+    struct thread_desc * pop_head( struct simple_thread_list * );
 
     void ?{}(spinlock * this);
@@ -58,11 +50,11 @@
 
 struct coStack_t {
-    unsigned int size;
-    void *storage;
-    void *limit;
-    void *base;
-    void *context;
-    void *top;
-    bool userStack;
+    unsigned int size;              // size of stack
+    void *storage;                  // pointer to stack
+    void *limit;                    // stack grows towards stack limit
+    void *base;                     // base of stack
+    void *context;                  // address of cfa_context_t
+    void *top;                      // address of top of storage
+    bool userStack;                 // whether or not the user allocated the stack
 };
 
@@ -70,27 +62,23 @@
 
 struct coroutine_desc {
-    struct coStack_t stack;
-    const char *name;
-    int errno_;
-    enum coroutine_state state;
-    struct coroutine_desc *starter;
-    struct coroutine_desc *last;
+    struct coStack_t stack;             // stack information of the coroutine
+    const char *name;                   // textual name for coroutine/task, initialized by uC++ generated code
+    int errno_;                         // copy of global UNIX variable errno
+    enum coroutine_state state;         // current execution status for coroutine
+    struct coroutine_desc *starter;     // first coroutine to resume this one
+    struct coroutine_desc *last;        // last coroutine to resume this one
 };
 
 struct monitor_desc {
-    struct spinlock lock;                   // spinlock to protect internal data
-    struct thread_desc * owner;             // current owner of the monitor
-    struct __thread_queue_t entry_queue;    // queue of threads that are blocked waiting for the monitor
-    struct __thread_stack_t signal_stack;   // stack of threads to run next once we exit the monitor
-    struct monitor_desc * stack_owner;      // if bulk acquiring was used we need to synchronize signals with an other monitor
-    unsigned int recursion;                 // monitor routines can be called recursively, we need to keep track of that
+    struct spinlock lock;
+    struct thread_desc * owner;
+    struct simple_thread_list entry_queue;
+    unsigned int recursion;
 };
 
 struct thread_desc {
-    struct coroutine_desc cor;                  // coroutine body used to store context
-    struct monitor_desc mon;                    // monitor body used for mutual exclusion
-    struct thread_desc * next;                  // instrusive link field for threads
-    struct monitor_desc ** current_monitors;    // currently held monitors
-    unsigned short current_monitor_count;       // number of currently held monitors
+    struct coroutine_desc cor;      // coroutine body used to store context
+    struct monitor_desc mon;        // monitor body used for mutual exclusion
+    struct thread_desc * next;      // instrusive link field for threads
 };
 
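A note on the data structures touched here: the thread list (named __thread_queue_t in r221c2de, simple_thread_list in r115a868) is an intrusive FIFO whose tail member points at the last next field, so appending never needs a special case for an empty queue, and __thread_stack_t (r221c2de only) is an intrusive LIFO threaded through the same next link. The following is an illustrative plain-C sketch of that tail-pointer queue, not the CFA library code; node and queue are hypothetical stand-ins for thread_desc and the list type:

    #include <assert.h>
    #include <stddef.h>
    #include <stdio.h>

    struct node  { struct node *next; int id; };              /* stand-in for thread_desc  */
    struct queue { struct node *head; struct node **tail; };  /* stand-in for the list type */

    void queue_init( struct queue *q ) { q->head = NULL; q->tail = &q->head; }

    void append( struct queue *q, struct node *n ) {
        n->next = NULL;
        *q->tail = n;          /* link the node after the current last element (or at head) */
        q->tail  = &n->next;   /* tail now points at the new last next field */
    }

    struct node *pop_head( struct queue *q ) {
        struct node *h = q->head;
        if ( h ) {
            q->head = h->next;
            if ( !q->head ) q->tail = &q->head;  /* queue became empty: reset tail */
            h->next = NULL;
        }
        return h;
    }

    int main( void ) {
        struct queue q; queue_init( &q );
        struct node a = { NULL, 1 }, b = { NULL, 2 };
        append( &q, &a );
        append( &q, &b );
        struct node *first  = pop_head( &q );
        struct node *second = pop_head( &q );
        printf( "%d %d\n", first->id, second->id );  /* prints: 1 2 */
        assert( pop_head( &q ) == NULL );            /* queue is now empty */
        return 0;
    }

The pointer-to-pointer tail is what makes append O(1) without branching on the empty case.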
src/libcfa/concurrency/kernel
r221c2de -> r115a868

@@ -32,7 +32,7 @@
 
 struct signal_once {
-    volatile bool cond;
+    volatile bool condition;
     struct spinlock lock;
-    struct __thread_queue_t blocked;
+    struct simple_thread_list blocked;
 };
 
@@ -46,5 +46,5 @@
 // Cluster
 struct cluster {
-    __thread_queue_t ready_queue;
+    simple_thread_list ready_queue;
     spinlock lock;
 };
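Apart from the renamed field and list type, signal_once behaves like a one-shot gate: wait() blocks until the flag is set, signal() sets it and releases every blocked thread, and later waiters fall straight through. A rough pthread-based analogue of those semantics, purely for illustration (the CFA version parks threads on the runtime's own scheduler rather than on a condition variable, and once_gate is a hypothetical name):

    #include <pthread.h>
    #include <stdbool.h>

    typedef struct {
        bool            condition;  /* has signal() happened yet?     */
        pthread_mutex_t lock;       /* protects condition and waiters */
        pthread_cond_t  blocked;    /* where waiting threads sleep    */
    } once_gate;

    void once_gate_init( once_gate *g ) {
        g->condition = false;
        pthread_mutex_init( &g->lock, NULL );
        pthread_cond_init( &g->blocked, NULL );
    }

    void once_gate_wait( once_gate *g ) {
        pthread_mutex_lock( &g->lock );
        while ( !g->condition )                         /* already signalled => no blocking */
            pthread_cond_wait( &g->blocked, &g->lock );
        pthread_mutex_unlock( &g->lock );
    }

    void once_gate_signal( once_gate *g ) {
        pthread_mutex_lock( &g->lock );
        g->condition = true;
        pthread_cond_broadcast( &g->blocked );          /* wake every blocked thread */
        pthread_mutex_unlock( &g->lock );
    }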
src/libcfa/concurrency/kernel.c
r221c2de -> r115a868

@@ -299,6 +299,4 @@
 // Scheduler routines
 void ScheduleThread( thread_desc * thrd ) {
-    if( !thrd ) return;
-
     assertf( thrd->next == NULL, "Expected null got %p", thrd->next );
 
@@ -475,5 +473,5 @@
 
 void ?{}( signal_once * this ) {
-    this->cond = false;
+    this->condition = false;
 }
 void ^?{}( signal_once * this ) {
@@ -483,5 +481,5 @@
 void wait( signal_once * this ) {
     lock( &this->lock );
-    if( !this->cond ) {
+    if( !this->condition ) {
         append( &this->blocked, this_thread() );
         ScheduleInternal( &this->lock );
@@ -494,5 +492,5 @@
     lock( &this->lock );
     {
-        this->cond = true;
+        this->condition = true;
 
         thread_desc * it;
@@ -506,10 +504,10 @@
 //-----------------------------------------------------------------------------
 // Queues
-void ?{}( __thread_queue_t * this ) {
+void ?{}( simple_thread_list * this ) {
     this->head = NULL;
     this->tail = &this->head;
 }
 
-void append( __thread_queue_t * this, thread_desc * t ) {
+void append( simple_thread_list * this, thread_desc * t ) {
     assert(this->tail != NULL);
     *this->tail = t;
@@ -517,5 +515,5 @@
 }
 
-thread_desc * pop_head( __thread_queue_t * this ) {
+thread_desc * pop_head( simple_thread_list * this ) {
     thread_desc * head = this->head;
     if( head ) {
@@ -528,23 +526,4 @@
     return head;
 }
-
-void ?{}( __thread_stack_t * this ) {
-    this->top = NULL;
-}
-
-void push( __thread_stack_t * this, thread_desc * t ) {
-    assert(t->next != NULL);
-    t->next = this->top;
-    this->top = t;
-}
-
-thread_desc * pop( __thread_stack_t * this ) {
-    thread_desc * top = this->top;
-    if( top ) {
-        this->top = top->next;
-        top->next = NULL;
-    }
-    return top;
-}
 // Local Variables: //
 // mode: c //
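The __thread_stack_t routines that appear only on the r221c2de side are the signaller-stack counterpart of the queue: a minimal intrusive LIFO over the same next link. An illustrative plain-C sketch, again not the CFA code, reusing the node type from the queue sketch above; the assert states the intended precondition that the node is not already linked anywhere:

    #include <assert.h>
    #include <stddef.h>

    struct stack { struct node *top; };  /* stand-in for __thread_stack_t */

    void stack_init( struct stack *s ) { s->top = NULL; }

    void push( struct stack *s, struct node *n ) {
        assert( n->next == NULL );  /* node must not already be on a list or stack */
        n->next = s->top;
        s->top  = n;
    }

    struct node *pop( struct stack *s ) {
        struct node *t = s->top;
        if ( t ) {
            s->top  = t->next;
            t->next = NULL;
        }
        return t;
    }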
src/libcfa/concurrency/monitor
r221c2de -> r115a868

@@ -18,6 +18,4 @@
 #define MONITOR_H
 
-#include <stddef.h>
-
 #include "assert"
 #include "invoke.h"
@@ -25,14 +23,15 @@
 
 static inline void ?{}(monitor_desc * this) {
-    this->owner = NULL;
-    this->stack_owner = NULL;
+    this->owner = 0;
     this->recursion = 0;
 }
+
+//Array entering routine
+void enter(monitor_desc **, int count);
+void leave(monitor_desc **, int count);
 
 struct monitor_guard_t {
     monitor_desc ** m;
     int count;
-    monitor_desc ** prev_mntrs;
-    unsigned short prev_count;
 };
 
@@ -41,21 +40,15 @@
 }
 
-void ?{}( monitor_guard_t * this, monitor_desc ** m, int count );
-void ^?{}( monitor_guard_t * this );
-
-//-----------------------------------------------------------------------------
-// Internal scheduling
-struct condition {
-    __thread_queue_t blocked;
-    monitor_desc ** monitors;
-    unsigned short monitor_count;
-};
-
-static inline void ?{}( condition * this ) {
-    this->monitors = NULL;
-    this->monitor_count = 0;
+static inline void ?{}( monitor_guard_t * this, monitor_desc ** m, int count ) {
+    this->m = m;
+    this->count = count;
+    qsort(this->m, count);
+    enter( this->m, this->count );
 }
 
-void wait( condition * this );
-void signal( condition * this );
+static inline void ^?{}( monitor_guard_t * this ) {
+    leave( this->m, this->count );
+}
+
+
 #endif //MONITOR_H
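Both revisions build monitor_guard_t around the same bulk-acquire discipline: the constructor sorts the monitor array (presumably so every thread acquires the monitors in one global order, which avoids deadlock between overlapping monitor sets) and enters them all, and the destructor leaves them in reverse order when the guard goes out of scope. A plain-C sketch of that locking discipline, using pthread mutexes as stand-ins for monitor_desc and C's three-argument qsort in place of the CFA qsort overload; all names here are illustrative:

    #include <pthread.h>
    #include <stdint.h>
    #include <stdlib.h>

    /* Order mutexes by address so every thread uses the same acquisition order. */
    int by_address( const void *a, const void *b ) {
        uintptr_t x = (uintptr_t)*(pthread_mutex_t *const *)a;
        uintptr_t y = (uintptr_t)*(pthread_mutex_t *const *)b;
        return (x > y) - (x < y);
    }

    void enter_all( pthread_mutex_t **m, int count ) {
        qsort( m, count, sizeof *m, by_address );   /* establish the global acquisition order */
        for ( int i = 0; i < count; i++ )
            pthread_mutex_lock( m[i] );
    }

    void leave_all( pthread_mutex_t **m, int count ) {
        for ( int i = count - 1; i >= 0; i-- )      /* release in reverse acquisition order */
            pthread_mutex_unlock( m[i] );
    }

In CFA the enter/leave pair is wrapped in a constructor/destructor so the compiler inserts the release automatically at scope exit, the same role a lock_guard plays in C++.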
src/libcfa/concurrency/monitor.c
r221c2de -> r115a868

@@ -18,21 +18,9 @@
 
 #include "kernel_private.h"
-#include "libhdr.h"
-
-void set_owner( monitor_desc * this, thread_desc * owner ) {
-    //Pass the monitor appropriately
-    this->owner = owner;
-
-    //We are passing the monitor to someone else, which means recursion level is not 0
-    this->recursion = owner ? 1 : 0;
-}
-
 
 extern "C" {
-    void __enter_monitor_desc(monitor_desc * this, monitor_desc * leader) {
+    void __enter_monitor_desc(monitor_desc * this) {
        lock( &this->lock );
        thread_desc * thrd = this_thread();
-
-       //Update the stack owner
-       this->stack_owner = leader;
 
        if( !this->owner ) {
@@ -56,206 +44,43 @@
 
        unlock( &this->lock );
-       return;
    }
 
-    // leave pseudo code :
-    //    decrement level
-    //    leve == 0 ?
-    //       no : done
-    //       yes :
-    //          signal stack empty ?
-    //             has leader :
-    //                bulk acquiring means we don't own the signal stack
-    //                ignore it but don't release the monitor
-    //             yes :
-    //                next in entry queue is new owner
-    //             no :
-    //                top of the signal stack is the owner
-    //                context switch to him right away
-    //
-    void __leave_monitor_desc(monitor_desc * this, monitor_desc * leader) {
+    void __leave_monitor_desc(monitor_desc * this) {
        lock( &this->lock );
 
        thread_desc * thrd = this_thread();
-       assert( thrd == this->owner || this->stack_owner);
+       assert( thrd == this->owner );
 
        //Leaving a recursion level, decrement the counter
        this->recursion -= 1;
 
-       //If we haven't left the last level of recursion
-       //it means we don't need to do anything
-       if( this->recursion != 0) {
-          this->stack_owner = leader;
-          unlock( &this->lock );
-          return;
-       }
-
-       //If we don't own the signal stack then just leave it to the owner
-       if( this->stack_owner ) {
-          this->stack_owner = leader;
-          unlock( &this->lock );
-          return;
-       }
+       //If we left the last level of recursion it means we are changing who owns the monitor
+       thread_desc * new_owner = 0;
+       if( this->recursion == 0) {
+          //Get the next thread in the list
+          new_owner = this->owner = pop_head( &this->entry_queue );
 
-       //We are the stack owner and have left the last recursion level.
-       //We are in charge of passing the monitor
-       thread_desc * new_owner = 0;
+          //We are passing the monitor to someone else, which means recursion level is not 0
+          this->recursion = new_owner ? 1 : 0;
+       }
 
-       //Check the signaller stack
-       new_owner = pop( &this->signal_stack );
-       if( new_owner ) {
-          //The signaller stack is not empty,
-          //transfer control immediately
-          set_owner( this, new_owner );
-          this->stack_owner = leader;
-          ScheduleInternal( &this->lock, new_owner );
-          return;
-       }
-
-       // No signaller thread
-       // Get the next thread in the entry_queue
-       new_owner = pop_head( &this->entry_queue );
-       set_owner( this, new_owner );
-
-       //Update the stack owner
-       this->stack_owner = leader;
-
-       //We can now let other threads in safely
       unlock( &this->lock );
 
-       //We need to wake-up the thread
-       ScheduleThread( new_owner );
+       //If we have a new owner, we need to wake-up the thread
+       if( new_owner ) {
+          ScheduleThread( new_owner );
+       }
    }
 }
 
-static inline void enter(monitor_desc ** monitors, int count) {
-    __enter_monitor_desc( monitors[0], NULL );
-    for(int i = 1; i < count; i++) {
-        __enter_monitor_desc( monitors[i], monitors[0] );
+void enter(monitor_desc ** monitors, int count) {
+    for(int i = 0; i < count; i++) {
+        __enter_monitor_desc( monitors[i] );
    }
 }
 
-static inline void leave(monitor_desc ** monitors, int count) {
-    __leave_monitor_desc( monitors[0], NULL );
+void leave(monitor_desc ** monitors, int count) {
    for(int i = count - 1; i >= 0; i--) {
-        __leave_monitor_desc( monitors[i], monitors[0] );
+        __leave_monitor_desc( monitors[i] );
    }
 }
-
-void ?{}( monitor_guard_t * this, monitor_desc ** m, int count ) {
-    this->m = m;
-    this->count = count;
-    qsort(this->m, count);
-    enter( this->m, this->count );
-
-    this->prev_mntrs = this_thread()->current_monitors;
-    this->prev_count = this_thread()->current_monitor_count;
-
-    this_thread()->current_monitors = m;
-    this_thread()->current_monitor_count = count;
-}
-
-void ^?{}( monitor_guard_t * this ) {
-    leave( this->m, this->count );
-
-    this_thread()->current_monitors = this->prev_mntrs;
-    this_thread()->current_monitor_count = this->prev_count;
-}
-
-//-----------------------------------------------------------------------------
-// Internal scheduling
-void wait( condition * this ) {
-    // LIB_DEBUG_FPRINTF("Waiting\n");
-    thread_desc * this_thrd = this_thread();
-
-    if( !this->monitors ) {
-        this->monitors = this_thrd->current_monitors;
-        this->monitor_count = this_thrd->current_monitor_count;
-    }
-
-    unsigned short count = this->monitor_count;
-
-    //Check that everything is as expected
-    assert( this->monitors != NULL );
-    assert( this->monitor_count != 0 );
-
-    unsigned int recursions[ count ];    //Save the current recursion levels to restore them later
-    spinlock *   locks     [ count ];    //We need to pass-in an array of locks to ScheduleInternal
-
-    // LIB_DEBUG_FPRINTF("Getting ready to wait\n");
-
-    //Loop on all the monitors and release the owner
-    for( unsigned int i = 0; i < count; i++ ) {
-        monitor_desc * cur = this->monitors[i];
-
-        assert( cur );
-
-        // LIB_DEBUG_FPRINTF("cur %p lock %p\n", cur, &cur->lock);
-
-        //Store the locks for later
-        locks[i] = &cur->lock;
-
-        //Protect the monitors
-        lock( locks[i] );
-        {
-            //Save the recursion levels
-            recursions[i] = cur->recursion;
-
-            //Release the owner
-            cur->recursion = 0;
-            cur->owner = NULL;
-        }
-        //Release the monitor
-        unlock( locks[i] );
-    }
-
-    // LIB_DEBUG_FPRINTF("Waiting now\n");
-
-    //Everything is ready to go to sleep
-    ScheduleInternal( locks, count );
-
-
-    //WE WOKE UP
-
-
-    //We are back, restore the owners and recursions
-    for( unsigned int i = 0; i < count; i++ ) {
-        monitor_desc * cur = this->monitors[i];
-
-        //Protect the monitors
-        lock( locks[i] );
-        {
-            //Release the owner
-            cur->owner = this_thrd;
-            cur->recursion = recursions[i];
-        }
-        //Release the monitor
-        unlock( locks[i] );
-    }
-}
-
-static void __signal_internal( condition * this ) {
-    if( !this->blocked.head ) return;
-
-    //Check that everything is as expected
-    assert( this->monitors );
-    assert( this->monitor_count != 0 );
-
-    LIB_DEBUG_DO(
-        if ( this->monitors != this_thread()->current_monitors ) {
-            abortf( "Signal on condition %p made outside of the correct monitor(s)", this );
-        } // if
-    );
-
-    monitor_desc * owner = this->monitors[0];
-    lock( &owner->lock );
-    {
-        thread_desc * unblock = pop_head( &this->blocked );
-        push( &owner->signal_stack, unblock );
-    }
-    unlock( &owner->lock );
-}
-
-void signal( condition * this ) {
-    __signal_internal( this );
-}
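On the r221c2de side, ownership hand-off in __leave_monitor_desc prefers the signal stack over the entry queue: a thread made ready by signal() runs before any thread still waiting to get into the monitor. Ignoring the bulk-acquire stack_owner handling, the decision boils down to the following plain-C sketch; it reuses the illustrative node/queue/stack types and pop/pop_head helpers from the sketches above, and is a reading of the logic rather than the library code:

    struct monitor {                      /* illustrative stand-in for monitor_desc */
        struct node  *owner;              /* thread currently inside the monitor    */
        unsigned int  recursion;          /* nesting level of the owner             */
        struct queue  entry_queue;        /* threads blocked trying to get in       */
        struct stack  signal_stack;       /* threads made ready by signal()         */
    };

    /* Called with the monitor's spinlock held; returns the thread that should run
     * next, or NULL if the monitor is still recursively held or simply becomes free. */
    struct node *leave_one_level( struct monitor *m ) {
        m->recursion -= 1;
        if ( m->recursion != 0 )
            return NULL;                              /* still inside a nested call */

        struct node *next = pop( &m->signal_stack );  /* signalled thread runs first */
        if ( !next )
            next = pop_head( &m->entry_queue );       /* otherwise take the entry queue */

        m->owner     = next;                          /* hand the monitor over */
        m->recursion = next ? 1 : 0;                  /* new owner starts at level 1 */
        return next;                                  /* caller wakes or switches to this thread */
    }

In the actual r221c2de code the two cases are scheduled differently: a thread popped from the signal stack is switched to immediately (ScheduleInternal), while one taken from the entry queue is only made ready (ScheduleThread); the sketch glosses over that distinction.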
src/libcfa/concurrency/thread.c
r221c2de -> r115a868

@@ -39,7 +39,4 @@
     this->mon.recursion = 1;
     this->next = NULL;
-
-    this->current_monitors = NULL;
-    this->current_monitor_count = 0;
 
 