source: libcfa/src/concurrency/locks.cfa@ f977509

ADT ast-experimental
Last change on this file since f977509 was fece3d9, checked in by caparsons <caparson@…>, 2 years ago

Added fix for cond var timeout handling race. Cleanup of locks.hfa/cfa changes is an ongoing TODO

  • Property mode set to 100644
File size: 21.3 KB
RevLine 
[ab1b971]1//
2// Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// locks.cfa -- LIBCFATHREAD
8// Runtime locks that are used with the runtime thread system.
9//
10// Author : Colby Alexander Parsons
11// Created On : Thu Jan 21 19:46:50 2021
12// Last Modified By :
13// Last Modified On :
14// Update Count :
15//
16
17#define __cforall_thread__
18
[848439f]19#include "locks.hfa"
[708ae38]20#include "kernel/private.hfa"
[848439f]21
22#include <kernel.hfa>
23#include <stdlib.hfa>
24
[c18bf9e]25#pragma GCC visibility push(default)
26
[ac5816d]27//-----------------------------------------------------------------------------
28// info_thread
forall(L & | is_blocking_lock(L)) {
	// Per-waiter record: one is created on the stack of each thread that waits on a
	// condition variable in this file and is linked into that variable's queue.
	struct info_thread {
		// used to put info_thread on a dl queue
		inline dlink(info_thread(L));

		// waiting thread
		struct thread$ * t;

		// shadow field
		uintptr_t info;

		// lock that is passed to wait() (if one is passed)
		L * lock;

		// true when signalled and false when timeout wakes thread
		bool signalled;
	};
	P9_EMBEDDED( info_thread(L), dlink(info_thread(L)) )

	// Builds a waiter record for thread t carrying user value info and optional lock l (0p if none).
	void ?{}( info_thread(L) & this, thread$ * t, uintptr_t info, L * l ) {
		this.t = t;
		this.info = info;
		this.lock = l;
		// Defensive: the timed-wait macros return this field, so never leave it indeterminate.
		// Both the notify path and the timeout handler overwrite it before waking the thread.
		this.signalled = false;
	}

	void ^?{}( info_thread(L) & this ) {}
}
[cad1df1]56
[ac5816d]57//-----------------------------------------------------------------------------
58// Blocking Locks
// Constructs an unowned blocking lock.
//   multi_acquisition - stored in the lock; lock()/try_lock() use it to allow the owner to re-acquire
//   strict_owner      - stored in the lock; unlock() uses it to insist that only the owner releases
void ?{}( blocking_lock & this, bool multi_acquisition, bool strict_owner ) {
	// internal spin lock and queue of blocked waiters
	this.lock{};
	this.blocked_threads{};

	// policy flags
	this.multi_acquisition = multi_acquisition;
	this.strict_owner = strict_owner;

	// initially free: nobody owns it and nobody waits
	this.owner = 0p;
	this.recursion_count = 0;
	this.wait_count = 0;
}

