source: libcfa/src/concurrency/locks.cfa@ 532c0cd

Last change on this file since 532c0cd was b93bf85, checked in by caparsons <caparson@…>, 2 years ago

fixed spurious channel close waituntil error case. Was caused by a race condition causing an exception to be thrown while another was in flight

  • Property mode set to 100644
File size: 20.1 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// locks.cfa -- LIBCFATHREAD
8// Runtime locks that are used with the runtime thread system.
9//
10// Author : Colby Alexander Parsons
11// Created On : Thu Jan 21 19:46:50 2021
12// Last Modified By :
13// Last Modified On :
14// Update Count :
15//
16
17#define __cforall_thread__
18
19#include "locks.hfa"
20#include "kernel/private.hfa"
21
22#include <kernel.hfa>
23#include <stdlib.hfa>
24
25#pragma GCC visibility push(default)
26
27//-----------------------------------------------------------------------------
28// info_thread
forall(L & | is_blocking_lock(L)) {
	// Per-waiter record that the condition variables in this file place on
	// their blocked_threads queue while the waiting thread is parked.
	struct info_thread {
		// used to put info_thread on a dl queue
		inline dlink(info_thread(L));

		// waiting thread
		struct thread$ * t;

		// shadow field
		uintptr_t info;

		// lock that is passed to wait() (if one is passed)
		L * lock;

		// true when signalled and false when timeout wakes thread
		bool signalled;
	};
	P9_EMBEDDED( info_thread(L), dlink(info_thread(L)) )

	// Builds a waiter record for thread t carrying the user value info and
	// the (possibly null) lock l that was handed to wait().
	void ?{}( info_thread(L) & this, thread$ * t, uintptr_t info, L * l ) {
		this.t = t;
		this.info = info;
		this.lock = l;
		// The timed waits return this flag; start from a defined value rather
		// than relying on every wake-up path to have written it first.
		this.signalled = false;
	}

	// Nothing to release: the record owns none of the objects it points to.
	void ^?{}( info_thread(L) & this ) {}
}
56
57//-----------------------------------------------------------------------------
58// Blocking Locks
// Builds a blocking lock in the released state: no owner, a recursion count of
// zero and no recorded waiters.
//   multi_acquisition : stored policy flag, consulted when the owner tries to acquire again
//   strict_owner      : stored policy flag, consulted by the ownership checks on release
void ?{}( blocking_lock & this, bool multi_acquisition, bool strict_owner ) {
	// guard lock and queue of blocked threads
	this.lock{};
	this.blocked_threads{};

	// acquisition policy
	this.multi_acquisition = multi_acquisition;
	this.strict_owner = strict_owner;

	// initially released with nobody waiting
	this.owner = 0p;
	this.recursion_count = 0;
	this.wait_count = 0;
}

