source: libcfa/src/concurrency/select.hfa@ 8607a72

ADT ast-experimental
Last change on this file since 8607a72 was beeff61e, checked in by caparsons <caparson@…>, 2 years ago

some cleanup and a bunch of changes to support waituntil statement

  • Property mode set to 100644
File size: 5.6 KB
Line 
1#pragma once
2
3#include "containers/list.hfa"
4#include "stdint.h"
5#include "kernel.hfa"
6
7struct select_node;
8
9// node status
10static const unsigned long int __SELECT_UNSAT = 0;
11static const unsigned long int __SELECT_SAT = 1;
12static const unsigned long int __SELECT_RUN = 2;
13
14static inline bool __CFA_has_clause_run( unsigned long int status ) { return status == __SELECT_RUN; }
15static inline void __CFA_maybe_park( int * park_counter ) {
16 if ( __atomic_sub_fetch( park_counter, 1, __ATOMIC_SEQ_CST) < 0 )
17 park();
18}
19
20// node used for coordinating waituntil synchronization
21struct select_node {
22 int * park_counter; // If this is 0p then the node is in a special OR case waituntil
23 unsigned long int * clause_status; // needs to point at ptr sized location, if this is 0p then node is not part of a waituntil
24
25 void * extra; // used to store arbitrary data needed by some primitives
26
27 thread$ * blocked_thread;
28 inline dlink(select_node);
29};
30P9_EMBEDDED( select_node, dlink(select_node) )
31
32static inline void ?{}( select_node & this ) {
33 this.blocked_thread = active_thread();
34 this.clause_status = 0p;
35 this.park_counter = 0p;
36 this.extra = 0p;
37}
38
39static inline void ?{}( select_node & this, thread$ * blocked_thread ) {
40 this.blocked_thread = blocked_thread;
41 this.clause_status = 0p;
42 this.park_counter = 0p;
43 this.extra = 0p;
44}
45
46static inline void ?{}( select_node & this, thread$ * blocked_thread, void * extra ) {
47 this.blocked_thread = blocked_thread;
48 this.clause_status = 0p;
49 this.park_counter = 0p;
50 this.extra = extra;
51}
52
53static inline void ^?{}( select_node & this ) {}
54
55static inline unsigned long int * __get_clause_status( select_node & s ) { return s.clause_status; }
56
57//-----------------------------------------------------------------------------
58// is_selectable
59forall(T & | sized(T))
60trait is_selectable {
61 // For registering a select stmt on a selectable concurrency primitive
62 // Returns bool that indicates if operation is already SAT
63 bool register_select( T &, select_node & );
64
65 // For unregistering a select stmt on a selectable concurrency primitive
66 // If true is returned then the corresponding code block is run (only in non-special OR case and only if node status is not RUN)
67 bool unregister_select( T &, select_node & );
68
69 // This routine is run on the selecting thread prior to executing the statement corresponding to the select_node
70 // passed as an arg to this routine
71 // If on_selected returns false, the statement is not run, if it returns true it is run.
72 bool on_selected( T &, select_node & );
73};
74
75// this is used inside the compiler to attempt to establish an else clause as a winner in the OR special case race
76static inline bool __select_node_else_race( select_node & this ) with( this ) {
77 unsigned long int cmp_status = __SELECT_UNSAT;
78 return *clause_status == 0
79 && __atomic_compare_exchange_n( clause_status, &cmp_status, 1, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST );
80}
81
82// when a primitive becomes available it calls the following routine on it's node to update the select state:
83// return true if we want to unpark the thd
84static inline bool __make_select_node_available( select_node & this ) with( this ) {
85 unsigned long int cmp_status = __SELECT_UNSAT;
86
87 if( !park_counter )
88 return *clause_status == 0
89 && __atomic_compare_exchange_n( clause_status, &cmp_status, (unsigned long int)&this, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ); // OR specific case where race was won
90
91 return *clause_status == 0
92 && __atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) // can maybe just use atomic write
93 && !__atomic_add_fetch( park_counter, 1, __ATOMIC_SEQ_CST);
94}
95
96// Handles the special OR case of the waituntil statement
97// Since only one select node can win in the OR case, we need to race to set the node available BEFORE
98// performing the operation since if we lose the race the operation should not be performed as it will be lost
99// Returns true if execution can continue normally and false if the queue has now been drained
100static inline bool __handle_waituntil_OR( dlist( select_node ) & queue ) {
101 if ( queue`isEmpty ) return false;
102 if ( queue`first.clause_status && !queue`first.park_counter ) {
103 while ( !queue`isEmpty ) {
104 // if node not a special OR case or if we win the special OR case race break
105 if ( !queue`first.clause_status || queue`first.park_counter || __make_select_node_available( queue`first ) ) { return true; }
106 // otherwise we lost the special OR race so discard node
107 try_pop_front( queue );
108 }
109 return false;
110 }
111 return true;
112}
113
114// wake one thread from the list
115static inline void wake_one( dlist( select_node ) & queue, select_node & popped ) {
116 if ( !popped.clause_status // normal case, node is not a select node
117 || ( popped.clause_status && !popped.park_counter ) // If popped link is special case OR selecting unpark but don't call __make_select_node_available
118 || __make_select_node_available( popped ) ) // check if popped link belongs to a selecting thread
119 unpark( popped.blocked_thread );
120}
121
122static inline void wake_one( dlist( select_node ) & queue ) { wake_one( queue, try_pop_front( queue ) ); }
123
124static inline void setup_clause( select_node & this, unsigned long int * clause_status, int * park_counter ) {
125 this.blocked_thread = active_thread();
126 this.clause_status = clause_status;
127 this.park_counter = park_counter;
128}
129
Note: See TracBrowser for help on using the repository browser.