Changeset 2a301ff for libcfa/src/concurrency/actor.hfa

- Timestamp: Aug 31, 2023, 11:31:15 PM (2 years ago)
- Branches: master
- Children: 950c58e
- Parents: 92355883 (diff), 686912c (diff)

Note: this is a merge changeset; the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.

Files:
- 1 edited
- libcfa/src/concurrency/actor.hfa (modified) (13 diffs)
Legend:
- Unmodified lines are shown without a prefix
- Added lines are prefixed with +
- Removed lines are prefixed with -
- Hunks that differ only in indentation are shown as unmodified; … marks elided unchanged code
libcfa/src/concurrency/actor.hfa
Diff from r92355883 to r2a301ff:

…
// #define ACTOR_STATS

+ // used to run and only track missed queue gulps
+ #ifdef ACTOR_STATS
+ #define ACTOR_STATS_QUEUE_MISSED
+ #endif
+
// forward decls
struct actor;
…
typedef allocation (*__receive_fn)(actor &, message &, actor **, message **);
struct request {
    actor * receiver;
    message * msg;
    __receive_fn fn;
};

struct a_msg {
    int m;
};
static inline void ?{}( request & this ) {}
static inline void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {
    this.receiver = receiver;
    this.msg = msg;
    this.fn = fn;
}
static inline void ?{}( request & this, request & copy ) {
    this.receiver = copy.receiver;
    this.msg = copy.msg;
    this.fn = copy.fn;
}
…
// assumes gulping behaviour (once a remove occurs, removes happen until empty before next insert)
struct copy_queue {
    request * buffer;
    size_t count, buffer_size, index, utilized, last_size;
};
static inline void ?{}( copy_queue & this ) {}
static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) {
    buffer_size = buf_size;
    buffer = aalloc( buffer_size );
    count = 0;
    utilized = 0;
    index = 0;
    last_size = 0;
}
static inline void ^?{}( copy_queue & this ) with(this) {
    DEBUG_ABORT( count != 0, "Actor system terminated with messages sent but not received\n" );
    adelete(buffer);
}

static inline void insert( copy_queue & this, request & elem ) with(this) {
    if ( count >= buffer_size ) { // increase arr size
        last_size = buffer_size;
        buffer_size = 2 * buffer_size;
        buffer = realloc( buffer, sizeof( request ) * buffer_size );
        /* paranoid */ verify( buffer );
    }
    memcpy( &buffer[count], &elem, sizeof(request) );
    count++;
}
…
// it is not supported to call insert() before the array is fully empty
static inline request & remove( copy_queue & this ) with(this) {
    if ( count > 0 ) {
        count--;
        size_t old_idx = index;
        index = count == 0 ? 0 : index + 1;
        return buffer[old_idx];
    }
    request * ret = 0p;
    return *0p;
}
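Aside: the gulping discipline above (grow on insert, drain completely before the next insert) can be illustrated in plain C. The sketch below is illustrative only and is not part of the changeset; names such as gulp_buf and request_t are invented.

    #include <assert.h>
    #include <stdlib.h>

    typedef struct { int payload; } request_t;   // stand-in for the CFA request struct

    typedef struct {
        request_t * buffer;
        size_t count, capacity, index;           // count = live entries, index = next to drain
    } gulp_buf;

    static void gulp_init( gulp_buf * q, size_t cap ) {
        q->buffer = malloc( cap * sizeof(request_t) );
        q->count = 0; q->capacity = cap; q->index = 0;
    }

    // insert is only legal while the buffer is not being drained (index == 0)
    static void gulp_insert( gulp_buf * q, request_t elem ) {
        if ( q->count >= q->capacity ) {         // double on overflow, like copy_queue
            q->capacity *= 2;
            q->buffer = realloc( q->buffer, q->capacity * sizeof(request_t) );
            assert( q->buffer );
        }
        q->buffer[q->count++] = elem;
    }

    // once draining starts, callers must drain to empty before inserting again
    static int gulp_remove( gulp_buf * q, request_t * out ) {
        if ( q->count == 0 ) return 0;           // empty: nothing to hand back
        q->count--;
        *out = q->buffer[q->index];
        q->index = q->count == 0 ? 0 : q->index + 1;  // reset cursor once fully drained
        return 1;
    }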
…
// try to reclaim some memory if less than half of buffer is utilized
static inline void reclaim( copy_queue & this ) with(this) {
    if ( utilized >= last_size || buffer_size <= 4 ) { utilized = 0; return; }
    utilized = 0;
    buffer_size--;
    buffer = realloc( buffer, sizeof( request ) * buffer_size ); // try to reclaim some memory
}
…
struct work_queue {
    __spinlock_t mutex_lock;
    copy_queue * owned_queue;       // copy queue allocated and cleaned up by this work_queue
    copy_queue * c_queue;           // current queue
    volatile bool being_processed;  // flag to prevent concurrent processing
    #ifdef ACTOR_STATS
    unsigned int id;
-   size_t missed;                  // transfers skipped due to being_processed flag being up
    #endif
+   #ifdef ACTOR_STATS_QUEUE_MISSED
+   size_t missed;                  // transfers skipped due to being_processed flag being up
+   #endif
}; // work_queue
static inline void ?{}( work_queue & this, size_t buf_size, unsigned int i ) with(this) {
    owned_queue = alloc();          // allocated separately to avoid false sharing
    (*owned_queue){ buf_size };
    c_queue = owned_queue;
    being_processed = false;
    #ifdef ACTOR_STATS
    id = i;
    missed = 0;
    #endif
}
…
static inline void insert( work_queue & this, request & elem ) with(this) {
    lock( mutex_lock __cfaabi_dbg_ctx2 );
    insert( *c_queue, elem );
    unlock( mutex_lock );
} // insert

static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) {
    lock( mutex_lock __cfaabi_dbg_ctx2 );
    #ifdef __STEAL

    // check if queue is being processed elsewhere
    if ( unlikely( being_processed ) ) {
        #ifdef ACTOR_STATS
        missed++;
        #endif
        unlock( mutex_lock );
        return;
    }

    being_processed = c_queue->count != 0;
    #endif // __STEAL

    c_queue->utilized = c_queue->count;

    // swap copy queue ptrs
    copy_queue * temp = *transfer_to;
    *transfer_to = c_queue;
    c_queue = temp;
    unlock( mutex_lock );
} // transfer
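Aside: transfer() is the heart of gulping. Rather than copying elements, the worker swaps its empty buffer for the queue's full one under the lock, then drains the gulped buffer without holding the lock while producers keep inserting into the fresh one. A minimal plain-C sketch of the same idea, using a pthread mutex and invented names:

    #include <pthread.h>
    #include <stdbool.h>
    #include <stddef.h>

    typedef struct { size_t count, utilized; /* ... */ } gulp_buf;  // see previous sketch

    typedef struct {
        pthread_mutex_t lock;
        gulp_buf * current;            // producers insert here under the lock
        volatile bool being_processed; // set while some worker drains a gulped buffer
    } work_q;

    // Swap the caller's empty buffer for the queue's full one ("gulping").
    static void transfer( work_q * wq, gulp_buf ** mine ) {
        pthread_mutex_lock( &wq->lock );
        if ( wq->being_processed ) {             // someone else (e.g. a thief) owns it
            pthread_mutex_unlock( &wq->lock );
            return;
        }
        wq->being_processed = wq->current->count != 0;
        wq->current->utilized = wq->current->count;  // remember size for reclaim heuristic
        gulp_buf * temp = *mine;                 // classic two-pointer swap
        *mine = wq->current;
        wq->current = temp;
        pthread_mutex_unlock( &wq->lock );
    }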
// needed since some info needs to persist past worker lifetimes
struct worker_info {
    volatile unsigned long long stamp;
    #ifdef ACTOR_STATS
    size_t stolen_from, try_steal, stolen, empty_stolen, failed_swaps, msgs_stolen;
    unsigned long long processed;
    size_t gulps;
    #endif
};
static inline void ?{}( worker_info & this ) {
    #ifdef ACTOR_STATS
    this.stolen_from = 0;
    this.try_steal = 0;     // attempts to steal
    this.stolen = 0;        // successful steals
    this.processed = 0;     // requests processed
    this.gulps = 0;         // number of gulps
    this.failed_swaps = 0;  // steal swap failures
    this.empty_stolen = 0;  // queues empty after steal
    this.msgs_stolen = 0;   // number of messages stolen
    #endif
    this.stamp = rdtscl();
}
…
// #endif
thread worker {
    work_queue ** request_queues;
    copy_queue * current_queue;
    executor * executor_;
    unsigned int start, range;
    int id;
};

#ifdef ACTOR_STATS
// aggregate counters for statistics
size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0, __total_empty_stolen = 0,
    __total_failed_swaps = 0, __all_processed = 0, __num_actors_stats = 0, __all_msgs_stolen = 0;
#endif
static inline void ?{}( worker & this, cluster & clu, work_queue ** request_queues, copy_queue * current_queue, executor * executor_,
    unsigned int start, unsigned int range, int id ) {
    ((thread &)this){ clu };
    this.request_queues = request_queues;  // array of all queues
    this.current_queue = current_queue;    // currently gulped queue (start with empty queue to use in swap later)
    this.executor_ = executor_;            // pointer to current executor
    this.start = start;                    // start of worker's subrange of request_queues
    this.range = range;                    // size of worker's subrange of request_queues
    this.id = id;                          // worker's id and index in array of workers
}
static bool no_steal = false;
struct executor {
    cluster * cluster;               // if workers execute on separate cluster
    processor ** processors;         // array of virtual processors adding parallelism for workers
    work_queue * request_queues;     // master array of work request queues
    copy_queue * local_queues;       // array of all worker local queues to avoid deletion race
    work_queue ** worker_req_queues; // secondary array of work queues to allow for swapping
    worker ** workers;               // array of workers executing work requests
    worker_info * w_infos;           // array of info about each worker
    unsigned int nprocessors, nworkers, nrqueues; // number of processors/threads/request queues
    bool seperate_clus;              // use same or separate cluster for executor
    volatile bool is_shutdown;       // flag to communicate shutdown to worker threads
}; // executor
…
// #endif
static inline void ^?{}( worker & mutex this ) with(this) {
    #ifdef ACTOR_STATS
    __atomic_add_fetch(&__all_gulps, executor_->w_infos[id].gulps,__ATOMIC_SEQ_CST);
    __atomic_add_fetch(&__all_processed, executor_->w_infos[id].processed,__ATOMIC_SEQ_CST);
    __atomic_add_fetch(&__all_msgs_stolen, executor_->w_infos[id].msgs_stolen,__ATOMIC_SEQ_CST);
    __atomic_add_fetch(&__total_tries, executor_->w_infos[id].try_steal, __ATOMIC_SEQ_CST);
    __atomic_add_fetch(&__total_stolen, executor_->w_infos[id].stolen, __ATOMIC_SEQ_CST);
    __atomic_add_fetch(&__total_failed_swaps, executor_->w_infos[id].failed_swaps, __ATOMIC_SEQ_CST);
    __atomic_add_fetch(&__total_empty_stolen, executor_->w_infos[id].empty_stolen, __ATOMIC_SEQ_CST);

    // per worker steal stats (uncomment alongside the lock above this routine to print)
    // lock( out_lock __cfaabi_dbg_ctx2 );
    // printf("Worker id: %d, processed: %llu messages, attempted %lu, stole: %lu, stolen from: %lu\n", id, processed, try_steal, stolen, __atomic_add_fetch(&executor_->w_infos[id].stolen_from, 0, __ATOMIC_SEQ_CST) );
    // int count = 0;
    // int count2 = 0;
    // for ( i; range ) {
    //     if ( replaced_queue[start + i] > 0 ){
    //         count++;
    //         // printf("%d: %u, ",i, replaced_queue[i]);
    //     }
    //     if (__atomic_add_fetch(&stolen_arr[start + i],0,__ATOMIC_SEQ_CST) > 0)
    //         count2++;
    // }
    // printf("swapped with: %d of %u indices\n", count, executor_->nrqueues / executor_->nworkers );
    // printf("%d of %u indices were stolen\n", count2, executor_->nrqueues / executor_->nworkers );
    // unlock( out_lock );
    #endif
}
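Aside: the worker destructor above folds each worker's private counters into global totals with one atomic add per counter at exit, so the hot path never touches shared counters. A small plain-C sketch of that pattern (all names invented):

    #include <pthread.h>
    #include <stdio.h>

    static size_t total_processed = 0;   // global aggregate, updated once per worker

    static void * worker_main( void * arg ) {
        size_t processed = 0;            // thread-local counter, cheap to bump
        for ( int i = 0; i < 1000; i++ ) processed++;   // simulate work
        // one atomic add per worker, instead of one per message
        __atomic_add_fetch( &total_processed, processed, __ATOMIC_SEQ_CST );
        return NULL;
    }

    int main( void ) {
        pthread_t thds[4];
        for ( int i = 0; i < 4; i++ ) pthread_create( &thds[i], NULL, worker_main, NULL );
        for ( int i = 0; i < 4; i++ ) pthread_join( thds[i], NULL );
        printf( "processed: %zu\n", total_processed );  // prints 4000
    }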
static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus, size_t buf_size ) with(this) {
    if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );
    this.nprocessors = nprocessors;
    this.nworkers = nworkers;
    this.nrqueues = nrqueues;
    this.seperate_clus = seperate_clus;
    this.is_shutdown = false;

    if ( nworkers == nrqueues )
        no_steal = true;

    #ifdef ACTOR_STATS
    // stolen_arr = aalloc( nrqueues );
    // replaced_queue = aalloc( nrqueues );
    __total_workers = nworkers;
    #endif

    if ( seperate_clus ) {
        cluster = alloc();
        (*cluster){};
    } else cluster = active_cluster();

    request_queues = aalloc( nrqueues );
    worker_req_queues = aalloc( nrqueues );
    for ( i; nrqueues ) {
        request_queues[i]{ buf_size, i };
        worker_req_queues[i] = &request_queues[i];
    }

    processors = aalloc( nprocessors );
    for ( i; nprocessors )
        (*(processors[i] = alloc())){ *cluster };

    local_queues = aalloc( nworkers );
    workers = aalloc( nworkers );
    w_infos = aalloc( nworkers );
    unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;

    for ( i; nworkers ) {
        w_infos[i]{};
        local_queues[i]{ buf_size };
    }

    for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
        range = reqPerWorker + ( i < extras ? 1 : 0 );
        (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], &this, start, range, i };
    } // for
}
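Aside: the constructor partitions nrqueues queues over nworkers workers, giving the first nrqueues % nworkers workers one extra queue; choose_queue() later recomputes the same partition for a victim. A plain-C sketch of the arithmetic:

    #include <stdio.h>

    // Split nrqueues queues over nworkers workers as evenly as possible:
    // the first (nrqueues % nworkers) workers get one extra queue.
    int main( void ) {
        unsigned int nrqueues = 10, nworkers = 4;
        unsigned int per_worker = nrqueues / nworkers;   // 2
        unsigned int extras = nrqueues % nworkers;       // 2
        for ( unsigned int i = 0, start = 0, range; i < nworkers; i++, start += range ) {
            range = per_worker + ( i < extras ? 1 : 0 );
            printf( "worker %u: queues [%u, %u)\n", i, start, start + range );
            // worker 0: [0,3)  worker 1: [3,6)  worker 2: [6,8)  worker 3: [8,10)
        }
    }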
1 : 0 );321 (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], &this, start, range, i };322 } // for284 if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" ); 285 this.nprocessors = nprocessors; 286 this.nworkers = nworkers; 287 this.nrqueues = nrqueues; 288 this.seperate_clus = seperate_clus; 289 this.is_shutdown = false; 290 291 if ( nworkers == nrqueues ) 292 no_steal = true; 293 294 #ifdef ACTOR_STATS 295 // stolen_arr = aalloc( nrqueues ); 296 // replaced_queue = aalloc( nrqueues ); 297 __total_workers = nworkers; 298 #endif 299 300 if ( seperate_clus ) { 301 cluster = alloc(); 302 (*cluster){}; 303 } else cluster = active_cluster(); 304 305 request_queues = aalloc( nrqueues ); 306 worker_req_queues = aalloc( nrqueues ); 307 for ( i; nrqueues ) { 308 request_queues[i]{ buf_size, i }; 309 worker_req_queues[i] = &request_queues[i]; 310 } 311 312 processors = aalloc( nprocessors ); 313 for ( i; nprocessors ) 314 (*(processors[i] = alloc())){ *cluster }; 315 316 local_queues = aalloc( nworkers ); 317 workers = aalloc( nworkers ); 318 w_infos = aalloc( nworkers ); 319 unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers; 320 321 for ( i; nworkers ) { 322 w_infos[i]{}; 323 local_queues[i]{ buf_size }; 324 } 325 326 for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) { 327 range = reqPerWorker + ( i < extras ? 1 : 0 ); 328 (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], &this, start, range, i }; 329 } // for 323 330 } 324 331 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __DEFAULT_EXECUTOR_BUFSIZE__ }; } … … 329 336 330 337 static inline void ^?{}( executor & this ) with(this) { 331 is_shutdown = true; 332 333 for ( i; nworkers ) 334 delete( workers[i] ); 335 336 for ( i; nprocessors ) { 337 delete( processors[i] ); 338 } // for 339 340 #ifdef ACTOR_STATS 341 size_t misses = 0; 342 for ( i; nrqueues ) { 343 misses += worker_req_queues[i]->missed; 344 } 345 // adelete( stolen_arr ); 346 // adelete( replaced_queue ); 338 is_shutdown = true; 339 340 for ( i; nworkers ) 341 delete( workers[i] ); 342 343 for ( i; nprocessors ) { 344 delete( processors[i] ); 345 } // for 346 347 #ifdef ACTOR_STATS_QUEUE_MISSED 348 size_t misses = 0; 349 for ( i; nrqueues ) { 350 misses += worker_req_queues[i]->missed; 351 } 352 // adelete( stolen_arr ); 353 // adelete( replaced_queue ); 354 #endif 355 356 adelete( workers ); 357 adelete( w_infos ); 358 adelete( local_queues ); 359 adelete( request_queues ); 360 adelete( worker_req_queues ); 361 adelete( processors ); 362 if ( seperate_clus ) delete( cluster ); 363 364 #ifdef ACTOR_STATS // print formatted stats 365 printf(" Actor System Stats:\n"); 366 printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, __all_processed); 367 size_t avg_gulps = __all_gulps == 0 ? 
…
static inline size_t __get_next_ticket( executor & this ) with(this) {
    #ifdef __CFA_DEBUG__
    size_t temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;

    // reserve MAX for dead actors
    if ( unlikely( temp == MAX ) ) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
    return temp;
    #else
    return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_RELAXED) % nrqueues;
    #endif
} // tickets

// TODO: update globals in this file to be static fields once the static fields project is done
static executor * __actor_executor_ = 0p;
static bool __actor_executor_passed = false;        // was an executor passed to start_actor_system
static size_t __num_actors_ = 0;                    // number of actor objects in system
static struct thread$ * __actor_executor_thd = 0p;  // used to wake executor after actors finish
struct actor {
    size_t ticket;            // executor-queue handle
-   allocation allocation_;   // allocation action
+   allocation alloc;         // allocation action
    inline virtual_dtor;
};
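Aside: tickets are handed out round-robin with an atomic fetch-and-add, spreading actors evenly over the request queues without locking. A plain-C sketch using C11 atomics (NRQUEUES is an invented stand-in for nrqueues):

    #include <stdatomic.h>
    #include <stdio.h>

    #define NRQUEUES 16
    static atomic_size_t next_ticket = 0;

    // each new actor grabs the next queue index; fetch_add returns the old value
    static size_t get_next_ticket( void ) {
        return atomic_fetch_add( &next_ticket, 1 ) % NRQUEUES;
    }

    int main( void ) {
        for ( int i = 0; i < 5; i++ ) printf( "%zu ", get_next_ticket() );  // 0 1 2 3 4
    }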
static inline void ?{}( actor & this ) with(this) {
    // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive
    // member must be called to end it
    DEBUG_ABORT( __actor_executor_ == 0p, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" );
-   allocation_ = Nodelete;
+   alloc = Nodelete;
    ticket = __get_next_ticket( *__actor_executor_ );
    __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_RELAXED );
    #ifdef ACTOR_STATS
    __atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST );
    #endif
}

static inline void check_actor( actor & this ) {
-   if ( this.allocation_ != Nodelete ) {
-       switch( this.allocation_ ) {
+   if ( this.alloc != Nodelete ) {
+       switch( this.alloc ) {
            case Delete: delete( &this ); break;
            case Destroy:
                CFA_DEBUG( this.ticket = MAX; );  // mark as terminated
                ^?{}(this);
                break;
            case Finished:
                CFA_DEBUG( this.ticket = MAX; );  // mark as terminated
                break;
            default: ;  // stop warning
        }

        if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_RELAXED ) == 0 ) ) { // all actors have terminated
            unpark( __actor_executor_thd );
        }
    }
}
struct message {
-   allocation allocation_;  // allocation action
+   allocation alloc;        // allocation action
    inline virtual_dtor;
};

static inline void ?{}( message & this ) {
-   this.allocation_ = Nodelete;
+   this.alloc = Nodelete;
}
static inline void ?{}( message & this, allocation alloc ) {
-   memcpy( &this.allocation_, &alloc, sizeof(allocation) );  // optimization to elide ctor
-   DEBUG_ABORT( this.allocation_ == Finished, "The Finished allocation status is not supported for message types.\n");
+   memcpy( &this.alloc, &alloc, sizeof(allocation) );  // optimization to elide ctor
+   CFA_DEBUG( if ( this.alloc == Finished ) this.alloc = Nodelete; );
}
static inline void ^?{}( message & this ) with(this) {
-   CFA_DEBUG( if ( allocation_ == Nodelete ) printf("A message at location %p was allocated but never sent.\n", &this); )
+   CFA_DEBUG(
+       if ( alloc == Nodelete ) {
+           printf( "CFA warning (UNIX pid:%ld) : program terminating with message %p allocated but never sent.\n",
+               (long int)getpid(), &this );
+       }
+   )
}

static inline void check_message( message & this ) {
-   switch ( this.allocation_ ) {  // analyze message status
-       case Nodelete: CFA_DEBUG( this.allocation_ = Finished ); break;
+   switch ( this.alloc ) {  // analyze message status
+       case Nodelete: CFA_DEBUG( this.alloc = Finished ); break;
        case Delete: delete( &this ); break;
        case Destroy: ^?{}( this ); break;
        case Finished: break;
    } // switch
}
-static inline void set_allocation( message & this, allocation state ) {
-   this.allocation_ = state;
+static inline allocation set_allocation( message & this, allocation state ) {
+   CFA_DEBUG( if ( state == Nodelete ) state = Finished; );
+   allocation prev = this.alloc;
+   this.alloc = state;
+   return prev;
+}
+static inline allocation get_allocation( message & this ) {
+   return this.alloc;
}

static inline void deliver_request( request & this ) {
    DEBUG_ABORT( this.receiver->ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
    actor * base_actor;
    message * base_msg;
    allocation temp = this.fn( *this.receiver, *this.msg, &base_actor, &base_msg );
-   base_actor->allocation_ = temp;
+   memcpy( &base_actor->alloc, &temp, sizeof(allocation) );  // optimization to elide ctor
    check_message( *base_msg );
    check_actor( *base_actor );
}
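Aside: the allocation enum drives a small post-delivery state machine: after receive() runs, check_message() and check_actor() decide whether the object is kept, freed, destructed, or simply finished. A plain-C sketch of the message side (enum values mirror the CFA ones; the Destroy branch is only a comment because C has no destructors):

    #include <stdlib.h>

    // what to do with a message (or actor) after its receive routine runs
    typedef enum { Nodelete, Delete, Destroy, Finished } allocation;

    typedef struct {
        allocation alloc;
    } msg_t;

    // post-delivery bookkeeping, analogous to check_message()
    static void check_message( msg_t * m ) {
        switch ( m->alloc ) {
            case Nodelete: break;             // sender keeps and may reuse the message
            case Delete:   free( m ); break;  // dynamically allocated: free the storage
            case Destroy:  /* run dtor only; storage is reused by the caller */ break;
            case Finished: break;             // message lifetime is over, storage untouched
        }
    }

    int main( void ) {
        msg_t * m = malloc( sizeof(msg_t) );
        m->alloc = Delete;                    // ask the system to free after delivery
        check_message( m );                   // m is freed here; do not touch it again
    }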
…
// returns ptr to newly owned queue if swap succeeds
static inline work_queue * try_swap_queues( worker & this, unsigned int victim_idx, unsigned int my_idx ) with(this) {
    work_queue * my_queue = request_queues[my_idx];
    work_queue * other_queue = request_queues[victim_idx];

    // if either queue is 0p then they are in the process of being stolen
    if ( other_queue == 0p ) return 0p;

    // try to set our queue ptr to be 0p. If it fails someone moved our queue so return false
    if ( !__atomic_compare_exchange_n( &request_queues[my_idx], &my_queue, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
        return 0p;

    // try to set other queue ptr to be our queue ptr. If it fails someone moved the other queue so fix up then return false
    if ( !__atomic_compare_exchange_n( &request_queues[victim_idx], &other_queue, my_queue, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
        /* paranoid */ verify( request_queues[my_idx] == 0p );
        request_queues[my_idx] = my_queue;  // reset my queue ptr back to appropriate val
        return 0p;
    }

    // we have successfully swapped and since our queue is 0p no one will touch it so write back new queue ptr non atomically
    request_queues[my_idx] = other_queue;  // last write does not need to be atomic
    return other_queue;
}
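Aside: try_swap_queues() steals a whole queue with two compare-and-swaps, using a null pointer as a "mid-swap" marker so concurrent thieves back off instead of blocking. A plain-C sketch with GCC atomic builtins (array size and names invented):

    #include <stdbool.h>
    #include <stddef.h>

    typedef struct work_queue work_queue;   // opaque queue type for the sketch
    #define NQUEUES 64
    static work_queue * queues[NQUEUES];    // shared array of queue pointers

    // Swap my (empty) queue with a victim's queue using two CASes.
    static work_queue * try_swap( unsigned victim, unsigned mine ) {
        work_queue * my_q = queues[mine];
        work_queue * other_q = queues[victim];

        if ( other_q == NULL ) return NULL;          // victim slot already mid-swap

        // claim my own slot first: set it to NULL so nobody swaps it away
        if ( !__atomic_compare_exchange_n( &queues[mine], &my_q, NULL,
                false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
            return NULL;                             // someone moved my queue: give up

        // now try to install my queue into the victim's slot
        if ( !__atomic_compare_exchange_n( &queues[victim], &other_q, my_q,
                false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
            queues[mine] = my_q;                     // victim changed: undo the claim
            return NULL;
        }

        queues[mine] = other_q;                      // my slot is NULL: plain store is safe
        return other_q;                              // caller now owns the stolen queue
    }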
// once a worker to steal from has been chosen, choose queue to steal from
static inline void choose_queue( worker & this, unsigned int victim_id, unsigned int swap_idx ) with(this) {
    // have to calculate victim start and range since victim may be deleted before us in shutdown
    const unsigned int queues_per_worker = executor_->nrqueues / executor_->nworkers;
    const unsigned int extras = executor_->nrqueues % executor_->nworkers;
    unsigned int vic_start, vic_range;
    if ( extras > victim_id ) {
        vic_range = queues_per_worker + 1;
        vic_start = vic_range * victim_id;
    } else {
        vic_start = extras + victim_id * queues_per_worker;
        vic_range = queues_per_worker;
    }
    unsigned int start_idx = prng( vic_range );

    unsigned int tries = 0;
    work_queue * curr_steal_queue;

    for ( unsigned int i = start_idx; tries < vic_range; i = (i + 1) % vic_range ) {
        tries++;
        curr_steal_queue = request_queues[ i + vic_start ];
        // avoid empty queues and queues that are being operated on
        if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || is_empty( *curr_steal_queue->c_queue ) )
            continue;

        #ifdef ACTOR_STATS
        curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
        if ( curr_steal_queue ) {
            executor_->w_infos[id].msgs_stolen += curr_steal_queue->c_queue->count;
            executor_->w_infos[id].stolen++;
            if ( is_empty( *curr_steal_queue->c_queue ) ) executor_->w_infos[id].empty_stolen++;
            // __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED);
            // replaced_queue[swap_idx]++;
            // __atomic_add_fetch(&stolen_arr[ i + vic_start ], 1, __ATOMIC_RELAXED);
        } else {
            executor_->w_infos[id].failed_swaps++;
        }
        #else
        curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
        #endif // ACTOR_STATS

        return;
    }

    return;
}

// choose a worker to steal from
static inline void steal_work( worker & this, unsigned int swap_idx ) with(this) {
    #if RAND
    unsigned int victim = prng( executor_->nworkers );
    if ( victim == id ) victim = ( victim + 1 ) % executor_->nworkers;
    choose_queue( this, victim, swap_idx );
    #elif SEARCH
    unsigned long long min = MAX;  // smaller timestamp means longer since service
    int min_id = 0;                // use ints not uints to avoid integer underflow without hacky math
    int n_workers = executor_->nworkers;
    unsigned long long curr_stamp;
    int scount = 1;
    for ( int i = (id + 1) % n_workers; scount < n_workers; i = (i + 1) % n_workers, scount++ ) {
        curr_stamp = executor_->w_infos[i].stamp;
        if ( curr_stamp < min ) {
            min = curr_stamp;
            min_id = i;
        }
    }
    choose_queue( this, min_id, swap_idx );
    #endif
}
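Aside: under the SEARCH policy, steal_work() picks the worker whose timestamp is oldest, i.e. serviced least recently and therefore most likely backed up. A plain-C sketch of that scan (sizes and names invented):

    #include <limits.h>

    #define NWORKERS 8

    // per-worker timestamp of the last time its queues were serviced
    static unsigned long long stamps[NWORKERS];

    // scan the other workers and pick the one with the oldest (smallest) stamp
    static int pick_victim( int self ) {
        unsigned long long min = ULLONG_MAX;
        int min_id = 0;
        for ( int i = (self + 1) % NWORKERS, seen = 1; seen < NWORKERS;
              i = (i + 1) % NWORKERS, seen++ ) {
            if ( stamps[i] < min ) { min = stamps[i]; min_id = i; }
        }
        return min_id;   // never returns self: the scan starts after it and stops before it
    }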
#define CHECK_TERMINATION if ( unlikely( executor_->is_shutdown ) ) break Exit
void main( worker & this ) with(this) {
    // #ifdef ACTOR_STATS
    // for ( i; executor_->nrqueues ) {
    //     replaced_queue[i] = 0;
    //     __atomic_store_n( &stolen_arr[i], 0, __ATOMIC_SEQ_CST );
    // }
    // #endif

    // threshold of empty queues we see before we go stealing
    const unsigned int steal_threshold = 2 * range;

    // Store variable data here instead of worker struct to avoid any potential false sharing
    unsigned int empty_count = 0;
    request & req;
    work_queue * curr_work_queue;

    Exit:
    for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers
        curr_work_queue = request_queues[i + start];

+       #ifndef __STEAL
+       CHECK_TERMINATION;
+       #endif
+
        // check if queue is empty before trying to gulp it
        if ( is_empty( *curr_work_queue->c_queue ) ) {
            #ifdef __STEAL
            empty_count++;
            if ( empty_count < steal_threshold ) continue;
            #else
            continue;
            #endif
        }
        transfer( *curr_work_queue, &current_queue );
        #ifdef ACTOR_STATS
        executor_->w_infos[id].gulps++;
        #endif // ACTOR_STATS
        #ifdef __STEAL
        if ( is_empty( *current_queue ) ) {
            if ( unlikely( no_steal ) ) { CHECK_TERMINATION; continue; }
            empty_count++;
            if ( empty_count < steal_threshold ) continue;
            empty_count = 0;

            CHECK_TERMINATION; // check for termination

            __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED );

            #ifdef ACTOR_STATS
            executor_->w_infos[id].try_steal++;
            #endif // ACTOR_STATS

            steal_work( this, start + prng( range ) );
            continue;
        }
        #endif // __STEAL
        while ( ! is_empty( *current_queue ) ) {
            #ifdef ACTOR_STATS
            executor_->w_infos[id].processed++;
            #endif
            &req = &remove( *current_queue );
            if ( !&req ) continue;
            deliver_request( req );
        }
        #ifdef __STEAL
        curr_work_queue->being_processed = false; // set done processing
        empty_count = 0; // we found work so reset empty counter
        #endif

        CHECK_TERMINATION;

        // potentially reclaim some of the current queue's vector space if it is unused
        reclaim( *current_queue );
    } // for
}
static inline void send( executor & this, request & req, unsigned long int ticket ) with(this) {
    insert( request_queues[ticket], req);
}

static inline void send( actor & this, request & req ) {
    DEBUG_ABORT( this.ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
    send( *__actor_executor_, req, this.ticket );
}

static inline void __reset_stats() {
    #ifdef ACTOR_STATS
    __total_tries = 0;
    __total_stolen = 0;
    __all_gulps = 0;
    __total_failed_swaps = 0;
    __total_empty_stolen = 0;
    __all_processed = 0;
    __num_actors_stats = 0;
    __all_msgs_stolen = 0;
    #endif
}

static inline void start_actor_system( size_t num_thds ) {
    __reset_stats();
    __actor_executor_thd = active_thread();
    __actor_executor_ = alloc();
    (*__actor_executor_){ 0, num_thds, num_thds == 1 ? 1 : num_thds * 16 };
}
…
static inline void start_actor_system( executor & this ) {
    __reset_stats();
    __actor_executor_thd = active_thread();
    __actor_executor_ = &this;
    __actor_executor_passed = true;
}

static inline void stop_actor_system() {
-   park( );  // will be unparked when actor system is finished
+   park();   // unparked when actor system is finished

    if ( !__actor_executor_passed ) delete( __actor_executor_ );
    __actor_executor_ = 0p;
    __actor_executor_thd = 0p;
    __next_ticket = 0;
    __actor_executor_passed = false;
}

// Default messages to send to any actor to change status
// assigned at creation to __base_msg_finished to avoid unused message warning
-message __base_msg_finished @= { .allocation_ : Finished };
-struct __delete_msg_t { inline message; } delete_msg = __base_msg_finished;
-struct __destroy_msg_t { inline message; } destroy_msg = __base_msg_finished;
-struct __finished_msg_t { inline message; } finished_msg = __base_msg_finished;
-
-allocation receive( actor & this, __delete_msg_t & msg ) { return Delete; }
-allocation receive( actor & this, __destroy_msg_t & msg ) { return Destroy; }
-allocation receive( actor & this, __finished_msg_t & msg ) { return Finished; }
+message __base_msg_finished @= { .alloc : Finished };
+struct delete_msg_t { inline message; } delete_msg = __base_msg_finished;
+struct destroy_msg_t { inline message; } destroy_msg = __base_msg_finished;
+struct finished_msg_t { inline message; } finished_msg = __base_msg_finished;
+
+allocation receive( actor & this, delete_msg_t & msg ) { return Delete; }
+allocation receive( actor & this, destroy_msg_t & msg ) { return Destroy; }
+allocation receive( actor & this, finished_msg_t & msg ) { return Finished; }