source: libcfa/src/executor.cfa @ 8d462e5

Branches: arm-eh, jacob/cs343-translation, new-ast, new-ast-unique-expr
Last change on this file since 8d462e5 was 8d462e5, checked in by Michael Brooks <mlbrooks@…>, 18 months ago

Executor starter ported to standard-library dlist. Basic run works.

  • Property mode set to 100644
File size: 5.5 KB
Line 
// The mutex buffer is embedded in the nomutex executor to allow the executor to delete the workers without causing a
// deadlock.  If the executor were a monitor and the buffer a class, the thread calling the executor's destructor
// (which is mutex) would block when deleting the workers, preventing outstanding workers from calling remove to drain
// the buffer.
5
6#include <containers/list.hfa>
7#include <thread.hfa>
8#include <stdio.h>
9
// Client work request: a routine taking no arguments and returning nothing. A request whose action is 0 is treated as
// a stop request (see stop).
struct WRequest {
    void (*action)( void );                             // routine to execute; 0 => stop request
    DLISTED_MGD_IMPL_IN(WRequest)                       // presumably the intrusive dlist link fields (containers/list.hfa) -- TODO confirm
}; // WRequest
DLISTED_MGD_IMPL_OUT(WRequest)                          // companion of the _IN macro; must follow the struct
15
// Default constructor: a request with no action, i.e., a stop request.
void ?{}( WRequest & req ) {
    req.action = 0;
}
// Construct a request that runs the given action when executed.
void ?{}( WRequest & req, void (* action)( void ) ) {
    req.action = action;                                // field is qualified, so the parameter needs no disambiguation
}
// True when the request carries no action, which marks it as a stop request.
bool stop( WRequest & req ) {
    return ! req.action;
}
// Execute the request's action. The action must be non-null, i.e., the request must not be a stop request.
void doit( WRequest & req ) with(req) {
    action();
}
20
// Unbounded mailbox of work requests, protected by monitor mutual exclusion.
monitor WRBuffer {
    dlist( WRequest, WRequest ) queue;                  // pending work requests, oldest first
    condition delay;                                    // a remover waits here while the queue is empty
}; // WRBuffer
25
// Append a request to the end of the buffer and signal the delay condition so a waiting remover can proceed.
void insert( WRBuffer & mutex buf, WRequest * elem ) {
    insert_last( buf.queue, *elem );                    // newest request goes to the tail
    signal( buf.delay );                                // restart a waiting remover, if any
} // insert
30
// Take the oldest request out of the buffer, waiting on the delay condition first when the buffer is empty.
WRequest * remove( WRBuffer & mutex buf ) with(buf) {
    if ( queue`is_empty ) {
        wait( delay );                                  // nothing to process => wait for an insert
    } // if
    WRequest & head = pop_first( queue );
    return &head;
} // remove
35
36// Each worker has its own work buffer to reduce contention between client and server. Hence, work requests arrive and
37// are distributed into buffers in a roughly round-robin order.
38
// Worker thread serving the mailboxes requests[start] .. requests[start + range - 1].
thread Worker {
    WRBuffer * requests;                                // base of the mailbox array
    unsigned int start;                                 // index of this worker's first mailbox
    unsigned int range;                                 // number of consecutive mailboxes this worker serves
}; // Worker
43
// Worker thread body: cycle through this worker's mailboxes, taking one request per visit. A stop request ends the
// loop; any other request is executed and then deleted.
void main( Worker & w ) with(w) {
    for ( int slot = 0;; slot = (slot + 1) % range ) {
        WRequest * req = remove( requests[start + slot] );
        if ( ! req ) {                                  // no request obtained => give up the processor and move on
            yield();
        } else if ( stop( *req ) ) {                    // stop request => terminate the worker
            break;
        } else {
            doit( *req );
            delete( req );                              // requests that are executed are freed here
        } // if
    } // for
} // Worker::main
53
// Construct a worker on cluster wc that serves range mailboxes of requests beginning at index start.
void ?{}( Worker & worker, cluster * wc, WRBuffer * requests, unsigned int start, unsigned int range ) {
    (*get_thread(worker)){ *wc };                       // create the underlying thread on the given cluster
    worker.requests = requests;
    worker.start = start;
    worker.range = range;
} // ?{}
58
// Executor state: the mailboxes clients send work to and the workers/processors that serve them.
struct Executor {
    cluster * cluster;                                  // cluster the workers execute on (separate or the active one)
    processor ** processors;                            // array of virtual processors adding parallelism for workers
    WRBuffer * requests;                                // array of mailboxes holding work requests
    Worker ** workers;                                  // array of workers executing work requests
    unsigned int nprocessors;                           // number of entries in processors
    unsigned int nworkers;                              // number of entries in workers
    unsigned int nmailboxes;                            // number of entries in requests
    bool sepClus;                                       // true => executor created (and owns) a separate cluster
}; // Executor
67
static thread_local unsigned int next;                  // running ticket counter, one instance per thread_local context

// Return the index of the mailbox for the next request, advancing the counter so that successive calls cycle through
// the executor's mailboxes. No locking is used; the counter is thread_local.
unsigned int tickets( Executor & ex ) with(ex) {
    unsigned int ticket = next % nmailboxes;
    next += 1;
    return ticket;
} // tickets
73
// Create an executor with np processors, nw workers and nm mailboxes (nw > 0 and nm >= nw). When sc is true the
// processors and workers run on a newly created cluster, otherwise on the active cluster. The mailboxes are divided
// into nw contiguous, non-overlapping ranges, one per worker; when nm is not a multiple of nw, the first nm % nw
// workers each serve one extra mailbox.
void ?{}( Executor & ex, unsigned int np, unsigned int nw, unsigned int nm, bool sc = false ) with(ex) {
    [nprocessors, nworkers, nmailboxes, sepClus] = [np, nw, nm, sc];
    assert( nworkers > 0 );                             // nworkers is a divisor below
    assert( nmailboxes >= nworkers );                   // every worker needs at least one mailbox
    cluster = sepClus ? new( "Executor" ) : active_cluster();
    processors = (processor **)anew( nprocessors );
    requests = (WRBuffer *)anew( nmailboxes );
    workers = (Worker **)anew( nworkers );

    for ( i; nprocessors ) {
        processors[ i ] = new( *cluster );
    } // for

    // The start of worker i+1 must advance by the range of worker i. Computing the range once per iteration and adding
    // it to start keeps the two in agreement; evaluating "i < extras" in the loop's step clause after i was already
    // incremented used the next worker's range, which overlapped ranges and left trailing mailboxes unserved.
    unsigned int reqPerWorker = nmailboxes / nworkers, extras = nmailboxes % nworkers;
    for ( unsigned int i = 0, start = 0; i < nworkers; i += 1 ) {
        unsigned int range = reqPerWorker + ( i < extras ? 1 : 0 );
        workers[ i ] = new( cluster, requests, start, range );
        start += range;
    } // for
} // ?{}

// Create an executor with one mailbox per worker.
void ?{}( Executor & ex, unsigned int nprocessors, unsigned int nworkers, bool sepClus = false ) {
    ex{ nprocessors, nworkers, nworkers, sepClus };
}

// Create an executor with one worker and one mailbox per processor.
void ?{}( Executor & ex, unsigned int nprocessors, bool sepClus = false ) {
    ex{ nprocessors, nprocessors, nprocessors, sepClus };
}

// Special case for the current cluster: add no processors, and create one worker (and mailbox) per processor of the
// active cluster.
void ?{}( Executor & ex ) {
    ex{ 0, active_cluster()->nprocessors, false };
}

// Stop and delete the workers, then release the processors, mailboxes and (if separate) the cluster.
void ^?{}( Executor & ex ) with(ex) {
    // Add one sentinel per worker to stop them. Since in destructor, no new work should be queued.  Cannot combine
    // the next two loops and only have a single sentinel because workers arrive in arbitrary order, so worker 1 may
    // take the single sentinel while waiting for worker 0 to end.

    WRequest sentinel[nworkers];                        // default constructed => stop requests
    for ( unsigned int i = 0; i < nworkers; i += 1 ) {
        // Use the start index recorded in the worker so the sentinel always lands in a mailbox that worker serves,
        // however the mailboxes were partitioned.
        insert( requests[workers[ i ]->start], &sentinel[i] ); // force eventual termination
    } // for
    // NOTE(review): remove blocks on an empty mailbox, so a worker serving more than one mailbox that is waiting on a
    // mailbox other than its first does not see this sentinel until a request arrives there -- confirm intended.
    for ( i; nworkers ) {
        delete( workers[ i ] );
    } // for
    for ( i; nprocessors ) {
        delete( processors[ i ] );
    } // for

    delete( workers );
    delete( requests );
    delete( processors );
    if ( sepClus ) { delete( cluster ); }
} // ^?{}
123
// Asynchronous call with no return value: wrap action in a heap-allocated request and queue it on the mailbox chosen
// by tickets.
void send( Executor & ex, void (* action)( void ) ) {
    WRequest * node = new( action );
    unsigned int mailbox = tickets( ex );
    insert( ex.requests[mailbox], node );
} // send
128
int counter = 0;                                        // number of completed workie calls

// Test action: atomically add one to counter.
void workie( void ) {
    __atomic_fetch_add( &counter, 1, __ATOMIC_SEQ_CST );
}
135
// Starter test: send 3000 workie requests through a default executor, then print how many were counted.
int main() {
    {
        Executor executor;                              // default constructor: workers on the active cluster
        for ( i; 3000 ) {
            send( executor, workie );
            if ( i % 100 != 0 ) continue;
            yield();                                    // every 100th send, give up the processor
        } // for
    } // executor's destructor runs here, before counter is read
    printf( "%d\n", counter );
}
149
150// Local Variables: //
151// compile-command: "cfa executor.cfa" //
152// End: //
Note: See TracBrowser for help on using the repository browser.