source: libcfa/src/concurrency/select.hfa@ 5ba1356

Last change on this file since 5ba1356 was 6b33e89, checked in by Peter A. Buhr <pabuhr@…>, 5 months ago

change backquote call to regular call

  • Property mode set to 100644
File size: 9.0 KB
RevLine 
[e23b3ce]1//
2// Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// select.hfa -- LIBCFATHREAD
8// Support for the waituntil (select) statement, used with the runtime thread system.
9//
10// Author : Colby Alexander Parsons
11// Created On : Thu Jan 21 19:46:50 2023
[6b33e89]12// Last Modified By : Peter A. Buhr
13// Last Modified On : Fri Apr 25 07:31:26 2025
14// Update Count : 5
[e23b3ce]15//
16
[5e4a830]17#pragma once
18
[55b060d]19#include "collections/list.hfa"
[e23b3ce]20#include "alarm.hfa"
[beeff61e]21#include "kernel.hfa"
[c4f411e]22#include "time.hfa"
[339e30a]23
[beeff61e]24struct select_node;
25
// node status
// Values stored in the word a select_node's clause_status points at.
// NOTE: in the special OR case the word may instead hold the address of a select_node
// (see __make_select_node_available, which passes (unsigned long int)&this as the value to store).
static const unsigned long int __SELECT_UNSAT = 0;		// value the compare-exchange routines expect before marking a node
static const unsigned long int __SELECT_PENDING = 1;	// used only by special OR case
static const unsigned long int __SELECT_SAT = 2;
static const unsigned long int __SELECT_RUN = 3;		// value tested by __CFA_has_clause_run
[beeff61e]31
// Used inside the compiler to aid in code generation.
// Reports whether a clause status word holds the RUN value.
static inline bool __CFA_has_clause_run( unsigned long int status ) {
	return __SELECT_RUN == status;
}
// Used inside the compiler to aid in code generation.
// Atomically decrements *park_counter and parks the calling thread only when the result is negative.
static inline void __CFA_maybe_park( int * park_counter ) {
	int remaining = __atomic_sub_fetch( park_counter, 1, __ATOMIC_SEQ_CST );
	if ( remaining < 0 ) park();
}
38
// node used for coordinating waituntil synchronization
struct select_node {
	int * park_counter;						// If this is 0p then the node is in a special OR case waituntil
	unsigned long int * clause_status;		// needs to point at ptr sized location, if this is 0p then node is not part of a waituntil

	void * extra;							// used to store arbitrary data needed by some primitives

	thread$ * blocked_thread;				// thread that wake_one unparks; set by the constructors and setup_clause
	inline dlink(select_node);				// embedded links, so nodes can be kept on a dlist( select_node )
};
// NOTE(review): presumably registers the embedded dlink with the list library -- confirm against collections/list.hfa
P9_EMBEDDED( select_node, dlink(select_node) )
50
// Default constructor: the node belongs to the currently running thread, carries no extra data,
// and has null clause_status / park_counter (i.e., it is not part of a waituntil).
static inline void ?{}( select_node & this ) {
	this.extra = 0p;
	this.park_counter = 0p;
	this.clause_status = 0p;
	this.blocked_thread = active_thread();
}
57
// Constructor for a node blocked on behalf of an explicitly supplied thread.
// clause_status, park_counter and extra all start as 0p.
static inline void ?{}( select_node & this, thread$ * blocked_thread ) {
	this.extra = 0p;
	this.park_counter = 0p;
	this.clause_status = 0p;
	this.blocked_thread = blocked_thread;
}
64
// Constructor for a node blocked on behalf of an explicitly supplied thread that also
// carries primitive-specific data in extra. clause_status and park_counter start as 0p.
static inline void ?{}( select_node & this, thread$ * blocked_thread, void * extra ) {
	this.extra = extra;
	this.park_counter = 0p;
	this.clause_status = 0p;
	this.blocked_thread = blocked_thread;
}
// Destructor: the body is intentionally empty, the node does no explicit cleanup here.
static inline void ^?{}( select_node & this ) {}
[339e30a]72
// Used inside the compiler to aid in code generation.
// Returns the node's clause_status pointer (0p when the node is not part of a waituntil).
static inline unsigned long int * __get_clause_status( select_node & s ) {
	return s.clause_status;
}
[339e30a]75
// Used inside the compiler to attempt to establish an else clause as a winner in the OR special case race.
// Returns true only if the status word was still UNSAT and this call swapped it to SAT.
static inline bool __select_node_else_race( select_node & this ) with( this ) {
	// cheap read first: skip the compare-exchange when the status has already moved on
	if ( *clause_status != 0 ) return false;

	unsigned long int expected = __SELECT_UNSAT;
	return __atomic_compare_exchange_n( clause_status, &expected, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST );
}
82
//-----------------------------------------------------------------------------
// is_selectable
// Trait listing the three routines a type T must provide to be used with a select_node.
forall(T & | sized(T))
trait is_selectable {
	// For registering a select stmt on a selectable concurrency primitive
	// Returns bool that indicates if operation is already SAT
	bool register_select( T &, select_node & );

