source: src/libcfa/concurrency/monitor.c@ 5783e94

Last change on this file since 5783e94 was ad1a8dd, checked in by Thierry Delisle <tdelisle@…>, 9 years ago

Tentative fix for monitors

// -*- Mode: CFA -*-
//
// Cforall Version 1.0.0 Copyright (C) 2016 University of Waterloo
//
// The contents of this file are covered under the licence agreement in the
// file "LICENCE" distributed with Cforall.
//
// monitor.c --
//
// Author           : Thierry Delisle
// Created On       : Thu Feb 23 12:27:26 2017
// Last Modified By : Thierry Delisle
// Last Modified On : --
// Update Count     : 0
//

#include "monitor"

#include "kernel_private.h"
#include "libhdr.h"

//-----------------------------------------------------------------------------
// Forward declarations
static inline void set_owner( monitor_desc * this, thread_desc * owner );
static inline thread_desc * next_thread( monitor_desc * this );

static inline void lock_all( spinlock ** locks, unsigned short count );
static inline void lock_all( monitor_desc ** source, spinlock ** /*out*/ locks, unsigned short count );
static inline void unlock_all( spinlock ** locks, unsigned short count );
static inline void unlock_all( monitor_desc ** locks, unsigned short count );

static inline void save_recursion   ( monitor_desc ** ctx, unsigned int * /*out*/ recursions, unsigned short count );
static inline void restore_recursion( monitor_desc ** ctx, unsigned int * /*in */ recursions, unsigned short count );

static inline thread_desc * check_condition( __condition_criterion_t * );
static inline void brand_condition( condition * );
static inline unsigned short insert_unique( thread_desc ** thrds, unsigned short end, thread_desc * val );

//-----------------------------------------------------------------------------
// Enter/Leave routines

extern "C" {
    void __enter_monitor_desc(monitor_desc * this) {
        lock( &this->lock );
        thread_desc * thrd = this_thread();

        LIB_DEBUG_PRINT_SAFE("%p Entering %p (o: %p, r: %i)\n", thrd, this, this->owner, this->recursion);

        if( !this->owner ) {
            //No one has the monitor, just take it
            set_owner( this, thrd );
        }
        else if( this->owner == thrd) {
            //We already have the monitor, just note how many times we took it
            assert( this->recursion > 0 );
            this->recursion += 1;
        }
        else {
            //Someone else has the monitor, wait in line for it
            append( &this->entry_queue, thrd );
            ScheduleInternal( &this->lock );

            //ScheduleInternal will unlock the spinlock, no need to unlock ourselves
            return;
        }

        unlock( &this->lock );
        return;
    }

    // Leaving a monitor:
    //   - decrement the recursion counter
    //   - if this was the last recursion level, pick the next owner
    //     (signalled thread first, then entry queue) and wake it up
    void __leave_monitor_desc(monitor_desc * this) {
        lock( &this->lock );

        thread_desc * thrd = this_thread();

        LIB_DEBUG_PRINT_SAFE("%p Leaving %p (o: %p, r: %i)\n", thrd, this, this->owner, this->recursion);
        assertf( thrd == this->owner, "Expected owner to be %p, got %p (r: %i)", thrd, this->owner, this->recursion );

        //Leaving a recursion level, decrement the counter
        this->recursion -= 1;

        //If we haven't left the last level of recursion
        //it means we don't need to do anything else
        if( this->recursion != 0) {
            unlock( &this->lock );
            return;
        }

        thread_desc * new_owner = next_thread( this );

        //We can now let other threads in safely
        unlock( &this->lock );

        //We need to wake-up the thread
        ScheduleThread( new_owner );
    }
}
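
// Descriptive note (added for clarity): __enter_monitor_desc hands its spinlock
// to ScheduleInternal, which blocks the calling thread and releases the lock on
// its behalf; by the time the blocked thread resumes, the thread that ran
// __leave_monitor_desc has already made it the owner through next_thread() and
// set_owner(), so the wake-up path needs no further locking.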

static inline void enter(monitor_desc ** monitors, int count) {
    for(int i = 0; i < count; i++) {
        __enter_monitor_desc( monitors[i] );
    }
}

static inline void leave(monitor_desc ** monitors, int count) {
    for(int i = count - 1; i >= 0; i--) {
        __leave_monitor_desc( monitors[i] );
    }
}

void ?{}( monitor_guard_t * this, monitor_desc ** m, int count ) {
    this->m = m;
    this->count = count;
    qsort(this->m, count);
    enter( this->m, this->count );

    this->prev_mntrs = this_thread()->current_monitors;
    this->prev_count = this_thread()->current_monitor_count;

    this_thread()->current_monitors      = m;
    this_thread()->current_monitor_count = count;
}

void ^?{}( monitor_guard_t * this ) {
    leave( this->m, this->count );

    this_thread()->current_monitors      = this->prev_mntrs;
    this_thread()->current_monitor_count = this->prev_count;
}
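
// Usage sketch (added for illustration; `counter`, `get_monitor` and the
// surrounding routine are hypothetical, not part of this file): the guard above
// is what gives a mutex routine its scoped lock/unlock behaviour. A hand-written
// equivalent would look roughly like:
//
//   void increment( counter * c ) {
//       monitor_desc * mons[] = { get_monitor( c ) };   // hypothetical accessor
//       monitor_guard_t g = { mons, 1 };                 // sorts, enters, saves previous monitors
//       c->value += 1;                                   // critical section
//   }                                                    // ^?{}( &g ) leaves on scope exit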

//Empty routine used as a convenient breakpoint target while debugging wait();
//noinline keeps the calls to it visible in the generated code.
void debug_break() __attribute__(( noinline ))
{

}

//-----------------------------------------------------------------------------
// Internal scheduling
void wait( condition * this ) {
    LIB_DEBUG_PRINT_SAFE("Waiting\n");

    brand_condition( this );

    //Check that everything is as expected
    assertf( this->monitors != NULL, "Waiting with no monitors (%p)", this->monitors );
    assertf( this->monitor_count != 0, "Waiting with 0 monitors (%i)", this->monitor_count );

    unsigned short count = this->monitor_count;
    unsigned int recursions[ count ];       //Save the current recursion levels to restore them later
    spinlock *   locks     [ count ];       //We need to pass-in an array of locks to ScheduleInternal

    LIB_DEBUG_PRINT_SAFE("count %i\n", count);

    __condition_node_t waiter;
    waiter.waiting_thread = this_thread();
    waiter.count = count;
    waiter.next = NULL;

    __condition_criterion_t criteria[count];
    for(int i = 0; i < count; i++) {
        criteria[i].ready  = false;
        criteria[i].target = this->monitors[i];
        criteria[i].owner  = &waiter;
        criteria[i].next   = NULL;
        LIB_DEBUG_PRINT_SAFE( "Criterion %p\n", &criteria[i] );
    }

    waiter.criteria = criteria;
    append( &this->blocked, &waiter );

    lock_all( this->monitors, locks, count );
    save_recursion( this->monitors, recursions, count );
    //DON'T unlock, ask the kernel to do it

    //Find the next thread(s) to run
    unsigned short thread_count = 0;
    thread_desc * threads[ count ];
    for(int i = 0; i < count; i++) {
        threads[i] = 0;
    }

    debug_break();

    for( int i = 0; i < count; i++) {
        thread_desc * new_owner = next_thread( this->monitors[i] );
        thread_count = insert_unique( threads, thread_count, new_owner );
    }

    debug_break();

    LIB_DEBUG_PRINT_SAFE("Will unblock: ");
    for(int i = 0; i < thread_count; i++) {
        LIB_DEBUG_PRINT_SAFE("%p ", threads[i]);
    }
    LIB_DEBUG_PRINT_SAFE("\n");

    // Everything is ready to go to sleep
    ScheduleInternal( locks, count, threads, thread_count );


    //WE WOKE UP


    //We are back, restore the owners and recursions
    lock_all( locks, count );
    restore_recursion( this->monitors, recursions, count );
    unlock_all( locks, count );
}
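
// Illustrative example (added; the `buffer` monitor and its routines are
// hypothetical, not defined in this file): wait() releases all the monitors
// branded onto the condition and blocks until a matching signal() hands them
// back, e.g.
//
//   monitor buffer { condition not_empty; int count; };
//
//   void take( buffer * mutex b ) {
//       if( b->count == 0 ) wait( &b->not_empty );   // releases b, blocks, gets b back
//       b->count -= 1;
//   }
//
//   void put( buffer * mutex b ) {
//       b->count += 1;
//       signal( &b->not_empty );                     // waiter resumes once b is released
//   }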

void signal( condition * this ) {
    if( !this->blocked.head ) {
        LIB_DEBUG_PRINT_SAFE("Nothing to signal\n");
        return;
    }

    //Check that everything is as expected
    assert( this->monitors );
    assert( this->monitor_count != 0 );

    unsigned short count = this->monitor_count;

    LIB_DEBUG_DO(
        thread_desc * this_thrd = this_thread();
        if ( this->monitor_count != this_thrd->current_monitor_count ) {
            abortf( "Signal on condition %p made with different number of monitor(s), expected %i got %i", this, this->monitor_count, this_thrd->current_monitor_count );
        } // if

        for(int i = 0; i < this->monitor_count; i++) {
            if ( this->monitors[i] != this_thrd->current_monitors[i] ) {
                abortf( "Signal on condition %p made with different monitor, expected %p got %p", this, this->monitors[i], this_thrd->current_monitors[i] );
            } // if
        }
    );

    lock_all( this->monitors, NULL, count );
    LIB_DEBUG_PRINT_SAFE("Signalling");

    __condition_node_t * node = pop_head( &this->blocked );
    for(int i = 0; i < count; i++) {
        __condition_criterion_t * crit = &node->criteria[i];
        LIB_DEBUG_PRINT_SAFE(" %p", crit->target);
        assert( !crit->ready );
        push( &crit->target->signal_stack, crit );
    }

    LIB_DEBUG_PRINT_SAFE("\n");

    unlock_all( this->monitors, count );
}
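
// Descriptive note (added for clarity): signal() does not unblock the waiter
// directly. It pops the head waiter from the condition and pushes its criteria
// onto the signal_stack of each monitor involved; the actual hand-off happens
// later, when the signalling thread releases those monitors and next_thread()
// pops the stack, marks the criterion ready and, once every criterion of the
// waiter is ready, returns it to be scheduled.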

//-----------------------------------------------------------------------------
// Utilities

static inline void set_owner( monitor_desc * this, thread_desc * owner ) {
    //Pass the monitor appropriately
    this->owner = owner;

    //We are passing the monitor to someone else, which means the recursion level is not 0
    this->recursion = owner ? 1 : 0;
}

static inline thread_desc * next_thread( monitor_desc * this ) {
    //Check the signaller stack
    __condition_criterion_t * urgent = pop( &this->signal_stack );
    if( urgent ) {
        //The signaller stack is not empty:
        //regardless of whether we are ready to baton pass,
        //we need to mark the monitor as in use
        set_owner( this, urgent->owner->waiting_thread );

        return check_condition( urgent );
    }

    // No signaller thread
    // Get the next thread in the entry_queue
    thread_desc * new_owner = pop_head( &this->entry_queue );
    set_owner( this, new_owner );

    return new_owner;
}

static inline void lock_all( spinlock ** locks, unsigned short count ) {
    for( int i = 0; i < count; i++ ) {
        lock( locks[i] );
    }
}

static inline void lock_all( monitor_desc ** source, spinlock ** /*out*/ locks, unsigned short count ) {
    for( int i = 0; i < count; i++ ) {
        spinlock * l = &source[i]->lock;
        lock( l );
        if(locks) locks[i] = l;
    }
}

static inline void unlock_all( spinlock ** locks, unsigned short count ) {
    for( int i = 0; i < count; i++ ) {
        unlock( locks[i] );
    }
}

static inline void unlock_all( monitor_desc ** locks, unsigned short count ) {
    for( int i = 0; i < count; i++ ) {
        unlock( &locks[i]->lock );
    }
}

static inline void save_recursion( monitor_desc ** ctx, unsigned int * /*out*/ recursions, unsigned short count ) {
    for( int i = 0; i < count; i++ ) {
        recursions[i] = ctx[i]->recursion;
    }
}

static inline void restore_recursion( monitor_desc ** ctx, unsigned int * /*in */ recursions, unsigned short count ) {
    for( int i = 0; i < count; i++ ) {
        ctx[i]->recursion = recursions[i];
    }
}

// This function has two different behaviours:
// 1 - Marks the criterion for one monitor as ready to run
// 2 - Checks if all the criteria of the waiter are ready;
//     if so, returns the thread to run
static inline thread_desc * check_condition( __condition_criterion_t * target ) {
    __condition_node_t * node = target->owner;
    unsigned short count = node->count;
    __condition_criterion_t * criteria = node->criteria;

    bool ready2run = true;

    for( int i = 0; i < count; i++ ) {
        LIB_DEBUG_PRINT_SAFE( "Checking %p for %p\n", &criteria[i], target );
        if( &criteria[i] == target ) {
            criteria[i].ready = true;
            LIB_DEBUG_PRINT_SAFE( "True\n" );
        }

        ready2run = criteria[i].ready && ready2run;
    }

    LIB_DEBUG_PRINT_SAFE( "Running %i\n", ready2run );
    return ready2run ? node->waiting_thread : NULL;
}
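
// Worked example (added for clarity): a thread blocked in wait() while holding
// monitors A and B has a node with two criteria. When A is released,
// next_thread(A) pops A's criterion and check_condition marks it ready but
// returns NULL, because B's criterion is still pending. Only when B is released
// as well does check_condition find all criteria ready and return the waiting
// thread so it can be rescheduled.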

static inline void brand_condition( condition * this ) {
    thread_desc * thrd = this_thread();
    if( !this->monitors ) {
        LIB_DEBUG_PRINT_SAFE("Branding\n");
        assertf( thrd->current_monitors != NULL, "No current monitor to brand condition" );
        this->monitors = thrd->current_monitors;
        this->monitor_count = thrd->current_monitor_count;
    }
}

static inline unsigned short insert_unique( thread_desc ** thrds, unsigned short end, thread_desc * val ) {
    if( !val ) return end;

    for(int i = 0; i < end; i++) {
        if( thrds[i] == val ) return end;
    }

    thrds[end] = val;
    return end + 1;
}
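
// Descriptive note (added for clarity): insert_unique builds the wake-up list in
// wait() above; if the same thread becomes the new owner of several of the
// released monitors, it must appear (and be scheduled) only once, and NULL
// owners are simply skipped.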

void ?{}( __condition_blocked_queue_t * this ) {
    this->head = NULL;
    this->tail = &this->head;
}

void append( __condition_blocked_queue_t * this, __condition_node_t * c ) {
    assert(this->tail != NULL);
    *this->tail = c;
    this->tail = &c->next;
}

__condition_node_t * pop_head( __condition_blocked_queue_t * this ) {
    __condition_node_t * head = this->head;
    if( head ) {
        this->head = head->next;
        if( !head->next ) {
            this->tail = &this->head;
        }
        head->next = NULL;
    }
    return head;
}