Changeset 0c78741
- Timestamp:
- Apr 27, 2017, 2:00:07 PM
- Branches:
- ADT, aaron-thesis, arm-eh, ast-experimental, cleanup-dtors, deferred_resn, demangler, enum, forall-pointer-decay, jacob/cs343-translation, jenkins-sandbox, master, new-ast, new-ast-unique-expr, new-env, no_list, persistent-indexer, pthread-emulation, qualifiedEnum, resolv-new, with_gc
- Children:
- f80ab45
- Parents:
- e464759
- Location:
- src/libcfa/concurrency
- Files:
- 6 edited
Legend:
- Unmodified lines are prefixed with a space
- Added lines are prefixed with +
- Removed lines are prefixed with -
- … marks elided unchanged code
src/libcfa/concurrency/invoke.h
re464759 → r0c78741

 };

-struct __thread_stack_t {
-    struct thread_desc * top;
+struct __condition_stack_t {
+    struct __condition_criterion_t * top;
 };
…
 struct thread_desc * pop_head( struct __thread_queue_t * );

-void ?{}( struct __thread_stack_t * );
-void push( struct __thread_stack_t *, struct thread_desc * );
-struct thread_desc * pop( struct __thread_stack_t * );
+void ?{}( struct __condition_stack_t * );
+void push( struct __condition_stack_t *, struct __condition_criterion_t * );
+struct __condition_criterion_t * pop( struct __condition_stack_t * );

 void ?{}(spinlock * this);
…
     struct thread_desc * owner;                 // current owner of the monitor
     struct __thread_queue_t entry_queue;        // queue of threads that are blocked waiting for the monitor
-    struct __thread_stack_t signal_stack;       // stack of threads to run next once we exit the monitor
+    struct __condition_stack_t signal_stack;    // stack of conditions to run next once we exit the monitor
     struct monitor_desc * stack_owner;          // if bulk acquiring was used we need to synchronize signals with an other monitor
     unsigned int recursion;                     // monitor routines can be called recursively, we need to keep track of that
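This change retypes the monitor's signal stack: it now holds condition criteria rather than threads, but the intrusive-stack discipline is unchanged (each element carries its own next link; push and pop only swap top). For illustration, a minimal plain-C sketch of that discipline; the criterion type and its field names are stand-ins, not the CFA originals:

    #include <assert.h>
    #include <stddef.h>
    #include <stdio.h>

    /* Stand-in for __condition_criterion_t: only the intrusive link matters here. */
    struct criterion {
        int id;
        struct criterion *next;   /* intrusive link, owned by the stack */
    };

    struct stack { struct criterion *top; };

    static void push(struct stack *s, struct criterion *c) {
        assert(c->next == NULL);  /* an element may sit on one stack at a time */
        c->next = s->top;
        s->top = c;
    }

    static struct criterion *pop(struct stack *s) {
        struct criterion *t = s->top;
        if (t) {
            s->top = t->next;
            t->next = NULL;       /* detach so the element can be re-pushed */
        }
        return t;
    }

    int main(void) {
        struct stack s = { NULL };
        struct criterion a = { 1, NULL }, b = { 2, NULL };
        push(&s, &a);
        push(&s, &b);
        printf("%d %d\n", pop(&s)->id, pop(&s)->id);  /* prints "2 1" (LIFO) */
        return 0;
    }

Because the link lives inside the element, push and pop allocate nothing, which keeps the signalling path cheap.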
src/libcfa/concurrency/kernel
re464759 → r0c78741

 //-----------------------------------------------------------------------------
 // Processor
-enum FinishOpCode { No_Action, Release, Schedule, Release_Schedule };
+enum FinishOpCode { No_Action, Release, Schedule, Release_Schedule, Release_Multi, Release_Multi_Schedule };
+
+//TODO use union, many of these fields are mutually exclusive (i.e. MULTI vs NOMULTI)
 struct FinishAction {
     FinishOpCode action_code;
     thread_desc * thrd;
     spinlock * lock;
+    spinlock ** locks;
+    unsigned short lock_count;
+    thread_desc ** thrds;
+    unsigned short thrd_count;
 };
 static inline void ?{}(FinishAction * this) {
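The diff's own TODO notes that the single fields (lock, thrd) and the multi fields (locks, thrds, and their counts) are never used together. A hypothetical sketch of the union layout that TODO suggests, in plain C with stand-in type names; this is not code from the changeset:

    #include <stdio.h>

    struct spinlock;      /* incomplete types: only pointers are stored */
    struct thread_desc;

    enum FinishOpCode { No_Action, Release, Schedule, Release_Schedule,
                        Release_Multi, Release_Multi_Schedule };

    /* Hypothetical layout; the action_code tag selects which arm is live. */
    struct FinishAction {
        enum FinishOpCode action_code;
        union {
            struct {                          /* Release / Schedule / Release_Schedule */
                struct spinlock *lock;
                struct thread_desc *thrd;
            } single;
            struct {                          /* Release_Multi / Release_Multi_Schedule */
                struct spinlock **locks;
                unsigned short lock_count;
                struct thread_desc **thrds;
                unsigned short thrd_count;
            } multi;
        };
    };

    int main(void) {
        /* the struct costs the larger arm rather than the sum of both */
        printf("sizeof(struct FinishAction) = %zu\n", sizeof(struct FinishAction));
        return 0;
    }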
src/libcfa/concurrency/kernel.c
re464759 → r0c78741

         ScheduleThread( this->finish.thrd );
     }
+    else if( this->finish.action_code == Release_Multi ) {
+        for(int i = 0; i < this->finish.lock_count; i++) {
+            unlock( this->finish.locks[i] );
+        }
+    }
+    else if( this->finish.action_code == Release_Multi_Schedule ) {
+        for(int i = 0; i < this->finish.lock_count; i++) {
+            unlock( this->finish.locks[i] );
+        }
+        for(int i = 0; i < this->finish.thrd_count; i++) {
+            ScheduleThread( this->finish.thrds[i] );
+        }
+    }
     else {
         assert(this->finish.action_code == No_Action);
…
     this_processor->finish.lock = lock;
     this_processor->finish.thrd = thrd;
+    suspend();
+}
+
+void ScheduleInternal(spinlock ** locks, unsigned short count) {
+    this_processor->finish.action_code = Release_Multi;
+    this_processor->finish.locks = locks;
+    this_processor->finish.lock_count = count;
+    suspend();
+}
+
+void ScheduleInternal(spinlock ** locks, unsigned short lock_count, thread_desc ** thrds, unsigned short thrd_count) {
+    this_processor->finish.action_code = Release_Multi_Schedule;
+    this_processor->finish.locks = locks;
+    this_processor->finish.lock_count = lock_count;
+    this_processor->finish.thrds = thrds;
+    this_processor->finish.thrd_count = thrd_count;
     suspend();
 }
…
 }

-void ?{}( __thread_stack_t * this ) {
+void ?{}( __condition_stack_t * this ) {
     this->top = NULL;
 }

-void push( __thread_stack_t * this, thread_desc * t ) {
-    assert( t->next != NULL );
+void push( __condition_stack_t * this, __condition_criterion_t * t ) {
+    assert( !t->next );
     t->next = this->top;
     this->top = t;
 }

-thread_desc * pop( __thread_stack_t * this ) {
-    thread_desc * top = this->top;
+__condition_criterion_t * pop( __condition_stack_t * this ) {
+    __condition_criterion_t * top = this->top;
     if( top ) {
         this->top = top->next;
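All the ScheduleInternal overloads share one shape: the blocking thread records the releases and wake-ups to perform on its behalf in this_processor->finish and suspends, and the processor executes them only after the context switch, so no lock is released while the blocking thread is still running on its own stack. (Note also that the push assert flips from t->next != NULL to !t->next, correcting an inverted check: a freshly pushed element must be detached.) A runnable plain-C model of this deferred-finish hand-off, with all names as illustrative stand-ins rather than the CFA kernel API:

    #include <stdio.h>

    struct spinlock { int held; };
    struct thread   { int id; };

    static void unlock(struct spinlock *l) { l->held = 0; printf("unlock %p\n", (void*)l); }
    static void schedule(struct thread *t) { printf("wake thread %d\n", t->id); }
    static void suspend(void)              { printf("-- context switch --\n"); }

    /* Deferred-finish record: the blocking thread fills it in, the processor
       consumes it post-switch. (The real FinishAction also carries an
       action_code tag selecting which fields apply.) */
    struct finish {
        struct spinlock **locks; unsigned short lock_count;
        struct thread   **thrds; unsigned short thrd_count;
    };

    static void finish_action(struct finish *f) {   /* runs on the processor */
        for (int i = 0; i < f->lock_count; i++) unlock(f->locks[i]);
        for (int i = 0; i < f->thrd_count; i++) schedule(f->thrds[i]);
    }

    int main(void) {
        struct spinlock a = {1}, b = {1};
        struct thread t1 = {1}, t2 = {2};
        struct spinlock *locks[] = { &a, &b };
        struct thread   *thrds[] = { &t1, &t2 };
        struct finish f = { locks, 2, thrds, 2 };
        suspend();          /* in the real kernel, this switches to the processor */
        finish_action(&f);  /* ...which then releases locks and wakes threads    */
        return 0;
    }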
src/libcfa/concurrency/kernel_private.h
re464759 → r0c78741

 thread_desc * nextThread(cluster * this);

-void ScheduleInternal( );
+void ScheduleInternal(void);
 void ScheduleInternal(spinlock * lock);
 void ScheduleInternal(thread_desc * thrd);
 void ScheduleInternal(spinlock * lock, thread_desc * thrd);
+void ScheduleInternal(spinlock ** locks, unsigned short count);
+void ScheduleInternal(spinlock ** locks, unsigned short count, thread_desc ** thrds, unsigned short thrd_count);

 //-----------------------------------------------------------------------------
src/libcfa/concurrency/monitor
re464759 → r0c78741

 //-----------------------------------------------------------------------------
 // Internal scheduling
+
+struct __condition_criterion_t {
+    bool ready;                        //Whether or not the criterion is met (True if met)
+    monitor_desc * target;             //The monitor this criterion concerns
+    struct __condition_node_t * owner; //The parent node to which this criterion belongs
+    __condition_criterion_t * next;    //Intrusive linked list Next field
+};
+
+struct __condition_node_t {
+    thread_desc * waiting_thread;       //Thread that needs to be woken when all criteria are met
+    __condition_criterion_t * criteria; //Array of criteria (Criterions are contiguous in memory)
+    unsigned short count;               //Number of criterions in the criteria
+    __condition_node_t * next;          //Intrusive linked list Next field
+};
+
+struct __condition_blocked_queue_t {
+    __condition_node_t * head;
+    __condition_node_t ** tail;
+};
+
+void ?{}( __condition_blocked_queue_t * );
+void append( __condition_blocked_queue_t *, __condition_node_t * );
+__condition_node_t * pop_head( __condition_blocked_queue_t * );
+
 struct condition {
-    __thread_queue_t blocked;
-    monitor_desc ** monitors;
-    unsigned short monitor_count;
+    __condition_blocked_queue_t blocked; //Linked list which contains the blocked threads as well as the information needed to unblock them
+    monitor_desc ** monitors;            //Array of monitor pointers (Monitors are NOT contiguous in memory)
+    unsigned short monitor_count;        //Number of monitors in the array
 };
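The shape introduced here: each waiting thread is represented by one __condition_node_t holding a contiguous array of __condition_criterion_t, one criterion per monitor involved in the wait. A criterion is marked ready when its monitor is handed to the waiter, and the thread becomes runnable only once every criterion is ready. A runnable plain-C model of that all-criteria check; the names are stand-ins, not the CFA types:

    #include <stdbool.h>
    #include <stdio.h>

    /* One node per waiting thread, one criterion per monitor in the wait. */
    struct node;
    struct criterion {
        bool ready;            /* met once its monitor has been released to us */
        struct node *owner;    /* parent node (the waiting thread's record)    */
    };
    struct node {
        const char *thread_name;
        struct criterion *criteria;  /* contiguous array, one per monitor */
        unsigned short count;
    };

    /* Mark one criterion met; the thread may run only when all of them are. */
    static bool mark_and_check(struct criterion *target) {
        struct node *n = target->owner;
        target->ready = true;
        for (int i = 0; i < n->count; i++)
            if (!n->criteria[i].ready) return false;
        return true;
    }

    int main(void) {
        struct criterion crits[2];
        struct node n = { "waiter", crits, 2 };
        for (int i = 0; i < 2; i++) crits[i] = (struct criterion){ false, &n };

        printf("after monitor A: %d\n", mark_and_check(&crits[0])); /* 0: not yet  */
        printf("after monitor B: %d\n", mark_and_check(&crits[1])); /* 1: runnable */
        return 0;
    }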
src/libcfa/concurrency/monitor.c
re464759 → r0c78741

 #include "libhdr.h"

-void set_owner( monitor_desc * this, thread_desc * owner ) {
-    //Pass the monitor appropriately
-    this->owner = owner;
-
-    //We are passing the monitor to someone else, which means recursion level is not 0
-    this->recursion = owner ? 1 : 0;
-}
+//-----------------------------------------------------------------------------
+// Forward declarations
+static inline void set_owner( monitor_desc * this, thread_desc * owner );
+static inline thread_desc * next_thread( monitor_desc * this );
+
+static inline void lock_all( spinlock ** locks, unsigned short count );
+static inline void lock_all( monitor_desc ** source, spinlock ** /*out*/ locks, unsigned short count );
+static inline void unlock_all( spinlock ** locks, unsigned short count );
+static inline void unlock_all( monitor_desc ** locks, unsigned short count );
+
+static inline void save_recursion   ( monitor_desc ** ctx, unsigned int * /*out*/ recursions, unsigned short count );
+static inline void restore_recursion( monitor_desc ** ctx, unsigned int * /*in */ recursions, unsigned short count );
+
+static inline thread_desc * check_condition( __condition_criterion_t * );
+static inline void brand_condition( condition * );
+static inline unsigned short insert_unique( thread_desc ** thrds, unsigned short end, thread_desc * val );
+
+//-----------------------------------------------------------------------------
+// Enter/Leave routines

 extern "C" {
-    void __enter_monitor_desc(monitor_desc * this, monitor_desc * leader) {
+    void __enter_monitor_desc(monitor_desc * this) {
         lock( &this->lock );
         thread_desc * thrd = this_thread();

-        // //Update the stack owner
-        // this->stack_owner = leader;
-
-        LIB_DEBUG_PRINT_SAFE("Entering %p (o: %p, r: %i)\n", this, this->owner, this->recursion);
+        LIB_DEBUG_PRINT_SAFE("%p Entering %p (o: %p, r: %i)\n", thrd, this, this->owner, this->recursion);

         if( !this->owner ) {
…

     // leave pseudo code :
-    // decrement level
-    // leve == 0 ?
-    //   no : done
-    //   yes :
-    //     signal stack empty ?
-    //       has leader :
-    //         bulk acquiring means we don't own the signal stack
-    //         ignore it but don't release the monitor
-    //       yes :
-    //         next in entry queue is new owner
-    //       no :
-    //         top of the signal stack is the owner
-    //         context switch to him right away
-    //
-    void __leave_monitor_desc(monitor_desc * this, monitor_desc * leader) {
+    // TODO
+    void __leave_monitor_desc(monitor_desc * this) {
         lock( &this->lock );

-        LIB_DEBUG_PRINT_SAFE("Leaving %p (o: %p, r: %i)\n", this, this->owner, this->recursion);
-
         thread_desc * thrd = this_thread();
-        assertf( thrd == this->owner, "Expected owner to be %p, got %p (r: %i)", this->owner, thrd, this->recursion );
+
+        LIB_DEBUG_PRINT_SAFE("%p Leaving %p (o: %p, r: %i)\n", thrd, this, this->owner, this->recursion);
+        assertf( thrd == this->owner, "Expected owner to be %p, got %p (r: %i)", thrd, this->owner, this->recursion );

         //Leaving a recursion level, decrement the counter
…
         //it means we don't need to do anything
         if( this->recursion != 0) {
-            // this->stack_owner = leader;
             unlock( &this->lock );
             return;
         }

-        // //If we don't own the signal stack then just leave it to the owner
-        // if( this->stack_owner ) {
-        //     this->stack_owner = leader;
-        //     unlock( &this->lock );
-        //     return;
-        // }
-
-        //We are the stack owner and have left the last recursion level.
-        //We are in charge of passing the monitor
-        thread_desc * new_owner = 0;
-
-        //Check the signaller stack
-        new_owner = pop( &this->signal_stack );
-        if( new_owner ) {
-            //The signaller stack is not empty,
-            //transfer control immediately
-            set_owner( this, new_owner );
-            // this->stack_owner = leader;
-            ScheduleInternal( &this->lock, new_owner );
-            return;
-        }
-
-        // No signaller thread
-        // Get the next thread in the entry_queue
-        new_owner = pop_head( &this->entry_queue );
-        set_owner( this, new_owner );
-
-        // //Update the stack owner
-        // this->stack_owner = leader;
+        thread_desc * new_owner = next_thread( this );

         //We can now let other threads in safely
…

 static inline void enter(monitor_desc ** monitors, int count) {
-    __enter_monitor_desc( monitors[0], NULL );
-    for(int i = 1; i < count; i++) {
-        __enter_monitor_desc( monitors[i], monitors[0] );
+    for(int i = 0; i < count; i++) {
+        __enter_monitor_desc( monitors[i] );
     }
 }

 static inline void leave(monitor_desc ** monitors, int count) {
-    __leave_monitor_desc( monitors[0], NULL );
-    for(int i = count - 1; i >= 1; i--) {
-        __leave_monitor_desc( monitors[i], monitors[0] );
+    for(int i = count - 1; i >= 0; i--) {
+        __leave_monitor_desc( monitors[i] );
     }
 }
…
 // Internal scheduling
 void wait( condition * this ) {
-    assertf(false, "NO SUPPORTED");
-    // LIB_DEBUG_FPRINTF("Waiting\n");
-    thread_desc * this_thrd = this_thread();
-
-    if( !this->monitors ) {
-        this->monitors = this_thrd->current_monitors;
-        this->monitor_count = this_thrd->current_monitor_count;
-    }
+    LIB_DEBUG_PRINT_SAFE("Waiting\n");
+
+    brand_condition( this );
+
+    //Check that everything is as expected
+    assertf( this->monitors != NULL, "Waiting with no monitors (%p)", this->monitors );
+    assertf( this->monitor_count != 0, "Waiting with 0 monitors (%i)", this->monitor_count );

     unsigned short count = this->monitor_count;
-
-    //Check that everything is as expected
-    assert( this->monitors != NULL );
-    assert( this->monitor_count != 0 );
-
-    unsigned int recursions[ count ];    //Save the current recursion levels to restore them later
-    spinlock * locks [ count ];          //We need to pass-in an array of locks to ScheduleInternal
-
-    // LIB_DEBUG_FPRINTF("Getting ready to wait\n");
-
-    //Loop on all the monitors and release the owner
-    for( unsigned int i = 0; i < count; i++ ) {
-        monitor_desc * cur = this->monitors[i];
-
-        assert( cur );
-
-        // LIB_DEBUG_FPRINTF("cur %p lock %p\n", cur, &cur->lock);
-
-        //Store the locks for later
-        locks[i] = &cur->lock;
-
-        //Protect the monitors
-        lock( locks[i] );
-        {
-            //Save the recursion levels
-            recursions[i] = cur->recursion;
-
-            //Release the owner
-            cur->recursion = 0;
-            cur->owner = NULL;
-        }
-        //Release the monitor
-        unlock( locks[i] );
-    }
-
-    // LIB_DEBUG_FPRINTF("Waiting now\n");
+    unsigned int blarg_recursions[ count ];    //Save the current recursion levels to restore them later
+    spinlock * blarg_locks [ count ];          //We need to pass-in an array of locks to ScheduleInternal
+
+    LIB_DEBUG_PRINT_SAFE("count %i\n", count);
+
+    __condition_node_t waiter;
+    waiter.waiting_thread = this_thread();
+    waiter.count = count;
+    waiter.next = NULL;
+
+    __condition_criterion_t criteria[count];
+    for(int i = 0; i < count; i++) {
+        criteria[i].ready  = false;
+        criteria[i].target = this->monitors[i];
+        criteria[i].owner  = &waiter;
+        criteria[i].next   = NULL;
+        LIB_DEBUG_PRINT_SAFE( "Criterion %p\n", &criteria[i] );
+    }
+
+    waiter.criteria = criteria;
+    append( &this->blocked, &waiter );
+
+    lock_all( this->monitors, blarg_locks, count );
+    save_recursion( this->monitors, blarg_recursions, count );
+    //DON'T unlock, ask the kernel to do it
+
+    //Find the next thread(s) to run
+    unsigned short thread_count = count;
+    thread_desc * threads[ count ];
+
+    for( int i = 0; i < count; i++) {
+        thread_desc * new_owner = next_thread( this->monitors[i] );
+        thread_count = insert_unique( threads, i, new_owner );
+    }
+
+    LIB_DEBUG_PRINT_SAFE("Will unblock: ");
+    for(int i = 0; i < thread_count; i++) {
+        LIB_DEBUG_PRINT_SAFE("%p ", threads[i]);
+    }
+    LIB_DEBUG_PRINT_SAFE("\n");

     //Everything is ready to go to sleep
-    ScheduleInternal( locks, count );
+    ScheduleInternal( blarg_locks, count, threads, thread_count );

…

     //We are back, restore the owners and recursions
-    for( unsigned int i = 0; i < count; i++ ) {
-        monitor_desc * cur = this->monitors[i];
-
-        //Protect the monitors
-        lock( locks[i] );
-        {
-            //Release the owner
-            cur->owner = this_thrd;
-            cur->recursion = recursions[i];
-        }
-        //Release the monitor
-        unlock( locks[i] );
-    }
-}
-
-static void __signal_internal( condition * this ) {
-    assertf(false, "NO SUPPORTED");
-    if( !this->blocked.head ) return;
+    lock_all( blarg_locks, count );
+    restore_recursion( this->monitors, blarg_recursions, count );
+    unlock_all( blarg_locks, count );
+}
+
+void signal( condition * this ) {
+    if( !this->blocked.head ) {
+        LIB_DEBUG_PRINT_SAFE("Nothing to signal\n");
+        return;
+    }

     //Check that everything is as expected
     assert( this->monitors );
     assert( this->monitor_count != 0 );
+
+    unsigned short count = this->monitor_count;

     LIB_DEBUG_DO(
-        if ( this->monitors != this_thread()->current_monitors ) {
-            abortf( "Signal on condition %p made outside of the correct monitor(s)", this );
+        thread_desc * this_thrd = this_thread();
+        if ( this->monitor_count != this_thrd->current_monitor_count ) {
+            abortf( "Signal on condition %p made with different number of monitor(s), expected %i got %i", this, this->monitor_count, this_thrd->current_monitor_count );
         } // if
+
+        for(int i = 0; i < this->monitor_count; i++) {
+            if ( this->monitors[i] != this_thrd->current_monitors[i] ) {
+                abortf( "Signal on condition %p made with different monitor, expected %p got %i", this, this->monitors[i], this_thrd->current_monitors[i] );
+            } // if
+        }
     );

-    monitor_desc * owner = this->monitors[0];
-    lock( &owner->lock );
-    {
-        thread_desc * unblock = pop_head( &this->blocked );
-        push( &owner->signal_stack, unblock );
-    }
-    unlock( &owner->lock );
-}
-
-void signal( condition * this ) {
-    __signal_internal( this );
-}
+    lock_all( this->monitors, NULL, count );
+    LIB_DEBUG_PRINT_SAFE("Signalling");
+
+    __condition_node_t * node = pop_head( &this->blocked );
+    for(int i = 0; i < count; i++) {
+        __condition_criterion_t * crit = &node->criteria[i];
+        LIB_DEBUG_PRINT_SAFE(" %p", crit->target);
+        assert( !crit->ready );
+        push( &crit->target->signal_stack, crit );
+    }
+
+    LIB_DEBUG_PRINT_SAFE("\n");
+
+    unlock_all( this->monitors, count );
+}
+
+//-----------------------------------------------------------------------------
+// Utilities
+
+static inline void set_owner( monitor_desc * this, thread_desc * owner ) {
+    //Pass the monitor appropriately
+    this->owner = owner;
+
+    //We are passing the monitor to someone else, which means recursion level is not 0
+    this->recursion = owner ? 1 : 0;
+}
+
+static inline thread_desc * next_thread( monitor_desc * this ) {
+    //Check the signaller stack
+    __condition_criterion_t * urgent = pop( &this->signal_stack );
+    if( urgent ) {
+        //The signaller stack is not empty,
+        //regardless of if we are ready to baton pass,
+        //we need to set the monitor as in use
+        set_owner( this, urgent->owner->waiting_thread );
+
+        return check_condition( urgent );
+    }
+
+    // No signaller thread
+    // Get the next thread in the entry_queue
+    thread_desc * new_owner = pop_head( &this->entry_queue );
+    set_owner( this, new_owner );
+
+    return new_owner;
+}
+
+static inline void lock_all( spinlock ** locks, unsigned short count ) {
+    for( int i = 0; i < count; i++ ) {
+        lock( locks[i] );
+    }
+}
+
+static inline void lock_all( monitor_desc ** source, spinlock ** /*out*/ locks, unsigned short count ) {
+    for( int i = 0; i < count; i++ ) {
+        spinlock * l = &source[i]->lock;
+        lock( l );
+        if(locks) locks[i] = l;
+    }
+}
+
+static inline void unlock_all( spinlock ** locks, unsigned short count ) {
+    for( int i = 0; i < count; i++ ) {
+        unlock( locks[i] );
+    }
+}
+
+static inline void unlock_all( monitor_desc ** locks, unsigned short count ) {
+    for( int i = 0; i < count; i++ ) {
+        unlock( &locks[i]->lock );
+    }
+}
+
+static inline void save_recursion ( monitor_desc ** ctx, unsigned int * /*out*/ recursions, unsigned short count ) {
+    for( int i = 0; i < count; i++ ) {
+        recursions[i] = ctx[i]->recursion;
+    }
+}
+
+static inline void restore_recursion( monitor_desc ** ctx, unsigned int * /*in */ recursions, unsigned short count ) {
+    for( int i = 0; i < count; i++ ) {
+        ctx[i]->recursion = recursions[i];
+    }
+}
+
+// Function has 2 different behaviors
+// 1 - Marks a monitor as being ready to run
+// 2 - Checks if all the monitors are ready to run
+//     if so returns the thread to run
+static inline thread_desc * check_condition( __condition_criterion_t * target ) {
+    __condition_node_t * node = target->owner;
+    unsigned short count = node->count;
+    __condition_criterion_t * criteria = node->criteria;
+
+    bool ready2run = true;
+
+    for( int i = 0; i < count; i++ ) {
+        LIB_DEBUG_PRINT_SAFE( "Checking %p for %p\n", &criteria[i], target );
+        if( &criteria[i] == target ) {
+            criteria[i].ready = true;
+            LIB_DEBUG_PRINT_SAFE( "True\n" );
+        }
+
+        ready2run = criteria[i].ready && ready2run;
+    }
+
+    LIB_DEBUG_PRINT_SAFE( "Running %i\n", ready2run );
+    return ready2run ? node->waiting_thread : NULL;
+}
+
+static inline void brand_condition( condition * this ) {
+    thread_desc * thrd = this_thread();
+    if( !this->monitors ) {
+        LIB_DEBUG_PRINT_SAFE("Branding\n");
+        assertf( thrd->current_monitors != NULL, "No current monitor to brand condition", thrd->current_monitors );
+        this->monitors = thrd->current_monitors;
+        this->monitor_count = thrd->current_monitor_count;
+    }
+}
+
+static inline unsigned short insert_unique( thread_desc ** thrds, unsigned short end, thread_desc * val ) {
+    for(int i = 0; i < end; i++) {
+        if( thrds[i] == val ) return end;
+    }
+
+    thrds[end] = val;
+    return end + 1;
+}
+
+void ?{}( __condition_blocked_queue_t * this ) {
+    this->head = NULL;
+    this->tail = &this->head;
+}
+
+void append( __condition_blocked_queue_t * this, __condition_node_t * c ) {
+    assert(this->tail != NULL);
+    *this->tail = c;
+    this->tail = &c->next;
+}
+
+__condition_node_t * pop_head( __condition_blocked_queue_t * this ) {
+    __condition_node_t * head = this->head;
+    if( head ) {
+        this->head = head->next;
+        if( !head->next ) {
+            this->tail = &this->head;
+        }
+        head->next = NULL;
+    }
+    return head;
+}
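One detail worth calling out from the new queue helpers: __condition_blocked_queue_t uses the tail-of-pointer representation, where tail points at the last next link (or at head when the queue is empty). That makes append O(1) with no empty-queue special case, and pop_head only has to reset tail when it removes the last node. A runnable plain-C model of the same technique (stand-in names, not the CFA types):

    #include <assert.h>
    #include <stddef.h>
    #include <stdio.h>

    struct qnode { int id; struct qnode *next; };
    struct queue { struct qnode *head; struct qnode **tail; };

    static void init(struct queue *q) { q->head = NULL; q->tail = &q->head; }

    static void append(struct queue *q, struct qnode *n) {
        assert(q->tail != NULL);
        *q->tail = n;          /* patch the last link (head if empty) */
        q->tail = &n->next;
    }

    static struct qnode *pop_head(struct queue *q) {
        struct qnode *h = q->head;
        if (h) {
            q->head = h->next;
            if (!h->next) q->tail = &q->head;  /* queue became empty */
            h->next = NULL;
        }
        return h;
    }

    int main(void) {
        struct queue q; init(&q);
        struct qnode a = {1, NULL}, b = {2, NULL};
        append(&q, &a); append(&q, &b);
        printf("%d %d\n", pop_head(&q)->id, pop_head(&q)->id);  /* "1 2" (FIFO) */
        return 0;
    }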