// No resources to release.
void ^?{}( blocking_lock & this ) {}
70
71
// Acquires the blocking lock for the calling thread.
// If another thread owns it, the caller queues a select_node and parks;
// ownership is handed over by whoever later pops that node (see pop_node).
void lock( blocking_lock & this ) with( this ) {
	lock( lock __cfaabi_dbg_ctx2 );
	thread$ * self = active_thread();

	// single acquisition lock is held by current thread
	/* paranoid */ verifyf( owner != self || multi_acquisition, "Single acquisition lock holder (%p) attempted to reacquire the lock %p resulting in a deadlock.", owner, &this );

	bool held_by_other = owner != 0p && owner != self;
	if ( ! held_by_other ) {
		if ( owner == self && multi_acquisition ) {
			// multi acquisition lock is held by current thread
			recursion_count += 1;
		} else {
			// lock isn't held
			owner = self;
			recursion_count = 1;
		}
		unlock( lock );
		return;
	}

	// lock is held by some other thread: enqueue, release the guard, then park
	select_node node;
	insert_last( blocked_threads, node );
	wait_count += 1;
	unlock( lock );
	park( );
}
95
// Attempts to acquire the lock without queueing or parking.
// Returns true when the caller now holds the lock (first acquisition, or a
// recursive acquisition on a multi-acquisition lock); false otherwise.
bool try_lock( blocking_lock & this ) with( this ) {
	lock( lock __cfaabi_dbg_ctx2 );
	thread$ * self = active_thread();

	bool acquired;
	if ( owner == 0p ) {
		// lock isn't held
		owner = self;
		recursion_count = 1;
		acquired = true;
	} else if ( owner == self && multi_acquisition ) {
		// multi acquisition lock is held by current thread
		recursion_count += 1;
		acquired = true;
	} else {
		acquired = false;
	}

	unlock( lock );
	return acquired;
}
115
// Passes ownership to the node at the front of blocked_threads, or marks the
// lock as released when no node is queued. Every caller in this file invokes
// it while holding this.lock.
static inline void pop_node( blocking_lock & this ) with( this ) {
	__handle_waituntil_OR( blocked_threads );
	select_node * successor = &try_pop_front( blocked_threads );

	if ( ! successor ) {
		// nobody waiting: lock becomes free
		owner = 0p;
		recursion_count = 0;
		return;
	}

	// hand the lock to the popped waiter and wake it
	wait_count -= 1;
	owner = successor->blocked_thread;
	recursion_count = 1;
	wake_one( blocked_threads, *successor );
}
130
// Releases one level of acquisition of the blocking lock.
// When the recursion count reaches zero, the lock is handed to the next
// queued waiter, or marked free if there is none (see pop_node).
void unlock( blocking_lock & this ) with( this ) {
	lock( lock __cfaabi_dbg_ctx2 );
	/* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
	// arguments follow the order of the message: offending thread first, then the owner
	/* paranoid */ verifyf( owner == active_thread() || !strict_owner , "Thread %p other than the owner %p attempted to release owner lock %p", active_thread(), owner, &this );
	/* paranoid */ verifyf( recursion_count == 1 || multi_acquisition, "Thread %p attempted to release owner lock %p which is not recursive but has a recursive count of %zu", active_thread(), &this, recursion_count );

	// if recursion count is zero release lock and set new owner if one is waiting
	recursion_count--;
	if ( recursion_count == 0 ) {
		pop_node( this );
	}
	unlock( lock );
}
144
// Returns the lock's waiter counter. The value is read without taking the
// guard lock, so it is only a snapshot.
size_t wait_count( blocking_lock & this ) {
	return this.wait_count;
}
148
// Called when thread t, which waited on a condition variable with this lock,
// must get the lock back. If the lock is free, t becomes the owner and is
// unparked; otherwise the select_node published in t->link_node is queued.
void on_notify( blocking_lock & this, thread$ * t ) with( this ) {
	lock( lock __cfaabi_dbg_ctx2 );
	if ( owner == 0p ) {
		// lock not held: give it to t and wake t
		owner = t;
		recursion_count = 1;
		unpark( t );
	} else {
		// lock held: t waits its turn on the lock's queue
		insert_last( blocked_threads, *(select_node *)t->link_node );
		wait_count += 1;
	}
	unlock( lock );
}
164
// Fully releases the lock on behalf of a thread that is about to wait on a
// condition variable, then runs the pre-park hook and parks.
//   pp_fn, pp_datum : forwarded unchanged to pre_park_then_park
// Returns the recursion count held before the release so the caller can
// restore it through on_wakeup.
size_t on_wait( blocking_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) with( this ) {
	lock( lock __cfaabi_dbg_ctx2 );
	/* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
	// arguments follow the order of the message: offending thread first, then the owner
	/* paranoid */ verifyf( owner == active_thread() || !strict_owner, "Thread %p other than the owner %p attempted to release owner lock %p", active_thread(), owner, &this );

	size_t ret = recursion_count;

	// hand the lock to the next waiter (or mark it free) regardless of recursion depth
	pop_node( this );

	// publish a node that on_notify can queue on this lock when the thread is notified
	select_node node;
	active_thread()->link_node = (void *)&node;
	unlock( lock );

	pre_park_then_park( pp_fn, pp_datum );

	return ret;
}
182
// Restores the recursion count saved by on_wait. The guard lock is not taken
// here.
void on_wakeup( blocking_lock & this, size_t recursion ) {
	this.recursion_count = recursion;
}
186
// waituntil() support
// Registers a waituntil clause on the lock.
// Returns true when the lock was acquired immediately; returns false when the
// node was queued behind another owner, or when the node lost the
// __make_select_node_available race in the no-park-counter case.
bool register_select( blocking_lock & this, select_node & node ) with(this) {
	lock( lock __cfaabi_dbg_ctx2 );
	thread$ * self = active_thread();

	// single acquisition lock is held by current thread
	/* paranoid */ verifyf( owner != self || multi_acquisition, "Single acquisition lock holder (%p) attempted to reacquire the lock %p resulting in a deadlock.", owner, &this );

	bool reentrant = owner == self && multi_acquisition;

	// OR special case: the lock could be taken right now, so the node must first win the race
	if ( ! node.park_counter && ( reentrant || owner == 0p ) && ! __make_select_node_available( node ) ) {
		// we didn't win the race so give up on registering
		unlock( lock );
		return false;
	}

	// lock is held by some other thread
	if ( owner != 0p && owner != self ) {
		insert_last( blocked_threads, node );
		wait_count += 1;
		unlock( lock );
		return false;
	}

	if ( reentrant ) {
		// multi acquisition lock is held by current thread
		recursion_count += 1;
	} else {
		// lock isn't held
		owner = self;
		recursion_count = 1;
	}

	if ( node.park_counter ) __make_select_node_available( node );
	unlock( lock );
	return true;
}
219
// Withdraws a waituntil clause from the lock.
// A node that is still queued is removed. Otherwise, if the caller owns the
// lock, one level of acquisition is released. Always returns false.
bool unregister_select( blocking_lock & this, select_node & node ) with(this) {
	lock( lock __cfaabi_dbg_ctx2 );

	if ( node`isListed ) {
		// never got the lock: just leave the queue
		remove( node );
		wait_count -= 1;
	} else if ( owner == active_thread() ) {
		/* paranoid */ verifyf( recursion_count == 1 || multi_acquisition, "Thread %p attempted to unlock owner lock %p in waituntil unregister, which is not recursive but has a recursive count of %zu", active_thread(), &this, recursion_count );
		// if recursion count is zero release lock and set new owner if one is waiting
		recursion_count -= 1;
		if ( recursion_count == 0 ) pop_node( this );
	}

	unlock( lock );
	return false;
}
240
// waituntil hook invoked when this lock's clause is selected; it does no work
// and always returns true.
bool on_selected( blocking_lock & this, select_node & node ) {
	return true;
}
242
243//-----------------------------------------------------------------------------
244// alarm node wrapper
forall(L & | is_blocking_lock(L)) {
	// Couples an alarm node with the condition_variable and waiter record it
	// times out. alarm_node_wrap_cast reinterprets the alarm_node_t it receives
	// as this wrapper, so alarm_node has to stay the first member.
	struct alarm_node_wrap {
		alarm_node_t alarm_node;
		condition_variable(L) * cond;
		info_thread(L) * info_thd;
	};

	// Builds the embedded alarm node from (callback, alarm, period) and records
	// the condition variable c and waiter i the alarm refers to.
	void ?{}( alarm_node_wrap(L) & this, Duration alarm, Duration period, Alarm_Callback callback, condition_variable(L) * c, info_thread(L) * i ) {
		this.alarm_node{ callback, alarm, period };
		this.cond = c;
		this.info_thd = i;
	}

	void ^?{}( alarm_node_wrap(L) & this ) { }

	// Timeout path for condition_variable: if the waiter is still queued, take
	// it off the queue, mark it as not signalled and wake it.
	static void timeout_handler ( alarm_node_wrap(L) & this ) with( this ) {
		// This condition_variable member is called from the kernel, and therefore, cannot block, but it can spin.
		lock( cond->lock __cfaabi_dbg_ctx2 );

		// this check is necessary to avoid a race condition since this timeout handler
		// may still be called after a thread has been removed from the queue but
		// before the alarm is unregistered
		info_thread(L) & waiter = *info_thd;
		if ( waiter`isListed ) {	// is thread on queue
			waiter.signalled = false;
			// remove this thread O(1)
			remove( waiter );
			cond->count -= 1;
			if ( waiter.lock ) {
				// call lock's on_notify if a lock was passed
				on_notify( *waiter.lock, waiter.t );
			} else {
				// otherwise wake thread
				unpark( waiter.t );
			}
		}

		unlock( cond->lock );
	}

	// this casts the alarm node to our wrapped type since we used type erasure
	static void alarm_node_wrap_cast( alarm_node_t & a ) {
		timeout_handler( (alarm_node_wrap(L) &)a );
	}

	// Same coupling as alarm_node_wrap, but for pthread_cond_var. alarm_node has
	// to stay the first member for the same reason.
	struct pthread_alarm_node_wrap {
		alarm_node_t alarm_node;
		pthread_cond_var(L) * cond;
		info_thread(L) * info_thd;
	};

	// Builds the embedded alarm node from (callback, alarm, period) and records
	// the condition variable c and waiter i the alarm refers to.
	void ?{}( pthread_alarm_node_wrap(L) & this, Duration alarm, Duration period, Alarm_Callback callback, pthread_cond_var(L) * c, info_thread(L) * i ) {
		this.alarm_node{ callback, alarm, period };
		this.cond = c;
		this.info_thd = i;
	}

	void ^?{}( pthread_alarm_node_wrap(L) & this ) { }

	// Timeout path for pthread_cond_var: if the waiter is still queued, take it
	// off the queue, mark it as not signalled and notify its lock. The lock
	// pointer is dereferenced without a null check.
	static void timeout_handler ( pthread_alarm_node_wrap(L) & this ) with( this ) {
		// This pthread_cond_var member is called from the kernel, and therefore, cannot block, but it can spin.
		lock( cond->lock __cfaabi_dbg_ctx2 );

		// this check is necessary to avoid a race condition since this timeout handler
		// may still be called after a thread has been removed from the queue but
		// before the alarm is unregistered
		info_thread(L) & waiter = *info_thd;
		if ( waiter`isListed ) {	// is thread on queue
			waiter.signalled = false;
			// remove this thread O(1)
			remove( waiter );
			on_notify( *waiter.lock, waiter.t );
		}

		unlock( cond->lock );
	}

	// this casts the alarm node to our wrapped type since we used type erasure
	static void pthread_alarm_node_wrap_cast( alarm_node_t & a ) {
		timeout_handler( (pthread_alarm_node_wrap(L) &)a );
	}
}
318
319//-----------------------------------------------------------------------------
320// Synchronization Locks
321forall(L & | is_blocking_lock(L)) {
322
323 //-----------------------------------------------------------------------------
324 // condition variable
// Builds an empty condition variable: fresh guard lock, empty waiter queue and
// a waiter count of zero.
void ?{}( condition_variable(L) & this ) {
	this.lock{};
	this.blocked_threads{};
	this.count = 0;
}

// No resources to release.
void ^?{}( condition_variable(L) & this ) { }
332
// Wakes a waiter that was just popped from the queue. popped may be a null
// reference (an empty queue), in which case nothing happens.
static void process_popped( condition_variable(L) & this, info_thread(L) & popped ) with( this ) {
	if ( &popped == 0p ) return;

	popped.signalled = true;
	count -= 1;
	if ( popped.lock ) {
		// if lock passed call on_notify
		on_notify( *popped.lock, popped.t );
	} else {
		// otherwise wake thread
		unpark( popped.t );
	}
}
346
// Wakes the waiter at the front of the queue, if any.
// Returns true when the queue was non-empty on entry.
bool notify_one( condition_variable(L) & this ) with( this ) {
	lock( lock __cfaabi_dbg_ctx2 );
	bool had_waiter = ! blocked_threads`isEmpty;
	process_popped( this, try_pop_front( blocked_threads ) );
	unlock( lock );
	return had_waiter;
}
354
// Wakes every queued waiter, front to back.
// Returns true when at least one waiter was queued on entry.
bool notify_all( condition_variable(L) & this ) with(this) {
	lock( lock __cfaabi_dbg_ctx2 );
	bool woke_any = false;
	while ( ! blocked_threads`isEmpty ) {
		woke_any = true;
		process_popped( this, try_pop_front( blocked_threads ) );
	}
	unlock( lock );
	return woke_any;
}
364
// Returns the info value of the waiter at the front of the queue, or NULL when
// the queue is empty. The queue is inspected without taking the guard lock.
uintptr_t front( condition_variable(L) & this ) with(this) {
	return blocked_threads`isEmpty
		? NULL
		: blocked_threads`first.info;
}
368
// Reports, under the guard lock, whether no thread is queued on the condition
// variable.
bool empty( condition_variable(L) & this ) with(this) {
	lock( lock __cfaabi_dbg_ctx2 );
	bool no_waiters = blocked_threads`isEmpty;
	unlock( lock );
	return no_waiters;
}
375
// Returns the waiter counter; it is read without taking the guard lock.
int counter( condition_variable(L) & this ) {
	return this.count;
}
377
// Appends waiter record i to the back of the queue and bumps the waiter count.
// Both callers in this file hold this.lock around the call.
static void enqueue_thread( condition_variable(L) & this, info_thread(L) * i ) with(this) {
	insert_last( blocked_threads, *i );
	count += 1;
}
383
// Blocks the calling thread for waiter record i.
// With a lock attached, the lock's on_wait releases it and parks, and the
// recursion count it returns is passed back so it can be restored after the
// wake-up. Without a lock, the pre-park hook runs, the thread parks and 0 is
// returned.
static size_t block_and_get_recursion( info_thread(L) & i, __cfa_pre_park pp_fn, void * pp_datum ) {
	if ( i.lock ) {
		// this call blocks
		return on_wait( *i.lock, pp_fn, pp_datum );
	}
	pre_park_then_park( pp_fn, pp_datum );
	return 0;
}

// Convenience overload with no pre-park work.
static size_t block_and_get_recursion( info_thread(L) & i ) {
	return block_and_get_recursion( i, pre_park_noop, 0p );
}
393
// Helper for the wait() overloads without a timeout: queue the waiter record,
// block, and on wake-up restore the recursion count of the lock (if any).
static void queue_info_thread( condition_variable(L) & this, info_thread(L) & i ) with(this) {
	lock( lock __cfaabi_dbg_ctx2 );
	enqueue_thread( this, &i );
	unlock( lock );

	// blocks here
	size_t saved_recursion = block_and_get_recursion( i );

	// resets recursion count here after waking
	if ( i.lock ) {
		on_wakeup( *i.lock, saved_recursion );
	}
}
406
// Body shared by the wait() overloads without a timeout: build a waiter record
// for the calling thread and queue it on `this`.
// Expands to two statements, so it must be used as a complete function body.
#define WAIT( user_info, lock_ptr ) \
	info_thread( L ) waiter = { active_thread(), user_info, lock_ptr }; \
	queue_info_thread( this, waiter );
410
// Pre-park hook: registers the alarm node passed through the untyped datum
// pointer.
static void cond_alarm_register( void * node_ptr ) {
	alarm_node_t * alarm = (alarm_node_t *)node_ptr;
	register_self( alarm );
}
412
// Helper for the wait() overloads with a timeout: queue the waiter record,
// arm an alarm for duration t that invokes callback, block, then disarm the
// alarm and restore the recursion count of the lock (if any).
static void queue_info_thread_timeout( condition_variable(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) {
	lock( lock __cfaabi_dbg_ctx2 );
	enqueue_thread( this, &info );
	alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info };
	unlock( lock );

	alarm_node_t * alarm = &node_wrap.alarm_node;

	// blocks here and registers alarm node before blocking after releasing locks to avoid deadlock
	size_t saved_recursion = block_and_get_recursion( info, cond_alarm_register, (void *)alarm );

	// unregisters alarm so it doesn't go off if this happens first
	unregister_self( alarm );

	// resets recursion count here after waking
	if ( info.lock ) {
		on_wakeup( *info.lock, saved_recursion );
	}
}
430
// Body shared by the timed wait() overloads: build a waiter record, wait with
// a timeout, and return whether the wake-up came from a notify (true) or from
// the timeout (false).
#define WAIT_TIME( user_info, lock_ptr, timeout ) \
	info_thread( L ) waiter = { active_thread(), user_info, lock_ptr }; \
	queue_info_thread_timeout( this, waiter, timeout, alarm_node_wrap_cast ); \
	return waiter.signalled;

// Untimed waits: with or without a lock to release, with or without a user
// info value.
void wait( condition_variable(L) & this ) with(this) {
	WAIT( 0, 0p )
}
void wait( condition_variable(L) & this, uintptr_t info ) with(this) {
	WAIT( info, 0p )
}
void wait( condition_variable(L) & this, L & l ) with(this) {
	WAIT( 0, &l )
}
void wait( condition_variable(L) & this, L & l, uintptr_t info ) with(this) {
	WAIT( info, &l )
}

// Timed waits: same combinations; the result is true when signalled and false
// when the timeout woke the thread.
bool wait( condition_variable(L) & this, Duration duration ) with(this) {
	WAIT_TIME( 0, 0p, duration )
}
bool wait( condition_variable(L) & this, uintptr_t info, Duration duration ) with(this) {
	WAIT_TIME( info, 0p, duration )
}
bool wait( condition_variable(L) & this, L & l, Duration duration ) with(this) {
	WAIT_TIME( 0, &l, duration )
}
bool wait( condition_variable(L) & this, L & l, uintptr_t info, Duration duration ) with(this) {
	WAIT_TIME( info, &l, duration )
}
445
446 //-----------------------------------------------------------------------------
447 // fast_cond_var
// Builds an empty fast condition variable. In debug builds the lock it is
// branded with starts out unset.
void ?{}( fast_cond_var(L) & this ) {
	this.blocked_threads{};
	#ifdef __CFA_DEBUG__
	this.lock_used = 0p;
	#endif
}

// No resources to release.
void ^?{}( fast_cond_var(L) & this ) { }
455
// Pops the front waiter, if any, and notifies its lock.
// Returns true when a waiter was queued. This function takes no lock of its
// own.
bool notify_one( fast_cond_var(L) & this ) with(this) {
	if ( blocked_threads`isEmpty ) return false;

	info_thread(L) & popped = try_pop_front( blocked_threads );
	on_notify( *popped.lock, popped.t );
	return true;
}
// Pops every queued waiter, front to back, and notifies each one's lock.
// Returns true when the queue was non-empty on entry. This function takes no
// lock of its own.
bool notify_all( fast_cond_var(L) & this ) with(this) {
	bool had_waiters = ! blocked_threads`isEmpty;
	while ( ! blocked_threads`isEmpty ) {
		info_thread(L) & waiter = try_pop_front( blocked_threads );
		on_notify( *waiter.lock, waiter.t );
	}
	return had_waiters;
}
472
// Returns the info value of the front waiter, or NULL when the queue is empty.
uintptr_t front( fast_cond_var(L) & this ) with(this) {
	return blocked_threads`isEmpty
		? NULL
		: blocked_threads`first.info;
}

// Reports whether no thread is queued.
bool empty ( fast_cond_var(L) & this ) with(this) {
	return blocked_threads`isEmpty;
}
475
// Waits on the fast condition variable with an info value of 0.
void wait( fast_cond_var(L) & this, L & l ) {
	uintptr_t no_info = 0;
	wait( this, l, no_info );
}
479
// Queues the calling thread on the fast condition variable, releases l through
// its on_wait (which blocks), and restores l's recursion count after the
// wake-up. The queue is modified without taking any lock in this function.
void wait( fast_cond_var(L) & this, L & l, uintptr_t info ) with(this) {
	// brand cond lock with lock
	#ifdef __CFA_DEBUG__
	if ( lock_used == 0p ) lock_used = &l;
	else assert(lock_used == &l);
	#endif

	info_thread( L ) waiter = { active_thread(), info, &l };
	insert_last( blocked_threads, waiter );

	// blocks here
	size_t saved_recursion = on_wait( *waiter.lock, pre_park_noop, 0p );

	on_wakeup( *waiter.lock, saved_recursion );
}
492
493 //-----------------------------------------------------------------------------
494 // pthread_cond_var
495
// Builds an empty pthread-style condition variable: empty waiter queue and a
// fresh guard lock.
void ?{}( pthread_cond_var(L) & this ) {
	this.blocked_threads{};
	this.lock{};
}

// No resources to release.
void ^?{}( pthread_cond_var(L) & this ) { }
502
// Pops the front waiter, if any, marks it signalled and notifies its lock.
// Returns true when a waiter was queued.
bool notify_one( pthread_cond_var(L) & this ) with(this) {
	lock( lock __cfaabi_dbg_ctx2 );
	bool had_waiter = ! blocked_threads`isEmpty;
	if ( had_waiter ) {
		info_thread(L) & waiter = try_pop_front( blocked_threads );
		waiter.signalled = true;
		on_notify( *waiter.lock, waiter.t );
	}
	unlock( lock );
	return had_waiter;
}
514
// Pops every queued waiter, front to back, marking each signalled and
// notifying its lock. Returns true when at least one waiter was queued on
// entry.
bool notify_all( pthread_cond_var(L) & this ) with(this) {
	lock( lock __cfaabi_dbg_ctx2 );
	bool woke_any = false;
	while ( ! blocked_threads`isEmpty ) {
		woke_any = true;
		info_thread(L) & waiter = try_pop_front( blocked_threads );
		waiter.signalled = true;
		on_notify( *waiter.lock, waiter.t );
	}
	unlock( lock );
	return woke_any;
}
526
// Returns the info value of the front waiter, or NULL when the queue is empty.
// The guard lock is not taken.
uintptr_t front( pthread_cond_var(L) & this ) with(this) {
	return blocked_threads`isEmpty
		? NULL
		: blocked_threads`first.info;
}

// Reports whether no thread is queued. The guard lock is not taken.
bool empty ( pthread_cond_var(L) & this ) with(this) {
	return blocked_threads`isEmpty;
}
529
// Timed-wait helper for pthread_cond_var: queue the waiter record, arm an
// alarm for duration t that invokes callback, block, then disarm the alarm and
// restore the recursion count of the lock (if any).
static void queue_info_thread_timeout( pthread_cond_var(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) {
	lock( lock __cfaabi_dbg_ctx2 );
	insert_last( blocked_threads, info );
	pthread_alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info };
	unlock( lock );

	alarm_node_t * alarm = &node_wrap.alarm_node;

	// blocks here and registers alarm node before blocking after releasing locks to avoid deadlock
	size_t saved_recursion = block_and_get_recursion( info, cond_alarm_register, (void *)alarm );

	// unregisters alarm so it doesn't go off if signal happens first
	unregister_self( alarm );

	// resets recursion count here after waking
	if ( info.lock ) {
		on_wakeup( *info.lock, saved_recursion );
	}
}
545
// Waits on the pthread-style condition variable with an info value of 0.
void wait( pthread_cond_var(L) & this, L & l ) with(this) {
	uintptr_t no_info = 0;
	wait( this, l, no_info );
}
549
// Queues the calling thread under the guard lock, blocks (releasing l through
// block_and_get_recursion), and restores l's recursion count after the
// wake-up.
void wait( pthread_cond_var(L) & this, L & l, uintptr_t info ) with(this) {
	lock( lock __cfaabi_dbg_ctx2 );
	info_thread( L ) waiter = { active_thread(), info, &l };
	insert_last( blocked_threads, waiter );
	unlock( lock );

	// blocks here
	size_t saved_recursion = block_and_get_recursion( waiter );

	on_wakeup( *waiter.lock, saved_recursion );
}
561
// Body shared by the timed pthread_cond_var wait() overloads: build a waiter
// record, wait with a timeout, and return whether the wake-up came from a
// notify (true) or from the timeout (false).
#define PTHREAD_WAIT_TIME( user_info, lock_ptr, timeout ) \
	info_thread( L ) waiter = { active_thread(), user_info, lock_ptr }; \
	queue_info_thread_timeout( this, waiter, timeout, pthread_alarm_node_wrap_cast ); \
	return waiter.signalled;
566
// Converts the absolute CLOCK_REALTIME deadline t into the Duration that
// remains until it.
// Returns deadline - now while the deadline is still in the future, and a zero
// Duration once it has been reached or passed. The previous code had the
// comparison inverted: it returned zero for every future deadline and the
// overshoot for past ones.
Duration getDuration(timespec t) {
	timespec currTime;
	clock_gettime(CLOCK_REALTIME, &currTime);
	Duration waitUntil = { t };
	Duration currDur = { currTime };

	// deadline already reached: nothing left to wait for
	if ( currDur >= waitUntil ) {
		Duration zero = { 0 };
		return zero;
	}
	return waitUntil - currDur;
}
576
// Timed waits taking a timespec, which getDuration converts to a Duration.
// The result is true when signalled and false when the timeout woke the
// thread.
bool wait( pthread_cond_var(L) & this, L & l, timespec t ) {
	PTHREAD_WAIT_TIME( 0, &l, getDuration( t ) )
}

bool wait( pthread_cond_var(L) & this, L & l, uintptr_t info, timespec t ) {
	PTHREAD_WAIT_TIME( info, &l, getDuration( t ) )
}
584}
585//-----------------------------------------------------------------------------
586// Semaphore
// Builds a semaphore with the given initial count (1 when omitted), a fresh
// guard lock and an empty waiting list.
void ?{}( semaphore & this, int count = 1 ) {
	(this.lock){};
	(this.waiting){};
	this.count = count;
}

// No resources to release.
void ^?{}(semaphore & this) {}
593
// Decrements the semaphore. If the count drops below zero, the caller is
// queued and parks.
// Returns true when the caller had to block, false when it went straight
// through.
bool P(semaphore & this) with( this ){
	lock( lock __cfaabi_dbg_ctx2 );
	count -= 1;
	bool must_block = count < 0;

	// queue current task
	if ( must_block ) append( waiting, active_thread() );

	// release the spin lock, then block if we were queued
	unlock( lock );
	if ( must_block ) park();

	return must_block;
}
611
// Increments the semaphore and, if the count is still not positive, removes
// the thread at the head of the waiting list.
//   doUnpark : when true, the result (possibly 0p) is passed to unpark
//              after the guard lock has been released
// Returns the removed thread, or 0p when none was removed.
thread$ * V (semaphore & this, const bool doUnpark ) with( this ) {
	thread$ * woken = 0p;

	lock( lock __cfaabi_dbg_ctx2 );
	count += 1;
	// remove task at head of waiting list
	if ( count <= 0 ) woken = pop_head( waiting );
	unlock( lock );

	// make new owner
	if ( doUnpark ) unpark( woken );

	return woken;
}
628
// Increments the semaphore, waking one waiter if there is one.
// Returns true when a thread was taken off the waiting list.
bool V(semaphore & this) with( this ) {
	return V( this, true ) != 0p;
}
633
// Increments the semaphore by diff and wakes the threads this releases.
// P() queues a thread only after driving count below zero, so -count is the
// number of queued threads whenever count is negative. At most that many, and
// at most diff, threads are woken.
// The previous code used max(-count, diff), which drained the whole waiting
// list whenever more threads were queued than diff, and popped from an empty
// list otherwise. It also always returned false.
// Returns true when at least one thread was woken.
bool V(semaphore & this, unsigned diff) with( this ) {
	lock( lock __cfaabi_dbg_ctx2 );

	unsigned waiters = count < 0 ? (unsigned)(-count) : 0;
	unsigned release = diff < waiters ? diff : waiters;

	count += diff;
	for(release) {
		unpark( pop_head( waiting ) );
	}

	unlock( lock );

	return release > 0;
}
647
Note: See TracBrowser for help on using the repository browser.