source: libcfa/src/concurrency/future.hfa@ 10a9479d

Last change on this file since 10a9479d was db19e1d, checked in by Andrew Beach <ajbeach@…>, 14 months ago

Changed the interpretation of () to be no parameters instead of any parameters. This had a lot of little changes because of this and some nearby clean-up. This includes some changes, including changing some generated functions to be fixed-args instead of variable-args, stripping out the placeholder void parameter earlier, but it still shows up earlier in some cases that examine the parser directly. Also had to update the function generation tools. Have only tested with one --arch. Hopefully this all works out.

  • Property mode set to 100644
File size: 8.8 KB
Line 
1//
2// Cforall Version 1.0.0 Copyright (C) 2020 University of Waterloo
3//
4// The contents of this file are covered under the licence agreement in the
5// file "LICENCE" distributed with Cforall.
6//
7// concurrency/future.hfa --
8//
9// Author : Thierry Delisle & Peiran Hong & Colby Parsons
10// Created On : Wed Jan 06 17:33:18 2021
11// Last Modified By :
12// Last Modified On :
13// Update Count :
14//
15
16#pragma once
17
18#include "bits/locks.hfa"
19#include "monitor.hfa"
20#include "select.hfa"
21#include "locks.hfa"
22
23//----------------------------------------------------------------------------
24// future
25// future_t is not used here: this future supports multiple consumers,
26// so it needs a lock.
27// future_t is lock-free and uses atomics, which are unnecessary given that locks are used here.
28forall( T ) {
29 // enum { FUTURE_EMPTY = 0, FUTURE_FULFILLED = 1 }; // Intended form, disabled because enums seemed to be broken here; feel free to add this back afterwards
30
31 // temporary enum replacement: the two values stored in future.state
32 const int FUTURE_EMPTY = 0; // no value yet; assigned by the future constructor and by reset()
33 const int FUTURE_FULFILLED = 1; // assigned by fulfil() once a value has been stored
34
35 struct future { // lock-protected future that may have several consumers queued on it
36 int state; // FUTURE_EMPTY or FUTURE_FULFILLED
37 T result; // value copied in by fulfil(); get()/try_get() read it only when state is FUTURE_FULFILLED
38 dlist( select_node ) waiters; // nodes of threads parked in get() plus nodes queued by register_select()
39 futex_mutex lock; // held by reset/fulfil/get/try_get/register_select/unregister_select while they touch the fields above
40 };
41 __CFA_SELECT_GET_TYPE( future(T) ); // NOTE(review): macro is defined outside this file (presumably select.hfa) -- confirm what it declares
42
43 struct future_node { // waiter record created by a thread that blocks in get()
44 inline select_node; // embedded base: get() and _internal_flush() cast between future_node(T) & and select_node &
45 T * my_result; // destination that _internal_flush() copies the fulfilled value into
46 };
47
48 static inline {
49
50 void ?{}( future_node(T) & this, thread$ * blocked_thread, T * my_result ) { // construct a waiter node for blocked_thread
51 ((select_node &)this){ blocked_thread }; // run the select_node constructor on the embedded base
52 this.my_result = my_result; // remember where the value should be written for this waiter
53 }
54
55 void ?{}( future(T) & this ) { // construct an unfulfilled future
56 this.waiters{}; // default-construct the waiter list
57 this.state = FUTURE_EMPTY;
58 this.lock{}; // default-construct the lock
59 }
60
61 // Reset future back to original state (FUTURE_EMPTY) so that fulfil() may be called again; aborts if any waiter is still queued. The stored result is not modified.
62 void reset( future(T) & this ) with(this)
63 {
64 lock( lock );
65 if( ! waiters`isEmpty ) // resetting while waiters are queued is treated as a fatal usage error
66 abort("Attempting to reset a future with blocked waiters");
67 state = FUTURE_EMPTY;
68 unlock( lock );
69 }
70
71 // check if the future is available: returns true when state is non-zero, i.e. FUTURE_FULFILLED
72 // currently no mutual exclusion because I can't see when you need this call to be synchronous or protected
73 bool available( future(T) & this ) { return __atomic_load_n( &this.state, __ATOMIC_RELAXED ); } // relaxed load taken without the lock, so the answer may already be out of date when the caller acts on it
74
75
76 // memcpy wrapper to help copy values: copies sizeof(T) raw bytes from 'from' into 'to'
77 void copy_T( T & from, T & to ) { // argument order is (source, destination), the reverse of memcpy's
78 memcpy((void *)&to, (void *)&from, sizeof(T)); // plain byte copy; T's assignment operator is not invoked
79 }
80
81 // internal helper to signal waiters off of the future; fulfil() calls it with the lock held and state already FUTURE_FULFILLED
82 void _internal_flush( future(T) & this ) with(this) {
83 while( ! waiters`isEmpty ) {
84 if ( !__handle_waituntil_OR( waiters ) ) // handle special waituntil OR case
85 break; // if handle_OR returns false then waiters is empty so break
86 select_node &s = try_pop_front( waiters ); // dequeue the next waiter
87
88 if ( s.clause_status == 0p ) // poke in result so that woken threads do not need to reacquire any locks
89 copy_T( result, *(((future_node(T) &)s).my_result) ); // NOTE(review): a null clause_status presumably marks a get() waiter, i.e. s is really a future_node -- confirm in select.hfa
90
91 wake_one( waiters, s );
92 }
93 }
94
95 // Fulfil the future, returns whether or not someone was unblocked (precisely: whether the waiters list was non-empty on entry). Aborts if the future is not FUTURE_EMPTY.
96 bool fulfil( future(T) & this, T val ) with(this) {
97 lock( lock );
98 if( state != FUTURE_EMPTY ) // only an empty future (fresh or reset) may be fulfilled
99 abort("Attempting to fulfil a future that has already been fulfilled");
100
101 copy_T( val, result ); // store the value (raw byte copy)
102
103 bool ret_val = ! waiters`isEmpty; // sampled before _internal_flush() dequeues the waiters
104 state = FUTURE_FULFILLED;
105 _internal_flush( this ); // hand the value to queued waiters and wake them while still holding the lock
106 unlock( lock );
107 return ret_val;
108 }
109
110 // Wait for the future to be fulfilled
111 // Also return whether the thread had to block or not: [value, false] if already fulfilled, [value, true] if the caller parked
112 [T, bool] get( future(T) & this ) with( this ) {
113 lock( lock );
114 T ret_val;
115 if( state == FUTURE_FULFILLED ) { // fast path: value already present
116 copy_T( result, ret_val );
117 unlock( lock );
118 return [ret_val, false];
119 }
120
121 future_node(T) node = { active_thread(), &ret_val }; // waiter node is a local of this call and points at ret_val
122 insert_last( waiters, ((select_node &)node) ); // queue behind any earlier waiters
123 unlock( lock ); // release before parking so that fulfil() can take the lock
124 park( );
125
126 return [ret_val, true]; // ret_val was written by _internal_flush() through node.my_result; the lock is not retaken here
127 }
128
129 // Wait for the future to be fulfilled; convenience overload that drops the "had to block" flag
130 T get( future(T) & this ) {
131 [T, bool] tt;
132 tt = get(this); // call the tuple-returning get() above
133 return tt.0; // first tuple element: the value
134 }
135
136 // Gets value if it is available and returns [ val, true ]
137 // otherwise returns [ default_val, false], where default_val is ret_val exactly as declared (it is never assigned on that path)
138 // will not block waiting for fulfilment (the lock is still taken briefly)
139 [T, bool] try_get( future(T) & this ) with(this) {
140 lock( lock );
141 T ret_val;
142 if( state == FUTURE_FULFILLED ) {
143 copy_T( result, ret_val ); // raw byte copy of the stored value
144 unlock( lock );
145 return [ret_val, true];
146 }
147 unlock( lock );
148
149 return [ret_val, false]; // not fulfilled: callers should ignore the first element
150 }
151
152 bool register_select( future(T) & this, select_node & s ) with(this) { // returns true if the future was already fulfilled; false if s was queued on waiters or lost the race below
153 lock( lock );
154
155 // check if we can complete operation. If so race to establish winner in special OR case
156 if ( !s.park_counter && state != FUTURE_EMPTY ) { // NOTE(review): a zero/null park_counter presumably identifies the waituntil OR case -- confirm in select.hfa
157 if ( !__make_select_node_available( s ) ) { // we didn't win the race so give up on registering
158 unlock( lock );
159 return false;
160 }
161 }
162
163 // future not ready -> insert select node and return
164 if( state == FUTURE_EMPTY ) {
165 insert_last( waiters, s );
166 unlock( lock );
167 return false;
168 }
169
170 __make_select_node_available( s ); // NOTE(review): when the branch above was taken this is a second call on the same node, and the result is ignored -- confirm this is intended
171 unlock( lock );
172 return true;
173 }
174
175 bool unregister_select( future(T) & this, select_node & s ) with(this) { // remove s from its list if it is still queued; returns false on every path
176 if ( ! s`isListed ) return false; // unlocked early-out: s is not on a list, nothing to remove
177 lock( lock );
178 if ( s`isListed ) remove( s ); // re-check under the lock in case s was dequeued after the first test
179 unlock( lock );
180 return false;
181 }
182
183 bool on_selected( future(T) & this, select_node & node ) { return true; } // always returns true; neither parameter is used
184 }
185}
186
187//--------------------------------------------------------------------------------------------------------
188// These futures below do not support select statements so they may not have as many features as 'future'
189// however the 'single_future' is cheap and cheerful and is most likely more performant than 'future'
190// since it uses raw atomics and no locks
191//
192// As far as 'multi_future' goes I can't see many use cases as it will be less performant than 'future'
193// since it is monitor based and also is not compatible with select statements
194//--------------------------------------------------------------------------------------------------------
195
196forall( T ) {
197 struct single_future { // typed wrapper around future_t (future_t is declared outside this file)
198 inline future_t; // embedded base; the routines below cast to future_t & to reach it
199 T result; // assigned by fulfil() before the base is fulfilled; returned by wait()
200 };
201
202 static inline {
203 // Reset future back to original state: delegates to reset( future_t & ); the 'result' field is not modified
204 void reset(single_future(T) & this) { reset( (future_t&)this ); }
205
206 // check if the future is available: delegates to available( future_t & )
207 bool available( single_future(T) & this ) { return available( (future_t&)this ); }
208
209 // Mark the future as abandoned, meaning it will be deleted by the server
210 // This doesn't work because of the potential need for a destructor
211 void abandon( single_future(T) & this ); // declaration only: no definition appears in this header
212
213 // Fulfil the future; returns the thread$ * produced by fulfil( future_t & ) (multi_future's fulfil treats a non-0p value as "someone was unblocked")
214 thread$ * fulfil( single_future(T) & this, T result ) {
215 this.result = result; // store the value before fulfilling the base so it is in place when a waiter resumes
216 return fulfil( (future_t&)this );
217 }
218
219 // Wait for the future to be fulfilled
220 // Also return whether the thread had to block or not (the bool comes straight from wait( future_t & ), whose semantics are defined outside this file)
221 [T, bool] wait( single_future(T) & this ) {
222 bool r = wait( (future_t&)this );
223 return [this.result, r]; // result is read only after the base wait has returned
224 }
225
226 // Wait for the future to be fulfilled; convenience overload that drops the bool flag
227 T wait( single_future(T) & this ) {
228 [T, bool] tt;
229 tt = wait(this); // call the tuple-returning wait() above
230 return tt.0; // first tuple element: the value
231 }
232 }
233}
234
235forall( T ) {
236 monitor multi_future { // monitor-based future: one thread waits on the embedded future_t, later arrivals wait on 'blocked'
237 inline future_t; // embedded base; the routines below cast to future_t & to reach it
238 condition blocked; // threads that called $first() while another thread held the "first" role
239 bool has_first; // true between a successful $first() and the matching $first_done()
240 T result; // assigned by fulfil(); returned by wait()
241 };
242
243 static inline {
244 void ?{}(multi_future(T) & this) { // construct with no thread holding the "first" role
245 this.has_first = false;
246 }
247
248 bool $first( multi_future(T) & mutex this ) { // claim the "first waiter" role: returns true if claimed, false after waiting on 'blocked' because another thread holds it
249 if (this.has_first) {
250 wait( this.blocked ); // signalled by signal_all in $first_done()
251 return false;
252 }
253
254 this.has_first = true;
255 return true;
256 }
257
258 void $first_done( multi_future(T) & mutex this ) { // give up the "first waiter" role and signal every thread waiting on 'blocked'
259 this.has_first = false;
260 signal_all( this.blocked );
261 }
262
263 // Reset future back to original state; aborts if a thread holds the "first" role or any thread waits on 'blocked', otherwise delegates to reset( future_t & )
264 void reset(multi_future(T) & mutex this) {
265 if( this.has_first != false) abort("Attempting to reset a multi_future with at least one blocked threads"); // some thread is between $first() and $first_done()
266 if( !is_empty(this.blocked) ) abort("Attempting to reset a multi_future with multiple blocked threads"); // threads are still queued on the condition
267 reset( (future_t&)this );
268 }
269
270 // Fulfil the future, returns whether or not someone was unblocked (true when fulfil( future_t & ) returns a non-0p pointer)
271 bool fulfil( multi_future(T) & this, T result ) { // parameter is not 'mutex': the monitor is not acquired here
272 this.result = result; // store the value before fulfilling the base
273 return fulfil( (future_t&)this ) != 0p;
274 }
275
276 // Wait for the future to be fulfilled
277 // Also return whether the thread had to block or not
278 [T, bool] wait( multi_future(T) & this ) {
279 bool sw = $first( this ); // true: this thread waits on the future_t itself; false: it already waited on 'blocked' inside $first()
280 bool w = !sw; // a non-first thread did block (in $first), so it reports true
281 if ( sw ) {
282 w = wait( (future_t&)this ); // first thread: the flag is whatever wait( future_t & ) reports
283 $first_done( this ); // release the threads waiting on 'blocked'
284 }
285
286 return [this.result, w];
287 }
288
289 // Wait for the future to be fulfilled; convenience overload that drops the bool flag
290 T wait( multi_future(T) & this ) {
291 return wait(this).0; // first element of the tuple-returning wait() above
292 }
293 }
294}
Note: See TracBrowser for help on using the repository browser.