source: libcfa/src/concurrency/select.hfa @ 73bf7ddc

ADTast-experimental
Last change on this file since 73bf7ddc was beeff61e, checked in by caparsons <caparson@…>, 14 months ago

some cleanup and a bunch of changes to support waituntil statement

  • Property mode set to 100644
File size: 5.6 KB
Line 
1#pragma once
2
3#include "containers/list.hfa"
4#include "stdint.h"
5#include "kernel.hfa"
6
7struct select_node;
8
// Values stored in the word a select_node's clause_status points at.
// __SELECT_UNSAT : the value the compare-and-swap routines in this file expect before claiming a clause
// __SELECT_SAT   : written by __make_select_node_available when a (non special OR) clause becomes available
// __SELECT_RUN   : tested by __CFA_has_clause_run; the code that stores it is not in this header
static const unsigned long int __SELECT_UNSAT = 0;
static const unsigned long int __SELECT_SAT = 1;
static const unsigned long int __SELECT_RUN = 2;
13
// Returns true exactly when the given clause status word equals __SELECT_RUN.
static inline bool __CFA_has_clause_run( unsigned long int status ) {
    return __SELECT_RUN == status;
}
// Atomically decrements *park_counter and parks the calling thread only if the
// decremented value is negative.
static inline void __CFA_maybe_park( int * park_counter ) {
    int remaining = __atomic_sub_fetch( park_counter, 1, __ATOMIC_SEQ_CST );
    if ( remaining < 0 ) {
        park();
    }
}
19
// Per-clause node used to coordinate waituntil synchronization; it is linked into a
// primitive's dlist( select_node ) while the owning thread waits.
struct select_node {
    // Counter shared with __CFA_maybe_park / __make_select_node_available.
    // 0p means the node belongs to a special OR case waituntil.
    int * park_counter;

    // Status word for this clause; 0p means the node is not part of a waituntil.
    // Must refer to a pointer-sized location: the special OR case stores the node's address in it.
    unsigned long int * clause_status;

    // Arbitrary data needed by some primitives.
    void * extra;

    // Thread to unpark when the node is woken.
    thread$ * blocked_thread;

    // Embedded intrusive list link.
    inline dlink(select_node);
};
P9_EMBEDDED( select_node, dlink(select_node) )
31
// Default constructor: the node belongs to the calling thread and is not attached
// to any waituntil clause (all three pointers start at 0p).
static inline void ?{}( select_node & this ) {
    this.park_counter = 0p;
    this.clause_status = 0p;
    this.extra = 0p;
    this.blocked_thread = active_thread();
}
38
// Constructor for a node owned by an explicitly supplied thread; clause_status,
// park_counter and extra all start at 0p.
static inline void ?{}( select_node & this, thread$ * blocked_thread ) {
    this.park_counter = 0p;
    this.clause_status = 0p;
    this.extra = 0p;
    this.blocked_thread = blocked_thread;
}
45
// Constructor for a node owned by an explicitly supplied thread that also carries
// a primitive-specific payload in extra; clause_status and park_counter start at 0p.
static inline void ?{}( select_node & this, thread$ * blocked_thread, void * extra ) {
    this.park_counter = 0p;
    this.clause_status = 0p;
    this.extra = extra;
    this.blocked_thread = blocked_thread;
}
52
// Destructor: intentionally empty, the node releases nothing.
static inline void ^?{}( select_node & this ) {
}
54
// Accessor returning the node's clause_status pointer (0p when the node is not part of a waituntil).
static inline unsigned long int * __get_clause_status( select_node & s ) {
    return s.clause_status;
}
56
//-----------------------------------------------------------------------------
// is_selectable
// Operations a concurrency primitive must provide to be usable in a select/waituntil statement.
forall(T & | sized(T))
trait is_selectable {
    // Registers a select statement's node on the primitive.
    // The bool result reports whether the operation is already SAT.
    bool register_select( T &, select_node & );

    // Removes a select statement's node from the primitive.
    // A true result means the corresponding code block is run
    // (only in the non-special OR case and only if the node status is not RUN).
    bool unregister_select( T &, select_node & );

