source: libcfa/src/concurrency/future.hfa @ 1c0a3a4

Last change on this file since 1c0a3a4 was 1c0a3a4, checked in by Peter A. Buhr <pabuhr@…>, 3 weeks ago

fix problem with future seperate compilation

  • Property mode set to 100644
File size: 8.7 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// concurrency/future.hfa --
8//
9// Author           : Thierry Delisle & Peiran Hong & Colby Parsons
10// Created On       : Wed Jan 06 17:33:18 2021
11// Last Modified By : Peter A. Buhr
12// Last Modified On : Fri Nov 29 16:07:34 2024
13// Update Count     : 30
14//
15
16#pragma once
17
18#include "bits/locks.hfa"
19#include "monitor.hfa"
20#include "select.hfa"
21#include "locks.hfa"
22
23//----------------------------------------------------------------------------
24// future
25// I don't use future_t here as I need to use a lock for this future since it supports multiple consumers.
26// future_t is lockfree and uses atomics which aren't needed given we use locks here
27forall( T ) {
28    enum { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 };
29
30        struct future {
31                int state;
32                T result;
33                dlist( select_node ) waiters;
34        futex_mutex lock;
35        };
36    __CFA_SELECT_GET_TYPE( future(T) );
37
38    struct future_node {
39        inline select_node;
40        T * my_result;
41    };
42
43        static inline {
44
45        void ?{}( future_node(T) & this, thread$ * blocked_thread, T * my_result ) {
46            ((select_node &)this){ blocked_thread };
47            this.my_result = my_result;
48        }
49
50        void ?{}( future(T) & this ) {
51                        this.waiters{};
52            this.state = FUTURE_EMPTY;
53            this.lock{};
54                }
55
56                // Reset future back to original state
57                void reset( future(T) & this ) with(this)
58        {
59            lock( lock );
60            if( ! waiters`isEmpty )
61                abort("Attempting to reset a future with blocked waiters");
62            state = FUTURE_EMPTY;
63            unlock( lock );
64        }
65
66                // check if the future is available
67        // currently no mutual exclusion because I can't see when you need this call to be synchronous or protected
68                bool available( future(T) & this ) { return __atomic_load_n( &this.state, __ATOMIC_RELAXED ); }
69
70
71        // memcpy wrapper to help copy values
72        void copy_T( T & from, T & to ) {
73            memcpy((void *)&to, (void *)&from, sizeof(T));
74        }
75
76        // internal helper to signal waiters off of the future
77        void _internal_flush( future(T) & this ) with(this) {
78            while( ! waiters`isEmpty ) {
79                if ( !__handle_waituntil_OR( waiters ) ) // handle special waituntil OR case
80                    break; // if handle_OR returns false then waiters is empty so break
81                select_node &s = try_pop_front( waiters );
82
83                if ( s.clause_status == 0p ) // poke in result so that woken threads do not need to reacquire any locks
84                    copy_T( result, *(((future_node(T) &)s).my_result) );
85
86                wake_one( waiters, s );
87            }
88        }
89
90                // Fulfil the future, returns whether or not someone was unblocked
91                bool fulfil( future(T) & this, T val ) with(this) {
92            lock( lock );
93            if( state != FUTURE_EMPTY )
94                abort("Attempting to fulfil a future that has already been fulfilled");
95
96            copy_T( val, result );
97
98            bool ret_val = ! waiters`isEmpty;
99            state = FUTURE_FULFILLED;
100                        _internal_flush( this );
101            unlock( lock );
102            return ret_val;
103                }
104
105                // Wait for the future to be fulfilled
106                // Also return whether the thread had to block or not
107                [T, bool] get( future(T) & this ) with( this ) {
108            lock( lock );
109            T ret_val;
110            if( state == FUTURE_FULFILLED ) {
111                copy_T( result, ret_val );
112                unlock( lock );
113                return [ret_val, false];
114            }
115
116            future_node(T) node = { active_thread(), &ret_val };
117            insert_last( waiters, ((select_node &)node) );
118            unlock( lock );
119            park( );
120
121                        return [ret_val, true];
122                }
123
124                // Wait for the future to be fulfilled
125                T get( future(T) & this ) {
126                        [T, bool] tt;
127                        tt = get(this);
128                        return tt.0;
129                }
130
131        // Gets value if it is available and returns [ val, true ]
132        // otherwise returns [ default_val, false]
133        // will not block
134        [T, bool] try_get( future(T) & this ) with(this) {
135            lock( lock );
136            T ret_val;
137            if( state == FUTURE_FULFILLED ) {
138                copy_T( result, ret_val );
139                unlock( lock );
140                return [ret_val, true];
141            }
142            unlock( lock );
143
144            return [ret_val, false];
145        }
146
147        bool register_select( future(T) & this, select_node & s ) with(this) {
148            lock( lock );
149
150            // check if we can complete operation. If so race to establish winner in special OR case
151            if ( !s.park_counter && state != FUTURE_EMPTY ) {
152                if ( !__make_select_node_available( s ) ) { // we didn't win the race so give up on registering
153                    unlock( lock );
154                    return false;
155                }
156            }
157
158            // future not ready -> insert select node and return
159            if( state == FUTURE_EMPTY ) {
160                insert_last( waiters, s );
161                unlock( lock );
162                return false;
163            }
164
165            __make_select_node_available( s );
166            unlock( lock );
167            return true;
168        }
169
170        bool unregister_select( future(T) & this, select_node & s ) with(this) {
171            if ( ! s`isListed ) return false;
172            lock( lock );
173            if ( s`isListed ) remove( s );
174            unlock( lock );
175            return false;
176        }
177
178        bool on_selected( future(T) & this, select_node & node ) { return true; }
179        }
180}
181
182//--------------------------------------------------------------------------------------------------------
183// These futures below do not support select statements so they may not have as many features as 'future'
184//  however the 'single_future' is cheap and cheerful and is most likely more performant than 'future'
185//  since it uses raw atomics and no locks
186//
187// As far as 'multi_future' goes I can't see many use cases as it will be less performant than 'future'
188//  since it is monitor based and also is not compatible with select statements
189//--------------------------------------------------------------------------------------------------------
190
191forall( T ) {
192        struct single_future {
193                inline future_t;
194                T result;
195        };
196
197        static inline {
198                // Reset future back to original state
199                void reset(single_future(T) & this) { reset( (future_t&)this ); }
200
201                // check if the future is available
202                bool available( single_future(T) & this ) { return available( (future_t&)this ); }
203
204                // Mark the future as abandoned, meaning it will be deleted by the server
205                // This doesn't work beause of the potential need for a destructor
206                void abandon( single_future(T) & this );
207
208                // Fulfil the future, returns whether or not someone was unblocked
209                thread$ * fulfil( single_future(T) & this, T result ) {
210                        this.result = result;
211                        return fulfil( (future_t&)this );
212                }
213
214                // Wait for the future to be fulfilled
215                // Also return whether the thread had to block or not
216                [T, bool] wait( single_future(T) & this ) {
217                        bool r = wait( (future_t&)this );
218                        return [this.result, r];
219                }
220
221                // Wait for the future to be fulfilled
222                T wait( single_future(T) & this ) {
223                        [T, bool] tt;
224                        tt = wait(this);
225                        return tt.0;
226                }
227        }
228}
229
230forall( T ) {
231        monitor multi_future {
232                inline future_t;
233                condition blocked;
234                bool has_first;
235                T result;
236        };
237
238        static inline {
239                void ?{}(multi_future(T) & this) {
240                        this.has_first = false;
241                }
242
243                bool $first( multi_future(T) & mutex this ) {
244                        if (this.has_first) {
245                                wait( this.blocked );
246                                return false;
247                        }
248
249                        this.has_first = true;
250                        return true;
251                }
252
253                void $first_done( multi_future(T) & mutex this ) {
254                        this.has_first = false;
255                        signal_all( this.blocked );
256                }
257
258                // Reset future back to original state
259                void reset(multi_future(T) & mutex this) {
260                        if( this.has_first != false) abort("Attempting to reset a multi_future with at least one blocked threads");
261                        if( !is_empty(this.blocked) ) abort("Attempting to reset a multi_future with multiple blocked threads");
262                        reset( (future_t&)this );
263                }
264
265                // Fulfil the future, returns whether or not someone was unblocked
266                bool fulfil( multi_future(T) & this, T result ) {
267                        this.result = result;
268                        return fulfil( (future_t&)this ) != 0p;
269                }
270
271                // Wait for the future to be fulfilled
272                // Also return whether the thread had to block or not
273                [T, bool] wait( multi_future(T) & this ) {
274                        bool sw = $first( this );
275                        bool w = !sw;
276                        if ( sw ) {
277                                w = wait( (future_t&)this );
278                                $first_done( this );
279                        }
280
281                        return [this.result, w];
282                }
283
284                // Wait for the future to be fulfilled
285                T wait( multi_future(T) & this ) {
286                        return wait(this).0;
287                }
288        }
289}
Note: See TracBrowser for help on using the repository browser.