source: libcfa/src/concurrency/locks.cfa@ 10a9479d

Last change on this file since 10a9479d was b93bf85, checked in by caparsons <caparson@…>, 2 years ago

fixed spurious channel close waituntil error case. Was caused by a race condition causing an exception to be thrown while another was in flight

  • Property mode set to 100644
File size: 20.1 KB
RevLine 
[ab1b971]1//
2// Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
[5a05946]7// locks.cfa -- LIBCFATHREAD
[ab1b971]8// Runtime locks that are used with the runtime thread system.
9//
10// Author : Colby Alexander Parsons
11// Created On : Thu Jan 21 19:46:50 2021
12// Last Modified By :
13// Last Modified On :
14// Update Count :
15//
16
17#define __cforall_thread__
18
[848439f]19#include "locks.hfa"
[708ae38]20#include "kernel/private.hfa"
[848439f]21
22#include <kernel.hfa>
23#include <stdlib.hfa>
24
[c18bf9e]25#pragma GCC visibility push(default)
26
[ac5816d]27//-----------------------------------------------------------------------------
28// info_thread
forall(L & | is_blocking_lock(L)) {
	// Per-waiter record that the condition variables in this file link into
	// their queue of blocked threads.
	struct info_thread {
		// used to put info_thread on a dl queue
		inline dlink(info_thread(L));

		// waiting thread
		struct thread$ * t;

		// shadow field
		uintptr_t info;

		// lock that is passed to wait() (if one is passed)
		L * lock;

		// true when signalled and false when timeout wakes thread
		bool signalled;
	};
	P9_EMBEDDED( info_thread(L), dlink(info_thread(L)) )

	// Builds the record for thread t waiting with user datum info and
	// (optional, may be 0p) lock l.
	void ?{}( info_thread(L) & this, thread$ * t, uintptr_t info, L * l ) {
		this.t = t;
		this.info = info;
		this.lock = l;
		// The timed wait() macros return this flag; give it a defined value so it
		// is never read uninitialized if neither a notifier nor the timeout set it.
		this.signalled = false;
	}

	void ^?{}( info_thread(L) & this ) {}
}
[cad1df1]56
[ac5816d]57//-----------------------------------------------------------------------------
58// Blocking Locks
// Constructs an unowned blocking lock with the requested acquisition and ownership policy.
void ?{}( blocking_lock & this, bool multi_acquisition, bool strict_owner ) {
	// internal spin lock and queue of blocked threads
	this.lock{};
	this.blocked_threads{};

	// policy flags
	this.multi_acquisition = multi_acquisition;
	this.strict_owner = strict_owner;

	// starts out unowned with nobody waiting
	this.owner = 0p;
	this.recursion_count = 0;
	this.wait_count = 0;
}