	// For unregistering a select stmt on a selectable concurrency primitive
	// If true is returned then the corresponding code block is run (only in non-special OR case and only if node status is not RUN)
	bool unregister_select( T &, select_node & );

	// This routine is run on the selecting thread prior to executing the statement corresponding to the select_node
	// passed as an arg to this routine. If true is returned proceed as normal, if false is returned the statement is skipped
	bool on_selected( T &, select_node & );
};
// Used inside the compiler to allow for overloading on return type for operations such as '?<<?' for channels
// Expands to a declaration of __CFA_select_get_type that takes and returns `typename`.
// YOU MUST USE THIS MACRO OR INCLUDE AN EQUIVALENT DECL FOR YOUR TYPE TO SUPPORT WAITUNTIL
#define __CFA_SELECT_GET_TYPE( typename ) typename __CFA_select_get_type( typename __CFA_t )
102
[339e30a]103
[c4f411e]104//=============================================================================================
105// Waituntil Helpers
106//=============================================================================================
107
// Unconditionally (atomically) resets the node's status word to UNSAT.
static inline void __make_select_node_unsat( select_node & this ) {
	__atomic_store_n( this.clause_status, __SELECT_UNSAT, __ATOMIC_SEQ_CST );
}
// Unconditionally (atomically) sets the node's status word to SAT.
static inline void __make_select_node_sat( select_node & this ) {
	__atomic_store_n( this.clause_status, __SELECT_SAT, __ATOMIC_SEQ_CST );
}
114
// used for the 2-stage avail needed by the special OR case
// Tries to move the status word from UNSAT to val.
// Retries while the word is observed as PENDING; returns false as soon as any other value is observed.
static inline bool __mark_select_node( select_node & this, unsigned long int val ) with( this ) {
	/* paranoid */ verify( park_counter == 0p );
	/* paranoid */ verify( clause_status != 0p );

	for ( ;; ) {
		// the compare-exchange overwrites its expected value on failure, so start from UNSAT every attempt
		unsigned long int expected = __SELECT_UNSAT;
		if ( __atomic_compare_exchange_n( clause_status, &expected, val, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
			return true;
		if ( expected != __SELECT_PENDING ) return false;
	}
}
127
// used for the 2-stage avail by the thread who owns a pending node
// Tries to move other's status word from UNSAT to val.
// Returns true on success; returns false if other's status is observed as anything except UNSAT/PENDING,
// or if mine cannot be put back into PENDING after being toggled.
static inline bool __pending_set_other( select_node & other, select_node & mine, unsigned long int val ) with( other ) {
	/* paranoid */ verify( park_counter == 0p );
	/* paranoid */ verify( clause_status != 0p );

	for ( ;; ) {
		unsigned long int expected = __SELECT_UNSAT;
		if ( __atomic_compare_exchange_n( clause_status, &expected, val, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
			return true;
		if ( expected != __SELECT_PENDING )
			return false;

		// toggle current status flag to avoid starvation/deadlock
		__make_select_node_unsat( mine );
		unsigned long int mine_expected = __SELECT_UNSAT;
		if ( ! __atomic_compare_exchange_n( mine.clause_status, &mine_expected, __SELECT_PENDING, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
			return false;
	}
}
[c4f411e]147
// Tries to move the node's status word from UNSAT to PENDING; see __mark_select_node for the return value.
static inline bool __make_select_node_pending( select_node & this ) {
	return __mark_select_node( this, __SELECT_PENDING );
}
151
// when a primitive becomes available it calls the following routine on its node to update the select state:
// return true if we want to unpark the thd
static inline bool __make_select_node_available( select_node & this ) with( this ) {
	/* paranoid */ verify( clause_status != 0p );

	// special OR case (no park counter): race to store this node's address as the status value
	if ( ! park_counter )
		return __mark_select_node( this, (unsigned long int)&this );

	// status already changed, nothing to do
	if ( *clause_status != 0 ) return false;	// C_TODO might not need a cmp_xchg in non special OR case

	unsigned long int expected = __SELECT_UNSAT;
	if ( ! __atomic_compare_exchange_n( clause_status, &expected, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) // can maybe just use atomic write
		return false;

	// ask for an unpark only when the incremented counter comes back as zero
	return __atomic_add_fetch( park_counter, 1, __ATOMIC_SEQ_CST ) == 0;
}
165
// Handles the special OR case of the waituntil statement.
// Only one select node can win in the OR case, so the race to set the node available must be run BEFORE
// performing the operation: if the race is lost the operation must not be performed, as it would be lost.
// Returns true if execution can continue normally and false if the queue is empty or has now been drained.
static inline bool __handle_waituntil_OR( dlist( select_node ) & queue ) {
	if ( isEmpty( queue ) ) return false;

	// head is not a special OR node (no clause status, or it has a park counter): nothing to race for
	if ( ! first( queue ).clause_status || first( queue ).park_counter ) return true;

	while ( ! isEmpty( queue ) ) {
		select_node & head = first( queue );
		// stop at the first node that is not a special OR case ...
		if ( ! head.clause_status || head.park_counter ) return true;
		// ... or whose special OR race we win
		if ( __make_select_node_available( head ) ) return true;
		// otherwise we lost the special OR race so discard node
		remove_first( queue );
	}
	return false;
}
184
// wake one thread from the list
// popped is the node already removed from the queue; the queue argument itself is unused.
static inline void wake_one( dlist( select_node ) & /*queue*/, select_node & popped ) {
	bool do_unpark =
		! popped.clause_status						// normal case, node is not a select node
		|| ! popped.park_counter					// special case OR: unpark but don't call __make_select_node_available
		|| __make_select_node_available( popped );	// node belongs to a selecting thread: unpark only if it says so
	if ( do_unpark ) unpark( popped.blocked_thread );
}
[beeff61e]192
// Removes the first node from queue and wakes it via the two-argument wake_one.
static inline void wake_one( dlist( select_node ) & queue ) {
	wake_one( queue, remove_first( queue ) );
}
[beeff61e]194
// Attaches a node to a waituntil clause: records the clause's status word and park counter,
// and makes the currently running thread the node's blocked thread. The extra field is left untouched.
static inline void setup_clause( select_node & this, unsigned long int * clause_status, int * park_counter ) {
	this.park_counter = park_counter;
	this.clause_status = clause_status;
	this.blocked_thread = active_thread();
}
200
// waituntil ( timeout( ... ) ) support
// Pairs an alarm node with a pointer to a select node.
struct select_timeout_node {
	alarm_node_t a_node;		// alarm node; NOTE(review): presumably armed with the timeout duration -- confirm in the implementation
	select_node * s_node;		// NOTE(review): presumably the select node registered on this timeout -- confirm in the implementation
};
// Constructor / destructor for a timeout node; only declared here, the definitions are not in this header.
void ?{}( select_timeout_node & this, Duration duration, Alarm_Callback callback );
void ^?{}( select_timeout_node & this );
// Callback that the _timeout macro passes to the select_timeout_node constructor.
void timeout_handler_select_cast( alarm_node_t & node );

// Selectable trait routines
// (same three routines, with the same signatures, that the is_selectable trait requires)
bool register_select( select_timeout_node & this, select_node & node );
bool unregister_select( select_timeout_node & this, select_node & node );
bool on_selected( select_timeout_node & this, select_node & node );
// Hand-written equivalent of the declaration produced by __CFA_SELECT_GET_TYPE( select_timeout_node )
select_timeout_node __CFA_select_get_type( select_timeout_node this );
[e23b3ce]215
// Timer macro for waituntil
// Builds a select_timeout_node from duration D, with timeout_handler_select_cast as the alarm callback.
#define _timeout(D) (select_timeout_node) { D, timeout_handler_select_cast }
Note: See TracBrowser for help on using the repository browser.