- File:
-
- 1 edited
-
libcfa/src/concurrency/actor.hfa (modified) (15 diffs)
Legend:
- Unmodified
- Added
- Removed
-
libcfa/src/concurrency/actor.hfa
r508671e r9235192c 39 39 // #define ACTOR_STATS 40 40 41 // used to run and only track missed queue gulps 42 #ifdef ACTOR_STATS 43 #define ACTOR_STATS_QUEUE_MISSED 44 #endif 45 41 46 // forward decls 42 47 struct actor; … … 48 53 typedef allocation (*__receive_fn)(actor &, message &, actor **, message **); 49 54 struct request { 50 actor * receiver;51 message * msg;52 __receive_fn fn;55 actor * receiver; 56 message * msg; 57 __receive_fn fn; 53 58 }; 54 59 55 60 struct a_msg { 56 int m;61 int m; 57 62 }; 58 63 static inline void ?{}( request & this ) {} 59 64 static inline void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) { 60 this.receiver = receiver;61 this.msg = msg;62 this.fn = fn;65 this.receiver = receiver; 66 this.msg = msg; 67 this.fn = fn; 63 68 } 64 69 static inline void ?{}( request & this, request & copy ) { 65 this.receiver = copy.receiver;66 this.msg = copy.msg;67 this.fn = copy.fn;70 this.receiver = copy.receiver; 71 this.msg = copy.msg; 72 this.fn = copy.fn; 68 73 } 69 74 … … 71 76 // assumes gulping behaviour (once a remove occurs, removes happen until empty beforw next insert) 72 77 struct copy_queue { 73 request * buffer;74 size_t count, buffer_size, index, utilized, last_size;78 request * buffer; 79 size_t count, buffer_size, index, utilized, last_size; 75 80 }; 76 81 static inline void ?{}( copy_queue & this ) {} 77 82 static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) { 78 buffer_size = buf_size;79 buffer = aalloc( buffer_size );80 count = 0;81 utilized = 0;82 index = 0;83 last_size = 0;83 buffer_size = buf_size; 84 buffer = aalloc( buffer_size ); 85 count = 0; 86 utilized = 0; 87 index = 0; 88 last_size = 0; 84 89 } 85 90 static inline void ^?{}( copy_queue & this ) with(this) { 86 DEBUG_ABORT( count != 0, "Actor system terminated with messages sent but not received\n" );87 adelete(buffer);91 DEBUG_ABORT( count != 0, "Actor system terminated with messages sent but not received\n" ); 92 adelete(buffer); 88 93 } 89 94 90 95 static inline void insert( copy_queue & this, request & elem ) with(this) { 91 if ( count >= buffer_size ) { // increase arr size92 last_size = buffer_size;93 buffer_size = 2 * buffer_size;94 buffer = realloc( buffer, sizeof( request ) * buffer_size );95 /* paranoid */ verify( buffer );96 }97 memcpy( &buffer[count], &elem, sizeof(request) );98 count++;96 if ( count >= buffer_size ) { // increase arr size 97 last_size = buffer_size; 98 buffer_size = 2 * buffer_size; 99 buffer = realloc( buffer, sizeof( request ) * buffer_size ); 100 /* paranoid */ verify( buffer ); 101 } 102 memcpy( &buffer[count], &elem, sizeof(request) ); 103 count++; 99 104 } 100 105 … … 102 107 // it is not supported to call insert() before the array is fully empty 103 108 static inline request & remove( copy_queue & this ) with(this) { 104 if ( count > 0 ) {105 count--;106 size_t old_idx = index;107 index = count == 0 ? 0 : index + 1;108 return buffer[old_idx];109 }110 request * ret = 0p;111 return *0p;109 if ( count > 0 ) { 110 count--; 111 size_t old_idx = index; 112 index = count == 0 ? 0 : index + 1; 113 return buffer[old_idx]; 114 } 115 request * ret = 0p; 116 return *0p; 112 117 } 113 118 114 119 // try to reclaim some memory if less than half of buffer is utilized 115 120 static inline void reclaim( copy_queue & this ) with(this) { 116 if ( utilized >= last_size || buffer_size <= 4 ) { utilized = 0; return; }117 utilized = 0;118 buffer_size--;119 buffer = realloc( buffer, sizeof( request ) * buffer_size ); // try to reclaim some memory121 if ( utilized >= last_size || buffer_size <= 4 ) { utilized = 0; return; } 122 utilized = 0; 123 buffer_size--; 124 buffer = realloc( buffer, sizeof( request ) * buffer_size ); // try to reclaim some memory 120 125 } 121 126 … … 123 128 124 129 struct work_queue { 125 __spinlock_t mutex_lock; 126 copy_queue * owned_queue; // copy queue allocated and cleaned up by this work_queue 127 copy_queue * c_queue; // current queue 128 volatile bool being_processed; // flag to prevent concurrent processing 129 #ifdef ACTOR_STATS 130 unsigned int id; 131 size_t missed; // transfers skipped due to being_processed flag being up 130 __spinlock_t mutex_lock; 131 copy_queue * owned_queue; // copy queue allocated and cleaned up by this work_queue 132 copy_queue * c_queue; // current queue 133 volatile bool being_processed; // flag to prevent concurrent processing 134 #ifdef ACTOR_STATS 135 unsigned int id; 132 136 #endif 137 #ifdef ACTOR_STATS_QUEUE_MISSED 138 size_t missed; // transfers skipped due to being_processed flag being up 139 #endif 133 140 }; // work_queue 134 141 static inline void ?{}( work_queue & this, size_t buf_size, unsigned int i ) with(this) { 135 owned_queue = alloc();// allocated separately to avoid false sharing136 (*owned_queue){ buf_size };137 c_queue = owned_queue;138 being_processed = false;139 #ifdef ACTOR_STATS140 id = i;141 missed = 0;142 #endif142 owned_queue = alloc(); // allocated separately to avoid false sharing 143 (*owned_queue){ buf_size }; 144 c_queue = owned_queue; 145 being_processed = false; 146 #ifdef ACTOR_STATS 147 id = i; 148 missed = 0; 149 #endif 143 150 } 144 151 … … 147 154 148 155 static inline void insert( work_queue & this, request & elem ) with(this) { 149 lock( mutex_lock __cfaabi_dbg_ctx2 );150 insert( *c_queue, elem );151 unlock( mutex_lock );156 lock( mutex_lock __cfaabi_dbg_ctx2 ); 157 insert( *c_queue, elem ); 158 unlock( mutex_lock ); 152 159 } // insert 153 160 154 161 static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) { 155 lock( mutex_lock __cfaabi_dbg_ctx2 );156 #ifdef __STEAL157 158 // check if queue is being processed elsewhere159 if ( unlikely( being_processed ) ) {160 #ifdef ACTOR_STATS161 missed++;162 #endif163 unlock( mutex_lock );164 return;165 }166 167 being_processed = c_queue->count != 0;168 #endif // __STEAL169 170 c_queue->utilized = c_queue->count;171 172 // swap copy queue ptrs173 copy_queue * temp = *transfer_to;174 *transfer_to = c_queue;175 c_queue = temp;176 unlock( mutex_lock );162 lock( mutex_lock __cfaabi_dbg_ctx2 ); 163 #ifdef __STEAL 164 165 // check if queue is being processed elsewhere 166 if ( unlikely( being_processed ) ) { 167 #ifdef ACTOR_STATS 168 missed++; 169 #endif 170 unlock( mutex_lock ); 171 return; 172 } 173 174 being_processed = c_queue->count != 0; 175 #endif // __STEAL 176 177 c_queue->utilized = c_queue->count; 178 179 // swap copy queue ptrs 180 copy_queue * temp = *transfer_to; 181 *transfer_to = c_queue; 182 c_queue = temp; 183 unlock( mutex_lock ); 177 184 } // transfer 178 185 179 186 // needed since some info needs to persist past worker lifetimes 180 187 struct worker_info { 181 volatile unsigned long long stamp;182 #ifdef ACTOR_STATS183 size_t stolen_from, try_steal, stolen, empty_stolen, failed_swaps, msgs_stolen;184 unsigned long long processed;185 size_t gulps;186 #endif188 volatile unsigned long long stamp; 189 #ifdef ACTOR_STATS 190 size_t stolen_from, try_steal, stolen, empty_stolen, failed_swaps, msgs_stolen; 191 unsigned long long processed; 192 size_t gulps; 193 #endif 187 194 }; 188 195 static inline void ?{}( worker_info & this ) { 189 #ifdef ACTOR_STATS190 this.stolen_from = 0;191 this.try_steal = 0;// attempts to steal192 this.stolen = 0;// successful steals193 this.processed = 0;// requests processed194 this.gulps = 0;// number of gulps195 this.failed_swaps = 0;// steal swap failures196 this.empty_stolen = 0;// queues empty after steal197 this.msgs_stolen = 0;// number of messages stolen198 #endif199 this.stamp = rdtscl();196 #ifdef ACTOR_STATS 197 this.stolen_from = 0; 198 this.try_steal = 0; // attempts to steal 199 this.stolen = 0; // successful steals 200 this.processed = 0; // requests processed 201 this.gulps = 0; // number of gulps 202 this.failed_swaps = 0; // steal swap failures 203 this.empty_stolen = 0; // queues empty after steal 204 this.msgs_stolen = 0; // number of messages stolen 205 #endif 206 this.stamp = rdtscl(); 200 207 } 201 208 … … 205 212 // #endif 206 213 thread worker { 207 work_queue ** request_queues;208 copy_queue * current_queue;209 executor * executor_;210 unsigned int start, range;211 int id;214 work_queue ** request_queues; 215 copy_queue * current_queue; 216 executor * executor_; 217 unsigned int start, range; 218 int id; 212 219 }; 213 220 … … 215 222 // aggregate counters for statistics 216 223 size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0, __total_empty_stolen = 0, 217 __total_failed_swaps = 0, __all_processed = 0, __num_actors_stats = 0, __all_msgs_stolen = 0;224 __total_failed_swaps = 0, __all_processed = 0, __num_actors_stats = 0, __all_msgs_stolen = 0; 218 225 #endif 219 226 static inline void ?{}( worker & this, cluster & clu, work_queue ** request_queues, copy_queue * current_queue, executor * executor_, 220 unsigned int start, unsigned int range, int id ) {221 ((thread &)this){ clu };222 this.request_queues = request_queues;// array of all queues223 this.current_queue = current_queue;// currently gulped queue (start with empty queue to use in swap later)224 this.executor_ = executor_;// pointer to current executor225 this.start = start;// start of worker's subrange of request_queues226 this.range = range;// size of worker's subrange of request_queues227 this.id = id;// worker's id and index in array of workers227 unsigned int start, unsigned int range, int id ) { 228 ((thread &)this){ clu }; 229 this.request_queues = request_queues; // array of all queues 230 this.current_queue = current_queue; // currently gulped queue (start with empty queue to use in swap later) 231 this.executor_ = executor_; // pointer to current executor 232 this.start = start; // start of worker's subrange of request_queues 233 this.range = range; // size of worker's subrange of request_queues 234 this.id = id; // worker's id and index in array of workers 228 235 } 229 236 230 237 static bool no_steal = false; 231 238 struct executor { 232 cluster * cluster;// if workers execute on separate cluster233 processor ** processors; // array of virtual processors adding parallelism for workers234 work_queue * request_queues; // master array of work request queues235 copy_queue * local_queues;// array of all worker local queues to avoid deletion race236 work_queue ** worker_req_queues; // secondary array of work queues to allow for swapping237 worker ** workers;// array of workers executing work requests238 worker_info * w_infos;// array of info about each worker239 unsigned int nprocessors, nworkers, nrqueues; // number of processors/threads/request queues240 bool seperate_clus; // use same or separate cluster for executor241 volatile bool is_shutdown;// flag to communicate shutdown to worker threads239 cluster * cluster; // if workers execute on separate cluster 240 processor ** processors; // array of virtual processors adding parallelism for workers 241 work_queue * request_queues; // master array of work request queues 242 copy_queue * local_queues; // array of all worker local queues to avoid deletion race 243 work_queue ** worker_req_queues; // secondary array of work queues to allow for swapping 244 worker ** workers; // array of workers executing work requests 245 worker_info * w_infos; // array of info about each worker 246 unsigned int nprocessors, nworkers, nrqueues; // number of processors/threads/request queues 247 bool seperate_clus; // use same or separate cluster for executor 248 volatile bool is_shutdown; // flag to communicate shutdown to worker threads 242 249 }; // executor 243 250 … … 246 253 // #endif 247 254 static inline void ^?{}( worker & mutex this ) with(this) { 248 #ifdef ACTOR_STATS249 __atomic_add_fetch(&__all_gulps, executor_->w_infos[id].gulps,__ATOMIC_SEQ_CST);250 __atomic_add_fetch(&__all_processed, executor_->w_infos[id].processed,__ATOMIC_SEQ_CST);251 __atomic_add_fetch(&__all_msgs_stolen, executor_->w_infos[id].msgs_stolen,__ATOMIC_SEQ_CST);252 __atomic_add_fetch(&__total_tries, executor_->w_infos[id].try_steal, __ATOMIC_SEQ_CST);253 __atomic_add_fetch(&__total_stolen, executor_->w_infos[id].stolen, __ATOMIC_SEQ_CST);254 __atomic_add_fetch(&__total_failed_swaps, executor_->w_infos[id].failed_swaps, __ATOMIC_SEQ_CST);255 __atomic_add_fetch(&__total_empty_stolen, executor_->w_infos[id].empty_stolen, __ATOMIC_SEQ_CST);256 257 // per worker steal stats (uncomment alongside the lock above this routine to print)258 // lock( out_lock __cfaabi_dbg_ctx2 );259 // printf("Worker id: %d, processed: %llu messages, attempted %lu, stole: %lu, stolen from: %lu\n", id, processed, try_steal, stolen, __atomic_add_fetch(&executor_->w_infos[id].stolen_from, 0, __ATOMIC_SEQ_CST) );260 // int count = 0;261 // int count2 = 0;262 // for ( i; range ) {263 //if ( replaced_queue[start + i] > 0 ){264 //count++;265 //// printf("%d: %u, ",i, replaced_queue[i]);266 //}267 //if (__atomic_add_fetch(&stolen_arr[start + i],0,__ATOMIC_SEQ_CST) > 0)268 //count2++;269 // }270 // printf("swapped with: %d of %u indices\n", count, executor_->nrqueues / executor_->nworkers );271 // printf("%d of %u indices were stolen\n", count2, executor_->nrqueues / executor_->nworkers );272 // unlock( out_lock );273 #endif255 #ifdef ACTOR_STATS 256 __atomic_add_fetch(&__all_gulps, executor_->w_infos[id].gulps,__ATOMIC_SEQ_CST); 257 __atomic_add_fetch(&__all_processed, executor_->w_infos[id].processed,__ATOMIC_SEQ_CST); 258 __atomic_add_fetch(&__all_msgs_stolen, executor_->w_infos[id].msgs_stolen,__ATOMIC_SEQ_CST); 259 __atomic_add_fetch(&__total_tries, executor_->w_infos[id].try_steal, __ATOMIC_SEQ_CST); 260 __atomic_add_fetch(&__total_stolen, executor_->w_infos[id].stolen, __ATOMIC_SEQ_CST); 261 __atomic_add_fetch(&__total_failed_swaps, executor_->w_infos[id].failed_swaps, __ATOMIC_SEQ_CST); 262 __atomic_add_fetch(&__total_empty_stolen, executor_->w_infos[id].empty_stolen, __ATOMIC_SEQ_CST); 263 264 // per worker steal stats (uncomment alongside the lock above this routine to print) 265 // lock( out_lock __cfaabi_dbg_ctx2 ); 266 // printf("Worker id: %d, processed: %llu messages, attempted %lu, stole: %lu, stolen from: %lu\n", id, processed, try_steal, stolen, __atomic_add_fetch(&executor_->w_infos[id].stolen_from, 0, __ATOMIC_SEQ_CST) ); 267 // int count = 0; 268 // int count2 = 0; 269 // for ( i; range ) { 270 // if ( replaced_queue[start + i] > 0 ){ 271 // count++; 272 // // printf("%d: %u, ",i, replaced_queue[i]); 273 // } 274 // if (__atomic_add_fetch(&stolen_arr[start + i],0,__ATOMIC_SEQ_CST) > 0) 275 // count2++; 276 // } 277 // printf("swapped with: %d of %u indices\n", count, executor_->nrqueues / executor_->nworkers ); 278 // printf("%d of %u indices were stolen\n", count2, executor_->nrqueues / executor_->nworkers ); 279 // unlock( out_lock ); 280 #endif 274 281 } 275 282 276 283 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus, size_t buf_size ) with(this) { 277 if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );278 this.nprocessors = nprocessors;279 this.nworkers = nworkers;280 this.nrqueues = nrqueues;281 this.seperate_clus = seperate_clus;282 this.is_shutdown = false;283 284 if ( nworkers == nrqueues )285 no_steal = true;286 287 #ifdef ACTOR_STATS288 // stolen_arr = aalloc( nrqueues );289 // replaced_queue = aalloc( nrqueues );290 __total_workers = nworkers;291 #endif292 293 if ( seperate_clus ) {294 cluster = alloc();295 (*cluster){};296 } else cluster = active_cluster();297 298 request_queues = aalloc( nrqueues );299 worker_req_queues = aalloc( nrqueues );300 for ( i; nrqueues ) {301 request_queues[i]{ buf_size, i };302 worker_req_queues[i] = &request_queues[i];303 }304 305 processors = aalloc( nprocessors );306 for ( i; nprocessors )307 (*(processors[i] = alloc())){ *cluster };308 309 local_queues = aalloc( nworkers );310 workers = aalloc( nworkers );311 w_infos = aalloc( nworkers );312 unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;313 314 for ( i; nworkers ) {315 w_infos[i]{};316 local_queues[i]{ buf_size };317 }318 319 for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {320 range = reqPerWorker + ( i < extras ? 1 : 0 );321 (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], &this, start, range, i };322 } // for284 if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" ); 285 this.nprocessors = nprocessors; 286 this.nworkers = nworkers; 287 this.nrqueues = nrqueues; 288 this.seperate_clus = seperate_clus; 289 this.is_shutdown = false; 290 291 if ( nworkers == nrqueues ) 292 no_steal = true; 293 294 #ifdef ACTOR_STATS 295 // stolen_arr = aalloc( nrqueues ); 296 // replaced_queue = aalloc( nrqueues ); 297 __total_workers = nworkers; 298 #endif 299 300 if ( seperate_clus ) { 301 cluster = alloc(); 302 (*cluster){}; 303 } else cluster = active_cluster(); 304 305 request_queues = aalloc( nrqueues ); 306 worker_req_queues = aalloc( nrqueues ); 307 for ( i; nrqueues ) { 308 request_queues[i]{ buf_size, i }; 309 worker_req_queues[i] = &request_queues[i]; 310 } 311 312 processors = aalloc( nprocessors ); 313 for ( i; nprocessors ) 314 (*(processors[i] = alloc())){ *cluster }; 315 316 local_queues = aalloc( nworkers ); 317 workers = aalloc( nworkers ); 318 w_infos = aalloc( nworkers ); 319 unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers; 320 321 for ( i; nworkers ) { 322 w_infos[i]{}; 323 local_queues[i]{ buf_size }; 324 } 325 326 for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) { 327 range = reqPerWorker + ( i < extras ? 1 : 0 ); 328 (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], &this, start, range, i }; 329 } // for 323 330 } 324 331 static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __DEFAULT_EXECUTOR_BUFSIZE__ }; } … … 329 336 330 337 static inline void ^?{}( executor & this ) with(this) { 331 is_shutdown = true; 332 333 for ( i; nworkers ) 334 delete( workers[i] ); 335 336 for ( i; nprocessors ) { 337 delete( processors[i] ); 338 } // for 339 340 #ifdef ACTOR_STATS 341 size_t misses = 0; 342 for ( i; nrqueues ) { 343 misses += worker_req_queues[i]->missed; 344 } 345 // adelete( stolen_arr ); 346 // adelete( replaced_queue ); 338 is_shutdown = true; 339 340 for ( i; nworkers ) 341 delete( workers[i] ); 342 343 for ( i; nprocessors ) { 344 delete( processors[i] ); 345 } // for 346 347 #ifdef ACTOR_STATS_QUEUE_MISSED 348 size_t misses = 0; 349 for ( i; nrqueues ) { 350 misses += worker_req_queues[i]->missed; 351 } 352 // adelete( stolen_arr ); 353 // adelete( replaced_queue ); 354 #endif 355 356 adelete( workers ); 357 adelete( w_infos ); 358 adelete( local_queues ); 359 adelete( request_queues ); 360 adelete( worker_req_queues ); 361 adelete( processors ); 362 if ( seperate_clus ) delete( cluster ); 363 364 #ifdef ACTOR_STATS // print formatted stats 365 printf(" Actor System Stats:\n"); 366 printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, __all_processed); 367 size_t avg_gulps = __all_gulps == 0 ? 0 : __all_processed / __all_gulps; 368 printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", __all_gulps, avg_gulps, misses); 369 printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\t Empty steals:\t\t%lu\n", 370 __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps, __total_empty_stolen); 371 size_t avg_steal = __total_stolen == 0 ? 0 : __all_msgs_stolen / __total_stolen; 372 printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", __all_msgs_stolen, avg_steal); 373 #endif 374 375 #ifndef ACTOR_STATS 376 #ifdef ACTOR_STATS_QUEUE_MISSED 377 printf("\t%lu", misses); 347 378 #endif 348 349 adelete( workers );350 adelete( w_infos );351 adelete( local_queues );352 adelete( request_queues );353 adelete( worker_req_queues );354 adelete( processors );355 if ( seperate_clus ) delete( cluster );356 357 #ifdef ACTOR_STATS // print formatted stats358 printf(" Actor System Stats:\n");359 printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, __all_processed);360 size_t avg_gulps = __all_gulps == 0 ? 0 : __all_processed / __all_gulps;361 printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", __all_gulps, avg_gulps, misses);362 printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\t Empty steals:\t\t%lu\n",363 __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps, __total_empty_stolen);364 size_t avg_steal = __total_stolen == 0 ? 0 : __all_msgs_stolen / __total_stolen;365 printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", __all_msgs_stolen, avg_steal);366 379 #endif 367 380 368 381 } 369 382 … … 372 385 373 386 static inline size_t __get_next_ticket( executor & this ) with(this) { 374 #ifdef __CFA_DEBUG__375 size_t temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;376 377 // reserve MAX for dead actors378 if ( unlikely( temp == MAX ) ) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;379 return temp;380 #else381 return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_RELAXED) % nrqueues;382 #endif387 #ifdef __CFA_DEBUG__ 388 size_t temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues; 389 390 // reserve MAX for dead actors 391 if ( unlikely( temp == MAX ) ) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues; 392 return temp; 393 #else 394 return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_RELAXED) % nrqueues; 395 #endif 383 396 } // tickets 384 397 385 398 // TODO: update globals in this file to be static fields once the static fields project is done 386 399 static executor * __actor_executor_ = 0p; 387 static bool __actor_executor_passed = false; // was an executor passed to start_actor_system388 static size_t __num_actors_ = 0; // number of actor objects in system400 static bool __actor_executor_passed = false; // was an executor passed to start_actor_system 401 static size_t __num_actors_ = 0; // number of actor objects in system 389 402 static struct thread$ * __actor_executor_thd = 0p; // used to wake executor after actors finish 390 403 struct actor { 391 size_t ticket;// executor-queue handle392 allocation alloc;// allocation action393 inline virtual_dtor;404 size_t ticket; // executor-queue handle 405 allocation alloc; // allocation action 406 inline virtual_dtor; 394 407 }; 395 408 396 409 static inline void ?{}( actor & this ) with(this) { 397 // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive398 // member must be called to end it399 DEBUG_ABORT( __actor_executor_ == 0p, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" );400 alloc = Nodelete;401 ticket = __get_next_ticket( *__actor_executor_ );402 __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_RELAXED );403 #ifdef ACTOR_STATS404 __atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST );405 #endif410 // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive 411 // member must be called to end it 412 DEBUG_ABORT( __actor_executor_ == 0p, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" ); 413 alloc = Nodelete; 414 ticket = __get_next_ticket( *__actor_executor_ ); 415 __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_RELAXED ); 416 #ifdef ACTOR_STATS 417 __atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST ); 418 #endif 406 419 } 407 420 408 421 static inline void check_actor( actor & this ) { 409 if ( this.alloc != Nodelete ) {410 switch( this.alloc ) {411 case Delete: delete( &this ); break;412 case Destroy:413 CFA_DEBUG( this.ticket = MAX; );// mark as terminated414 ^?{}(this);415 break;416 case Finished:417 CFA_DEBUG( this.ticket = MAX; );// mark as terminated418 break;419 default: ;// stop warning420 }421 422 if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_RELAXED ) == 0 ) ) { // all actors have terminated423 unpark( __actor_executor_thd );424 }425 }422 if ( this.alloc != Nodelete ) { 423 switch( this.alloc ) { 424 case Delete: delete( &this ); break; 425 case Destroy: 426 CFA_DEBUG( this.ticket = MAX; ); // mark as terminated 427 ^?{}(this); 428 break; 429 case Finished: 430 CFA_DEBUG( this.ticket = MAX; ); // mark as terminated 431 break; 432 default: ; // stop warning 433 } 434 435 if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_RELAXED ) == 0 ) ) { // all actors have terminated 436 unpark( __actor_executor_thd ); 437 } 438 } 426 439 } 427 440 428 441 struct message { 429 allocation alloc;// allocation action430 inline virtual_dtor;442 allocation alloc; // allocation action 443 inline virtual_dtor; 431 444 }; 432 445 433 446 static inline void ?{}( message & this ) { 434 this.alloc = Nodelete;447 this.alloc = Nodelete; 435 448 } 436 449 static inline void ?{}( message & this, allocation alloc ) { 437 memcpy( &this.alloc, &alloc, sizeof(allocation) );// optimization to elide ctor438 CFA_DEBUG( if( this.alloc == Finished ) this.alloc = Nodelete; ) 450 memcpy( &this.alloc, &alloc, sizeof(allocation) ); // optimization to elide ctor 451 CFA_DEBUG( if ( this.alloc == Finished ) this.alloc = Nodelete; ); 439 452 } 440 453 static inline void ^?{}( message & this ) with(this) { 441 CFA_DEBUG(454 CFA_DEBUG( 442 455 if ( alloc == Nodelete ) { 443 456 printf( "CFA warning (UNIX pid:%ld) : program terminating with message %p allocated but never sent.\n", … … 448 461 449 462 static inline void check_message( message & this ) { 450 switch ( this.alloc ) { // analyze message status 451 case Nodelete: CFA_DEBUG( this.alloc = Finished ); break; 452 case Delete: delete( &this ); break; 453 case Destroy: ^?{}( this ); break; 454 case Finished: break; 455 } // switch 456 } 457 static inline void set_allocation( message & this, allocation state ) { 458 CFA_DEBUG( if ( state == Nodelete ) state = Finished; ) 459 this.alloc = state; 463 switch ( this.alloc ) { // analyze message status 464 case Nodelete: CFA_DEBUG( this.alloc = Finished ); break; 465 case Delete: delete( &this ); break; 466 case Destroy: ^?{}( this ); break; 467 case Finished: break; 468 } // switch 469 } 470 static inline allocation set_allocation( message & this, allocation state ) { 471 CFA_DEBUG( if ( state == Nodelete ) state = Finished; ); 472 allocation prev = this.alloc; 473 this.alloc = state; 474 return prev; 475 } 476 static inline allocation get_allocation( message & this ) { 477 return this.alloc; 460 478 } 461 479 462 480 static inline void deliver_request( request & this ) { 463 DEBUG_ABORT( this.receiver->ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );464 actor * base_actor;465 message * base_msg;466 allocation temp = this.fn( *this.receiver, *this.msg, &base_actor, &base_msg );467 memcpy( &base_actor->alloc, &temp, sizeof(allocation) ); // optimization to elide ctor468 check_message( *base_msg );469 check_actor( *base_actor );481 DEBUG_ABORT( this.receiver->ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" ); 482 actor * base_actor; 483 message * base_msg; 484 allocation temp = this.fn( *this.receiver, *this.msg, &base_actor, &base_msg ); 485 memcpy( &base_actor->alloc, &temp, sizeof(allocation) ); // optimization to elide ctor 486 check_message( *base_msg ); 487 check_actor( *base_actor ); 470 488 } 471 489 … … 473 491 // returns ptr to newly owned queue if swap succeeds 474 492 static inline work_queue * try_swap_queues( worker & this, unsigned int victim_idx, unsigned int my_idx ) with(this) { 475 work_queue * my_queue = request_queues[my_idx];476 work_queue * other_queue = request_queues[victim_idx];477 478 // if either queue is 0p then they are in the process of being stolen479 if ( other_queue == 0p ) return 0p;480 481 // try to set our queue ptr to be 0p. If it fails someone moved our queue so return false482 if ( !__atomic_compare_exchange_n( &request_queues[my_idx], &my_queue, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )483 return 0p;484 485 // try to set other queue ptr to be our queue ptr. If it fails someone moved the other queue so fix up then return false486 if ( !__atomic_compare_exchange_n( &request_queues[victim_idx], &other_queue, my_queue, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {487 /* paranoid */ verify( request_queues[my_idx] == 0p );488 request_queues[my_idx] = my_queue; // reset my queue ptr back to appropriate val489 return 0p;490 }491 492 // we have successfully swapped and since our queue is 0p no one will touch it so write back new queue ptr non atomically493 request_queues[my_idx] = other_queue; // last write does not need to be atomic494 return other_queue;493 work_queue * my_queue = request_queues[my_idx]; 494 work_queue * other_queue = request_queues[victim_idx]; 495 496 // if either queue is 0p then they are in the process of being stolen 497 if ( other_queue == 0p ) return 0p; 498 499 // try to set our queue ptr to be 0p. If it fails someone moved our queue so return false 500 if ( !__atomic_compare_exchange_n( &request_queues[my_idx], &my_queue, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) 501 return 0p; 502 503 // try to set other queue ptr to be our queue ptr. If it fails someone moved the other queue so fix up then return false 504 if ( !__atomic_compare_exchange_n( &request_queues[victim_idx], &other_queue, my_queue, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) { 505 /* paranoid */ verify( request_queues[my_idx] == 0p ); 506 request_queues[my_idx] = my_queue; // reset my queue ptr back to appropriate val 507 return 0p; 508 } 509 510 // we have successfully swapped and since our queue is 0p no one will touch it so write back new queue ptr non atomically 511 request_queues[my_idx] = other_queue; // last write does not need to be atomic 512 return other_queue; 495 513 } 496 514 497 515 // once a worker to steal from has been chosen, choose queue to steal from 498 516 static inline void choose_queue( worker & this, unsigned int victim_id, unsigned int swap_idx ) with(this) { 499 // have to calculate victim start and range since victim may be deleted before us in shutdown500 const unsigned int queues_per_worker = executor_->nrqueues / executor_->nworkers;501 const unsigned int extras = executor_->nrqueues % executor_->nworkers;502 unsigned int vic_start, vic_range;503 if ( extras > victim_id ) {504 vic_range = queues_per_worker + 1;505 vic_start = vic_range * victim_id;506 } else {507 vic_start = extras + victim_id * queues_per_worker;508 vic_range = queues_per_worker;509 }510 unsigned int start_idx = prng( vic_range );511 512 unsigned int tries = 0;513 work_queue * curr_steal_queue;514 515 for ( unsigned int i = start_idx; tries < vic_range; i = (i + 1) % vic_range ) {516 tries++;517 curr_steal_queue = request_queues[ i + vic_start ];518 // avoid empty queues and queues that are being operated on519 if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || is_empty( *curr_steal_queue->c_queue ) )520 continue;521 522 #ifdef ACTOR_STATS523 curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );524 if ( curr_steal_queue ) {525 executor_->w_infos[id].msgs_stolen += curr_steal_queue->c_queue->count;526 executor_->w_infos[id].stolen++;527 if ( is_empty( *curr_steal_queue->c_queue ) ) executor_->w_infos[id].empty_stolen++;528 // __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED);529 // replaced_queue[swap_idx]++;530 // __atomic_add_fetch(&stolen_arr[ i + vic_start ], 1, __ATOMIC_RELAXED);531 } else {532 executor_->w_infos[id].failed_swaps++;533 }534 #else535 curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );536 #endif // ACTOR_STATS537 538 return;539 }540 541 return;517 // have to calculate victim start and range since victim may be deleted before us in shutdown 518 const unsigned int queues_per_worker = executor_->nrqueues / executor_->nworkers; 519 const unsigned int extras = executor_->nrqueues % executor_->nworkers; 520 unsigned int vic_start, vic_range; 521 if ( extras > victim_id ) { 522 vic_range = queues_per_worker + 1; 523 vic_start = vic_range * victim_id; 524 } else { 525 vic_start = extras + victim_id * queues_per_worker; 526 vic_range = queues_per_worker; 527 } 528 unsigned int start_idx = prng( vic_range ); 529 530 unsigned int tries = 0; 531 work_queue * curr_steal_queue; 532 533 for ( unsigned int i = start_idx; tries < vic_range; i = (i + 1) % vic_range ) { 534 tries++; 535 curr_steal_queue = request_queues[ i + vic_start ]; 536 // avoid empty queues and queues that are being operated on 537 if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || is_empty( *curr_steal_queue->c_queue ) ) 538 continue; 539 540 #ifdef ACTOR_STATS 541 curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx ); 542 if ( curr_steal_queue ) { 543 executor_->w_infos[id].msgs_stolen += curr_steal_queue->c_queue->count; 544 executor_->w_infos[id].stolen++; 545 if ( is_empty( *curr_steal_queue->c_queue ) ) executor_->w_infos[id].empty_stolen++; 546 // __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED); 547 // replaced_queue[swap_idx]++; 548 // __atomic_add_fetch(&stolen_arr[ i + vic_start ], 1, __ATOMIC_RELAXED); 549 } else { 550 executor_->w_infos[id].failed_swaps++; 551 } 552 #else 553 curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx ); 554 #endif // ACTOR_STATS 555 556 return; 557 } 558 559 return; 542 560 } 543 561 544 562 // choose a worker to steal from 545 563 static inline void steal_work( worker & this, unsigned int swap_idx ) with(this) { 546 #if RAND547 unsigned int victim = prng( executor_->nworkers );548 if ( victim == id ) victim = ( victim + 1 ) % executor_->nworkers;549 choose_queue( this, victim, swap_idx );550 #elif SEARCH551 unsigned long long min = MAX; // smaller timestamp means longer since service552 int min_id = 0; // use ints not uints to avoid integer underflow without hacky math553 int n_workers = executor_->nworkers;554 unsigned long long curr_stamp;555 int scount = 1;556 for ( int i = (id + 1) % n_workers; scount < n_workers; i = (i + 1) % n_workers, scount++ ) {557 curr_stamp = executor_->w_infos[i].stamp;558 if ( curr_stamp < min ) {559 min = curr_stamp;560 min_id = i;561 }562 }563 choose_queue( this, min_id, swap_idx );564 #endif564 #if RAND 565 unsigned int victim = prng( executor_->nworkers ); 566 if ( victim == id ) victim = ( victim + 1 ) % executor_->nworkers; 567 choose_queue( this, victim, swap_idx ); 568 #elif SEARCH 569 unsigned long long min = MAX; // smaller timestamp means longer since service 570 int min_id = 0; // use ints not uints to avoid integer underflow without hacky math 571 int n_workers = executor_->nworkers; 572 unsigned long long curr_stamp; 573 int scount = 1; 574 for ( int i = (id + 1) % n_workers; scount < n_workers; i = (i + 1) % n_workers, scount++ ) { 575 curr_stamp = executor_->w_infos[i].stamp; 576 if ( curr_stamp < min ) { 577 min = curr_stamp; 578 min_id = i; 579 } 580 } 581 choose_queue( this, min_id, swap_idx ); 582 #endif 565 583 } 566 584 567 585 #define CHECK_TERMINATION if ( unlikely( executor_->is_shutdown ) ) break Exit 568 586 void main( worker & this ) with(this) { 569 // #ifdef ACTOR_STATS570 // for ( i; executor_->nrqueues ) {571 //replaced_queue[i] = 0;572 //__atomic_store_n( &stolen_arr[i], 0, __ATOMIC_SEQ_CST );573 // }574 // #endif575 576 // threshold of empty queues we see before we go stealing577 const unsigned int steal_threshold = 2 * range;578 579 // Store variable data here instead of worker struct to avoid any potential false sharing580 unsigned int empty_count = 0;581 request & req;582 work_queue * curr_work_queue;583 584 Exit:585 for ( unsigned int i = 0;; i = (i + 1) % range ) {// cycle through set of request buffers586 curr_work_queue = request_queues[i + start];587 // #ifdef ACTOR_STATS 588 // for ( i; executor_->nrqueues ) { 589 // replaced_queue[i] = 0; 590 // __atomic_store_n( &stolen_arr[i], 0, __ATOMIC_SEQ_CST ); 591 // } 592 // #endif 593 594 // threshold of empty queues we see before we go stealing 595 const unsigned int steal_threshold = 2 * range; 596 597 // Store variable data here instead of worker struct to avoid any potential false sharing 598 unsigned int empty_count = 0; 599 request & req; 600 work_queue * curr_work_queue; 601 602 Exit: 603 for ( unsigned int i = 0;; i = (i + 1) % range ) { // cycle through set of request buffers 604 curr_work_queue = request_queues[i + start]; 587 605 588 606 #ifndef __STEAL 589 607 CHECK_TERMINATION; 590 608 #endif 591 592 // check if queue is empty before trying to gulp it593 if ( is_empty( *curr_work_queue->c_queue ) ) {594 #ifdef __STEAL595 empty_count++;596 if ( empty_count < steal_threshold ) continue;597 #else598 continue;599 #endif600 }601 transfer( *curr_work_queue, ¤t_queue );602 #ifdef ACTOR_STATS603 executor_->w_infos[id].gulps++;604 #endif // ACTOR_STATS605 #ifdef __STEAL606 if ( is_empty( *current_queue ) ) {607 if ( unlikely( no_steal ) ) { CHECK_TERMINATION; continue; }608 empty_count++;609 if ( empty_count < steal_threshold ) continue;610 empty_count = 0;611 612 CHECK_TERMINATION; // check for termination613 614 __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED );615 616 #ifdef ACTOR_STATS617 executor_->w_infos[id].try_steal++;618 #endif // ACTOR_STATS619 620 steal_work( this, start + prng( range ) );621 continue;622 }623 #endif // __STEAL624 while ( ! is_empty( *current_queue ) ) {625 #ifdef ACTOR_STATS626 executor_->w_infos[id].processed++;627 #endif628 &req = &remove( *current_queue );629 if ( !&req ) continue;630 deliver_request( req );631 }632 #ifdef __STEAL633 curr_work_queue->being_processed = false;// set done processing634 empty_count = 0; // we found work so reset empty counter635 #endif636 637 CHECK_TERMINATION;638 639 // potentially reclaim some of the current queue's vector space if it is unused640 reclaim( *current_queue );641 } // for609 610 // check if queue is empty before trying to gulp it 611 if ( is_empty( *curr_work_queue->c_queue ) ) { 612 #ifdef __STEAL 613 empty_count++; 614 if ( empty_count < steal_threshold ) continue; 615 #else 616 continue; 617 #endif 618 } 619 transfer( *curr_work_queue, ¤t_queue ); 620 #ifdef ACTOR_STATS 621 executor_->w_infos[id].gulps++; 622 #endif // ACTOR_STATS 623 #ifdef __STEAL 624 if ( is_empty( *current_queue ) ) { 625 if ( unlikely( no_steal ) ) { CHECK_TERMINATION; continue; } 626 empty_count++; 627 if ( empty_count < steal_threshold ) continue; 628 empty_count = 0; 629 630 CHECK_TERMINATION; // check for termination 631 632 __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED ); 633 634 #ifdef ACTOR_STATS 635 executor_->w_infos[id].try_steal++; 636 #endif // ACTOR_STATS 637 638 steal_work( this, start + prng( range ) ); 639 continue; 640 } 641 #endif // __STEAL 642 while ( ! is_empty( *current_queue ) ) { 643 #ifdef ACTOR_STATS 644 executor_->w_infos[id].processed++; 645 #endif 646 &req = &remove( *current_queue ); 647 if ( !&req ) continue; 648 deliver_request( req ); 649 } 650 #ifdef __STEAL 651 curr_work_queue->being_processed = false; // set done processing 652 empty_count = 0; // we found work so reset empty counter 653 #endif 654 655 CHECK_TERMINATION; 656 657 // potentially reclaim some of the current queue's vector space if it is unused 658 reclaim( *current_queue ); 659 } // for 642 660 } 643 661 644 662 static inline void send( executor & this, request & req, unsigned long int ticket ) with(this) { 645 insert( request_queues[ticket], req);663 insert( request_queues[ticket], req); 646 664 } 647 665 648 666 static inline void send( actor & this, request & req ) { 649 DEBUG_ABORT( this.ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );650 send( *__actor_executor_, req, this.ticket );667 DEBUG_ABORT( this.ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" ); 668 send( *__actor_executor_, req, this.ticket ); 651 669 } 652 670 653 671 static inline void __reset_stats() { 654 #ifdef ACTOR_STATS655 __total_tries = 0;656 __total_stolen = 0;657 __all_gulps = 0;658 __total_failed_swaps = 0;659 __total_empty_stolen = 0;660 __all_processed = 0;661 __num_actors_stats = 0;662 __all_msgs_stolen = 0;663 #endif672 #ifdef ACTOR_STATS 673 __total_tries = 0; 674 __total_stolen = 0; 675 __all_gulps = 0; 676 __total_failed_swaps = 0; 677 __total_empty_stolen = 0; 678 __all_processed = 0; 679 __num_actors_stats = 0; 680 __all_msgs_stolen = 0; 681 #endif 664 682 } 665 683 666 684 static inline void start_actor_system( size_t num_thds ) { 667 __reset_stats();668 __actor_executor_thd = active_thread();669 __actor_executor_ = alloc();670 (*__actor_executor_){ 0, num_thds, num_thds == 1 ? 1 : num_thds * 16 };685 __reset_stats(); 686 __actor_executor_thd = active_thread(); 687 __actor_executor_ = alloc(); 688 (*__actor_executor_){ 0, num_thds, num_thds == 1 ? 1 : num_thds * 16 }; 671 689 } 672 690 … … 674 692 675 693 static inline void start_actor_system( executor & this ) { 676 __reset_stats();677 __actor_executor_thd = active_thread();678 __actor_executor_ = &this;679 __actor_executor_passed = true;694 __reset_stats(); 695 __actor_executor_thd = active_thread(); 696 __actor_executor_ = &this; 697 __actor_executor_passed = true; 680 698 } 681 699 682 700 static inline void stop_actor_system() { 683 park( );// unparked when actor system is finished684 685 if ( !__actor_executor_passed ) delete( __actor_executor_ );686 __actor_executor_ = 0p;687 __actor_executor_thd = 0p;688 __next_ticket = 0;689 __actor_executor_passed = false;701 park(); // unparked when actor system is finished 702 703 if ( !__actor_executor_passed ) delete( __actor_executor_ ); 704 __actor_executor_ = 0p; 705 __actor_executor_thd = 0p; 706 __next_ticket = 0; 707 __actor_executor_passed = false; 690 708 } 691 709 … … 693 711 // assigned at creation to __base_msg_finished to avoid unused message warning 694 712 message __base_msg_finished @= { .alloc : Finished }; 695 struct delete_m essage_t { inline message; } delete_msg = __base_msg_finished;713 struct delete_msg_t { inline message; } delete_msg = __base_msg_finished; 696 714 struct destroy_msg_t { inline message; } destroy_msg = __base_msg_finished; 697 715 struct finished_msg_t { inline message; } finished_msg = __base_msg_finished; 698 716 699 allocation receive( actor & this, delete_m essage_t & msg ) { return Delete; }717 allocation receive( actor & this, delete_msg_t & msg ) { return Delete; } 700 718 allocation receive( actor & this, destroy_msg_t & msg ) { return Destroy; } 701 719 allocation receive( actor & this, finished_msg_t & msg ) { return Finished; } 702
Note:
See TracChangeset
for help on using the changeset viewer.