Changes in / [1d29d46:727cf70f]
- Files:
-
- 1 added
- 7 edited
Legend:
- Unmodified
- Added
- Removed
-
doc/proposals/concurrency/thePlan.md
r1d29d46 r727cf70f 10 10 done - Multi monitors calls, 11 11 done - Monitors as a language feature (not calling enter/leave by hand) 12 13 _Phase 3_ : Monitor features 12 14 Internal scheduling 15 External scheduling 13 16 14 _Phase 3_ : Kernel features17 _Phase 4_ : Kernel features 15 18 Preemption 16 19 Detach thread 17 20 Cluster migration 18 19 _Phase 4_ : Monitor features20 External scheduling21 21 22 22 _Phase 5_ : Performance -
src/libcfa/concurrency/invoke.h
r1d29d46 r727cf70f 33 33 }; 34 34 35 struct simple_thread_list {35 struct __thread_queue_t { 36 36 struct thread_desc * head; 37 37 struct thread_desc ** tail; 38 38 }; 39 39 40 struct __thread_stack_t { 41 struct thread_desc * top; 42 }; 43 40 44 #ifdef __CFORALL__ 41 45 extern "Cforall" { 42 void ?{}( struct simple_thread_list * ); 43 void append( struct simple_thread_list *, struct thread_desc * ); 44 struct thread_desc * pop_head( struct simple_thread_list * ); 46 void ?{}( struct __thread_queue_t * ); 47 void append( struct __thread_queue_t *, struct thread_desc * ); 48 struct thread_desc * pop_head( struct __thread_queue_t * ); 49 50 void ?{}( struct __thread_stack_t * ); 51 void push( struct __thread_stack_t *, struct thread_desc * ); 52 struct thread_desc * pop( struct __thread_stack_t * ); 45 53 46 54 void ?{}(spinlock * this); … … 50 58 51 59 struct coStack_t { 52 unsigned int size; // size of stack53 void *storage; // pointer to stack54 void *limit; // stack grows towards stack limit55 void *base; // base of stack56 void *context; // address of cfa_context_t57 void *top; // address of top of storage58 bool userStack; // whether or not the user allocated the stack60 unsigned int size; // size of stack 61 void *storage; // pointer to stack 62 void *limit; // stack grows towards stack limit 63 void *base; // base of stack 64 void *context; // address of cfa_context_t 65 void *top; // address of top of storage 66 bool userStack; // whether or not the user allocated the stack 59 67 }; 60 68 … … 62 70 63 71 struct coroutine_desc { 64 struct coStack_t stack; // stack information of the coroutine65 const char *name; // textual name for coroutine/task, initialized by uC++ generated code66 int errno_; // copy of global UNIX variable errno67 enum coroutine_state state; // current execution status for coroutine68 struct coroutine_desc *starter; // first coroutine to resume this one69 struct coroutine_desc *last; // last coroutine to resume this one72 struct coStack_t stack; // stack information of the coroutine 73 const char *name; // textual name for coroutine/task, initialized by uC++ generated code 74 int errno_; // copy of global UNIX variable errno 75 enum coroutine_state state; // current execution status for coroutine 76 struct coroutine_desc *starter; // first coroutine to resume this one 77 struct coroutine_desc *last; // last coroutine to resume this one 70 78 }; 71 79 72 80 struct monitor_desc { 73 struct spinlock lock; 74 struct thread_desc * owner; 75 struct simple_thread_list entry_queue; 76 unsigned int recursion; 81 struct spinlock lock; // spinlock to protect internal data 82 struct thread_desc * owner; // current owner of the monitor 83 struct __thread_queue_t entry_queue; // queue of threads that are blocked waiting for the monitor 84 struct __thread_stack_t signal_stack; // stack of threads to run next once we exit the monitor 85 struct monitor_desc * stack_owner; // if bulk acquiring was used we need to synchronize signals with an other monitor 86 unsigned int recursion; // monitor routines can be called recursively, we need to keep track of that 77 87 }; 78 88 79 89 struct thread_desc { 80 struct coroutine_desc cor; // coroutine body used to store context 81 struct monitor_desc mon; // monitor body used for mutual exclusion 82 struct thread_desc * next; // instrusive link field for threads 90 struct coroutine_desc cor; // coroutine body used to store context 91 struct monitor_desc mon; // monitor body used for mutual exclusion 92 struct thread_desc * next; // instrusive link field for threads 93 struct monitor_desc ** current_monitors; // currently held monitors 94 unsigned short current_monitor_count; // number of currently held monitors 83 95 }; 84 96 -
src/libcfa/concurrency/kernel
r1d29d46 r727cf70f 32 32 33 33 struct signal_once { 34 volatile bool cond ition;34 volatile bool cond; 35 35 struct spinlock lock; 36 struct simple_thread_list blocked;36 struct __thread_queue_t blocked; 37 37 }; 38 38 … … 46 46 // Cluster 47 47 struct cluster { 48 simple_thread_list ready_queue;48 __thread_queue_t ready_queue; 49 49 spinlock lock; 50 50 }; -
src/libcfa/concurrency/kernel.c
r1d29d46 r727cf70f 299 299 // Scheduler routines 300 300 void ScheduleThread( thread_desc * thrd ) { 301 if( !thrd ) return; 302 301 303 assertf( thrd->next == NULL, "Expected null got %p", thrd->next ); 302 304 … … 473 475 474 476 void ?{}( signal_once * this ) { 475 this->cond ition= false;477 this->cond = false; 476 478 } 477 479 void ^?{}( signal_once * this ) { … … 481 483 void wait( signal_once * this ) { 482 484 lock( &this->lock ); 483 if( !this->cond ition) {485 if( !this->cond ) { 484 486 append( &this->blocked, this_thread() ); 485 487 ScheduleInternal( &this->lock ); … … 492 494 lock( &this->lock ); 493 495 { 494 this->cond ition= true;496 this->cond = true; 495 497 496 498 thread_desc * it; … … 504 506 //----------------------------------------------------------------------------- 505 507 // Queues 506 void ?{}( simple_thread_list * this ) {508 void ?{}( __thread_queue_t * this ) { 507 509 this->head = NULL; 508 510 this->tail = &this->head; 509 511 } 510 512 511 void append( simple_thread_list * this, thread_desc * t ) {513 void append( __thread_queue_t * this, thread_desc * t ) { 512 514 assert(this->tail != NULL); 513 515 *this->tail = t; … … 515 517 } 516 518 517 thread_desc * pop_head( simple_thread_list * this ) {519 thread_desc * pop_head( __thread_queue_t * this ) { 518 520 thread_desc * head = this->head; 519 521 if( head ) { … … 526 528 return head; 527 529 } 530 531 void ?{}( __thread_stack_t * this ) { 532 this->top = NULL; 533 } 534 535 void push( __thread_stack_t * this, thread_desc * t ) { 536 assert(t->next != NULL); 537 t->next = this->top; 538 this->top = t; 539 } 540 541 thread_desc * pop( __thread_stack_t * this ) { 542 thread_desc * top = this->top; 543 if( top ) { 544 this->top = top->next; 545 top->next = NULL; 546 } 547 return top; 548 } 528 549 // Local Variables: // 529 550 // mode: c // -
src/libcfa/concurrency/monitor
r1d29d46 r727cf70f 18 18 #define MONITOR_H 19 19 20 #include <stddef.h> 21 20 22 #include "assert" 21 23 #include "invoke.h" … … 23 25 24 26 static inline void ?{}(monitor_desc * this) { 25 this->owner = 0; 27 this->owner = NULL; 28 this->stack_owner = NULL; 26 29 this->recursion = 0; 27 30 } 28 29 //Array entering routine30 void enter(monitor_desc **, int count);31 void leave(monitor_desc **, int count);32 31 33 32 struct monitor_guard_t { 34 33 monitor_desc ** m; 35 34 int count; 35 monitor_desc ** prev_mntrs; 36 unsigned short prev_count; 36 37 }; 37 38 … … 40 41 } 41 42 42 static inline void ?{}( monitor_guard_t * this, monitor_desc ** m, int count ) { 43 this->m = m; 44 this->count = count; 45 qsort(this->m, count); 46 enter( this->m, this->count ); 43 void ?{}( monitor_guard_t * this, monitor_desc ** m, int count ); 44 void ^?{}( monitor_guard_t * this ); 45 46 //----------------------------------------------------------------------------- 47 // Internal scheduling 48 struct condition { 49 __thread_queue_t blocked; 50 monitor_desc ** monitors; 51 unsigned short monitor_count; 52 }; 53 54 static inline void ?{}( condition * this ) { 55 this->monitors = NULL; 56 this->monitor_count = 0; 47 57 } 48 58 49 static inline void ^?{}( monitor_guard_t * this ) { 50 leave( this->m, this->count ); 51 } 52 53 59 void wait( condition * this ); 60 void signal( condition * this ); 54 61 #endif //MONITOR_H -
src/libcfa/concurrency/monitor.c
r1d29d46 r727cf70f 18 18 19 19 #include "kernel_private.h" 20 #include "libhdr.h" 21 22 void set_owner( monitor_desc * this, thread_desc * owner ) { 23 //Pass the monitor appropriately 24 this->owner = owner; 25 26 //We are passing the monitor to someone else, which means recursion level is not 0 27 this->recursion = owner ? 1 : 0; 28 } 20 29 21 30 extern "C" { 22 void __enter_monitor_desc(monitor_desc * this ) {31 void __enter_monitor_desc(monitor_desc * this, monitor_desc * leader) { 23 32 lock( &this->lock ); 24 33 thread_desc * thrd = this_thread(); 34 35 //Update the stack owner 36 this->stack_owner = leader; 25 37 26 38 if( !this->owner ) { … … 44 56 45 57 unlock( &this->lock ); 46 } 47 48 void __leave_monitor_desc(monitor_desc * this) { 58 return; 59 } 60 61 // leave pseudo code : 62 // decrement level 63 // leve == 0 ? 64 // no : done 65 // yes : 66 // signal stack empty ? 67 // has leader : 68 // bulk acquiring means we don't own the signal stack 69 // ignore it but don't release the monitor 70 // yes : 71 // next in entry queue is new owner 72 // no : 73 // top of the signal stack is the owner 74 // context switch to him right away 75 // 76 void __leave_monitor_desc(monitor_desc * this, monitor_desc * leader) { 49 77 lock( &this->lock ); 50 78 51 79 thread_desc * thrd = this_thread(); 52 assert( thrd == this->owner );80 assert( thrd == this->owner || this->stack_owner ); 53 81 54 82 //Leaving a recursion level, decrement the counter 55 83 this->recursion -= 1; 56 84 57 //If we left the last level of recursion it means we are changing who owns the monitor 85 //If we haven't left the last level of recursion 86 //it means we don't need to do anything 87 if( this->recursion != 0) { 88 this->stack_owner = leader; 89 unlock( &this->lock ); 90 return; 91 } 92 93 //If we don't own the signal stack then just leave it to the owner 94 if( this->stack_owner ) { 95 this->stack_owner = leader; 96 unlock( &this->lock ); 97 return; 98 } 99 100 //We are the stack owner and have left the last recursion level. 101 //We are in charge of passing the monitor 58 102 thread_desc * new_owner = 0; 59 if( this->recursion == 0) { 60 //Get the next thread in the list 61 new_owner = this->owner = pop_head( &this->entry_queue ); 62 63 //We are passing the monitor to someone else, which means recursion level is not 0 64 this->recursion = new_owner ? 1 : 0; 65 } 66 103 104 //Check the signaller stack 105 new_owner = pop( &this->signal_stack ); 106 if( new_owner ) { 107 //The signaller stack is not empty, 108 //transfer control immediately 109 set_owner( this, new_owner ); 110 this->stack_owner = leader; 111 ScheduleInternal( &this->lock, new_owner ); 112 return; 113 } 114 115 // No signaller thread 116 // Get the next thread in the entry_queue 117 new_owner = pop_head( &this->entry_queue ); 118 set_owner( this, new_owner ); 119 120 //Update the stack owner 121 this->stack_owner = leader; 122 123 //We can now let other threads in safely 67 124 unlock( &this->lock ); 68 125 69 // If we have a new owner, we need to wake-up the thread70 if( new_owner ) {71 ScheduleThread( new_owner );72 73 } 74 } 75 76 void enter(monitor_desc ** monitors, int count) {77 for(int i = 0; i < count; i++) {78 __enter_monitor_desc( monitors[i] );79 80 } 81 82 void leave(monitor_desc ** monitors, int count) { 126 //We need to wake-up the thread 127 ScheduleThread( new_owner ); 128 } 129 } 130 131 static inline void enter(monitor_desc ** monitors, int count) { 132 __enter_monitor_desc( monitors[0], NULL ); 133 for(int i = 1; i < count; i++) { 134 __enter_monitor_desc( monitors[i], monitors[0] ); 135 } 136 } 137 138 static inline void leave(monitor_desc ** monitors, int count) { 139 __leave_monitor_desc( monitors[0], NULL ); 83 140 for(int i = count - 1; i >= 0; i--) { 84 __leave_monitor_desc( monitors[i] ); 85 } 86 } 141 __leave_monitor_desc( monitors[i], monitors[0] ); 142 } 143 } 144 145 void ?{}( monitor_guard_t * this, monitor_desc ** m, int count ) { 146 this->m = m; 147 this->count = count; 148 qsort(this->m, count); 149 enter( this->m, this->count ); 150 151 this->prev_mntrs = this_thread()->current_monitors; 152 this->prev_count = this_thread()->current_monitor_count; 153 154 this_thread()->current_monitors = m; 155 this_thread()->current_monitor_count = count; 156 } 157 158 void ^?{}( monitor_guard_t * this ) { 159 leave( this->m, this->count ); 160 161 this_thread()->current_monitors = this->prev_mntrs; 162 this_thread()->current_monitor_count = this->prev_count; 163 } 164 165 //----------------------------------------------------------------------------- 166 // Internal scheduling 167 void wait( condition * this ) { 168 // LIB_DEBUG_FPRINTF("Waiting\n"); 169 thread_desc * this_thrd = this_thread(); 170 171 if( !this->monitors ) { 172 this->monitors = this_thrd->current_monitors; 173 this->monitor_count = this_thrd->current_monitor_count; 174 } 175 176 unsigned short count = this->monitor_count; 177 178 //Check that everything is as expected 179 assert( this->monitors != NULL ); 180 assert( this->monitor_count != 0 ); 181 182 unsigned int recursions[ count ]; //Save the current recursion levels to restore them later 183 spinlock * locks [ count ]; //We need to pass-in an array of locks to ScheduleInternal 184 185 // LIB_DEBUG_FPRINTF("Getting ready to wait\n"); 186 187 //Loop on all the monitors and release the owner 188 for( unsigned int i = 0; i < count; i++ ) { 189 monitor_desc * cur = this->monitors[i]; 190 191 assert( cur ); 192 193 // LIB_DEBUG_FPRINTF("cur %p lock %p\n", cur, &cur->lock); 194 195 //Store the locks for later 196 locks[i] = &cur->lock; 197 198 //Protect the monitors 199 lock( locks[i] ); 200 { 201 //Save the recursion levels 202 recursions[i] = cur->recursion; 203 204 //Release the owner 205 cur->recursion = 0; 206 cur->owner = NULL; 207 } 208 //Release the monitor 209 unlock( locks[i] ); 210 } 211 212 // LIB_DEBUG_FPRINTF("Waiting now\n"); 213 214 //Everything is ready to go to sleep 215 ScheduleInternal( locks, count ); 216 217 218 //WE WOKE UP 219 220 221 //We are back, restore the owners and recursions 222 for( unsigned int i = 0; i < count; i++ ) { 223 monitor_desc * cur = this->monitors[i]; 224 225 //Protect the monitors 226 lock( locks[i] ); 227 { 228 //Release the owner 229 cur->owner = this_thrd; 230 cur->recursion = recursions[i]; 231 } 232 //Release the monitor 233 unlock( locks[i] ); 234 } 235 } 236 237 static void __signal_internal( condition * this ) { 238 if( !this->blocked.head ) return; 239 240 //Check that everything is as expected 241 assert( this->monitors ); 242 assert( this->monitor_count != 0 ); 243 244 LIB_DEBUG_DO( 245 if ( this->monitors != this_thread()->current_monitors ) { 246 abortf( "Signal on condition %p made outside of the correct monitor(s)", this ); 247 } // if 248 ); 249 250 monitor_desc * owner = this->monitors[0]; 251 lock( &owner->lock ); 252 { 253 thread_desc * unblock = pop_head( &this->blocked ); 254 push( &owner->signal_stack, unblock ); 255 } 256 unlock( &owner->lock ); 257 } 258 259 void signal( condition * this ) { 260 __signal_internal( this ); 261 } -
src/libcfa/concurrency/thread.c
r1d29d46 r727cf70f 39 39 this->mon.recursion = 1; 40 40 this->next = NULL; 41 42 this->current_monitors = NULL; 43 this->current_monitor_count = 0; 41 44 } 42 45
Note: See TracChangeset
for help on using the changeset viewer.