// -*- Mode: CFA -*-
//
// Cforall Version 1.0.0 Copyright (C) 2016 University of Waterloo
//
// The contents of this file are covered under the licence agreement in the
// file "LICENCE" distributed with Cforall.
//
// monitor_desc.c --
//
// Author           : Thierry Delisle
// Created On       : Thd Feb 23 12:27:26 2017
// Last Modified By : Thierry Delisle
// Last Modified On : --
// Update Count     : 0
//

#include "monitor"

#include <stdlib>		// malloc, qsort (header name assumed; the original include was lost)

#include "kernel_private.h"
#include "libhdr.h"

//-----------------------------------------------------------------------------
// Forward declarations
static inline void set_owner( monitor_desc * this, thread_desc * owner );
static inline thread_desc * next_thread( monitor_desc * this );

static inline void lock_all( spinlock ** locks, unsigned short count );
static inline void lock_all( monitor_desc ** source, spinlock ** /*out*/ locks, unsigned short count );
static inline void unlock_all( spinlock ** locks, unsigned short count );
static inline void unlock_all( monitor_desc ** locks, unsigned short count );

static inline void save_recursion   ( monitor_desc ** ctx, unsigned int * /*out*/ recursions, unsigned short count );
static inline void restore_recursion( monitor_desc ** ctx, unsigned int * /*in */ recursions, unsigned short count );

static inline thread_desc * check_condition( __condition_criterion_t * );
static inline void brand_condition( condition * );
static inline unsigned short insert_unique( thread_desc ** thrds, unsigned short end, thread_desc * val );

//-----------------------------------------------------------------------------
// Enter/Leave routines

extern "C" {
	void __enter_monitor_desc(monitor_desc * this) {
		lock( &this->lock );
		thread_desc * thrd = this_thread();

		LIB_DEBUG_PRINT_SAFE("%p Entering %p (o: %p, r: %i)\n", thrd, this, this->owner, this->recursion);

		if( !this->owner ) {
			//No one has the monitor, just take it
			set_owner( this, thrd );
		}
		else if( this->owner == thrd) {
			//We already have the monitor, just note how many times we took it
			assert( this->recursion > 0 );
			this->recursion += 1;
		}
		else {
			//Someone else has the monitor, wait in line for it
			append( &this->entry_queue, thrd );
			LIB_DEBUG_PRINT_SAFE("%p Blocking on entry\n", thrd);
			ScheduleInternal( &this->lock );

			//ScheduleInternal will unlock the spinlock, no need to unlock ourselves
			return;
		}

		unlock( &this->lock );
		return;
	}

	// leave pseudo code :
	// TODO
	void __leave_monitor_desc(monitor_desc * this) {
		lock( &this->lock );

		thread_desc * thrd = this_thread();

		LIB_DEBUG_PRINT_SAFE("%p Leaving %p (o: %p, r: %i)\n", thrd, this, this->owner, this->recursion);
		assertf( thrd == this->owner, "Expected owner to be %p, got %p (r: %i)", thrd, this->owner, this->recursion );

		//Leaving a recursion level, decrement the counter
		this->recursion -= 1;

		//If we haven't left the last level of recursion
		//it means we don't need to do anything
		if( this->recursion != 0) {
			unlock( &this->lock );
			return;
		}

		thread_desc * new_owner = next_thread( this );

		//We can now let other threads in safely
		unlock( &this->lock );

		LIB_DEBUG_PRINT_SAFE("Next owner is %p\n", new_owner);

		//We need to wake-up the thread
		ScheduleThread( new_owner );
	}
}

static inline void enter(monitor_desc ** monitors, int count) {
	for(int i = 0; i < count; i++) {
		__enter_monitor_desc( monitors[i] );
	}
}

static inline void leave(monitor_desc ** monitors, int count) {
	for(int i = count - 1; i >= 0; i--) {
		__leave_monitor_desc( monitors[i] );
	}
}

void ?{}( monitor_guard_t * this, monitor_desc ** m, int count ) {
	this->m = m;
	this->count = count;
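	// Sort the monitors so every bulk acquire locks them in the same global
	// order; a consistent acquisition order is what prevents two guards from
	// deadlocking against each other when they share monitors.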
	qsort(this->m, count);
	enter( this->m, this->count );

	this->prev_mntrs = this_thread()->current_monitors;
	this->prev_count = this_thread()->current_monitor_count;

	this_thread()->current_monitors      = m;
	this_thread()->current_monitor_count = count;
}

void ^?{}( monitor_guard_t * this ) {
	leave( this->m, this->count );

	this_thread()->current_monitors      = this->prev_mntrs;
	this_thread()->current_monitor_count = this->prev_count;
}

void ?{}(__condition_node_t * this, thread_desc * waiting_thread, unsigned short count, uintptr_t user_info ) {
	this->waiting_thread = waiting_thread;
	this->count = count;
	this->next = NULL;
	this->user_info = user_info;
}

void ?{}(__condition_criterion_t * this ) {
	this->ready  = false;
	this->target = NULL;
	this->owner  = NULL;
	this->next   = NULL;
}

void ?{}(__condition_criterion_t * this, monitor_desc * target, __condition_node_t * owner ) {
	this->ready  = false;
	this->target = target;
	this->owner  = owner;
	this->next   = NULL;
}

//-----------------------------------------------------------------------------
// Internal scheduling
void wait( condition * this, uintptr_t user_info = 0 ) {
	LIB_DEBUG_PRINT_SAFE("Waiting\n");

	brand_condition( this );

	//Check that everything is as expected
	assertf( this->monitors != NULL, "Waiting with no monitors (%p)", this->monitors );
	assertf( this->monitor_count != 0, "Waiting with 0 monitors (%i)", this->monitor_count );
	assertf( this->monitor_count < 32u, "Excessive monitor count (%i)", this->monitor_count );

	unsigned short count = this->monitor_count;
	unsigned int recursions[ count ];		//Save the current recursion levels to restore them later
	spinlock *   locks     [ count ];		//We need to pass-in an array of locks to ScheduleInternal

	LIB_DEBUG_PRINT_SAFE("count %i\n", count);

	__condition_node_t waiter = { this_thread(), count, user_info };

	__condition_criterion_t criteria[count];
	for(int i = 0; i < count; i++) {
		(&criteria[i]){ this->monitors[i], &waiter };
		LIB_DEBUG_PRINT_SAFE( "Criterion %p\n", &criteria[i] );
	}

	waiter.criteria = criteria;
	append( &this->blocked, &waiter );

	lock_all( this->monitors, locks, count );
	save_recursion( this->monitors, recursions, count );
	//DON'T unlock, ask the kernel to do it

	//Find the next thread(s) to run
	unsigned short thread_count = 0;
	thread_desc * threads[ count ];
	for(int i = 0; i < count; i++) {
		threads[i] = 0;
	}

	for( int i = 0; i < count; i++) {
		thread_desc * new_owner = next_thread( this->monitors[i] );
		thread_count = insert_unique( threads, thread_count, new_owner );
	}

	LIB_DEBUG_PRINT_SAFE("Will unblock: ");
	for(int i = 0; i < thread_count; i++) {
		LIB_DEBUG_PRINT_SAFE("%p ", threads[i]);
	}
	LIB_DEBUG_PRINT_SAFE("\n");

	// Everything is ready to go to sleep
	ScheduleInternal( locks, count, threads, thread_count );

	//WE WOKE UP

	//We are back, restore the owners and recursions
	lock_all( locks, count );
	restore_recursion( this->monitors, recursions, count );
	unlock_all( locks, count );
}
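// Signalling does not block the signaller and does not run the signalled
// thread immediately: each criterion is pushed onto its monitor's signal
// stack, and the waiter resumes only once the monitors are released and all
// of its criteria have been marked ready (see next_thread / check_condition).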
bool signal( condition * this ) {
	if( is_empty( this ) ) {
		LIB_DEBUG_PRINT_SAFE("Nothing to signal\n");
		return false;
	}

	//Check that everything is as expected
	assert( this->monitors );
	assert( this->monitor_count != 0 );

	unsigned short count = this->monitor_count;

	//Some more checking in debug
	LIB_DEBUG_DO(
		thread_desc * this_thrd = this_thread();
		if ( this->monitor_count != this_thrd->current_monitor_count ) {
			abortf( "Signal on condition %p made with different number of monitor(s), expected %i got %i", this, this->monitor_count, this_thrd->current_monitor_count );
		} // if

		for(int i = 0; i < this->monitor_count; i++) {
			if ( this->monitors[i] != this_thrd->current_monitors[i] ) {
				abortf( "Signal on condition %p made with different monitor, expected %p got %p", this, this->monitors[i], this_thrd->current_monitors[i] );
			} // if
		}
	);

	//Lock all the monitors
	lock_all( this->monitors, NULL, count );
	LIB_DEBUG_PRINT_SAFE("Signalling");

	//Pop the head of the waiting queue
	__condition_node_t * node = pop_head( &this->blocked );

	//Add the thread to the proper AS stack
	for(int i = 0; i < count; i++) {
		__condition_criterion_t * crit = &node->criteria[i];
		LIB_DEBUG_PRINT_SAFE(" %p", crit->target);
		assert( !crit->ready );
		push( &crit->target->signal_stack, crit );
	}

	LIB_DEBUG_PRINT_SAFE("\n");

	//Release
	unlock_all( this->monitors, count );

	return true;
}

bool signal_block( condition * this ) {
	if( !this->blocked.head ) {
		LIB_DEBUG_PRINT_SAFE("Nothing to signal\n");
		return false;
	}

	//Check that everything is as expected
	assertf( this->monitors != NULL, "Waiting with no monitors (%p)", this->monitors );
	assertf( this->monitor_count != 0, "Waiting with 0 monitors (%i)", this->monitor_count );

	unsigned short count = this->monitor_count;
	unsigned int recursions[ count ];		//Save the current recursion levels to restore them later
	spinlock *   locks     [ count ];		//We need to pass-in an array of locks to ScheduleInternal

	lock_all( this->monitors, locks, count );

	//create criteria
	__condition_node_t waiter = { this_thread(), count, 0 };

	__condition_criterion_t criteria[count];
	for(int i = 0; i < count; i++) {
		(&criteria[i]){ this->monitors[i], &waiter };
		LIB_DEBUG_PRINT_SAFE( "Criterion %p\n", &criteria[i] );
		push( &criteria[i].target->signal_stack, &criteria[i] );
	}

	waiter.criteria = criteria;

	//save contexts
	save_recursion( this->monitors, recursions, count );

	//Find the thread to run
	thread_desc * signallee = pop_head( &this->blocked )->waiting_thread;
	for(int i = 0; i < count; i++) {
		set_owner( this->monitors[i], signallee );
	}

	LIB_DEBUG_PRINT_SAFE( "Waiting on signal block\n" );

	//Everything is ready to go to sleep
	ScheduleInternal( locks, count, &signallee, 1 );

	LIB_DEBUG_PRINT_SAFE( "Back from signal block\n" );

	//We are back, restore the owners and recursions
	lock_all( locks, count );
	restore_recursion( this->monitors, recursions, count );
	unlock_all( locks, count );

	return true;
}
uintptr_t front( condition * this ) {
	LIB_DEBUG_DO(
		if( is_empty(this) ) {
			abortf( "Attempt to access user data on an empty condition.\n"
			        "Possible cause is not checking if the condition is empty before reading stored data." );
		} // if
	);
	return this->blocked.head->user_info;
}

//-----------------------------------------------------------------------------
// Internal scheduling
void __accept_internal( unsigned short count, __acceptable_t * acceptables, void (*func)(void) ) {
	// thread_desc * this = this_thread();

	// unsigned short count = this->current_monitor_count;
	// unsigned int recursions[ count ];		//Save the current recursion levels to restore them later
	// spinlock *   locks     [ count ];		//We need to pass-in an array of locks to ScheduleInternal

	// lock_all( this->current_monitors, locks, count );

	// // Everything is ready to go to sleep
	// // ScheduleInternal( locks, count, threads, thread_count );

	// //WE WOKE UP

	// //We are back, restore the owners and recursions
	// lock_all( locks, count );
	// restore_recursion( this->monitors, recursions, count );
	// unlock_all( locks, count );
}

//-----------------------------------------------------------------------------
// Utilities

static inline void set_owner( monitor_desc * this, thread_desc * owner ) {
	//Pass the monitor appropriately
	this->owner = owner;

	//Handing the monitor to a new owner starts a fresh recursion level;
	//clearing the owner resets the recursion count to 0
	this->recursion = owner ? 1 : 0;
}

static inline thread_desc * next_thread( monitor_desc * this ) {
	//Check the signaller stack
	__condition_criterion_t * urgent = pop( &this->signal_stack );
	if( urgent ) {
		//The signaller stack is not empty,
		//regardless of whether we are ready to baton pass,
		//we need to set the monitor as in use
		set_owner( this, urgent->owner->waiting_thread );

		return check_condition( urgent );
	}

	// No signaller thread
	// Get the next thread in the entry_queue
	thread_desc * new_owner = pop_head( &this->entry_queue );
	set_owner( this, new_owner );

	return new_owner;
}

static inline void lock_all( spinlock ** locks, unsigned short count ) {
	for( int i = 0; i < count; i++ ) {
		lock( locks[i] );
	}
}

static inline void lock_all( monitor_desc ** source, spinlock ** /*out*/ locks, unsigned short count ) {
	for( int i = 0; i < count; i++ ) {
		spinlock * l = &source[i]->lock;
		lock( l );
		if(locks) locks[i] = l;
	}
}

static inline void unlock_all( spinlock ** locks, unsigned short count ) {
	for( int i = 0; i < count; i++ ) {
		unlock( locks[i] );
	}
}

static inline void unlock_all( monitor_desc ** locks, unsigned short count ) {
	for( int i = 0; i < count; i++ ) {
		unlock( &locks[i]->lock );
	}
}

static inline void save_recursion( monitor_desc ** ctx, unsigned int * /*out*/ recursions, unsigned short count ) {
	for( int i = 0; i < count; i++ ) {
		recursions[i] = ctx[i]->recursion;
	}
}

static inline void restore_recursion( monitor_desc ** ctx, unsigned int * /*in */ recursions, unsigned short count ) {
	for( int i = 0; i < count; i++ ) {
		ctx[i]->recursion = recursions[i];
	}
}

// This function has two behaviours:
// 1 - Marks a monitor as being ready to run
// 2 - Checks if all the monitors are ready to run
//     and if so, returns the thread to run
static inline thread_desc * check_condition( __condition_criterion_t * target ) {
	__condition_node_t * node = target->owner;
	unsigned short count = node->count;
	__condition_criterion_t * criteria = node->criteria;

	bool ready2run = true;
	for( int i = 0; i < count; i++ ) {
		LIB_DEBUG_PRINT_SAFE( "Checking %p for %p\n", &criteria[i], target );
		if( &criteria[i] == target ) {
			criteria[i].ready = true;
			LIB_DEBUG_PRINT_SAFE( "True\n" );
		}

		ready2run = criteria[i].ready && ready2run;
	}

	LIB_DEBUG_PRINT_SAFE( "Running %i\n", ready2run );
	return ready2run ? node->waiting_thread : NULL;
}
static inline void brand_condition( condition * this ) {
	thread_desc * thrd = this_thread();
	if( !this->monitors ) {
		LIB_DEBUG_PRINT_SAFE("Branding\n");
		assertf( thrd->current_monitors != NULL, "No current monitor to brand condition %p", thrd->current_monitors );

		this->monitor_count = thrd->current_monitor_count;
		this->monitors = malloc( this->monitor_count * sizeof( *this->monitors ) );
		for( int i = 0; i < this->monitor_count; i++ ) {
			this->monitors[i] = thrd->current_monitors[i];
		}
	}
}

static inline unsigned short insert_unique( thread_desc ** thrds, unsigned short end, thread_desc * val ) {
	if( !val ) return end;

	//Only scan the slots that have already been filled in
	for(int i = 0; i < end; i++) {
		if( thrds[i] == val ) return end;
	}

	thrds[end] = val;
	return end + 1;
}

void ?{}( __condition_blocked_queue_t * this ) {
	this->head = NULL;
	this->tail = &this->head;
}

void append( __condition_blocked_queue_t * this, __condition_node_t * c ) {
	assert(this->tail != NULL);
	*this->tail = c;
	this->tail = &c->next;
}

__condition_node_t * pop_head( __condition_blocked_queue_t * this ) {
	__condition_node_t * head = this->head;
	if( head ) {
		this->head = head->next;
		if( !head->next ) {
			this->tail = &this->head;
		}
		head->next = NULL;
	}
	return head;
}
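// Usage sketch (illustrative only, not compiled as part of this unit): how the
// routines above back a user-level monitor. The type and routine names
// (bounded_buffer, insert) are hypothetical and the surface syntax is
// approximate.
//
//   monitor bounded_buffer { condition not_full, not_empty; int elems[10]; int count; };
//
//   void insert( bounded_buffer * mutex this, int elem ) {
//       // the mutex parameter makes the compiler wrap the body in a
//       // monitor_guard_t, which enters/leaves the monitor around the call
//       if( this->count == 10 ) wait( &this->not_full );   // brands the condition, blocks here
//       this->elems[ this->count++ ] = elem;
//       signal( &this->not_empty );                        // waiter resumes once the guard releases
//   }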