Ignore:
Timestamp:
May 1, 2023, 4:00:06 PM (15 months ago)
Author:
caparsons <caparson@…>
Branches:
ADT, ast-experimental, master
Children:
73bf7ddc
Parents:
bb7422a
Message:

some cleanup and a bunch of changes to support waituntil statement

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/select.hfa

    rbb7422a rbeeff61e  
    22
    33#include "containers/list.hfa"
    4 #include <stdint.h>
    5 #include <kernel.hfa>
    6 #include <locks.hfa>
     4#include "stdint.h"
     5#include "kernel.hfa"
    76
     7struct select_node;
     8
     9// node status
     10static const unsigned long int __SELECT_UNSAT = 0;
     11static const unsigned long int __SELECT_SAT = 1;
     12static const unsigned long int __SELECT_RUN = 2;
     13
     14static inline bool __CFA_has_clause_run( unsigned long int status ) { return status == __SELECT_RUN; }
     15static inline void __CFA_maybe_park( int * park_counter ) {
     16    if ( __atomic_sub_fetch( park_counter, 1, __ATOMIC_SEQ_CST) < 0 )
     17        park();
     18}
     19
     20// node used for coordinating waituntil synchronization
    821struct select_node {
     22    int * park_counter;                 // If this is 0p then the node is in a special OR case waituntil
     23    unsigned long int * clause_status;  // needs to point at ptr sized location, if this is 0p then node is not part of a waituntil
     24
     25    void * extra;                       // used to store arbitrary data needed by some primitives
     26
    927    thread$ * blocked_thread;
    10     void ** race_flag;
    1128    inline dlink(select_node);
    1229};
    1330P9_EMBEDDED( select_node, dlink(select_node) )
    1431
    15 void ?{}( select_node & this ) {
    16     this.blocked_thread = 0p;
    17     this.race_flag = 0p;
     32static inline void ?{}( select_node & this ) {
     33    this.blocked_thread = active_thread();
     34    this.clause_status = 0p;
     35    this.park_counter = 0p;
     36    this.extra = 0p;
    1837}
    1938
    20 void ?{}( select_node & this, thread$ * blocked_thread ) {
     39static inline void ?{}( select_node & this, thread$ * blocked_thread ) {
    2140    this.blocked_thread = blocked_thread;
    22     this.race_flag = 0p;
     41    this.clause_status = 0p;
     42    this.park_counter = 0p;
     43    this.extra = 0p;
    2344}
    2445
    25 void ?{}( select_node & this, thread$ * blocked_thread, void ** race_flag ) {
     46static inline void ?{}( select_node & this, thread$ * blocked_thread, void * extra ) {
    2647    this.blocked_thread = blocked_thread;
    27     this.race_flag = race_flag;
     48    this.clause_status = 0p;
     49    this.park_counter = 0p;
     50    this.extra = extra;
    2851}
    2952
    30 void ^?{}( select_node & this ) {}
     53static inline void ^?{}( select_node & this ) {}
    3154
     55static inline unsigned long int * __get_clause_status( select_node & s ) { return s.clause_status; }
    3256
    3357//-----------------------------------------------------------------------------
    3458// is_selectable
    35 trait is_selectable(T & | sized(T)) {
    36     // For registering a select on a selectable concurrency primitive
    37     // return 0p if primitive not accessible yet
    38     // return 1p if primitive gets acquired
    39     // return 2p if primitive is accessible but some other primitive won the race
    40     // C_TODO: add enum for return values
    41     void * register_select( T &, select_node & );
     59forall(T & | sized(T))
     60trait is_selectable {
     61    // For registering a select stmt on a selectable concurrency primitive
     62    // Returns bool that indicates if operation is already SAT
     63    bool register_select( T &, select_node & );
    4264
    43     void unregister_select( T &, select_node &  );
     65    // For unregistering a select stmt on a selectable concurrency primitive
     66    // If true is returned then the corresponding code block is run (only in non-special OR case and only if node status is not RUN)
     67    bool unregister_select( T &, select_node &  );
     68
     69    // This routine is run on the selecting thread prior to executing the statement corresponding to the select_node
     70    //    passed as an arg to this routine
     71    // If on_selected returns false, the statement is not run, if it returns true it is run.
     72    bool on_selected( T &, select_node & );
    4473};
    4574
    46 static inline bool install_select_winner( select_node & this, void * primitive_ptr ) with(this) {
    47     // temporary needed for atomic instruction
    48     void * cmp_flag = 0p;
    49    
    50     // if we dont win the selector race we need to potentially
    51     //   ignore this node and move to the next one so we return accordingly
    52     if ( *race_flag != 0p ||
    53         !__atomic_compare_exchange_n(
    54             race_flag,
    55             &cmp_flag,
    56             primitive_ptr,
    57             false,
    58             __ATOMIC_SEQ_CST,
    59             __ATOMIC_SEQ_CST
    60         )
    61     ) return false; // lost race and some other node triggered select
    62     return true; // won race so this node is what the select proceeds with
     75// this is used inside the compiler to attempt to establish an else clause as a winner in the OR special case race
     76static inline bool __select_node_else_race( select_node & this ) with( this ) {
     77    unsigned long int cmp_status = __SELECT_UNSAT;
     78    return *clause_status == 0
     79            && __atomic_compare_exchange_n( clause_status, &cmp_status, 1, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST );
    6380}
     81
     82// when a primitive becomes available it calls the following routine on it's node to update the select state:
     83// return true if we want to unpark the thd
     84static inline bool __make_select_node_available( select_node & this ) with( this ) {
     85    unsigned long int cmp_status = __SELECT_UNSAT;
     86
     87    if( !park_counter )
     88        return *clause_status == 0
     89            && __atomic_compare_exchange_n( clause_status, &cmp_status, (unsigned long int)&this, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ); // OR specific case where race was won
     90
     91    return *clause_status == 0
     92        && __atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) // can maybe just use atomic write
     93        && !__atomic_add_fetch( park_counter, 1, __ATOMIC_SEQ_CST);
     94}
     95
     96// Handles the special OR case of the waituntil statement
     97// Since only one select node can win in the OR case, we need to race to set the node available BEFORE
     98//    performing the operation since if we lose the race the operation should not be performed as it will be lost
     99// Returns true if execution can continue normally and false if the queue has now been drained
     100static inline bool __handle_waituntil_OR( dlist( select_node ) & queue ) {
     101    if ( queue`isEmpty ) return false;
     102    if ( queue`first.clause_status && !queue`first.park_counter ) {
     103        while ( !queue`isEmpty ) {
     104            // if node not a special OR case or if we win the special OR case race break
     105            if ( !queue`first.clause_status || queue`first.park_counter || __make_select_node_available( queue`first ) ) { return true; }
     106            // otherwise we lost the special OR race so discard node
     107            try_pop_front( queue );
     108        }
     109        return false;
     110    }
     111    return true;
     112}
     113
     114// wake one thread from the list
     115static inline void wake_one( dlist( select_node ) & queue, select_node & popped ) {
     116    if ( !popped.clause_status                              // normal case, node is not a select node
     117        || ( popped.clause_status && !popped.park_counter ) // If popped link is special case OR selecting unpark but don't call __make_select_node_available
     118        || __make_select_node_available( popped ) )         // check if popped link belongs to a selecting thread
     119        unpark( popped.blocked_thread );
     120}
     121
     122static inline void wake_one( dlist( select_node ) & queue ) { wake_one( queue, try_pop_front( queue ) ); }
     123
     124static inline void setup_clause( select_node & this, unsigned long int * clause_status, int * park_counter ) {
     125    this.blocked_thread = active_thread();
     126    this.clause_status = clause_status;
     127    this.park_counter = park_counter;
     128}
     129
Note: See TracChangeset for help on using the changeset viewer.