Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/executor.cfa

    ra51c0c0 r8d462e5  
    44// buffer.
    55
     6#include <containers/list.hfa>
    67#include <thread.hfa>
    7 #include <containers/list.hfa>
     8#include <stdio.h>
    89
    9 forall( dtype T | $dlistable(T, T) ) {
    10         monitor Buffer {                                                                        // unbounded buffer
    11                 dlist( T, T ) queue;                                                    // unbounded list of work requests
    12                 condition delay;
    13         }; // Buffer
    14 
    15         void insert( Buffer(T) & mutex buf, T * elem ) with(buf) {
    16                 dlist( T, T ) * qptr = &queue;                                  // workaround https://cforall.uwaterloo.ca/trac/ticket/166
    17                 insert_last( *qptr, *elem );                                    // insert element into buffer
    18                 signal( delay );                                                                // restart
    19         } // insert
    20 
    21         T * remove( Buffer(T) & mutex buf ) with(buf) {
    22                 dlist( T, T ) * qptr = &queue;                                  // workaround https://cforall.uwaterloo.ca/trac/ticket/166
    23                 // if ( (*qptr)`is_empty ) wait( delay );                       // no request to process ? => wait
    24           if ( (*qptr)`is_empty ) return 0p;                            // no request to process ? => wait
    25                 return &pop_first( *qptr );
    26         } // remove
    27 } // forall
    28 
    29 struct WRequest {                                                                               // client request, no return
    30         void (* action)( void );
    31         DLISTED_MGD_IMPL_IN(WRequest)
     10struct WRequest {                                       // client request, no return
     11    void (* action)( void );
     12    DLISTED_MGD_IMPL_IN(WRequest)
    3213}; // WRequest
    3314DLISTED_MGD_IMPL_OUT(WRequest)
     
    3819void doit( WRequest & req ) { req.action(); }
    3920
    40 // Each worker has its own set (when requests buffers > workers) of work buffers to reduce contention between client
    41 // and server, where work requests arrive and are distributed into buffers in a roughly round-robin order.
     21monitor WRBuffer {                                      // unbounded buffer
     22    dlist( WRequest, WRequest ) queue;                  // unbounded list of work requests
     23    condition delay;
     24}; // WRBuffer
     25
     26void insert( WRBuffer & mutex buf, WRequest * elem ) with(buf) {
     27    insert_last( queue, *elem );                        // insert element into buffer
     28    signal( delay );                                    // restart
     29} // insert
     30
     31WRequest * remove( WRBuffer & mutex buf ) with(buf) {
     32    if ( queue`is_empty ) wait( delay );                // no request to process ? => wait
     33    return & pop_first( queue );
     34} // remove
     35
     36// Each worker has its own work buffer to reduce contention between client and server. Hence, work requests arrive and
     37// are distributed into buffers in a roughly round-robin order.
    4238
    4339thread Worker {
    44         Buffer(WRequest) * requests;
    45         WRequest * request;
    46         unsigned int start, range;
     40    WRBuffer * requests;
     41    unsigned int start, range;
    4742}; // Worker
    4843
    4944void main( Worker & w ) with(w) {
    50         for ( int i = 0;; i = (i + 1) % range ) {
    51                 request = remove( requests[i + start] );
    52           if ( ! request ) { yield(); continue; }
    53           if ( stop( *request ) ) break;
    54                 doit( *request );
    55                 delete( request );
    56         } // for
     45    for ( int i = 0;; i = (i + 1) % range ) {
     46        WRequest * request = remove( requests[i + start] );
     47      if ( ! request ) { yield(); continue; }
     48      if ( stop( *request ) ) break;
     49        doit( *request );
     50        delete( request );
     51    } // for
    5752} // Worker::main
    5853
    59 void ?{}( Worker & worker, cluster * wc, Buffer(WRequest) * requests, unsigned int start, unsigned int range ) {
    60         ((thread &)worker){ *wc };
    61         worker.[requests, request, start, range] = [requests, 0p, start, range];
     54void ?{}( Worker & worker, cluster * wc, WRBuffer * requests, unsigned int start, unsigned int range ) {
     55    (*get_thread(worker)){ *wc };                       // create on given cluster
     56    worker.[requests, start, range] = [requests, start, range];
    6257} // ?{}
    6358
    64 WRequest * current_request( Worker & worker ) { return worker.request; }
    65 
    6659struct Executor {
    67         cluster * cluster;                                                                      // if workers execute on separate cluster
    68         processor ** processors;                                                        // array of virtual processors adding parallelism for workers
    69         Buffer(WRequest) * requests;                                            // list of work requests
    70         Worker ** workers;                                                                      // array of workers executing work requests
    71         unsigned int nprocessors, nworkers, nrqueues;           // number of processors/threads/request queues
    72         bool sepClus;                                                                           // use same or separate cluster for executor
    73         unsigned int next;                                                                      // demultiplexed across worker buffers
     60    cluster * cluster;                                  // if workers execute on separate cluster
     61    processor ** processors;                            // array of virtual processors adding parallelism for workers
     62    WRBuffer * requests;                                // list of work requests
     63    Worker ** workers;                                  // array of workers executing work requests
     64    unsigned int nprocessors, nworkers, nmailboxes;     // number of mailboxes/workers/processor tasks
     65    bool sepClus;                                       // use same or separate cluster for executor
    7466}; // Executor
    7567
     68static thread_local unsigned int next;                  // demultiplexed across worker buffers
    7669unsigned int tickets( Executor & ex ) with(ex) {
    77         //return uFetchAdd( next, 1 ) % nrqueues;
    78         return next++ % nrqueues;                                                       // no locking, interference randomizes
     70    //return uFetchAdd( next, 1 ) % nmailboxes;
     71    return next++ % nmailboxes;                         // no locking, interference randomizes
    7972} // tickets
    8073
    81 void ?{}( Executor & ex, unsigned int np, unsigned int nw, unsigned int nr, bool sc = false ) with(ex) {
    82         [nprocessors, nworkers, nrqueues, sepClus] = [np, nw, nr, sc];
    83         assert( nrqueues >= nworkers );
    84         cluster = sepClus ? new( "Executor" ) : active_cluster();
    85         processors = aalloc( nprocessors );
    86         requests = anew( nrqueues );
    87         workers = aalloc( nworkers );
     74void ?{}( Executor & ex, unsigned int np, unsigned int nw, unsigned int nm, bool sc = false ) with(ex) {
     75    [nprocessors, nworkers, nmailboxes, sepClus] = [np, nw, nm, sc];
     76    assert( nmailboxes >= nworkers );
     77    cluster = sepClus ? new( "Executor" ) : active_cluster();
     78    processors = (processor **)anew( nprocessors );
     79    requests = (WRBuffer *)anew( nmailboxes );
     80    workers = (Worker **)anew( nworkers );
    8881
    89         for ( i; nprocessors ) {
    90                 processors[i] = new( *cluster );
    91         } // for
     82    for ( i; nprocessors ) {
     83        processors[ i ] = new( *cluster );
     84    } // for
    9285
    93         unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
    94 //      for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
    95     for ( i; nworkers : start; 0u ~ @ ~ range : range; ) {
    96             range = reqPerWorker + ( i < extras ? 1 : 0 );
    97                 workers[i] = new( cluster, requests, start, range );
    98         } // for
     86    unsigned int reqPerWorker = nmailboxes / nworkers, extras = nmailboxes % nworkers;
     87    for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker + ( i < extras ? 1 : 0 ) ) {
     88        workers[ i ] = new( cluster, requests, step, reqPerWorker + ( i < extras ? 1 : 0 ) );
     89    } // for
    9990} // ?{}
    10091
    10192void ?{}( Executor & ex, unsigned int nprocessors, unsigned int nworkers, bool sepClus = false ) {
    102         ex{ nprocessors, nworkers, nworkers, sepClus };
     93    ex{ nprocessors, nworkers, nworkers, sepClus };
    10394}
    10495void ?{}( Executor & ex, unsigned int nprocessors, bool sepClus = false ) {
    105         ex{ nprocessors, nprocessors, nprocessors, sepClus };
     96    ex{ nprocessors, nprocessors, nprocessors, sepClus };
    10697}
    107 void ?{}( Executor & ex ) {                                                             // special for current cluster, no processors added
    108         ex{ 0, active_cluster()->nprocessors, false };
     98void ?{}( Executor & ex ) {                             // special for current cluster
     99    ex{ 0, active_cluster()->nprocessors, false };
    109100}
    110101void ^?{}( Executor & ex ) with(ex) {
    111         // Add one sentinel per worker to stop them. Since in destructor, no new external work should be queued.  Cannot
    112         // combine next two loops and only have a single sentinel because workers arrive in arbitrary order, so worker1 may
    113         // take the single sentinel while waiting for worker 0 to end.
     102    // Add one sentinel per worker to stop them. Since in destructor, no new work should be queued.  Cannot combine next
     103    // two loops and only have a single sentinel because workers arrive in arbitrary order, so worker1 may take the
     104    // single sentinel while waiting for worker 0 to end.
    114105
    115         WRequest sentinel[nworkers];
    116         unsigned int reqPerWorker = nrqueues / nworkers;
    117         for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker ) {
    118                 insert( requests[step], &sentinel[i] );                 // force eventually termination
    119         } // for
    120         for ( i; nworkers ) {
    121                 delete( workers[i] );
    122         } // for
    123         for ( i; nprocessors ) {
    124                 delete( processors[i] );
    125         } // for
     106    WRequest sentinel[nworkers];
     107    unsigned int reqPerWorker = nmailboxes / nworkers;
     108    for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker ) {
     109        insert( requests[step], &sentinel[i] );         // force eventually termination
     110    } // for
     111    for ( i; nworkers ) {
     112        delete( workers[ i ] );
     113    } // for
     114    for ( i; nprocessors ) {
     115        delete( processors[ i ] );
     116    } // for
    126117
    127         free( workers );
    128 //      adelete( nrqueues, requests );
    129         for ( i; nrqueues ) ^?{}( requests[i] );                        // FIX ME: problem with resolver
    130         free( requests );
    131         free( processors );
    132         if ( sepClus ) { delete( cluster ); }
     118    delete( workers );
     119    delete( requests );
     120    delete( processors );
     121    if ( sepClus ) { delete( cluster ); }
    133122} // ^?{}
    134123
    135124void send( Executor & ex, void (* action)( void ) ) {   // asynchronous call, no return value
    136         WRequest * node = new( action );
    137         insert( ex.requests[tickets( ex )], node );
     125    WRequest * node = new( action );
     126    insert( ex.requests[tickets( ex )], node );
    138127} // send
    139 
    140128
    141129int counter = 0;
    142130
    143 void work( void ) {
    144         __atomic_add_fetch( &counter, 1, __ATOMIC_SEQ_CST );
    145         // fprintf( stderr, "workie\n" );
     131void workie( void ) {
     132    __atomic_add_fetch( &counter, 1, __ATOMIC_SEQ_CST );
     133//    fprintf( stderr, "workie\n" );
    146134}
    147135
    148 int main( int argc, char * argv[] ) {
    149         int times = 1_000_000;
    150         if ( argc == 2 ) times = atoi( argv[1] );
    151         processor p[7];
    152         {
    153                 Executor exector;
    154                 for ( i; times ) {
    155                         send( exector, work );
    156                         if ( i % 100 == 0 ) yield();
    157                 } // for
    158         }
    159         printf( "%d\n", counter );
     136int main() {
     137    {
     138        Executor exector;
     139        for ( i; 3000 ) {
     140            send( exector, workie );
     141            if ( i % 100 == 0 ) {
     142//              fprintf( stderr, "%d\n", i );
     143                yield();
     144            }
     145        } // for
     146    }
     147    printf( "%d\n", counter );
    160148}
    161149
    162150// Local Variables: //
    163 // tab-width: 4" //
    164151// compile-command: "cfa executor.cfa" //
    165152// End: //
Note: See TracChangeset for help on using the changeset viewer.