source: libcfa/src/concurrency/monitor.cfa @ 3381ed7

Last change on this file was 3381ed7, checked in by Thierry Delisle <tdelisle@…>, 5 years ago

Added park/unpark thread primitives and removed BlockInternal.
Converted monitors to use park/unpark.
Intrusive queue now marks the next field when a thread is inside the queue.
Added several asserts to the kernel and monitor.
Added a few tests for park and unpark.

//
// Cforall Version 1.0.0 Copyright (C) 2016 University of Waterloo
//
// The contents of this file are covered under the licence agreement in the
// file "LICENCE" distributed with Cforall.
//
// monitor.cfa --
//
// Author           : Thierry Delisle
// Created On       : Thu Feb 23 12:27:26 2017
// Last Modified By : Peter A. Buhr
// Last Modified On : Wed Dec  4 07:55:14 2019
// Update Count     : 10
//

#define __cforall_thread__

#include "monitor.hfa"

#include <stdlib.hfa>
#include <inttypes.h>

#include "kernel_private.hfa"

#include "bits/algorithm.hfa"

//-----------------------------------------------------------------------------
// Forward declarations
static inline void set_owner ( monitor_desc * this, thread_desc * owner, enum __Owner_Reason );
static inline void set_owner ( monitor_desc * storage [], __lock_size_t count, thread_desc * owner, enum __Owner_Reason );
static inline void set_mask  ( monitor_desc * storage [], __lock_size_t count, const __waitfor_mask_t & mask );
static inline void reset_mask( monitor_desc * this );

static inline thread_desc * next_thread( monitor_desc * this, enum __Owner_Reason );
static inline bool is_accepted( monitor_desc * this, const __monitor_group_t & monitors );

static inline void lock_all  ( __spinlock_t * locks [], __lock_size_t count );
static inline void lock_all  ( monitor_desc * source [], __spinlock_t * /*out*/ locks [], __lock_size_t count );
static inline void unlock_all( __spinlock_t * locks [], __lock_size_t count );
static inline void unlock_all( monitor_desc * locks [], __lock_size_t count );

static inline void save   ( monitor_desc * ctx [], __lock_size_t count, __spinlock_t * locks [], unsigned int /*out*/ recursions [], __waitfor_mask_t /*out*/ masks [] );
static inline void restore( monitor_desc * ctx [], __lock_size_t count, __spinlock_t * locks [], unsigned int /*in */ recursions [], __waitfor_mask_t /*in */ masks [] );

static inline void init     ( __lock_size_t count, monitor_desc * monitors [], __condition_node_t & waiter, __condition_criterion_t criteria [] );
static inline void init_push( __lock_size_t count, monitor_desc * monitors [], __condition_node_t & waiter, __condition_criterion_t criteria [] );

static inline thread_desc *        check_condition   ( __condition_criterion_t * );
static inline void                 brand_condition   ( condition & );
static inline [thread_desc *, int] search_entry_queue( const __waitfor_mask_t &, monitor_desc * monitors [], __lock_size_t count );

forall(dtype T | sized( T ))
static inline __lock_size_t insert_unique( T * array [], __lock_size_t & size, T * val );
static inline __lock_size_t count_max    ( const __waitfor_mask_t & mask );
static inline __lock_size_t aggregate    ( monitor_desc * storage [], const __waitfor_mask_t & mask );

//-----------------------------------------------------------------------------
// Useful defines
#define wait_ctx(thrd, user_info)                               /* Create the necessary information to use the signaller stack                         */ \
        __condition_node_t waiter = { thrd, count, user_info };   /* Create the node specific to this wait operation                                     */ \
        __condition_criterion_t criteria[count];                  /* Create the criteria this wait operation needs to wake up                            */ \
        init( count, monitors, waiter, criteria );                /* Link everything together                                                            */ \

#define wait_ctx_primed(thrd, user_info)                        /* Create the necessary information to use the signaller stack                         */ \
        __condition_node_t waiter = { thrd, count, user_info };   /* Create the node specific to this wait operation                                     */ \
        __condition_criterion_t criteria[count];                  /* Create the criteria this wait operation needs to wake up                            */ \
        init_push( count, monitors, waiter, criteria );           /* Link everything together and push it to the AS-Stack                                */ \

#define monitor_ctx( mons, cnt )                                /* Creates the necessary structures for internal/external scheduling operations        */ \
        monitor_desc ** monitors = mons;                          /* Save the targeted monitors                                                          */ \
        __lock_size_t count = cnt;                                /* Save the count to a local variable                                                  */ \
        unsigned int recursions[ count ];                         /* Save the current recursion levels to restore them later                             */ \
        __waitfor_mask_t masks [ count ];                         /* Save the current waitfor masks to restore them later                                */ \
        __spinlock_t *   locks [ count ];                         /* We need to pass-in an array of locks to lock_all/restore                            */ \

#define monitor_save    save   ( monitors, count, locks, recursions, masks )
#define monitor_restore restore( monitors, count, locks, recursions, masks )

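// Usage sketch (informal): these macros are textually composed inside the
// blocking routines below, e.g. in wait():
//
//     monitor_ctx( this.monitors, this.monitor_count ); /* declare monitors/count/recursions/masks/locks */
//     wait_ctx( TL_GET( this_thread ), user_info );     /* declare waiter/criteria and link them together */
//     monitor_save;                                     /* capture recursion levels and waitfor masks     */
//     /* ... park() ... */
//     monitor_restore;                                  /* restore them once the thread owns the monitors */
//
// Because these are macros rather than functions, the locals they declare
// (monitors, count, waiter, criteria, ...) are shared by name between the
// macros and the enclosing routine body.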

//-----------------------------------------------------------------------------
// Enter/Leave routines


extern "C" {
        // Enter single monitor
        static void __enter_monitor_desc( monitor_desc * this, const __monitor_group_t & group ) {
                // Lock the monitor spinlock
                lock( this->lock __cfaabi_dbg_ctx2 );
                // Interrupts are disabled inside the critical section
                thread_desc * thrd = kernelTLS.this_thread;

                __cfaabi_dbg_print_safe( "Kernel : %10p Entering mon %p (%p)\n", thrd, this, this->owner);

                if( !this->owner ) {
                        // No one has the monitor, just take it
                        set_owner( this, thrd, __ENTER_FREE );

                        __cfaabi_dbg_print_safe( "Kernel :  mon is free \n" );
                }
                else if( this->owner == thrd) {
                        // We already have the monitor, just note how many times we took it
                        this->recursion += 1;

                        __cfaabi_dbg_print_safe( "Kernel :  mon already owned \n" );
                }
                else if( is_accepted( this, group) ) {
                        // Someone was waiting for us, enter
                        set_owner( this, thrd, __ENTER_ACCEPT );

                        // Reset mask
                        reset_mask( this );

                        __cfaabi_dbg_print_safe( "Kernel :  mon accepts \n" );
                }
                else {
                        __cfaabi_dbg_print_safe( "Kernel :  blocking \n" );

                        // Someone else has the monitor, wait in line for it
                        /* paranoid */ verify( thrd->next == 0p );
                        append( this->entry_queue, thrd );
                        /* paranoid */ verify( thrd->next == 1p );

                        unlock( this->lock );
                        park();

                        __cfaabi_dbg_print_safe( "Kernel : %10p Entered  mon %p\n", thrd, this);

                        /* paranoid */ verifyf( kernelTLS.this_thread == this->owner, "Expected owner to be %p, got %p (r: %i, m: %p)", kernelTLS.this_thread, this->owner, this->recursion, this );
                        return;
                }

                __cfaabi_dbg_print_safe( "Kernel : %10p Entered  mon %p\n", thrd, this);

                /* paranoid */ verifyf( kernelTLS.this_thread == this->owner, "Expected owner to be %p, got %p (r: %i, m: %p)", kernelTLS.this_thread, this->owner, this->recursion, this );
                /* paranoid */ verify( this->lock.lock );

                // Release the lock and leave
                unlock( this->lock );
                return;
        }
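
        // Note on the blocking path above: a thread that parks in the entry queue
        // does not re-acquire the monitor itself on wake-up.  Ownership is handed
        // over ("baton passed") by the leaving thread via next_thread()/set_owner()
        // while the monitor spinlock is still held, so by the time park() returns,
        // the parked thread already owns the monitor and must not touch the lock.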

        static void __enter_monitor_dtor( monitor_desc * this, fptr_t func ) {
                // Lock the monitor spinlock
                lock( this->lock __cfaabi_dbg_ctx2 );
                // Interrupts are disabled inside the critical section
                thread_desc * thrd = kernelTLS.this_thread;

                __cfaabi_dbg_print_safe( "Kernel : %10p Entering dtor for mon %p (%p)\n", thrd, this, this->owner);


                if( !this->owner ) {
                        __cfaabi_dbg_print_safe( "Kernel : Destroying free mon %p\n", this);

                        // No one has the monitor, just take it
                        set_owner( this, thrd, __ENTER_DTOR_FREE );

                        verifyf( kernelTLS.this_thread == this->owner, "Expected owner to be %p, got %p (r: %i, m: %p)", kernelTLS.this_thread, this->owner, this->recursion, this );

                        unlock( this->lock );
                        return;
                }
                else if( this->owner == thrd) {
                        // We already own the monitor... but we are about to destroy it, so any nested call would fail
                        // Abort!
                        abort( "Attempt to destroy monitor %p by thread \"%.256s\" (%p) in nested mutex.", this, thrd->self_cor.name, thrd );
                }

                __lock_size_t count = 1;
                monitor_desc ** monitors = &this;
                __monitor_group_t group = { &this, 1, func };
                if( is_accepted( this, group) ) {
                        __cfaabi_dbg_print_safe( "Kernel :  mon accepts dtor, block and signal it \n" );

                        // Wake the thread that is waiting for this
                        __condition_criterion_t * urgent = pop( this->signal_stack );
                        /* paranoid */ verify( urgent );

                        // Reset mask
                        reset_mask( this );

                        // Create the node specific to this wait operation
                        wait_ctx_primed( thrd, 0 )

                        // Someone else has the monitor, wait for them to finish and then run
                        unlock( this->lock );

                        // Release the next thread
                        /* paranoid */ verifyf( urgent->owner->waiting_thread == this->owner, "Expected owner to be %p, got %p (r: %i, m: %p)", kernelTLS.this_thread, this->owner, this->recursion, this );
                        unpark( urgent->owner->waiting_thread );

                        // Park current thread waiting
                        park();

                        // Someone was waiting for us, enter
                        /* paranoid */ verifyf( kernelTLS.this_thread == this->owner, "Expected owner to be %p, got %p (r: %i, m: %p)", kernelTLS.this_thread, this->owner, this->recursion, this );
                }
                else {
                        __cfaabi_dbg_print_safe( "Kernel :  blocking \n" );

                        wait_ctx( thrd, 0 )
                        this->dtor_node = &waiter;

                        // Someone else has the monitor, wait in line for it
                        /* paranoid */ verify( thrd->next == 0p );
                        append( this->entry_queue, thrd );
                        /* paranoid */ verify( thrd->next == 1p );
                        unlock( this->lock );

                        // Park current thread waiting
                        park();

                        /* paranoid */ verifyf( kernelTLS.this_thread == this->owner, "Expected owner to be %p, got %p (r: %i, m: %p)", kernelTLS.this_thread, this->owner, this->recursion, this );
                        return;
                }

                __cfaabi_dbg_print_safe( "Kernel : Destroying %p\n", this);

        }
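
        // Note: unlike __enter_monitor_desc, the dtor variant must coordinate with
        // a thread accepting the destructor (waitfor( ^?{}, ... )): if the dtor is
        // accepted, this thread pushes its own criteria on the AS-stack (via
        // wait_ctx_primed), wakes the acceptor, and parks until the acceptor
        // leaves the monitor and passes ownership back.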

        // Leave single monitor
        void __leave_monitor_desc( monitor_desc * this ) {
                // Lock the monitor spinlock
                lock( this->lock __cfaabi_dbg_ctx2 );

                __cfaabi_dbg_print_safe( "Kernel : %10p Leaving mon %p (%p)\n", kernelTLS.this_thread, this, this->owner);

                /* paranoid */ verifyf( kernelTLS.this_thread == this->owner, "Expected owner to be %p, got %p (r: %i, m: %p)", kernelTLS.this_thread, this->owner, this->recursion, this );

                // Leaving a recursion level, decrement the counter
                this->recursion -= 1;

                // If we haven't left the last level of recursion
                // it means we don't need to do anything
                if( this->recursion != 0) {
                        __cfaabi_dbg_print_safe( "Kernel :  recursion still %d\n", this->recursion);
                        unlock( this->lock );
                        return;
                }

                // Get the next thread; null when there is no contention on the monitor
                thread_desc * new_owner = next_thread( this, __LEAVE );

                // Check that the new owner is consistent with who we wake up
                // new_owner might be null even if someone owns the monitor, when the owner is still waiting for another monitor
                /* paranoid */ verifyf( !new_owner || new_owner == this->owner, "Expected owner to be %p, got %p (m: %p)", new_owner, this->owner, this );

                // We can now let other threads in safely
                unlock( this->lock );

                // We need to wake up the thread
                /* paranoid */ verifyf( !new_owner || new_owner == this->owner, "Expected owner to be %p, got %p (m: %p)", new_owner, this->owner, this );
                unpark( new_owner );
        }
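
        // Note: unpark() is deliberately called after unlock(); the hand-over is
        // already complete (next_thread() set the new owner while the spinlock was
        // held), so the wake-up can safely happen outside the critical section,
        // shortening the time the spinlock is held.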

        // Leave single monitor for the last time
        void __leave_dtor_monitor_desc( monitor_desc * this ) {
                __cfaabi_dbg_debug_do(
                        if( TL_GET( this_thread ) != this->owner ) {
                                abort( "Destroyed monitor %p has inconsistent owner, expected %p got %p.\n", this, TL_GET( this_thread ), this->owner);
                        }
                        if( this->recursion != 1 ) {
                                abort( "Destroyed monitor %p has %d outstanding nested calls.\n", this, this->recursion - 1);
                        }
                )
        }

        // Leave the thread monitor,
        // the last routine called by a thread.
        // Should never return
        void __leave_thread_monitor() {
                thread_desc * thrd = TL_GET( this_thread );
                monitor_desc * this = &thrd->self_mon;

                // Lock the monitor now
                lock( this->lock __cfaabi_dbg_ctx2 );

                disable_interrupts();

                thrd->self_cor.state = Halted;

                /* paranoid */ verifyf( thrd == this->owner, "Expected owner to be %p, got %p (r: %i, m: %p)", thrd, this->owner, this->recursion, this );

                // Leaving a recursion level, decrement the counter
                this->recursion -= 1;

                // If we haven't left the last level of recursion
                // it must mean there is an error
                if( this->recursion != 0) { abort( "Thread internal monitor has unbalanced recursion" ); }

                // Fetch the next thread, can be null
                thread_desc * new_owner = next_thread( this, __LEAVE_THREAD );

                // Release the monitor lock
                unlock( this->lock );

                // Unpark the next owner if needed
                /* paranoid */ verifyf( !new_owner || new_owner == this->owner, "Expected owner to be %p, got %p (m: %p)", new_owner, this->owner, this );
                unpark( new_owner );

                // Leave the thread, this will unlock the spinlock
                // Use LeaveThread instead of park, since it is
                // specialized for this case
                LeaveThread();

                // Control flow should never reach here!
        }
}

// Enter multiple monitors
// relies on the monitor array being sorted
static inline void enter( __monitor_group_t monitors ) {
        for( __lock_size_t i = 0; i < monitors.size; i++) {
                __enter_monitor_desc( monitors[i], monitors );
        }
}

// Leave multiple monitors
// relies on the monitor array being sorted
static inline void leave(monitor_desc * monitors [], __lock_size_t count) {
        for( __lock_size_t i = count - 1; i >= 0; i--) {
                __leave_monitor_desc( monitors[i] );
        }
}
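
// Note: enter() and leave() both rely on the monitor array having been sorted
// by address (see the guard constructor below); acquiring overlapping monitor
// sets in a single global order is what prevents deadlock between threads.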

// Ctor for monitor guard
// Sorts monitors before entering
void ?{}( monitor_guard_t & this, monitor_desc * m [], __lock_size_t count, fptr_t func ) {
        thread_desc * thrd = TL_GET( this_thread );

        // Store current array
        this.m = m;
        this.count = count;

        // Sort monitors based on address
        __libcfa_small_sort(this.m, count);

        // Save previous thread context
        this.prev = thrd->monitors;

        // Update thread context (needed for conditions)
        (thrd->monitors){m, count, func};

        // __cfaabi_dbg_print_safe( "MGUARD : enter %d\n", count);

        // Enter the monitors in order
        __monitor_group_t group = {this.m, this.count, func};
        enter( group );

        // __cfaabi_dbg_print_safe( "MGUARD : entered\n" );
}
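
// Example (informal sketch): a mutex routine such as
//
//     monitor M {};
//     void work( M & mutex m ) { ... }
//
// is translated by the compiler into, roughly, a call bracketed by this guard:
//
//     void work( M & m ) {
//             monitor_desc * mons[] = { get_monitor( m ) };
//             monitor_guard_t guard = { mons, 1, (fptr_t)work };
//             ... /* body runs with m's monitor held */
//     }       /* ^?{}(guard) releases the monitor */
//
// The exact generated code may differ; this only illustrates how the
// ctor/dtor pair brackets the critical section.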


// Dtor for monitor guard
void ^?{}( monitor_guard_t & this ) {
        // __cfaabi_dbg_print_safe( "MGUARD : leaving %d\n", this.count);

        // Leave the monitors in order
        leave( this.m, this.count );

        // __cfaabi_dbg_print_safe( "MGUARD : left\n" );

        // Restore thread context
        TL_GET( this_thread )->monitors = this.prev;
}

// Ctor for monitor dtor guard
// Takes a single monitor, so no sorting is needed
void ?{}( monitor_dtor_guard_t & this, monitor_desc * m [], fptr_t func ) {
        // optimization
        thread_desc * thrd = TL_GET( this_thread );

        // Store current array
        this.m = *m;

        // Save previous thread context
        this.prev = thrd->monitors;

        // Update thread context (needed for conditions)
        (thrd->monitors){m, 1, func};

        __enter_monitor_dtor( this.m, func );
}

// Dtor for monitor dtor guard
void ^?{}( monitor_dtor_guard_t & this ) {
        // Leave the monitors in order
        __leave_dtor_monitor_desc( this.m );

        // Restore thread context
        TL_GET( this_thread )->monitors = this.prev;
}

//-----------------------------------------------------------------------------
// Internal scheduling types
void ?{}(__condition_node_t & this, thread_desc * waiting_thread, __lock_size_t count, uintptr_t user_info ) {
        this.waiting_thread = waiting_thread;
        this.count = count;
        this.next = 0p;
        this.user_info = user_info;
}

void ?{}(__condition_criterion_t & this ) with( this ) {
        ready  = false;
        target = 0p;
        owner  = 0p;
        next   = 0p;
}

void ?{}(__condition_criterion_t & this, monitor_desc * target, __condition_node_t & owner ) {
        this.ready  = false;
        this.target = target;
        this.owner  = &owner;
        this.next   = 0p;
}

//-----------------------------------------------------------------------------
// Internal scheduling
void wait( condition & this, uintptr_t user_info = 0 ) {
        brand_condition( this );

        // Check that everything is as expected
        assertf( this.monitors != 0p, "Waiting with no monitors (%p)", this.monitors );
        verifyf( this.monitor_count != 0, "Waiting with 0 monitors (%"PRIiFAST16")", this.monitor_count );
        verifyf( this.monitor_count < 32u, "Excessive monitor count (%"PRIiFAST16")", this.monitor_count );

        // Create storage for monitor context
        monitor_ctx( this.monitors, this.monitor_count );

        // Create the node specific to this wait operation
        wait_ctx( TL_GET( this_thread ), user_info );

        // Append the current wait operation to the ones already queued on the condition
        // We don't need locks for that since conditions must always be waited on inside monitor mutual exclusion
        /* paranoid */ verify( waiter.next == 0p );
        append( this.blocked, &waiter );
        /* paranoid */ verify( waiter.next == 1p );

        // Lock all monitors (aggregates the locks as well)
        lock_all( monitors, locks, count );

        // Find the next thread(s) to run
        __lock_size_t thread_count = 0;
        thread_desc * threads[ count ];
        __builtin_memset( threads, 0, sizeof( threads ) );

        // Save monitor states
        monitor_save;

        // Remove any duplicate threads
        for( __lock_size_t i = 0; i < count; i++) {
                thread_desc * new_owner = next_thread( monitors[i], __WAIT );
                insert_unique( threads, thread_count, new_owner );
        }

        // Unlock the locks, we don't need them anymore
        for(int i = 0; i < count; i++) {
                unlock( *locks[i] );
        }

        // Wake the threads
        for(int i = 0; i < thread_count; i++) {
                unpark( threads[i] );
        }

        // Everything is ready to go to sleep
        park();

        // We are back, restore the owners and recursions
        monitor_restore;
}
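
// Summary of the wait() protocol above:
//   1. brand the condition with the current monitor set (first use only),
//   2. queue this thread's waiter node on the condition,
//   3. lock all monitors, then save their recursion levels and waitfor masks,
//   4. hand each monitor to its next owner (de-duplicated) and wake those threads,
//   5. park until a signaller re-runs us through the AS-stack,
//   6. restore the saved recursion levels and masks once we own the monitors again.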

bool signal( condition & this ) {
        if( is_empty( this ) ) { return false; }

        // Check that everything is as expected
        verify( this.monitors );
        verify( this.monitor_count != 0 );

        // Some more checking in debug
        __cfaabi_dbg_debug_do(
                thread_desc * this_thrd = TL_GET( this_thread );
                if ( this.monitor_count != this_thrd->monitors.size ) {
                        abort( "Signal on condition %p made with different number of monitor(s), expected %zi got %zi", &this, this.monitor_count, this_thrd->monitors.size );
                }

                for(int i = 0; i < this.monitor_count; i++) {
                        if ( this.monitors[i] != this_thrd->monitors[i] ) {
                                abort( "Signal on condition %p made with different monitor, expected %p got %p", &this, this.monitors[i], this_thrd->monitors[i] );
                        }
                }
        );

        __lock_size_t count = this.monitor_count;

        // Lock all monitors
        lock_all( this.monitors, 0p, count );

        // Pop the head of the waiting queue
        __condition_node_t * node = pop_head( this.blocked );

        // Add the thread to the proper AS stack
        for(int i = 0; i < count; i++) {
                __condition_criterion_t * crit = &node->criteria[i];
                assert( !crit->ready );
                push( crit->target->signal_stack, crit );
        }

        // Release
        unlock_all( this.monitors, count );

        return true;
}
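
// Note: signal() does not wake the signalled thread immediately.  It only
// moves the waiter's criteria onto each monitor's AS (signaller) stack; the
// waiter actually runs when the signaller exits the monitors and next_thread()
// pops the stack, which gives signalled threads priority over the entry queue.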

bool signal_block( condition & this ) {
        if( !this.blocked.head ) { return false; }

        // Check that everything is as expected
        verifyf( this.monitors != 0p, "Waiting with no monitors (%p)", this.monitors );
        verifyf( this.monitor_count != 0, "Waiting with 0 monitors (%"PRIiFAST16")", this.monitor_count );

        // Create storage for monitor context
        monitor_ctx( this.monitors, this.monitor_count );

        // Lock all monitors (aggregates the locks as well)
        lock_all( monitors, locks, count );


        // Create the node specific to this wait operation
        wait_ctx_primed( kernelTLS.this_thread, 0 )

        // Save contexts
        monitor_save;

        // Find the thread to run
        thread_desc * signallee = pop_head( this.blocked )->waiting_thread;
        /* paranoid */ verify( signallee->next == 0p );
        set_owner( monitors, count, signallee, __ENTER_SIGNAL_BLOCK );

        __cfaabi_dbg_print_buffer_decl( "Kernel : signal_block condition %p (s: %p)\n", &this, signallee );

        // Unlock all the monitors
        unlock_all( locks, count );

        // Unpark the thread we signalled
        unpark( signallee );

        // Everything is ready to go to sleep
        park();


        // WE WOKE UP


        __cfaabi_dbg_print_buffer_local( "Kernel :   signal_block returned\n" );

        // We are back, restore the masks and recursions
        monitor_restore;

        return true;
}
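
// Note: signal_block() differs from signal() in that the signaller blocks and
// the signallee runs immediately: ownership of all monitors is transferred to
// the signalled thread before the signaller parks on the AS-stack, and the
// signaller's state is restored only after the signallee leaves.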

// Access the user_info of the thread waiting at the front of the queue
uintptr_t front( condition & this ) {
        verifyf( !is_empty(this),
                "Attempt to access user data on an empty condition.\n"
                "Possible cause is not checking if the condition is empty before reading stored data."
        );
        return ((typeof(this.blocked.head))this.blocked.head)->user_info;
}
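
// Typical use (informal sketch): peek at the waiter's user data before
// deciding whether to signal, e.g.
//
//     if( !is_empty( c ) && front( c ) == current_ticket ) {
//             signal( c );
//     }
//
// where c is a condition and current_ticket stands for whatever uintptr_t
// value the waiter passed to wait( c, ... ).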

//-----------------------------------------------------------------------------
// External scheduling
// cases to handle :
//      - target already there :
//              block and wake
//      - dtor already there
//              put thread on signaller stack
//      - non-blocking
//              return immediately
//      - timeout
//              return timeout
//      - block
//              setup mask
//              block
void __waitfor_internal( const __waitfor_mask_t & mask, int duration ) {
        // This statement doesn't have a contiguous list of monitors...
        // Create one!
        __lock_size_t max = count_max( mask );
        monitor_desc * mon_storage[max];
        __builtin_memset( mon_storage, 0, sizeof( mon_storage ) );
        __lock_size_t actual_count = aggregate( mon_storage, mask );

        __cfaabi_dbg_print_buffer_decl( "Kernel : waitfor %"PRIdFAST16" (s: %"PRIdFAST16", m: %"PRIdFAST16")\n", actual_count, mask.size, (__lock_size_t)max);

        if(actual_count == 0) return;

        __cfaabi_dbg_print_buffer_local( "Kernel : waitfor internal proceeding\n" );

        // Create storage for monitor context
        monitor_ctx( mon_storage, actual_count );

        // Lock all monitors (aggregates the locks as well)
        lock_all( monitors, locks, count );

        {
                // Check if a matching thread is already waiting in the entry queue
                thread_desc * next; int index;
                [next, index] = search_entry_queue( mask, monitors, count );

                if( next ) {
                        *mask.accepted = index;
                        __acceptable_t& accepted = mask[index];
                        if( accepted.is_dtor ) {
                                __cfaabi_dbg_print_buffer_local( "Kernel : dtor already there\n" );
                                verifyf( accepted.size == 1,  "ERROR: Accepted dtor has more than 1 mutex parameter." );

                                monitor_desc * mon2dtor = accepted[0];
                                verifyf( mon2dtor->dtor_node, "ERROR: Accepted monitor has no dtor_node." );

                                __condition_criterion_t * dtor_crit = mon2dtor->dtor_node->criteria;
                                push( mon2dtor->signal_stack, dtor_crit );

                                unlock_all( locks, count );
                        }
                        else {
                                __cfaabi_dbg_print_buffer_local( "Kernel : thread present, baton-passing\n" );

                                // Create the node specific to this wait operation
                                wait_ctx_primed( kernelTLS.this_thread, 0 );

                                // Save monitor states
                                monitor_save;

                                __cfaabi_dbg_print_buffer_local( "Kernel :  baton of %"PRIdFAST16" monitors : ", count );
                                #ifdef __CFA_DEBUG_PRINT__
                                        for( int i = 0; i < count; i++) {
                                                __cfaabi_dbg_print_buffer_local( "%p %p ", monitors[i], monitors[i]->signal_stack.top );
                                        }
                                #endif
                                __cfaabi_dbg_print_buffer_local( "\n" );

                                // Set the owners to be the next thread
                                set_owner( monitors, count, next, __WAITFOR );

                                // Unlock all the monitors
                                unlock_all( locks, count );

                                // Unpark the thread we signalled
                                unpark( next );

                                // Everything is ready to go to sleep
                                park();

                                // We are back, restore the owners and recursions
                                monitor_restore;

                                __cfaabi_dbg_print_buffer_local( "Kernel : thread present, returned\n" );
                        }

                        __cfaabi_dbg_print_buffer_local( "Kernel : accepted %d\n", *mask.accepted);
                        return;
                }
        }


        if( duration == 0 ) {
                __cfaabi_dbg_print_buffer_local( "Kernel : non-blocking, exiting\n" );

                unlock_all( locks, count );

                __cfaabi_dbg_print_buffer_local( "Kernel : accepted %d\n", *mask.accepted);
                return;
        }


        verifyf( duration < 0, "Timeout on waitfor statements not supported yet." );

        __cfaabi_dbg_print_buffer_local( "Kernel : blocking waitfor\n" );

        // Create the node specific to this wait operation
        wait_ctx_primed( kernelTLS.this_thread, 0 );

        monitor_save;
        set_mask( monitors, count, mask );

        for( __lock_size_t i = 0; i < count; i++) {
                verify( monitors[i]->owner == kernelTLS.this_thread );
        }

        // Unlock all the monitors
        unlock_all( locks, count );

        // Everything is ready to go to sleep
        park();


        // WE WOKE UP


        // We are back, restore the masks and recursions
        monitor_restore;

        __cfaabi_dbg_print_buffer_local( "Kernel : exiting\n" );

        __cfaabi_dbg_print_buffer_local( "Kernel : accepted %d\n", *mask.accepted);
}

//-----------------------------------------------------------------------------
// Utilities

static inline void set_owner( monitor_desc * this, thread_desc * owner, enum __Owner_Reason reason ) {
        /* paranoid */ verify( this->lock.lock );

        // Pass the monitor appropriately
        this->owner = owner;
        this->owner_reason = reason;

        // We are passing the monitor to someone else, which means the recursion level is not 0
        this->recursion = owner ? 1 : 0;
}

static inline void set_owner( monitor_desc * monitors [], __lock_size_t count, thread_desc * owner, enum __Owner_Reason reason ) {
        /* paranoid */ verify ( monitors[0]->lock.lock );
        /* paranoid */ verifyf( monitors[0]->owner == kernelTLS.this_thread, "Expected owner to be %p, got %p (r: %i, m: %p)", kernelTLS.this_thread, monitors[0]->owner, monitors[0]->recursion, monitors[0] );
        monitors[0]->owner        = owner;
        monitors[0]->owner_reason = reason;
        monitors[0]->recursion    = 1;
        for( __lock_size_t i = 1; i < count; i++ ) {
                /* paranoid */ verify ( monitors[i]->lock.lock );
                /* paranoid */ verifyf( monitors[i]->owner == kernelTLS.this_thread, "Expected owner to be %p, got %p (r: %i, m: %p)", kernelTLS.this_thread, monitors[i]->owner, monitors[i]->recursion, monitors[i] );
                monitors[i]->owner        = owner;
                monitors[i]->owner_reason = reason;
                monitors[i]->recursion    = 0;
        }
}

static inline void set_mask( monitor_desc * storage [], __lock_size_t count, const __waitfor_mask_t & mask ) {
        for( __lock_size_t i = 0; i < count; i++) {
                storage[i]->mask = mask;
        }
}

static inline void reset_mask( monitor_desc * this ) {
        this->mask.accepted = 0p;
        this->mask.data = 0p;
        this->mask.size = 0;
}

static inline thread_desc * next_thread( monitor_desc * this, enum __Owner_Reason reason ) {
        // Check the signaller stack
        __cfaabi_dbg_print_safe( "Kernel :  mon %p AS-stack top %p\n", this, this->signal_stack.top);
        __condition_criterion_t * urgent = pop( this->signal_stack );
        if( urgent ) {
                // The signaller stack is not empty;
                // regardless of whether we are ready to baton-pass,
                // we need to set the monitor as in use
                /* paranoid */ verifyf( !this->owner || kernelTLS.this_thread == this->owner, "Expected owner to be %p, got %p (r: %i, m: %p)", kernelTLS.this_thread, this->owner, this->recursion, this );
                set_owner( this,  urgent->owner->waiting_thread, reason );

                return check_condition( urgent );
        }

        // No signaller thread
        // Get the next thread in the entry_queue
        thread_desc * new_owner = pop_head( this->entry_queue );
        /* paranoid */ verifyf( !this->owner || kernelTLS.this_thread == this->owner, "Expected owner to be %p, got %p (r: %i, m: %p)", kernelTLS.this_thread, this->owner, this->recursion, this );
        /* paranoid */ verify( !new_owner || new_owner->next == 0p );
        set_owner( this, new_owner, reason );

        return new_owner;
}
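
// Note: next_thread() encodes the monitor's scheduling priority: the signaller
// (AS) stack is always drained before the entry queue, so threads made ready
// by signal()/waitfor() run ahead of threads newly arriving at the monitor.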

static inline bool is_accepted( monitor_desc * this, const __monitor_group_t & group ) {
        __acceptable_t * it = this->mask.data; // Optim
        __lock_size_t count = this->mask.size;

        // Check if there are any acceptable functions
        if( !it ) return false;

        // If this isn't the first monitor to test this, there is no reason to repeat the test.
        if( this != group[0] ) return group[0]->mask.accepted >= 0;

        // For all acceptable functions check if this is the current function.
        for( __lock_size_t i = 0; i < count; i++, it++ ) {
                if( *it == group ) {
                        *this->mask.accepted = i;
                        return true;
                }
        }

        // No function matched
        return false;
}

static inline void init( __lock_size_t count, monitor_desc * monitors [], __condition_node_t & waiter, __condition_criterion_t criteria [] ) {
        for( __lock_size_t i = 0; i < count; i++) {
                (criteria[i]){ monitors[i], waiter };
        }

        waiter.criteria = criteria;
}

static inline void init_push( __lock_size_t count, monitor_desc * monitors [], __condition_node_t & waiter, __condition_criterion_t criteria [] ) {
        for( __lock_size_t i = 0; i < count; i++) {
                (criteria[i]){ monitors[i], waiter };
                __cfaabi_dbg_print_safe( "Kernel :  target %p = %p\n", criteria[i].target, &criteria[i] );
                push( criteria[i].target->signal_stack, &criteria[i] );
        }

        waiter.criteria = criteria;
}

static inline void lock_all( __spinlock_t * locks [], __lock_size_t count ) {
        for( __lock_size_t i = 0; i < count; i++ ) {
                lock( *locks[i] __cfaabi_dbg_ctx2 );
        }
}

static inline void lock_all( monitor_desc * source [], __spinlock_t * /*out*/ locks [], __lock_size_t count ) {
        for( __lock_size_t i = 0; i < count; i++ ) {
                __spinlock_t * l = &source[i]->lock;
                lock( *l __cfaabi_dbg_ctx2 );
                if(locks) locks[i] = l;
        }
}

static inline void unlock_all( __spinlock_t * locks [], __lock_size_t count ) {
        for( __lock_size_t i = 0; i < count; i++ ) {
                unlock( *locks[i] );
        }
}

static inline void unlock_all( monitor_desc * locks [], __lock_size_t count ) {
        for( __lock_size_t i = 0; i < count; i++ ) {
                unlock( locks[i]->lock );
        }
}

static inline void save(
        monitor_desc * ctx [],
        __lock_size_t count,
        __attribute((unused)) __spinlock_t * locks [],
        unsigned int /*out*/ recursions [],
        __waitfor_mask_t /*out*/ masks []
) {
        for( __lock_size_t i = 0; i < count; i++ ) {
                recursions[i] = ctx[i]->recursion;
                masks[i]      = ctx[i]->mask;
        }
}

static inline void restore(
        monitor_desc * ctx [],
        __lock_size_t count,
        __spinlock_t * locks [],
        unsigned int /*in */ recursions [],
        __waitfor_mask_t /*in */ masks []
) {
        lock_all( locks, count );
        for( __lock_size_t i = 0; i < count; i++ ) {
                ctx[i]->recursion = recursions[i];
                ctx[i]->mask      = masks[i];
        }
        unlock_all( locks, count );
}

// This function has two behaviours:
// 1 - Marks a monitor as being ready to run
// 2 - Checks if all the monitors are ready to run
//     if so, returns the thread to run
static inline thread_desc * check_condition( __condition_criterion_t * target ) {
        __condition_node_t * node = target->owner;
        unsigned short count = node->count;
        __condition_criterion_t * criteria = node->criteria;

        bool ready2run = true;

        for( int i = 0; i < count; i++ ) {

                // __cfaabi_dbg_print_safe( "Checking %p for %p\n", &criteria[i], target );
                if( &criteria[i] == target ) {
                        criteria[i].ready = true;
                        // __cfaabi_dbg_print_safe( "True\n" );
                }

                ready2run = criteria[i].ready && ready2run;
        }

        __cfaabi_dbg_print_safe( "Kernel :  Running %i (%p)\n", ready2run, ready2run ? node->waiting_thread : 0p );
        return ready2run ? node->waiting_thread : 0p;
}

static inline void brand_condition( condition & this ) {
        thread_desc * thrd = TL_GET( this_thread );
        if( !this.monitors ) {
                // __cfaabi_dbg_print_safe( "Branding\n" );
                assertf( thrd->monitors.data != 0p, "No current monitor to brand condition %p", thrd->monitors.data );
                this.monitor_count = thrd->monitors.size;

                this.monitors = (monitor_desc **)malloc( this.monitor_count * sizeof( *this.monitors ) );
                for( int i = 0; i < this.monitor_count; i++ ) {
                        this.monitors[i] = thrd->monitors[i];
                }
        }
}

static inline [thread_desc *, int] search_entry_queue( const __waitfor_mask_t & mask, monitor_desc * monitors [], __lock_size_t count ) {

        __queue_t(thread_desc) & entry_queue = monitors[0]->entry_queue;

        // For each thread in the entry-queue
        for(    thread_desc ** thrd_it = &entry_queue.head;
                *thrd_it != 1p;
                thrd_it = &(*thrd_it)->next
        ) {
                // For each acceptable function, check if it matches
                int i = 0;
                __acceptable_t * end   = end  (mask);
                __acceptable_t * begin = begin(mask);
                for( __acceptable_t * it = begin; it != end; it++, i++ ) {
                        // Check if we have a match
                        if( *it == (*thrd_it)->monitors ) {

                                // If we have a match, return it
                                // after removing it from the entry queue
                                return [remove( entry_queue, thrd_it ), i];
                        }
                }
        }

        return [0p, -1];
}
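
// Note: search_entry_queue() is a linear scan of the entry queue with a nested
// scan of the acceptable list, i.e. O(queue length x mask size).  It returns
// the first match in queue order, preserving FIFO fairness among the queued
// threads that satisfy the mask.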

forall(dtype T | sized( T ))
static inline __lock_size_t insert_unique( T * array [], __lock_size_t & size, T * val ) {
        if( !val ) return size;

        for( __lock_size_t i = 0; i < size; i++) {
                if( array[i] == val ) return size;
        }

        array[size] = val;
        size = size + 1;
        return size;
}

static inline __lock_size_t count_max( const __waitfor_mask_t & mask ) {
        __lock_size_t max = 0;
        for( __lock_size_t i = 0; i < mask.size; i++ ) {
                __acceptable_t & accepted = mask[i];
                max += accepted.size;
        }
        return max;
}

static inline __lock_size_t aggregate( monitor_desc * storage [], const __waitfor_mask_t & mask ) {
        __lock_size_t size = 0;
        for( __lock_size_t i = 0; i < mask.size; i++ ) {
                __acceptable_t & accepted = mask[i];
                __libcfa_small_sort( accepted.data, accepted.size );
                for( __lock_size_t j = 0; j < accepted.size; j++) {
                        insert_unique( storage, size, accepted[j] );
                }
        }
        // TODO insertion sort instead of this
        __libcfa_small_sort( storage, size );
        return size;
}
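
// Note: aggregate() sorts both each acceptable clause's monitors and the
// flattened result so that the subsequent lock_all() acquires the spinlocks
// in a consistent (address) order, the same deadlock-avoidance rule used by
// the monitor guard constructor.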

// Local Variables: //
// mode: c //
// tab-width: 4 //
// End: //