// Nothing to release on destruction.
void ^?{}( blocking_lock & this ) {}
[ab1b971]70
[848439f]71
// Acquires the blocking lock for the calling thread, parking it when another thread is the owner.
void lock( blocking_lock & this ) with( this ) {
	lock( lock __cfaabi_dbg_ctx2 );
	thread$ * curr = active_thread();

	// single acquisition lock is held by current thread
	/* paranoid */ verifyf( owner != curr || multi_acquisition, "Single acquisition lock holder (%p) attempted to reacquire the lock %p resulting in a deadlock.", owner, &this );

	// some other thread owns the lock: queue up and park
	if ( owner != 0p && owner != curr ) {
		select_node node;
		insert_last( blocked_threads, node );
		wait_count++;
		unlock( lock );
		park( );
		return;
	}

	if ( owner == curr && multi_acquisition ) {
		// re-acquisition of a multi acquisition lock by its owner
		recursion_count++;
	} else {
		// lock isn't held
		owner = curr;
		recursion_count = 1;
	}
	unlock( lock );
}
95
// Non-blocking acquire: returns true when the caller now holds (or re-holds) the lock.
bool try_lock( blocking_lock & this ) with( this ) {
	lock( lock __cfaabi_dbg_ctx2 );
	thread$ * curr = active_thread();
	bool acquired = true;

	if ( owner == 0p ) {
		// lock isn't held
		owner = curr;
		recursion_count = 1;
	} else if ( owner == curr && multi_acquisition ) {
		// multi acquisition lock is held by current thread
		recursion_count++;
	} else {
		acquired = false;
	}

	unlock( lock );
	return acquired;
}
115
// Passes ownership to the first queued waiter and wakes it, or marks the lock
// unowned when no waiter is queued.
static inline void pop_node( blocking_lock & this ) with( this ) {
	__handle_waituntil_OR( blocked_threads );
	select_node * waiter = &try_pop_front( blocked_threads );

	// nobody waiting: lock becomes free
	if ( ! waiter ) {
		owner = 0p;
		recursion_count = 0;
		return;
	}

	// the popped waiter becomes the new owner
	wait_count--;
	owner = waiter->blocked_thread;
	recursion_count = 1;
	wake_one( blocked_threads, *waiter );
}
130
// Releases one level of ownership of the blocking lock.  When the recursion
// count reaches zero the lock is passed to the next queued waiter (or becomes
// unowned) via pop_node().
void unlock( blocking_lock & this ) with( this ) {
	lock( lock __cfaabi_dbg_ctx2 );
	/* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
	// arguments follow the message: first the offending (calling) thread, then the owner
	/* paranoid */ verifyf( owner == active_thread() || !strict_owner , "Thread %p other than the owner %p attempted to release owner lock %p", active_thread(), owner, &this );
	/* paranoid */ verifyf( recursion_count == 1 || multi_acquisition, "Thread %p attempted to release owner lock %p which is not recursive but has a recursive count of %zu", active_thread(), &this, recursion_count );

	// if recursion count is zero release lock and set new owner if one is waiting
	recursion_count--;
	if ( recursion_count == 0 ) {
		pop_node( this );
	}
	unlock( lock );
}
144
// Returns the lock's wait_count field (read without taking the internal spin lock).
size_t wait_count( blocking_lock & this ) {
	return this.wait_count;
}
148
// Called on behalf of thread t when it is notified: t either becomes the owner
// immediately and is unparked, or is queued behind the current owner.
void on_notify( blocking_lock & this, thread$ * t ) with( this ) {
	lock( lock __cfaabi_dbg_ctx2 );
	if ( owner == 0p ) {
		// lock not held: hand it straight to t
		owner = t;
		recursion_count = 1;
		unpark( t );
	} else {
		// lock held: enqueue the node t published through link_node
		insert_last( blocked_threads, *(select_node *)t->link_node );
		wait_count++;
	}
	unlock( lock );
}
164
// Fully releases the lock held by the calling thread and parks it.
// pp_fn / pp_datum are forwarded to pre_park_then_park().
// Returns the recursion count held at entry so the caller can pass it to on_wakeup().
size_t on_wait( blocking_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) with( this ) {
	lock( lock __cfaabi_dbg_ctx2 );
	/* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
	// arguments follow the message: first the offending (calling) thread, then the owner
	/* paranoid */ verifyf( owner == active_thread() || !strict_owner, "Thread %p other than the owner %p attempted to release owner lock %p", active_thread(), owner, &this );

	size_t ret = recursion_count;

	// pass the lock to the next waiter, or mark it unowned
	pop_node( this );

	// publish a node for this thread; on_notify() enqueues it through link_node
	select_node node;
	active_thread()->link_node = (void *)&node;
	unlock( lock );

	pre_park_then_park( pp_fn, pp_datum );

	return ret;
}
182
// Restores the recursion count saved by on_wait() after the thread wakes.
void on_wakeup( blocking_lock & this, size_t recursion ) {
	this.recursion_count = recursion;
}
186
[beeff61e]187// waituntil() support
// waituntil() support: tries to acquire the lock for a select clause.
// Returns true when the lock was acquired, false when the node was queued or
// the clause lost the race to become available.
bool register_select( blocking_lock & this, select_node & node ) with(this) {
	lock( lock __cfaabi_dbg_ctx2 );
	thread$ * curr = active_thread();

	// single acquisition lock is held by current thread
	/* paranoid */ verifyf( owner != curr || multi_acquisition, "Single acquisition lock holder (%p) attempted to reacquire the lock %p resulting in a deadlock.", owner, &this );

	bool held_by_other = owner != 0p && owner != curr;

	// OR special case
	if ( !node.park_counter && ( (owner == curr && multi_acquisition) || owner == 0p ) ) {
		if ( !__make_select_node_available( node ) ) {
			// we didn't win the race so give up on registering
			unlock( lock );
			return false;
		}
	}

	// lock is held by some other thread: queue the node
	if ( held_by_other ) {
		insert_last( blocked_threads, node );
		wait_count++;
		unlock( lock );
		return false;
	}

	if ( owner == curr && multi_acquisition ) {
		// multi acquisition lock is held by current thread
		recursion_count++;
	} else {
		// lock isn't held
		owner = curr;
		recursion_count = 1;
	}

	if ( node.park_counter ) __make_select_node_available( node );
	unlock( lock );
	return true;
}
219
// waituntil() support: withdraws a select clause from this lock.  A node that
// is still queued is removed; otherwise, if the caller owns the lock, one level
// of ownership is released.  Always returns false.
bool unregister_select( blocking_lock & this, select_node & node ) with(this) {
	lock( lock __cfaabi_dbg_ctx2 );
	if ( node`isListed ) {
		// still waiting: just take it off the queue
		remove( node );
		wait_count--;
	} else if ( owner == active_thread() ) {
		/* paranoid */ verifyf( recursion_count == 1 || multi_acquisition, "Thread %p attempted to unlock owner lock %p in waituntil unregister, which is not recursive but has a recursive count of %zu", active_thread(), &this, recursion_count );
		// if recursion count is zero release lock and set new owner if one is waiting
		recursion_count--;
		if ( recursion_count == 0 ) {
			pop_node( this );
		}
	}
	unlock( lock );
	return false;
}
240
// waituntil() support: no extra work is done here when the clause is selected.
bool on_selected( blocking_lock & this, select_node & node ) {
	return true;
}
[beeff61e]242
[ac5816d]243//-----------------------------------------------------------------------------
244// alarm node wrapper
forall(L & | is_blocking_lock(L)) {
	// Alarm node bundled with the condition_variable and waiter it times out.
	// alarm_node_wrap_cast() below casts an alarm_node_t & to this wrapper type.
	struct alarm_node_wrap {
		alarm_node_t alarm_node;
		condition_variable(L) * cond;
		info_thread(L) * info_thd;
	};

	void ?{}( alarm_node_wrap(L) & this, Duration alarm, Duration period, Alarm_Callback callback, condition_variable(L) * c, info_thread(L) * i ) {
		this.alarm_node{ callback, alarm, period };
		this.cond = c;
		this.info_thd = i;
	}

	void ^?{}( alarm_node_wrap(L) & this ) { }

	// Timeout path for condition_variable waits.
	static void timeout_handler ( alarm_node_wrap(L) & this ) with( this ) {
		// This condition_variable member is called from the kernel, and therefore, cannot block, but it can spin.
		lock( cond->lock __cfaabi_dbg_ctx2 );

		info_thread(L) & waiter = *info_thd;

		// The handler may still run after the thread has been removed from the
		// queue but before the alarm is unregistered, so only act if it is queued.
		if ( waiter`isListed ) {
			waiter.signalled = false;
			// remove this thread O(1)
			remove( waiter );
			cond->count--;
			if ( waiter.lock ) {
				// call lock's on_notify if a lock was passed
				on_notify( *waiter.lock, waiter.t );
			} else {
				// otherwise wake thread
				unpark( waiter.t );
			}
		}
		unlock( cond->lock );
	}

	// this casts the alarm node to our wrapped type since we used type erasure
	static void alarm_node_wrap_cast( alarm_node_t & a ) { timeout_handler( (alarm_node_wrap(L) &)a ); }

	// Same bundle for pthread_cond_var waits.
	struct pthread_alarm_node_wrap {
		alarm_node_t alarm_node;
		pthread_cond_var(L) * cond;
		info_thread(L) * info_thd;
	};

	void ?{}( pthread_alarm_node_wrap(L) & this, Duration alarm, Duration period, Alarm_Callback callback, pthread_cond_var(L) * c, info_thread(L) * i ) {
		this.alarm_node{ callback, alarm, period };
		this.cond = c;
		this.info_thd = i;
	}

	void ^?{}( pthread_alarm_node_wrap(L) & this ) { }

	// Timeout path for pthread_cond_var waits.
	static void timeout_handler ( pthread_alarm_node_wrap(L) & this ) with( this ) {
		// This pthread_cond_var member is called from the kernel, and therefore, cannot block, but it can spin.
		lock( cond->lock __cfaabi_dbg_ctx2 );

		info_thread(L) & waiter = *info_thd;

		// The handler may still run after the thread has been removed from the
		// queue but before the alarm is unregistered, so only act if it is queued.
		if ( waiter`isListed ) {
			waiter.signalled = false;
			// remove this thread O(1)
			remove( waiter );
			on_notify( *waiter.lock, waiter.t );
		}
		unlock( cond->lock );
	}

	// this casts the alarm node to our wrapped type since we used type erasure
	static void pthread_alarm_node_wrap_cast( alarm_node_t & a ) { timeout_handler( (pthread_alarm_node_wrap(L) &)a ); }
}
318
[ac5816d]319//-----------------------------------------------------------------------------
[7f958c4]320// Synchronization Locks
[fd54fef]321forall(L & | is_blocking_lock(L)) {
[c20533ea]322
[7f958c4]323 //-----------------------------------------------------------------------------
324 // condition variable
// Constructs an empty condition variable: fresh spin lock, no waiters.
void ?{}( condition_variable(L) & this ) {
	this.lock{};
	this.blocked_threads{};
	this.count = 0;
}

// Nothing to release on destruction.
void ^?{}( condition_variable(L) & this ) { }
[848439f]332
// Wakes a waiter that was just popped from the queue; does nothing when the
// pop produced no waiter (null reference).
static void process_popped( condition_variable(L) & this, info_thread(L) & popped ) with( this ) {
	if ( &popped == 0p ) return;

	popped.signalled = true;
	count--;
	if ( popped.lock ) {
		// if lock passed call on_notify
		on_notify( *popped.lock, popped.t );
	} else {
		// otherwise wake thread
		unpark( popped.t );
	}
}
346
// Wakes the first waiter, if any.  Returns true when a waiter was queued.
bool notify_one( condition_variable(L) & this ) with( this ) {
	lock( lock __cfaabi_dbg_ctx2 );
	bool had_waiter = ! blocked_threads`isEmpty;
	process_popped( this, try_pop_front( blocked_threads ) );
	unlock( lock );
	return had_waiter;
}
354
// Wakes every queued waiter.  Returns true when at least one waiter was queued.
bool notify_all( condition_variable(L) & this ) with(this) {
	lock( lock __cfaabi_dbg_ctx2 );
	bool had_waiter = ! blocked_threads`isEmpty;
	for ( ; ! blocked_threads`isEmpty; ) {
		process_popped( this, try_pop_front( blocked_threads ) );
	}
	unlock( lock );
	return had_waiter;
}
364
// Returns the info value of the first waiter, or 0 when no thread is waiting.
// The queue is read under the condition variable's spin lock, as in empty():
// the timeout handler can unlink a waiter concurrently.
uintptr_t front( condition_variable(L) & this ) with(this) {
	uintptr_t ret = 0;
	lock( lock __cfaabi_dbg_ctx2 );
	if ( ! blocked_threads`isEmpty ) ret = blocked_threads`first.info;
	unlock( lock );
	return ret;
}
368
// Returns true when no thread is waiting (checked under the spin lock).
bool empty( condition_variable(L) & this ) with(this) {
	lock( lock __cfaabi_dbg_ctx2 );
	bool no_waiters = blocked_threads`isEmpty;
	unlock( lock );
	return no_waiters;
}

// Returns the count field (read without taking the spin lock).
int counter( condition_variable(L) & this ) {
	return this.count;
}
[848439f]377
// Appends the waiter record to the queue and bumps the waiter count.
// Takes no lock itself; the callers in this file lock before calling.
static void enqueue_thread( condition_variable(L) & this, info_thread(L) * i ) {
	insert_last( this.blocked_threads, *i );
	this.count++;
}
383
// Blocks the calling thread.  With a lock, the lock's on_wait() releases it,
// parks, and yields the recursion count to restore after waking; without one
// the thread just runs the pre-park hook and parks, and 0 is returned.
static size_t block_and_get_recursion( info_thread(L) & i, __cfa_pre_park pp_fn, void * pp_datum ) {
	if ( i.lock ) {
		return on_wait( *i.lock, pp_fn, pp_datum ); // this call blocks
	}
	pre_park_then_park( pp_fn, pp_datum );
	return 0;
}

// Convenience overload with no pre-park action.
static size_t block_and_get_recursion( info_thread(L) & i ) {
	return block_and_get_recursion( i, pre_park_noop, 0p );
}
[beeff61e]393
// helper for the wait() overloads without a timeout
static void queue_info_thread( condition_variable(L) & this, info_thread(L) & i ) with(this) {
	// add the waiter to the queue under the spin lock
	lock( lock __cfaabi_dbg_ctx2 );
	insert_last( blocked_threads, i );
	count++;
	unlock( lock );

	// blocks here
	size_t saved_recursion = block_and_get_recursion( i );

	// resets recursion count here after waking
	if ( i.lock ) on_wakeup( *i.lock, saved_recursion );
}
406
// Body shared by the untimed wait() overloads: build the waiter record for the
// calling thread (u = info value, l = lock or 0p) and block on it.
#define WAIT( u, l ) \
	info_thread( L ) i = { active_thread(), u, l }; \
	queue_info_thread( this, i );

// Pre-park hook for timed waits: registers the alarm node passed as node_ptr.
static void cond_alarm_register( void * node_ptr ) {
	register_self( (alarm_node_t *)node_ptr );
}
412
// helper for the wait() overloads with a timeout
static void queue_info_thread_timeout( condition_variable(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) {
	// add the waiter to the queue and build its alarm under the spin lock
	lock( lock __cfaabi_dbg_ctx2 );
	insert_last( blocked_threads, info );
	count++;
	alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info };
	unlock( lock );

	// blocks here and registers alarm node before blocking after releasing locks to avoid deadlock
	size_t saved_recursion = block_and_get_recursion( info, cond_alarm_register, (void *)(&node_wrap.alarm_node) );

	// unregisters alarm so it doesn't go off if this happens first
	unregister_self( &node_wrap.alarm_node );

	// resets recursion count here after waking
	if ( info.lock ) on_wakeup( *info.lock, saved_recursion );
}
430
// Body shared by the timed wait() overloads: block with a timeout of t and
// return the waiter's signalled flag (true = notified, false = timed out).
#define WAIT_TIME( u, l, t ) \
	info_thread( L ) i = { active_thread(), u, l }; \
	queue_info_thread_timeout(this, i, t, alarm_node_wrap_cast ); \
	return i.signalled;

// untimed waits: with/without an info value, with/without a lock to release
void wait( condition_variable(L) & this ) with(this) {
	WAIT( 0, 0p )
}
void wait( condition_variable(L) & this, uintptr_t info ) with(this) {
	WAIT( info, 0p )
}
void wait( condition_variable(L) & this, L & l ) with(this) {
	WAIT( 0, &l )
}
void wait( condition_variable(L) & this, L & l, uintptr_t info ) with(this) {
	WAIT( info, &l )
}

// timed waits: same combinations, each returning the signalled flag
bool wait( condition_variable(L) & this, Duration duration ) with(this) {
	WAIT_TIME( 0 , 0p , duration )
}
bool wait( condition_variable(L) & this, uintptr_t info, Duration duration ) with(this) {
	WAIT_TIME( info, 0p , duration )
}
bool wait( condition_variable(L) & this, L & l, Duration duration ) with(this) {
	WAIT_TIME( 0 , &l , duration )
}
bool wait( condition_variable(L) & this, L & l, uintptr_t info, Duration duration ) with(this) {
	WAIT_TIME( info, &l , duration )
}
[7f958c4]445
446 //-----------------------------------------------------------------------------
447 // fast_cond_var
// Constructs an empty fast_cond_var; in debug builds no lock is associated yet.
void ?{}( fast_cond_var(L) & this ) {
	this.blocked_threads{};
	#ifdef __CFA_DEBUG__
	this.lock_used = 0p;
	#endif
}

// Nothing to release on destruction.
void ^?{}( fast_cond_var(L) & this ) { }
455
// Wakes the first waiter through its lock's on_notify().  Returns true when a
// waiter was queued.  No internal locking is done here.
bool notify_one( fast_cond_var(L) & this ) with(this) {
	if ( blocked_threads`isEmpty ) return false;

	info_thread(L) & popped = try_pop_front( blocked_threads );
	on_notify( *popped.lock, popped.t );
	return true;
}

