Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • libcfa/src/executor.cfa

    r8d462e5 rcca568e  
    44// buffer.
    55
    6 #include <containers/list.hfa>
     6#include <bits/containers.hfa>
    77#include <thread.hfa>
    88#include <stdio.h>
    99
     10forall( dtype T )
     11monitor Buffer {                                        // unbounded buffer
     12    __queue_t( T ) queue;                               // unbounded list of work requests
     13    condition delay;
     14}; // Buffer
     15forall( dtype T | is_node(T) ) {
     16    void insert( Buffer( T ) & mutex buf, T * elem ) with(buf) {
     17        append( queue, elem );                          // insert element into buffer
     18        signal( delay );                                // restart
     19    } // insert
     20
     21    T * remove( Buffer( T ) & mutex buf ) with(buf) {
     22        if ( queue.head != 0 ) wait( delay );                   // no request to process ? => wait
     23//      return pop_head( queue );
     24    } // remove
     25} // distribution
     26
    1027struct WRequest {                                       // client request, no return
    1128    void (* action)( void );
    12     DLISTED_MGD_IMPL_IN(WRequest)
     29    WRequest * next;                                    // intrusive queue field
    1330}; // WRequest
    14 DLISTED_MGD_IMPL_OUT(WRequest)
    1531
    16 void ?{}( WRequest & req ) with(req) { action = 0; }
    17 void ?{}( WRequest & req, void (* action)( void ) ) with(req) { req.action = action; }
     32WRequest *& get_next( WRequest & this ) { return this.next; }
     33void ?{}( WRequest & req ) with(req) { action = 0; next = 0; }
     34void ?{}( WRequest & req, void (* action)( void ) ) with(req) { req.action = action; next = 0; }
    1835bool stop( WRequest & req ) { return req.action == 0; }
    1936void doit( WRequest & req ) { req.action(); }
    20 
    21 monitor WRBuffer {                                      // unbounded buffer
    22     dlist( WRequest, WRequest ) queue;                  // unbounded list of work requests
    23     condition delay;
    24 }; // WRBuffer
    25 
    26 void insert( WRBuffer & mutex buf, WRequest * elem ) with(buf) {
    27     insert_last( queue, *elem );                        // insert element into buffer
    28     signal( delay );                                    // restart
    29 } // insert
    30 
    31 WRequest * remove( WRBuffer & mutex buf ) with(buf) {
    32     if ( queue`is_empty ) wait( delay );                // no request to process ? => wait
    33     return & pop_first( queue );
    34 } // remove
    3537
    3638// Each worker has its own work buffer to reduce contention between client and server. Hence, work requests arrive and
     
    3840
    3941thread Worker {
    40     WRBuffer * requests;
     42    Buffer( WRequest ) * requests;
    4143    unsigned int start, range;
    4244}; // Worker
     
    5254} // Worker::main
    5355
    54 void ?{}( Worker & worker, cluster * wc, WRBuffer * requests, unsigned int start, unsigned int range ) {
     56void ?{}( Worker & worker, cluster * wc, Buffer( WRequest ) * requests, unsigned int start, unsigned int range ) {
    5557    (*get_thread(worker)){ *wc };                       // create on given cluster
    5658    worker.[requests, start, range] = [requests, start, range];
     
    6062    cluster * cluster;                                  // if workers execute on separate cluster
    6163    processor ** processors;                            // array of virtual processors adding parallelism for workers
    62     WRBuffer * requests;                                // list of work requests
     64    Buffer( WRequest ) * requests;                      // list of work requests
    6365    Worker ** workers;                                  // array of workers executing work requests
    6466    unsigned int nprocessors, nworkers, nmailboxes;     // number of mailboxes/workers/processor tasks
     
    7779    cluster = sepClus ? new( "Executor" ) : active_cluster();
    7880    processors = (processor **)anew( nprocessors );
    79     requests = (WRBuffer *)anew( nmailboxes );
     81    requests = anew( nmailboxes );
    8082    workers = (Worker **)anew( nworkers );
    8183
     
    139141        for ( i; 3000 ) {
    140142            send( exector, workie );
    141             if ( i % 100 == 0 ) {
    142 //              fprintf( stderr, "%d\n", i );
    143                 yield();
    144             }
     143            if ( i % 100 ) yield();
    145144        } // for
    146145    }
Note: See TracChangeset for help on using the changeset viewer.