Changeset 2dfdae3 for libcfa/src/concurrency
- Timestamp: Jun 27, 2023, 4:48:44 PM (2 years ago)
- Branches: master
- Children: 70e47fec
- Parents: 508671e (diff), b9c06b98 (diff)
Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Use the (diff) links above to see all the changes relative to each parent.
- Files: 1 edited
  - libcfa/src/concurrency/actor.hfa (modified) (14 diffs)
libcfa/src/concurrency/actor.hfa
Most of this merge is whitespace normalization: the two sides of each hunk differ only in indentation, plus minor punctuation fixes to CFA_DEBUG macro uses. The functional changes are confined to the message API (set_allocation() now returns the previous allocation state and gains a get_allocation() companion) and a rename of delete_message_t to delete_msg_t; both are noted at the relevant hunks below. The merged result follows, with `// …` marking unchanged code elided by the diff.

```cfa
typedef allocation (*__receive_fn)(actor &, message &, actor **, message **);
struct request {
    actor * receiver;
    message * msg;
    __receive_fn fn;
};

struct a_msg {
    int m;
};
static inline void ?{}( request & this ) {}
static inline void ?{}( request & this, actor * receiver, message * msg, __receive_fn fn ) {
    this.receiver = receiver;
    this.msg = msg;
    this.fn = fn;
}
static inline void ?{}( request & this, request & copy ) {
    this.receiver = copy.receiver;
    this.msg = copy.msg;
    this.fn = copy.fn;
}

// …

// Vector-like data structure that supports O(1) queue operations with no bound on size
// assumes gulping behaviour (once a remove occurs, removes happen until empty before the next insert)
struct copy_queue {
    request * buffer;
    size_t count, buffer_size, index, utilized, last_size;
};
static inline void ?{}( copy_queue & this ) {}
static inline void ?{}( copy_queue & this, size_t buf_size ) with(this) {
    buffer_size = buf_size;
    buffer = aalloc( buffer_size );
    count = 0;
    utilized = 0;
    index = 0;
    last_size = 0;
}
static inline void ^?{}( copy_queue & this ) with(this) {
    DEBUG_ABORT( count != 0, "Actor system terminated with messages sent but not received\n" );
    adelete(buffer);
}

static inline void insert( copy_queue & this, request & elem ) with(this) {
    if ( count >= buffer_size ) {               // increase arr size
        last_size = buffer_size;
        buffer_size = 2 * buffer_size;
        buffer = realloc( buffer, sizeof( request ) * buffer_size );
        /* paranoid */ verify( buffer );
    }
    memcpy( &buffer[count], &elem, sizeof(request) );
    count++;
}

// once you start removing, you need to remove all elements;
// it is not supported to call insert() until the array is fully empty
static inline request & remove( copy_queue & this ) with(this) {
    if ( count > 0 ) {
        count--;
        size_t old_idx = index;
        index = count == 0 ? 0 : index + 1;
        return buffer[old_idx];
    }
    request * ret = 0p;
    return *0p;
}

// try to reclaim some memory if less than half of buffer is utilized
static inline void reclaim( copy_queue & this ) with(this) {
    if ( utilized >= last_size || buffer_size <= 4 ) { utilized = 0; return; }
    utilized = 0;
    buffer_size--;
    buffer = realloc( buffer, sizeof( request ) * buffer_size ); // try to reclaim some memory
}

// …

struct work_queue {
    __spinlock_t mutex_lock;
    copy_queue * owned_queue;       // copy queue allocated and cleaned up by this work_queue
    copy_queue * c_queue;           // current queue
    volatile bool being_processed;  // flag to prevent concurrent processing
    #ifdef ACTOR_STATS
    unsigned int id;
    size_t missed;                  // transfers skipped due to being_processed flag being up
    #endif
}; // work_queue
static inline void ?{}( work_queue & this, size_t buf_size, unsigned int i ) with(this) {
    owned_queue = alloc();          // allocated separately to avoid false sharing
    (*owned_queue){ buf_size };
    c_queue = owned_queue;
    being_processed = false;
    #ifdef ACTOR_STATS
    id = i;
    missed = 0;
    #endif
}

// …

static inline void insert( work_queue & this, request & elem ) with(this) {
    lock( mutex_lock __cfaabi_dbg_ctx2 );
    insert( *c_queue, elem );
    unlock( mutex_lock );
} // insert

static inline void transfer( work_queue & this, copy_queue ** transfer_to ) with(this) {
    lock( mutex_lock __cfaabi_dbg_ctx2 );
    #ifdef __STEAL

    // check if queue is being processed elsewhere
    if ( unlikely( being_processed ) ) {
        #ifdef ACTOR_STATS
        missed++;
        #endif
        unlock( mutex_lock );
        return;
    }

    being_processed = c_queue->count != 0;
    #endif // __STEAL

    c_queue->utilized = c_queue->count;

    // swap copy queue ptrs
    copy_queue * temp = *transfer_to;
    *transfer_to = c_queue;
    c_queue = temp;
    unlock( mutex_lock );
} // transfer
```
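This two-buffer handoff is the heart of the design: each worker owns an initially empty copy_queue, and transfer() swaps it with the work_queue's current buffer in O(1) under the lock, after which the worker can drain ("gulp") every request without holding mutex_lock. A minimal consumer sketch of that pattern (an illustrative standalone driver, not part of the header; the real loop is in main( worker & ) further down, and is_empty() comes from an unchanged, elided part of the file):

```cfa
// illustrative driver: swap buffers under the lock, then drain without it
void gulp_demo( work_queue & wq, copy_queue *& current ) {
    transfer( wq, &current );            // O(1) pointer swap: wq keeps our empty buffer
    request & req;
    while ( ! is_empty( *current ) ) {   // producers now insert into wq's new buffer
        &req = &remove( *current );      // rebind the reference, as the worker main does
        if ( !&req ) continue;
        deliver_request( req );          // run receive(), then check message/actor status
    }
    reclaim( *current );                 // shrink the buffer if it was under-utilized
}
```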
```cfa
// needed since some info needs to persist past worker lifetimes
struct worker_info {
    volatile unsigned long long stamp;
    #ifdef ACTOR_STATS
    size_t stolen_from, try_steal, stolen, empty_stolen, failed_swaps, msgs_stolen;
    unsigned long long processed;
    size_t gulps;
    #endif
};
static inline void ?{}( worker_info & this ) {
    #ifdef ACTOR_STATS
    this.stolen_from = 0;
    this.try_steal = 0;             // attempts to steal
    this.stolen = 0;                // successful steals
    this.processed = 0;             // requests processed
    this.gulps = 0;                 // number of gulps
    this.failed_swaps = 0;          // steal swap failures
    this.empty_stolen = 0;          // queues empty after steal
    this.msgs_stolen = 0;           // number of messages stolen
    #endif
    this.stamp = rdtscl();
}

// …

// #endif
thread worker {
    work_queue ** request_queues;
    copy_queue * current_queue;
    executor * executor_;
    unsigned int start, range;
    int id;
};

// …

// aggregate counters for statistics
size_t __total_tries = 0, __total_stolen = 0, __total_workers, __all_gulps = 0, __total_empty_stolen = 0,
    __total_failed_swaps = 0, __all_processed = 0, __num_actors_stats = 0, __all_msgs_stolen = 0;
#endif
static inline void ?{}( worker & this, cluster & clu, work_queue ** request_queues, copy_queue * current_queue, executor * executor_,
    unsigned int start, unsigned int range, int id ) {
    ((thread &)this){ clu };
    this.request_queues = request_queues;   // array of all queues
    this.current_queue = current_queue;     // currently gulped queue (start with empty queue to use in swap later)
    this.executor_ = executor_;             // pointer to current executor
    this.start = start;                     // start of worker's subrange of request_queues
    this.range = range;                     // size of worker's subrange of request_queues
    this.id = id;                           // worker's id and index in array of workers
}

static bool no_steal = false;
struct executor {
    cluster * cluster;                      // if workers execute on separate cluster
    processor ** processors;                // array of virtual processors adding parallelism for workers
    work_queue * request_queues;            // master array of work request queues
    copy_queue * local_queues;              // array of all worker local queues to avoid deletion race
    work_queue ** worker_req_queues;        // secondary array of work queues to allow for swapping
    worker ** workers;                      // array of workers executing work requests
    worker_info * w_infos;                  // array of info about each worker
    unsigned int nprocessors, nworkers, nrqueues;   // number of processors/threads/request queues
    bool seperate_clus;                     // use same or separate cluster for executor
    volatile bool is_shutdown;              // flag to communicate shutdown to worker threads
}; // executor

// …

// #endif
static inline void ^?{}( worker & mutex this ) with(this) {
    #ifdef ACTOR_STATS
    __atomic_add_fetch(&__all_gulps, executor_->w_infos[id].gulps,__ATOMIC_SEQ_CST);
    __atomic_add_fetch(&__all_processed, executor_->w_infos[id].processed,__ATOMIC_SEQ_CST);
    __atomic_add_fetch(&__all_msgs_stolen, executor_->w_infos[id].msgs_stolen,__ATOMIC_SEQ_CST);
    __atomic_add_fetch(&__total_tries, executor_->w_infos[id].try_steal, __ATOMIC_SEQ_CST);
    __atomic_add_fetch(&__total_stolen, executor_->w_infos[id].stolen, __ATOMIC_SEQ_CST);
    __atomic_add_fetch(&__total_failed_swaps, executor_->w_infos[id].failed_swaps, __ATOMIC_SEQ_CST);
    __atomic_add_fetch(&__total_empty_stolen, executor_->w_infos[id].empty_stolen, __ATOMIC_SEQ_CST);

    // per worker steal stats (uncomment alongside the lock above this routine to print)
    // lock( out_lock __cfaabi_dbg_ctx2 );
    // printf("Worker id: %d, processed: %llu messages, attempted %lu, stole: %lu, stolen from: %lu\n", id, processed, try_steal, stolen, __atomic_add_fetch(&executor_->w_infos[id].stolen_from, 0, __ATOMIC_SEQ_CST) );
    // int count = 0;
    // int count2 = 0;
    // for ( i; range ) {
    //     if ( replaced_queue[start + i] > 0 ) {
    //         count++;
    //         // printf("%d: %u, ", i, replaced_queue[i]);
    //     }
    //     if (__atomic_add_fetch(&stolen_arr[start + i],0,__ATOMIC_SEQ_CST) > 0)
    //         count2++;
    // }
    // printf("swapped with: %d of %u indices\n", count, executor_->nrqueues / executor_->nworkers );
    // printf("%d of %u indices were stolen\n", count2, executor_->nrqueues / executor_->nworkers );
    // unlock( out_lock );
    #endif
}

static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus, size_t buf_size ) with(this) {
    if ( nrqueues < nworkers ) abort( "nrqueues needs to be >= nworkers\n" );
    this.nprocessors = nprocessors;
    this.nworkers = nworkers;
    this.nrqueues = nrqueues;
    this.seperate_clus = seperate_clus;
    this.is_shutdown = false;

    if ( nworkers == nrqueues )
        no_steal = true;

    #ifdef ACTOR_STATS
    // stolen_arr = aalloc( nrqueues );
    // replaced_queue = aalloc( nrqueues );
    __total_workers = nworkers;
    #endif

    if ( seperate_clus ) {
        cluster = alloc();
        (*cluster){};
    } else cluster = active_cluster();

    request_queues = aalloc( nrqueues );
    worker_req_queues = aalloc( nrqueues );
    for ( i; nrqueues ) {
        request_queues[i]{ buf_size, i };
        worker_req_queues[i] = &request_queues[i];
    }

    processors = aalloc( nprocessors );
    for ( i; nprocessors )
        (*(processors[i] = alloc())){ *cluster };

    local_queues = aalloc( nworkers );
    workers = aalloc( nworkers );
    w_infos = aalloc( nworkers );
    unsigned int reqPerWorker = nrqueues / nworkers, extras = nrqueues % nworkers;

    for ( i; nworkers ) {
        w_infos[i]{};
        local_queues[i]{ buf_size };
    }

    for ( unsigned int i = 0, start = 0, range; i < nworkers; i += 1, start += range ) {
        range = reqPerWorker + ( i < extras ? 1 : 0 );
        (*(workers[i] = alloc())){ *cluster, worker_req_queues, &local_queues[i], &this, start, range, i };
    } // for
}
```
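As a construction sketch (sizes illustrative, following the alloc-then-construct idiom used throughout this file): an executor with 4 processors and 4 workers over 64 request queues on its own cluster. The constructor enforces nrqueues >= nworkers, and picking nrqueues == nworkers turns stealing off entirely via no_steal.

```cfa
executor * e = alloc();                                // illustrative sizes, not defaults
(*e){ 4, 4, 64, true, __DEFAULT_EXECUTOR_BUFSIZE__ };  // 4 procs, 4 workers, 64 queues, separate cluster
// (*e){ 4, 4, 4, true } would also run, but sets no_steal = true since nworkers == nrqueues
// hand it to start_actor_system( *e ); the caller deletes it after stop_actor_system()
```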
```cfa
static inline void ?{}( executor & this, unsigned int nprocessors, unsigned int nworkers, unsigned int nrqueues, bool seperate_clus ) { this{ nprocessors, nworkers, nrqueues, seperate_clus, __DEFAULT_EXECUTOR_BUFSIZE__ }; }

// …

static inline void ^?{}( executor & this ) with(this) {
    is_shutdown = true;

    for ( i; nworkers )
        delete( workers[i] );

    for ( i; nprocessors ) {
        delete( processors[i] );
    } // for

    #ifdef ACTOR_STATS
    size_t misses = 0;
    for ( i; nrqueues ) {
        misses += worker_req_queues[i]->missed;
    }
    // adelete( stolen_arr );
    // adelete( replaced_queue );
    #endif

    adelete( workers );
    adelete( w_infos );
    adelete( local_queues );
    adelete( request_queues );
    adelete( worker_req_queues );
    adelete( processors );
    if ( seperate_clus ) delete( cluster );

    #ifdef ACTOR_STATS // print formatted stats
    printf("Actor System Stats:\n");
    printf("\tActors Created:\t\t\t\t%lu\n\tMessages Sent:\t\t\t\t%lu\n", __num_actors_stats, __all_processed);
    size_t avg_gulps = __all_gulps == 0 ? 0 : __all_processed / __all_gulps;
    printf("\tGulps:\t\t\t\t\t%lu\n\tAverage Gulp Size:\t\t\t%lu\n\tMissed gulps:\t\t\t\t%lu\n", __all_gulps, avg_gulps, misses);
    printf("\tSteal attempts:\t\t\t\t%lu\n\tSteals:\t\t\t\t\t%lu\n\tSteal failures (no candidates):\t\t%lu\n\tSteal failures (failed swaps):\t\t%lu\t Empty steals:\t\t%lu\n",
        __total_tries, __total_stolen, __total_tries - __total_stolen - __total_failed_swaps, __total_failed_swaps, __total_empty_stolen);
    size_t avg_steal = __total_stolen == 0 ? 0 : __all_msgs_stolen / __total_stolen;
    printf("\tMessages stolen:\t\t\t%lu\n\tAverage steal size:\t\t\t%lu\n", __all_msgs_stolen, avg_steal);
    #endif
}

// …

static inline size_t __get_next_ticket( executor & this ) with(this) {
    #ifdef __CFA_DEBUG__
    size_t temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;

    // reserve MAX for dead actors
    if ( unlikely( temp == MAX ) ) temp = __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_SEQ_CST) % nrqueues;
    return temp;
    #else
    return __atomic_fetch_add( &__next_ticket, 1, __ATOMIC_RELAXED) % nrqueues;
    #endif
} // tickets

// TODO: update globals in this file to be static fields once the static fields project is done
static executor * __actor_executor_ = 0p;
static bool __actor_executor_passed = false;        // was an executor passed to start_actor_system
static size_t __num_actors_ = 0;                    // number of actor objects in system
static struct thread$ * __actor_executor_thd = 0p;  // used to wake executor after actors finish
struct actor {
    size_t ticket;          // executor-queue handle
    allocation alloc;       // allocation action
    inline virtual_dtor;
};

static inline void ?{}( actor & this ) with(this) {
    // Once an actor is allocated, it must be sent a message or the actor system cannot stop;
    // hence, its receive member must be called to end it.
    DEBUG_ABORT( __actor_executor_ == 0p, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" );
    alloc = Nodelete;
    ticket = __get_next_ticket( *__actor_executor_ );
    __atomic_fetch_add( &__num_actors_, 1, __ATOMIC_RELAXED );
    #ifdef ACTOR_STATS
    __atomic_fetch_add( &__num_actors_stats, 1, __ATOMIC_SEQ_CST );
    #endif
}
```
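Tickets are therefore handed out round-robin modulo nrqueues, an actor keeps its ticket for life, and MAX is reserved in debug builds to mark dead actors. A worked illustration (queue count is hypothetical):

```cfa
// illustrative: an executor with nrqueues = 32
// 1st actor constructed:  ticket = 0 % 32 = 0   -> all its messages flow through request_queues[0]
// 2nd actor constructed:  ticket = 1 % 32 = 1
// ...
// 33rd actor constructed: ticket = 32 % 32 = 0  -> queues are shared once actors outnumber queues
```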
This region also carries the merge's main functional change: set_allocation() now returns the message's previous allocation state, and a get_allocation() accessor is added.

```cfa
static inline void check_actor( actor & this ) {
    if ( this.alloc != Nodelete ) {
        switch( this.alloc ) {
            case Delete: delete( &this ); break;
            case Destroy:
                CFA_DEBUG( this.ticket = MAX; );    // mark as terminated
                ^?{}(this);
                break;
            case Finished:
                CFA_DEBUG( this.ticket = MAX; );    // mark as terminated
                break;
            default: ;                              // stop warning
        }

        if ( unlikely( __atomic_add_fetch( &__num_actors_, -1, __ATOMIC_RELAXED ) == 0 ) ) { // all actors have terminated
            unpark( __actor_executor_thd );
        }
    }
}

struct message {
    allocation alloc;       // allocation action
    inline virtual_dtor;
};

static inline void ?{}( message & this ) {
    this.alloc = Nodelete;
}
static inline void ?{}( message & this, allocation alloc ) {
    memcpy( &this.alloc, &alloc, sizeof(allocation) );  // optimization to elide ctor
    CFA_DEBUG( if ( this.alloc == Finished ) this.alloc = Nodelete; );
}
static inline void ^?{}( message & this ) with(this) {
    CFA_DEBUG(
        if ( alloc == Nodelete ) {
            printf( "CFA warning (UNIX pid:%ld) : program terminating with message %p allocated but never sent.\n",
                // … (remaining arguments and closing braces unchanged)

// …

static inline void check_message( message & this ) {
    switch ( this.alloc ) {                     // analyze message status
        case Nodelete: CFA_DEBUG( this.alloc = Finished ); break;
        case Delete: delete( &this ); break;
        case Destroy: ^?{}( this ); break;
        case Finished: break;
    } // switch
}
static inline allocation set_allocation( message & this, allocation state ) {
    CFA_DEBUG( if ( state == Nodelete ) state = Finished; );
    allocation prev = this.alloc;
    this.alloc = state;
    return prev;
}
static inline allocation get_allocation( message & this ) {
    return this.alloc;
}

static inline void deliver_request( request & this ) {
    DEBUG_ABORT( this.receiver->ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
    actor * base_actor;
    message * base_msg;
    allocation temp = this.fn( *this.receiver, *this.msg, &base_actor, &base_msg );
    memcpy( &base_actor->alloc, &temp, sizeof(allocation) );    // optimization to elide ctor
    check_message( *base_msg );
    check_actor( *base_actor );
}
```
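A hedged sketch of the new return value in use: a receive routine that forwards a message to itself a few times, parking the message's reclamation policy while it is in flight (my_actor and my_msg are illustrative types, and the `|` send operator is assumed from an elided part of the header):

```cfa
struct my_actor { inline actor; };               // illustrative types, not part of the header
struct my_msg { inline message; int hops; };

allocation receive( my_actor & this, my_msg & msg ) {
    // swap in Finished so check_message() leaves the in-flight message alone,
    // remembering the previous policy so it can be restored on the last hop
    allocation prev = set_allocation( msg, Finished );
    if ( msg.hops++ < 3 ) {
        this | msg;                              // resend to self (assumed | operator)
        return Nodelete;                         // actor survives to receive the next hop
    }
    set_allocation( msg, prev );                 // last hop: restore the sender's policy
    return Finished;                             // then let the actor terminate
}
```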
```cfa
// returns ptr to newly owned queue if swap succeeds
static inline work_queue * try_swap_queues( worker & this, unsigned int victim_idx, unsigned int my_idx ) with(this) {
    work_queue * my_queue = request_queues[my_idx];
    work_queue * other_queue = request_queues[victim_idx];

    // if either queue is 0p then they are in the process of being stolen
    if ( other_queue == 0p ) return 0p;

    // try to set our queue ptr to be 0p; if it fails, someone moved our queue, so return false
    if ( !__atomic_compare_exchange_n( &request_queues[my_idx], &my_queue, 0p, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) )
        return 0p;

    // try to set the other queue ptr to be our queue ptr; if it fails, someone moved the other queue, so fix up then return false
    if ( !__atomic_compare_exchange_n( &request_queues[victim_idx], &other_queue, my_queue, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST ) ) {
        /* paranoid */ verify( request_queues[my_idx] == 0p );
        request_queues[my_idx] = my_queue;  // reset my queue ptr back to appropriate val
        return 0p;
    }

    // we have successfully swapped, and since our queue is 0p no one will touch it, so write back the new queue ptr non-atomically
    request_queues[my_idx] = other_queue;   // last write does not need to be atomic
    return other_queue;
}

// once a worker to steal from has been chosen, choose queue to steal from
static inline void choose_queue( worker & this, unsigned int victim_id, unsigned int swap_idx ) with(this) {
    // have to calculate victim start and range since victim may be deleted before us in shutdown
    const unsigned int queues_per_worker = executor_->nrqueues / executor_->nworkers;
    const unsigned int extras = executor_->nrqueues % executor_->nworkers;
    unsigned int vic_start, vic_range;
    if ( extras > victim_id ) {
        vic_range = queues_per_worker + 1;
        vic_start = vic_range * victim_id;
    } else {
        vic_start = extras + victim_id * queues_per_worker;
        vic_range = queues_per_worker;
    }
    unsigned int start_idx = prng( vic_range );

    unsigned int tries = 0;
    work_queue * curr_steal_queue;

    for ( unsigned int i = start_idx; tries < vic_range; i = (i + 1) % vic_range ) {
        tries++;
        curr_steal_queue = request_queues[ i + vic_start ];
        // avoid empty queues and queues that are being operated on
        if ( curr_steal_queue == 0p || curr_steal_queue->being_processed || is_empty( *curr_steal_queue->c_queue ) )
            continue;

        #ifdef ACTOR_STATS
        curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
        if ( curr_steal_queue ) {
            executor_->w_infos[id].msgs_stolen += curr_steal_queue->c_queue->count;
            executor_->w_infos[id].stolen++;
            if ( is_empty( *curr_steal_queue->c_queue ) ) executor_->w_infos[id].empty_stolen++;
            // __atomic_add_fetch(&executor_->w_infos[victim_id].stolen_from, 1, __ATOMIC_RELAXED);
            // replaced_queue[swap_idx]++;
            // __atomic_add_fetch(&stolen_arr[ i + vic_start ], 1, __ATOMIC_RELAXED);
        } else {
            executor_->w_infos[id].failed_swaps++;
        }
        #else
        curr_steal_queue = try_swap_queues( this, i + vic_start, swap_idx );
        #endif // ACTOR_STATS

        return;
    }

    return;
}
```
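The vic_start/vic_range arithmetic reproduces the partitioning done in the executor constructor (reqPerWorker queues each, plus one extra for the first nrqueues % nworkers workers). A worked example with illustrative sizes:

```cfa
// nrqueues = 10, nworkers = 4  =>  queues_per_worker = 2, extras = 2
// victim 0: extras > 0  =>  vic_range = 3, vic_start = 3 * 0 = 0      (queues 0-2)
// victim 1: extras > 1  =>  vic_range = 3, vic_start = 3 * 1 = 3      (queues 3-5)
// victim 2: otherwise   =>  vic_range = 2, vic_start = 2 + 2 * 2 = 6  (queues 6-7)
// victim 3: otherwise   =>  vic_range = 2, vic_start = 2 + 3 * 2 = 8  (queues 8-9)
```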
```cfa
// choose a worker to steal from
static inline void steal_work( worker & this, unsigned int swap_idx ) with(this) {
    #if RAND
    unsigned int victim = prng( executor_->nworkers );
    if ( victim == id ) victim = ( victim + 1 ) % executor_->nworkers;
    choose_queue( this, victim, swap_idx );
    #elif SEARCH
    unsigned long long min = MAX;   // smaller timestamp means longer since service
    int min_id = 0;                 // use ints not uints to avoid integer underflow without hacky math
    int n_workers = executor_->nworkers;
    unsigned long long curr_stamp;
    int scount = 1;
    for ( int i = (id + 1) % n_workers; scount < n_workers; i = (i + 1) % n_workers, scount++ ) {
        curr_stamp = executor_->w_infos[i].stamp;
        if ( curr_stamp < min ) {
            min = curr_stamp;
            min_id = i;
        }
    }
    choose_queue( this, min_id, swap_idx );
    #endif
}
```
```cfa
#define CHECK_TERMINATION if ( unlikely( executor_->is_shutdown ) ) break Exit
void main( worker & this ) with(this) {
    // #ifdef ACTOR_STATS
    // for ( i; executor_->nrqueues ) {
    //     replaced_queue[i] = 0;
    //     __atomic_store_n( &stolen_arr[i], 0, __ATOMIC_SEQ_CST );
    // }
    // #endif

    // threshold of empty queues we see before we go stealing
    const unsigned int steal_threshold = 2 * range;

    // Store variable data here instead of in the worker struct to avoid any potential false sharing
    unsigned int empty_count = 0;
    request & req;
    work_queue * curr_work_queue;

    Exit:
    for ( unsigned int i = 0;; i = (i + 1) % range ) {  // cycle through set of request buffers
        curr_work_queue = request_queues[i + start];

        #ifndef __STEAL
        CHECK_TERMINATION;
        #endif

        // check if queue is empty before trying to gulp it
        if ( is_empty( *curr_work_queue->c_queue ) ) {
            #ifdef __STEAL
            empty_count++;
            if ( empty_count < steal_threshold ) continue;
            #else
            continue;
            #endif
        }
        transfer( *curr_work_queue, &current_queue );
        #ifdef ACTOR_STATS
        executor_->w_infos[id].gulps++;
        #endif // ACTOR_STATS
        #ifdef __STEAL
        if ( is_empty( *current_queue ) ) {
            if ( unlikely( no_steal ) ) { CHECK_TERMINATION; continue; }
            empty_count++;
            if ( empty_count < steal_threshold ) continue;
            empty_count = 0;

            CHECK_TERMINATION;  // check for termination

            __atomic_store_n( &executor_->w_infos[id].stamp, rdtscl(), __ATOMIC_RELAXED );

            #ifdef ACTOR_STATS
            executor_->w_infos[id].try_steal++;
            #endif // ACTOR_STATS

            steal_work( this, start + prng( range ) );
            continue;
        }
        #endif // __STEAL
        while ( ! is_empty( *current_queue ) ) {
            #ifdef ACTOR_STATS
            executor_->w_infos[id].processed++;
            #endif
            &req = &remove( *current_queue );
            if ( !&req ) continue;
            deliver_request( req );
        }
        #ifdef __STEAL
        curr_work_queue->being_processed = false;   // set done processing
        empty_count = 0;                            // we found work so reset empty counter
        #endif

        CHECK_TERMINATION;

        // potentially reclaim some of the current queue's vector space if it is unused
        reclaim( *current_queue );
    } // for
}

static inline void send( executor & this, request & req, unsigned long int ticket ) with(this) {
    insert( request_queues[ticket], req );
}

static inline void send( actor & this, request & req ) {
    DEBUG_ABORT( this.ticket == (unsigned long int)MAX, "Attempted to send message to deleted/dead actor\n" );
    send( *__actor_executor_, req, this.ticket );
}

static inline void __reset_stats() {
    #ifdef ACTOR_STATS
    __total_tries = 0;
    __total_stolen = 0;
    __all_gulps = 0;
    __total_failed_swaps = 0;
    __total_empty_stolen = 0;
    __all_processed = 0;
    __num_actors_stats = 0;
    __all_msgs_stolen = 0;
    #endif
}

static inline void start_actor_system( size_t num_thds ) {
    __reset_stats();
    __actor_executor_thd = active_thread();
    __actor_executor_ = alloc();
    (*__actor_executor_){ 0, num_thds, num_thds == 1 ? 1 : num_thds * 16 };
}

// …

static inline void start_actor_system( executor & this ) {
    __reset_stats();
    __actor_executor_thd = active_thread();
    __actor_executor_ = &this;
    __actor_executor_passed = true;
}

static inline void stop_actor_system() {
    park();     // unparked when actor system is finished

    if ( !__actor_executor_passed ) delete( __actor_executor_ );
    __actor_executor_ = 0p;
    __actor_executor_thd = 0p;
    __next_ticket = 0;
    __actor_executor_passed = false;
}
```

The default control messages are unchanged apart from a rename: delete_message_t becomes delete_msg_t, matching destroy_msg_t and finished_msg_t.

```cfa
// assigned at creation to __base_msg_finished to avoid unused message warning
message __base_msg_finished @= { .alloc : Finished };
struct delete_msg_t { inline message; } delete_msg = __base_msg_finished;
struct destroy_msg_t { inline message; } destroy_msg = __base_msg_finished;
struct finished_msg_t { inline message; } finished_msg = __base_msg_finished;

allocation receive( actor & this, delete_msg_t & msg ) { return Delete; }
allocation receive( actor & this, destroy_msg_t & msg ) { return Destroy; }
allocation receive( actor & this, finished_msg_t & msg ) { return Finished; }
```
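Putting the public surface together, a hedged end-to-end sketch (hello_actor and hello_msg are illustrative; the `|` send operator and the actor/message plumbing behind it live in unchanged parts of actor.hfa not shown in this diff):

```cfa
#include <actor.hfa>

struct hello_actor { inline actor; };
struct hello_msg { inline message; const char * text; };

allocation receive( hello_actor & this, hello_msg & msg ) {
    printf( "%s\n", msg.text );
    return Finished;                 // non-Nodelete: check_actor() retires this actor
}

int main() {
    start_actor_system( 2 );         // 2 worker threads over 2 * 16 request queues
    hello_actor a;                   // must be constructed after the system starts
    hello_msg m;
    m.text = "hello";
    a | m;                           // enqueue via send(); a worker gulps and delivers it
    stop_actor_system();             // parks until __num_actors_ reaches 0, then cleans up
}
```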