source: src/libcfa/concurrency/monitor.c@ ad1a8dd

ADT aaron-thesis arm-eh ast-experimental cleanup-dtors deferred_resn demangler enum forall-pointer-decay jacob/cs343-translation jenkins-sandbox new-ast new-ast-unique-expr new-env no_list persistent-indexer pthread-emulation qualifiedEnum resolv-new with_gc
Last change on this file since ad1a8dd was ad1a8dd, checked in by Thierry Delisle <tdelisle@…>, 9 years ago

Tentative fix for monitors

  • Property mode set to 100644
File size: 11.1 KB
Line 
1// -*- Mode: CFA -*-
2//
3// Cforall Version 1.0.0 Copyright (C) 2016 University of Waterloo
4//
5// The contents of this file are covered under the licence agreement in the
6// file "LICENCE" distributed with Cforall.
7//
8// monitor_desc.c --
9//
10// Author : Thierry Delisle
11// Created On : Thd Feb 23 12:27:26 2017
12// Last Modified By : Thierry Delisle
13// Last Modified On : --
14// Update Count : 0
15//
16
17#include "monitor"
18
19#include "kernel_private.h"
20#include "libhdr.h"
21
22//-----------------------------------------------------------------------------
23// Forward declarations
24static inline void set_owner( monitor_desc * this, thread_desc * owner );
25static inline thread_desc * next_thread( monitor_desc * this );
26
27static inline void lock_all( spinlock ** locks, unsigned short count );
28static inline void lock_all( monitor_desc ** source, spinlock ** /*out*/ locks, unsigned short count );
29static inline void unlock_all( spinlock ** locks, unsigned short count );
30static inline void unlock_all( monitor_desc ** locks, unsigned short count );
31
32static inline void save_recursion ( monitor_desc ** ctx, unsigned int * /*out*/ recursions, unsigned short count );
33static inline void restore_recursion( monitor_desc ** ctx, unsigned int * /*in */ recursions, unsigned short count );
34
35static inline thread_desc * check_condition( __condition_criterion_t * );
36static inline void brand_condition( condition * );
37static inline unsigned short insert_unique( thread_desc ** thrds, unsigned short end, thread_desc * val );
38
39//-----------------------------------------------------------------------------
40// Enter/Leave routines
41
42
43extern "C" {
44 void __enter_monitor_desc(monitor_desc * this) {
45 lock( &this->lock );
46 thread_desc * thrd = this_thread();
47
48 LIB_DEBUG_PRINT_SAFE("%p Entering %p (o: %p, r: %i)\n", thrd, this, this->owner, this->recursion);
49
50 if( !this->owner ) {
51 //No one has the monitor, just take it
52 set_owner( this, thrd );
53 }
54 else if( this->owner == thrd) {
55 //We already have the monitor, just not how many times we took it
56 assert( this->recursion > 0 );
57 this->recursion += 1;
58 }
59 else {
60 //Some one else has the monitor, wait in line for it
61 append( &this->entry_queue, thrd );
62 ScheduleInternal( &this->lock );
63
64 //ScheduleInternal will unlock spinlock, no need to unlock ourselves
65 return;
66 }
67
68 unlock( &this->lock );
69 return;
70 }
71
72 // leave pseudo code :
73 // TODO
74 void __leave_monitor_desc(monitor_desc * this) {
75 lock( &this->lock );
76
77 thread_desc * thrd = this_thread();
78
79 LIB_DEBUG_PRINT_SAFE("%p Leaving %p (o: %p, r: %i)\n", thrd, this, this->owner, this->recursion);
80 assertf( thrd == this->owner, "Expected owner to be %p, got %p (r: %i)", thrd, this->owner, this->recursion );
81
82 //Leaving a recursion level, decrement the counter
83 this->recursion -= 1;
84
85 //If we haven't left the last level of recursion
86 //it means we don't need to do anything
87 if( this->recursion != 0) {
88 unlock( &this->lock );
89 return;
90 }
91
92 thread_desc * new_owner = next_thread( this );
93
94 //We can now let other threads in safely
95 unlock( &this->lock );
96
97 //We need to wake-up the thread
98 ScheduleThread( new_owner );
99 }
100}
101
102static inline void enter(monitor_desc ** monitors, int count) {
103 for(int i = 0; i < count; i++) {
104 __enter_monitor_desc( monitors[i] );
105 }
106}
107
108static inline void leave(monitor_desc ** monitors, int count) {
109 for(int i = count - 1; i >= 0; i--) {
110 __leave_monitor_desc( monitors[i] );
111 }
112}
113
114void ?{}( monitor_guard_t * this, monitor_desc ** m, int count ) {
115 this->m = m;
116 this->count = count;
117 qsort(this->m, count);
118 enter( this->m, this->count );
119
120 this->prev_mntrs = this_thread()->current_monitors;
121 this->prev_count = this_thread()->current_monitor_count;
122
123 this_thread()->current_monitors = m;
124 this_thread()->current_monitor_count = count;
125}
126
127void ^?{}( monitor_guard_t * this ) {
128 leave( this->m, this->count );
129
130 this_thread()->current_monitors = this->prev_mntrs;
131 this_thread()->current_monitor_count = this->prev_count;
132}
133
134void debug_break() __attribute__(( noinline ))
135{
136
137}
138
139//-----------------------------------------------------------------------------
140// Internal scheduling
141void wait( condition * this ) {
142 LIB_DEBUG_PRINT_SAFE("Waiting\n");
143
144 brand_condition( this );
145
146 //Check that everything is as expected
147 assertf( this->monitors != NULL, "Waiting with no monitors (%p)", this->monitors );
148 assertf( this->monitor_count != 0, "Waiting with 0 monitors (%i)", this->monitor_count );
149
150 unsigned short count = this->monitor_count;
151 unsigned int recursions[ count ]; //Save the current recursion levels to restore them later
152 spinlock * locks [ count ]; //We need to pass-in an array of locks to ScheduleInternal
153
154 LIB_DEBUG_PRINT_SAFE("count %i\n", count);
155
156 __condition_node_t waiter;
157 waiter.waiting_thread = this_thread();
158 waiter.count = count;
159 waiter.next = NULL;
160
161 __condition_criterion_t criteria[count];
162 for(int i = 0; i < count; i++) {
163 criteria[i].ready = false;
164 criteria[i].target = this->monitors[i];
165 criteria[i].owner = &waiter;
166 criteria[i].next = NULL;
167 LIB_DEBUG_PRINT_SAFE( "Criterion %p\n", &criteria[i] );
168 }
169
170 waiter.criteria = criteria;
171 append( &this->blocked, &waiter );
172
173 lock_all( this->monitors, locks, count );
174 save_recursion( this->monitors, recursions, count );
175 //DON'T unlock, ask the kernel to do it
176
177 //Find the next thread(s) to run
178 unsigned short thread_count = 0;
179 thread_desc * threads[ count ];
180 for(int i = 0; i < count; i++) {
181 threads[i] = 0;
182 }
183
184 debug_break();
185
186 for( int i = 0; i < count; i++) {
187 thread_desc * new_owner = next_thread( this->monitors[i] );
188 thread_count = insert_unique( threads, thread_count, new_owner );
189 }
190
191 debug_break();
192
193 LIB_DEBUG_PRINT_SAFE("Will unblock: ");
194 for(int i = 0; i < thread_count; i++) {
195 LIB_DEBUG_PRINT_SAFE("%p ", threads[i]);
196 }
197 LIB_DEBUG_PRINT_SAFE("\n");
198
199 // Everything is ready to go to sleep
200 ScheduleInternal( locks, count, threads, thread_count );
201
202
203 //WE WOKE UP
204
205
206 //We are back, restore the owners and recursions
207 lock_all( locks, count );
208 restore_recursion( this->monitors, recursions, count );
209 unlock_all( locks, count );
210}
211
212void signal( condition * this ) {
213 if( !this->blocked.head ) {
214 LIB_DEBUG_PRINT_SAFE("Nothing to signal\n");
215 return;
216 }
217
218 //Check that everything is as expected
219 assert( this->monitors );
220 assert( this->monitor_count != 0 );
221
222 unsigned short count = this->monitor_count;
223
224 LIB_DEBUG_DO(
225 thread_desc * this_thrd = this_thread();
226 if ( this->monitor_count != this_thrd->current_monitor_count ) {
227 abortf( "Signal on condition %p made with different number of monitor(s), expected %i got %i", this, this->monitor_count, this_thrd->current_monitor_count );
228 } // if
229
230 for(int i = 0; i < this->monitor_count; i++) {
231 if ( this->monitors[i] != this_thrd->current_monitors[i] ) {
232 abortf( "Signal on condition %p made with different monitor, expected %p got %i", this, this->monitors[i], this_thrd->current_monitors[i] );
233 } // if
234 }
235 );
236
237 lock_all( this->monitors, NULL, count );
238 LIB_DEBUG_PRINT_SAFE("Signalling");
239
240 __condition_node_t * node = pop_head( &this->blocked );
241 for(int i = 0; i < count; i++) {
242 __condition_criterion_t * crit = &node->criteria[i];
243 LIB_DEBUG_PRINT_SAFE(" %p", crit->target);
244 assert( !crit->ready );
245 push( &crit->target->signal_stack, crit );
246 }
247
248 LIB_DEBUG_PRINT_SAFE("\n");
249
250 unlock_all( this->monitors, count );
251}
252
253//-----------------------------------------------------------------------------
254// Utilities
255
256static inline void set_owner( monitor_desc * this, thread_desc * owner ) {
257 //Pass the monitor appropriately
258 this->owner = owner;
259
260 //We are passing the monitor to someone else, which means recursion level is not 0
261 this->recursion = owner ? 1 : 0;
262}
263
264static inline thread_desc * next_thread( monitor_desc * this ) {
265 //Check the signaller stack
266 __condition_criterion_t * urgent = pop( &this->signal_stack );
267 if( urgent ) {
268 //The signaller stack is not empty,
269 //regardless of if we are ready to baton pass,
270 //we need to set the monitor as in use
271 set_owner( this, urgent->owner->waiting_thread );
272
273 return check_condition( urgent );
274 }
275
276 // No signaller thread
277 // Get the next thread in the entry_queue
278 thread_desc * new_owner = pop_head( &this->entry_queue );
279 set_owner( this, new_owner );
280
281 return new_owner;
282}
283
284static inline void lock_all( spinlock ** locks, unsigned short count ) {
285 for( int i = 0; i < count; i++ ) {
286 lock( locks[i] );
287 }
288}
289
290static inline void lock_all( monitor_desc ** source, spinlock ** /*out*/ locks, unsigned short count ) {
291 for( int i = 0; i < count; i++ ) {
292 spinlock * l = &source[i]->lock;
293 lock( l );
294 if(locks) locks[i] = l;
295 }
296}
297
298static inline void unlock_all( spinlock ** locks, unsigned short count ) {
299 for( int i = 0; i < count; i++ ) {
300 unlock( locks[i] );
301 }
302}
303
304static inline void unlock_all( monitor_desc ** locks, unsigned short count ) {
305 for( int i = 0; i < count; i++ ) {
306 unlock( &locks[i]->lock );
307 }
308}
309
310
311static inline void save_recursion ( monitor_desc ** ctx, unsigned int * /*out*/ recursions, unsigned short count ) {
312 for( int i = 0; i < count; i++ ) {
313 recursions[i] = ctx[i]->recursion;
314 }
315}
316
317static inline void restore_recursion( monitor_desc ** ctx, unsigned int * /*in */ recursions, unsigned short count ) {
318 for( int i = 0; i < count; i++ ) {
319 ctx[i]->recursion = recursions[i];
320 }
321}
322
323// Function has 2 different behavior
324// 1 - Marks a monitors as being ready to run
325// 2 - Checks if all the monitors are ready to run
326// if so return the thread to run
327static inline thread_desc * check_condition( __condition_criterion_t * target ) {
328 __condition_node_t * node = target->owner;
329 unsigned short count = node->count;
330 __condition_criterion_t * criteria = node->criteria;
331
332 bool ready2run = true;
333
334 for( int i = 0; i < count; i++ ) {
335 LIB_DEBUG_PRINT_SAFE( "Checking %p for %p\n", &criteria[i], target );
336 if( &criteria[i] == target ) {
337 criteria[i].ready = true;
338 LIB_DEBUG_PRINT_SAFE( "True\n" );
339 }
340
341 ready2run = criteria[i].ready && ready2run;
342 }
343
344 LIB_DEBUG_PRINT_SAFE( "Runing %i\n", ready2run );
345 return ready2run ? node->waiting_thread : NULL;
346}
347
348static inline void brand_condition( condition * this ) {
349 thread_desc * thrd = this_thread();
350 if( !this->monitors ) {
351 LIB_DEBUG_PRINT_SAFE("Branding\n");
352 assertf( thrd->current_monitors != NULL, "No current monitor to brand condition", thrd->current_monitors );
353 this->monitors = thrd->current_monitors;
354 this->monitor_count = thrd->current_monitor_count;
355 }
356}
357
358static inline unsigned short insert_unique( thread_desc ** thrds, unsigned short end, thread_desc * val ) {
359 if( !val ) return end;
360
361 for(int i = 0; i <= end; i++) {
362 if( thrds[i] == val ) return end;
363 }
364
365 thrds[end] = val;
366 return end + 1;
367}
368
369void ?{}( __condition_blocked_queue_t * this ) {
370 this->head = NULL;
371 this->tail = &this->head;
372}
373
374void append( __condition_blocked_queue_t * this, __condition_node_t * c ) {
375 assert(this->tail != NULL);
376 *this->tail = c;
377 this->tail = &c->next;
378}
379
380__condition_node_t * pop_head( __condition_blocked_queue_t * this ) {
381 __condition_node_t * head = this->head;
382 if( head ) {
383 this->head = head->next;
384 if( !head->next ) {
385 this->tail = &this->head;
386 }
387 head->next = NULL;
388 }
389 return head;
390}
Note: See TracBrowser for help on using the repository browser.