Changeset 2a301ff for libcfa/src/concurrency
- Timestamp: Aug 31, 2023, 11:31:15 PM
- Branches: master
- Children: 950c58e
- Parents: 92355883 (diff), 686912c (diff)

Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Use the (diff) links above to see all the changes relative to each parent.

- Location: libcfa/src/concurrency
- Files: 17 edited
  - actor.hfa (modified) (13 diffs)
  - alarm.hfa (modified) (2 diffs)
  - channel.hfa (modified) (10 diffs)
  - coroutine.cfa (modified) (4 diffs)
  - coroutine.hfa (modified) (2 diffs)
  - future.hfa (modified) (2 diffs)
  - invoke.h (modified) (5 diffs)
  - iofwd.hfa (modified) (3 diffs)
  - kernel.cfa (modified) (1 diff)
  - kernel.hfa (modified) (2 diffs)
  - kernel/startup.cfa (modified) (1 diff)
  - locks.cfa (modified) (1 diff)
  - locks.hfa (modified) (9 diffs)
  - once.hfa (modified) (1 diff)
  - select.cfa (modified) (1 diff)
  - select.hfa (modified) (3 diffs)
  - stats.cfa (modified) (1 diff)
libcfa/src/concurrency/actor.hfa
r92355883 r2a301ff 39 39 // #define ACTOR_STATS 40 40 41 // used to run and only track missed queue gulps 42 #ifdef ACTOR_STATS 43 #define ACTOR_STATS_QUEUE_MISSED 44 #endif 45 41 46 // forward decls 42 47 struct actor; … … 48 53 typedef allocation (*__receive_fn)(actor &, message &, actor **, message **); 49 54 struct request { 50 actor * receiver;51 message * msg;52 __receive_fn fn;55 actor * receiver; 56 message * msg; 57 __receive_fn fn; 53 58 }; 54 59 55 60 struct a_msg { 56 int m;61 int m; 57 62 }; 58 63 static inline void ?{}( request & this ) {} 59 64 static inline void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) { 60 this.receiver = receiver;61 this.msg = msg;62 this.fn = fn;65 this.receiver = receiver; 66 this.msg = msg; 67 this.fn = fn; 63 68 } 64 69 static inline void ?{}( request & this, request & copy ) { 65 this.receiver = copy.receiver;66 this.msg = copy.msg;67 this.fn = copy.fn;70 this.receiver = copy.receiver; 71 this.msg = copy.msg; 72 this.fn = copy.fn; 68 73 } 69 74 … … 71 76 // assumes gulping behaviour (once a remove occurs, removes happen until empty beforw next insert) 72 77 struct copy_queue { 73 request * buffer;74 size_t count, buffer_size, index, utilized, last_size;78 request * buffer; 79 size_t count, buffer_size, index, utilized, last_size; 75 80 }; 76 81 static inline void ?{}( copy_queue & this ) {} 77 82 static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) { 78 buffer_size = buf_size;79 buffer = aalloc( buffer_size );80 count = 0;81 utilized = 0;82 index = 0;83 last_size = 0;83 buffer_size = buf_size; 84 buffer = aalloc( buffer_size ); 85 count = 0; 86 utilized = 0; 87 index = 0; 88 last_size = 0; 84 89 } 85 90 static inline void ^?{}( copy_queue & this ) with(this) { 86 DEBUG_ABORT( count != 0, "Actor system terminated with messages sent but not received\n" );87 adelete(buffer);91 DEBUG_ABORT( count != 0, "Actor system terminated with messages sent but not received\n" ); 92 adelete(buffer); 88 93 } 89 94 90 95 static inline void insert( copy_queue & this, request & elem ) with(this) { 91 if ( count >= buffer_size ) { // increase arr size92 last_size = buffer_size;93 buffer_size = 2 * buffer_size;94 buffer = realloc( buffer, sizeof( request ) * buffer_size );95 /* paranoid */ verify( buffer );96 }97 memcpy( &buffer[count], &elem, sizeof(request) );98 count++;96 if ( count >= buffer_size ) { // increase arr size 97 last_size = buffer_size; 98 buffer_size = 2 * buffer_size; 99 buffer = realloc( buffer, sizeof( request ) * buffer_size ); 100 /* paranoid */ verify( buffer ); 101 } 102 memcpy( &buffer[count], &elem, sizeof(request) ); 103 count++; 99 104 } 100 105 … … 102 107 // it is not supported to call insert() before the array is fully empty 103 108 static inline request & remove( copy_queue & this ) with(this) { 104 if ( count > 0 ) {105 count--;106 size_t old_idx = index;107 index = count == 0 ? 0 : index + 1;108 return buffer[old_idx];109 }110 request * ret = 0p;111 return *0p;109 if ( count > 0 ) { 110 count--; 111 size_t old_idx = index; 112 index = count == 0 ? 
0 : index + 1; 113 return buffer[old_idx]; 114 } 115 request * ret = 0p; 116 return *0p; 112 117 } 113 118 114 119 // try to reclaim some memory if less than half of buffer is utilized 115 120 static inline void reclaim( copy_queue & this ) with(this) { 116 if ( utilized >= last_size || buffer_size <= 4 ) { utilized = 0; return; }117 utilized = 0;118 buffer_size--;119 buffer = realloc( buffer, sizeof( request ) * buffer_size ); // try to reclaim some memory121 if ( utilized >= last_size || buffer_size <= 4 ) { utilized = 0; return; } 122 utilized = 0; 123 buffer_size--; 124 buffer = realloc( buffer, sizeof( request ) * buffer_size ); // try to reclaim some memory 120 125 } 121 126 … … 123 128 124 129 struct work_queue { 125 __spinlock_t mutex_lock; 126 copy_queue * owned_queue; // copy queue allocated and cleaned up by this work_queue 127 copy_queue * c_queue; // current queue 128 volatile bool being_processed; // flag to prevent concurrent processing 129 #ifdef ACTOR_STATS 130 unsigned int id; 131 size_t missed; // transfers skipped due to being_processed flag being up 130 __spinlock_t mutex_lock; 131 copy_queue * owned_queue; // copy queue allocated and cleaned up by this work_queue 132 copy_queue * c_queue; // current queue 133 volatile bool being_processed; // flag to prevent concurrent processing 134 #ifdef ACTOR_STATS 135 unsigned int id; 132 136 #endif 137 #ifdef ACTOR_STATS_QUEUE_MISSED 138 size_t missed; // transfers skipped due to being_processed flag being up 139 #endif 133 140 }; // work_queue 134 141 static inline void ?{}( work_queue & this, size_t buf_size, unsigned int i ) with(this) { 135 owned_queue = alloc();// allocated separately to avoid false sharing136 (*owned_queue){ buf_size };137 c_queue = owned_queue;138 being_processed = false;139 #ifdef ACTOR_STATS140 id = i;141 missed = 0;142 #endif142 owned_queue = alloc(); // allocated separately to avoid false sharing 143 (*owned_queue){ buf_size }; 144 c_queue = owned_queue; 145 being_processed = false; 146 #ifdef ACTOR_STATS 147 id = i; 148 missed = 0; 149 #endif 143 150 } 144 151 … … 147 154 148 155 static inline void insert( work_queue & this, request & elem ) with(this) { 149 lock( mutex_lock __cfaabi_dbg_ctx2 );150 insert( *c_queue, elem );151 unlock( mutex_lock );156 lock( mutex_lock __cfaabi_dbg_ctx2 ); 157 insert( *c_queue, elem ); 158 unlock( mutex_lock ); 152 159 } // insert 153 160 154 161 static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) { 155 lock( mutex_lock __cfaabi_dbg_ctx2 );156 #ifdef __STEAL157 158 // check if queue is being processed elsewhere159 if ( unlikely( being_processed ) ) {160 #ifdef ACTOR_STATS161 missed++;162 #endif163 unlock( mutex_lock );164 return;165 }166 167 being_processed = c_queue->count != 0;168 #endif // __STEAL169 170 c_queue->utilized = c_queue->count;171 172 // swap copy queue ptrs173 copy_queue * temp = *transfer_to;174 *transfer_to = c_queue;175 c_queue = temp;176 unlock( mutex_lock );162 lock( mutex_lock __cfaabi_dbg_ctx2 ); 163 #ifdef __STEAL 164 165 // check if queue is being processed elsewhere 166 if ( unlikely( being_processed ) ) { 167 #ifdef ACTOR_STATS 168 missed++; 169 #endif 170 unlock( mutex_lock ); 171 return; 172 } 173 174 being_processed = c_queue->count != 0; 175 #endif // __STEAL 176 177 c_queue->utilized = c_queue->count; 178 179 // swap copy queue ptrs 180 copy_queue * temp = *transfer_to; 181 *transfer_to = c_queue; 182 c_queue = temp; 183 unlock( mutex_lock ); 177 184 } // transfer 178 185 179 186 // needed since some 
info needs to persist past worker lifetimes 180 187 struct worker_info { 181 volatile unsigned long long stamp;182 #ifdef ACTOR_STATS183 size_t stolen_from, try_steal, stolen, empty_stolen, failed_swaps, msgs_stolen;184 unsigned long long processed;185 size_t gulps;186 #endif188 volatile unsigned long long stamp; 189 #ifdef ACTOR_STATS 190 size_t stolen_from, try_steal, stolen, empty_stolen, failed_swaps, msgs_stolen; 191 unsigned long long processed; 192 size_t gulps; 193 #endif 187 194 }; 188 195 static inline void ?{}( worker_info & this ) { 189 #ifdef ACTOR_STATS190 this.stolen_from = 0;191 this.try_steal = 0;// attempts to steal192 this.stolen = 0;// successful steals193 this.processed = 0;// requests processed194 this.gulps = 0;// number of gulps195 this.failed_swaps = 0;// steal swap failures196 this.empty_stolen = 0;// queues empty after steal197 this.msgs_stolen = 0;// number of messages stolen198 #endif199 this.stamp = rdtscl();196 #ifdef ACTOR_STATS 197 this.stolen_from = 0; 198 this.try_steal = 0; // attempts to steal 199 this.stolen = 0; // successful steals 200 this.processed = 0; // requests processed 201 this.gulps = 0; // number of gulps 202 this.failed_swaps = 0; // steal swap failures 203 this.empty_stolen = 0; // queues empty after steal 204 this.msgs_stolen = 0; // number of messages stolen 205 #endif 206 this.stamp = rdtscl(); 200 207 } 201 208 … … 205 212 // #endif 206 213 thread worker { 207 work_queue ** request_queues;208 copy_queue * current_queue;209 executor * executor_;210 unsigned int start, range;211 int id;214 work_queue ** request_queues; 215 copy_queue * current_queue; 216 executor * executor_; 217 unsigned int start, range; 218 int id; 212 219 }; 213 220 … … 215 222 // aggregate counters for statistics 216 223 size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0, __total_empty_stolen = 0, 217 __total_failed_swaps = 0, __all_processed = 0, __num_actors_stats = 0, __all_msgs_stolen = 0;224 __total_failed_swaps = 0, __all_processed = 0, __num_actors_stats = 0, __all_msgs_stolen = 0; 218 225 #endif 219 226 static inline void ?{}( worker & this, cluster & clu, work_queue ** request_queues, copy_queue * current_queue, executor * executor_, 220 unsigned int start, unsigned int range, int id ) {221 ((thread &)this){ clu };222 this.request_queues = request_queues;// array of all queues223 this.current_queue = current_queue;// currently gulped queue (start with empty queue to use in swap later)224 this.executor_ = executor_;// pointer to current executor225 this.start = start;// start of worker's subrange of request_queues226 this.range = range;// size of worker's subrange of request_queues227 this.id = id;// worker's id and index in array of workers227 unsigned int start, unsigned int range, int id ) { 228 ((thread &)this){ clu }; 229 this.request_queues = request_queues; // array of all queues 230 this.current_queue = current_queue; // currently gulped queue (start with empty queue to use in swap later) 231 this.executor_ = executor_; // pointer to current executor 232 this.start = start; // start of worker's subrange of request_queues 233 this.range = range; // size of worker's subrange of request_queues 234 this.id = id; // worker's id and index in array of workers 228 235 } 229 236 230 237 static bool no_steal = false; 231 238 struct executor { 232 cluster * cluster;// if workers execute on separate cluster233 processor ** processors; // array of virtual processors adding parallelism for workers234 work_queue * request_queues; // 
master array of work request queues235 copy_queue * local_queues;// array of all worker local queues to avoid deletion race236 work_queue ** worker_req_queues; // secondary array of work queues to allow for swapping237 worker ** workers;// array of workers executing work requests238 worker_info * w_infos;// array of info about each worker239 unsigned int nprocessors, nworkers, nrqueues; // number of processors/threads/request queues240 bool seperate_clus; // use same or separate cluster for executor241 volatile bool is_shutdown;// flag to communicate shutdown to worker threads239 cluster * cluster; // if workers execute on separate cluster 240 processor ** processors; // array of virtual processors adding parallelism for workers 241 work_queue * request_queues; // master array of work request queues 242 copy_queue * local_queues; // array of all worker local queues to avoid deletion race 243 work_queue ** worker_req_queues; // secondary array of work queues to allow for swapping 244 worker ** workers; // array of workers executing work requests 245 worker_info * w_infos; // array of info about each worker 246 unsigned int nprocessors, nworkers, nrqueues; // number of processors/threads/request queues 247 bool seperate_clus; // use same or separate cluster for executor 248 volatile bool is_shutdown; // flag to communicate shutdown to worker threads 242 249 }; // executor 243 250 … … 246 253 // #endif 247 254 static inline void ^?{}( worker & mutex this ) with(this) { 248 #ifdef ACTOR_STATS249 __atomic_add_fetch(&__all_gulps, executor_->w_infos[id].gulps,__ATOMIC_SEQ_CST);250 __atomic_add_fetch(&__all_processed, executor_->w_infos[id].processed,__ATOMIC_SEQ_CST);251 __atomic_add_fetch(&__all_msgs_stolen, executor_->w_infos[id].msgs_stolen,__ATOMIC_SEQ_CST);252 __atomic_add_fetch(&__total_tries, executor_->w_infos[id].try_steal, __ATOMIC_SEQ_CST);253 __atomic_add_fetch(&__total_stolen, executor_->w_infos[id].stolen, __ATOMIC_SEQ_CST);254 __atomic_add_fetch(&__total_failed_swaps, executor_->w_infos[id].failed_swaps, __ATOMIC_SEQ_CST);255 __atomic_add_fetch(&__total_empty_stolen, executor_->w_infos[id].empty_stolen, __ATOMIC_SEQ_CST);256 257 // per worker steal stats (uncomment alongside the lock above this routine to print)258 // lock( out_lock __cfaabi_dbg_ctx2 );259 // printf("Worker id: %d, processed: %llu messages, attempted %lu, stole: %lu, stolen from: %lu\n", id, processed, try_steal, stolen, __atomic_add_fetch(&executor_->w_infos[id].stolen_from, 0, __ATOMIC_SEQ_CST) );260 // int count = 0;261 // int count2 = 0;262 // for ( i; range ) {263 //if ( replaced_queue[start + i] > 0 ){264 //count++;265 //// printf("%d: %u, ",i, replaced_queue[i]);266 //}267 //if (__atomic_add_fetch(&stolen_arr[start + i],0,__ATOMIC_SEQ_CST) > 0)268 //count2++;269 // }270 // printf("swapped with: %d of %u indices\n", count, executor_->nrqueues / executor_->nworkers );271 // printf("%d of %u indices were stolen\n", count2, executor_->nrqueues / executor_->nworkers );272 // unlock( out_lock );273 #endif255 #ifdef ACTOR_STATS 256 __atomic_add_fetch(&__all_gulps, executor_->w_infos[id].gulps,__ATOMIC_SEQ_CST); 257 __atomic_add_fetch(&__all_processed, executor_->w_infos[id].processed,__ATOMIC_SEQ_CST); 258 __atomic_add_fetch(&__all_msgs_stolen, executor_->w_infos[id].msgs_stolen,__ATOMIC_SEQ_CST); 259 __atomic_add_fetch(&__total_tries, executor_->w_infos[id].try_steal, __ATOMIC_SEQ_CST); 260 __atomic_add_fetch(&__total_stolen, executor_->w_infos[id].stolen, __ATOMIC_SEQ_CST); 261 
__atomic_add_fetch(&__total_failed_swaps, executor_->w_infos[id].failed_swaps, __ATOMIC_SEQ_CST); 262 __atomic_add_fetch(&__total_empty_stolen, executor_->w_infos[id].empty_stolen, __ATOMIC_SEQ_CST); 263 264 // per worker steal stats (uncomment alongside the lock above this routine to print) 265 // lock( out_lock __cfaabi_dbg_ctx2 ); 266 // printf("Worker id: %d, processed: %llu messages, attempted %lu, stole: %lu, stolen from: %lu\n", id, processed, try_steal, stolen, __atomic_add_fetch(&executor_->w_infos[id].stolen_from, 0, __ATOMIC_SEQ_CST) ); 267 // int count = 0; 268 // int count2 = 0; 269 // for ( i; range ) { 270 // if ( replaced_queue[start + i] > 0 ){ 271 // count++; 272 // // printf("%d: %u, ",i, replaced_queue[i]); 273 // } 274 // if (__atomic_add_fetch(&stolen_arr[start + i],0,__ATOMIC_SEQ_CST) > 0) 275 // count2++; 276 // } 277 // printf("swapped with: %d of %u indices\n", count, executor_->nrqueues / executor_->nworkers ); 278 // printf("%d of %u indices were stolen\n", count2, executor_->nrqueues / executor_->nworkers ); 279 // unlock( out_lock ); 280 #endif 274 281 } 275 282 276 283 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus, size_t buf_size ) with(this) { 277 if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );278 this.nprocessors = nprocessors;279 this.nworkers = nworkers;280 this.nrqueues = nrqueues;281 this.seperate_clus = seperate_clus;282 this.is_shutdown = false;283 284 if ( nworkers == nrqueues )285 no_steal = true;286 287 #ifdef ACTOR_STATS288 // stolen_arr = aalloc( nrqueues );289 // replaced_queue = aalloc( nrqueues );290 __total_workers = nworkers;291 #endif292 293 if ( seperate_clus ) {294 cluster = alloc();295 (*cluster){};296 } else cluster = active_cluster();297 298 request_queues = aalloc( nrqueues );299 worker_req_queues = aalloc( nrqueues );300 for ( i; nrqueues ) {301 request_queues[i]{ buf_size, i };302 worker_req_queues[i] = &request_queues[i];303 }304 305 processors = aalloc( nprocessors );306 for ( i; nprocessors )307 (*(processors[i] = alloc())){ *cluster };308 309 local_queues = aalloc( nworkers );310 workers = aalloc( nworkers );311 w_infos = aalloc( nworkers );312 unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;313 314 for ( i; nworkers ) {315 w_infos[i]{};316 local_queues[i]{ buf_size };317 }318 319 for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {320 range = reqPerWorker + ( i < extras ? 
1 : 0 );321 (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], &this, start, range, i };322 } // for284 if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" ); 285 this.nprocessors = nprocessors; 286 this.nworkers = nworkers; 287 this.nrqueues = nrqueues; 288 this.seperate_clus = seperate_clus; 289 this.is_shutdown = false; 290 291 if ( nworkers == nrqueues ) 292 no_steal = true; 293 294 #ifdef ACTOR_STATS 295 // stolen_arr = aalloc( nrqueues ); 296 // replaced_queue = aalloc( nrqueues ); 297 __total_workers = nworkers; 298 #endif 299 300 if ( seperate_clus ) { 301 cluster = alloc(); 302 (*cluster){}; 303 } else cluster = active_cluster(); 304 305 request_queues = aalloc( nrqueues ); 306 worker_req_queues = aalloc( nrqueues ); 307 for ( i; nrqueues ) { 308 request_queues[i]{ buf_size, i }; 309 worker_req_queues[i] = &request_queues[i]; 310 } 311 312 processors = aalloc( nprocessors ); 313 for ( i; nprocessors ) 314 (*(processors[i] = alloc())){ *cluster }; 315 316 local_queues = aalloc( nworkers ); 317 workers = aalloc( nworkers ); 318 w_infos = aalloc( nworkers ); 319 unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers; 320 321 for ( i; nworkers ) { 322 w_infos[i]{}; 323 local_queues[i]{ buf_size }; 324 } 325 326 for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) { 327 range = reqPerWorker + ( i < extras ? 1 : 0 ); 328 (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], &this, start, range, i }; 329 } // for 323 330 } 324 331 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __DEFAULT_EXECUTOR_BUFSIZE__ }; } … … 329 336 330 337 static inline void ^?{}( executor & this ) with(this) { 331 is_shutdown = true; 332 333 for ( i; nworkers ) 334 delete( workers[i] ); 335 336 for ( i; nprocessors ) { 337 delete( processors[i] ); 338 } // for 339 340 #ifdef ACTOR_STATS 341 size_t misses = 0; 342 for ( i; nrqueues ) { 343 misses += worker_req_queues[i]->missed; 344 } 345 // adelete( stolen_arr ); 346 // adelete( replaced_queue ); 338 is_shutdown = true; 339 340 for ( i; nworkers ) 341 delete( workers[i] ); 342 343 for ( i; nprocessors ) { 344 delete( processors[i] ); 345 } // for 346 347 #ifdef ACTOR_STATS_QUEUE_MISSED 348 size_t misses = 0; 349 for ( i; nrqueues ) { 350 misses += worker_req_queues[i]->missed; 351 } 352 // adelete( stolen_arr ); 353 // adelete( replaced_queue ); 354 #endif 355 356 adelete( workers ); 357 adelete( w_infos ); 358 adelete( local_queues ); 359 adelete( request_queues ); 360 adelete( worker_req_queues ); 361 adelete( processors ); 362 if ( seperate_clus ) delete( cluster ); 363 364 #ifdef ACTOR_STATS // print formatted stats 365 printf(" Actor System Stats:\n"); 366 printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, __all_processed); 367 size_t avg_gulps = __all_gulps == 0 ? 
0 : __all_processed / __all_gulps; 368 printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", __all_gulps, avg_gulps, misses); 369 printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\t Empty steals:\t\t%lu\n", 370 __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps, __total_empty_stolen); 371 size_t avg_steal = __total_stolen == 0 ? 0 : __all_msgs_stolen / __total_stolen; 372 printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", __all_msgs_stolen, avg_steal); 373 #endif 374 375 #ifndef ACTOR_STATS 376 #ifdef ACTOR_STATS_QUEUE_MISSED 377 printf("\t%lu", misses); 347 378 #endif 348 349 adelete( workers );350 adelete( w_infos );351 adelete( local_queues );352 adelete( request_queues );353 adelete( worker_req_queues );354 adelete( processors );355 if ( seperate_clus ) delete( cluster );356 357 #ifdef ACTOR_STATS // print formatted stats358 printf(" Actor System Stats:\n");359 printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, __all_processed);360 size_t avg_gulps = __all_gulps == 0 ? 0 : __all_processed / __all_gulps;361 printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", __all_gulps, avg_gulps, misses);362 printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\t Empty steals:\t\t%lu\n",363 __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps, __total_empty_stolen);364 size_t avg_steal = __total_stolen == 0 ? 0 : __all_msgs_stolen / __total_stolen;365 printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", __all_msgs_stolen, avg_steal);366 379 #endif 367 380 368 381 } 369 382 … … 372 385 373 386 static inline size_t __get_next_ticket( executor & this ) with(this) { 374 #ifdef __CFA_DEBUG__375 size_t temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;376 377 // reserve MAX for dead actors378 if ( unlikely( temp == MAX ) ) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;379 return temp;380 #else381 return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_RELAXED) % nrqueues;382 #endif387 #ifdef __CFA_DEBUG__ 388 size_t temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues; 389 390 // reserve MAX for dead actors 391 if ( unlikely( temp == MAX ) ) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues; 392 return temp; 393 #else 394 return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_RELAXED) % nrqueues; 395 #endif 383 396 } // tickets 384 397 385 398 // TODO: update globals in this file to be static fields once the static fields project is done 386 399 static executor * __actor_executor_ = 0p; 387 static bool __actor_executor_passed = false; // was an executor passed to start_actor_system388 static size_t __num_actors_ = 0; // number of actor objects in system400 static bool __actor_executor_passed = false; // was an executor passed to start_actor_system 401 static size_t __num_actors_ = 0; // number of actor objects in system 389 402 static struct thread$ * __actor_executor_thd = 0p; // used to wake executor after actors finish 390 403 struct actor { 391 size_t ticket;// executor-queue handle392 allocation allocation_;// allocation action393 inline virtual_dtor;404 
size_t ticket; // executor-queue handle 405 allocation alloc; // allocation action 406 inline virtual_dtor; 394 407 }; 395 408 396 409 static inline void ?{}( actor & this ) with(this) { 397 // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive398 // member must be called to end it399 DEBUG_ABORT( __actor_executor_ == 0p, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" );400 allocation_= Nodelete;401 ticket = __get_next_ticket( *__actor_executor_ );402 __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_RELAXED );403 #ifdef ACTOR_STATS404 __atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST );405 #endif410 // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive 411 // member must be called to end it 412 DEBUG_ABORT( __actor_executor_ == 0p, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" ); 413 alloc = Nodelete; 414 ticket = __get_next_ticket( *__actor_executor_ ); 415 __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_RELAXED ); 416 #ifdef ACTOR_STATS 417 __atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST ); 418 #endif 406 419 } 407 420 408 421 static inline void check_actor( actor & this ) { 409 if ( this.allocation_!= Nodelete ) {410 switch( this.allocation_) {411 case Delete: delete( &this ); break;412 case Destroy:413 CFA_DEBUG( this.ticket = MAX; );// mark as terminated414 ^?{}(this);415 break;416 case Finished:417 CFA_DEBUG( this.ticket = MAX; );// mark as terminated418 break;419 default: ;// stop warning420 }421 422 if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_RELAXED ) == 0 ) ) { // all actors have terminated423 unpark( __actor_executor_thd );424 }425 }422 if ( this.alloc != Nodelete ) { 423 switch( this.alloc ) { 424 case Delete: delete( &this ); break; 425 case Destroy: 426 CFA_DEBUG( this.ticket = MAX; ); // mark as terminated 427 ^?{}(this); 428 break; 429 case Finished: 430 CFA_DEBUG( this.ticket = MAX; ); // mark as terminated 431 break; 432 default: ; // stop warning 433 } 434 435 if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_RELAXED ) == 0 ) ) { // all actors have terminated 436 unpark( __actor_executor_thd ); 437 } 438 } 426 439 } 427 440 428 441 struct message { 429 allocation allocation_;// allocation action430 inline virtual_dtor;442 allocation alloc; // allocation action 443 inline virtual_dtor; 431 444 }; 432 445 433 446 static inline void ?{}( message & this ) { 434 this.allocation_= Nodelete;447 this.alloc = Nodelete; 435 448 } 436 449 static inline void ?{}( message & this, allocation alloc ) { 437 memcpy( &this.allocation_, &alloc, sizeof(allocation) );// optimization to elide ctor438 DEBUG_ABORT( this.allocation_ == Finished, "The Finished allocation status is not supported for message types.\n");450 memcpy( &this.alloc, &alloc, sizeof(allocation) ); // optimization to elide ctor 451 CFA_DEBUG( if ( this.alloc == Finished ) this.alloc = Nodelete; ); 439 452 } 440 453 static inline void ^?{}( message & this ) with(this) { 441 CFA_DEBUG( if ( allocation_ == Nodelete ) printf("A message at location %p was allocated but never sent.\n", &this); ) 454 CFA_DEBUG( 455 if ( alloc == Nodelete ) { 456 printf( "CFA warning (UNIX pid:%ld) : program terminating with message %p allocated but never sent.\n", 457 (long int)getpid(), &this ); 458 } 459 ) 442 460 } 443 461 444 462 static inline void check_message( message & this ) { 445 switch ( 
this.allocation_ ) { // analyze message status 446 case Nodelete: CFA_DEBUG( this.allocation_ = Finished ); break; 447 case Delete: delete( &this ); break; 448 case Destroy: ^?{}( this ); break; 449 case Finished: break; 450 } // switch 451 } 452 static inline void set_allocation( message & this, allocation state ) { 453 this.allocation_ = state; 463 switch ( this.alloc ) { // analyze message status 464 case Nodelete: CFA_DEBUG( this.alloc = Finished ); break; 465 case Delete: delete( &this ); break; 466 case Destroy: ^?{}( this ); break; 467 case Finished: break; 468 } // switch 469 } 470 static inline allocation set_allocation( message & this, allocation state ) { 471 CFA_DEBUG( if ( state == Nodelete ) state = Finished; ); 472 allocation prev = this.alloc; 473 this.alloc = state; 474 return prev; 475 } 476 static inline allocation get_allocation( message & this ) { 477 return this.alloc; 454 478 } 455 479 456 480 static inline void deliver_request( request & this ) { 457 DEBUG_ABORT( this.receiver->ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );458 actor * base_actor;459 message * base_msg;460 allocation temp = this.fn( *this.receiver, *this.msg, &base_actor, &base_msg );461 base_actor->allocation_ = temp; 462 check_message( *base_msg );463 check_actor( *base_actor );481 DEBUG_ABORT( this.receiver->ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" ); 482 actor * base_actor; 483 message * base_msg; 484 allocation temp = this.fn( *this.receiver, *this.msg, &base_actor, &base_msg ); 485 memcpy( &base_actor->alloc, &temp, sizeof(allocation) ); // optimization to elide ctor 486 check_message( *base_msg ); 487 check_actor( *base_actor ); 464 488 } 465 489 … … 467 491 // returns ptr to newly owned queue if swap succeeds 468 492 static inline work_queue * try_swap_queues( worker & this, unsigned int victim_idx, unsigned int my_idx ) with(this) { 469 work_queue * my_queue = request_queues[my_idx];470 work_queue * other_queue = request_queues[victim_idx];471 472 // if either queue is 0p then they are in the process of being stolen473 if ( other_queue == 0p ) return 0p;474 475 // try to set our queue ptr to be 0p. If it fails someone moved our queue so return false476 if ( !__atomic_compare_exchange_n( &request_queues[my_idx], &my_queue, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )477 return 0p;478 479 // try to set other queue ptr to be our queue ptr. If it fails someone moved the other queue so fix up then return false480 if ( !__atomic_compare_exchange_n( &request_queues[victim_idx], &other_queue, my_queue, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {481 /* paranoid */ verify( request_queues[my_idx] == 0p );482 request_queues[my_idx] = my_queue; // reset my queue ptr back to appropriate val483 return 0p;484 }485 486 // we have successfully swapped and since our queue is 0p no one will touch it so write back new queue ptr non atomically487 request_queues[my_idx] = other_queue; // last write does not need to be atomic488 return other_queue;493 work_queue * my_queue = request_queues[my_idx]; 494 work_queue * other_queue = request_queues[victim_idx]; 495 496 // if either queue is 0p then they are in the process of being stolen 497 if ( other_queue == 0p ) return 0p; 498 499 // try to set our queue ptr to be 0p. 
If it fails someone moved our queue so return false 500 if ( !__atomic_compare_exchange_n( &request_queues[my_idx], &my_queue, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) 501 return 0p; 502 503 // try to set other queue ptr to be our queue ptr. If it fails someone moved the other queue so fix up then return false 504 if ( !__atomic_compare_exchange_n( &request_queues[victim_idx], &other_queue, my_queue, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) { 505 /* paranoid */ verify( request_queues[my_idx] == 0p ); 506 request_queues[my_idx] = my_queue; // reset my queue ptr back to appropriate val 507 return 0p; 508 } 509 510 // we have successfully swapped and since our queue is 0p no one will touch it so write back new queue ptr non atomically 511 request_queues[my_idx] = other_queue; // last write does not need to be atomic 512 return other_queue; 489 513 } 490 514 491 515 // once a worker to steal from has been chosen, choose queue to steal from 492 516 static inline void choose_queue( worker & this, unsigned int victim_id, unsigned int swap_idx ) with(this) { 493 // have to calculate victim start and range since victim may be deleted before us in shutdown494 const unsigned int queues_per_worker = executor_->nrqueues / executor_->nworkers;495 const unsigned int extras = executor_->nrqueues % executor_->nworkers;496 unsigned int vic_start, vic_range;497 if ( extras > victim_id ) {498 vic_range = queues_per_worker + 1;499 vic_start = vic_range * victim_id;500 } else {501 vic_start = extras + victim_id * queues_per_worker;502 vic_range = queues_per_worker;503 }504 unsigned int start_idx = prng( vic_range );505 506 unsigned int tries = 0;507 work_queue * curr_steal_queue;508 509 for ( unsigned int i = start_idx; tries < vic_range; i = (i + 1) % vic_range ) {510 tries++;511 curr_steal_queue = request_queues[ i + vic_start ];512 // avoid empty queues and queues that are being operated on513 if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || is_empty( *curr_steal_queue->c_queue ) )514 continue;515 516 #ifdef ACTOR_STATS517 curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );518 if ( curr_steal_queue ) {519 executor_->w_infos[id].msgs_stolen += curr_steal_queue->c_queue->count;520 executor_->w_infos[id].stolen++;521 if ( is_empty( *curr_steal_queue->c_queue ) ) executor_->w_infos[id].empty_stolen++;522 // __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED);523 // replaced_queue[swap_idx]++;524 // __atomic_add_fetch(&stolen_arr[ i + vic_start ], 1, __ATOMIC_RELAXED);525 } else {526 executor_->w_infos[id].failed_swaps++;527 }528 #else529 curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );530 #endif // ACTOR_STATS531 532 return;533 }534 535 return;517 // have to calculate victim start and range since victim may be deleted before us in shutdown 518 const unsigned int queues_per_worker = executor_->nrqueues / executor_->nworkers; 519 const unsigned int extras = executor_->nrqueues % executor_->nworkers; 520 unsigned int vic_start, vic_range; 521 if ( extras > victim_id ) { 522 vic_range = queues_per_worker + 1; 523 vic_start = vic_range * victim_id; 524 } else { 525 vic_start = extras + victim_id * queues_per_worker; 526 vic_range = queues_per_worker; 527 } 528 unsigned int start_idx = prng( vic_range ); 529 530 unsigned int tries = 0; 531 work_queue * curr_steal_queue; 532 533 for ( unsigned int i = start_idx; tries < vic_range; i = (i + 1) % vic_range ) { 534 tries++; 535 curr_steal_queue = request_queues[ i + 
vic_start ]; 536 // avoid empty queues and queues that are being operated on 537 if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || is_empty( *curr_steal_queue->c_queue ) ) 538 continue; 539 540 #ifdef ACTOR_STATS 541 curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx ); 542 if ( curr_steal_queue ) { 543 executor_->w_infos[id].msgs_stolen += curr_steal_queue->c_queue->count; 544 executor_->w_infos[id].stolen++; 545 if ( is_empty( *curr_steal_queue->c_queue ) ) executor_->w_infos[id].empty_stolen++; 546 // __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED); 547 // replaced_queue[swap_idx]++; 548 // __atomic_add_fetch(&stolen_arr[ i + vic_start ], 1, __ATOMIC_RELAXED); 549 } else { 550 executor_->w_infos[id].failed_swaps++; 551 } 552 #else 553 curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx ); 554 #endif // ACTOR_STATS 555 556 return; 557 } 558 559 return; 536 560 } 537 561 538 562 // choose a worker to steal from 539 563 static inline void steal_work( worker & this, unsigned int swap_idx ) with(this) { 540 #if RAND541 unsigned int victim = prng( executor_->nworkers );542 if ( victim == id ) victim = ( victim + 1 ) % executor_->nworkers;543 choose_queue( this, victim, swap_idx );544 #elif SEARCH545 unsigned long long min = MAX; // smaller timestamp means longer since service546 int min_id = 0; // use ints not uints to avoid integer underflow without hacky math547 int n_workers = executor_->nworkers;548 unsigned long long curr_stamp;549 int scount = 1;550 for ( int i = (id + 1) % n_workers; scount < n_workers; i = (i + 1) % n_workers, scount++ ) {551 curr_stamp = executor_->w_infos[i].stamp;552 if ( curr_stamp < min ) {553 min = curr_stamp;554 min_id = i;555 }556 }557 choose_queue( this, min_id, swap_idx );558 #endif564 #if RAND 565 unsigned int victim = prng( executor_->nworkers ); 566 if ( victim == id ) victim = ( victim + 1 ) % executor_->nworkers; 567 choose_queue( this, victim, swap_idx ); 568 #elif SEARCH 569 unsigned long long min = MAX; // smaller timestamp means longer since service 570 int min_id = 0; // use ints not uints to avoid integer underflow without hacky math 571 int n_workers = executor_->nworkers; 572 unsigned long long curr_stamp; 573 int scount = 1; 574 for ( int i = (id + 1) % n_workers; scount < n_workers; i = (i + 1) % n_workers, scount++ ) { 575 curr_stamp = executor_->w_infos[i].stamp; 576 if ( curr_stamp < min ) { 577 min = curr_stamp; 578 min_id = i; 579 } 580 } 581 choose_queue( this, min_id, swap_idx ); 582 #endif 559 583 } 560 584 561 585 #define CHECK_TERMINATION if ( unlikely( executor_->is_shutdown ) ) break Exit 562 586 void main( worker & this ) with(this) { 563 // #ifdef ACTOR_STATS 564 // for ( i; executor_->nrqueues ) { 565 // replaced_queue[i] = 0; 566 // __atomic_store_n( &stolen_arr[i], 0, __ATOMIC_SEQ_CST ); 567 // } 568 // #endif 569 570 // threshold of empty queues we see before we go stealing 571 const unsigned int steal_threshold = 2 * range; 572 573 // Store variable data here instead of worker struct to avoid any potential false sharing 574 unsigned int empty_count = 0; 575 request & req; 576 work_queue * curr_work_queue; 577 578 Exit: 579 for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers 580 curr_work_queue = request_queues[i + start]; 581 582 // check if queue is empty before trying to gulp it 583 if ( is_empty( *curr_work_queue->c_queue ) ) { 584 #ifdef __STEAL 585 empty_count++; 586 if ( empty_count < 
steal_threshold ) continue; 587 #else 588 continue; 589 #endif 590 } 591 transfer( *curr_work_queue, ¤t_queue ); 592 #ifdef ACTOR_STATS 593 executor_->w_infos[id].gulps++; 594 #endif // ACTOR_STATS 595 #ifdef __STEAL 596 if ( is_empty( *current_queue ) ) { 597 if ( unlikely( no_steal ) ) { CHECK_TERMINATION; continue; } 598 empty_count++; 599 if ( empty_count < steal_threshold ) continue; 600 empty_count = 0; 601 602 CHECK_TERMINATION; // check for termination 603 604 __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED ); 605 606 #ifdef ACTOR_STATS 607 executor_->w_infos[id].try_steal++; 608 #endif // ACTOR_STATS 609 610 steal_work( this, start + prng( range ) ); 611 continue; 612 } 613 #endif // __STEAL 614 while ( ! is_empty( *current_queue ) ) { 615 #ifdef ACTOR_STATS 616 executor_->w_infos[id].processed++; 617 #endif 618 &req = &remove( *current_queue ); 619 if ( !&req ) continue; 620 deliver_request( req ); 621 } 622 #ifdef __STEAL 623 curr_work_queue->being_processed = false; // set done processing 624 empty_count = 0; // we found work so reset empty counter 587 // #ifdef ACTOR_STATS 588 // for ( i; executor_->nrqueues ) { 589 // replaced_queue[i] = 0; 590 // __atomic_store_n( &stolen_arr[i], 0, __ATOMIC_SEQ_CST ); 591 // } 592 // #endif 593 594 // threshold of empty queues we see before we go stealing 595 const unsigned int steal_threshold = 2 * range; 596 597 // Store variable data here instead of worker struct to avoid any potential false sharing 598 unsigned int empty_count = 0; 599 request & req; 600 work_queue * curr_work_queue; 601 602 Exit: 603 for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers 604 curr_work_queue = request_queues[i + start]; 605 606 #ifndef __STEAL 607 CHECK_TERMINATION; 625 608 #endif 626 627 CHECK_TERMINATION; 628 629 // potentially reclaim some of the current queue's vector space if it is unused 630 reclaim( *current_queue ); 631 } // for 609 610 // check if queue is empty before trying to gulp it 611 if ( is_empty( *curr_work_queue->c_queue ) ) { 612 #ifdef __STEAL 613 empty_count++; 614 if ( empty_count < steal_threshold ) continue; 615 #else 616 continue; 617 #endif 618 } 619 transfer( *curr_work_queue, ¤t_queue ); 620 #ifdef ACTOR_STATS 621 executor_->w_infos[id].gulps++; 622 #endif // ACTOR_STATS 623 #ifdef __STEAL 624 if ( is_empty( *current_queue ) ) { 625 if ( unlikely( no_steal ) ) { CHECK_TERMINATION; continue; } 626 empty_count++; 627 if ( empty_count < steal_threshold ) continue; 628 empty_count = 0; 629 630 CHECK_TERMINATION; // check for termination 631 632 __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED ); 633 634 #ifdef ACTOR_STATS 635 executor_->w_infos[id].try_steal++; 636 #endif // ACTOR_STATS 637 638 steal_work( this, start + prng( range ) ); 639 continue; 640 } 641 #endif // __STEAL 642 while ( ! 
is_empty( *current_queue ) ) { 643 #ifdef ACTOR_STATS 644 executor_->w_infos[id].processed++; 645 #endif 646 &req = &remove( *current_queue ); 647 if ( !&req ) continue; 648 deliver_request( req ); 649 } 650 #ifdef __STEAL 651 curr_work_queue->being_processed = false; // set done processing 652 empty_count = 0; // we found work so reset empty counter 653 #endif 654 655 CHECK_TERMINATION; 656 657 // potentially reclaim some of the current queue's vector space if it is unused 658 reclaim( *current_queue ); 659 } // for 632 660 } 633 661 634 662 static inline void send( executor & this, request & req, unsigned long int ticket ) with(this) { 635 insert( request_queues[ticket], req);663 insert( request_queues[ticket], req); 636 664 } 637 665 638 666 static inline void send( actor & this, request & req ) { 639 DEBUG_ABORT( this.ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );640 send( *__actor_executor_, req, this.ticket );667 DEBUG_ABORT( this.ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" ); 668 send( *__actor_executor_, req, this.ticket ); 641 669 } 642 670 643 671 static inline void __reset_stats() { 644 #ifdef ACTOR_STATS645 __total_tries = 0;646 __total_stolen = 0;647 __all_gulps = 0;648 __total_failed_swaps = 0;649 __total_empty_stolen = 0;650 __all_processed = 0;651 __num_actors_stats = 0;652 __all_msgs_stolen = 0;653 #endif672 #ifdef ACTOR_STATS 673 __total_tries = 0; 674 __total_stolen = 0; 675 __all_gulps = 0; 676 __total_failed_swaps = 0; 677 __total_empty_stolen = 0; 678 __all_processed = 0; 679 __num_actors_stats = 0; 680 __all_msgs_stolen = 0; 681 #endif 654 682 } 655 683 656 684 static inline void start_actor_system( size_t num_thds ) { 657 __reset_stats();658 __actor_executor_thd = active_thread();659 __actor_executor_ = alloc();660 (*__actor_executor_){ 0, num_thds, num_thds == 1 ? 1 : num_thds * 16 };685 __reset_stats(); 686 __actor_executor_thd = active_thread(); 687 __actor_executor_ = alloc(); 688 (*__actor_executor_){ 0, num_thds, num_thds == 1 ? 
1 : num_thds * 16 }; 661 689 } 662 690 … … 664 692 665 693 static inline void start_actor_system( executor & this ) { 666 __reset_stats();667 __actor_executor_thd = active_thread();668 __actor_executor_ = &this;669 __actor_executor_passed = true;694 __reset_stats(); 695 __actor_executor_thd = active_thread(); 696 __actor_executor_ = &this; 697 __actor_executor_passed = true; 670 698 } 671 699 672 700 static inline void stop_actor_system() { 673 park( ); // will beunparked when actor system is finished674 675 if ( !__actor_executor_passed ) delete( __actor_executor_ );676 __actor_executor_ = 0p;677 __actor_executor_thd = 0p;678 __next_ticket = 0;679 __actor_executor_passed = false;701 park(); // unparked when actor system is finished 702 703 if ( !__actor_executor_passed ) delete( __actor_executor_ ); 704 __actor_executor_ = 0p; 705 __actor_executor_thd = 0p; 706 __next_ticket = 0; 707 __actor_executor_passed = false; 680 708 } 681 709 682 710 // Default messages to send to any actor to change status 683 711 // assigned at creation to __base_msg_finished to avoid unused message warning 684 message __base_msg_finished @= { .allocation_ : Finished }; 685 struct __delete_msg_t { inline message; } delete_msg = __base_msg_finished; 686 struct __destroy_msg_t { inline message; } destroy_msg = __base_msg_finished; 687 struct __finished_msg_t { inline message; } finished_msg = __base_msg_finished; 688 689 allocation receive( actor & this, __delete_msg_t & msg ) { return Delete; } 690 allocation receive( actor & this, __destroy_msg_t & msg ) { return Destroy; } 691 allocation receive( actor & this, __finished_msg_t & msg ) { return Finished; } 692 712 message __base_msg_finished @= { .alloc : Finished }; 713 struct delete_msg_t { inline message; } delete_msg = __base_msg_finished; 714 struct destroy_msg_t { inline message; } destroy_msg = __base_msg_finished; 715 struct finished_msg_t { inline message; } finished_msg = __base_msg_finished; 716 717 allocation receive( actor & this, delete_msg_t & msg ) { return Delete; } 718 allocation receive( actor & this, destroy_msg_t & msg ) { return Destroy; } 719 allocation receive( actor & this, finished_msg_t & msg ) { return Finished; } -
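The allocation bookkeeping above is the main user-visible change in actor.hfa: the message field is now `alloc`, set_allocation() returns the previously set action, and a get_allocation() accessor is added. The following sketch is illustrative only -- the message type `work_msg` and the helper `allocation_demo` are hypothetical, not part of this changeset -- and exercises just those two accessors.

// Illustrative sketch only: `work_msg` and `allocation_demo` are hypothetical,
// not part of this changeset; they exercise the revised accessors declared above.
struct work_msg { inline message; int payload; };

static inline void allocation_demo( work_msg & m ) {
	// ask the executor to delete the message once its receive routine has run;
	// the return value is the action previously in effect (Nodelete by default)
	allocation prev = set_allocation( m, Delete );

	if ( prev == Nodelete && get_allocation( m ) == Delete ) {
		// check_message() in the worker loop will now call delete() on the message
	}
}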
libcfa/src/concurrency/alarm.hfa
r92355883 r2a301ff

 // Created On : Fri Jun 2 11:31:25 2017
 // Last Modified By : Peter A. Buhr
-// Last Modified On : Mon Mar 26 16:25:41 2018
-// Update Count : 11
+// Last Modified On : Wed Aug 30 21:27:40 2023
+// Update Count : 12
 //
…
 #include "time.hfa"

-#include "containers/list.hfa"
+#include "collections/list.hfa"

 struct thread$;
libcfa/src/concurrency/channel.hfa
r92355883 r2a301ff 68 68 #endif 69 69 }; 70 static inline void ?{}( channel(T) & this, channel(T) this2 ) = void; 71 static inline void ?=?( channel(T) & this, channel(T) this2 ) = void; 70 72 71 73 static inline void ?{}( channel(T) &c, size_t _size ) with(c) { … … 326 328 return retval; 327 329 } 330 static inline void remove( channel(T) & chan ) { T elem = (T)remove( chan ); } 331 332 333 /////////////////////////////////////////////////////////////////////////////////////////// 334 // The following is Go-style operator support for channels 335 /////////////////////////////////////////////////////////////////////////////////////////// 336 337 static inline void ?<<?( channel(T) & chan, T elem ) { insert( chan, elem ); } 338 static inline void ?<<?( T & ret, channel(T) & chan ) { ret = remove( chan ); } 328 339 329 340 /////////////////////////////////////////////////////////////////////////////////////////// … … 340 351 unlock( mutex_lock ); 341 352 342 // only return true when not special OR case , not exceptional calseand status is SAT343 return ( node.extra == 0p || !node.park_counter )? false : *node.clause_status == __SELECT_SAT;353 // only return true when not special OR case and status is SAT 354 return !node.park_counter ? false : *node.clause_status == __SELECT_SAT; 344 355 } 345 356 … … 363 374 // type used by select statement to capture a chan read as the selected operation 364 375 struct chan_read { 365 T &ret;366 channel(T) &chan;376 T * ret; 377 channel(T) * chan; 367 378 }; 368 369 static inline void ?{}( chan_read(T) & cr, channel(T) & chan, T & ret ) { 370 &cr.chan = &chan; 371 &cr.ret = &ret; 372 } 373 static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ chan, ret }; return cr; } 374 375 static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(this.chan, this) { 376 __closed_remove( chan, ret ); 379 __CFA_SELECT_GET_TYPE( chan_read(T) ); 380 381 static inline void ?{}( chan_read(T) & cr, channel(T) * chan, T * ret ) { 382 cr.chan = chan; 383 cr.ret = ret; 384 } 385 static inline chan_read(T) ?<<?( T & ret, channel(T) & chan ) { chan_read(T) cr{ &chan, &ret }; return cr; } 386 387 static inline void __handle_select_closed_read( chan_read(T) & this, select_node & node ) with(*this.chan, this) { 388 __closed_remove( *chan, *ret ); 377 389 // if we get here then the insert succeeded 378 390 __make_select_node_available( node ); 379 391 } 380 392 381 static inline bool register_select( chan_read(T) & this, select_node & node ) with( this.chan, this) {382 lock( mutex_lock ); 383 node.extra = &ret; // set .extra so that if it == 0p later in on_selected it is due to channel close393 static inline bool register_select( chan_read(T) & this, select_node & node ) with(*this.chan, this) { 394 lock( mutex_lock ); 395 node.extra = ret; // set .extra so that if it == 0p later in on_selected it is due to channel close 384 396 385 397 #ifdef CHAN_STATS … … 396 408 397 409 if ( __handle_pending( prods, node ) ) { 398 __prods_handoff( chan,ret );410 __prods_handoff( *chan, *ret ); 399 411 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node 400 412 unlock( mutex_lock ); … … 422 434 ZeroSize: if ( size == 0 && !prods`isEmpty ) { 423 435 if ( !__handle_waituntil_OR( prods ) ) break ZeroSize; 424 __prods_handoff( chan,ret );436 __prods_handoff( *chan, *ret ); 425 437 __set_avail_then_unlock( node, mutex_lock ); 426 438 return true; … … 
439 451 440 452 // Remove from buffer 441 __do_remove( chan,ret );453 __do_remove( *chan, *ret ); 442 454 __set_avail_then_unlock( node, mutex_lock ); 443 455 return true; 444 456 } 445 static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); } 446 static inline void on_selected( chan_read(T) & this, select_node & node ) with(this) { 447 if ( node.extra == 0p ) // check if woken up due to closed channel 448 __closed_remove( chan, ret ); 457 static inline bool unregister_select( chan_read(T) & this, select_node & node ) { return unregister_chan( *this.chan, node ); } 458 static inline bool on_selected( chan_read(T) & this, select_node & node ) with(this) { 459 if ( unlikely(node.extra == 0p) ) { 460 if ( !exception_in_flight() ) __closed_remove( *chan, *ret ); // check if woken up due to closed channel 461 else return false; 462 } 449 463 // This is only reachable if not closed or closed exception was handled 450 } 464 return true; 465 } 466 467 // type used by select statement to capture a chan read as the selected operation that doesn't have a param to read to 468 struct chan_read_no_ret { 469 T retval; 470 chan_read( T ) c_read; 471 }; 472 __CFA_SELECT_GET_TYPE( chan_read_no_ret(T) ); 473 474 static inline void ?{}( chan_read_no_ret(T) & this, channel(T) & chan ) { 475 this.c_read{ &chan, &this.retval }; 476 } 477 478 static inline chan_read_no_ret(T) remove( channel(T) & chan ) { chan_read_no_ret(T) c_read{ chan }; return c_read; } 479 static inline bool register_select( chan_read_no_ret(T) & this, select_node & node ) { 480 this.c_read.ret = &this.retval; 481 return register_select( this.c_read, node ); 482 } 483 static inline bool unregister_select( chan_read_no_ret(T) & this, select_node & node ) { return unregister_select( this.c_read, node ); } 484 static inline bool on_selected( chan_read_no_ret(T) & this, select_node & node ) { return on_selected( this.c_read, node ); } 451 485 452 486 // type used by select statement to capture a chan write as the selected operation 453 487 struct chan_write { 454 488 T elem; 455 channel(T) &chan;489 channel(T) * chan; 456 490 }; 457 458 static inline void ?{}( chan_write(T) & cw, channel(T) & chan, T elem ) { 459 &cw.chan = &chan; 491 __CFA_SELECT_GET_TYPE( chan_write(T) ); 492 493 static inline void ?{}( chan_write(T) & cw, channel(T) * chan, T elem ) { 494 cw.chan = chan; 460 495 memcpy( (void *)&cw.elem, (void *)&elem, sizeof(T) ); 461 496 } 462 static inline chan_write(T) ?>>?( T elem, channel(T) & chan ) { chan_write(T) cw{ chan, elem }; return cw; } 463 464 static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(this.chan, this) { 465 __closed_insert( chan, elem ); 497 static inline chan_write(T) ?<<?( channel(T) & chan, T elem ) { chan_write(T) cw{ &chan, elem }; return cw; } 498 static inline chan_write(T) insert( T elem, channel(T) & chan) { chan_write(T) cw{ &chan, elem }; return cw; } 499 500 static inline void __handle_select_closed_write( chan_write(T) & this, select_node & node ) with(*this.chan, this) { 501 __closed_insert( *chan, elem ); 466 502 // if we get here then the insert succeeded 467 503 __make_select_node_available( node ); 468 504 } 469 505 470 static inline bool register_select( chan_write(T) & this, select_node & node ) with( this.chan, this) {506 static inline bool register_select( chan_write(T) & this, select_node & node ) with(*this.chan, this) { 471 507 lock( mutex_lock ); 472 508 node.extra = &elem; // 
set .extra so that if it == 0p later in on_selected it is due to channel close … … 486 522 487 523 if ( __handle_pending( cons, node ) ) { 488 __cons_handoff( chan, elem );524 __cons_handoff( *chan, elem ); 489 525 __make_select_node_sat( node ); // need to to mark SAT now that we know operation is done or else threads could get stuck in __mark_select_node 490 526 unlock( mutex_lock ); … … 513 549 ConsEmpty: if ( !cons`isEmpty ) { 514 550 if ( !__handle_waituntil_OR( cons ) ) break ConsEmpty; 515 __cons_handoff( chan, elem );551 __cons_handoff( *chan, elem ); 516 552 __set_avail_then_unlock( node, mutex_lock ); 517 553 return true; … … 530 566 531 567 // otherwise carry out write either via normal insert 532 __buf_insert( chan, elem );568 __buf_insert( *chan, elem ); 533 569 __set_avail_then_unlock( node, mutex_lock ); 534 570 return true; 535 571 } 536 static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( this.chan, node ); } 537 538 static inline void on_selected( chan_write(T) & this, select_node & node ) with(this) { 539 if ( node.extra == 0p ) // check if woken up due to closed channel 540 __closed_insert( chan, elem ); 541 572 static inline bool unregister_select( chan_write(T) & this, select_node & node ) { return unregister_chan( *this.chan, node ); } 573 574 static inline bool on_selected( chan_write(T) & this, select_node & node ) with(this) { 575 if ( unlikely(node.extra == 0p) ) { 576 if ( !exception_in_flight() ) __closed_insert( *chan, elem ); // check if woken up due to closed channel 577 else return false; 578 } 542 579 // This is only reachable if not closed or closed exception was handled 580 return true; 543 581 } 544 582 -
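The Go-style operator block above adds ?<<? overloads for both insertion and removal, plus a value-discarding remove(). A hedged usage sketch follows; the driver program, channel size, and values are illustrative, and only routines visible in this diff are used.

#include <channel.hfa>

int main() {
	channel( int ) chan{ 10 };		// bounded channel with a 10-element buffer
	chan << 42;						// operator form of insert( chan, 42 )
	insert( chan, 7 );				// equivalent named form
	int x;
	x << chan;						// operator form of x = remove( chan ); blocks if empty
	remove( chan );					// new overload: remove one element and discard it
	return 0;
}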
libcfa/src/concurrency/coroutine.cfa
r92355883 r2a301ff 28 28 #include "kernel/private.hfa" 29 29 #include "exception.hfa" 30 #include "exception.h" 30 31 #include "math.hfa" 31 32 … … 77 78 free( desc->cancellation ); 78 79 desc->cancellation = 0p; 80 } 81 82 // helper for popping from coroutine's ehm buffer 83 inline nonlocal_exception * pop_ehm_head( coroutine$ * this ) { 84 lock( this->ehm_state.buffer_lock __cfaabi_dbg_ctx2 ); 85 nonlocal_exception * nl_ex = pop_head( this->ehm_state.ehm_buffer ); 86 unlock( this->ehm_state.buffer_lock ); 87 return nl_ex; 79 88 } 80 89 … … 121 130 last = 0p; 122 131 cancellation = 0p; 132 ehm_state.ehm_buffer{}; 133 ehm_state.buffer_lock{}; 134 ehm_state.ehm_enabled = false; 123 135 } 124 136 125 137 void ^?{}(coroutine$& this) libcfa_public { 138 // handle any leftover pending non-local exceptions 139 nonlocal_exception * nl_ex = pop_ehm_head( &this ); 140 unsigned unhandled_ex = 0; 141 142 // if any leftover exceptions handle 143 while ( nl_ex != 0p ){ 144 unhandled_ex++; 145 free( nl_ex->the_exception ); 146 free( nl_ex ); 147 nl_ex = pop_ehm_head( &this ); 148 } 149 150 #ifdef __CFA_DEBUG__ 151 if ( unhandled_ex > 0 ) 152 printf( "Warning: Coroutine %p exited with %u pending nonlocal exceptions.\n", &this, unhandled_ex ); 153 #endif 154 126 155 if(this.state != Halted && this.state != Start && this.state != Primed) { 127 156 coroutine$ * src = active_coroutine(); … … 283 312 } 284 313 314 315 //////////////////////////////////////////////////////////////////////////////////////////////////// 316 // non local ehm routines 317 318 void defaultResumeAtHandler( exception_t * except ) { 319 __cfaehm_allocate_exception( except ); 320 free( except ); 321 __cfaehm_begin_unwind( (void(*)(exception_t *))defaultTerminationHandler ); 322 } 323 324 bool poll( coroutine$ * cor ) libcfa_public { 325 nonlocal_exception * nl_ex = pop_ehm_head( cor ); 326 327 // if no exceptions return false 328 if ( nl_ex == 0p ) return false; 329 330 // otherwise loop and throwResume all pending exceptions 331 while ( nl_ex != 0p ){ 332 exception_t * ex = nl_ex->the_exception; 333 free( nl_ex ); 334 __cfaehm_throw_resume( ex, defaultResumeAtHandler ); 335 336 // only reached if resumption handled. other dealloc handled in defaultResumeAtHandler 337 free( ex ); 338 nl_ex = pop_ehm_head( cor ); 339 } 340 341 return true; 342 } 343 344 bool poll() libcfa_public { return poll( active_coroutine() ); } 345 coroutine$ * resumer() libcfa_public { return active_coroutine()->last; } 346 347 // user facing ehm operations 348 forall(T & | is_coroutine(T)) { 349 // enable/disable non-local exceptions 350 void enable_ehm( T & cor ) libcfa_public { get_coroutine( cor )->ehm_state.ehm_enabled = true; } 351 void disable_ehm( T & cor ) libcfa_public { get_coroutine( cor )->ehm_state.ehm_enabled = false; } 352 353 // poll for non-local exceptions 354 bool poll( T & cor ) libcfa_public { return poll( get_coroutine( cor ) ); } 355 356 // poll iff nonlocal ehm is enabled 357 bool checked_poll( T & cor ) libcfa_public { return get_coroutine( cor )->ehm_state.ehm_enabled ? poll( cor ) : false; } 358 359 coroutine$ * resumer( T & cor ) libcfa_public { return get_coroutine( cor )->last; } 360 } 361 362 // resume non local exception at receiver (i.e. 
enqueue in ehm buffer) 363 forall(exceptT *, T & | ehm_resume_at( exceptT, T )) 364 void resumeAt( T & receiver, exceptT & ex ) libcfa_public { 365 coroutine$ * cor = get_coroutine( receiver ); 366 nonlocal_exception * nl_ex = alloc(); 367 exceptT * ex_copy = alloc(); 368 memcpy( ex_copy, &ex, sizeof(exceptT) ); 369 (*nl_ex){ (exception_t *)ex_copy }; 370 lock( cor->ehm_state.buffer_lock __cfaabi_dbg_ctx2 ); 371 append( cor->ehm_state.ehm_buffer, nl_ex ); 372 unlock( cor->ehm_state.buffer_lock ); 373 } 374 375 forall(exceptT * | { void $throwResume(exceptT &); }) 376 void resumeAt( coroutine$ * receiver, exceptT & ex ) libcfa_public { 377 nonlocal_exception * nl_ex = alloc(); 378 exceptT * ex_copy = alloc(); 379 memcpy( ex_copy, &ex, sizeof(exceptT) ); 380 (*nl_ex){ (exception_t *)ex_copy }; 381 lock( receiver->ehm_state.buffer_lock __cfaabi_dbg_ctx2 ); 382 append( receiver->ehm_state.ehm_buffer, nl_ex ); 383 unlock( receiver->ehm_state.buffer_lock ); 384 } 385 285 386 // Local Variables: // 286 387 // mode: c // -
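The non-local EHM additions give coroutines an asynchronous channel for exceptions: a sender copies an exception onto the receiver's buffer with resumeAt, and the receiver raises any pending copies with poll / checked_poll. A rough usage sketch; the overload exception and its handler are illustrative only, and the exception declaration is abbreviated (depending on the CFA build it may also need an explicit vtable instance):

    #include <coroutine.hfa>

    exception overload {};                     // hypothetical resumable exception

    coroutine worker {};
    void main( worker & w ) {
        enable_ehm( w );                       // opt in to non-local delivery
        for () {
            try {
                checked_poll( w );             // raise anything queued by resumeAt
            } catchResume ( overload * e ) {
                // shed load, then keep running
            }
            suspend;                           // hand control back to the resumer
        }
    }

    int main() {
        worker w;
        resume( w );                           // start w: it polls, finds nothing, suspends
        overload ex;
        resumeAt( w, ex );                     // enqueue a copy of ex on w's buffer
        resume( w );                           // on this resumption checked_poll raises it inside w
    }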
libcfa/src/concurrency/coroutine.hfa
r92355883 r2a301ff 19 19 #include "invoke.h" 20 20 #include "../exception.hfa" 21 22 //----------------------------------------------------------------------------- 23 // Type used to store and queue nonlocal exceptions on coroutines 24 struct nonlocal_exception { 25 exception_t * the_exception; 26 nonlocal_exception * next; 27 }; 28 static inline void ?{} ( nonlocal_exception & this, exception_t * ex ) with(this) { 29 the_exception = ex; 30 next = 0p; 31 } 32 33 static inline nonlocal_exception *& get_next( nonlocal_exception & this ) __attribute__((const)) { 34 return this.next; 35 } 21 36 22 37 //----------------------------------------------------------------------------- … … 203 218 } 204 219 220 // non local ehm and coroutine utility routines 221 bool poll( coroutine$ * cor ); 222 bool poll(); 223 coroutine$ * resumer(); 224 225 forall(T & | is_coroutine(T)) { 226 void enable_ehm( T & cor ); 227 void disable_ehm( T & cor ); 228 bool poll( T & cor ); 229 bool checked_poll( T & cor ); 230 coroutine$ * resumer( T & cor ); 231 } 232 233 // trait for exceptions able to be resumed at another coroutine 234 forall(exceptT *, T & | is_coroutine(T)) 235 trait ehm_resume_at { void $throwResume(exceptT &); }; 236 237 // general resumeAt 238 forall(exceptT *, T & | ehm_resume_at( exceptT, T )) 239 void resumeAt( T & receiver, exceptT & ex ); 240 241 // resumeAt for underlying coroutine$ type 242 forall(exceptT * | { void $throwResume(exceptT &); }) 243 void resumeAt( coroutine$ * receiver, exceptT & ex ); 244 205 245 // Local Variables: // 206 246 // mode: c // -
libcfa/src/concurrency/future.hfa
r92355883 r2a301ff 39 39 futex_mutex lock; 40 40 }; 41 __CFA_SELECT_GET_TYPE( future(T) ); 41 42 42 43 struct future_node { … … 180 181 } 181 182 182 void on_selected( future(T) & this, select_node & node ) {}183 bool on_selected( future(T) & this, select_node & node ) { return true; } 183 184 } 184 185 } -
libcfa/src/concurrency/invoke.h
r92355883 r2a301ff 10 10 // Created On : Tue Jan 17 12:27:26 2016 11 11 // Last Modified By : Peter A. Buhr 12 // Last Modified On : Tue Mar 14 13:39:31 202313 // Update Count : 5912 // Last Modified On : Wed Aug 30 21:27:51 2023 13 // Update Count : 60 14 14 // 15 15 16 16 // No not use #pragma once was this file is included twice in some places. It has its own guard system. 17 17 18 #include "bits/co ntainers.hfa"18 #include "bits/collections.hfa" 19 19 #include "bits/defs.hfa" 20 20 #include "bits/locks.hfa" … … 23 23 24 24 #ifdef __cforall 25 #include "co ntainers/list.hfa"25 #include "collections/list.hfa" 26 26 extern "C" { 27 27 #endif … … 74 74 }; 75 75 76 struct nonlocal_ehm { 77 // list of pending nonlocal exceptions 78 __queue_t(struct nonlocal_exception) ehm_buffer; 79 80 // lock to protect the buffer 81 struct __spinlock_t buffer_lock; 82 83 // enable/disabled flag 84 bool ehm_enabled; 85 }; 86 76 87 enum __Coroutine_State { Halted, Start, Primed, Blocked, Ready, Active, Cancelled, Halting }; 77 88 … … 98 109 struct _Unwind_Exception * cancellation; 99 110 111 // Non-local exception handling information 112 struct nonlocal_ehm ehm_state; 100 113 }; 101 114 // Wrapper for gdb … … 242 255 #ifdef __cforall 243 256 extern "Cforall" { 257 static inline bool exception_in_flight() { 258 return __get_stack( &active_thread()->self_cor )->exception_context.current_exception != 0p; 259 } 260 244 261 static inline thread$ * volatile & ?`next ( thread$ * this ) { 245 262 return this->user_link.next; -
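The new exception_in_flight helper reports whether an exception is already propagating on the current thread; the channel code above uses it so on_selected does not start a second raise while the first is still unwinding. A small sketch of the same guard in a destructor (buffered_writer and flush are hypothetical, for illustration only):

    #include <stdlib.hfa>      // free
    #include <thread.hfa>      // pulls in invoke.h, which declares exception_in_flight

    struct buffered_writer { char * buffer; };
    static void flush( buffered_writer & this ) { /* hypothetical: write out this.buffer */ }

    void ^?{}( buffered_writer & this ) {
        if ( ! exception_in_flight() ) flush( this );  // normal teardown: heavier work is safe
        free( this.buffer );                           // during unwinding: only release storage
    }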
libcfa/src/concurrency/iofwd.hfa
r92355883 r2a301ff 10 10 // Created On : Thu Apr 23 17:31:00 2020 11 11 // Last Modified By : Peter A. Buhr 12 // Last Modified On : Mon Mar 13 23:54:57202313 // Update Count : 112 // Last Modified On : Fri Jul 21 21:36:01 2023 13 // Update Count : 3 14 14 // 15 15 … … 18 18 #include <unistd.h> 19 19 #include <sys/socket.h> 20 #include <string.h> // memset 20 21 21 22 extern "C" { … … 151 152 #if CFA_HAVE_LINUX_IO_URING_H 152 153 static inline void zero_sqe(struct io_uring_sqe * sqe) { 153 sqe->flags = 0; 154 sqe->ioprio = 0; 155 sqe->fd = 0; 156 sqe->off = 0; 157 sqe->addr = 0; 158 sqe->len = 0; 159 sqe->fsync_flags = 0; 160 sqe->__pad2[0] = 0; 161 sqe->__pad2[1] = 0; 162 sqe->__pad2[2] = 0; 163 sqe->fd = 0; 164 sqe->off = 0; 165 sqe->addr = 0; 166 sqe->len = 0; 154 memset( sqe, 0, sizeof( struct io_uring_sqe ) ); 167 155 } 168 156 #endif -
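Zeroing the whole submission-queue entry with memset is both shorter and safer than the old field-by-field version, which missed several members (and set fd/off/addr/len twice): any field added to io_uring_sqe by a newer kernel is covered automatically. A minimal C sketch of the same pattern when preparing a request by hand (IORING_OP_NOP is just a placeholder opcode):

    #include <string.h>
    #include <linux/io_uring.h>

    static void prep_nop( struct io_uring_sqe * sqe ) {
        memset( sqe, 0, sizeof( *sqe ) );   // every field starts from a known state
        sqe->opcode = IORING_OP_NOP;        // then set only what this request needs
    }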
libcfa/src/concurrency/kernel.cfa
r92355883 r2a301ff 569 569 returnToKernel(); 570 570 __enable_interrupts_checked(); 571 572 571 } 573 572 -
libcfa/src/concurrency/kernel.hfa
r92355883 r2a301ff 10 10 // Created On : Tue Jan 17 12:27:26 2017 11 11 // Last Modified By : Peter A. Buhr 12 // Last Modified On : Tue Feb 4 12:29:26 202013 // Update Count : 2 212 // Last Modified On : Wed Aug 30 21:28:46 2023 13 // Update Count : 23 14 14 // 15 15 … … 20 20 #include "coroutine.hfa" 21 21 22 #include "co ntainers/list.hfa"22 #include "collections/list.hfa" 23 23 24 24 extern "C" { -
libcfa/src/concurrency/kernel/startup.cfa
r92355883 r2a301ff 487 487 last = 0p; 488 488 cancellation = 0p; 489 ehm_state.ehm_buffer{}; 490 ehm_state.buffer_lock{}; 491 ehm_state.ehm_enabled = false; 489 492 } 490 493 -
libcfa/src/concurrency/locks.cfa
r92355883 r2a301ff 239 239 } 240 240 241 void on_selected( blocking_lock & this, select_node & node ) {}241 bool on_selected( blocking_lock & this, select_node & node ) { return true; } 242 242 243 243 //----------------------------------------------------------------------------- -
libcfa/src/concurrency/locks.hfa
r92355883 r2a301ff 21 21 22 22 #include "bits/weakso_locks.hfa" 23 #include "co ntainers/lockfree.hfa"24 #include "co ntainers/list.hfa"23 #include "collections/lockfree.hfa" 24 #include "collections/list.hfa" 25 25 26 26 #include "limits.hfa" … … 112 112 static inline bool register_select( single_acquisition_lock & this, select_node & node ) { return register_select( (blocking_lock &)this, node ); } 113 113 static inline bool unregister_select( single_acquisition_lock & this, select_node & node ) { return unregister_select( (blocking_lock &)this, node ); } 114 static inline void on_selected( single_acquisition_lock & this, select_node & node ) { on_selected( (blocking_lock &)this, node ); } 114 static inline bool on_selected( single_acquisition_lock & this, select_node & node ) { return on_selected( (blocking_lock &)this, node ); } 115 __CFA_SELECT_GET_TYPE( single_acquisition_lock ); 115 116 116 117 //---------- … … 129 130 static inline bool register_select( owner_lock & this, select_node & node ) { return register_select( (blocking_lock &)this, node ); } 130 131 static inline bool unregister_select( owner_lock & this, select_node & node ) { return unregister_select( (blocking_lock &)this, node ); } 131 static inline void on_selected( owner_lock & this, select_node & node ) { on_selected( (blocking_lock &)this, node ); } 132 static inline bool on_selected( owner_lock & this, select_node & node ) { return on_selected( (blocking_lock &)this, node ); } 133 __CFA_SELECT_GET_TYPE( owner_lock ); 132 134 133 135 //----------------------------------------------------------------------------- … … 138 140 }; 139 141 140 static inline void ?{}( mcs_node & this) { this.next = 0p; }142 static inline void ?{}( mcs_node & this ) { this.next = 0p; } 141 143 142 144 static inline mcs_node * volatile & ?`next ( mcs_node * node ) { … … 148 150 }; 149 151 150 static inline void lock( mcs_lock & l, mcs_node & n) {152 static inline void lock( mcs_lock & l, mcs_node & n ) { 151 153 if(push(l.queue, &n)) 152 154 wait(n.sem); … … 172 174 }; 173 175 174 static inline void ?{}( mcs_spin_node & this) { this.next = 0p; this.locked = true; }176 static inline void ?{}( mcs_spin_node & this ) { this.next = 0p; this.locked = true; } 175 177 176 178 struct mcs_spin_lock { … … 178 180 }; 179 181 180 static inline void lock( mcs_spin_lock & l, mcs_spin_node & n) {182 static inline void lock( mcs_spin_lock & l, mcs_spin_node & n ) { 181 183 n.locked = true; 182 184 mcs_spin_node * prev = __atomic_exchange_n(&l.queue.tail, &n, __ATOMIC_SEQ_CST); … … 271 273 }; 272 274 static inline void ?{}( go_mutex & this ) with(this) { val = 0; } 273 // static inline void ?{}( go_mutex & this, go_mutex this2 ) = void; // these don't compile correctly at the moment so they should be omitted 274 //static inline void ?=?( go_mutex & this, go_mutex this2 ) = void;275 static inline void ?{}( go_mutex & this, go_mutex this2 ) = void; 276 static inline void ?=?( go_mutex & this, go_mutex this2 ) = void; 275 277 276 278 static inline bool internal_try_lock(go_mutex & this, int & compare_val, int new_val ) with(this) { … … 619 621 } 620 622 621 static inline void on_selected( simple_owner_lock & this, select_node & node ) {}622 623 static inline bool on_selected( simple_owner_lock & this, select_node & node ) { return true; } 624 __CFA_SELECT_GET_TYPE( simple_owner_lock ); 623 625 624 626 //----------------------------------------------------------------------------- -
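Beyond the select-hook changes, this file also deletes go_mutex's copy constructor and copy assignment ('= void'), since copying a mutex that threads may be queued on is never meaningful. The MCS locks keep their calling convention: the caller supplies a per-acquisition node, usually on its own stack, so the queue links live in the waiter's storage rather than in the lock. A usage sketch, assuming the matching unlock( lock, node ) routine from locks.hfa:

    #include <locks.hfa>

    mcs_lock l;

    void critical_section() {
        mcs_node n;        // queue node lives in this waiter's frame
        lock( l, n );      // append n to the MCS queue, block until granted
        // ... critical section ...
        unlock( l, n );    // pass ownership to n's successor, if any
    }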
libcfa/src/concurrency/once.hfa
r92355883 r2a301ff 16 16 #pragma once 17 17 18 #include "co ntainers/lockfree.hfa"18 #include "collections/lockfree.hfa" 19 19 #include "kernel/fwd.hfa" 20 20 -
libcfa/src/concurrency/select.cfa
r92355883 r2a301ff 49 49 return false; 50 50 } 51 void on_selected( select_timeout_node & this, select_node & node ) {}51 bool on_selected( select_timeout_node & this, select_node & node ) { return true; } 52 52 53 53 // Gateway routine to wait on duration -
libcfa/src/concurrency/select.hfa
r92355883 r2a301ff 17 17 #pragma once 18 18 19 #include "co ntainers/list.hfa"19 #include "collections/list.hfa" 20 20 #include "alarm.hfa" 21 21 #include "kernel.hfa" … … 94 94 95 95 // This routine is run on the selecting thread prior to executing the statement corresponding to the select_node 96 // passed as an arg to this routine 97 // If on_selected returns false, the statement is not run, if it returns true it is run. 98 void on_selected( T &, select_node & ); 96 // passed as an arg to this routine. If true is returned proceed as normal, if false is returned the statement is skipped 97 bool on_selected( T &, select_node & ); 99 98 }; 99 // Used inside the compiler to allow for overloading on return type for operations such as '?<<?' for channels 100 // YOU MUST USE THIS MACRO OR INCLUDE AN EQUIVALENT DECL FOR YOUR TYPE TO SUPPORT WAITUNTIL 101 #define __CFA_SELECT_GET_TYPE( typename ) typename __CFA_select_get_type( typename __CFA_t ) 102 100 103 101 104 //============================================================================================= … … 208 211 bool register_select( select_timeout_node & this, select_node & node ); 209 212 bool unregister_select( select_timeout_node & this, select_node & node ); 210 void on_selected( select_timeout_node & this, select_node & node ); 213 bool on_selected( select_timeout_node & this, select_node & node ); 214 select_timeout_node __CFA_select_get_type( select_timeout_node this ); 211 215 212 216 // Gateway routines to waituntil on duration 213 217 select_timeout_node timeout( Duration duration ); 214 218 select_timeout_node sleep( Duration duration ); 219 -
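With this change a selectable type supplies three hooks that all return bool, plus a __CFA_SELECT_GET_TYPE declaration so waituntil can resolve return-type-overloaded clause expressions (such as '?<<?' on channels). A skeleton for a hypothetical, always-ready resource; the names and trivial bodies are illustrative only:

    #include <select.hfa>

    struct ready_flag { bool avail; };   // hypothetical selectable resource

    // return true when the operation can complete immediately, without blocking the selector
    static inline bool register_select( ready_flag & this, select_node & node ) { return true; }

    // remove a node that was registered but not chosen
    static inline bool unregister_select( ready_flag & this, select_node & node ) { return false; }

    // run on the selecting thread before the clause body; returning false skips that body
    static inline bool on_selected( ready_flag & this, select_node & node ) { return true; }

    // required so waituntil can name the type of the clause expression
    __CFA_SELECT_GET_TYPE( ready_flag );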
libcfa/src/concurrency/stats.cfa
r92355883 r2a301ff 11 11 #if !defined(__CFA_NO_STATISTICS__) 12 12 void __init_stats( struct __stats_t * stats ) { 13 stats->ready.push.local.attempt = 0; 14 stats->ready.push.local.success = 0; 15 stats->ready.push.share.attempt = 0; 16 stats->ready.push.share.success = 0; 17 stats->ready.push.extrn.attempt = 0; 18 stats->ready.push.extrn.success = 0; 19 stats->ready.pop.local .attempt = 0; 20 stats->ready.pop.local .success = 0; 21 stats->ready.pop.help .attempt = 0; 22 stats->ready.pop.help .success = 0; 23 stats->ready.pop.steal .attempt = 0; 24 stats->ready.pop.steal .success = 0; 25 stats->ready.pop.search.attempt = 0; 26 stats->ready.pop.search.success = 0; 27 stats->ready.threads.migration = 0; 28 stats->ready.threads.extunpark = 0; 29 stats->ready.threads.threads = 0; 30 stats->ready.threads.cthreads = 0; 31 stats->ready.threads.preempt.yield = 0; 32 stats->ready.threads.preempt.rllfwd = 0; 33 stats->ready.sleep.halts = 0; 34 stats->ready.sleep.cancels = 0; 35 stats->ready.sleep.early = 0; 36 stats->ready.sleep.wakes = 0; 37 stats->ready.sleep.seen = 0; 38 stats->ready.sleep.exits = 0; 13 memset( &stats->ready, 0, sizeof( stats->ready ) ); 39 14 40 15 #if defined(CFA_HAVE_LINUX_IO_URING_H) 41 stats->io.alloc.fast = 0; 42 stats->io.alloc.slow = 0; 43 stats->io.alloc.fail = 0; 44 stats->io.alloc.revoke = 0; 45 stats->io.alloc.block = 0; 46 stats->io.submit.fast = 0; 47 stats->io.submit.slow = 0; 48 stats->io.submit.eagr = 0; 49 stats->io.submit.nblk = 0; 50 stats->io.submit.extr = 0; 51 stats->io.flush.external = 0; 52 stats->io.flush.signal = 0; 53 stats->io.flush.dirty = 0; 54 stats->io.flush.full = 0; 55 stats->io.flush.idle = 0; 56 stats->io.flush.eager = 0; 57 stats->io.calls.flush = 0; 58 stats->io.calls.submitted = 0; 59 stats->io.calls.drain = 0; 60 stats->io.calls.completed = 0; 61 stats->io.calls.locked = 0; 62 stats->io.calls.helped = 0; 63 stats->io.calls.errors.busy = 0; 64 stats->io.ops.sockread = 0; 65 stats->io.ops.epllread = 0; 66 stats->io.ops.sockwrite = 0; 67 stats->io.ops.epllwrite = 0; 16 memset( &stats->io, 0, sizeof( stats->io ) ); 68 17 #endif 69 18