// Wakes every waiter through its lock's on_notify().  Returns true when at
// least one waiter was queued.
bool notify_all( fast_cond_var(L) & this ) with(this) {
	bool had_waiter = ! blocked_threads`isEmpty;
	for ( ; ! blocked_threads`isEmpty; ) {
		info_thread(L) & popped = try_pop_front( blocked_threads );
		on_notify( *popped.lock, popped.t );
	}
	return had_waiter;
}
472
// Returns the info value of the first waiter, or NULL when the queue is empty.
uintptr_t front( fast_cond_var(L) & this ) with(this) {
	return blocked_threads`isEmpty ? NULL : blocked_threads`first.info;
}

// Returns true when no thread is waiting.
bool empty ( fast_cond_var(L) & this ) with(this) {
	return blocked_threads`isEmpty;
}
475
// Waits on the condition with an info value of 0.
void wait( fast_cond_var(L) & this, L & l ) {
	wait( this, l, 0 );
}

// Queues the calling thread, releases l through on_wait() (which blocks), and
// restores l's recursion count once woken.
void wait( fast_cond_var(L) & this, L & l, uintptr_t info ) with(this) {
	// debug builds: remember the first lock used and assert later waits use the same one
	#ifdef __CFA_DEBUG__
	if ( lock_used == 0p ) lock_used = &l;
	else assert(lock_used == &l);
	#endif
	info_thread( L ) i = { active_thread(), info, &l };
	insert_last( blocked_threads, i );
	size_t saved_recursion = on_wait( l, pre_park_noop, 0p ); // blocks here
	on_wakeup( l, saved_recursion );
}
[454f478]492
[ae06e0b]493 //-----------------------------------------------------------------------------
494 // pthread_cond_var
495
// Constructs an empty pthread_cond_var: no waiters, fresh spin lock.
void ?{}( pthread_cond_var(L) & this ) with(this) {
	blocked_threads{};
	lock{};
}

// Nothing to release on destruction.
void ^?{}( pthread_cond_var(L) & this ) { }
502
// Wakes the first waiter, marking it as signalled.  Returns true when a waiter
// was queued.
bool notify_one( pthread_cond_var(L) & this ) with(this) {
	lock( lock __cfaabi_dbg_ctx2 );
	bool had_waiter = ! blocked_threads`isEmpty;
	if ( had_waiter ) {
		info_thread(L) & popped = try_pop_front( blocked_threads );
		popped.signalled = true;
		on_notify( *popped.lock, popped.t );
	}
	unlock( lock );
	return had_waiter;
}