// Nothing to release explicitly.
void ^?{}( blocking_lock & this ) {}
[ab1b971]70
[848439f]71
// Acquires the blocking lock, parking the calling thread while another thread owns it.
void lock( blocking_lock & this ) with( this ) {
	lock( lock __cfaabi_dbg_ctx2 );
	thread$ * self = active_thread();

	// single acquisition lock is held by current thread
	/* paranoid */ verifyf( owner != self || multi_acquisition, "Single acquisition lock holder (%p) attempted to reacquire the lock %p resulting in a deadlock.", owner, &this );

	// fast paths: lock is free or already ours
	if ( owner == 0p || owner == self ) {
		if ( owner == self && multi_acquisition ) {
			// recursive acquisition by the current owner
			recursion_count++;
		} else {
			// take ownership
			owner = self;
			recursion_count = 1;
		}
		unlock( lock );
		return;
	}

	// held by some other thread: queue a node for this thread, then block.
	// pop_node() assigns owner from the queued node before waking it.
	select_node node;
	insert_last( blocked_threads, node );
	wait_count++;
	unlock( lock );
	park( );
}
95
// Non-blocking acquire. Returns true if the caller now holds the lock (first or recursive
// acquisition), false if it is held and cannot be taken.
bool try_lock( blocking_lock & this ) with( this ) {
	lock( lock __cfaabi_dbg_ctx2 );

	bool acquired = false;
	if ( owner == 0p ) {
		// lock isn't held: claim it
		owner = active_thread();
		recursion_count = 1;
		acquired = true;
	} else if ( owner == active_thread() && multi_acquisition ) {
		// multi acquisition lock is held by current thread
		recursion_count++;
		acquired = true;
	}

	unlock( lock );
	return acquired;
}
115
[beeff61e]116// static void pop_and_set_new_owner( blocking_lock & this ) with( this ) {
117// thread$ * t = &try_pop_front( blocked_threads );
118// owner = t;
119// recursion_count = ( t ? 1 : 0 );
120// if ( t ) wait_count--;
121// unpark( t );
122// }
123
// Hands the lock to the first queued waiter, or marks it free if nobody is queued.
// Every caller in this file invokes it with this.lock (the internal spin lock) held.
static inline void pop_node( blocking_lock & this ) with( this ) {
	__handle_waituntil_OR( blocked_threads );
	select_node * next = &try_pop_front( blocked_threads );

	if ( ! next ) {
		// queue empty: lock becomes unowned
		owner = 0p;
		recursion_count = 0;
		return;
	}

	// transfer ownership before waking the new owner
	wait_count--;
	owner = next->blocked_thread;
	recursion_count = 1;
	wake_one( blocked_threads, *next );
}
138
// Releases one level of ownership; when the recursion count reaches zero the lock is
// passed to the next queued waiter (or freed) via pop_node().
void unlock( blocking_lock & this ) with( this ) {
	lock( lock __cfaabi_dbg_ctx2 );
	/* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
	// arguments follow the format order: offending thread first, then the real owner
	/* paranoid */ verifyf( owner == active_thread() || !strict_owner , "Thread %p other than the owner %p attempted to release owner lock %p", active_thread(), owner, &this );
	/* paranoid */ verifyf( recursion_count == 1 || multi_acquisition, "Thread %p attempted to release owner lock %p which is not recursive but has a recursive count of %zu", active_thread(), &this, recursion_count );

	// if recursion count is zero release lock and set new owner if one is waiting
	recursion_count--;
	if ( recursion_count == 0 ) {
		pop_node( this );
	}
	unlock( lock );
}
152
// Reports how many waiters are currently queued on the lock.
// The field is read without taking the internal spin lock.
size_t wait_count( blocking_lock & this ) {
	return this.wait_count;
}
156
// Called by the condition variables in this file when thread t is signalled or times out:
// t either receives the lock immediately and is unparked, or is queued to wait for it.
void on_notify( blocking_lock & this, thread$ * t ) with( this ) {
	lock( lock __cfaabi_dbg_ctx2 );
	if ( owner == 0p ) {
		// lock not held: t becomes the owner and may run
		owner = t;
		recursion_count = 1;
		unpark( t );
	} else {
		// lock held: queue the select_node that on_wait() stored in t->link_node
		insert_last( blocked_threads, *(select_node *)t->link_node );
		wait_count++;
	}
	unlock( lock );
}
172
// Fully releases the lock on behalf of a thread about to wait on a condition variable,
// then runs pp_fn( pp_datum ) and parks through pre_park_then_park().
// Returns the recursion count held on entry so that on_wakeup() can restore it.
size_t on_wait( blocking_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) with( this ) {
	lock( lock __cfaabi_dbg_ctx2 );
	/* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
	// arguments follow the format order: offending thread first, then the real owner
	/* paranoid */ verifyf( owner == active_thread() || !strict_owner, "Thread %p other than the owner %p attempted to release owner lock %p", active_thread(), owner, &this );

	size_t ret = recursion_count;

	// pass the lock on (or free it) regardless of recursion depth
	pop_node( this );

	// publish a node that on_notify() can queue when this thread must re-acquire
	select_node node;
	active_thread()->link_node = (void *)&node;
	unlock( lock );

	pre_park_then_park( pp_fn, pp_datum );

	return ret;
}
190
// Restores the recursion depth saved by on_wait() once the waiting thread is running again.
void on_wakeup( blocking_lock & this, size_t recursion ) {
	this.recursion_count = recursion;
}
194
[beeff61e]195// waituntil() support
// waituntil() registration: returns true when the lock was acquired immediately, and
// false when the node was queued or the select race was lost.
bool register_select( blocking_lock & this, select_node & node ) with(this) {
	lock( lock __cfaabi_dbg_ctx2 );
	thread$ * self = active_thread();

	// single acquisition lock is held by current thread
	/* paranoid */ verifyf( owner != self || multi_acquisition, "Single acquisition lock holder (%p) attempted to reacquire the lock %p resulting in a deadlock.", owner, &this );

	// classify the lock state once, before anything is modified
	bool unowned = owner == 0p;
	bool reentrant = owner == self && multi_acquisition;
	bool held_by_other = owner != 0p && owner != self;

	// OR special case: the lock is immediately obtainable, so first try to claim the node
	if ( !node.park_counter && ( reentrant || unowned ) ) {
		if ( !__make_select_node_available( node ) ) {
			// we didn't win the race so give up on registering
			unlock( lock );
			return false;
		}
	}

	if ( held_by_other ) {
		// lock is held by some other thread: wait in the queue
		insert_last( blocked_threads, node );
		wait_count++;
		unlock( lock );
		return false;
	}

	if ( reentrant ) {
		// multi acquisition lock is held by current thread
		recursion_count++;
	} else {
		// lock isn't held
		owner = self;
		recursion_count = 1;
	}

	if ( node.park_counter ) __make_select_node_available( node );
	unlock( lock );
	return true;
}
227
// waituntil() cleanup: removes the node if it is still queued; otherwise, if the caller
// owns the lock, releases one level of ownership. Always returns false.
bool unregister_select( blocking_lock & this, select_node & node ) with(this) {
	lock( lock __cfaabi_dbg_ctx2 );

	bool still_queued = node`isListed;
	if ( still_queued ) {
		// never acquired: just leave the queue
		remove( node );
		wait_count--;
	} else if ( owner == active_thread() ) {
		/* paranoid */ verifyf( recursion_count == 1 || multi_acquisition, "Thread %p attempted to unlock owner lock %p in waituntil unregister, which is not recursive but has a recursive count of %zu", active_thread(), &this, recursion_count );
		// if recursion count is zero release lock and set new owner if one is waiting
		recursion_count--;
		if ( recursion_count == 0 ) pop_node( this );
	}

	unlock( lock );
	return false;
}
248
// waituntil() hook invoked for a selected clause; a blocking lock has nothing extra to do.
bool on_selected( blocking_lock & this, select_node & node ) {
	return true;
}
250
[ac5816d]251//-----------------------------------------------------------------------------
252// alarm node wrapper
forall(L & | is_blocking_lock(L)) {
	// Alarm node bundled with the condition variable and waiter it times out.
	// alarm_node must stay the first member: the callbacks cast alarm_node_t & back to the wrapper.
	struct alarm_node_wrap {
		alarm_node_t alarm_node;
		condition_variable(L) * cond;
		info_thread(L) * info_thd;
	};

	void ?{}( alarm_node_wrap(L) & this, Duration alarm, Duration period, Alarm_Callback callback, condition_variable(L) * c, info_thread(L) * i ) {
		this.alarm_node{ callback, alarm, period };
		this.info_thd = i;
		this.cond = c;
	}

	void ^?{}( alarm_node_wrap(L) & this ) { }

	// Timeout path for condition_variable waiters.
	// This condition_variable member is called from the kernel, and therefore, cannot block, but it can spin.
	static void timeout_handler ( alarm_node_wrap(L) & this ) with( this ) {
		lock( cond->lock __cfaabi_dbg_ctx2 );

		// The handler may still run after a notify has dequeued the thread but before the
		// alarm is unregistered, so only act if the waiter is still on the queue.
		bool still_queued = (*info_thd)`isListed;
		if ( still_queued ) {
			info_thd->signalled = false;			// woken by timeout, not by a signal
			remove( *info_thd );					// O(1) unlink
			cond->count--;
			if ( info_thd->lock ) {
				// a lock was passed to wait(): let it decide when the thread runs
				on_notify(*info_thd->lock, info_thd->t);
			} else {
				// no lock: wake the thread directly
				unpark( info_thd->t );
			}
		}

		unlock( cond->lock );
	}

	// this casts the alarm node to our wrapped type since we used type erasure
	static void alarm_node_wrap_cast( alarm_node_t & a ) {
		timeout_handler( (alarm_node_wrap(L) &)a );
	}

	// Same bundle for pthread_cond_var waiters.
	struct pthread_alarm_node_wrap {
		alarm_node_t alarm_node;
		pthread_cond_var(L) * cond;
		info_thread(L) * info_thd;
	};

	void ?{}( pthread_alarm_node_wrap(L) & this, Duration alarm, Duration period, Alarm_Callback callback, pthread_cond_var(L) * c, info_thread(L) * i ) {
		this.alarm_node{ callback, alarm, period };
		this.info_thd = i;
		this.cond = c;
	}

	void ^?{}( pthread_alarm_node_wrap(L) & this ) { }

	// Timeout path for pthread_cond_var waiters.
	// This pthread_cond_var member is called from the kernel, and therefore, cannot block, but it can spin.
	static void timeout_handler ( pthread_alarm_node_wrap(L) & this ) with( this ) {
		lock( cond->lock __cfaabi_dbg_ctx2 );

		// Same race as above: act only if the waiter has not already been dequeued.
		bool still_queued = (*info_thd)`isListed;
		if ( still_queued ) {
			info_thd->signalled = false;			// woken by timeout, not by a signal
			remove( *info_thd );					// O(1) unlink
			on_notify(*info_thd->lock, info_thd->t);
		}

		unlock( cond->lock );
	}

	// this casts the alarm node to our wrapped type since we used type erasure
	static void pthread_alarm_node_wrap_cast( alarm_node_t & a ) {
		timeout_handler( (pthread_alarm_node_wrap(L) &)a );
	}
}
326
[ac5816d]327//-----------------------------------------------------------------------------
[7f958c4]328// Synchronization Locks
[fd54fef]329forall(L & | is_blocking_lock(L)) {
[c20533ea]330
[7f958c4]331 //-----------------------------------------------------------------------------
332 // condition variable
[848439f]333 void ?{}( condition_variable(L) & this ){
[eeb5023]334 this.lock{};
335 this.blocked_threads{};
336 this.count = 0;
[848439f]337 }
338
[cad1df1]339 void ^?{}( condition_variable(L) & this ){ }
[848439f]340
[c18bf9e]341 static void process_popped( condition_variable(L) & this, info_thread(L) & popped ) with( this ) {
[cad1df1]342 if(&popped != 0p) {
[dff1fd1]343 popped.signalled = true;
[eeb5023]344 count--;
[cad1df1]345 if (popped.lock) {
[ac5816d]346 // if lock passed call on_notify
347 on_notify(*popped.lock, popped.t);
[848439f]348 } else {
[ac5816d]349 // otherwise wake thread
350 unpark(popped.t);
[848439f]351 }
352 }
[cad1df1]353 }
354
355 bool notify_one( condition_variable(L) & this ) with( this ) {
356 lock( lock __cfaabi_dbg_ctx2 );
[82f4063]357 bool ret = ! blocked_threads`isEmpty;
358 process_popped(this, try_pop_front( blocked_threads ));
[848439f]359 unlock( lock );
360 return ret;
361 }
362
[eeb5023]363 bool notify_all( condition_variable(L) & this ) with(this) {
[848439f]364 lock( lock __cfaabi_dbg_ctx2 );
[82f4063]365 bool ret = ! blocked_threads`isEmpty;
366 while( ! blocked_threads`isEmpty ) {
367 process_popped(this, try_pop_front( blocked_threads ));
[848439f]368 }
369 unlock( lock );
370 return ret;
371 }
372
[eeb5023]373 uintptr_t front( condition_variable(L) & this ) with(this) {
[82f4063]374 return blocked_threads`isEmpty ? NULL : blocked_threads`first.info;
[848439f]375 }
376
[22b7579]377 bool empty( condition_variable(L) & this ) with(this) {
378 lock( lock __cfaabi_dbg_ctx2 );
[82f4063]379 bool ret = blocked_threads`isEmpty;
[22b7579]380 unlock( lock );
381 return ret;
382 }
[cad1df1]383
384 int counter( condition_variable(L) & this ) with(this) { return count; }
[848439f]385
[beeff61e]386 static void enqueue_thread( condition_variable(L) & this, info_thread(L) * i ) with(this) {
[ac5816d]387 // add info_thread to waiting queue
[82f4063]388 insert_last( blocked_threads, *i );
[cad1df1]389 count++;
[beeff61e]390 // size_t recursion_count = 0;
391 // if (i->lock) {
392 // // if lock was passed get recursion count to reset to after waking thread
393 // recursion_count = on_wait( *i->lock );
394 // }
395 // return recursion_count;
[848439f]396 }
397
[fece3d9]398 static size_t block_and_get_recursion( info_thread(L) & i, __cfa_pre_park pp_fn, void * pp_datum ) {
[beeff61e]399 size_t recursion_count = 0;
400 if ( i.lock ) {
401 // if lock was passed get recursion count to reset to after waking thread
[fece3d9]402 recursion_count = on_wait( *i.lock, pp_fn, pp_datum ); // this call blocks
403 } else pre_park_then_park( pp_fn, pp_datum );
[beeff61e]404 return recursion_count;
405 }
[fece3d9]406 static size_t block_and_get_recursion( info_thread(L) & i ) { return block_and_get_recursion( i, pre_park_noop, 0p ); }
[beeff61e]407
[cad1df1]408 // helper for wait()'s' with no timeout
[c18bf9e]409 static void queue_info_thread( condition_variable(L) & this, info_thread(L) & i ) with(this) {
[848439f]410 lock( lock __cfaabi_dbg_ctx2 );
[beeff61e]411 enqueue_thread( this, &i );
412 // size_t recursion_count = queue_and_get_recursion( this, &i );
[848439f]413 unlock( lock );
[ac5816d]414
415 // blocks here
[beeff61e]416 size_t recursion_count = block_and_get_recursion( i );
417 // park( );
[ac5816d]418
419 // resets recursion count here after waking
[beeff61e]420 if ( i.lock ) on_wakeup( *i.lock, recursion_count );
[848439f]421 }
422
[ac5816d]423 #define WAIT( u, l ) \
424 info_thread( L ) i = { active_thread(), u, l }; \
425 queue_info_thread( this, i );
426
[fece3d9]427 static void cond_alarm_register( void * node_ptr ) { register_self( (alarm_node_t *)node_ptr ); }
428
[eeb5023]429 // helper for wait()'s' with a timeout
[c18bf9e]430 static void queue_info_thread_timeout( condition_variable(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) {
[eeb5023]431 lock( lock __cfaabi_dbg_ctx2 );
[beeff61e]432 enqueue_thread( this, &info );
433 // size_t recursion_count = queue_and_get_recursion( this, &info );
[afd7faf]434 alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info };
[848439f]435 unlock( lock );
436
[4e83bb7]437 // registers alarm outside cond lock to avoid deadlock
[fece3d9]438 // register_self( &node_wrap.alarm_node );
[4e83bb7]439
[ac5816d]440 // blocks here
[fece3d9]441 size_t recursion_count = block_and_get_recursion( info, cond_alarm_register, (void *)(&node_wrap.alarm_node) );
[beeff61e]442 // park();
[848439f]443
[ac5816d]444 // unregisters alarm so it doesn't go off if this happens first
445 unregister_self( &node_wrap.alarm_node );
[848439f]446
[ac5816d]447 // resets recursion count here after waking
[beeff61e]448 if ( info.lock ) on_wakeup( *info.lock, recursion_count );
[848439f]449 }
450
[ac5816d]451 #define WAIT_TIME( u, l, t ) \
452 info_thread( L ) i = { active_thread(), u, l }; \
[afd7faf]453 queue_info_thread_timeout(this, i, t, alarm_node_wrap_cast ); \
[dff1fd1]454 return i.signalled;
[848439f]455
[ac5816d]456 void wait( condition_variable(L) & this ) with(this) { WAIT( 0, 0p ) }
457 void wait( condition_variable(L) & this, uintptr_t info ) with(this) { WAIT( info, 0p ) }
458 void wait( condition_variable(L) & this, L & l ) with(this) { WAIT( 0, &l ) }
459 void wait( condition_variable(L) & this, L & l, uintptr_t info ) with(this) { WAIT( info, &l ) }
460
[c457dc41]461 bool wait( condition_variable(L) & this, Duration duration ) with(this) { WAIT_TIME( 0 , 0p , duration ) }
462 bool wait( condition_variable(L) & this, uintptr_t info, Duration duration ) with(this) { WAIT_TIME( info, 0p , duration ) }
463 bool wait( condition_variable(L) & this, L & l, Duration duration ) with(this) { WAIT_TIME( 0 , &l , duration ) }
464 bool wait( condition_variable(L) & this, L & l, uintptr_t info, Duration duration ) with(this) { WAIT_TIME( info, &l , duration ) }
[7f958c4]465
466 //-----------------------------------------------------------------------------
467 // fast_cond_var
468 void ?{}( fast_cond_var(L) & this ){
[c18bf9e]469 this.blocked_threads{};
[7f958c4]470 #ifdef __CFA_DEBUG__
471 this.lock_used = 0p;
472 #endif
473 }
474 void ^?{}( fast_cond_var(L) & this ){ }
475
476 bool notify_one( fast_cond_var(L) & this ) with(this) {
477 bool ret = ! blocked_threads`isEmpty;
478 if ( ret ) {
479 info_thread(L) & popped = try_pop_front( blocked_threads );
480 on_notify(*popped.lock, popped.t);
481 }
482 return ret;
483 }
484 bool notify_all( fast_cond_var(L) & this ) with(this) {
485 bool ret = ! blocked_threads`isEmpty;
486 while( ! blocked_threads`isEmpty ) {
487 info_thread(L) & popped = try_pop_front( blocked_threads );
488 on_notify(*popped.lock, popped.t);
489 }
490 return ret;
491 }
492
493 uintptr_t front( fast_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty ? NULL : blocked_threads`first.info; }
494 bool empty ( fast_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty; }
495
496 void wait( fast_cond_var(L) & this, L & l ) {
497 wait( this, l, 0 );
498 }
499
500 void wait( fast_cond_var(L) & this, L & l, uintptr_t info ) with(this) {
501 // brand cond lock with lock
502 #ifdef __CFA_DEBUG__
503 if ( lock_used == 0p ) lock_used = &l;
[7d9598d8]504 else assert(lock_used == &l);
[7f958c4]505 #endif
506 info_thread( L ) i = { active_thread(), info, &l };
507 insert_last( blocked_threads, i );
[fece3d9]508 size_t recursion_count = on_wait( *i.lock, pre_park_noop, 0p ); // blocks here
[beeff61e]509 // park( );
[7f958c4]510 on_wakeup(*i.lock, recursion_count);
511 }
[454f478]512
[ae06e0b]513 //-----------------------------------------------------------------------------
514 // pthread_cond_var
515
516 void ?{}( pthread_cond_var(L) & this ) with(this) {
517 blocked_threads{};
518 lock{};
519 }
520
521 void ^?{}( pthread_cond_var(L) & this ) { }
522
523 bool notify_one( pthread_cond_var(L) & this ) with(this) {
524 lock( lock __cfaabi_dbg_ctx2 );
525 bool ret = ! blocked_threads`isEmpty;
526 if ( ret ) {
527 info_thread(L) & popped = try_pop_front( blocked_threads );
[4e83bb7]528 popped.signalled = true;
[ae06e0b]529 on_notify(*popped.lock, popped.t);
530 }
531 unlock( lock );
532 return ret;
533 }
534
535 bool notify_all( pthread_cond_var(L) & this ) with(this) {
536 lock( lock __cfaabi_dbg_ctx2 );
537 bool ret = ! blocked_threads`isEmpty;
538 while( ! blocked_threads`isEmpty ) {
539 info_thread(L) & popped = try_pop_front( blocked_threads );
[4e83bb7]540 popped.signalled = true;
[ae06e0b]541 on_notify(*popped.lock, popped.t);
542 }
543 unlock( lock );
544 return ret;
545 }
546
547 uintptr_t front( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty ? NULL : blocked_threads`first.info; }
548 bool empty ( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty; }
549
[beeff61e]550 // static size_t queue_and_get_recursion( pthread_cond_var(L) & this, info_thread(L) * i ) with(this) {
551 // // add info_thread to waiting queue
552 // insert_last( blocked_threads, *i );
553 // size_t recursion_count = 0;
554 // recursion_count = on_wait( *i->lock );
555 // return recursion_count;
556 // }
[ae06e0b]557
558 static void queue_info_thread_timeout( pthread_cond_var(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) {
559 lock( lock __cfaabi_dbg_ctx2 );
[beeff61e]560 // size_t recursion_count = queue_and_get_recursion(this, &info);
561 insert_last( blocked_threads, info );
[ae06e0b]562 pthread_alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info };
563 unlock( lock );
564
[4e83bb7]565 // registers alarm outside cond lock to avoid deadlock
[fece3d9]566 // register_self( &node_wrap.alarm_node ); // C_TODO: fix race: registers itself and then alarm handler calls on_notify before block_and_get_recursion is run
[4e83bb7]567
[ae06e0b]568 // blocks here
[fece3d9]569 size_t recursion_count = block_and_get_recursion( info, cond_alarm_register, (void *)(&node_wrap.alarm_node) );
[beeff61e]570 // park();
[ae06e0b]571
572 // unregisters alarm so it doesn't go off if this happens first
573 unregister_self( &node_wrap.alarm_node );
574
575 // resets recursion count here after waking
[beeff61e]576 if ( info.lock ) on_wakeup( *info.lock, recursion_count );
[ae06e0b]577 }
578
579 void wait( pthread_cond_var(L) & this, L & l ) with(this) {
580 wait( this, l, 0 );
581 }
582
583 void wait( pthread_cond_var(L) & this, L & l, uintptr_t info ) with(this) {
584 lock( lock __cfaabi_dbg_ctx2 );
585 info_thread( L ) i = { active_thread(), info, &l };
[beeff61e]586 insert_last( blocked_threads, i );
587 // size_t recursion_count = queue_and_get_recursion( this, &i );
[ae06e0b]588 unlock( lock );
[beeff61e]589
590 // blocks here
591 size_t recursion_count = block_and_get_recursion( i );
592 // park();
593 on_wakeup( *i.lock, recursion_count );
[ae06e0b]594 }
595
596 #define PTHREAD_WAIT_TIME( u, l, t ) \
597 info_thread( L ) i = { active_thread(), u, l }; \
598 queue_info_thread_timeout(this, i, t, pthread_alarm_node_wrap_cast ); \
599 return i.signalled;
600
[4e83bb7]601 Duration getDuration(timespec t) {
602 timespec currTime;
603 clock_gettime(CLOCK_REALTIME, &currTime);
604 Duration waitUntil = { t };
605 Duration currDur = { currTime };
606 if ( currDur >= waitUntil ) return currDur - waitUntil;
607 Duration zero = { 0 };
608 return zero;
609 }
610
[ae06e0b]611 bool wait( pthread_cond_var(L) & this, L & l, timespec t ) {
[4e83bb7]612 PTHREAD_WAIT_TIME( 0, &l , getDuration( t ) )
[ae06e0b]613 }
614
615 bool wait( pthread_cond_var(L) & this, L & l, uintptr_t info, timespec t ) {
[4e83bb7]616 PTHREAD_WAIT_TIME( info, &l , getDuration( t ) )
[ae06e0b]617 }
618}
[454f478]619//-----------------------------------------------------------------------------
620// Semaphore
// Constructs a semaphore with the given initial count (default 1) and no waiters.
void ?{}( semaphore & this, int count = 1 ) {
	(this.lock){};
	(this.waiting){};
	this.count = count;
}

void ^?{}(semaphore & this) {}
627
// Decrements the semaphore. If the count goes negative the caller is queued and parked.
// Returns true if the caller had to block, false if it passed straight through.
bool P(semaphore & this) with( this ){
	lock( lock __cfaabi_dbg_ctx2 );
	count -= 1;
	bool must_block = count < 0;

	// queue current task while still holding the spin lock
	if ( must_block ) append( waiting, active_thread() );
	unlock( lock );

	// block only after the spin lock has been released
	if ( must_block ) park();
	return must_block;
}
645
// Increments the semaphore and dequeues the head waiter when the new count is <= 0.
// The dequeued thread (0p if none) is unparked only when doUnpark is true, and is
// returned either way.
thread$ * V (semaphore & this, const bool doUnpark ) with( this ) {
	thread$ * released = 0p;

	lock( lock __cfaabi_dbg_ctx2 );
	count += 1;
	if ( count <= 0 ) {
		// remove task at head of waiting list
		released = pop_head( waiting );
	}
	unlock( lock );

	// wake outside the spin lock; released may be 0p here, as in every call without a waiter
	if ( doUnpark ) unpark( released );

	return released;
}
[454f478]662
// Increments the semaphore and wakes one waiter if there is one.
// Returns true if a thread was released.
bool V(semaphore & this) {
	return V(this, true) != 0p;
}
667
// Adds diff permits to the semaphore and wakes at most diff queued waiters.
// Returns true if at least one thread was released.
//
// Fixes over the previous version:
//  - the number released was max(-count, diff), which woke more waiters than permits
//    were added (e.g. count == -5, diff == 2 woke 5) and popped from an empty queue when
//    fewer threads were waiting; it is now min(number waiting, diff);
//  - the result was computed from a variable that was never assigned, so it was always false.
bool V(semaphore & this, unsigned diff) with( this ) {
	lock( lock __cfaabi_dbg_ctx2 );

	// a negative count is the number of blocked threads
	int release = 0;
	if ( count < 0 ) {
		release = -count;
		if ( release > (int)diff ) release = (int)diff;
	}

	count += diff;
	for ( int i = 0; i < release; i += 1 ) {
		unpark( pop_head( waiting ) );
	}

	unlock( lock );

	return release > 0;
}
Note: See TracBrowser for help on using the repository browser.