source: libcfa/src/concurrency/locks.cfa @ c015e2d

Last change on this file since c015e2d was b93bf85, checked in by caparsons <caparson@…>, 16 months ago

fixed spurious channel close waituntil error case. Was caused by a race condition causing an exception to be thrown while another was in flight

  • Property mode set to 100644
File size: 20.1 KB
RevLine 
[ab1b971]1//
2// Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
[5a05946]7// locks.cfa -- LIBCFATHREAD
[ab1b971]8// Runtime locks that are used with the runtime thread system.
9//
10// Author           : Colby Alexander Parsons
11// Created On       : Thu Jan 21 19:46:50 2021
12// Last Modified By :
13// Last Modified On :
14// Update Count     :
15//
16
17#define __cforall_thread__
18
[848439f]19#include "locks.hfa"
[708ae38]20#include "kernel/private.hfa"
[848439f]21
22#include <kernel.hfa>
23#include <stdlib.hfa>
24
[c18bf9e]25#pragma GCC visibility push(default)
26
[ac5816d]27//-----------------------------------------------------------------------------
28// info_thread
forall(L & | is_blocking_lock(L)) {
	// Per-waiter record that a condition variable places on its queue of blocked threads.
	struct info_thread {
		// used to put info_thread on a dl queue
		inline dlink(info_thread(L));

		// waiting thread
		struct thread$ * t;

		// shadow field
		uintptr_t info;

		// lock that is passed to wait() (if one is passed)
		L * lock;

		// true when signalled and false when timeout wakes thread
		bool signalled;
	};
	P9_EMBEDDED( info_thread(L), dlink(info_thread(L)) )

	// Records the waiting thread, its user-supplied info value and the (possibly null) lock
	// handed to wait().
	void ?{}( info_thread(L) & this, thread$ * t, uintptr_t info, L * l ) {
		this.t = t;
		this.info = info;
		this.lock = l;
		// The timed wait() routines return this flag; give it a defined value so it is
		// never read uninitialized if neither a notify nor a timeout handler has set it.
		this.signalled = false;
	}

	void ^?{}( info_thread(L) & this ) {}
}
[cad1df1]56
[ac5816d]57//-----------------------------------------------------------------------------
58// Blocking Locks
// Constructs an unowned blocking lock.
//   multi_acquisition : stored as the lock's policy flag allowing the owner to re-acquire it
//   strict_owner      : stored as the lock's policy flag checked on release (see unlock/on_wait)
void ?{}( blocking_lock & this, bool multi_acquisition, bool strict_owner ) {
	// internal spin lock and wait queue are default constructed
	this.lock{};
	this.blocked_threads{};

	// policy flags
	this.strict_owner = strict_owner;
	this.multi_acquisition = multi_acquisition;

	// no owner, no waiters, no recursion yet
	this.owner = 0p;
	this.recursion_count = 0;
	this.wait_count = 0;
}