    // Called on the selecting thread just before the statement associated with the
    // given select_node would execute.
    // Returning true lets the statement run; returning false skips it.
    bool on_selected( T &, select_node & );
};
74
// Used by compiler-generated code to try to establish an else clause as the winner
// of the special OR case race.
// Returns true only if the status word was still __SELECT_UNSAT and this call is the
// one that atomically moved it to __SELECT_SAT; returns false if another clause got there first.
static inline bool __select_node_else_race( select_node & this ) with( this ) {
    unsigned long int cmp_status = __SELECT_UNSAT;

    // cheap plain read first: skip the compare-and-swap when the race is already lost
    if ( *clause_status != __SELECT_UNSAT ) return false;

    return __atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST );
}
81
// When a primitive becomes available it calls this routine on its node to update the select state.
// Returns true if the caller should unpark the node's thread.
//
// Special OR case (park_counter is 0p): race to swap the status word from __SELECT_UNSAT
// to this node's address; true means this node won the race.
//
// General case: swap the status word from __SELECT_UNSAT to __SELECT_SAT, then atomically
// increment *park_counter; true is returned only when the swap succeeded and the
// incremented counter is zero.
static inline bool __make_select_node_available( select_node & this ) with( this ) {
    unsigned long int cmp_status = __SELECT_UNSAT;

    if ( !park_counter ) {
        // cheap plain read first: skip the compare-and-swap when the race is already lost
        if ( *clause_status != __SELECT_UNSAT ) return false;
        // OR specific case where race was won
        return __atomic_compare_exchange_n( clause_status, &cmp_status, (unsigned long int)&this, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST );
    }

    if ( *clause_status != __SELECT_UNSAT ) return false;

    // can maybe just use atomic write
    if ( !__atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) return false;

    // the counter is only touched after the status swap succeeded
    return !__atomic_add_fetch( park_counter, 1, __ATOMIC_SEQ_CST );
}
95
// Deals with the special OR case of the waituntil statement.
// Only one select node may win in the OR case, so the node must be raced to "available"
//    BEFORE the operation is performed: if the race is lost the operation must not happen,
//    otherwise its effect would be lost.
// Returns true if execution can continue normally and false if the queue is empty or
//    has been drained of losing nodes.
static inline bool __handle_waituntil_OR( dlist( select_node ) & queue ) {
    if ( queue`isEmpty ) return false;

    // head is not a special OR node, so there is nothing to race for
    if ( !queue`first.clause_status || queue`first.park_counter ) return true;

    while ( !queue`isEmpty ) {
        select_node & head = queue`first;

        // stop at the first node that is not a special OR case ...
        if ( !head.clause_status || head.park_counter ) return true;

        // ... or at the first special OR node whose race we win
        if ( __make_select_node_available( head ) ) return true;

        // lost the special OR race for this node, so discard it
        try_pop_front( queue );
    }
    return false;
}
113
// Wakes the thread owning an already popped node, unless the node belongs to a
// regular (non special OR) waituntil clause that could not be made available.
//   - no clause_status: ordinary waiter, always unparked
//   - clause_status but no park_counter: special OR case, unparked without calling
//     __make_select_node_available
//   - both set: unparked only if __make_select_node_available returns true
static inline void wake_one( dlist( select_node ) & queue, select_node & popped ) {
    if ( popped.clause_status && popped.park_counter && !__make_select_node_available( popped ) ) {
        return;
    }
    unpark( popped.blocked_thread );
}
121
// Pops the front node of the queue and hands it to the two-argument wake_one.
static inline void wake_one( dlist( select_node ) & queue ) {
    wake_one( queue, try_pop_front( queue ) );
}
123
// Attaches a node to a waituntil clause: stores the clause's status word and park
// counter in the node and records the calling thread as the one to wake.
static inline void setup_clause( select_node & this, unsigned long int * clause_status, int * park_counter ) {
    this.park_counter = park_counter;
    this.clause_status = clause_status;
    this.blocked_thread = active_thread();
}
129
Note: See TracBrowser for help on using the repository browser.