source: src/libcfa/concurrency/monitor.c@ 4845ae2

Last change on this file since 4845ae2 was a933dcf4, checked in by Thierry Delisle <tdelisle@…>, 8 years ago
  • updated internal scheduler test for multi monitors
  • fixed branding for monitors
// -*- Mode: CFA -*-
//
// Cforall Version 1.0.0 Copyright (C) 2016 University of Waterloo
//
// The contents of this file are covered under the licence agreement in the
// file "LICENCE" distributed with Cforall.
//
// monitor.c --
//
// Author           : Thierry Delisle
// Created On       : Thu Feb 23 12:27:26 2017
// Last Modified By : Thierry Delisle
// Last Modified On : --
// Update Count     : 0
//

#include "monitor"

#include <stdlib>

#include "kernel_private.h"
#include "libhdr.h"

//-----------------------------------------------------------------------------
// Forward declarations
static inline void set_owner( monitor_desc * this, thread_desc * owner );
static inline thread_desc * next_thread( monitor_desc * this );

static inline void lock_all( spinlock ** locks, unsigned short count );
static inline void lock_all( monitor_desc ** source, spinlock ** /*out*/ locks, unsigned short count );
static inline void unlock_all( spinlock ** locks, unsigned short count );
static inline void unlock_all( monitor_desc ** locks, unsigned short count );

static inline void save_recursion   ( monitor_desc ** ctx, unsigned int * /*out*/ recursions, unsigned short count );
static inline void restore_recursion( monitor_desc ** ctx, unsigned int * /*in */ recursions, unsigned short count );

static inline thread_desc * check_condition( __condition_criterion_t * );
static inline void brand_condition( condition * );
static inline unsigned short insert_unique( thread_desc ** thrds, unsigned short end, thread_desc * val );

//-----------------------------------------------------------------------------
// Enter/Leave routines

[cb0e6de]45extern "C" {
[0c78741]46 void __enter_monitor_desc(monitor_desc * this) {
[cb0e6de]47 lock( &this->lock );
48 thread_desc * thrd = this_thread();
[f07e037]49
[0c78741]50 LIB_DEBUG_PRINT_SAFE("%p Entering %p (o: %p, r: %i)\n", thrd, this, this->owner, this->recursion);
[5ea06d6]51
[cb0e6de]52 if( !this->owner ) {
53 //No one has the monitor, just take it
[cd348e7]54 set_owner( this, thrd );
[cb0e6de]55 }
56 else if( this->owner == thrd) {
57 //We already have the monitor, just not how many times we took it
58 assert( this->recursion > 0 );
59 this->recursion += 1;
60 }
61 else {
62 //Some one else has the monitor, wait in line for it
63 append( &this->entry_queue, thrd );
64 ScheduleInternal( &this->lock );
[cc7f4b1]65
[cb0e6de]66 //ScheduleInternal will unlock spinlock, no need to unlock ourselves
67 return;
68 }
[f07e037]69
[cb0e6de]70 unlock( &this->lock );
[5ea06d6]71 return;
[cb0e6de]72 }
[f07e037]73
[690f13c]74 // leave pseudo code :
[0c78741]75 // TODO
76 void __leave_monitor_desc(monitor_desc * this) {
[cb0e6de]77 lock( &this->lock );
[f07e037]78
[cb0e6de]79 thread_desc * thrd = this_thread();
[0c78741]80
81 LIB_DEBUG_PRINT_SAFE("%p Leaving %p (o: %p, r: %i)\n", thrd, this, this->owner, this->recursion);
82 assertf( thrd == this->owner, "Expected owner to be %p, got %p (r: %i)", thrd, this->owner, this->recursion );
[cc7f4b1]83
[cb0e6de]84 //Leaving a recursion level, decrement the counter
85 this->recursion -= 1;
[f07e037]86
[690f13c]87 //If we haven't left the last level of recursion
88 //it means we don't need to do anything
89 if( this->recursion != 0) {
90 unlock( &this->lock );
91 return;
92 }
[f07e037]93
[0c78741]94 thread_desc * new_owner = next_thread( this );
[5ea06d6]95
[690f13c]96 //We can now let other threads in safely
[cb0e6de]97 unlock( &this->lock );
[51f3798]98
[690f13c]99 //We need to wake-up the thread
100 ScheduleThread( new_owner );
[cc7f4b1]101 }
[2781e65]102}
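// Illustrative walk-through of the two routines above, assuming two threads
// T1 and T2 contending for a single monitor m (a sketch of the protocol, not
// code from this file):
//
//     T1: __enter_monitor_desc( &m );   // m.owner == T1, m.recursion == 1
//     T1: __enter_monitor_desc( &m );   // same owner, m.recursion == 2
//     T2: __enter_monitor_desc( &m );   // queued on m.entry_queue, blocks in ScheduleInternal
//     T1: __leave_monitor_desc( &m );   // m.recursion drops to 1, monitor kept
//     T1: __leave_monitor_desc( &m );   // recursion hits 0, next_thread picks T2, ScheduleThread( T2 )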

static inline void enter(monitor_desc ** monitors, int count) {
	for(int i = 0; i < count; i++) {
		__enter_monitor_desc( monitors[i] );
	}
}

static inline void leave(monitor_desc ** monitors, int count) {
	for(int i = count - 1; i >= 0; i--) {
		__leave_monitor_desc( monitors[i] );
	}
}

void ?{}( monitor_guard_t * this, monitor_desc ** m, int count ) {
	this->m = m;
	this->count = count;
	qsort(this->m, count);
	enter( this->m, this->count );

	this->prev_mntrs = this_thread()->current_monitors;
	this->prev_count = this_thread()->current_monitor_count;

	this_thread()->current_monitors      = m;
	this_thread()->current_monitor_count = count;
}

void ^?{}( monitor_guard_t * this ) {
	leave( this->m, this->count );

	this_thread()->current_monitors      = this->prev_mntrs;
	this_thread()->current_monitor_count = this->prev_count;
}
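// The guard provides scoped acquisition of a fixed set of monitors: the
// constructor sorts them so any two threads always lock a shared subset in
// the same order (avoiding deadlock), enters them, and publishes them as the
// thread's current monitors; the destructor leaves them in reverse order and
// restores the previous context. A hedged usage sketch, with made-up names
// and only approximate surface syntax:
//
//     void transfer( monitor_desc * a, monitor_desc * b ) {
//             monitor_desc * mons[2] = { a, b };
//             monitor_guard_t guard = { mons, 2 };    // runs ?{}: sort, enter, publish
//             /* ... critical section holding both monitors ... */
//     }                                               // scope exit runs ^?{}: leave, restore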

void debug_break() __attribute__(( noinline ))
{

}

//-----------------------------------------------------------------------------
// Internal scheduling
void wait( condition * this ) {
	LIB_DEBUG_PRINT_SAFE("Waiting\n");

	brand_condition( this );

	//Check that everything is as expected
	assertf( this->monitors != NULL, "Waiting with no monitors (%p)", this->monitors );
	assertf( this->monitor_count != 0, "Waiting with 0 monitors (%i)", this->monitor_count );

	unsigned short count = this->monitor_count;
	unsigned int recursions[ count ];	//Save the current recursion levels to restore them later
	spinlock *   locks     [ count ];	//We need to pass-in an array of locks to ScheduleInternal

	LIB_DEBUG_PRINT_SAFE("count %i\n", count);

	__condition_node_t waiter;
	waiter.waiting_thread = this_thread();
	waiter.count = count;
	waiter.next = NULL;

	__condition_criterion_t criteria[count];
	for(int i = 0; i < count; i++) {
		criteria[i].ready  = false;
		criteria[i].target = this->monitors[i];
		criteria[i].owner  = &waiter;
		criteria[i].next   = NULL;
		LIB_DEBUG_PRINT_SAFE( "Criterion %p\n", &criteria[i] );
	}

	waiter.criteria = criteria;
	append( &this->blocked, &waiter );

	lock_all( this->monitors, locks, count );
	save_recursion( this->monitors, recursions, count );
	//DON'T unlock, ask the kernel to do it

	//Find the next thread(s) to run
	unsigned short thread_count = 0;
	thread_desc * threads[ count ];
	for(int i = 0; i < count; i++) {
		threads[i] = 0;
	}

	debug_break();

	for( int i = 0; i < count; i++) {
		thread_desc * new_owner = next_thread( this->monitors[i] );
		thread_count = insert_unique( threads, thread_count, new_owner );
	}

	debug_break();

	LIB_DEBUG_PRINT_SAFE("Will unblock: ");
	for(int i = 0; i < thread_count; i++) {
		LIB_DEBUG_PRINT_SAFE("%p ", threads[i]);
	}
	LIB_DEBUG_PRINT_SAFE("\n");

	// Everything is ready to go to sleep
	ScheduleInternal( locks, count, threads, thread_count );


	//WE WOKE UP


	//We are back, restore the owners and recursions
	lock_all( locks, count );
	restore_recursion( this->monitors, recursions, count );
	unlock_all( locks, count );
}
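// Hedged usage sketch for wait(): it is meant to be called while holding the
// monitor(s) named by the condition (here branded from the caller's current
// monitors). The buffer_t type and take() routine are made up for
// illustration and the surface syntax is only approximate:
//
//     monitor buffer_t { condition not_empty; int count; };
//
//     void take( buffer_t * mutex this ) {
//             while( this->count == 0 ) {
//                     wait( &this->not_empty );   // releases the monitor(s), blocks, owns them again on return
//             }
//             this->count -= 1;
//     }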

void signal( condition * this ) {
	if( !this->blocked.head ) {
		LIB_DEBUG_PRINT_SAFE("Nothing to signal\n");
		return;
	}

	//Check that everything is as expected
	assert( this->monitors );
	assert( this->monitor_count != 0 );

	unsigned short count = this->monitor_count;

	LIB_DEBUG_DO(
		thread_desc * this_thrd = this_thread();
		if ( this->monitor_count != this_thrd->current_monitor_count ) {
			abortf( "Signal on condition %p made with different number of monitor(s), expected %i got %i", this, this->monitor_count, this_thrd->current_monitor_count );
		} // if

		for(int i = 0; i < this->monitor_count; i++) {
			if ( this->monitors[i] != this_thrd->current_monitors[i] ) {
				abortf( "Signal on condition %p made with different monitor, expected %p got %p", this, this->monitors[i], this_thrd->current_monitors[i] );
			} // if
		}
	);

	lock_all( this->monitors, NULL, count );
	LIB_DEBUG_PRINT_SAFE("Signalling");

	__condition_node_t * node = pop_head( &this->blocked );
	for(int i = 0; i < count; i++) {
		__condition_criterion_t * crit = &node->criteria[i];
		LIB_DEBUG_PRINT_SAFE(" %p", crit->target);
		assert( !crit->ready );
		push( &crit->target->signal_stack, crit );
	}

	LIB_DEBUG_PRINT_SAFE("\n");

	unlock_all( this->monitors, count );
}
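// Complementary sketch for signal(): signalling does not transfer the monitor
// immediately; it stacks the waiter's criteria on each monitor's signal_stack,
// and the waiter actually runs once the signaller releases the monitor(s) and
// next_thread pops it from that stack. Continuing the made-up buffer_t example:
//
//     void put( buffer_t * mutex this ) {
//             this->count += 1;
//             signal( &this->not_empty );   // waiter becomes the next owner when this routine exits
//     }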

//-----------------------------------------------------------------------------
// Utilities

static inline void set_owner( monitor_desc * this, thread_desc * owner ) {
	//Pass the monitor appropriately
	this->owner = owner;

	//A new owner starts at recursion level 1; a NULL owner means the monitor is free
	this->recursion = owner ? 1 : 0;
}

static inline thread_desc * next_thread( monitor_desc * this ) {
	//Check the signaller stack
	__condition_criterion_t * urgent = pop( &this->signal_stack );
	if( urgent ) {
		//The signaller stack is not empty,
		//regardless of whether we are ready to baton pass,
		//we need to set the monitor as in use
		set_owner( this, urgent->owner->waiting_thread );

		return check_condition( urgent );
	}

	// No signaller thread
	// Get the next thread in the entry_queue
	thread_desc * new_owner = pop_head( &this->entry_queue );
	set_owner( this, new_owner );

	return new_owner;
}

static inline void lock_all( spinlock ** locks, unsigned short count ) {
	for( int i = 0; i < count; i++ ) {
		lock( locks[i] );
	}
}

static inline void lock_all( monitor_desc ** source, spinlock ** /*out*/ locks, unsigned short count ) {
	for( int i = 0; i < count; i++ ) {
		spinlock * l = &source[i]->lock;
		lock( l );
		if(locks) locks[i] = l;
	}
}

static inline void unlock_all( spinlock ** locks, unsigned short count ) {
	for( int i = 0; i < count; i++ ) {
		unlock( locks[i] );
	}
}

static inline void unlock_all( monitor_desc ** locks, unsigned short count ) {
	for( int i = 0; i < count; i++ ) {
		unlock( &locks[i]->lock );
	}
}

static inline void save_recursion   ( monitor_desc ** ctx, unsigned int * /*out*/ recursions, unsigned short count ) {
	for( int i = 0; i < count; i++ ) {
		recursions[i] = ctx[i]->recursion;
	}
}

static inline void restore_recursion( monitor_desc ** ctx, unsigned int * /*in */ recursions, unsigned short count ) {
	for( int i = 0; i < count; i++ ) {
		ctx[i]->recursion = recursions[i];
	}
}

// This function has two different behaviors:
// 1 - Marks a monitor as being ready to run
// 2 - Checks if all the monitors are ready to run;
//     if so, returns the thread to run
static inline thread_desc * check_condition( __condition_criterion_t * target ) {
	__condition_node_t * node = target->owner;
	unsigned short count = node->count;
	__condition_criterion_t * criteria = node->criteria;

	bool ready2run = true;

	for( int i = 0; i < count; i++ ) {
		LIB_DEBUG_PRINT_SAFE( "Checking %p for %p\n", &criteria[i], target );
		if( &criteria[i] == target ) {
			criteria[i].ready = true;
			LIB_DEBUG_PRINT_SAFE( "True\n" );
		}

		ready2run = criteria[i].ready && ready2run;
	}

	LIB_DEBUG_PRINT_SAFE( "Running %i\n", ready2run );
	return ready2run ? node->waiting_thread : NULL;
}
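// For example, a wait() made while holding monitors A and B creates one node
// with two criteria; check_condition on A's criterion marks it ready but
// returns NULL, and only the later call on B's criterion, once every
// criterion of the node is ready, returns the waiting thread.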

static inline void brand_condition( condition * this ) {
	thread_desc * thrd = this_thread();
	if( !this->monitors ) {
		LIB_DEBUG_PRINT_SAFE("Branding\n");
		assertf( thrd->current_monitors != NULL, "No current monitor to brand condition %p", thrd->current_monitors );
		this->monitor_count = thrd->current_monitor_count;

		this->monitors = malloc( this->monitor_count * sizeof( *this->monitors ) );
		for( int i = 0; i < this->monitor_count; i++ ) {
			this->monitors[i] = thrd->current_monitors[i];
		}
	}
}

static inline unsigned short insert_unique( thread_desc ** thrds, unsigned short end, thread_desc * val ) {
	if( !val ) return end;

	for(int i = 0; i <= end; i++) {
		if( thrds[i] == val ) return end;
	}

	thrds[end] = val;
	return end + 1;
}

void ?{}( __condition_blocked_queue_t * this ) {
	this->head = NULL;
	this->tail = &this->head;
}

void append( __condition_blocked_queue_t * this, __condition_node_t * c ) {
	assert(this->tail != NULL);
	*this->tail = c;
	this->tail = &c->next;
}

__condition_node_t * pop_head( __condition_blocked_queue_t * this ) {
	__condition_node_t * head = this->head;
	if( head ) {
		this->head = head->next;
		if( !head->next ) {
			this->tail = &this->head;
		}
		head->next = NULL;
	}
	return head;
}