// Wakes every waiter, marking each as signalled.  Returns true when at least
// one waiter was queued.
bool notify_all( pthread_cond_var(L) & this ) with(this) {
	lock( lock __cfaabi_dbg_ctx2 );
	bool had_waiter = ! blocked_threads`isEmpty;
	for ( ; ! blocked_threads`isEmpty; ) {
		info_thread(L) & popped = try_pop_front( blocked_threads );
		popped.signalled = true;
		on_notify( *popped.lock, popped.t );
	}
	unlock( lock );
	return had_waiter;
}
526
// Returns the info value of the first waiter, or NULL when the queue is empty.
// NOTE(review): unlike the notify functions this does not take the spin lock.
uintptr_t front( pthread_cond_var(L) & this ) with(this) {
	return blocked_threads`isEmpty ? NULL : blocked_threads`first.info;
}

// Returns true when no thread is waiting (also read without the spin lock).
bool empty ( pthread_cond_var(L) & this ) with(this) {
	return blocked_threads`isEmpty;
}
529
// helper for the timed pthread_cond_var waits
static void queue_info_thread_timeout( pthread_cond_var(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) {
	// add the waiter to the queue and build its alarm under the spin lock
	lock( lock __cfaabi_dbg_ctx2 );
	insert_last( blocked_threads, info );
	pthread_alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info };
	unlock( lock );

	// blocks here and registers alarm node before blocking after releasing locks to avoid deadlock
	size_t saved_recursion = block_and_get_recursion( info, cond_alarm_register, (void *)(&node_wrap.alarm_node) );

	// unregisters alarm so it doesn't go off if signal happens first
	unregister_self( &node_wrap.alarm_node );

	// resets recursion count here after waking
	if ( info.lock ) on_wakeup( *info.lock, saved_recursion );
}
545
// Waits on the condition with an info value of 0.
void wait( pthread_cond_var(L) & this, L & l ) with(this) {
	wait( this, l, 0 );
}

// Queues the calling thread, blocks (releasing l), and restores l's recursion
// count once woken.
void wait( pthread_cond_var(L) & this, L & l, uintptr_t info ) with(this) {
	// add the waiter to the queue under the spin lock
	lock( lock __cfaabi_dbg_ctx2 );
	info_thread( L ) i = { active_thread(), info, &l };
	insert_last( blocked_threads, i );
	unlock( lock );

	// blocks here
	size_t saved_recursion = block_and_get_recursion( i );

	on_wakeup( l, saved_recursion );
}
561
// Body shared by the timed pthread_cond_var waits: block with a timeout of t
// and return the waiter's signalled flag (true = notified, false = timed out).
#define PTHREAD_WAIT_TIME( u, l, t ) \
	info_thread( L ) i = { active_thread(), u, l }; \
	queue_info_thread_timeout(this, i, t, pthread_alarm_node_wrap_cast ); \
	return i.signalled;
566
// Converts the absolute CLOCK_REALTIME deadline t into the Duration remaining
// from now until that deadline.  A deadline that has already passed (or is
// exactly now) gives a zero Duration.
Duration getDuration(timespec t) {
	timespec currTime;
	clock_gettime(CLOCK_REALTIME, &currTime);
	Duration waitUntil = { t };
	Duration currDur = { currTime };

	// deadline already reached: nothing left to wait
	if ( currDur >= waitUntil ) {
		Duration zero = { 0 };
		return zero;
	}

	// time still remaining until the deadline
	return waitUntil - currDur;
}
576
// Timed wait: t is converted to a relative timeout by getDuration().
// Returns the waiter's signalled flag.
bool wait( pthread_cond_var(L) & this, L & l, timespec t ) {
	PTHREAD_WAIT_TIME( 0, &l , getDuration( t ) )
}

// Timed wait carrying a user info value; otherwise as above.
bool wait( pthread_cond_var(L) & this, L & l, uintptr_t info, timespec t ) {
	PTHREAD_WAIT_TIME( info, &l , getDuration( t ) )
}
584}
[454f478]585//-----------------------------------------------------------------------------
586// Semaphore
// Constructs a semaphore with the given initial count (default 1) and no waiters.
void ?{}( semaphore & this, int count = 1 ) {
	(this.lock){};
	(this.waiting){};
	this.count = count;
}

// Nothing to release on destruction.
void ^?{}(semaphore & this) {}
593
// Decrements the semaphore.  When the count goes negative the calling thread
// is queued and parked.  Returns true if the caller blocked, false otherwise.
bool P(semaphore & this) with( this ){
	lock( lock __cfaabi_dbg_ctx2 );
	count -= 1;
	bool must_block = count < 0;

	// queue current task while still holding the spin lock
	if ( must_block ) append( waiting, active_thread() );

	// release spin lock, then block if required
	unlock( lock );
	if ( must_block ) park();

	return must_block;
}
611
// Increments the semaphore and, if a thread was waiting, removes it from the
// waiting list.  The thread is unparked here only when doUnpark is true.
// Returns the removed thread, or 0p when nobody was waiting.
thread$ * V (semaphore & this, const bool doUnpark ) with( this ) {
	thread$ * woken = 0p;

	lock( lock __cfaabi_dbg_ctx2 );
	count += 1;
	if ( count <= 0 ) {
		// remove task at head of waiting list
		woken = pop_head( waiting );
	}
	unlock( lock );

	// make new owner
	if ( doUnpark ) unpark( woken );

	return woken;
}
[454f478]628
// Increments the semaphore and unparks one waiter if there is one.
// Returns true when a thread was woken.
bool V(semaphore & this) with( this ) {
	return V( this, true ) != 0p;
}
633
// Increments the semaphore by diff and wakes up to diff waiting threads.
// Returns true when at least one thread was woken.
bool V(semaphore & this, unsigned diff) with( this ) {
	lock( lock __cfaabi_dbg_ctx2 );

	// P() queues a thread whenever it drives count below zero, so a negative
	// count is the number of queued threads
	int blocked = count < 0 ? -count : 0;

	// wake no more threads than are queued and no more than diff
	int release = (unsigned)blocked < diff ? blocked : (int)diff;

	count += diff;
	for(release) {
		unpark( pop_head( waiting ) );
	}

	unlock( lock );

	return release > 0;
}
[5a05946]647
Note: See TracBrowser for help on using the repository browser.