Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/executor.cfa

    ra51c0c0 rcca568e  
    44// buffer.
    55
     6#include <bits/containers.hfa>
    67#include <thread.hfa>
    7 #include <containers/list.hfa>
     8#include <stdio.h>
    89
    9 forall( dtype T | $dlistable(T, T) ) {
    10         monitor Buffer {                                                                        // unbounded buffer
    11                 dlist( T, T ) queue;                                                    // unbounded list of work requests
    12                 condition delay;
    13         }; // Buffer
     10forall( dtype T )
     11monitor Buffer {                                        // unbounded buffer
     12    __queue_t( T ) queue;                               // unbounded list of work requests
     13    condition delay;
     14}; // Buffer
     15forall( dtype T | is_node(T) ) {
     16    void insert( Buffer( T ) & mutex buf, T * elem ) with(buf) {
     17        append( queue, elem );                          // insert element into buffer
     18        signal( delay );                                // restart
     19    } // insert
    1420
    15         void insert( Buffer(T) & mutex buf, T * elem ) with(buf) {
    16                 dlist( T, T ) * qptr = &queue;                                  // workaround https://cforall.uwaterloo.ca/trac/ticket/166
    17                 insert_last( *qptr, *elem );                                    // insert element into buffer
    18                 signal( delay );                                                                // restart
    19         } // insert
     21    T * remove( Buffer( T ) & mutex buf ) with(buf) {
     22        if ( queue.head != 0 ) wait( delay );                   // no request to process ? => wait
     23//      return pop_head( queue );
     24    } // remove
     25} // distribution
    2026
    21         T * remove( Buffer(T) & mutex buf ) with(buf) {
    22                 dlist( T, T ) * qptr = &queue;                                  // workaround https://cforall.uwaterloo.ca/trac/ticket/166
    23                 // if ( (*qptr)`is_empty ) wait( delay );                       // no request to process ? => wait
    24           if ( (*qptr)`is_empty ) return 0p;                            // no request to process ? => wait
    25                 return &pop_first( *qptr );
    26         } // remove
    27 } // forall
     27struct WRequest {                                       // client request, no return
     28    void (* action)( void );
     29    WRequest * next;                                    // intrusive queue field
     30}; // WRequest
    2831
    29 struct WRequest {                                                                               // client request, no return
    30         void (* action)( void );
    31         DLISTED_MGD_IMPL_IN(WRequest)
    32 }; // WRequest
    33 DLISTED_MGD_IMPL_OUT(WRequest)
    34 
    35 void ?{}( WRequest & req ) with(req) { action = 0; }
    36 void ?{}( WRequest & req, void (* action)( void ) ) with(req) { req.action = action; }
     32WRequest *& get_next( WRequest & this ) { return this.next; }
     33void ?{}( WRequest & req ) with(req) { action = 0; next = 0; }
     34void ?{}( WRequest & req, void (* action)( void ) ) with(req) { req.action = action; next = 0; }
    3735bool stop( WRequest & req ) { return req.action == 0; }
    3836void doit( WRequest & req ) { req.action(); }
    3937
    40 // Each worker has its own set (when requests buffers > workers) of work buffers to reduce contention between client
    41 // and server, where work requests arrive and are distributed into buffers in a roughly round-robin order.
     38// Each worker has its own work buffer to reduce contention between client and server. Hence, work requests arrive and
     39// are distributed into buffers in a roughly round-robin order.
    4240
    4341thread Worker {
    44         Buffer(WRequest) * requests;
    45         WRequest * request;
    46         unsigned int start, range;
     42    Buffer( WRequest ) * requests;
     43    unsigned int start, range;
    4744}; // Worker
    4845
    4946void main( Worker & w ) with(w) {
    50         for ( int i = 0;; i = (i + 1) % range ) {
    51                 request = remove( requests[i + start] );
    52           if ( ! request ) { yield(); continue; }
    53           if ( stop( *request ) ) break;
    54                 doit( *request );
    55                 delete( request );
    56         } // for
     47    for ( int i = 0;; i = (i + 1) % range ) {
     48        WRequest * request = remove( requests[i + start] );
     49      if ( ! request ) { yield(); continue; }
     50      if ( stop( *request ) ) break;
     51        doit( *request );
     52        delete( request );
     53    } // for
    5754} // Worker::main
    5855
    59 void ?{}( Worker & worker, cluster * wc, Buffer(WRequest) * requests, unsigned int start, unsigned int range ) {
    60         ((thread &)worker){ *wc };
    61         worker.[requests, request, start, range] = [requests, 0p, start, range];
     56void ?{}( Worker & worker, cluster * wc, Buffer( WRequest ) * requests, unsigned int start, unsigned int range ) {
     57    (*get_thread(worker)){ *wc };                       // create on given cluster
     58    worker.[requests, start, range] = [requests, start, range];
    6259} // ?{}
    6360
    64 WRequest * current_request( Worker & worker ) { return worker.request; }
    65 
    6661struct Executor {
    67         cluster * cluster;                                                                      // if workers execute on separate cluster
    68         processor ** processors;                                                        // array of virtual processors adding parallelism for workers
    69         Buffer(WRequest) * requests;                                            // list of work requests
    70         Worker ** workers;                                                                      // array of workers executing work requests
    71         unsigned int nprocessors, nworkers, nrqueues;           // number of processors/threads/request queues
    72         bool sepClus;                                                                           // use same or separate cluster for executor
    73         unsigned int next;                                                                      // demultiplexed across worker buffers
     62    cluster * cluster;                                  // if workers execute on separate cluster
     63    processor ** processors;                            // array of virtual processors adding parallelism for workers
     64    Buffer( WRequest ) * requests;                      // list of work requests
     65    Worker ** workers;                                  // array of workers executing work requests
     66    unsigned int nprocessors, nworkers, nmailboxes;     // number of mailboxes/workers/processor tasks
     67    bool sepClus;                                       // use same or separate cluster for executor
    7468}; // Executor
    7569
     70static thread_local unsigned int next;                  // demultiplexed across worker buffers
    7671unsigned int tickets( Executor & ex ) with(ex) {
    77         //return uFetchAdd( next, 1 ) % nrqueues;
    78         return next++ % nrqueues;                                                       // no locking, interference randomizes
     72    //return uFetchAdd( next, 1 ) % nmailboxes;
     73    return next++ % nmailboxes;                         // no locking, interference randomizes
    7974} // tickets
    8075
    81 void ?{}( Executor & ex, unsigned int np, unsigned int nw, unsigned int nr, bool sc = false ) with(ex) {
    82         [nprocessors, nworkers, nrqueues, sepClus] = [np, nw, nr, sc];
    83         assert( nrqueues >= nworkers );
    84         cluster = sepClus ? new( "Executor" ) : active_cluster();
    85         processors = aalloc( nprocessors );
    86         requests = anew( nrqueues );
    87         workers = aalloc( nworkers );
     76void ?{}( Executor & ex, unsigned int np, unsigned int nw, unsigned int nm, bool sc = false ) with(ex) {
     77    [nprocessors, nworkers, nmailboxes, sepClus] = [np, nw, nm, sc];
     78    assert( nmailboxes >= nworkers );
     79    cluster = sepClus ? new( "Executor" ) : active_cluster();
     80    processors = (processor **)anew( nprocessors );
     81    requests = anew( nmailboxes );
     82    workers = (Worker **)anew( nworkers );
    8883
    89         for ( i; nprocessors ) {
    90                 processors[i] = new( *cluster );
    91         } // for
     84    for ( i; nprocessors ) {
     85        processors[ i ] = new( *cluster );
     86    } // for
    9287
    93         unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;
    94 //      for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
    95     for ( i; nworkers : start; 0u ~ @ ~ range : range; ) {
    96             range = reqPerWorker + ( i < extras ? 1 : 0 );
    97                 workers[i] = new( cluster, requests, start, range );
    98         } // for
     88    unsigned int reqPerWorker = nmailboxes / nworkers, extras = nmailboxes % nworkers;
     89    for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker + ( i < extras ? 1 : 0 ) ) {
     90        workers[ i ] = new( cluster, requests, step, reqPerWorker + ( i < extras ? 1 : 0 ) );
     91    } // for
    9992} // ?{}
    10093
    10194void ?{}( Executor & ex, unsigned int nprocessors, unsigned int nworkers, bool sepClus = false ) {
    102         ex{ nprocessors, nworkers, nworkers, sepClus };
     95    ex{ nprocessors, nworkers, nworkers, sepClus };
    10396}
    10497void ?{}( Executor & ex, unsigned int nprocessors, bool sepClus = false ) {
    105         ex{ nprocessors, nprocessors, nprocessors, sepClus };
     98    ex{ nprocessors, nprocessors, nprocessors, sepClus };
    10699}
    107 void ?{}( Executor & ex ) {                                                             // special for current cluster, no processors added
    108         ex{ 0, active_cluster()->nprocessors, false };
     100void ?{}( Executor & ex ) {                             // special for current cluster
     101    ex{ 0, active_cluster()->nprocessors, false };
    109102}
    110103void ^?{}( Executor & ex ) with(ex) {
    111         // Add one sentinel per worker to stop them. Since in destructor, no new external work should be queued.  Cannot
    112         // combine next two loops and only have a single sentinel because workers arrive in arbitrary order, so worker1 may
    113         // take the single sentinel while waiting for worker 0 to end.
     104    // Add one sentinel per worker to stop them. Since in destructor, no new work should be queued.  Cannot combine next
     105    // two loops and only have a single sentinel because workers arrive in arbitrary order, so worker1 may take the
     106    // single sentinel while waiting for worker 0 to end.
    114107
    115         WRequest sentinel[nworkers];
    116         unsigned int reqPerWorker = nrqueues / nworkers;
    117         for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker ) {
    118                 insert( requests[step], &sentinel[i] );                 // force eventually termination
    119         } // for
    120         for ( i; nworkers ) {
    121                 delete( workers[i] );
    122         } // for
    123         for ( i; nprocessors ) {
    124                 delete( processors[i] );
    125         } // for
     108    WRequest sentinel[nworkers];
     109    unsigned int reqPerWorker = nmailboxes / nworkers;
     110    for ( unsigned int i = 0, step = 0; i < nworkers; i += 1, step += reqPerWorker ) {
     111        insert( requests[step], &sentinel[i] );         // force eventually termination
     112    } // for
     113    for ( i; nworkers ) {
     114        delete( workers[ i ] );
     115    } // for
     116    for ( i; nprocessors ) {
     117        delete( processors[ i ] );
     118    } // for
    126119
    127         free( workers );
    128 //      adelete( nrqueues, requests );
    129         for ( i; nrqueues ) ^?{}( requests[i] );                        // FIX ME: problem with resolver
    130         free( requests );
    131         free( processors );
    132         if ( sepClus ) { delete( cluster ); }
     120    delete( workers );
     121    delete( requests );
     122    delete( processors );
     123    if ( sepClus ) { delete( cluster ); }
    133124} // ^?{}
    134125
    135126void send( Executor & ex, void (* action)( void ) ) {   // asynchronous call, no return value
    136         WRequest * node = new( action );
    137         insert( ex.requests[tickets( ex )], node );
     127    WRequest * node = new( action );
     128    insert( ex.requests[tickets( ex )], node );
    138129} // send
    139 
    140130
    141131int counter = 0;
    142132
    143 void work( void ) {
    144         __atomic_add_fetch( &counter, 1, __ATOMIC_SEQ_CST );
    145         // fprintf( stderr, "workie\n" );
     133void workie( void ) {
     134    __atomic_add_fetch( &counter, 1, __ATOMIC_SEQ_CST );
     135//    fprintf( stderr, "workie\n" );
    146136}
    147137
    148 int main( int argc, char * argv[] ) {
    149         int times = 1_000_000;
    150         if ( argc == 2 ) times = atoi( argv[1] );
    151         processor p[7];
    152         {
    153                 Executor exector;
    154                 for ( i; times ) {
    155                         send( exector, work );
    156                         if ( i % 100 == 0 ) yield();
    157                 } // for
    158         }
    159         printf( "%d\n", counter );
     138int main() {
     139    {
     140        Executor exector;
     141        for ( i; 3000 ) {
     142            send( exector, workie );
     143            if ( i % 100 ) yield();
     144        } // for
     145    }
     146    printf( "%d\n", counter );
    160147}
    161148
    162149// Local Variables: //
    163 // tab-width: 4" //
    164150// compile-command: "cfa executor.cfa" //
    165151// End: //
Note: See TracChangeset for help on using the changeset viewer.