Changeset 3f7e12cb for src/libcfa/concurrency/monitor.c
- Timestamp:
- Nov 8, 2017, 5:43:33 PM (8 years ago)
- Branches:
- ADT, aaron-thesis, arm-eh, ast-experimental, cleanup-dtors, deferred_resn, demangler, enum, forall-pointer-decay, jacob/cs343-translation, jenkins-sandbox, master, new-ast, new-ast-unique-expr, new-env, no_list, persistent-indexer, pthread-emulation, qualifiedEnum, resolv-new, with_gc
- Children:
- 954908d
- Parents:
- 78315272 (diff), e35f30a (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the(diff)
links above to see all the changes relative to each parent. - File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
src/libcfa/concurrency/monitor.c
r78315272 r3f7e12cb 17 17 18 18 #include <stdlib> 19 #include <inttypes.h> 19 20 20 21 #include "libhdr.h" 21 22 #include "kernel_private.h" 22 23 24 #include "bits/algorithms.h" 25 23 26 //----------------------------------------------------------------------------- 24 27 // Forward declarations 25 static inline void set_owner( monitor_desc * this, thread_desc * owner ); 28 static inline void set_owner ( monitor_desc * this, thread_desc * owner ); 29 static inline void set_owner ( monitor_desc * storage [], __lock_size_t count, thread_desc * owner ); 30 static inline void set_mask ( monitor_desc * storage [], __lock_size_t count, const __waitfor_mask_t & mask ); 31 static inline void reset_mask( monitor_desc * this ); 32 26 33 static inline thread_desc * next_thread( monitor_desc * this ); 27 static inline int is_accepted( thread_desc * owner, monitor_desc * this, monitor_desc ** group, int group_cnt, void (*func)() ); 28 29 static inline void lock_all( spinlock ** locks, unsigned short count ); 30 static inline void lock_all( monitor_desc ** source, spinlock ** /*out*/ locks, unsigned short count ); 31 static inline void unlock_all( spinlock ** locks, unsigned short count ); 32 static inline void unlock_all( monitor_desc ** locks, unsigned short count ); 33 34 static inline void save_recursion ( monitor_desc ** ctx, unsigned int * /*out*/ recursions, unsigned short count ); 35 static inline void restore_recursion( monitor_desc ** ctx, unsigned int * /*in */ recursions, unsigned short count ); 36 37 static inline void init ( int count, monitor_desc ** monitors, __condition_node_t * waiter, __condition_criterion_t * criteria ); 38 static inline void init_push( int count, monitor_desc ** monitors, __condition_node_t * waiter, __condition_criterion_t * criteria ); 39 40 static inline thread_desc * check_condition( __condition_criterion_t * ); 41 static inline void brand_condition( condition * ); 42 static inline unsigned short insert_unique( thread_desc ** thrds, unsigned short end, thread_desc * val ); 43 44 static inline thread_desc * search_entry_queue( __acceptable_t * acceptables, int acc_count, monitor_desc ** monitors, int count ); 34 static inline bool is_accepted( monitor_desc * this, const __monitor_group_t & monitors ); 35 36 static inline void lock_all ( spinlock * locks [], __lock_size_t count ); 37 static inline void lock_all ( monitor_desc * source [], spinlock * /*out*/ locks [], __lock_size_t count ); 38 static inline void unlock_all( spinlock * locks [], __lock_size_t count ); 39 static inline void unlock_all( monitor_desc * locks [], __lock_size_t count ); 40 41 static inline void save ( monitor_desc * ctx [], __lock_size_t count, spinlock * locks [], unsigned int /*out*/ recursions [], __waitfor_mask_t /*out*/ masks [] ); 42 static inline void restore( monitor_desc * ctx [], __lock_size_t count, spinlock * locks [], unsigned int /*in */ recursions [], __waitfor_mask_t /*in */ masks [] ); 43 44 static inline void init ( __lock_size_t count, monitor_desc * monitors [], __condition_node_t & waiter, __condition_criterion_t criteria [] ); 45 static inline void init_push( __lock_size_t count, monitor_desc * monitors [], __condition_node_t & waiter, __condition_criterion_t criteria [] ); 46 47 static inline thread_desc * check_condition ( __condition_criterion_t * ); 48 static inline void brand_condition ( condition & ); 49 static inline [thread_desc *, int] search_entry_queue( const __waitfor_mask_t &, monitor_desc * monitors [], __lock_size_t count ); 50 51 forall(dtype T | sized( T )) 52 static inline __lock_size_t insert_unique( T * array [], __lock_size_t & size, T * val ); 53 static inline __lock_size_t count_max ( const __waitfor_mask_t & mask ); 54 static inline __lock_size_t aggregate ( monitor_desc * storage [], const __waitfor_mask_t & mask ); 45 55 46 56 //----------------------------------------------------------------------------- 47 57 // Useful defines 48 #define wait_ctx(thrd, user_info) /* Create the necessary information to use the signaller stack */ \ 49 __condition_node_t waiter = { thrd, count, user_info }; /* Create the node specific to this wait operation */ \ 50 __condition_criterion_t criteria[count]; /* Create the creteria this wait operation needs to wake up */ \ 51 init( count, monitors, &waiter, criteria ); /* Link everything together */ \ 52 53 #define wait_ctx_primed(thrd, user_info) /* Create the necessary information to use the signaller stack */ \ 54 __condition_node_t waiter = { thrd, count, user_info }; /* Create the node specific to this wait operation */ \ 55 __condition_criterion_t criteria[count]; /* Create the creteria this wait operation needs to wake up */ \ 56 init_push( count, monitors, &waiter, criteria ); /* Link everything together and push it to the AS-Stack */ \ 57 58 #define monitor_ctx( mons, cnt ) /* Define that create the necessary struct for internal/external scheduling operations */ \ 59 monitor_desc ** monitors = mons; /* Save the targeted monitors */ \ 60 unsigned short count = cnt; /* Save the count to a local variable */ \ 61 unsigned int recursions[ count ]; /* Save the current recursion levels to restore them later */ \ 62 spinlock * locks [ count ]; /* We need to pass-in an array of locks to BlockInternal */ \ 58 #define wait_ctx(thrd, user_info) /* Create the necessary information to use the signaller stack */ \ 59 __condition_node_t waiter = { thrd, count, user_info }; /* Create the node specific to this wait operation */ \ 60 __condition_criterion_t criteria[count]; /* Create the creteria this wait operation needs to wake up */ \ 61 init( count, monitors, waiter, criteria ); /* Link everything together */ \ 62 63 #define wait_ctx_primed(thrd, user_info) /* Create the necessary information to use the signaller stack */ \ 64 __condition_node_t waiter = { thrd, count, user_info }; /* Create the node specific to this wait operation */ \ 65 __condition_criterion_t criteria[count]; /* Create the creteria this wait operation needs to wake up */ \ 66 init_push( count, monitors, waiter, criteria ); /* Link everything together and push it to the AS-Stack */ \ 67 68 #define monitor_ctx( mons, cnt ) /* Define that create the necessary struct for internal/external scheduling operations */ \ 69 monitor_desc ** monitors = mons; /* Save the targeted monitors */ \ 70 __lock_size_t count = cnt; /* Save the count to a local variable */ \ 71 unsigned int recursions[ count ]; /* Save the current recursion levels to restore them later */ \ 72 __waitfor_mask_t masks [ count ]; /* Save the current waitfor masks to restore them later */ \ 73 spinlock * locks [ count ]; /* We need to pass-in an array of locks to BlockInternal */ \ 74 75 #define monitor_save save ( monitors, count, locks, recursions, masks ) 76 #define monitor_restore restore( monitors, count, locks, recursions, masks ) 77 63 78 64 79 //----------------------------------------------------------------------------- … … 68 83 extern "C" { 69 84 // Enter single monitor 70 static void __enter_monitor_desc( monitor_desc * this, monitor_desc ** group, int group_cnt, void (*func)()) {85 static void __enter_monitor_desc( monitor_desc * this, const __monitor_group_t & group ) { 71 86 // Lock the monitor spinlock, lock_yield to reduce contention 72 87 lock_yield( &this->lock DEBUG_CTX2 ); … … 75 90 LIB_DEBUG_PRINT_SAFE("Kernel : %10p Entering mon %p (%p)\n", thrd, this, this->owner); 76 91 77 this->accepted_index = -1;78 92 if( !this->owner ) { 79 93 // No one has the monitor, just take it … … 83 97 } 84 98 else if( this->owner == thrd) { 85 // We already have the monitor, just not how many times we took it 86 verify( this->recursion > 0 ); 99 // We already have the monitor, just note how many times we took it 87 100 this->recursion += 1; 88 101 89 102 LIB_DEBUG_PRINT_SAFE("Kernel : mon already owned \n"); 90 103 } 91 else if( (this->accepted_index = is_accepted( thrd, this, group, group_cnt, func)) >= 0) {104 else if( is_accepted( this, group) ) { 92 105 // Some one was waiting for us, enter 93 106 set_owner( this, thrd ); 94 107 108 // Reset mask 109 reset_mask( this ); 110 95 111 LIB_DEBUG_PRINT_SAFE("Kernel : mon accepts \n"); 96 112 } … … 99 115 100 116 // Some one else has the monitor, wait in line for it 101 append( &this->entry_queue, thrd );117 append( this->entry_queue, thrd ); 102 118 BlockInternal( &this->lock ); 103 119 … … 113 129 unlock( &this->lock ); 114 130 return; 131 } 132 133 static void __enter_monitor_dtor( monitor_desc * this, fptr_t func ) { 134 // Lock the monitor spinlock, lock_yield to reduce contention 135 lock_yield( &this->lock DEBUG_CTX2 ); 136 thread_desc * thrd = this_thread; 137 138 LIB_DEBUG_PRINT_SAFE("Kernel : %10p Entering dtor for mon %p (%p)\n", thrd, this, this->owner); 139 140 141 if( !this->owner ) { 142 LIB_DEBUG_PRINT_SAFE("Kernel : Destroying free mon %p\n", this); 143 144 // No one has the monitor, just take it 145 set_owner( this, thrd ); 146 147 unlock( &this->lock ); 148 return; 149 } 150 else if( this->owner == thrd) { 151 // We already have the monitor... but where about to destroy it so the nesting will fail 152 // Abort! 153 abortf("Attempt to destroy monitor %p by thread \"%.256s\" (%p) in nested mutex."); 154 } 155 156 __lock_size_t count = 1; 157 monitor_desc ** monitors = &this; 158 __monitor_group_t group = { &this, 1, func }; 159 if( is_accepted( this, group) ) { 160 LIB_DEBUG_PRINT_SAFE("Kernel : mon accepts dtor, block and signal it \n"); 161 162 // Wake the thread that is waiting for this 163 __condition_criterion_t * urgent = pop( this->signal_stack ); 164 verify( urgent ); 165 166 // Reset mask 167 reset_mask( this ); 168 169 // Create the node specific to this wait operation 170 wait_ctx_primed( this_thread, 0 ) 171 172 // Some one else has the monitor, wait for him to finish and then run 173 BlockInternal( &this->lock, urgent->owner->waiting_thread ); 174 175 // Some one was waiting for us, enter 176 set_owner( this, thrd ); 177 } 178 else { 179 LIB_DEBUG_PRINT_SAFE("Kernel : blocking \n"); 180 181 wait_ctx( this_thread, 0 ) 182 this->dtor_node = &waiter; 183 184 // Some one else has the monitor, wait in line for it 185 append( this->entry_queue, thrd ); 186 BlockInternal( &this->lock ); 187 188 // BlockInternal will unlock spinlock, no need to unlock ourselves 189 return; 190 } 191 192 LIB_DEBUG_PRINT_SAFE("Kernel : Destroying %p\n", this); 193 115 194 } 116 195 … … 120 199 lock_yield( &this->lock DEBUG_CTX2 ); 121 200 122 verifyf( this_thread == this->owner, "Expected owner to be %p, got %p (r: %i)", this_thread, this->owner, this->recursion ); 201 LIB_DEBUG_PRINT_SAFE("Kernel : %10p Leaving mon %p (%p)\n", this_thread, this, this->owner); 202 203 verifyf( this_thread == this->owner, "Expected owner to be %p, got %p (r: %i, m: %p)", this_thread, this->owner, this->recursion, this ); 123 204 124 205 // Leaving a recursion level, decrement the counter … … 128 209 // it means we don't need to do anything 129 210 if( this->recursion != 0) { 211 LIB_DEBUG_PRINT_SAFE("Kernel : recursion still %d\n", this->recursion); 130 212 unlock( &this->lock ); 131 213 return; … … 140 222 //We need to wake-up the thread 141 223 WakeThread( new_owner ); 224 } 225 226 // Leave single monitor for the last time 227 void __leave_dtor_monitor_desc( monitor_desc * this ) { 228 LIB_DEBUG_DO( 229 if( this_thread != this->owner ) { 230 abortf("Destroyed monitor %p has inconsistent owner, expected %p got %p.\n", this, this_thread, this->owner); 231 } 232 if( this->recursion != 1 ) { 233 abortf("Destroyed monitor %p has %d outstanding nested calls.\n", this, this->recursion - 1); 234 } 235 ) 142 236 } 143 237 … … 146 240 // Should never return 147 241 void __leave_thread_monitor( thread_desc * thrd ) { 148 monitor_desc * this = &thrd-> mon;242 monitor_desc * this = &thrd->self_mon; 149 243 150 244 // Lock the monitor now … … 153 247 disable_interrupts(); 154 248 155 thrd-> cor.state = Halted;156 157 verifyf( thrd == this->owner, "Expected owner to be %p, got %p (r: %i )", thrd, this->owner, this->recursion);249 thrd->self_cor.state = Halted; 250 251 verifyf( thrd == this->owner, "Expected owner to be %p, got %p (r: %i, m: %p)", thrd, this->owner, this->recursion, this ); 158 252 159 253 // Leaving a recursion level, decrement the counter … … 178 272 // Enter multiple monitor 179 273 // relies on the monitor array being sorted 180 static inline void enter( monitor_desc ** monitors, int count, void (*func)()) {181 for( int i = 0; i < count; i++) {182 __enter_monitor_desc( monitors [i], monitors, count, func);274 static inline void enter( __monitor_group_t monitors ) { 275 for( __lock_size_t i = 0; i < monitors.size; i++) { 276 __enter_monitor_desc( monitors.list[i], monitors ); 183 277 } 184 278 } … … 186 280 // Leave multiple monitor 187 281 // relies on the monitor array being sorted 188 static inline void leave(monitor_desc * * monitors, int count) {189 for( int i = count - 1; i >= 0; i--) {282 static inline void leave(monitor_desc * monitors [], __lock_size_t count) { 283 for( __lock_size_t i = count - 1; i >= 0; i--) { 190 284 __leave_monitor_desc( monitors[i] ); 191 285 } … … 194 288 // Ctor for monitor guard 195 289 // Sorts monitors before entering 196 void ?{}( monitor_guard_t & this, monitor_desc * * m, int count, void (*func)()) {290 void ?{}( monitor_guard_t & this, monitor_desc * m [], __lock_size_t count, fptr_t func ) { 197 291 // Store current array 198 292 this.m = m; … … 200 294 201 295 // Sort monitors based on address -> TODO use a sort specialized for small numbers 202 qsort(this.m, count);296 __libcfa_small_sort(this.m, count); 203 297 204 298 // Save previous thread context 205 this.prev_mntrs = this_thread->current_monitors; 206 this.prev_count = this_thread->current_monitor_count; 207 this.prev_func = this_thread->current_monitor_func; 299 this.[prev_mntrs, prev_count, prev_func] = this_thread->monitors.[list, size, func]; 208 300 209 301 // Update thread context (needed for conditions) 210 this_thread-> current_monitors = m;211 this_thread->current_monitor_count = count; 212 this_thread->current_monitor_func = func;302 this_thread->monitors.[list, size, func] = [m, count, func]; 303 304 // LIB_DEBUG_PRINT_SAFE("MGUARD : enter %d\n", count); 213 305 214 306 // Enter the monitors in order 215 enter( this.m, this.count, func ); 307 __monitor_group_t group = {this.m, this.count, func}; 308 enter( group ); 309 310 // LIB_DEBUG_PRINT_SAFE("MGUARD : entered\n"); 216 311 } 217 312 … … 219 314 // Dtor for monitor guard 220 315 void ^?{}( monitor_guard_t & this ) { 316 // LIB_DEBUG_PRINT_SAFE("MGUARD : leaving %d\n", this.count); 317 221 318 // Leave the monitors in order 222 319 leave( this.m, this.count ); 223 320 321 // LIB_DEBUG_PRINT_SAFE("MGUARD : left\n"); 322 224 323 // Restore thread context 225 this_thread->current_monitors = this.prev_mntrs; 226 this_thread->current_monitor_count = this.prev_count; 227 this_thread->current_monitor_func = this.prev_func; 324 this_thread->monitors.[list, size, func] = this.[prev_mntrs, prev_count, prev_func]; 325 } 326 327 // Ctor for monitor guard 328 // Sorts monitors before entering 329 void ?{}( monitor_dtor_guard_t & this, monitor_desc * m [], fptr_t func ) { 330 // Store current array 331 this.m = *m; 332 333 // Save previous thread context 334 this.[prev_mntrs, prev_count, prev_func] = this_thread->monitors.[list, size, func]; 335 336 // Update thread context (needed for conditions) 337 this_thread->monitors.[list, size, func] = [m, 1, func]; 338 339 __enter_monitor_dtor( this.m, func ); 340 } 341 342 // Dtor for monitor guard 343 void ^?{}( monitor_dtor_guard_t & this ) { 344 // Leave the monitors in order 345 __leave_dtor_monitor_desc( this.m ); 346 347 // Restore thread context 348 this_thread->monitors.[list, size, func] = this.[prev_mntrs, prev_count, prev_func]; 228 349 } 229 350 230 351 //----------------------------------------------------------------------------- 231 352 // Internal scheduling types 232 void ?{}(__condition_node_t & this, thread_desc * waiting_thread, unsigned short count, uintptr_t user_info ) {353 void ?{}(__condition_node_t & this, thread_desc * waiting_thread, __lock_size_t count, uintptr_t user_info ) { 233 354 this.waiting_thread = waiting_thread; 234 355 this.count = count; … … 244 365 } 245 366 246 void ?{}(__condition_criterion_t & this, monitor_desc * target, __condition_node_t *owner ) {367 void ?{}(__condition_criterion_t & this, monitor_desc * target, __condition_node_t & owner ) { 247 368 this.ready = false; 248 369 this.target = target; 249 this.owner = owner;370 this.owner = &owner; 250 371 this.next = NULL; 251 372 } … … 253 374 //----------------------------------------------------------------------------- 254 375 // Internal scheduling 255 void wait( condition *this, uintptr_t user_info = 0 ) {376 void wait( condition & this, uintptr_t user_info = 0 ) { 256 377 brand_condition( this ); 257 378 258 379 // Check that everything is as expected 259 assertf( this ->monitors != NULL, "Waiting with no monitors (%p)", this->monitors );260 verifyf( this ->monitor_count != 0, "Waiting with 0 monitors (%i)", this->monitor_count );261 verifyf( this ->monitor_count < 32u, "Excessive monitor count (%i)", this->monitor_count );380 assertf( this.monitors != NULL, "Waiting with no monitors (%p)", this.monitors ); 381 verifyf( this.monitor_count != 0, "Waiting with 0 monitors (%"PRIiFAST16")", this.monitor_count ); 382 verifyf( this.monitor_count < 32u, "Excessive monitor count (%"PRIiFAST16")", this.monitor_count ); 262 383 263 384 // Create storage for monitor context 264 monitor_ctx( this ->monitors, this->monitor_count );385 monitor_ctx( this.monitors, this.monitor_count ); 265 386 266 387 // Create the node specific to this wait operation … … 269 390 // Append the current wait operation to the ones already queued on the condition 270 391 // We don't need locks for that since conditions must always be waited on inside monitor mutual exclusion 271 append( &this->blocked, &waiter );272 273 // Lock all monitors (aggregates the lock themas well)392 append( this.blocked, &waiter ); 393 394 // Lock all monitors (aggregates the locks as well) 274 395 lock_all( monitors, locks, count ); 275 396 276 // DON'T unlock, ask the kernel to do it277 278 // Save monitor state279 save_recursion( monitors, recursions, count );280 281 397 // Find the next thread(s) to run 282 unsigned short thread_count = 0;398 __lock_size_t thread_count = 0; 283 399 thread_desc * threads[ count ]; 284 for(int i = 0; i < count; i++) { 285 threads[i] = 0; 286 } 400 __builtin_memset( threads, 0, sizeof( threads ) ); 401 402 // Save monitor states 403 monitor_save; 287 404 288 405 // Remove any duplicate threads 289 for( int i = 0; i < count; i++) {406 for( __lock_size_t i = 0; i < count; i++) { 290 407 thread_desc * new_owner = next_thread( monitors[i] ); 291 thread_count =insert_unique( threads, thread_count, new_owner );408 insert_unique( threads, thread_count, new_owner ); 292 409 } 293 410 … … 295 412 BlockInternal( locks, count, threads, thread_count ); 296 413 297 298 // WE WOKE UP299 300 301 414 // We are back, restore the owners and recursions 302 lock_all( locks, count ); 303 restore_recursion( monitors, recursions, count ); 304 unlock_all( locks, count ); 305 } 306 307 bool signal( condition * this ) { 415 monitor_restore; 416 } 417 418 bool signal( condition & this ) { 308 419 if( is_empty( this ) ) { return false; } 309 420 310 421 //Check that everything is as expected 311 verify( this ->monitors );312 verify( this ->monitor_count != 0 );422 verify( this.monitors ); 423 verify( this.monitor_count != 0 ); 313 424 314 425 //Some more checking in debug 315 426 LIB_DEBUG_DO( 316 427 thread_desc * this_thrd = this_thread; 317 if ( this ->monitor_count != this_thrd->current_monitor_count) {318 abortf( "Signal on condition %p made with different number of monitor(s), expected %i got %i", this, this->monitor_count, this_thrd->current_monitor_count);319 } 320 321 for(int i = 0; i < this ->monitor_count; i++) {322 if ( this ->monitors[i] != this_thrd->current_monitors[i] ) {323 abortf( "Signal on condition %p made with different monitor, expected %p got %i", this, this->monitors[i], this_thrd->current_monitors[i] );428 if ( this.monitor_count != this_thrd->monitors.size ) { 429 abortf( "Signal on condition %p made with different number of monitor(s), expected %i got %i", &this, this.monitor_count, this_thrd->monitors.size ); 430 } 431 432 for(int i = 0; i < this.monitor_count; i++) { 433 if ( this.monitors[i] != this_thrd->monitors.list[i] ) { 434 abortf( "Signal on condition %p made with different monitor, expected %p got %i", &this, this.monitors[i], this_thrd->monitors.list[i] ); 324 435 } 325 436 } 326 437 ); 327 438 328 unsigned short count = this->monitor_count;439 __lock_size_t count = this.monitor_count; 329 440 330 441 // Lock all monitors 331 lock_all( this ->monitors, NULL, count );442 lock_all( this.monitors, NULL, count ); 332 443 333 444 //Pop the head of the waiting queue 334 __condition_node_t * node = pop_head( &this->blocked );445 __condition_node_t * node = pop_head( this.blocked ); 335 446 336 447 //Add the thread to the proper AS stack … … 338 449 __condition_criterion_t * crit = &node->criteria[i]; 339 450 assert( !crit->ready ); 340 push( &crit->target->signal_stack, crit );451 push( crit->target->signal_stack, crit ); 341 452 } 342 453 343 454 //Release 344 unlock_all( this ->monitors, count );455 unlock_all( this.monitors, count ); 345 456 346 457 return true; 347 458 } 348 459 349 bool signal_block( condition *this ) {350 if( !this ->blocked.head ) { return false; }460 bool signal_block( condition & this ) { 461 if( !this.blocked.head ) { return false; } 351 462 352 463 //Check that everything is as expected 353 verifyf( this ->monitors != NULL, "Waiting with no monitors (%p)", this->monitors );354 verifyf( this ->monitor_count != 0, "Waiting with 0 monitors (%i)", this->monitor_count );464 verifyf( this.monitors != NULL, "Waiting with no monitors (%p)", this.monitors ); 465 verifyf( this.monitor_count != 0, "Waiting with 0 monitors (%"PRIiFAST16")", this.monitor_count ); 355 466 356 467 // Create storage for monitor context 357 monitor_ctx( this ->monitors, this->monitor_count );468 monitor_ctx( this.monitors, this.monitor_count ); 358 469 359 470 // Lock all monitors (aggregates the locks them as well) … … 364 475 365 476 //save contexts 366 save_recursion( monitors, recursions, count );477 monitor_save; 367 478 368 479 //Find the thread to run 369 thread_desc * signallee = pop_head( &this->blocked )->waiting_thread;370 for(int i = 0; i < count; i++) {371 set_owner( monitors[i], signallee ); 372 }480 thread_desc * signallee = pop_head( this.blocked )->waiting_thread; 481 set_owner( monitors, count, signallee ); 482 483 LIB_DEBUG_PRINT_BUFFER_DECL( "Kernel : signal_block condition %p (s: %p)\n", &this, signallee ); 373 484 374 485 //Everything is ready to go to sleep … … 379 490 380 491 381 //We are back, restore the owners and recursions382 lock_all( locks, count ); 383 restore_recursion( monitors, recursions, count );384 unlock_all( locks, count );492 LIB_DEBUG_PRINT_BUFFER_LOCAL( "Kernel : signal_block returned\n" ); 493 494 //We are back, restore the masks and recursions 495 monitor_restore; 385 496 386 497 return true; … … 388 499 389 500 // Access the user_info of the thread waiting at the front of the queue 390 uintptr_t front( condition *this ) {501 uintptr_t front( condition & this ) { 391 502 verifyf( !is_empty(this), 392 503 "Attempt to access user data on an empty condition.\n" 393 504 "Possible cause is not checking if the condition is empty before reading stored data." 394 505 ); 395 return this ->blocked.head->user_info;506 return this.blocked.head->user_info; 396 507 } 397 508 398 509 //----------------------------------------------------------------------------- 399 // Internal scheduling 400 int __accept_internal( unsigned short acc_count, __acceptable_t * acceptables ) { 401 thread_desc * thrd = this_thread; 510 // External scheduling 511 // cases to handle : 512 // - target already there : 513 // block and wake 514 // - dtor already there 515 // put thread on signaller stack 516 // - non-blocking 517 // return else 518 // - timeout 519 // return timeout 520 // - block 521 // setup mask 522 // block 523 void __waitfor_internal( const __waitfor_mask_t & mask, int duration ) { 524 // This statment doesn't have a contiguous list of monitors... 525 // Create one! 526 __lock_size_t max = count_max( mask ); 527 monitor_desc * mon_storage[max]; 528 __builtin_memset( mon_storage, 0, sizeof( mon_storage ) ); 529 __lock_size_t actual_count = aggregate( mon_storage, mask ); 530 531 LIB_DEBUG_PRINT_BUFFER_DECL( "Kernel : waitfor %d (s: %d, m: %d)\n", actual_count, mask.size, (__lock_size_t)max); 532 533 if(actual_count == 0) return; 534 535 LIB_DEBUG_PRINT_BUFFER_LOCAL( "Kernel : waitfor internal proceeding\n"); 402 536 403 537 // Create storage for monitor context 404 monitor_ctx( acceptables->monitors, acceptables->count );405 406 // Lock all monitors (aggregates the lock themas well)538 monitor_ctx( mon_storage, actual_count ); 539 540 // Lock all monitors (aggregates the locks as well) 407 541 lock_all( monitors, locks, count ); 408 542 543 { 544 // Check if the entry queue 545 thread_desc * next; int index; 546 [next, index] = search_entry_queue( mask, monitors, count ); 547 548 if( next ) { 549 *mask.accepted = index; 550 if( mask.clauses[index].is_dtor ) { 551 LIB_DEBUG_PRINT_BUFFER_LOCAL( "Kernel : dtor already there\n"); 552 verifyf( mask.clauses[index].size == 1 , "ERROR: Accepted dtor has more than 1 mutex parameter." ); 553 554 monitor_desc * mon2dtor = mask.clauses[index].list[0]; 555 verifyf( mon2dtor->dtor_node, "ERROR: Accepted monitor has no dtor_node." ); 556 557 __condition_criterion_t * dtor_crit = mon2dtor->dtor_node->criteria; 558 push( mon2dtor->signal_stack, dtor_crit ); 559 560 unlock_all( locks, count ); 561 } 562 else { 563 LIB_DEBUG_PRINT_BUFFER_LOCAL( "Kernel : thread present, baton-passing\n"); 564 565 // Create the node specific to this wait operation 566 wait_ctx_primed( this_thread, 0 ); 567 568 // Save monitor states 569 monitor_save; 570 571 LIB_DEBUG_PRINT_BUFFER_LOCAL( "Kernel : baton of %d monitors : ", count ); 572 #ifdef __CFA_DEBUG_PRINT__ 573 for( int i = 0; i < count; i++) { 574 LIB_DEBUG_PRINT_BUFFER_LOCAL( "%p %p ", monitors[i], monitors[i]->signal_stack.top ); 575 } 576 #endif 577 LIB_DEBUG_PRINT_BUFFER_LOCAL( "\n"); 578 579 // Set the owners to be the next thread 580 set_owner( monitors, count, next ); 581 582 // Everything is ready to go to sleep 583 BlockInternal( locks, count, &next, 1 ); 584 585 // We are back, restore the owners and recursions 586 monitor_restore; 587 588 LIB_DEBUG_PRINT_BUFFER_LOCAL( "Kernel : thread present, returned\n"); 589 } 590 591 LIB_DEBUG_PRINT_BUFFER_LOCAL( "Kernel : accepted %d\n", *mask.accepted); 592 593 return; 594 } 595 } 596 597 598 if( duration == 0 ) { 599 LIB_DEBUG_PRINT_BUFFER_LOCAL( "Kernel : non-blocking, exiting\n"); 600 601 unlock_all( locks, count ); 602 603 LIB_DEBUG_PRINT_BUFFER_LOCAL( "Kernel : accepted %d\n", *mask.accepted); 604 return; 605 } 606 607 608 verifyf( duration < 0, "Timeout on waitfor statments not supported yet."); 609 610 LIB_DEBUG_PRINT_BUFFER_LOCAL( "Kernel : blocking waitfor\n"); 611 409 612 // Create the node specific to this wait operation 410 wait_ctx_primed( thrd, 0 ); 411 412 // Check if the entry queue 413 thread_desc * next = search_entry_queue( acceptables, acc_count, monitors, count ); 414 415 LIB_DEBUG_PRINT_SAFE("Owner(s) :"); 416 for(int i = 0; i < count; i++) { 417 LIB_DEBUG_PRINT_SAFE(" %p", monitors[i]->owner ); 418 } 419 LIB_DEBUG_PRINT_SAFE("\n"); 420 421 LIB_DEBUG_PRINT_SAFE("Passing mon to %p\n", next); 422 423 if( !next ) { 424 // Update acceptables on the current monitors 425 for(int i = 0; i < count; i++) { 426 monitors[i]->acceptables = acceptables; 427 monitors[i]->acceptable_count = acc_count; 428 } 429 } 430 else { 431 for(int i = 0; i < count; i++) { 432 set_owner( monitors[i], next ); 433 } 434 } 435 436 437 save_recursion( monitors, recursions, count ); 438 439 440 // Everything is ready to go to sleep 441 BlockInternal( locks, count, &next, next ? 1 : 0 ); 442 443 444 //WE WOKE UP 445 446 447 //We are back, restore the owners and recursions 448 lock_all( locks, count ); 449 restore_recursion( monitors, recursions, count ); 450 int acc_idx = monitors[0]->accepted_index; 451 unlock_all( locks, count ); 452 453 return acc_idx; 613 wait_ctx_primed( this_thread, 0 ); 614 615 monitor_save; 616 set_mask( monitors, count, mask ); 617 618 for( __lock_size_t i = 0; i < count; i++) { 619 verify( monitors[i]->owner == this_thread ); 620 } 621 622 //Everything is ready to go to sleep 623 BlockInternal( locks, count ); 624 625 626 // WE WOKE UP 627 628 629 //We are back, restore the masks and recursions 630 monitor_restore; 631 632 LIB_DEBUG_PRINT_BUFFER_LOCAL( "Kernel : exiting\n"); 633 634 LIB_DEBUG_PRINT_BUFFER_LOCAL( "Kernel : accepted %d\n", *mask.accepted); 454 635 } 455 636 … … 458 639 459 640 static inline void set_owner( monitor_desc * this, thread_desc * owner ) { 641 // LIB_DEBUG_PRINT_SAFE("Kernal : Setting owner of %p to %p ( was %p)\n", this, owner, this->owner ); 642 460 643 //Pass the monitor appropriately 461 644 this->owner = owner; … … 465 648 } 466 649 650 static inline void set_owner( monitor_desc * monitors [], __lock_size_t count, thread_desc * owner ) { 651 monitors[0]->owner = owner; 652 monitors[0]->recursion = 1; 653 for( __lock_size_t i = 1; i < count; i++ ) { 654 monitors[i]->owner = owner; 655 monitors[i]->recursion = 0; 656 } 657 } 658 659 static inline void set_mask( monitor_desc * storage [], __lock_size_t count, const __waitfor_mask_t & mask ) { 660 for( __lock_size_t i = 0; i < count; i++) { 661 storage[i]->mask = mask; 662 } 663 } 664 665 static inline void reset_mask( monitor_desc * this ) { 666 this->mask.accepted = NULL; 667 this->mask.clauses = NULL; 668 this->mask.size = 0; 669 } 670 467 671 static inline thread_desc * next_thread( monitor_desc * this ) { 468 672 //Check the signaller stack 469 __condition_criterion_t * urgent = pop( &this->signal_stack ); 673 LIB_DEBUG_PRINT_SAFE("Kernel : mon %p AS-stack top %p\n", this, this->signal_stack.top); 674 __condition_criterion_t * urgent = pop( this->signal_stack ); 470 675 if( urgent ) { 471 676 //The signaller stack is not empty, … … 479 684 // No signaller thread 480 685 // Get the next thread in the entry_queue 481 thread_desc * new_owner = pop_head( &this->entry_queue );686 thread_desc * new_owner = pop_head( this->entry_queue ); 482 687 set_owner( this, new_owner ); 483 688 … … 485 690 } 486 691 487 static inline int is_accepted( thread_desc * owner, monitor_desc * this, monitor_desc ** group, int group_cnt, void (*func)()) {488 __acceptable_t * accs = this->acceptables; // Optim489 int acc_cnt = this->acceptable_count;692 static inline bool is_accepted( monitor_desc * this, const __monitor_group_t & group ) { 693 __acceptable_t * it = this->mask.clauses; // Optim 694 __lock_size_t count = this->mask.size; 490 695 491 696 // Check if there are any acceptable functions 492 if( ! accs ) return -1;697 if( !it ) return false; 493 698 494 699 // If this isn't the first monitor to test this, there is no reason to repeat the test. 495 if( this != group[0] ) return group[0]-> accepted_index;700 if( this != group[0] ) return group[0]->mask.accepted >= 0; 496 701 497 702 // For all acceptable functions check if this is the current function. 498 OUT_LOOP: 499 for( int i = 0; i < acc_cnt; i++ ) { 500 __acceptable_t * acc = &accs[i]; 501 502 // if function matches, check the monitors 503 if( acc->func == func ) { 504 505 // If the group count is different then it can't be a match 506 if( acc->count != group_cnt ) return -1; 507 508 // Check that all the monitors match 509 for( int j = 0; j < group_cnt; j++ ) { 510 // If not a match, check next function 511 if( acc->monitors[j] != group[j] ) continue OUT_LOOP; 512 } 513 514 // It's a complete match, accept the call 515 return i; 703 for( __lock_size_t i = 0; i < count; i++, it++ ) { 704 if( *it == group ) { 705 *this->mask.accepted = i; 706 return true; 516 707 } 517 708 } 518 709 519 710 // No function matched 520 return -1;521 } 522 523 static inline void init( int count, monitor_desc ** monitors, __condition_node_t * waiter, __condition_criterion_t * criteria) {524 for( int i = 0; i < count; i++) {711 return false; 712 } 713 714 static inline void init( __lock_size_t count, monitor_desc * monitors [], __condition_node_t & waiter, __condition_criterion_t criteria [] ) { 715 for( __lock_size_t i = 0; i < count; i++) { 525 716 (criteria[i]){ monitors[i], waiter }; 526 717 } 527 718 528 waiter ->criteria = criteria;529 } 530 531 static inline void init_push( int count, monitor_desc ** monitors, __condition_node_t * waiter, __condition_criterion_t * criteria) {532 for( int i = 0; i < count; i++) {719 waiter.criteria = criteria; 720 } 721 722 static inline void init_push( __lock_size_t count, monitor_desc * monitors [], __condition_node_t & waiter, __condition_criterion_t criteria [] ) { 723 for( __lock_size_t i = 0; i < count; i++) { 533 724 (criteria[i]){ monitors[i], waiter }; 534 push( &criteria[i].target->signal_stack, &criteria[i] ); 535 } 536 537 waiter->criteria = criteria; 538 } 539 540 static inline void lock_all( spinlock ** locks, unsigned short count ) { 541 for( int i = 0; i < count; i++ ) { 725 LIB_DEBUG_PRINT_SAFE( "Kernel : target %p = %p\n", criteria[i].target, &criteria[i] ); 726 push( criteria[i].target->signal_stack, &criteria[i] ); 727 } 728 729 waiter.criteria = criteria; 730 } 731 732 static inline void lock_all( spinlock * locks [], __lock_size_t count ) { 733 for( __lock_size_t i = 0; i < count; i++ ) { 542 734 lock_yield( locks[i] DEBUG_CTX2 ); 543 735 } 544 736 } 545 737 546 static inline void lock_all( monitor_desc * * source, spinlock ** /*out*/ locks, unsigned short count ) {547 for( int i = 0; i < count; i++ ) {738 static inline void lock_all( monitor_desc * source [], spinlock * /*out*/ locks [], __lock_size_t count ) { 739 for( __lock_size_t i = 0; i < count; i++ ) { 548 740 spinlock * l = &source[i]->lock; 549 741 lock_yield( l DEBUG_CTX2 ); … … 552 744 } 553 745 554 static inline void unlock_all( spinlock * * locks, unsigned short count ) {555 for( int i = 0; i < count; i++ ) {746 static inline void unlock_all( spinlock * locks [], __lock_size_t count ) { 747 for( __lock_size_t i = 0; i < count; i++ ) { 556 748 unlock( locks[i] ); 557 749 } 558 750 } 559 751 560 static inline void unlock_all( monitor_desc * * locks, unsigned short count ) {561 for( int i = 0; i < count; i++ ) {752 static inline void unlock_all( monitor_desc * locks [], __lock_size_t count ) { 753 for( __lock_size_t i = 0; i < count; i++ ) { 562 754 unlock( &locks[i]->lock ); 563 755 } 564 756 } 565 757 566 567 static inline void save_recursion ( monitor_desc ** ctx, unsigned int * /*out*/ recursions, unsigned short count ) { 568 for( int i = 0; i < count; i++ ) { 758 static inline void save( 759 monitor_desc * ctx [], 760 __lock_size_t count, 761 __attribute((unused)) spinlock * locks [], 762 unsigned int /*out*/ recursions [], 763 __waitfor_mask_t /*out*/ masks [] 764 ) { 765 for( __lock_size_t i = 0; i < count; i++ ) { 569 766 recursions[i] = ctx[i]->recursion; 570 } 571 } 572 573 static inline void restore_recursion( monitor_desc ** ctx, unsigned int * /*in */ recursions, unsigned short count ) { 574 for( int i = 0; i < count; i++ ) { 767 masks[i] = ctx[i]->mask; 768 } 769 } 770 771 static inline void restore( 772 monitor_desc * ctx [], 773 __lock_size_t count, 774 spinlock * locks [], 775 unsigned int /*out*/ recursions [], 776 __waitfor_mask_t /*out*/ masks [] 777 ) { 778 lock_all( locks, count ); 779 for( __lock_size_t i = 0; i < count; i++ ) { 575 780 ctx[i]->recursion = recursions[i]; 576 } 781 ctx[i]->mask = masks[i]; 782 } 783 unlock_all( locks, count ); 577 784 } 578 785 … … 599 806 } 600 807 601 // LIB_DEBUG_PRINT_SAFE( "Runing %i\n", ready2run);808 LIB_DEBUG_PRINT_SAFE( "Kernel : Runing %i (%p)\n", ready2run, ready2run ? node->waiting_thread : NULL ); 602 809 return ready2run ? node->waiting_thread : NULL; 603 810 } 604 811 605 static inline void brand_condition( condition *this ) {812 static inline void brand_condition( condition & this ) { 606 813 thread_desc * thrd = this_thread; 607 if( !this ->monitors ) {814 if( !this.monitors ) { 608 815 // LIB_DEBUG_PRINT_SAFE("Branding\n"); 609 assertf( thrd->current_monitors != NULL, "No current monitor to brand condition %p", thrd->current_monitors ); 610 this->monitor_count = thrd->current_monitor_count; 611 612 this->monitors = malloc( this->monitor_count * sizeof( *this->monitors ) ); 613 for( int i = 0; i < this->monitor_count; i++ ) { 614 this->monitors[i] = thrd->current_monitors[i]; 615 } 616 } 617 } 618 619 static inline unsigned short insert_unique( thread_desc ** thrds, unsigned short end, thread_desc * val ) { 620 if( !val ) return end; 621 622 for(int i = 0; i <= end; i++) { 623 if( thrds[i] == val ) return end; 624 } 625 626 thrds[end] = val; 627 return end + 1; 628 } 629 630 631 static inline bool match( __acceptable_t * acc, thread_desc * thrd ) { 632 verify( thrd ); 633 verify( acc ); 634 if( acc->func != thrd->current_monitor_func ) return false; 635 636 return true; 637 } 638 639 static inline thread_desc * search_entry_queue( __acceptable_t * acceptables, int acc_count, monitor_desc ** monitors, int count ) { 640 641 __thread_queue_t * entry_queue = &monitors[0]->entry_queue; 816 assertf( thrd->monitors.list != NULL, "No current monitor to brand condition %p", thrd->monitors.list ); 817 this.monitor_count = thrd->monitors.size; 818 819 this.monitors = malloc( this.monitor_count * sizeof( *this.monitors ) ); 820 for( int i = 0; i < this.monitor_count; i++ ) { 821 this.monitors[i] = thrd->monitors.list[i]; 822 } 823 } 824 } 825 826 static inline [thread_desc *, int] search_entry_queue( const __waitfor_mask_t & mask, monitor_desc * monitors [], __lock_size_t count ) { 827 828 __thread_queue_t & entry_queue = monitors[0]->entry_queue; 642 829 643 830 // For each thread in the entry-queue 644 for( thread_desc ** thrd_it = &entry_queue ->head;831 for( thread_desc ** thrd_it = &entry_queue.head; 645 832 *thrd_it; 646 thrd_it = &(*thrd_it)->next )647 {833 thrd_it = &(*thrd_it)->next 834 ) { 648 835 // For each acceptable check if it matches 649 __acceptable_t * acc_end = acceptables + acc_count; 650 for( __acceptable_t * acc_it = acceptables; acc_it != acc_end; acc_it++ ) { 836 int i = 0; 837 __acceptable_t * end = mask.clauses + mask.size; 838 for( __acceptable_t * it = mask.clauses; it != end; it++, i++ ) { 651 839 // Check if we have a match 652 if( match( acc_it, *thrd_it )) {840 if( *it == (*thrd_it)->monitors ) { 653 841 654 842 // If we have a match return it 655 843 // after removeing it from the entry queue 656 return remove( entry_queue, thrd_it );844 return [remove( entry_queue, thrd_it ), i]; 657 845 } 658 846 } 659 847 } 660 848 661 return NULL; 662 } 849 return [0, -1]; 850 } 851 852 forall(dtype T | sized( T )) 853 static inline __lock_size_t insert_unique( T * array [], __lock_size_t & size, T * val ) { 854 if( !val ) return size; 855 856 for( __lock_size_t i = 0; i <= size; i++) { 857 if( array[i] == val ) return size; 858 } 859 860 array[size] = val; 861 size = size + 1; 862 return size; 863 } 864 865 static inline __lock_size_t count_max( const __waitfor_mask_t & mask ) { 866 __lock_size_t max = 0; 867 for( __lock_size_t i = 0; i < mask.size; i++ ) { 868 max += mask.clauses[i].size; 869 } 870 return max; 871 } 872 873 static inline __lock_size_t aggregate( monitor_desc * storage [], const __waitfor_mask_t & mask ) { 874 __lock_size_t size = 0; 875 for( __lock_size_t i = 0; i < mask.size; i++ ) { 876 __libcfa_small_sort( mask.clauses[i].list, mask.clauses[i].size ); 877 for( __lock_size_t j = 0; j < mask.clauses[i].size; j++) { 878 insert_unique( storage, size, mask.clauses[i].list[j] ); 879 } 880 } 881 // TODO insertion sort instead of this 882 __libcfa_small_sort( storage, size ); 883 return size; 884 } 885 663 886 void ?{}( __condition_blocked_queue_t & this ) { 664 887 this.head = NULL; … … 666 889 } 667 890 668 void append( __condition_blocked_queue_t *this, __condition_node_t * c ) {669 verify(this ->tail != NULL);670 *this ->tail = c;671 this ->tail = &c->next;672 } 673 674 __condition_node_t * pop_head( __condition_blocked_queue_t *this ) {675 __condition_node_t * head = this ->head;891 void append( __condition_blocked_queue_t & this, __condition_node_t * c ) { 892 verify(this.tail != NULL); 893 *this.tail = c; 894 this.tail = &c->next; 895 } 896 897 __condition_node_t * pop_head( __condition_blocked_queue_t & this ) { 898 __condition_node_t * head = this.head; 676 899 if( head ) { 677 this ->head = head->next;900 this.head = head->next; 678 901 if( !head->next ) { 679 this ->tail = &this->head;902 this.tail = &this.head; 680 903 } 681 904 head->next = NULL;
Note:
See TracChangeset
for help on using the changeset viewer.