source: libcfa/src/concurrency/future.hfa @ 08e0d65

Last change on this file since 08e0d65 was db19e1d, checked in by Andrew Beach <ajbeach@…>, 8 days ago

Changed the interpritation of () to be no parameters instead of any parameters. This had a lot of little changes because of this and some nearby clean-up. This includes some changes, including changing some generated functions to be fixed-args instead of variable-args, stripping out the place holder void parameter earlier, but it still shows up earlier in some cases that examine the parser directly. Also had to update the function generation tools. Have only tested with one --arch. Hopefully this all works out.

  • Property mode set to 100644
File size: 8.8 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// concurrency/future.hfa --
8//
9// Author           : Thierry Delisle & Peiran Hong & Colby Parsons
10// Created On       : Wed Jan 06 17:33:18 2021
11// Last Modified By :
12// Last Modified On :
13// Update Count     :
14//
15
16#pragma once
17
18#include "bits/locks.hfa"
19#include "monitor.hfa"
20#include "select.hfa"
21#include "locks.hfa"
22
23//----------------------------------------------------------------------------
24// future
25// I don't use future_t here since I need to use a lock for this future
26//  since it supports multiple consumers
27//  future_t is lockfree and uses atomics which aren't needed given we use locks here
28forall( T ) {
29    // enum { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 }; // Enums seem to be broken so feel free to add this back afterwards
30
31    // temporary enum replacement
32    const int FUTURE_EMPTY = 0;
33    const int FUTURE_FULFILLED = 1;
34
35        struct future {
36                int state;
37                T result;
38                dlist( select_node ) waiters;
39        futex_mutex lock;
40        };
41    __CFA_SELECT_GET_TYPE( future(T) );
42
43    struct future_node {
44        inline select_node;
45        T * my_result;
46    };
47
48        static inline {
49
50        void ?{}( future_node(T) & this, thread$ * blocked_thread, T * my_result ) {
51            ((select_node &)this){ blocked_thread };
52            this.my_result = my_result;
53        }
54
55        void ?{}( future(T) & this ) {
56                        this.waiters{};
57            this.state = FUTURE_EMPTY;
58            this.lock{};
59                }
60
61                // Reset future back to original state
62                void reset( future(T) & this ) with(this)
63        {
64            lock( lock );
65            if( ! waiters`isEmpty )
66                abort("Attempting to reset a future with blocked waiters");
67            state = FUTURE_EMPTY;
68            unlock( lock );
69        }
70
71                // check if the future is available
72        // currently no mutual exclusion because I can't see when you need this call to be synchronous or protected
73                bool available( future(T) & this ) { return __atomic_load_n( &this.state, __ATOMIC_RELAXED ); }
74
75
76        // memcpy wrapper to help copy values
77        void copy_T( T & from, T & to ) {
78            memcpy((void *)&to, (void *)&from, sizeof(T));
79        }
80
81        // internal helper to signal waiters off of the future
82        void _internal_flush( future(T) & this ) with(this) {
83            while( ! waiters`isEmpty ) {
84                if ( !__handle_waituntil_OR( waiters ) ) // handle special waituntil OR case
85                    break; // if handle_OR returns false then waiters is empty so break
86                select_node &s = try_pop_front( waiters );
87
88                if ( s.clause_status == 0p ) // poke in result so that woken threads do not need to reacquire any locks
89                    copy_T( result, *(((future_node(T) &)s).my_result) );
90
91                wake_one( waiters, s );
92            }
93        }
94
95                // Fulfil the future, returns whether or not someone was unblocked
96                bool fulfil( future(T) & this, T val ) with(this) {
97            lock( lock );
98            if( state != FUTURE_EMPTY )
99                abort("Attempting to fulfil a future that has already been fulfilled");
100
101            copy_T( val, result );
102
103            bool ret_val = ! waiters`isEmpty;
104            state = FUTURE_FULFILLED;
105                        _internal_flush( this );
106            unlock( lock );
107            return ret_val;
108                }
109
110                // Wait for the future to be fulfilled
111                // Also return whether the thread had to block or not
112                [T, bool] get( future(T) & this ) with( this ) {
113            lock( lock );
114            T ret_val;
115            if( state == FUTURE_FULFILLED ) {
116                copy_T( result, ret_val );
117                unlock( lock );
118                return [ret_val, false];
119            }
120
121            future_node(T) node = { active_thread(), &ret_val };
122            insert_last( waiters, ((select_node &)node) );
123            unlock( lock );
124            park( );
125
126                        return [ret_val, true];
127                }
128
129                // Wait for the future to be fulfilled
130                T get( future(T) & this ) {
131                        [T, bool] tt;
132                        tt = get(this);
133                        return tt.0;
134                }
135
136        // Gets value if it is available and returns [ val, true ]
137        // otherwise returns [ default_val, false]
138        // will not block
139        [T, bool] try_get( future(T) & this ) with(this) {
140            lock( lock );
141            T ret_val;
142            if( state == FUTURE_FULFILLED ) {
143                copy_T( result, ret_val );
144                unlock( lock );
145                return [ret_val, true];
146            }
147            unlock( lock );
148
149            return [ret_val, false];
150        }
151
152        bool register_select( future(T) & this, select_node & s ) with(this) {
153            lock( lock );
154
155            // check if we can complete operation. If so race to establish winner in special OR case
156            if ( !s.park_counter && state != FUTURE_EMPTY ) {
157                if ( !__make_select_node_available( s ) ) { // we didn't win the race so give up on registering
158                    unlock( lock );
159                    return false;
160                }
161            }
162
163            // future not ready -> insert select node and return
164            if( state == FUTURE_EMPTY ) {
165                insert_last( waiters, s );
166                unlock( lock );
167                return false;
168            }
169
170            __make_select_node_available( s );
171            unlock( lock );
172            return true;
173        }
174
175        bool unregister_select( future(T) & this, select_node & s ) with(this) {
176            if ( ! s`isListed ) return false;
177            lock( lock );
178            if ( s`isListed ) remove( s );
179            unlock( lock );
180            return false;
181        }
182
183        bool on_selected( future(T) & this, select_node & node ) { return true; }
184        }
185}
186
187//--------------------------------------------------------------------------------------------------------
188// These futures below do not support select statements so they may not have as many features as 'future'
189//  however the 'single_future' is cheap and cheerful and is most likely more performant than 'future'
190//  since it uses raw atomics and no locks
191//
192// As far as 'multi_future' goes I can't see many use cases as it will be less performant than 'future'
193//  since it is monitor based and also is not compatible with select statements
194//--------------------------------------------------------------------------------------------------------
195
196forall( T ) {
197        struct single_future {
198                inline future_t;
199                T result;
200        };
201
202        static inline {
203                // Reset future back to original state
204                void reset(single_future(T) & this) { reset( (future_t&)this ); }
205
206                // check if the future is available
207                bool available( single_future(T) & this ) { return available( (future_t&)this ); }
208
209                // Mark the future as abandoned, meaning it will be deleted by the server
210                // This doesn't work beause of the potential need for a destructor
211                void abandon( single_future(T) & this );
212
213                // Fulfil the future, returns whether or not someone was unblocked
214                thread$ * fulfil( single_future(T) & this, T result ) {
215                        this.result = result;
216                        return fulfil( (future_t&)this );
217                }
218
219                // Wait for the future to be fulfilled
220                // Also return whether the thread had to block or not
221                [T, bool] wait( single_future(T) & this ) {
222                        bool r = wait( (future_t&)this );
223                        return [this.result, r];
224                }
225
226                // Wait for the future to be fulfilled
227                T wait( single_future(T) & this ) {
228                        [T, bool] tt;
229                        tt = wait(this);
230                        return tt.0;
231                }
232        }
233}
234
235forall( T ) {
236        monitor multi_future {
237                inline future_t;
238                condition blocked;
239                bool has_first;
240                T result;
241        };
242
243        static inline {
244                void ?{}(multi_future(T) & this) {
245                        this.has_first = false;
246                }
247
248                bool $first( multi_future(T) & mutex this ) {
249                        if (this.has_first) {
250                                wait( this.blocked );
251                                return false;
252                        }
253
254                        this.has_first = true;
255                        return true;
256                }
257
258                void $first_done( multi_future(T) & mutex this ) {
259                        this.has_first = false;
260                        signal_all( this.blocked );
261                }
262
263                // Reset future back to original state
264                void reset(multi_future(T) & mutex this) {
265                        if( this.has_first != false) abort("Attempting to reset a multi_future with at least one blocked threads");
266                        if( !is_empty(this.blocked) ) abort("Attempting to reset a multi_future with multiple blocked threads");
267                        reset( (future_t&)this );
268                }
269
270                // Fulfil the future, returns whether or not someone was unblocked
271                bool fulfil( multi_future(T) & this, T result ) {
272                        this.result = result;
273                        return fulfil( (future_t&)this ) != 0p;
274                }
275
276                // Wait for the future to be fulfilled
277                // Also return whether the thread had to block or not
278                [T, bool] wait( multi_future(T) & this ) {
279                        bool sw = $first( this );
280                        bool w = !sw;
281                        if ( sw ) {
282                                w = wait( (future_t&)this );
283                                $first_done( this );
284                        }
285
286                        return [this.result, w];
287                }
288
289                // Wait for the future to be fulfilled
290                T wait( multi_future(T) & this ) {
291                        return wait(this).0;
292                }
293        }
294}
Note: See TracBrowser for help on using the repository browser.