source: libcfa/src/concurrency/select.hfa @ 97b47ec

Last change on this file since 97b47ec was 0aef549, checked in by caparsons <caparson@…>, 18 months ago

Merge branch 'master' of plg.uwaterloo.ca:software/cfa/cfa-cc

  • Property mode set to 100644
File size: 9.0 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// select.hfa -- LIBCFATHREAD
8// Select nodes and helper routines used to implement the waituntil statement.
9//
10// Author           : Colby Alexander Parsons
11// Created On       : Thu Jan 21 19:46:50 2023
12// Last Modified By :
13// Last Modified On :
14// Update Count     :
15//
16
17#pragma once
18
19#include "containers/list.hfa"
20#include "alarm.hfa"
21#include "kernel.hfa"
22#include "time.hfa"
23
24struct select_node;
25
26// node status: the values kept in the word that a select_node's clause_status points at
// NOTE(review): in the special OR case __make_select_node_available() stores the node's own address in
//    that word rather than one of these constants, so these four values are not the only possible contents
27static const unsigned long int __SELECT_UNSAT = 0; // initial/expected value for every compare-exchange in this file
28static const unsigned long int __SELECT_PENDING = 1; // used only by special OR case
29static const unsigned long int __SELECT_SAT = 2; // stored once a clause has been marked satisfied
30static const unsigned long int __SELECT_RUN = 3; // tested by __CFA_has_clause_run(); never stored by code in this file
31
// Helper referenced by compiler-generated waituntil code.
// Returns true exactly when the given status value equals __SELECT_RUN.
static inline bool __CFA_has_clause_run( unsigned long int status ) {
    return __SELECT_RUN == status;
}
// Helper referenced by compiler-generated waituntil code.
// Atomically decrements *park_counter and parks the calling thread only when the
// decremented value is negative.
static inline void __CFA_maybe_park( int * park_counter ) {
    int remaining = __atomic_sub_fetch( park_counter, 1, __ATOMIC_SEQ_CST );
    if ( remaining >= 0 ) return;
    park();
}
38
39// node used for coordinating waituntil synchronization
// One node is handed to a primitive's register_select()/unregister_select()/on_selected() routines
// (see the is_selectable trait) and can be queued on a dlist( select_node ).
40struct select_node {
    // incremented by __make_select_node_available() when the clause is marked satisfied
41    int * park_counter;                 // If this is 0p then the node is in a special OR case waituntil
    // pointer sized because the special OR case stores a node address in the status word
42    unsigned long int * clause_status;  // needs to point at ptr sized location, if this is 0p then node is not part of a waituntil
43
44    void * extra;                       // used to store arbitrary data needed by some primitives
45
    // thread handed to unpark() by wake_one(); the default constructor and setup_clause() set it to active_thread()
46    thread$ * blocked_thread;
    // NOTE(review): presumably the intrusive list link required by dlist( select_node ) -- confirm against containers/list.hfa
47    inline dlink(select_node);
48};
// NOTE(review): P9_EMBEDDED is not defined in this file; presumably it comes from containers/list.hfa -- confirm
49P9_EMBEDDED( select_node, dlink(select_node) )
50
// Default constructor: the node is owned by the calling thread and is not attached to any
// waituntil clause yet (no status word, no park counter, no extra payload).
static inline void ?{}( select_node & this ) {
    this.park_counter = 0p;
    this.clause_status = 0p;
    this.extra = 0p;
    this.blocked_thread = active_thread();
}
57
// Constructor for a node that blocks an explicitly supplied thread.
// The node starts detached from any waituntil clause and carries no extra payload.
static inline void ?{}( select_node & this, thread$ * blocked_thread ) {
    this.park_counter = 0p;
    this.clause_status = 0p;
    this.extra = 0p;
    this.blocked_thread = blocked_thread;
}
64
// Constructor for a node that blocks an explicitly supplied thread and carries a
// primitive-specific payload in `extra`. The node starts detached from any waituntil clause.
static inline void ?{}( select_node & this, thread$ * blocked_thread, void * extra ) {
    this.park_counter = 0p;
    this.clause_status = 0p;
    this.extra = extra;
    this.blocked_thread = blocked_thread;
}
// Destructor: empty, no cleanup is performed on the node.
static inline void ^?{}( select_node & this ) {
}
72
// Accessor referenced by compiler-generated code.
// Returns the node's clause_status pointer unchanged (it may be 0p).
static inline unsigned long int * __get_clause_status( select_node & s ) {
    unsigned long int * status = s.clause_status;
    return status;
}
75
// Used inside the compiler to attempt to establish an else clause as the winner of the special OR case race.
// Returns true only if this call moved the clause status word from __SELECT_UNSAT to __SELECT_SAT;
// returns false if the word already held any other value.
static inline bool __select_node_else_race( select_node & this ) with( this ) {
    // Cheap pre-check that skips the compare-exchange once the race is already lost.
    // The status word is updated elsewhere through atomic builtins, so it is read atomically here too;
    // relaxed order is enough because the compare-exchange below does the real synchronization.
    if ( __atomic_load_n( clause_status, __ATOMIC_RELAXED ) != __SELECT_UNSAT ) return false;

    unsigned long int cmp_status = __SELECT_UNSAT;
    return __atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST );
}
82
83//-----------------------------------------------------------------------------
84// is_selectable
// Trait listing the three routines a type T must provide to be used with a select_node.
85forall(T & | sized(T))
86trait is_selectable {
87    // For registering a select stmt on a selectable concurrency primitive
88    // Returns bool that indicates if operation is already SAT
89    bool register_select( T &, select_node & );
90
91    // For unregistering a select stmt on a selectable concurrency primitive
92    // If true is returned then the corresponding code block is run (only in non-special OR case and only if node status is not RUN)
93    bool unregister_select( T &, select_node & );
94
95    // This routine is run on the selecting thread prior to executing the statement corresponding to the select_node
96    //    passed as an arg to this routine
97    // NOTE(review): on_selected is declared void here, so it has no way to veto the statement; if a bool-returning veto is intended the signature must change
98    void on_selected( T &, select_node & );
99};
100
101//=============================================================================================
102// Waituntil Helpers
103//=============================================================================================
104
// Atomically (sequentially consistent store) sets the node's clause status word to __SELECT_UNSAT.
// The node's clause_status pointer must not be 0p.
static inline void __make_select_node_unsat( select_node & this ) {
    __atomic_store_n( this.clause_status, __SELECT_UNSAT, __ATOMIC_SEQ_CST );
}
// Atomically (sequentially consistent store) sets the node's clause status word to __SELECT_SAT.
// The node's clause_status pointer must not be 0p.
static inline void __make_select_node_sat( select_node & this ) {
    __atomic_store_n( this.clause_status, __SELECT_SAT, __ATOMIC_SEQ_CST );
}
111
// Used for the 2-stage availability protocol needed by the special OR case.
// Tries to swap the node's status word from __SELECT_UNSAT to val, retrying for as long as the
// word is observed to hold __SELECT_PENDING.
// Returns true once the swap succeeds, false as soon as any other status is observed.
static inline bool __mark_select_node( select_node & this, unsigned long int val ) {
    /* paranoid */ verify( this.park_counter == 0p );
    /* paranoid */ verify( this.clause_status != 0p );

    for ( ;; ) {
        unsigned long int observed = __SELECT_UNSAT;
        if ( __atomic_compare_exchange_n( this.clause_status, &observed, val, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
            return true;

        // a failed compare-exchange leaves the current status in observed
        if ( observed != __SELECT_PENDING ) return false;
    }
}
124
// Used for the 2-stage availability protocol by the thread that owns a pending node (mine).
// Tries to swap other's status word from __SELECT_UNSAT to val.
// Whenever other is observed to be __SELECT_PENDING, mine is reset to __SELECT_UNSAT and then
// re-claimed as __SELECT_PENDING before trying again.
// Returns true if other's status was set to val; false if other held some other status or if
// mine could not be re-claimed.
static inline bool __pending_set_other( select_node & other, select_node & mine, unsigned long int val ) {
    /* paranoid */ verify( other.park_counter == 0p );
    /* paranoid */ verify( other.clause_status != 0p );

    for ( ;; ) {
        unsigned long int other_seen = __SELECT_UNSAT;
        if ( __atomic_compare_exchange_n( other.clause_status, &other_seen, val, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
            return true;

        // a failed compare-exchange leaves other's current status in other_seen
        if ( other_seen != __SELECT_PENDING )
            return false;

        // toggle current status flag to avoid starvation/deadlock
        __make_select_node_unsat( mine );
        unsigned long int mine_seen = __SELECT_UNSAT;
        if ( !__atomic_compare_exchange_n( mine.clause_status, &mine_seen, __SELECT_PENDING, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
            return false;
    }
}
144
// Tries to move the node's status word from __SELECT_UNSAT to __SELECT_PENDING.
// Returns the result of __mark_select_node(): true if the node is now pending.
static inline bool __make_select_node_pending( select_node & this ) {
    bool now_pending = __mark_select_node( this, __SELECT_PENDING );
    return now_pending;
}
148
// When a primitive becomes available it calls this routine on its node to update the select state.
// Returns true if the caller should unpark the node's thread.
static inline bool __make_select_node_available( select_node & this ) with( this ) {
    /* paranoid */ verify( clause_status != 0p );

    // special OR case (no park counter): try to claim the status word by storing this node's address in it
    if ( !park_counter )
        return __mark_select_node( this, (unsigned long int)&this );

    // Cheap pre-check that skips the compare-exchange when the clause is no longer unsatisfied.
    // The status word is updated elsewhere through atomic builtins, so it is read atomically here too;
    // relaxed order is enough because the compare-exchange below does the real synchronization.
    if ( __atomic_load_n( clause_status, __ATOMIC_RELAXED ) != __SELECT_UNSAT ) return false;

    // C_TODO might not need a cmp_xchg in non special OR case, can maybe just use atomic write
    unsigned long int cmp_status = __SELECT_UNSAT;
    if ( !__atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
        return false;

    // request an unpark only when the increment brings the park counter to exactly zero
    return !__atomic_add_fetch( park_counter, 1, __ATOMIC_SEQ_CST );
}
162
// Handles the special OR case of the waituntil statement.
// Only one select node can win in the OR case, so the race to mark the node available must be run
//    BEFORE performing the operation: if the race is lost the operation must not be performed,
//    otherwise its effect would be lost.
// Nodes at the front of the queue that lose the race are popped and discarded.
// Returns true if execution can continue normally and false if the queue is empty or has been drained.
static inline bool __handle_waituntil_OR( dlist( select_node ) & queue ) {
    if ( queue`isEmpty ) return false;

    // front node is not a special OR case node: nothing to race for
    if ( !queue`first.clause_status || queue`first.park_counter ) return true;

    // queue is known to be non-empty here, so the test can sit at the bottom of the loop
    do {
        // stop at the first node that is not a special OR case or whose race we win
        if ( !queue`first.clause_status || queue`first.park_counter || __make_select_node_available( queue`first ) )
            return true;

        // lost the special OR race for this node, so discard it
        try_pop_front( queue );
    } while ( !queue`isEmpty );

    return false;
}
181
// Wake the thread blocked on the given node.
// The thread is unparked when the node is not part of a waituntil (no clause status), when it is a
// special OR case node (clause status but no park counter; __make_select_node_available is NOT
// called for it here), or when __make_select_node_available() reports that an unpark is needed.
// The queue argument is not used by this overload.
static inline void wake_one( dlist( select_node ) & queue, select_node & popped ) {
    bool is_select_node = popped.clause_status != 0p;
    bool is_special_or = is_select_node && popped.park_counter == 0p;

    if ( !is_select_node || is_special_or || __make_select_node_available( popped ) )
        unpark( popped.blocked_thread );
}
189
// Pop the front node of the queue and wake the thread blocked on it.
// NOTE(review): the result of try_pop_front() is used without an emptiness check -- callers
//    presumably guarantee a non-empty queue; confirm.
static inline void wake_one( dlist( select_node ) & queue ) {
    select_node & popped = try_pop_front( queue );
    wake_one( queue, popped );
}
191
// Attach a node to a waituntil clause: record the clause's status word and park counter and make
// the calling thread the node's blocked thread. The node's `extra` field is left untouched.
static inline void setup_clause( select_node & this, unsigned long int * clause_status, int * park_counter ) {
    this.park_counter = park_counter;
    this.clause_status = clause_status;
    this.blocked_thread = active_thread();
}
197
198// waituntil ( timeout( ... ) ) support
// Everything below is declaration only; the definitions are not part of this header.
199struct select_timeout_node {
200    alarm_node_t a_node;
    // NOTE(review): presumably the select_node registered on this timeout -- confirm against the implementation
201    select_node * s_node;
202};
203void ?{}( select_timeout_node & this, Duration duration, Alarm_Callback callback );
204void ^?{}( select_timeout_node & this );
// NOTE(review): presumably the Alarm_Callback passed to the constructor above -- confirm
205void timeout_handler_select_cast( alarm_node_t & node );
206
207// Selectable trait routines
// Same three routines the is_selectable trait requires, declared here for select_timeout_node
208bool register_select( select_timeout_node & this, select_node & node );
209bool unregister_select( select_timeout_node & this, select_node & node );
210void on_selected( select_timeout_node & this, select_node & node );
211
212// Gateway routines to waituntil on duration
// Both take a Duration and return a select_timeout_node by value; any difference between the two is not visible from this header
213select_timeout_node timeout( Duration duration );
214select_timeout_node sleep( Duration duration );
Note: See TracBrowser for help on using the repository browser.