Changeset bd98b58 for src/libcfa
- Timestamp:
- Jan 20, 2017, 4:50:15 PM (8 years ago)
- Branches:
- ADT, aaron-thesis, arm-eh, ast-experimental, cleanup-dtors, deferred_resn, demangler, enum, forall-pointer-decay, jacob/cs343-translation, jenkins-sandbox, master, new-ast, new-ast-unique-expr, new-env, no_list, persistent-indexer, pthread-emulation, qualifiedEnum, resolv-new, with_gc
- Children:
- 207c7e1d
- Parents:
- dcb42b8
- Location:
- src/libcfa/concurrency
- Files:
-
- 8 edited
Legend:
- Unmodified
- Added
- Removed
-
TabularUnified src/libcfa/concurrency/coroutines ¶
rdcb42b8 rbd98b58 62 62 63 63 // Get current coroutine 64 extern coroutine * current_coroutine; //PRIVATE, never use directly 65 static inline coroutine * this_coroutine(void) { 66 return current_coroutine; 67 } 64 coroutine * this_coroutine(void); 68 65 69 66 // Private wrappers for context switch and stack creation -
TabularUnified src/libcfa/concurrency/coroutines.c ¶
rdcb42b8 rbd98b58 14 14 // 15 15 16 #include "coroutines" 17 16 18 extern "C" { 17 19 #include <stddef.h> … … 23 25 } 24 26 25 #include " coroutines"27 #include "kernel" 26 28 #include "libhdr.h" 27 29 28 30 #define __CFA_INVOKE_PRIVATE__ 29 31 #include "invoke.h" 32 33 /*thread_local*/ extern processor * this_processor; 30 34 31 35 //----------------------------------------------------------------------------- … … 35 39 #define MinStackSize 1000 36 40 static size_t pageSize = 0; // architecture pagesize HACK, should go in proper runtime singleton 37 38 //Current coroutine39 //Will need to be in TLS when multi-threading is added40 coroutine* current_coroutine;41 41 42 42 //----------------------------------------------------------------------------- … … 110 110 111 111 // set new coroutine that task is executing 112 current_coroutine = dst;112 this_processor->current_coroutine = dst; 113 113 114 114 // context switch to specified coroutine -
TabularUnified src/libcfa/concurrency/invoke.c ¶
rdcb42b8 rbd98b58 14 14 15 15 extern void __suspend_no_inline__F___1(void); 16 extern void __s cheduler_remove__F_P9sthread_h__1(struct thread_h*);16 extern void __signal_termination__F_P9sthread_h__1(struct thread_h*); 17 17 18 18 void CtxInvokeCoroutine( … … 57 57 main( this ); 58 58 59 cor->state = Halt; 60 cor->notHalted = false; 61 __scheduler_remove__F_P9sthread_h__1(thrd); 59 __signal_termination__F_P9sthread_h__1(thrd); 62 60 63 61 //Final suspend, should never return -
TabularUnified src/libcfa/concurrency/invoke.h ¶
rdcb42b8 rbd98b58 11 11 12 12 #define unlikely(x) __builtin_expect(!!(x), 0) 13 #define SCHEDULER_CAPACITY 10 14 15 struct simple_thread_list { 16 struct thread_h * head; 17 struct thread_h ** tail; 18 }; 19 20 #ifdef __CFORALL__ 21 extern "Cforall" { 22 void ?{}( struct simple_thread_list * ); 23 void append( struct simple_thread_list *, struct thread_h * ); 24 struct thread_h * pop_head( struct simple_thread_list * ); 25 } 26 #endif 13 27 14 28 struct coStack_t { … … 35 49 }; 36 50 51 struct simple_lock { 52 struct simple_thread_list blocked; 53 }; 54 37 55 struct thread_h { 38 56 struct coroutine c; 57 struct simple_lock lock; 58 struct thread_h * next; 39 59 }; 40 60 -
TabularUnified src/libcfa/concurrency/kernel ¶
rdcb42b8 rbd98b58 20 20 #include <stdbool.h> 21 21 22 #include "invoke.h" 23 24 //----------------------------------------------------------------------------- 25 // Cluster 26 struct cluster { 27 simple_thread_list ready_queue; 28 }; 29 30 void ?{}(cluster * this); 31 void ^?{}(cluster * this); 32 33 //----------------------------------------------------------------------------- 34 // Processor 22 35 struct processor { 23 36 struct processorCtx_t * ctx; 24 unsigned int thread_index;25 unsigned int thread_count;26 struct thread_h * threads[10];37 cluster * cltr; 38 coroutine * current_coroutine; 39 thread_h * current_thread; 27 40 bool terminated; 28 41 }; 29 42 30 void ?{}(processor * this );43 void ?{}(processor * this, cluster * cltr); 31 44 void ^?{}(processor * this); 32 45 33 void scheduler_add( struct thread_h * thrd ); 34 void scheduler_remove( struct thread_h * thrd ); 35 void kernel_run( void ); 46 47 //----------------------------------------------------------------------------- 48 // Locks 49 50 void ?{}(simple_lock * this); 51 void ^?{}(simple_lock * this); 52 53 void lock( simple_lock * ); 54 void unlock( simple_lock * ); 36 55 37 56 #endif //KERNEL_H -
TabularUnified src/libcfa/concurrency/kernel.c ¶
rdcb42b8 rbd98b58 32 32 #include "invoke.h" 33 33 34 cluster * systemCluster; 34 35 processor * systemProcessor; 35 36 thread_h * mainThread; … … 38 39 void kernel_shutdown(void) __attribute__((destructor(101))); 39 40 40 void ?{}(processor * this ) {41 void ?{}(processor * this, cluster * cltr) { 41 42 this->ctx = NULL; 42 this->thread_index = 0; 43 this->thread_count = 10; 43 this->cltr = cltr; 44 44 this->terminated = false; 45 46 for(int i = 0; i < 10; i++) { 47 this->threads[i] = NULL; 48 } 49 50 LIB_DEBUG_PRINTF("Processor : ctor for core %p (core spots %d)\n", this, this->thread_count); 51 } 52 53 void ^?{}(processor * this) { 54 45 } 46 47 void ^?{}(processor * this) {} 48 49 void ?{}(cluster * this) { 50 ( &this->ready_queue ){}; 51 } 52 53 void ^?{}(cluster * this) {} 54 55 //----------------------------------------------------------------------------- 56 // Global state 57 58 /*thread_local*/ processor * this_processor; 59 60 coroutine * this_coroutine(void) { 61 return this_processor->current_coroutine; 62 } 63 64 thread_h * this_thread(void) { 65 return this_processor->current_thread; 55 66 } 56 67 … … 77 88 // Processor running routines 78 89 void main(processorCtx_t * ctx); 79 thread_h * nextThread( processor * this);90 thread_h * nextThread(cluster * this); 80 91 void runThread(processor * this, thread_h * dst); 81 92 void spin(processor * this, unsigned int * spin_count); … … 88 99 for( unsigned int spin_count = 0; ! 
this->terminated; spin_count++ ) { 89 100 90 readyThread = nextThread( this);101 readyThread = nextThread( this->cltr ); 91 102 92 103 if(readyThread) { … … 101 112 } 102 113 103 thread_h * nextThread(processor * this) {104 for(int i = 0; i < this->thread_count; i++) {105 this->thread_index = (this->thread_index + 1) % this->thread_count;106 107 thread_h * thrd = this->threads[this->thread_index];108 if(thrd) return thrd;109 }110 111 return NULL;112 }113 114 114 void runThread(processor * this, thread_h * dst) { 115 115 coroutine * proc_ctx = get_coroutine(this->ctx); … … 120 120 // Which is now the current_coroutine 121 121 // LIB_DEBUG_PRINTF("Kernel : switching to ctx %p (from %p, current %p)\n", thrd_ctx, proc_ctx, current_coroutine); 122 current_coroutine = thrd_ctx; 122 this->current_thread = dst; 123 this->current_coroutine = thrd_ctx; 123 124 CtxSwitch( proc_ctx->stack.context, thrd_ctx->stack.context ); 124 current_coroutine = proc_ctx;125 this->current_coroutine = proc_ctx; 125 126 // LIB_DEBUG_PRINTF("Kernel : returned from ctx %p (to %p, current %p)\n", thrd_ctx, proc_ctx, current_coroutine); 126 127 … … 133 134 134 135 //----------------------------------------------------------------------------- 135 // Kernel runner (Temporary) 136 137 void scheduler_add( thread_h * thrd ) { 138 for(int i = 0; i < systemProcessor->thread_count; i++) { 139 if(systemProcessor->threads[i] == NULL) { 140 systemProcessor->threads[i] = thrd; 141 return; 142 } 143 } 144 assertf(false, "Scheduler full"); 145 } 146 147 void scheduler_remove( thread_h * thrd ) { 148 for(int i = 0; i < systemProcessor->thread_count; i++) { 149 if(systemProcessor->threads[i] == thrd) { 150 systemProcessor->threads[i] = NULL; 151 return; 152 } 153 } 154 assertf(false, "Trying to unschedule unkown thread"); 136 // Scheduler routines 137 void thread_schedule( thread_h * thrd ) { 138 assertf( thrd->next == NULL, "Expected null got %p", thrd->next ); 139 append( &systemProcessor->cltr->ready_queue, 
thrd ); 140 } 141 142 thread_h * nextThread(cluster * this) { 143 return pop_head( &this->ready_queue ); 155 144 } 156 145 … … 160 149 161 150 KERNEL_STORAGE(processorCtx_t, systemProcessorCtx); 151 KERNEL_STORAGE(cluster, systemCluster); 162 152 KERNEL_STORAGE(processor, systemProcessor); 163 153 KERNEL_STORAGE(thread_h, mainThread); … … 221 211 222 212 mainThread_info_t ctx; 223 // LIB_DEBUG_PRINTF("Kernel : base : %p\n", ctx.base );224 // LIB_DEBUG_PRINTF("Kernel : top : %p\n", ctx.top );225 // LIB_DEBUG_PRINTF("Kernel : limit : %p\n", ctx.limit );226 // LIB_DEBUG_PRINTF("Kernel : size : %x\n", ctx.size );227 // LIB_DEBUG_PRINTF("Kernel : storage : %p\n", ctx.storage );228 // LIB_DEBUG_PRINTF("Kernel : context : %p\n", ctx.context );229 213 230 214 // Start by initializing the main thread … … 232 216 mainThread{ &ctx }; 233 217 234 // // Initialize the system processor 218 // Initialize the system cluster 219 systemCluster = (cluster *)&systemCluster_storage; 220 systemCluster{}; 221 222 // Initialize the system processor 235 223 systemProcessor = (processor *)&systemProcessor_storage; 236 systemProcessor{ };224 systemProcessor{ systemCluster }; 237 225 238 226 // Initialize the system processor ctx … … 243 231 // Add the main thread to the ready queue 244 232 // once resume is called on systemProcessor->ctx the mainThread needs to be scheduled like any normal thread 245 scheduler_add(mainThread);233 thread_schedule(mainThread); 246 234 247 235 //initialize the global state variables 248 current_coroutine = &mainThread->c; 236 this_processor = systemProcessor; 237 this_processor->current_thread = mainThread; 238 this_processor->current_coroutine = &mainThread->c; 249 239 250 240 // SKULLDUGGERY: Force a context switch to the system processor to set the main thread's context to the current UNIX … … 285 275 } 286 276 277 //----------------------------------------------------------------------------- 278 // Locks 279 void ?{}( simple_lock * this ) { 280 ( 
&this->blocked ){}; 281 } 282 283 void ^?{}( simple_lock * this ) { 284 285 } 286 287 void lock( simple_lock * this ) { 288 append( &this->blocked, this_thread() ); 289 suspend(); 290 } 291 292 void unlock( simple_lock * this ) { 293 thread_h * it; 294 while( it = pop_head( &this->blocked) ) { 295 thread_schedule( it ); 296 } 297 } 298 299 //----------------------------------------------------------------------------- 300 // Queues 301 void ?{}( simple_thread_list * this ) { 302 this->head = NULL; 303 this->tail = &this->head; 304 } 305 306 void append( simple_thread_list * this, thread_h * t ) { 307 assert( t->next == NULL ); 308 *this->tail = t; 309 this->tail = &t->next; 310 } 311 312 thread_h * pop_head( simple_thread_list * this ) { 313 thread_h * head = this->head; 314 if( head ) { 315 this->head = head->next; 316 if( !head->next ) { 317 this->tail = &this->head; 318 } 319 head->next = NULL; 320 } 321 322 return head; 323 } 287 324 // Local Variables: // 288 325 // mode: c // -
TabularUnified src/libcfa/concurrency/threads ¶
rdcb42b8 rbd98b58 45 45 } 46 46 47 thread_h * this_thread(void); 48 47 49 //----------------------------------------------------------------------------- 48 50 // Ctors and dtors … … 67 69 void ^?{}( thread(T)* this ); 68 70 69 //----------------------------------------------------------------------------- 70 // PRIVATE exposed because of inline 71 void yield(); 71 72 72 73 #endif //THREADS_H -
TabularUnified src/libcfa/concurrency/threads.c ¶
rdcb42b8 rbd98b58 23 23 #include "invoke.h" 24 24 25 #include <stdlib> 25 extern "C" { 26 #include <stddef.h> 27 } 28 29 /*thread_local*/ extern processor * this_processor; 26 30 27 31 //----------------------------------------------------------------------------- 28 32 // Forward declarations 29 33 forall(otype T | is_thread(T) ) 30 void start( thread(T)* this );34 void start( T* this ); 31 35 32 36 forall(otype T | is_thread(T) ) 33 void stop( thread(T)* this );37 void stop( T* this ); 34 38 35 39 //----------------------------------------------------------------------------- … … 38 42 void ?{}(thread_h* this) { 39 43 (&this->c){}; 44 (&this->lock){}; 45 this->next = NULL; 40 46 } 41 47 … … 47 53 void ?{}( thread(T)* this ) { 48 54 (&this->handle){}; 49 start( this);55 start(&this->handle); 50 56 } 51 57 … … 53 59 void ?{}( thread(T)* this, P params ) { 54 60 (&this->handle){ params }; 55 start( this);61 start(&this->handle); 56 62 } 57 63 58 64 forall(otype T | is_thread(T) ) 59 65 void ^?{}( thread(T)* this ) { 60 stop( this);66 stop(&this->handle); 61 67 ^(&this->handle){}; 62 68 } … … 69 75 } 70 76 77 extern void thread_schedule( thread_h * ); 78 71 79 forall(otype T | is_thread(T)) 72 void start( thread(T)* this ) { 73 T* handle = &this->handle; 74 coroutine* thrd_c = get_coroutine(handle); 75 thread_h* thrd_h = get_thread (handle); 80 void start( T* this ) { 81 coroutine* thrd_c = get_coroutine(this); 82 thread_h* thrd_h = get_thread (this); 76 83 thrd_c->last = this_coroutine(); 77 current_coroutine = thrd_c;84 this_processor->current_coroutine = thrd_c; 78 85 79 86 // LIB_DEBUG_PRINTF("Thread start : %p (t %p, c %p)\n", handle, thrd_c, thrd_h); 80 87 81 88 create_stack(&thrd_c->stack, thrd_c->stack.size); 82 CtxStart( handle, CtxInvokeThread);89 CtxStart(this, CtxInvokeThread); 83 90 CtxSwitch( thrd_c->last->stack.context, thrd_c->stack.context ); 84 91 85 scheduler_add(thrd_h);92 thread_schedule(thrd_h); 86 93 } 87 94 88 95 forall(otype T | is_thread(T) 
) 89 void stop( thread(T)* this ) { 90 T* handle = &this->handle; 91 thread_h* thrd_h = get_thread (handle); 92 while( thrd_h->c.notHalted ) { 93 suspend(); 96 void stop( T* this ) { 97 thread_h* thrd = get_thread(this); 98 if( thrd->c.notHalted ) { 99 lock( &thrd->lock ); 94 100 } 101 } 102 103 void signal_termination( thread_h * this ) { 104 this->c.state = Halt; 105 this->c.notHalted = false; 106 unlock( &this->lock ); 107 } 108 109 void yield( void ) { 110 thread_schedule( this_thread() ); 111 suspend(); 95 112 } 96 113
Note: See TracChangeset for help on using the changeset viewer.