// Nothing to release on destruction.
void ^?{}( blocking_lock & this ) {}
[ab1b971]70
[848439f]71
// Acquires the blocking lock for the calling thread, parking the caller when another
// thread currently owns it.
void lock( blocking_lock & this ) with( this ) {
	lock( lock __cfaabi_dbg_ctx2 );
	thread$ * current = active_thread();

	// single acquisition lock is held by current thread
	/* paranoid */ verifyf( owner != current || multi_acquisition, "Single acquisition lock holder (%p) attempted to reacquire the lock %p resulting in a deadlock.", owner, &this );

	// lock is held by some other thread: queue a node for this thread and park.
	// pop_node() assigns ownership to the popped thread before waking it, so there is
	// nothing left to do once park() returns.
	if ( owner != 0p && owner != current ) {
		select_node node;
		insert_last( blocked_threads, node );
		wait_count++;
		unlock( lock );
		park( );
		return;
	}

	if ( owner == current && multi_acquisition ) {
		// multi acquisition lock is held by current thread
		recursion_count++;
	} else {
		// lock isn't held
		owner = current;
		recursion_count = 1;
	}
	unlock( lock );
}
95
// Attempts to acquire the blocking lock without parking.
// Returns true when the caller now holds the lock (fresh or recursive acquisition),
// false when it is held and cannot be taken.
bool try_lock( blocking_lock & this ) with( this ) {
	bool acquired = true;
	lock( lock __cfaabi_dbg_ctx2 );

	if ( owner == 0p ) {
		// lock isn't held
		owner = active_thread();
		recursion_count = 1;
	} else if ( owner == active_thread() && multi_acquisition ) {
		// multi acquisition lock is held by current thread
		recursion_count++;
	} else {
		acquired = false;
	}

	unlock( lock );
	return acquired;
}
115
// Transfers the lock to the first queued waiter, or marks the lock free when the queue
// is empty. Every caller in this file invokes it with the internal spin lock held.
static inline void pop_node( blocking_lock & this ) with( this ) {
	__handle_waituntil_OR( blocked_threads );
	select_node * next = &try_pop_front( blocked_threads );

	// nobody waiting: lock becomes unowned
	if ( ! next ) {
		owner = 0p;
		recursion_count = 0;
		return;
	}

	// make the popped waiter the owner before it is woken
	wait_count--;
	owner = next->blocked_thread;
	recursion_count = 1;
	wake_one( blocked_threads, *next );
}
130
// Releases one level of ownership of the blocking lock. When the recursion count drops
// to zero the lock is handed to the next queued waiter (or marked free) via pop_node().
void unlock( blocking_lock & this ) with( this ) {
	lock( lock __cfaabi_dbg_ctx2 );
	/* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
	// arguments follow the message order: offending thread first, then the real owner
	/* paranoid */ verifyf( owner == active_thread() || !strict_owner , "Thread %p other than the owner %p attempted to release owner lock %p", active_thread(), owner, &this );
	/* paranoid */ verifyf( recursion_count == 1 || multi_acquisition, "Thread %p attempted to release owner lock %p which is not recursive but has a recursive count of %zu", active_thread(), &this, recursion_count );

	// if recursion count is zero release lock and set new owner if one is waiting
	recursion_count--;
	if ( recursion_count == 0 ) {
		pop_node( this );
	}
	unlock( lock );
}
144
// Returns the lock's wait_count field (read without taking the internal spin lock).
size_t wait_count( blocking_lock & this ) {
	return this.wait_count;
}
148
// Called on behalf of thread t when it should reacquire this lock (the condition
// variables in this file call it on notify and on timeout): t becomes the owner and is
// unparked if the lock is free, otherwise t's link node is queued behind the waiters.
void on_notify( blocking_lock & this, thread$ * t ) with( this ) {
	lock( lock __cfaabi_dbg_ctx2 );
	if ( owner == 0p ) {
		// lock not held: hand it straight to t
		owner = t;
		recursion_count = 1;
		unpark( t );
	} else {
		// lock held: t waits its turn on the lock's queue
		insert_last( blocked_threads, *(select_node *)t->link_node );
		wait_count++;
	}
	unlock( lock );
}
164
// Fully releases the lock on behalf of a thread about to wait, then parks the caller.
//   pp_fn / pp_datum : forwarded unchanged to pre_park_then_park()
// Returns the recursion count held before the release so the caller can restore it with
// on_wakeup() after being woken.
size_t on_wait( blocking_lock & this, __cfa_pre_park pp_fn, void * pp_datum ) with( this ) {
	lock( lock __cfaabi_dbg_ctx2 );
	/* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
	// arguments follow the message order: offending thread first, then the real owner
	/* paranoid */ verifyf( owner == active_thread() || !strict_owner, "Thread %p other than the owner %p attempted to release owner lock %p", active_thread(), owner, &this );

	size_t ret = recursion_count;

	// hand the lock to the next waiter, or mark it free
	pop_node( this );

	// publish a stack-allocated node through link_node before the spin lock is released;
	// on_notify() queues that node if the lock is busy when this thread is notified
	select_node node;
	active_thread()->link_node = (void *)&node;
	unlock( lock );

	pre_park_then_park( pp_fn, pp_datum );

	return ret;
}
182
// Restores the recursion count saved by on_wait() once the waiting thread owns the lock
// again. The field is written without taking the internal spin lock.
void on_wakeup( blocking_lock & this, size_t recursion ) {
	this.recursion_count = recursion;
}
186
[beeff61e]187// waituntil() support
// waituntil() registration for a blocking lock.
// Returns true when the calling thread acquired the lock during registration, and false
// when it either lost the OR-clause race or was queued behind the current owner.
bool register_select( blocking_lock & this, select_node & node ) with(this) {
	lock( lock __cfaabi_dbg_ctx2 );
	thread$ * thrd = active_thread();

	// single acquisition lock is held by current thread
	/* paranoid */ verifyf( owner != thrd || multi_acquisition, "Single acquisition lock holder (%p) attempted to reacquire the lock %p resulting in a deadlock.", owner, &this );

	// owner cannot change while the spin lock is held, so classify the state once
	const bool reentrant     = owner == thrd && multi_acquisition;
	const bool unowned       = owner == 0p;
	const bool held_by_other = owner != 0p && owner != thrd;

	// OR special case
	if ( !node.park_counter && ( reentrant || unowned ) ) {
		if ( !__make_select_node_available( node ) ) { // we didn't win the race so give up on registering
			unlock( lock );
			return false;
		}
	}

	// lock is held by some other thread
	if ( held_by_other ) {
		insert_last( blocked_threads, node );
		wait_count++;
		unlock( lock );
		return false;
	}

	if ( reentrant ) {
		// multi acquisition lock is held by current thread
		recursion_count++;
	} else {
		// lock isn't held
		owner = thrd;
		recursion_count = 1;
	}

	if ( node.park_counter ) __make_select_node_available( node );
	unlock( lock );
	return true;
}
219
// waituntil() unregistration for a blocking lock. A node still on the wait queue is
// removed; otherwise, if the caller owns the lock, one level of ownership is released.
// Always returns false.
bool unregister_select( blocking_lock & this, select_node & node ) with(this) {
	lock( lock __cfaabi_dbg_ctx2 );

	if ( node`isListed ) {
		// still queued: just withdraw the node
		remove( node );
		wait_count--;
	} else if ( owner == active_thread() ) {
		/* paranoid */ verifyf( recursion_count == 1 || multi_acquisition, "Thread %p attempted to unlock owner lock %p in waituntil unregister, which is not recursive but has a recursive count of %zu", active_thread(), &this, recursion_count );
		// if recursion count is zero release lock and set new owner if one is waiting
		recursion_count--;
		if ( recursion_count == 0 ) pop_node( this );
	}

	unlock( lock );
	return false;
}
240
// waituntil() hook; this implementation does no work and unconditionally returns true.
bool on_selected( blocking_lock & this, select_node & node ) {
	return true;
}
[beeff61e]242
[ac5816d]243//-----------------------------------------------------------------------------
244// alarm node wrapper
[fd54fef]245forall(L & | is_blocking_lock(L)) {
[c20533ea]246        struct alarm_node_wrap {
247                alarm_node_t alarm_node;
248                condition_variable(L) * cond;
[90a10e8]249                info_thread(L) * info_thd;
[c20533ea]250        };
251
[c457dc41]252        void ?{}( alarm_node_wrap(L) & this, Duration alarm, Duration period, Alarm_Callback callback, condition_variable(L) * c, info_thread(L) * i ) {
[c20533ea]253                this.alarm_node{ callback, alarm, period };
[ac5816d]254                this.cond = c;
[90a10e8]255                this.info_thd = i;
[c20533ea]256        }
257
258        void ^?{}( alarm_node_wrap(L) & this ) { }
[848439f]259
[c18bf9e]260        static void timeout_handler ( alarm_node_wrap(L) & this ) with( this ) {
[ac5816d]261                // This condition_variable member is called from the kernel, and therefore, cannot block, but it can spin.
262                lock( cond->lock __cfaabi_dbg_ctx2 );
263
264                // this check is necessary to avoid a race condition since this timeout handler
265                //      may still be called after a thread has been removed from the queue but
266                //      before the alarm is unregistered
[82f4063]267                if ( (*info_thd)`isListed ) {   // is thread on queue
[90a10e8]268                        info_thd->signalled = false;
[ac5816d]269                        // remove this thread O(1)
[82f4063]270                        remove( *info_thd );
[6a8882c]271                        cond->count--;
[90a10e8]272                        if( info_thd->lock ) {
[ac5816d]273                                // call lock's on_notify if a lock was passed
[90a10e8]274                                on_notify(*info_thd->lock, info_thd->t);
[ac5816d]275                        } else {
276                                // otherwise wake thread
[90a10e8]277                                unpark( info_thd->t );
[ac5816d]278                        }
279                }
280                unlock( cond->lock );
[848439f]281        }
282
[797a193]283        // this casts the alarm node to our wrapped type since we used type erasure
[c18bf9e]284        static void alarm_node_wrap_cast( alarm_node_t & a ) { timeout_handler( (alarm_node_wrap(L) &)a ); }
[ae06e0b]285
286        struct pthread_alarm_node_wrap {
287                alarm_node_t alarm_node;
288                pthread_cond_var(L) * cond;
289                info_thread(L) * info_thd;
290        };
291
292        void ?{}( pthread_alarm_node_wrap(L) & this, Duration alarm, Duration period, Alarm_Callback callback, pthread_cond_var(L) * c, info_thread(L) * i ) {
293                this.alarm_node{ callback, alarm, period };
294                this.cond = c;
295                this.info_thd = i;
296        }
297
298        void ^?{}( pthread_alarm_node_wrap(L) & this ) { }
299
300        static void timeout_handler ( pthread_alarm_node_wrap(L) & this ) with( this ) {
301                // This pthread_cond_var member is called from the kernel, and therefore, cannot block, but it can spin.
302                lock( cond->lock __cfaabi_dbg_ctx2 );
303                // this check is necessary to avoid a race condition since this timeout handler
304                //      may still be called after a thread has been removed from the queue but
305                //      before the alarm is unregistered
306                if ( (*info_thd)`isListed ) {   // is thread on queue
307                        info_thd->signalled = false;
308                        // remove this thread O(1)
309                        remove( *info_thd );
310                        on_notify(*info_thd->lock, info_thd->t);
311                }
312                unlock( cond->lock );
313        }
314
315        // this casts the alarm node to our wrapped type since we used type erasure
316        static void pthread_alarm_node_wrap_cast( alarm_node_t & a ) { timeout_handler( (pthread_alarm_node_wrap(L) &)a ); }
[c20533ea]317}
318
[ac5816d]319//-----------------------------------------------------------------------------
[7f958c4]320// Synchronization Locks
[fd54fef]321forall(L & | is_blocking_lock(L)) {
[c20533ea]322
[7f958c4]323        //-----------------------------------------------------------------------------
324        // condition variable
[848439f]325        void ?{}( condition_variable(L) & this ){
[eeb5023]326                this.lock{};
327                this.blocked_threads{};
328                this.count = 0;
[848439f]329        }
330
[cad1df1]331        void ^?{}( condition_variable(L) & this ){ }
[848439f]332
[c18bf9e]333        static void process_popped( condition_variable(L) & this, info_thread(L) & popped ) with( this ) {
[cad1df1]334                if(&popped != 0p) {
[dff1fd1]335                        popped.signalled = true;
[eeb5023]336                        count--;
[cad1df1]337                        if (popped.lock) {
[ac5816d]338                                // if lock passed call on_notify
339                                on_notify(*popped.lock, popped.t);
[848439f]340                        } else {
[ac5816d]341                                // otherwise wake thread
342                                unpark(popped.t);
[848439f]343                        }
344                }
[cad1df1]345        }
346
347        bool notify_one( condition_variable(L) & this ) with( this ) {
348                lock( lock __cfaabi_dbg_ctx2 );
[82f4063]349                bool ret = ! blocked_threads`isEmpty;
350                process_popped(this, try_pop_front( blocked_threads ));
[848439f]351                unlock( lock );
352                return ret;
353        }
354
[eeb5023]355        bool notify_all( condition_variable(L) & this ) with(this) {
[848439f]356                lock( lock __cfaabi_dbg_ctx2 );
[82f4063]357                bool ret = ! blocked_threads`isEmpty;
358                while( ! blocked_threads`isEmpty ) {
359                        process_popped(this, try_pop_front( blocked_threads ));
[848439f]360                }
361                unlock( lock );
362                return ret;
363        }
364
[eeb5023]365        uintptr_t front( condition_variable(L) & this ) with(this) {
[82f4063]366                return blocked_threads`isEmpty ? NULL : blocked_threads`first.info;
[848439f]367        }
368
[22b7579]369        bool empty( condition_variable(L) & this ) with(this) {
370                lock( lock __cfaabi_dbg_ctx2 );
[82f4063]371                bool ret = blocked_threads`isEmpty;
[22b7579]372                unlock( lock );
373                return ret;
374        }
[cad1df1]375
376        int counter( condition_variable(L) & this ) with(this) { return count; }
[848439f]377
[beeff61e]378        static void enqueue_thread( condition_variable(L) & this, info_thread(L) * i ) with(this) {
[ac5816d]379                // add info_thread to waiting queue
[82f4063]380                insert_last( blocked_threads, *i );
[cad1df1]381                count++;
[848439f]382        }
383
[fece3d9]384    static size_t block_and_get_recursion( info_thread(L) & i, __cfa_pre_park pp_fn, void * pp_datum ) {
[beeff61e]385        size_t recursion_count = 0;
[5a05946]386                if ( i.lock ) // if lock was passed get recursion count to reset to after waking thread
[fece3d9]387                        recursion_count = on_wait( *i.lock, pp_fn, pp_datum ); // this call blocks
[5a05946]388                else
389            pre_park_then_park( pp_fn, pp_datum );
[beeff61e]390        return recursion_count;
391    }
[fece3d9]392    static size_t block_and_get_recursion( info_thread(L) & i ) { return block_and_get_recursion( i, pre_park_noop, 0p ); }
[beeff61e]393
[cad1df1]394        // helper for wait()'s' with no timeout
[c18bf9e]395        static void queue_info_thread( condition_variable(L) & this, info_thread(L) & i ) with(this) {
[848439f]396                lock( lock __cfaabi_dbg_ctx2 );
[beeff61e]397        enqueue_thread( this, &i );
[848439f]398                unlock( lock );
[ac5816d]399
400                // blocks here
[beeff61e]401        size_t recursion_count = block_and_get_recursion( i );
[ac5816d]402
403                // resets recursion count here after waking
[beeff61e]404                if ( i.lock ) on_wakeup( *i.lock, recursion_count );
[848439f]405        }
406
[ac5816d]407        #define WAIT( u, l ) \
408                info_thread( L ) i = { active_thread(), u, l }; \
409                queue_info_thread( this, i );
410
[fece3d9]411    static void cond_alarm_register( void * node_ptr ) { register_self( (alarm_node_t *)node_ptr ); }
412
[eeb5023]413        // helper for wait()'s' with a timeout
[c18bf9e]414        static void queue_info_thread_timeout( condition_variable(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) {
[eeb5023]415                lock( lock __cfaabi_dbg_ctx2 );
[beeff61e]416        enqueue_thread( this, &info );
[afd7faf]417                alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info };
[848439f]418                unlock( lock );
419
[5a05946]420                // blocks here and registers alarm node before blocking after releasing locks to avoid deadlock
[fece3d9]421        size_t recursion_count = block_and_get_recursion( info, cond_alarm_register, (void *)(&node_wrap.alarm_node) );
[beeff61e]422                // park();
[848439f]423
[ac5816d]424                // unregisters alarm so it doesn't go off if this happens first
425                unregister_self( &node_wrap.alarm_node );
[848439f]426
[ac5816d]427                // resets recursion count here after waking
[beeff61e]428                if ( info.lock ) on_wakeup( *info.lock, recursion_count );
[848439f]429        }
430
[ac5816d]431        #define WAIT_TIME( u, l, t ) \
432                info_thread( L ) i = { active_thread(), u, l }; \
[afd7faf]433                queue_info_thread_timeout(this, i, t, alarm_node_wrap_cast ); \
[dff1fd1]434                return i.signalled;
[848439f]435
[ac5816d]436        void wait( condition_variable(L) & this                        ) with(this) { WAIT( 0, 0p    ) }
437        void wait( condition_variable(L) & this, uintptr_t info        ) with(this) { WAIT( info, 0p ) }
438        void wait( condition_variable(L) & this, L & l                 ) with(this) { WAIT( 0, &l    ) }
439        void wait( condition_variable(L) & this, L & l, uintptr_t info ) with(this) { WAIT( info, &l ) }
440
[c457dc41]441        bool wait( condition_variable(L) & this, Duration duration                        ) with(this) { WAIT_TIME( 0   , 0p , duration ) }
442        bool wait( condition_variable(L) & this, uintptr_t info, Duration duration        ) with(this) { WAIT_TIME( info, 0p , duration ) }
443        bool wait( condition_variable(L) & this, L & l, Duration duration                 ) with(this) { WAIT_TIME( 0   , &l , duration ) }
444        bool wait( condition_variable(L) & this, L & l, uintptr_t info, Duration duration ) with(this) { WAIT_TIME( info, &l , duration ) }
[7f958c4]445
446        //-----------------------------------------------------------------------------
447        // fast_cond_var
448        void  ?{}( fast_cond_var(L) & this ){
[c18bf9e]449                this.blocked_threads{};
[7f958c4]450                #ifdef __CFA_DEBUG__
451                this.lock_used = 0p;
452                #endif
453        }
454        void ^?{}( fast_cond_var(L) & this ){ }
455
456        bool notify_one( fast_cond_var(L) & this ) with(this) {
457                bool ret = ! blocked_threads`isEmpty;
458                if ( ret ) {
459                        info_thread(L) & popped = try_pop_front( blocked_threads );
460                        on_notify(*popped.lock, popped.t);
461                }
462                return ret;
463        }
464        bool notify_all( fast_cond_var(L) & this ) with(this) {
465                bool ret = ! blocked_threads`isEmpty;
466                while( ! blocked_threads`isEmpty ) {
467                        info_thread(L) & popped = try_pop_front( blocked_threads );
468                        on_notify(*popped.lock, popped.t);
469                }
470                return ret;
471        }
472
473        uintptr_t front( fast_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty ? NULL : blocked_threads`first.info; }
474        bool empty ( fast_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty; }
475
476        void wait( fast_cond_var(L) & this, L & l ) {
477                wait( this, l, 0 );
478        }
479
480        void wait( fast_cond_var(L) & this, L & l, uintptr_t info ) with(this) {
481                // brand cond lock with lock
482                #ifdef __CFA_DEBUG__
483                        if ( lock_used == 0p ) lock_used = &l;
[7d9598d]484                        else assert(lock_used == &l);
[7f958c4]485                #endif
486                info_thread( L ) i = { active_thread(), info, &l };
487                insert_last( blocked_threads, i );
[fece3d9]488                size_t recursion_count = on_wait( *i.lock, pre_park_noop, 0p ); // blocks here
[beeff61e]489                // park( );
[7f958c4]490                on_wakeup(*i.lock, recursion_count);
491        }
[454f478]492
[ae06e0b]493        //-----------------------------------------------------------------------------
494        // pthread_cond_var
495
496        void  ?{}( pthread_cond_var(L) & this ) with(this) {
497                blocked_threads{};
498                lock{};
499        }
500
501        void ^?{}( pthread_cond_var(L) & this ) { }
502
503        bool notify_one( pthread_cond_var(L) & this ) with(this) {
504                lock( lock __cfaabi_dbg_ctx2 );
505                bool ret = ! blocked_threads`isEmpty;
506                if ( ret ) {
507                        info_thread(L) & popped = try_pop_front( blocked_threads );
[4e83bb7]508                        popped.signalled = true;
[ae06e0b]509                        on_notify(*popped.lock, popped.t);
510                }
511                unlock( lock );
512                return ret;
513        }
514
515        bool notify_all( pthread_cond_var(L) & this ) with(this) {
516                lock( lock __cfaabi_dbg_ctx2 );
517                bool ret = ! blocked_threads`isEmpty;
518                while( ! blocked_threads`isEmpty ) {
519                        info_thread(L) & popped = try_pop_front( blocked_threads );
[4e83bb7]520                        popped.signalled = true;
[ae06e0b]521                        on_notify(*popped.lock, popped.t);
522                }
523                unlock( lock );
524                return ret;
525        }
526
527        uintptr_t front( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty ? NULL : blocked_threads`first.info; }
528        bool empty ( pthread_cond_var(L) & this ) with(this) { return blocked_threads`isEmpty; }
529
530        static void queue_info_thread_timeout( pthread_cond_var(L) & this, info_thread(L) & info, Duration t, Alarm_Callback callback ) with(this) {
531                lock( lock __cfaabi_dbg_ctx2 );
[beeff61e]532        insert_last( blocked_threads, info );
[ae06e0b]533                pthread_alarm_node_wrap(L) node_wrap = { t, 0`s, callback, &this, &info };
534                unlock( lock );
535
[5a05946]536                // blocks here and registers alarm node before blocking after releasing locks to avoid deadlock
[fece3d9]537        size_t recursion_count = block_and_get_recursion( info, cond_alarm_register, (void *)(&node_wrap.alarm_node) );
[ae06e0b]538
[5a05946]539                // unregisters alarm so it doesn't go off if signal happens first
[ae06e0b]540                unregister_self( &node_wrap.alarm_node );
541
542                // resets recursion count here after waking
[beeff61e]543                if ( info.lock ) on_wakeup( *info.lock, recursion_count );
[ae06e0b]544        }
545
546        void wait( pthread_cond_var(L) & this, L & l ) with(this) {
547                wait( this, l, 0 );
548        }
549
550        void wait( pthread_cond_var(L) & this, L & l, uintptr_t info ) with(this) {
551                lock( lock __cfaabi_dbg_ctx2 );
552                info_thread( L ) i = { active_thread(), info, &l };
[beeff61e]553        insert_last( blocked_threads, i );
[ae06e0b]554                unlock( lock );
[beeff61e]555
556        // blocks here
557                size_t recursion_count = block_and_get_recursion( i );
[5a05946]558
[beeff61e]559                on_wakeup( *i.lock, recursion_count );
[ae06e0b]560        }
561
562        #define PTHREAD_WAIT_TIME( u, l, t ) \
563                info_thread( L ) i = { active_thread(), u, l }; \
564                queue_info_thread_timeout(this, i, t, pthread_alarm_node_wrap_cast ); \
565                return i.signalled;
566
[4e83bb7]567        Duration getDuration(timespec t) {
568                timespec currTime;
569                clock_gettime(CLOCK_REALTIME, &currTime);
570                Duration waitUntil = { t };
571                Duration currDur = { currTime };
572                if ( currDur >= waitUntil ) return currDur - waitUntil;
573                Duration zero = { 0 };
574                return zero;
575        }
576
[ae06e0b]577        bool wait( pthread_cond_var(L) & this, L & l, timespec t ) {
[4e83bb7]578                PTHREAD_WAIT_TIME( 0, &l , getDuration( t ) )
[ae06e0b]579        }
580       
581        bool wait( pthread_cond_var(L) & this, L & l, uintptr_t info, timespec t  ) {
[4e83bb7]582                PTHREAD_WAIT_TIME( info, &l , getDuration( t ) )
[ae06e0b]583        }
584}
[454f478]585//-----------------------------------------------------------------------------
586// Semaphore
// Constructs a semaphore with the given initial count (default 1), a default-constructed
// spin lock and an empty waiting list.
void ?{}( semaphore & this, int count = 1 ) {
	(this.lock){};
	(this.waiting){};
	this.count = count;
}

// Nothing to release on destruction.
void ^?{}( semaphore & this ) {}
593
// Decrements the semaphore. When the count goes negative the caller is queued and parked.
// Returns true if the caller had to block, false otherwise.
bool P(semaphore & this) with( this ){
	lock( lock __cfaabi_dbg_ctx2 );
	count -= 1;
	const bool must_block = count < 0;

	// queue current task
	if ( must_block ) append( waiting, active_thread() );

	// release spin lock, then block if queued
	unlock( lock );
	if ( must_block ) park();

	return must_block;
}
611
// Increments the semaphore and dequeues the head waiter when one exists.
//   doUnpark : when true the dequeued thread (possibly 0p) is passed to unpark()
// Returns the dequeued thread, or 0p when nobody was waiting.
thread$ * V (semaphore & this, const bool doUnpark ) with( this ) {
	thread$ * released = 0p;

	lock( lock __cfaabi_dbg_ctx2 );
	count += 1;
	// remove task at head of waiting list
	if ( count <= 0 ) released = pop_head( waiting );
	unlock( lock );

	// make new owner
	if ( doUnpark ) unpark( released );

	return released;
}
[454f478]628
// Increments the semaphore and wakes the head waiter, if any.
// Returns true when a waiting thread was released.
bool V(semaphore & this) with( this ) {
	return V(this, true) != 0p;
}
633
// Increments the semaphore by diff and wakes up to diff waiting threads.
// Returns true when at least one waiting thread was released.
bool V(semaphore & this, unsigned diff) with( this ) {
	lock( lock __cfaabi_dbg_ctx2 );

	// P() queues a thread exactly when it drives count below zero, so a negative count
	// is the number of queued threads
	int waiters = count < 0 ? -count : 0;

	// wake no more threads than are waiting and no more than diff
	int release = (int)diff;
	if ( release > waiters ) release = waiters;

	count += diff;
	for(release) {
		unpark( pop_head( waiting ) );
	}

	unlock( lock );

	return release > 0;
}
[5a05946]647
Note: See TracBrowser for help on using the repository browser.