Changes in / [8731d8c:4be3669]
- Files:
-
- 1 deleted
- 7 edited
-
doc/proposals/concurrency/thePlan.md (modified) (1 diff)
-
src/libcfa/concurrency/invoke.h (modified) (3 diffs)
-
src/libcfa/concurrency/kernel (modified) (2 diffs)
-
src/libcfa/concurrency/kernel.c (modified) (7 diffs)
-
src/libcfa/concurrency/monitor (modified) (3 diffs)
-
src/libcfa/concurrency/monitor.c (modified) (2 diffs)
-
src/libcfa/concurrency/thread.c (modified) (1 diff)
-
src/tests/sched_internal.c (deleted)
Legend:
- Unmodified
- Added
- Removed
-
doc/proposals/concurrency/thePlan.md
r8731d8c r4be3669 10 10 done - Multi monitors calls, 11 11 done - Monitors as a language feature (not calling enter/leave by hand) 12 Internal scheduling 12 13 13 _Phase 3_ : Monitor features 14 Internal scheduling 15 External scheduling 16 17 _Phase 4_ : Kernel features 14 _Phase 3_ : Kernel features 18 15 Preemption 19 16 Detach thread 20 17 Cluster migration 18 19 _Phase 4_ : Monitor features 20 External scheduling 21 21 22 22 _Phase 5_ : Performance -
src/libcfa/concurrency/invoke.h
r8731d8c r4be3669 33 33 }; 34 34 35 struct __thread_queue_t {35 struct simple_thread_list { 36 36 struct thread_desc * head; 37 37 struct thread_desc ** tail; 38 38 }; 39 39 40 struct __thread_stack_t {41 struct thread_desc * top;42 };43 44 40 #ifdef __CFORALL__ 45 41 extern "Cforall" { 46 void ?{}( struct __thread_queue_t * ); 47 void append( struct __thread_queue_t *, struct thread_desc * ); 48 struct thread_desc * pop_head( struct __thread_queue_t * ); 49 50 void ?{}( struct __thread_stack_t * ); 51 void push( struct __thread_stack_t *, struct thread_desc * ); 52 struct thread_desc * pop( struct __thread_stack_t * ); 42 void ?{}( struct simple_thread_list * ); 43 void append( struct simple_thread_list *, struct thread_desc * ); 44 struct thread_desc * pop_head( struct simple_thread_list * ); 53 45 54 46 void ?{}(spinlock * this); … … 58 50 59 51 struct coStack_t { 60 unsigned int size; // size of stack61 void *storage; // pointer to stack62 void *limit; // stack grows towards stack limit63 void *base; // base of stack64 void *context; // address of cfa_context_t65 void *top; // address of top of storage66 bool userStack; // whether or not the user allocated the stack52 unsigned int size; // size of stack 53 void *storage; // pointer to stack 54 void *limit; // stack grows towards stack limit 55 void *base; // base of stack 56 void *context; // address of cfa_context_t 57 void *top; // address of top of storage 58 bool userStack; // whether or not the user allocated the stack 67 59 }; 68 60 … … 70 62 71 63 struct coroutine_desc { 72 struct coStack_t stack; // stack information of the coroutine73 const char *name; // textual name for coroutine/task, initialized by uC++ generated code74 int errno_; // copy of global UNIX variable errno75 enum coroutine_state state; // current execution status for coroutine76 struct coroutine_desc *starter; // first coroutine to resume this one77 struct coroutine_desc *last; // last coroutine to resume this one64 struct coStack_t stack; // stack information of the coroutine 65 const char *name; // textual name for coroutine/task, initialized by uC++ generated code 66 int errno_; // copy of global UNIX variable errno 67 enum coroutine_state state; // current execution status for coroutine 68 struct coroutine_desc *starter; // first coroutine to resume this one 69 struct coroutine_desc *last; // last coroutine to resume this one 78 70 }; 79 71 80 72 struct monitor_desc { 81 struct spinlock lock; // spinlock to protect internal data 82 struct thread_desc * owner; // current owner of the monitor 83 struct __thread_queue_t entry_queue; // queue of threads that are blocked waiting for the monitor 84 struct __thread_stack_t signal_stack; // stack of threads to run next once we exit the monitor 85 struct monitor_desc * stack_owner; // if bulk acquiring was used we need to synchronize signals with an other monitor 86 unsigned int recursion; // monitor routines can be called recursively, we need to keep track of that 73 struct spinlock lock; 74 struct thread_desc * owner; 75 struct simple_thread_list entry_queue; 76 unsigned int recursion; 87 77 }; 88 78 89 79 struct thread_desc { 90 struct coroutine_desc cor; // coroutine body used to store context 91 struct monitor_desc mon; // monitor body used for mutual exclusion 92 struct thread_desc * next; // instrusive link field for threads 93 struct monitor_desc ** current_monitors; // currently held monitors 94 unsigned short current_monitor_count; // number of currently held monitors 80 struct coroutine_desc cor; // coroutine body used to store context 81 struct monitor_desc mon; // monitor body used for mutual exclusion 82 struct thread_desc * next; // instrusive link field for threads 95 83 }; 96 84 -
src/libcfa/concurrency/kernel
r8731d8c r4be3669 32 32 33 33 struct signal_once { 34 volatile bool cond ;34 volatile bool condition; 35 35 struct spinlock lock; 36 struct __thread_queue_t blocked;36 struct simple_thread_list blocked; 37 37 }; 38 38 … … 46 46 // Cluster 47 47 struct cluster { 48 __thread_queue_t ready_queue;48 simple_thread_list ready_queue; 49 49 spinlock lock; 50 50 }; -
src/libcfa/concurrency/kernel.c
r8731d8c r4be3669 299 299 // Scheduler routines 300 300 void ScheduleThread( thread_desc * thrd ) { 301 if( !thrd ) return;302 303 301 assertf( thrd->next == NULL, "Expected null got %p", thrd->next ); 304 302 … … 475 473 476 474 void ?{}( signal_once * this ) { 477 this->cond = false;475 this->condition = false; 478 476 } 479 477 void ^?{}( signal_once * this ) { … … 483 481 void wait( signal_once * this ) { 484 482 lock( &this->lock ); 485 if( !this->cond ) {483 if( !this->condition ) { 486 484 append( &this->blocked, this_thread() ); 487 485 ScheduleInternal( &this->lock ); … … 494 492 lock( &this->lock ); 495 493 { 496 this->cond = true;494 this->condition = true; 497 495 498 496 thread_desc * it; … … 506 504 //----------------------------------------------------------------------------- 507 505 // Queues 508 void ?{}( __thread_queue_t * this ) {506 void ?{}( simple_thread_list * this ) { 509 507 this->head = NULL; 510 508 this->tail = &this->head; 511 509 } 512 510 513 void append( __thread_queue_t * this, thread_desc * t ) {511 void append( simple_thread_list * this, thread_desc * t ) { 514 512 assert(this->tail != NULL); 515 513 *this->tail = t; … … 517 515 } 518 516 519 thread_desc * pop_head( __thread_queue_t * this ) {517 thread_desc * pop_head( simple_thread_list * this ) { 520 518 thread_desc * head = this->head; 521 519 if( head ) { … … 528 526 return head; 529 527 } 530 531 void ?{}( __thread_stack_t * this ) {532 this->top = NULL;533 }534 535 void push( __thread_stack_t * this, thread_desc * t ) {536 assert(t->next != NULL);537 t->next = this->top;538 this->top = t;539 }540 541 thread_desc * pop( __thread_stack_t * this ) {542 thread_desc * top = this->top;543 if( top ) {544 this->top = top->next;545 top->next = NULL;546 }547 return top;548 }549 528 // Local Variables: // 550 529 // mode: c // -
src/libcfa/concurrency/monitor
r8731d8c r4be3669 18 18 #define MONITOR_H 19 19 20 #include <stddef.h>21 22 20 #include "assert" 23 21 #include "invoke.h" … … 25 23 26 24 static inline void ?{}(monitor_desc * this) { 27 this->owner = NULL; 28 this->stack_owner = NULL; 25 this->owner = 0; 29 26 this->recursion = 0; 30 27 } 28 29 //Array entering routine 30 void enter(monitor_desc **, int count); 31 void leave(monitor_desc **, int count); 31 32 32 33 struct monitor_guard_t { 33 34 monitor_desc ** m; 34 35 int count; 35 monitor_desc ** prev_mntrs;36 unsigned short prev_count;37 36 }; 38 37 … … 41 40 } 42 41 43 void ?{}( monitor_guard_t * this, monitor_desc ** m, int count ); 44 void ^?{}( monitor_guard_t * this ); 45 46 //----------------------------------------------------------------------------- 47 // Internal scheduling 48 struct condition { 49 __thread_queue_t blocked; 50 monitor_desc ** monitors; 51 unsigned short monitor_count; 52 }; 53 54 static inline void ?{}( condition * this ) { 55 this->monitors = NULL; 56 this->monitor_count = 0; 42 static inline void ?{}( monitor_guard_t * this, monitor_desc ** m, int count ) { 43 this->m = m; 44 this->count = count; 45 qsort(this->m, count); 46 enter( this->m, this->count ); 57 47 } 58 48 59 void wait( condition * this ); 60 void signal( condition * this ); 49 static inline void ^?{}( monitor_guard_t * this ) { 50 leave( this->m, this->count ); 51 } 52 53 61 54 #endif //MONITOR_H -
src/libcfa/concurrency/monitor.c
r8731d8c r4be3669 18 18 19 19 #include "kernel_private.h" 20 #include "libhdr.h"21 22 void set_owner( monitor_desc * this, thread_desc * owner ) {23 //Pass the monitor appropriately24 this->owner = owner;25 26 //We are passing the monitor to someone else, which means recursion level is not 027 this->recursion = owner ? 1 : 0;28 }29 20 30 21 extern "C" { 31 void __enter_monitor_desc(monitor_desc * this , monitor_desc * leader) {22 void __enter_monitor_desc(monitor_desc * this) { 32 23 lock( &this->lock ); 33 24 thread_desc * thrd = this_thread(); 34 25 35 // //Update the stack owner36 // this->stack_owner = leader;37 38 LIB_DEBUG_PRINT_SAFE("Entering %p (o: %p, r: %i)\n", this, this->owner, this->recursion);39 40 26 if( !this->owner ) { 41 27 //No one has the monitor, just take it 42 set_owner( this, thrd ); 28 this->owner = thrd; 29 this->recursion = 1; 43 30 } 44 31 else if( this->owner == thrd) { … … 57 44 58 45 unlock( &this->lock ); 59 return;60 46 } 61 47 62 // leave pseudo code : 63 // decrement level 64 // leve == 0 ? 65 // no : done 66 // yes : 67 // signal stack empty ? 68 // has leader : 69 // bulk acquiring means we don't own the signal stack 70 // ignore it but don't release the monitor 71 // yes : 72 // next in entry queue is new owner 73 // no : 74 // top of the signal stack is the owner 75 // context switch to him right away 76 // 77 void __leave_monitor_desc(monitor_desc * this, monitor_desc * leader) { 48 void __leave_monitor_desc(monitor_desc * this) { 78 49 lock( &this->lock ); 79 50 80 LIB_DEBUG_PRINT_SAFE("Leaving %p (o: %p, r: %i)\n", this, this->owner, this->recursion);81 82 51 thread_desc * thrd = this_thread(); 83 assert f( thrd == this->owner, "Expected owner to be %p, got %p (r: %i)", this->owner, thrd, this->recursion);52 assert( thrd == this->owner ); 84 53 85 54 //Leaving a recursion level, decrement the counter 86 55 this->recursion -= 1; 87 56 88 //If we haven't left the last level of recursion 89 //it means we don't need to do anything 90 if( this->recursion != 0) { 91 // this->stack_owner = leader; 92 unlock( &this->lock ); 93 return; 94 } 95 96 // //If we don't own the signal stack then just leave it to the owner 97 // if( this->stack_owner ) { 98 // this->stack_owner = leader; 99 // unlock( &this->lock ); 100 // return; 101 // } 57 //If we left the last level of recursion it means we are changing who owns the monitor 58 thread_desc * new_owner = 0; 59 if( this->recursion == 0) { 60 //Get the next thread in the list 61 new_owner = this->owner = pop_head( &this->entry_queue ); 102 62 103 //We are the stack owner and have left the last recursion level.104 //We are in charge of passing the monitor105 thread_desc * new_owner = 0;63 //We are passing the monitor to someone else, which means recursion level is not 0 64 this->recursion = new_owner ? 1 : 0; 65 } 106 66 107 //Check the signaller stack108 new_owner = pop( &this->signal_stack );109 if( new_owner ) {110 //The signaller stack is not empty,111 //transfer control immediately112 set_owner( this, new_owner );113 // this->stack_owner = leader;114 ScheduleInternal( &this->lock, new_owner );115 return;116 }117 118 // No signaller thread119 // Get the next thread in the entry_queue120 new_owner = pop_head( &this->entry_queue );121 set_owner( this, new_owner );122 123 // //Update the stack owner124 // this->stack_owner = leader;125 126 //We can now let other threads in safely127 67 unlock( &this->lock ); 128 68 129 //We need to wake-up the thread 130 ScheduleThread( new_owner ); 69 //If we have a new owner, we need to wake-up the thread 70 if( new_owner ) { 71 ScheduleThread( new_owner ); 72 } 131 73 } 132 74 } 133 75 134 static inline void enter(monitor_desc ** monitors, int count) { 135 __enter_monitor_desc( monitors[0], NULL ); 136 for(int i = 1; i < count; i++) { 137 __enter_monitor_desc( monitors[i], monitors[0] ); 76 void enter(monitor_desc ** monitors, int count) { 77 for(int i = 0; i < count; i++) { 78 __enter_monitor_desc( monitors[i] ); 138 79 } 139 80 } 140 81 141 static inline void leave(monitor_desc ** monitors, int count) { 142 __leave_monitor_desc( monitors[0], NULL ); 143 for(int i = count - 1; i >= 1; i--) { 144 __leave_monitor_desc( monitors[i], monitors[0] ); 82 void leave(monitor_desc ** monitors, int count) { 83 for(int i = count - 1; i >= 0; i--) { 84 __leave_monitor_desc( monitors[i] ); 145 85 } 146 86 } 147 148 void ?{}( monitor_guard_t * this, monitor_desc ** m, int count ) {149 this->m = m;150 this->count = count;151 qsort(this->m, count);152 enter( this->m, this->count );153 154 this->prev_mntrs = this_thread()->current_monitors;155 this->prev_count = this_thread()->current_monitor_count;156 157 this_thread()->current_monitors = m;158 this_thread()->current_monitor_count = count;159 }160 161 void ^?{}( monitor_guard_t * this ) {162 leave( this->m, this->count );163 164 this_thread()->current_monitors = this->prev_mntrs;165 this_thread()->current_monitor_count = this->prev_count;166 }167 168 //-----------------------------------------------------------------------------169 // Internal scheduling170 void wait( condition * this ) {171 assertf(false, "NO SUPPORTED");172 // LIB_DEBUG_FPRINTF("Waiting\n");173 thread_desc * this_thrd = this_thread();174 175 if( !this->monitors ) {176 this->monitors = this_thrd->current_monitors;177 this->monitor_count = this_thrd->current_monitor_count;178 }179 180 unsigned short count = this->monitor_count;181 182 //Check that everything is as expected183 assert( this->monitors != NULL );184 assert( this->monitor_count != 0 );185 186 unsigned int recursions[ count ]; //Save the current recursion levels to restore them later187 spinlock * locks [ count ]; //We need to pass-in an array of locks to ScheduleInternal188 189 // LIB_DEBUG_FPRINTF("Getting ready to wait\n");190 191 //Loop on all the monitors and release the owner192 for( unsigned int i = 0; i < count; i++ ) {193 monitor_desc * cur = this->monitors[i];194 195 assert( cur );196 197 // LIB_DEBUG_FPRINTF("cur %p lock %p\n", cur, &cur->lock);198 199 //Store the locks for later200 locks[i] = &cur->lock;201 202 //Protect the monitors203 lock( locks[i] );204 {205 //Save the recursion levels206 recursions[i] = cur->recursion;207 208 //Release the owner209 cur->recursion = 0;210 cur->owner = NULL;211 }212 //Release the monitor213 unlock( locks[i] );214 }215 216 // LIB_DEBUG_FPRINTF("Waiting now\n");217 218 //Everything is ready to go to sleep219 ScheduleInternal( locks, count );220 221 222 //WE WOKE UP223 224 225 //We are back, restore the owners and recursions226 for( unsigned int i = 0; i < count; i++ ) {227 monitor_desc * cur = this->monitors[i];228 229 //Protect the monitors230 lock( locks[i] );231 {232 //Release the owner233 cur->owner = this_thrd;234 cur->recursion = recursions[i];235 }236 //Release the monitor237 unlock( locks[i] );238 }239 }240 241 static void __signal_internal( condition * this ) {242 assertf(false, "NO SUPPORTED");243 if( !this->blocked.head ) return;244 245 //Check that everything is as expected246 assert( this->monitors );247 assert( this->monitor_count != 0 );248 249 LIB_DEBUG_DO(250 if ( this->monitors != this_thread()->current_monitors ) {251 abortf( "Signal on condition %p made outside of the correct monitor(s)", this );252 } // if253 );254 255 monitor_desc * owner = this->monitors[0];256 lock( &owner->lock );257 {258 thread_desc * unblock = pop_head( &this->blocked );259 push( &owner->signal_stack, unblock );260 }261 unlock( &owner->lock );262 }263 264 void signal( condition * this ) {265 __signal_internal( this );266 } -
src/libcfa/concurrency/thread.c
r8731d8c r4be3669 39 39 this->mon.recursion = 1; 40 40 this->next = NULL; 41 42 this->current_monitors = NULL;43 this->current_monitor_count = 0;44 41 } 45 42
Note:
See TracChangeset
for help on using the changeset viewer.