Changeset 4a368547 for src/libcfa/concurrency
- Timestamp:
- May 29, 2017, 3:08:47 PM (8 years ago)
- Branches:
- ADT, aaron-thesis, arm-eh, ast-experimental, cleanup-dtors, deferred_resn, demangler, enum, forall-pointer-decay, jacob/cs343-translation, jenkins-sandbox, master, new-ast, new-ast-unique-expr, new-env, no_list, persistent-indexer, pthread-emulation, qualifiedEnum, resolv-new, with_gc
- Children:
- 2ab67b9, a029714
- Parents:
- ff98952 (diff), 4c5b972 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the(diff)
links above to see all the changes relative to each parent. - Location:
- src/libcfa/concurrency
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
src/libcfa/concurrency/monitor
rff98952 r4a368547 59 59 unsigned short count; //Number of criterions in the criteria 60 60 __condition_node_t * next; //Intrusive linked list Next field 61 uintptr_t user_info; //Custom user info accessible before signalling 61 62 }; 62 63 … … 85 86 } 86 87 87 void wait( condition * this ); 88 void signal( condition * this ); 88 void wait( condition * this, uintptr_t user_info = 0 ); 89 bool signal( condition * this ); 90 bool signal_block( condition * this ); 91 static inline bool is_empty( condition * this ) { return !this->blocked.head; } 92 uintptr_t front( condition * this ); 93 89 94 #endif //MONITOR_H -
src/libcfa/concurrency/monitor.c
rff98952 r4a368547 62 62 //Some one else has the monitor, wait in line for it 63 63 append( &this->entry_queue, thrd ); 64 LIB_DEBUG_PRINT_SAFE("%p Blocking on entry\n", thrd); 64 65 ScheduleInternal( &this->lock ); 65 66 … … 97 98 unlock( &this->lock ); 98 99 100 LIB_DEBUG_PRINT_SAFE("Next owner is %p\n", new_owner); 101 99 102 //We need to wake-up the thread 100 103 ScheduleThread( new_owner ); … … 134 137 } 135 138 136 void debug_break() __attribute__(( noinline )) 137 { 138 139 void ?{}(__condition_node_t * this, thread_desc * waiting_thread, unsigned short count, uintptr_t user_info ) { 140 this->waiting_thread = waiting_thread; 141 this->count = count; 142 this->next = NULL; 143 this->user_info = user_info; 144 } 145 146 void ?{}(__condition_criterion_t * this ) { 147 this->ready = false; 148 this->target = NULL; 149 this->owner = NULL; 150 this->next = NULL; 151 } 152 153 void ?{}(__condition_criterion_t * this, monitor_desc * target, __condition_node_t * owner ) { 154 this->ready = false; 155 this->target = target; 156 this->owner = owner; 157 this->next = NULL; 139 158 } 140 159 141 160 //----------------------------------------------------------------------------- 142 161 // Internal scheduling 143 void wait( condition * this ) {162 void wait( condition * this, uintptr_t user_info = 0 ) { 144 163 LIB_DEBUG_PRINT_SAFE("Waiting\n"); 145 164 … … 149 168 assertf( this->monitors != NULL, "Waiting with no monitors (%p)", this->monitors ); 150 169 assertf( this->monitor_count != 0, "Waiting with 0 monitors (%i)", this->monitor_count ); 170 assertf( this->monitor_count < 32u, "Excessive monitor count (%i)", this->monitor_count ); 151 171 152 172 unsigned short count = this->monitor_count; … … 156 176 LIB_DEBUG_PRINT_SAFE("count %i\n", count); 157 177 158 __condition_node_t waiter; 159 waiter.waiting_thread = this_thread(); 160 waiter.count = count; 161 waiter.next = NULL; 178 __condition_node_t waiter = { this_thread(), count, user_info }; 162 179 163 180 __condition_criterion_t criteria[count]; 164 181 for(int i = 0; i < count; i++) { 165 criteria[i].ready = false; 166 criteria[i].target = this->monitors[i]; 167 criteria[i].owner = &waiter; 168 criteria[i].next = NULL; 182 (&criteria[i]){ this->monitors[i], &waiter }; 169 183 LIB_DEBUG_PRINT_SAFE( "Criterion %p\n", &criteria[i] ); 170 184 } … … 184 198 } 185 199 186 debug_break();187 188 200 for( int i = 0; i < count; i++) { 189 201 thread_desc * new_owner = next_thread( this->monitors[i] ); … … 191 203 } 192 204 193 debug_break();194 195 205 LIB_DEBUG_PRINT_SAFE("Will unblock: "); 196 206 for(int i = 0; i < thread_count; i++) { … … 201 211 // Everything is ready to go to sleep 202 212 ScheduleInternal( locks, count, threads, thread_count ); 203 204 213 205 214 //WE WOKE UP … … 212 221 } 213 222 214 voidsignal( condition * this ) {215 if( !this->blocked.head) {223 bool signal( condition * this ) { 224 if( is_empty( this ) ) { 216 225 LIB_DEBUG_PRINT_SAFE("Nothing to signal\n"); 217 return ;226 return false; 218 227 } 219 228 … … 224 233 unsigned short count = this->monitor_count; 225 234 235 //Some more checking in debug 226 236 LIB_DEBUG_DO( 227 237 thread_desc * this_thrd = this_thread(); … … 237 247 ); 238 248 249 //Lock all the monitors 239 250 lock_all( this->monitors, NULL, count ); 240 251 LIB_DEBUG_PRINT_SAFE("Signalling"); 241 252 253 //Pop the head of the waiting queue 242 254 __condition_node_t * node = pop_head( &this->blocked ); 255 256 //Add the thread to the proper AS stack 243 257 for(int i = 0; i < count; i++) { 244 258 __condition_criterion_t * crit = &node->criteria[i]; … … 250 264 LIB_DEBUG_PRINT_SAFE("\n"); 251 265 266 //Release 252 267 unlock_all( this->monitors, count ); 268 269 return true; 270 } 271 272 bool signal_block( condition * this ) { 273 if( !this->blocked.head ) { 274 LIB_DEBUG_PRINT_SAFE("Nothing to signal\n"); 275 return false; 276 } 277 278 //Check that everything is as expected 279 assertf( this->monitors != NULL, "Waiting with no monitors (%p)", this->monitors ); 280 assertf( this->monitor_count != 0, "Waiting with 0 monitors (%i)", this->monitor_count ); 281 282 unsigned short count = this->monitor_count; 283 unsigned int recursions[ count ]; //Save the current recursion levels to restore them later 284 spinlock * locks [ count ]; //We need to pass-in an array of locks to ScheduleInternal 285 286 lock_all( this->monitors, locks, count ); 287 288 //create creteria 289 __condition_node_t waiter = { this_thread(), count, 0 }; 290 291 __condition_criterion_t criteria[count]; 292 for(int i = 0; i < count; i++) { 293 (&criteria[i]){ this->monitors[i], &waiter }; 294 LIB_DEBUG_PRINT_SAFE( "Criterion %p\n", &criteria[i] ); 295 push( &criteria[i].target->signal_stack, &criteria[i] ); 296 } 297 298 waiter.criteria = criteria; 299 300 //save contexts 301 save_recursion( this->monitors, recursions, count ); 302 303 //Find the thread to run 304 thread_desc * signallee = pop_head( &this->blocked )->waiting_thread; 305 for(int i = 0; i < count; i++) { 306 set_owner( this->monitors[i], signallee ); 307 } 308 309 LIB_DEBUG_PRINT_SAFE( "Waiting on signal block\n" ); 310 311 //Everything is ready to go to sleep 312 ScheduleInternal( locks, count, &signallee, 1 ); 313 314 LIB_DEBUG_PRINT_SAFE( "Back from signal block\n" ); 315 316 //We are back, restore the owners and recursions 317 lock_all( locks, count ); 318 restore_recursion( this->monitors, recursions, count ); 319 unlock_all( locks, count ); 320 321 return true; 322 } 323 324 uintptr_t front( condition * this ) { 325 LIB_DEBUG_DO( 326 if( is_empty(this) ) { 327 abortf( "Attempt to access user data on an empty condition.\n" 328 "Possible cause is not checking if the condition is empty before reading stored data." ); 329 } 330 ); 331 return this->blocked.head->user_info; 253 332 } 254 333 … … 335 414 336 415 for( int i = 0; i < count; i++ ) { 416 337 417 LIB_DEBUG_PRINT_SAFE( "Checking %p for %p\n", &criteria[i], target ); 338 418 if( &criteria[i] == target ) { -
src/libcfa/concurrency/thread
rff98952 r4a368547 82 82 83 83 void yield(); 84 void yield( unsigned times ); 84 85 85 86 #endif //THREADS_H -
src/libcfa/concurrency/thread.c
rff98952 r4a368547 87 87 } 88 88 89 void yield( unsigned times ) { 90 for( unsigned i = 0; i < times; i++ ) { 91 yield(); 92 } 93 } 94 89 95 void ThreadCtxSwitch(coroutine_desc* src, coroutine_desc* dst) { 90 96 // set state of current coroutine to inactive
Note:
See TracChangeset
for help on using the changeset viewer.