Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/concurrency/select.hfa

    r5e4a830 re23b3ce  
     1//
     2// Cforall Version 1.0.0 Copyright (C) 2021 University of Waterloo
     3//
     4// The contents of this file are covered under the licence agreement in the
     5// file "LICENCE" distributed with Cforall.
     6//
     7// channel.hfa -- LIBCFATHREAD
     8// Runtime locks that used with the runtime thread system.
     9//
     10// Author           : Colby Alexander Parsons
     11// Created On       : Thu Jan 21 19:46:50 2023
     12// Last Modified By :
     13// Last Modified On :
     14// Update Count     :
     15//
     16
    117#pragma once
    218
    319#include "containers/list.hfa"
    4 #include <stdint.h>
    5 #include <kernel.hfa>
    6 #include <locks.hfa>
     20#include "alarm.hfa"
     21#include "kernel.hfa"
     22#include "time.hfa"
    723
     24struct select_node;
     25
     26// node status
     27static const unsigned long int __SELECT_UNSAT = 0;
     28static const unsigned long int __SELECT_PENDING = 1; // used only by special OR case
     29static const unsigned long int __SELECT_SAT = 2;
     30static const unsigned long int __SELECT_RUN = 3;
     31
     32// these are used inside the compiler to aid in code generation
     33static inline bool __CFA_has_clause_run( unsigned long int status ) { return status == __SELECT_RUN; }
     34static inline void __CFA_maybe_park( int * park_counter ) {
     35    if ( __atomic_sub_fetch( park_counter, 1, __ATOMIC_SEQ_CST) < 0 )
     36        park();
     37}
     38
     39// node used for coordinating waituntil synchronization
    840struct select_node {
     41    int * park_counter;                 // If this is 0p then the node is in a special OR case waituntil
     42    unsigned long int * clause_status;  // needs to point at ptr sized location, if this is 0p then node is not part of a waituntil
     43
     44    void * extra;                       // used to store arbitrary data needed by some primitives
     45
    946    thread$ * blocked_thread;
    10     void ** race_flag;
    1147    inline dlink(select_node);
    1248};
    1349P9_EMBEDDED( select_node, dlink(select_node) )
    1450
    15 void ?{}( select_node & this ) {
    16     this.blocked_thread = 0p;
    17     this.race_flag = 0p;
     51static inline void ?{}( select_node & this ) {
     52    this.blocked_thread = active_thread();
     53    this.clause_status = 0p;
     54    this.park_counter = 0p;
     55    this.extra = 0p;
    1856}
    1957
    20 void ?{}( select_node & this, thread$ * blocked_thread ) {
     58static inline void ?{}( select_node & this, thread$ * blocked_thread ) {
    2159    this.blocked_thread = blocked_thread;
    22     this.race_flag = 0p;
     60    this.clause_status = 0p;
     61    this.park_counter = 0p;
     62    this.extra = 0p;
    2363}
    2464
    25 void ?{}( select_node & this, thread$ * blocked_thread, void ** race_flag ) {
     65static inline void ?{}( select_node & this, thread$ * blocked_thread, void * extra ) {
    2666    this.blocked_thread = blocked_thread;
    27     this.race_flag = race_flag;
     67    this.clause_status = 0p;
     68    this.park_counter = 0p;
     69    this.extra = extra;
    2870}
     71static inline void ^?{}( select_node & this ) {}
    2972
    30 void ^?{}( select_node & this ) {}
     73// this is used inside the compiler to aid in code generation
     74static inline unsigned long int * __get_clause_status( select_node & s ) { return s.clause_status; }
    3175
     76// this is used inside the compiler to attempt to establish an else clause as a winner in the OR special case race
     77static inline bool __select_node_else_race( select_node & this ) with( this ) {
     78    unsigned long int cmp_status = __SELECT_UNSAT;
     79    return *clause_status == 0
     80            && __atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST );
     81}
    3282
    3383//-----------------------------------------------------------------------------
    3484// is_selectable
    35 trait is_selectable(T & | sized(T)) {
    36     // For registering a select on a selectable concurrency primitive
    37     // return 0p if primitive not accessible yet
    38     // return 1p if primitive gets acquired
    39     // return 2p if primitive is accessible but some other primitive won the race
    40     // C_TODO: add enum for return values
    41     void * register_select( T &, select_node & );
     85forall(T & | sized(T))
     86trait is_selectable {
     87    // For registering a select stmt on a selectable concurrency primitive
     88    // Returns bool that indicates if operation is already SAT
     89    bool register_select( T &, select_node & );
    4290
    43     void unregister_select( T &, select_node &  );
     91    // For unregistering a select stmt on a selectable concurrency primitive
     92    // If true is returned then the corresponding code block is run (only in non-special OR case and only if node status is not RUN)
     93    bool unregister_select( T &, select_node &  );
     94
     95    // This routine is run on the selecting thread prior to executing the statement corresponding to the select_node
     96    //    passed as an arg to this routine
     97    // If on_selected returns false, the statement is not run, if it returns true it is run.
     98    bool on_selected( T &, select_node & );
    4499};
    45100
    46 static inline bool install_select_winner( select_node & this, void * primitive_ptr ) with(this) {
    47     // temporary needed for atomic instruction
    48     void * cmp_flag = 0p;
    49    
    50     // if we dont win the selector race we need to potentially
    51     //   ignore this node and move to the next one so we return accordingly
    52     if ( *race_flag != 0p ||
    53         !__atomic_compare_exchange_n(
    54             race_flag,
    55             &cmp_flag,
    56             primitive_ptr,
    57             false,
    58             __ATOMIC_SEQ_CST,
    59             __ATOMIC_SEQ_CST
    60         )
    61     ) return false; // lost race and some other node triggered select
    62     return true; // won race so this node is what the select proceeds with
     101//=============================================================================================
     102// Waituntil Helpers
     103//=============================================================================================
     104
     105// used for the 2-stage avail needed by the special OR case
     106static inline bool __mark_select_node( select_node & this, unsigned long int val ) with( this ) {
     107    /* paranoid */ verify( park_counter == 0p );
     108    /* paranoid */ verify( clause_status != 0p );
     109
     110    unsigned long int cmp_status = __SELECT_UNSAT;
     111    while( !__atomic_compare_exchange_n( clause_status, &cmp_status, val, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
     112        if ( cmp_status != __SELECT_PENDING ) return false;
     113        cmp_status = __SELECT_UNSAT;
     114    }
     115    return true;
    63116}
     117
     118static inline void __make_select_node_unsat( select_node & this ) with( this ) {
     119    __atomic_store_n( clause_status, __SELECT_UNSAT, __ATOMIC_SEQ_CST );
     120}
     121static inline void __make_select_node_sat( select_node & this ) with( this ) {
     122    __atomic_store_n( clause_status, __SELECT_SAT, __ATOMIC_SEQ_CST );
     123}
     124
     125static inline bool __make_select_node_pending( select_node & this ) with( this ) {
     126    return __mark_select_node( this, __SELECT_PENDING );
     127}
     128
     129// when a primitive becomes available it calls the following routine on it's node to update the select state:
     130// return true if we want to unpark the thd
     131static inline bool __make_select_node_available( select_node & this ) with( this ) {
     132    /* paranoid */ verify( clause_status != 0p );
     133    if( !park_counter )
     134        return __mark_select_node( this, (unsigned long int)&this );
     135
     136    unsigned long int cmp_status = __SELECT_UNSAT;
     137
     138    return *clause_status == 0 // C_TODO might not need a cmp_xchg in non special OR case
     139        && __atomic_compare_exchange_n( clause_status, &cmp_status, __SELECT_SAT, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) // can maybe just use atomic write
     140        && !__atomic_add_fetch( park_counter, 1, __ATOMIC_SEQ_CST);
     141}
     142
     143// Handles the special OR case of the waituntil statement
     144// Since only one select node can win in the OR case, we need to race to set the node available BEFORE
     145//    performing the operation since if we lose the race the operation should not be performed as it will be lost
     146// Returns true if execution can continue normally and false if the queue has now been drained
     147static inline bool __handle_waituntil_OR( dlist( select_node ) & queue ) {
     148    if ( queue`isEmpty ) return false;
     149    if ( queue`first.clause_status && !queue`first.park_counter ) {
     150        while ( !queue`isEmpty ) {
     151            // if node not a special OR case or if we win the special OR case race break
     152            if ( !queue`first.clause_status || queue`first.park_counter || __make_select_node_available( queue`first ) )
     153                return true;
     154            // otherwise we lost the special OR race so discard node
     155            try_pop_front( queue );
     156        }
     157        return false;
     158    }
     159    return true;
     160}
     161
     162// wake one thread from the list
     163static inline void wake_one( dlist( select_node ) & queue, select_node & popped ) {
     164    if ( !popped.clause_status                              // normal case, node is not a select node
     165        || ( popped.clause_status && !popped.park_counter ) // If popped link is special case OR selecting unpark but don't call __make_select_node_available
     166        || __make_select_node_available( popped ) )         // check if popped link belongs to a selecting thread
     167        unpark( popped.blocked_thread );
     168}
     169
     170static inline void wake_one( dlist( select_node ) & queue ) { wake_one( queue, try_pop_front( queue ) ); }
     171
     172static inline void setup_clause( select_node & this, unsigned long int * clause_status, int * park_counter ) {
     173    this.blocked_thread = active_thread();
     174    this.clause_status = clause_status;
     175    this.park_counter = park_counter;
     176}
     177
     178// waituntil ( timeout( ... ) ) support
     179struct select_timeout_node {
     180    alarm_node_t a_node;
     181    select_node * s_node;
     182};
     183void ?{}( select_timeout_node & this, Duration duration, Alarm_Callback callback );
     184void ^?{}( select_timeout_node & this );
     185void timeout_handler_select_cast( alarm_node_t & node );
     186
     187// Selectable trait routines
     188bool register_select( select_timeout_node & this, select_node & node );
     189bool unregister_select( select_timeout_node & this, select_node & node );
     190bool on_selected( select_timeout_node & this, select_node & node );
     191
     192// Gateway routines to waituntil on duration
     193select_timeout_node timeout( Duration duration );
     194select_timeout_node sleep( Duration duration );
Note: See TracChangeset for help on using the changeset viewer.