source: libcfa/src/concurrency/monitor.cfa@ 3381ed7

ADT arm-eh ast-experimental enum forall-pointer-decay jacob/cs343-translation jenkins-sandbox new-ast new-ast-unique-expr pthread-emulation qualifiedEnum stuck-waitfor-destruct
Last change on this file since 3381ed7 was 3381ed7, checked in by Thierry Delisle <tdelisle@…>, 6 years ago

Added park/unpark primitives thread and removed BlockInternal.
Converted monitors to use park unpark.
Intrusive Queue now mark next field when thread is inside queue.
Added several asserts to kernel and monitor.
Added a few tests for park and unpark.

  • Property mode set to 100644
File size: 32.5 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2016 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// monitor_desc.c --
8//
9// Author : Thierry Delisle
10// Created On : Thd Feb 23 12:27:26 2017
11// Last Modified By : Peter A. Buhr
12// Last Modified On : Wed Dec 4 07:55:14 2019
13// Update Count : 10
14//
15
16#define __cforall_thread__
17
18#include "monitor.hfa"
19
20#include <stdlib.hfa>
21#include <inttypes.h>
22
23#include "kernel_private.hfa"
24
25#include "bits/algorithm.hfa"
26
27//-----------------------------------------------------------------------------
28// Forward declarations
29static inline void set_owner ( monitor_desc * this, thread_desc * owner, enum __Owner_Reason );
30static inline void set_owner ( monitor_desc * storage [], __lock_size_t count, thread_desc * owner, enum __Owner_Reason );
31static inline void set_mask ( monitor_desc * storage [], __lock_size_t count, const __waitfor_mask_t & mask );
32static inline void reset_mask( monitor_desc * this );
33
34static inline thread_desc * next_thread( monitor_desc * this, enum __Owner_Reason );
35static inline bool is_accepted( monitor_desc * this, const __monitor_group_t & monitors );
36
37static inline void lock_all ( __spinlock_t * locks [], __lock_size_t count );
38static inline void lock_all ( monitor_desc * source [], __spinlock_t * /*out*/ locks [], __lock_size_t count );
39static inline void unlock_all( __spinlock_t * locks [], __lock_size_t count );
40static inline void unlock_all( monitor_desc * locks [], __lock_size_t count );
41
42static inline void save ( monitor_desc * ctx [], __lock_size_t count, __spinlock_t * locks [], unsigned int /*out*/ recursions [], __waitfor_mask_t /*out*/ masks [] );
43static inline void restore( monitor_desc * ctx [], __lock_size_t count, __spinlock_t * locks [], unsigned int /*in */ recursions [], __waitfor_mask_t /*in */ masks [] );
44
45static inline void init ( __lock_size_t count, monitor_desc * monitors [], __condition_node_t & waiter, __condition_criterion_t criteria [] );
46static inline void init_push( __lock_size_t count, monitor_desc * monitors [], __condition_node_t & waiter, __condition_criterion_t criteria [] );
47
48static inline thread_desc * check_condition ( __condition_criterion_t * );
49static inline void brand_condition ( condition & );
50static inline [thread_desc *, int] search_entry_queue( const __waitfor_mask_t &, monitor_desc * monitors [], __lock_size_t count );
51
52forall(dtype T | sized( T ))
53static inline __lock_size_t insert_unique( T * array [], __lock_size_t & size, T * val );
54static inline __lock_size_t count_max ( const __waitfor_mask_t & mask );
55static inline __lock_size_t aggregate ( monitor_desc * storage [], const __waitfor_mask_t & mask );
56
57//-----------------------------------------------------------------------------
58// Useful defines
59#define wait_ctx(thrd, user_info) /* Create the necessary information to use the signaller stack */ \
60 __condition_node_t waiter = { thrd, count, user_info }; /* Create the node specific to this wait operation */ \
61 __condition_criterion_t criteria[count]; /* Create the creteria this wait operation needs to wake up */ \
62 init( count, monitors, waiter, criteria ); /* Link everything together */ \
63
64#define wait_ctx_primed(thrd, user_info) /* Create the necessary information to use the signaller stack */ \
65 __condition_node_t waiter = { thrd, count, user_info }; /* Create the node specific to this wait operation */ \
66 __condition_criterion_t criteria[count]; /* Create the creteria this wait operation needs to wake up */ \
67 init_push( count, monitors, waiter, criteria ); /* Link everything together and push it to the AS-Stack */ \
68
69#define monitor_ctx( mons, cnt ) /* Define that create the necessary struct for internal/external scheduling operations */ \
70 monitor_desc ** monitors = mons; /* Save the targeted monitors */ \
71 __lock_size_t count = cnt; /* Save the count to a local variable */ \
72 unsigned int recursions[ count ]; /* Save the current recursion levels to restore them later */ \
73 __waitfor_mask_t masks [ count ]; /* Save the current waitfor masks to restore them later */ \
74 __spinlock_t * locks [ count ]; /* We need to pass-in an array of locks to BlockInternal */ \
75
76#define monitor_save save ( monitors, count, locks, recursions, masks )
77#define monitor_restore restore( monitors, count, locks, recursions, masks )
78
79
80//-----------------------------------------------------------------------------
81// Enter/Leave routines
82
83
84extern "C" {
85 // Enter single monitor
86 static void __enter_monitor_desc( monitor_desc * this, const __monitor_group_t & group ) {
87 // Lock the monitor spinlock
88 lock( this->lock __cfaabi_dbg_ctx2 );
89 // Interrupts disable inside critical section
90 thread_desc * thrd = kernelTLS.this_thread;
91
92 __cfaabi_dbg_print_safe( "Kernel : %10p Entering mon %p (%p)\n", thrd, this, this->owner);
93
94 if( !this->owner ) {
95 // No one has the monitor, just take it
96 set_owner( this, thrd, __ENTER_FREE );
97
98 __cfaabi_dbg_print_safe( "Kernel : mon is free \n" );
99 }
100 else if( this->owner == thrd) {
101 // We already have the monitor, just note how many times we took it
102 this->recursion += 1;
103
104 __cfaabi_dbg_print_safe( "Kernel : mon already owned \n" );
105 }
106 else if( is_accepted( this, group) ) {
107 // Some one was waiting for us, enter
108 set_owner( this, thrd, __ENTER_ACCEPT );
109
110 // Reset mask
111 reset_mask( this );
112
113 __cfaabi_dbg_print_safe( "Kernel : mon accepts \n" );
114 }
115 else {
116 __cfaabi_dbg_print_safe( "Kernel : blocking \n" );
117
118 // Some one else has the monitor, wait in line for it
119 /* paranoid */ verify( thrd->next == 0p );
120 append( this->entry_queue, thrd );
121 /* paranoid */ verify( thrd->next == 1p );
122
123 unlock( this->lock );
124 park();
125
126 __cfaabi_dbg_print_safe( "Kernel : %10p Entered mon %p\n", thrd, this);
127
128 /* paranoid */ verifyf( kernelTLS.this_thread == this->owner, "Expected owner to be %p, got %p (r: %i, m: %p)", kernelTLS.this_thread, this->owner, this->recursion, this );
129 return;
130 }
131
132 __cfaabi_dbg_print_safe( "Kernel : %10p Entered mon %p\n", thrd, this);
133
134 /* paranoid */ verifyf( kernelTLS.this_thread == this->owner, "Expected owner to be %p, got %p (r: %i, m: %p)", kernelTLS.this_thread, this->owner, this->recursion, this );
135 /* paranoid */ verify( this->lock.lock );
136
137 // Release the lock and leave
138 unlock( this->lock );
139 return;
140 }
141
142 static void __enter_monitor_dtor( monitor_desc * this, fptr_t func ) {
143 // Lock the monitor spinlock
144 lock( this->lock __cfaabi_dbg_ctx2 );
145 // Interrupts disable inside critical section
146 thread_desc * thrd = kernelTLS.this_thread;
147
148 __cfaabi_dbg_print_safe( "Kernel : %10p Entering dtor for mon %p (%p)\n", thrd, this, this->owner);
149
150
151 if( !this->owner ) {
152 __cfaabi_dbg_print_safe( "Kernel : Destroying free mon %p\n", this);
153
154 // No one has the monitor, just take it
155 set_owner( this, thrd, __ENTER_DTOR_FREE );
156
157 verifyf( kernelTLS.this_thread == this->owner, "Expected owner to be %p, got %p (r: %i, m: %p)", kernelTLS.this_thread, this->owner, this->recursion, this );
158
159 unlock( this->lock );
160 return;
161 }
162 else if( this->owner == thrd) {
163 // We already have the monitor... but where about to destroy it so the nesting will fail
164 // Abort!
165 abort( "Attempt to destroy monitor %p by thread \"%.256s\" (%p) in nested mutex.", this, thrd->self_cor.name, thrd );
166 }
167
168 __lock_size_t count = 1;
169 monitor_desc ** monitors = &this;
170 __monitor_group_t group = { &this, 1, func };
171 if( is_accepted( this, group) ) {
172 __cfaabi_dbg_print_safe( "Kernel : mon accepts dtor, block and signal it \n" );
173
174 // Wake the thread that is waiting for this
175 __condition_criterion_t * urgent = pop( this->signal_stack );
176 /* paranoid */ verify( urgent );
177
178 // Reset mask
179 reset_mask( this );
180
181 // Create the node specific to this wait operation
182 wait_ctx_primed( thrd, 0 )
183
184 // Some one else has the monitor, wait for him to finish and then run
185 unlock( this->lock );
186
187 // Release the next thread
188 /* paranoid */ verifyf( urgent->owner->waiting_thread == this->owner, "Expected owner to be %p, got %p (r: %i, m: %p)", kernelTLS.this_thread, this->owner, this->recursion, this );
189 unpark( urgent->owner->waiting_thread );
190
191 // Park current thread waiting
192 park();
193
194 // Some one was waiting for us, enter
195 /* paranoid */ verifyf( kernelTLS.this_thread == this->owner, "Expected owner to be %p, got %p (r: %i, m: %p)", kernelTLS.this_thread, this->owner, this->recursion, this );
196 }
197 else {
198 __cfaabi_dbg_print_safe( "Kernel : blocking \n" );
199
200 wait_ctx( thrd, 0 )
201 this->dtor_node = &waiter;
202
203 // Some one else has the monitor, wait in line for it
204 /* paranoid */ verify( thrd->next == 0p );
205 append( this->entry_queue, thrd );
206 /* paranoid */ verify( thrd->next == 1p );
207 unlock( this->lock );
208
209 // Park current thread waiting
210 park();
211
212 /* paranoid */ verifyf( kernelTLS.this_thread == this->owner, "Expected owner to be %p, got %p (r: %i, m: %p)", kernelTLS.this_thread, this->owner, this->recursion, this );
213 return;
214 }
215
216 __cfaabi_dbg_print_safe( "Kernel : Destroying %p\n", this);
217
218 }
219
220 // Leave single monitor
221 void __leave_monitor_desc( monitor_desc * this ) {
222 // Lock the monitor spinlock
223 lock( this->lock __cfaabi_dbg_ctx2 );
224
225 __cfaabi_dbg_print_safe( "Kernel : %10p Leaving mon %p (%p)\n", kernelTLS.this_thread, this, this->owner);
226
227 /* paranoid */ verifyf( kernelTLS.this_thread == this->owner, "Expected owner to be %p, got %p (r: %i, m: %p)", kernelTLS.this_thread, this->owner, this->recursion, this );
228
229 // Leaving a recursion level, decrement the counter
230 this->recursion -= 1;
231
232 // If we haven't left the last level of recursion
233 // it means we don't need to do anything
234 if( this->recursion != 0) {
235 __cfaabi_dbg_print_safe( "Kernel : recursion still %d\n", this->recursion);
236 unlock( this->lock );
237 return;
238 }
239
240 // Get the next thread, will be null on low contention monitor
241 thread_desc * new_owner = next_thread( this, __LEAVE );
242
243 // Check the new owner is consistent with who we wake-up
244 // new_owner might be null even if someone owns the monitor when the owner is still waiting for another monitor
245 /* paranoid */ verifyf( !new_owner || new_owner == this->owner, "Expected owner to be %p, got %p (m: %p)", new_owner, this->owner, this );
246
247 // We can now let other threads in safely
248 unlock( this->lock );
249
250 //We need to wake-up the thread
251 /* paranoid */ verifyf( !new_owner || new_owner == this->owner, "Expected owner to be %p, got %p (m: %p)", new_owner, this->owner, this );
252 unpark( new_owner );
253 }
254
255 // Leave single monitor for the last time
256 void __leave_dtor_monitor_desc( monitor_desc * this ) {
257 __cfaabi_dbg_debug_do(
258 if( TL_GET( this_thread ) != this->owner ) {
259 abort( "Destroyed monitor %p has inconsistent owner, expected %p got %p.\n", this, TL_GET( this_thread ), this->owner);
260 }
261 if( this->recursion != 1 ) {
262 abort( "Destroyed monitor %p has %d outstanding nested calls.\n", this, this->recursion - 1);
263 }
264 )
265 }
266
267 // Leave the thread monitor
268 // last routine called by a thread.
269 // Should never return
270 void __leave_thread_monitor() {
271 thread_desc * thrd = TL_GET( this_thread );
272 monitor_desc * this = &thrd->self_mon;
273
274 // Lock the monitor now
275 lock( this->lock __cfaabi_dbg_ctx2 );
276
277 disable_interrupts();
278
279 thrd->self_cor.state = Halted;
280
281 /* paranoid */ verifyf( thrd == this->owner, "Expected owner to be %p, got %p (r: %i, m: %p)", thrd, this->owner, this->recursion, this );
282
283 // Leaving a recursion level, decrement the counter
284 this->recursion -= 1;
285
286 // If we haven't left the last level of recursion
287 // it must mean there is an error
288 if( this->recursion != 0) { abort( "Thread internal monitor has unbalanced recursion" ); }
289
290 // Fetch the next thread, can be null
291 thread_desc * new_owner = next_thread( this, __LEAVE_THREAD );
292
293 // Release the monitor lock
294 unlock( this->lock );
295
296 // Unpark the next owner if needed
297 /* paranoid */ verifyf( !new_owner || new_owner == this->owner, "Expected owner to be %p, got %p (m: %p)", new_owner, this->owner, this );
298 unpark( new_owner );
299
300 // Leave the thread, this will unlock the spinlock
301 // Use leave thread instead of park which is
302 // specialized for this case
303 LeaveThread();
304
305 // Control flow should never reach here!
306 }
307}
308
309// Enter multiple monitor
310// relies on the monitor array being sorted
311static inline void enter( __monitor_group_t monitors ) {
312 for( __lock_size_t i = 0; i < monitors.size; i++) {
313 __enter_monitor_desc( monitors[i], monitors );
314 }
315}
316
317// Leave multiple monitor
318// relies on the monitor array being sorted
319static inline void leave(monitor_desc * monitors [], __lock_size_t count) {
320 for( __lock_size_t i = count - 1; i >= 0; i--) {
321 __leave_monitor_desc( monitors[i] );
322 }
323}
324
325// Ctor for monitor guard
326// Sorts monitors before entering
327void ?{}( monitor_guard_t & this, monitor_desc * m [], __lock_size_t count, fptr_t func ) {
328 thread_desc * thrd = TL_GET( this_thread );
329
330 // Store current array
331 this.m = m;
332 this.count = count;
333
334 // Sort monitors based on address
335 __libcfa_small_sort(this.m, count);
336
337 // Save previous thread context
338 this.prev = thrd->monitors;
339
340 // Update thread context (needed for conditions)
341 (thrd->monitors){m, count, func};
342
343 // __cfaabi_dbg_print_safe( "MGUARD : enter %d\n", count);
344
345 // Enter the monitors in order
346 __monitor_group_t group = {this.m, this.count, func};
347 enter( group );
348
349 // __cfaabi_dbg_print_safe( "MGUARD : entered\n" );
350}
351
352
353// Dtor for monitor guard
354void ^?{}( monitor_guard_t & this ) {
355 // __cfaabi_dbg_print_safe( "MGUARD : leaving %d\n", this.count);
356
357 // Leave the monitors in order
358 leave( this.m, this.count );
359
360 // __cfaabi_dbg_print_safe( "MGUARD : left\n" );
361
362 // Restore thread context
363 TL_GET( this_thread )->monitors = this.prev;
364}
365
366// Ctor for monitor guard
367// Sorts monitors before entering
368void ?{}( monitor_dtor_guard_t & this, monitor_desc * m [], fptr_t func ) {
369 // optimization
370 thread_desc * thrd = TL_GET( this_thread );
371
372 // Store current array
373 this.m = *m;
374
375 // Save previous thread context
376 this.prev = thrd->monitors;
377
378 // Update thread context (needed for conditions)
379 (thrd->monitors){m, 1, func};
380
381 __enter_monitor_dtor( this.m, func );
382}
383
384// Dtor for monitor guard
385void ^?{}( monitor_dtor_guard_t & this ) {
386 // Leave the monitors in order
387 __leave_dtor_monitor_desc( this.m );
388
389 // Restore thread context
390 TL_GET( this_thread )->monitors = this.prev;
391}
392
393//-----------------------------------------------------------------------------
394// Internal scheduling types
395void ?{}(__condition_node_t & this, thread_desc * waiting_thread, __lock_size_t count, uintptr_t user_info ) {
396 this.waiting_thread = waiting_thread;
397 this.count = count;
398 this.next = 0p;
399 this.user_info = user_info;
400}
401
402void ?{}(__condition_criterion_t & this ) with( this ) {
403 ready = false;
404 target = 0p;
405 owner = 0p;
406 next = 0p;
407}
408
409void ?{}(__condition_criterion_t & this, monitor_desc * target, __condition_node_t & owner ) {
410 this.ready = false;
411 this.target = target;
412 this.owner = &owner;
413 this.next = 0p;
414}
415
416//-----------------------------------------------------------------------------
417// Internal scheduling
418void wait( condition & this, uintptr_t user_info = 0 ) {
419 brand_condition( this );
420
421 // Check that everything is as expected
422 assertf( this.monitors != 0p, "Waiting with no monitors (%p)", this.monitors );
423 verifyf( this.monitor_count != 0, "Waiting with 0 monitors (%"PRIiFAST16")", this.monitor_count );
424 verifyf( this.monitor_count < 32u, "Excessive monitor count (%"PRIiFAST16")", this.monitor_count );
425
426 // Create storage for monitor context
427 monitor_ctx( this.monitors, this.monitor_count );
428
429 // Create the node specific to this wait operation
430 wait_ctx( TL_GET( this_thread ), user_info );
431
432 // Append the current wait operation to the ones already queued on the condition
433 // We don't need locks for that since conditions must always be waited on inside monitor mutual exclusion
434 /* paranoid */ verify( waiter.next == 0p );
435 append( this.blocked, &waiter );
436 /* paranoid */ verify( waiter.next == 1p );
437
438 // Lock all monitors (aggregates the locks as well)
439 lock_all( monitors, locks, count );
440
441 // Find the next thread(s) to run
442 __lock_size_t thread_count = 0;
443 thread_desc * threads[ count ];
444 __builtin_memset( threads, 0, sizeof( threads ) );
445
446 // Save monitor states
447 monitor_save;
448
449 // Remove any duplicate threads
450 for( __lock_size_t i = 0; i < count; i++) {
451 thread_desc * new_owner = next_thread( monitors[i], __WAIT );
452 insert_unique( threads, thread_count, new_owner );
453 }
454
455 // Unlock the locks, we don't need them anymore
456 for(int i = 0; i < count; i++) {
457 unlock( *locks[i] );
458 }
459
460 // Wake the threads
461 for(int i = 0; i < thread_count; i++) {
462 unpark( threads[i] );
463 }
464
465 // Everything is ready to go to sleep
466 park();
467
468 // We are back, restore the owners and recursions
469 monitor_restore;
470}
471
472bool signal( condition & this ) {
473 if( is_empty( this ) ) { return false; }
474
475 //Check that everything is as expected
476 verify( this.monitors );
477 verify( this.monitor_count != 0 );
478
479 //Some more checking in debug
480 __cfaabi_dbg_debug_do(
481 thread_desc * this_thrd = TL_GET( this_thread );
482 if ( this.monitor_count != this_thrd->monitors.size ) {
483 abort( "Signal on condition %p made with different number of monitor(s), expected %zi got %zi", &this, this.monitor_count, this_thrd->monitors.size );
484 }
485
486 for(int i = 0; i < this.monitor_count; i++) {
487 if ( this.monitors[i] != this_thrd->monitors[i] ) {
488 abort( "Signal on condition %p made with different monitor, expected %p got %p", &this, this.monitors[i], this_thrd->monitors[i] );
489 }
490 }
491 );
492
493 __lock_size_t count = this.monitor_count;
494
495 // Lock all monitors
496 lock_all( this.monitors, 0p, count );
497
498 //Pop the head of the waiting queue
499 __condition_node_t * node = pop_head( this.blocked );
500
501 //Add the thread to the proper AS stack
502 for(int i = 0; i < count; i++) {
503 __condition_criterion_t * crit = &node->criteria[i];
504 assert( !crit->ready );
505 push( crit->target->signal_stack, crit );
506 }
507
508 //Release
509 unlock_all( this.monitors, count );
510
511 return true;
512}
513
514bool signal_block( condition & this ) {
515 if( !this.blocked.head ) { return false; }
516
517 //Check that everything is as expected
518 verifyf( this.monitors != 0p, "Waiting with no monitors (%p)", this.monitors );
519 verifyf( this.monitor_count != 0, "Waiting with 0 monitors (%"PRIiFAST16")", this.monitor_count );
520
521 // Create storage for monitor context
522 monitor_ctx( this.monitors, this.monitor_count );
523
524 // Lock all monitors (aggregates the locks them as well)
525 lock_all( monitors, locks, count );
526
527
528 // Create the node specific to this wait operation
529 wait_ctx_primed( kernelTLS.this_thread, 0 )
530
531 //save contexts
532 monitor_save;
533
534 //Find the thread to run
535 thread_desc * signallee = pop_head( this.blocked )->waiting_thread;
536 /* paranoid */ verify( signallee->next == 0p );
537 set_owner( monitors, count, signallee, __ENTER_SIGNAL_BLOCK );
538
539 __cfaabi_dbg_print_buffer_decl( "Kernel : signal_block condition %p (s: %p)\n", &this, signallee );
540
541 // unlock all the monitors
542 unlock_all( locks, count );
543
544 // unpark the thread we signalled
545 unpark( signallee );
546
547 //Everything is ready to go to sleep
548 park();
549
550
551 // WE WOKE UP
552
553
554 __cfaabi_dbg_print_buffer_local( "Kernel : signal_block returned\n" );
555
556 //We are back, restore the masks and recursions
557 monitor_restore;
558
559 return true;
560}
561
562// Access the user_info of the thread waiting at the front of the queue
563uintptr_t front( condition & this ) {
564 verifyf( !is_empty(this),
565 "Attempt to access user data on an empty condition.\n"
566 "Possible cause is not checking if the condition is empty before reading stored data."
567 );
568 return ((typeof(this.blocked.head))this.blocked.head)->user_info;
569}
570
571//-----------------------------------------------------------------------------
572// External scheduling
573// cases to handle :
574// - target already there :
575// block and wake
576// - dtor already there
577// put thread on signaller stack
578// - non-blocking
579// return else
580// - timeout
581// return timeout
582// - block
583// setup mask
584// block
585void __waitfor_internal( const __waitfor_mask_t & mask, int duration ) {
586 // This statment doesn't have a contiguous list of monitors...
587 // Create one!
588 __lock_size_t max = count_max( mask );
589 monitor_desc * mon_storage[max];
590 __builtin_memset( mon_storage, 0, sizeof( mon_storage ) );
591 __lock_size_t actual_count = aggregate( mon_storage, mask );
592
593 __cfaabi_dbg_print_buffer_decl( "Kernel : waitfor %"PRIdFAST16" (s: %"PRIdFAST16", m: %"PRIdFAST16")\n", actual_count, mask.size, (__lock_size_t)max);
594
595 if(actual_count == 0) return;
596
597 __cfaabi_dbg_print_buffer_local( "Kernel : waitfor internal proceeding\n" );
598
599 // Create storage for monitor context
600 monitor_ctx( mon_storage, actual_count );
601
602 // Lock all monitors (aggregates the locks as well)
603 lock_all( monitors, locks, count );
604
605 {
606 // Check if the entry queue
607 thread_desc * next; int index;
608 [next, index] = search_entry_queue( mask, monitors, count );
609
610 if( next ) {
611 *mask.accepted = index;
612 __acceptable_t& accepted = mask[index];
613 if( accepted.is_dtor ) {
614 __cfaabi_dbg_print_buffer_local( "Kernel : dtor already there\n" );
615 verifyf( accepted.size == 1, "ERROR: Accepted dtor has more than 1 mutex parameter." );
616
617 monitor_desc * mon2dtor = accepted[0];
618 verifyf( mon2dtor->dtor_node, "ERROR: Accepted monitor has no dtor_node." );
619
620 __condition_criterion_t * dtor_crit = mon2dtor->dtor_node->criteria;
621 push( mon2dtor->signal_stack, dtor_crit );
622
623 unlock_all( locks, count );
624 }
625 else {
626 __cfaabi_dbg_print_buffer_local( "Kernel : thread present, baton-passing\n" );
627
628 // Create the node specific to this wait operation
629 wait_ctx_primed( kernelTLS.this_thread, 0 );
630
631 // Save monitor states
632 monitor_save;
633
634 __cfaabi_dbg_print_buffer_local( "Kernel : baton of %"PRIdFAST16" monitors : ", count );
635 #ifdef __CFA_DEBUG_PRINT__
636 for( int i = 0; i < count; i++) {
637 __cfaabi_dbg_print_buffer_local( "%p %p ", monitors[i], monitors[i]->signal_stack.top );
638 }
639 #endif
640 __cfaabi_dbg_print_buffer_local( "\n" );
641
642 // Set the owners to be the next thread
643 set_owner( monitors, count, next, __WAITFOR );
644
645 // unlock all the monitors
646 unlock_all( locks, count );
647
648 // unpark the thread we signalled
649 unpark( next );
650
651 //Everything is ready to go to sleep
652 park();
653
654 // We are back, restore the owners and recursions
655 monitor_restore;
656
657 __cfaabi_dbg_print_buffer_local( "Kernel : thread present, returned\n" );
658 }
659
660 __cfaabi_dbg_print_buffer_local( "Kernel : accepted %d\n", *mask.accepted);
661 return;
662 }
663 }
664
665
666 if( duration == 0 ) {
667 __cfaabi_dbg_print_buffer_local( "Kernel : non-blocking, exiting\n" );
668
669 unlock_all( locks, count );
670
671 __cfaabi_dbg_print_buffer_local( "Kernel : accepted %d\n", *mask.accepted);
672 return;
673 }
674
675
676 verifyf( duration < 0, "Timeout on waitfor statments not supported yet." );
677
678 __cfaabi_dbg_print_buffer_local( "Kernel : blocking waitfor\n" );
679
680 // Create the node specific to this wait operation
681 wait_ctx_primed( kernelTLS.this_thread, 0 );
682
683 monitor_save;
684 set_mask( monitors, count, mask );
685
686 for( __lock_size_t i = 0; i < count; i++) {
687 verify( monitors[i]->owner == kernelTLS.this_thread );
688 }
689
690 // unlock all the monitors
691 unlock_all( locks, count );
692
693 //Everything is ready to go to sleep
694 park();
695
696
697 // WE WOKE UP
698
699
700 //We are back, restore the masks and recursions
701 monitor_restore;
702
703 __cfaabi_dbg_print_buffer_local( "Kernel : exiting\n" );
704
705 __cfaabi_dbg_print_buffer_local( "Kernel : accepted %d\n", *mask.accepted);
706}
707
708//-----------------------------------------------------------------------------
709// Utilities
710
711static inline void set_owner( monitor_desc * this, thread_desc * owner, enum __Owner_Reason reason ) {
712 /* paranoid */ verify( this->lock.lock );
713
714 //Pass the monitor appropriately
715 this->owner = owner;
716 this->owner_reason = reason;
717
718 //We are passing the monitor to someone else, which means recursion level is not 0
719 this->recursion = owner ? 1 : 0;
720}
721
722static inline void set_owner( monitor_desc * monitors [], __lock_size_t count, thread_desc * owner, enum __Owner_Reason reason ) {
723 /* paranoid */ verify ( monitors[0]->lock.lock );
724 /* paranoid */ verifyf( monitors[0]->owner == kernelTLS.this_thread, "Expected owner to be %p, got %p (r: %i, m: %p)", kernelTLS.this_thread, monitors[0]->owner, monitors[0]->recursion, monitors[0] );
725 monitors[0]->owner = owner;
726 monitors[0]->owner_reason = reason;
727 monitors[0]->recursion = 1;
728 for( __lock_size_t i = 1; i < count; i++ ) {
729 /* paranoid */ verify ( monitors[i]->lock.lock );
730 /* paranoid */ verifyf( monitors[i]->owner == kernelTLS.this_thread, "Expected owner to be %p, got %p (r: %i, m: %p)", kernelTLS.this_thread, monitors[i]->owner, monitors[i]->recursion, monitors[i] );
731 monitors[i]->owner = owner;
732 monitors[i]->owner_reason = reason;
733 monitors[i]->recursion = 0;
734 }
735}
736
737static inline void set_mask( monitor_desc * storage [], __lock_size_t count, const __waitfor_mask_t & mask ) {
738 for( __lock_size_t i = 0; i < count; i++) {
739 storage[i]->mask = mask;
740 }
741}
742
743static inline void reset_mask( monitor_desc * this ) {
744 this->mask.accepted = 0p;
745 this->mask.data = 0p;
746 this->mask.size = 0;
747}
748
749static inline thread_desc * next_thread( monitor_desc * this, enum __Owner_Reason reason ) {
750 //Check the signaller stack
751 __cfaabi_dbg_print_safe( "Kernel : mon %p AS-stack top %p\n", this, this->signal_stack.top);
752 __condition_criterion_t * urgent = pop( this->signal_stack );
753 if( urgent ) {
754 //The signaller stack is not empty,
755 //regardless of if we are ready to baton pass,
756 //we need to set the monitor as in use
757 /* paranoid */ verifyf( !this->owner || kernelTLS.this_thread == this->owner, "Expected owner to be %p, got %p (r: %i, m: %p)", kernelTLS.this_thread, this->owner, this->recursion, this );
758 set_owner( this, urgent->owner->waiting_thread, reason );
759
760 return check_condition( urgent );
761 }
762
763 // No signaller thread
764 // Get the next thread in the entry_queue
765 thread_desc * new_owner = pop_head( this->entry_queue );
766 /* paranoid */ verifyf( !this->owner || kernelTLS.this_thread == this->owner, "Expected owner to be %p, got %p (r: %i, m: %p)", kernelTLS.this_thread, this->owner, this->recursion, this );
767 /* paranoid */ verify( !new_owner || new_owner->next == 0p );
768 set_owner( this, new_owner, reason );
769
770 return new_owner;
771}
772
773static inline bool is_accepted( monitor_desc * this, const __monitor_group_t & group ) {
774 __acceptable_t * it = this->mask.data; // Optim
775 __lock_size_t count = this->mask.size;
776
777 // Check if there are any acceptable functions
778 if( !it ) return false;
779
780 // If this isn't the first monitor to test this, there is no reason to repeat the test.
781 if( this != group[0] ) return group[0]->mask.accepted >= 0;
782
783 // For all acceptable functions check if this is the current function.
784 for( __lock_size_t i = 0; i < count; i++, it++ ) {
785 if( *it == group ) {
786 *this->mask.accepted = i;
787 return true;
788 }
789 }
790
791 // No function matched
792 return false;
793}
794
795static inline void init( __lock_size_t count, monitor_desc * monitors [], __condition_node_t & waiter, __condition_criterion_t criteria [] ) {
796 for( __lock_size_t i = 0; i < count; i++) {
797 (criteria[i]){ monitors[i], waiter };
798 }
799
800 waiter.criteria = criteria;
801}
802
803static inline void init_push( __lock_size_t count, monitor_desc * monitors [], __condition_node_t & waiter, __condition_criterion_t criteria [] ) {
804 for( __lock_size_t i = 0; i < count; i++) {
805 (criteria[i]){ monitors[i], waiter };
806 __cfaabi_dbg_print_safe( "Kernel : target %p = %p\n", criteria[i].target, &criteria[i] );
807 push( criteria[i].target->signal_stack, &criteria[i] );
808 }
809
810 waiter.criteria = criteria;
811}
812
813static inline void lock_all( __spinlock_t * locks [], __lock_size_t count ) {
814 for( __lock_size_t i = 0; i < count; i++ ) {
815 lock( *locks[i] __cfaabi_dbg_ctx2 );
816 }
817}
818
819static inline void lock_all( monitor_desc * source [], __spinlock_t * /*out*/ locks [], __lock_size_t count ) {
820 for( __lock_size_t i = 0; i < count; i++ ) {
821 __spinlock_t * l = &source[i]->lock;
822 lock( *l __cfaabi_dbg_ctx2 );
823 if(locks) locks[i] = l;
824 }
825}
826
827static inline void unlock_all( __spinlock_t * locks [], __lock_size_t count ) {
828 for( __lock_size_t i = 0; i < count; i++ ) {
829 unlock( *locks[i] );
830 }
831}
832
833static inline void unlock_all( monitor_desc * locks [], __lock_size_t count ) {
834 for( __lock_size_t i = 0; i < count; i++ ) {
835 unlock( locks[i]->lock );
836 }
837}
838
839static inline void save(
840 monitor_desc * ctx [],
841 __lock_size_t count,
842 __attribute((unused)) __spinlock_t * locks [],
843 unsigned int /*out*/ recursions [],
844 __waitfor_mask_t /*out*/ masks []
845) {
846 for( __lock_size_t i = 0; i < count; i++ ) {
847 recursions[i] = ctx[i]->recursion;
848 masks[i] = ctx[i]->mask;
849 }
850}
851
852static inline void restore(
853 monitor_desc * ctx [],
854 __lock_size_t count,
855 __spinlock_t * locks [],
856 unsigned int /*out*/ recursions [],
857 __waitfor_mask_t /*out*/ masks []
858) {
859 lock_all( locks, count );
860 for( __lock_size_t i = 0; i < count; i++ ) {
861 ctx[i]->recursion = recursions[i];
862 ctx[i]->mask = masks[i];
863 }
864 unlock_all( locks, count );
865}
866
867// Function has 2 different behavior
868// 1 - Marks a monitors as being ready to run
869// 2 - Checks if all the monitors are ready to run
870// if so return the thread to run
871static inline thread_desc * check_condition( __condition_criterion_t * target ) {
872 __condition_node_t * node = target->owner;
873 unsigned short count = node->count;
874 __condition_criterion_t * criteria = node->criteria;
875
876 bool ready2run = true;
877
878 for( int i = 0; i < count; i++ ) {
879
880 // __cfaabi_dbg_print_safe( "Checking %p for %p\n", &criteria[i], target );
881 if( &criteria[i] == target ) {
882 criteria[i].ready = true;
883 // __cfaabi_dbg_print_safe( "True\n" );
884 }
885
886 ready2run = criteria[i].ready && ready2run;
887 }
888
889 __cfaabi_dbg_print_safe( "Kernel : Runing %i (%p)\n", ready2run, ready2run ? node->waiting_thread : 0p );
890 return ready2run ? node->waiting_thread : 0p;
891}
892
893static inline void brand_condition( condition & this ) {
894 thread_desc * thrd = TL_GET( this_thread );
895 if( !this.monitors ) {
896 // __cfaabi_dbg_print_safe( "Branding\n" );
897 assertf( thrd->monitors.data != 0p, "No current monitor to brand condition %p", thrd->monitors.data );
898 this.monitor_count = thrd->monitors.size;
899
900 this.monitors = (monitor_desc **)malloc( this.monitor_count * sizeof( *this.monitors ) );
901 for( int i = 0; i < this.monitor_count; i++ ) {
902 this.monitors[i] = thrd->monitors[i];
903 }
904 }
905}
906
907static inline [thread_desc *, int] search_entry_queue( const __waitfor_mask_t & mask, monitor_desc * monitors [], __lock_size_t count ) {
908
909 __queue_t(thread_desc) & entry_queue = monitors[0]->entry_queue;
910
911 // For each thread in the entry-queue
912 for( thread_desc ** thrd_it = &entry_queue.head;
913 *thrd_it != 1p;
914 thrd_it = &(*thrd_it)->next
915 ) {
916 // For each acceptable check if it matches
917 int i = 0;
918 __acceptable_t * end = end (mask);
919 __acceptable_t * begin = begin(mask);
920 for( __acceptable_t * it = begin; it != end; it++, i++ ) {
921 // Check if we have a match
922 if( *it == (*thrd_it)->monitors ) {
923
924 // If we have a match return it
925 // after removeing it from the entry queue
926 return [remove( entry_queue, thrd_it ), i];
927 }
928 }
929 }
930
931 return [0, -1];
932}
933
934forall(dtype T | sized( T ))
935static inline __lock_size_t insert_unique( T * array [], __lock_size_t & size, T * val ) {
936 if( !val ) return size;
937
938 for( __lock_size_t i = 0; i <= size; i++) {
939 if( array[i] == val ) return size;
940 }
941
942 array[size] = val;
943 size = size + 1;
944 return size;
945}
946
947static inline __lock_size_t count_max( const __waitfor_mask_t & mask ) {
948 __lock_size_t max = 0;
949 for( __lock_size_t i = 0; i < mask.size; i++ ) {
950 __acceptable_t & accepted = mask[i];
951 max += accepted.size;
952 }
953 return max;
954}
955
956static inline __lock_size_t aggregate( monitor_desc * storage [], const __waitfor_mask_t & mask ) {
957 __lock_size_t size = 0;
958 for( __lock_size_t i = 0; i < mask.size; i++ ) {
959 __acceptable_t & accepted = mask[i];
960 __libcfa_small_sort( accepted.data, accepted.size );
961 for( __lock_size_t j = 0; j < accepted.size; j++) {
962 insert_unique( storage, size, accepted[j] );
963 }
964 }
965 // TODO insertion sort instead of this
966 __libcfa_small_sort( storage, size );
967 return size;
968}
969
970// Local Variables: //
971// mode: c //
972// tab-width: 4 //
973// End: //
Note: See TracBrowser for help on using the repository browser.