Changeset 63be3387 for libcfa/src/concurrency
- Timestamp:
- Nov 14, 2022, 11:52:44 AM (3 years ago)
- Branches:
- ADT, ast-experimental, master
- Children:
- 7d9598d8
- Parents:
- b77f0e1 (diff), 19a8c40 (diff)
Note: this is a merge changeset; the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
- Location:
- libcfa/src/concurrency
- Files:
- 2 added
- 15 edited
libcfa/src/concurrency/clib/cfathread.cfa
rb77f0e1 → r63be3387
      pthread_attr_t attr;
-     if (int ret = pthread_attr_init(&attr); 0 != ret) {
+     if (int ret = __cfaabi_pthread_attr_init(&attr); 0 != ret) {
          abort | "failed to create master epoll thread attr: " | ret | strerror(ret);
      }

-     if (int ret = pthread_create(&master_poller, &attr, master_epoll, 0p); 0 != ret) {
+     if (int ret = __cfaabi_pthread_create(&master_poller, &attr, master_epoll, 0p); 0 != ret) {
          abort | "failed to create master epoll thread: " | ret | strerror(ret);
      }
libcfa/src/concurrency/invoke.h
rb77f0e1 → r63be3387
  // Link lists fields
- // instrusive link field for threads
+ // instrusive link field for threads in the ready-queue
  struct __thread_desc_link {
      struct thread$ * next;
      volatile unsigned long long ts;
  };
+
+ // Link lists fields
+ // instrusive link field for threads in the user_link/cltr_link
+ struct __thread_user_link {
+ #ifdef __cforall
+     inline dlink(thread$);
+ #else
+     struct thread$ * next; struct thread$ * back;
+ #endif
+ };
+ _Static_assert(sizeof(struct __thread_user_link) == 2 * sizeof(struct thread$ *), "__thread_user_link should be consistent in C and Cforall");

  struct thread$ {
…
  // Link lists fields
  // instrusive link field for threads
- struct __thread_desc_link link;
+ struct __thread_desc_link rdy_link;

  // current execution status for coroutine
…
  struct __monitor_group_t monitors;

- // used to put threads on dlist data structure
- __cfa_dlink(thread$);
-
- struct {
-     struct thread$ * next;
-     struct thread$ * prev;
- } node;
+ // intrusive link fields, used for locks, monitors and any user defined data structure
+ // default link fields for dlist
+ struct __thread_user_link user_link;
+
+ // secondary intrusive link fields, used for global cluster list
+ // default link fields for dlist
+ struct __thread_user_link cltr_link;

  // used to store state between clh lock/unlock
…

  #if defined( __CFA_WITH_VERIFY__ )
+     struct processor * volatile executing;
      void * canary;
  #endif
  };
- #ifdef __cforall
- P9_EMBEDDED( thread$, dlink(thread$) )
- #endif
+
  // Wrapper for gdb
  struct cfathread_thread_t { struct thread$ debug; };
…
  #ifdef __cforall
  extern "Cforall" {
+     static inline thread$ * volatile & ?`next ( thread$ * this ) {
+         return this->user_link.next;
+     }

      static inline thread$ *& get_next( thread$ & this ) __attribute__((const)) {
-         return this.link.next;
-     }
-
-     static inline [thread$ *&, thread$ *& ] __get( thread$ & this ) __attribute__((const)) {
-         return this.node.[next, prev];
-     }
+         return this.user_link.next;
+     }
+
+     static inline tytagref( dlink(thread$), dlink(thread$) ) ?`inner( thread$ & this ) {
+         dlink(thread$) & b = this.user_link;
+         tytagref( dlink(thread$), dlink(thread$) ) result = { b };
+         return result;
+     }
+
+     static inline tytagref(struct __thread_user_link, dlink(thread$)) ?`inner( struct thread$ & this ) {
+         struct __thread_user_link & ib = this.cltr_link;
+         dlink(thread$) & b = ib`inner;
+         tytagref(struct __thread_user_link, dlink(thread$)) result = { b };
+         return result;
+     }
+
+     P9_EMBEDDED(struct __thread_user_link, dlink(thread$))

      static inline void ?{}(__monitor_group_t & this) {
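The key change in this header is that thread$ now carries two independent intrusive link fields: user_link (for locks, monitors and user data structures) and cltr_link (for the global cluster list), so the same thread can sit on both kinds of list at once. The following stand-alone C sketch (illustrative only, not libcfa code; all names are made up) shows the underlying pattern of threading one node through two lists via two embedded link structs:

#include <stddef.h>
#include <stdio.h>

struct node;
struct link { struct node * next; struct node * prev; };

// a node with two embedded link fields can be a member of two
// intrusive lists at the same time, one list per field
struct node {
	int         id;
	struct link user_link;   // analogous to thread$.user_link
	struct link cltr_link;   // analogous to thread$.cltr_link
};

int main(void) {
	struct node a = { .id = 1 }, b = { .id = 2 };

	// list 1: threaded through user_link
	struct node * waiters = &b;
	b.user_link.next = &a;
	a.user_link.next = NULL;

	// list 2: threaded through cltr_link, containing the same node 'a'
	struct node * members = &a;
	a.cltr_link.next = NULL;

	for( struct node * n = waiters; n; n = n->user_link.next ) printf( "waiter %d\n", n->id );
	for( struct node * n = members; n; n = n->cltr_link.next ) printf( "member %d\n", n->id );
	return 0;
}

The two ?`inner overloads above play the same role in Cforall: they tell dlist which embedded link field to follow for a given list type.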
libcfa/src/concurrency/io.cfa
rb77f0e1 → r63be3387
  if( we ) {
      sigval_t value = { PREEMPT_IO };
-     pthread_sigqueue(ctx->proc->kernel_thread, SIGUSR1, value);
+     __cfaabi_pthread_sigqueue(ctx->proc->kernel_thread, SIGUSR1, value);
  }

…
      }
  }
-
- #if defined(CFA_WITH_IO_URING_IDLE)
-     bool __kernel_read(struct processor * proc, io_future_t & future, iovec & iov, int fd) {
-         io_context$ * ctx = proc->io.ctx;
-         /* paranoid */ verify( ! __preemption_enabled() );
-         /* paranoid */ verify( proc == __cfaabi_tls.this_processor );
-         /* paranoid */ verify( ctx );
-
-         __u32 idx;
-         struct io_uring_sqe * sqe;
-
-         // We can proceed to the fast path
-         if( !__alloc(ctx, &idx, 1) ) {
-             /* paranoid */ verify( false ); // for now check if this happens, next time just abort the sleep.
-             return false;
-         }
-
-         // Allocation was successful
-         __fill( &sqe, 1, &idx, ctx );
-
-         sqe->user_data = (uintptr_t)&future;
-         sqe->flags = 0;
-         sqe->fd = fd;
-         sqe->off = 0;
-         sqe->ioprio = 0;
-         sqe->fsync_flags = 0;
-         sqe->__pad2[0] = 0;
-         sqe->__pad2[1] = 0;
-         sqe->__pad2[2] = 0;
-
-         #if defined(CFA_HAVE_IORING_OP_READ)
-             sqe->opcode = IORING_OP_READ;
-             sqe->addr = (uint64_t)iov.iov_base;
-             sqe->len = iov.iov_len;
-         #elif defined(CFA_HAVE_READV) && defined(CFA_HAVE_IORING_OP_READV)
-             sqe->opcode = IORING_OP_READV;
-             sqe->addr = (uintptr_t)&iov;
-             sqe->len = 1;
-         #else
-             #error CFA_WITH_IO_URING_IDLE but none of CFA_HAVE_READV, CFA_HAVE_IORING_OP_READV or CFA_HAVE_IORING_OP_READ defined
-         #endif
-
-         asm volatile("": : :"memory");
-
-         /* paranoid */ verify( sqe->user_data == (uintptr_t)&future );
-         __submit_only( ctx, &idx, 1 );
-
-         /* paranoid */ verify( proc == __cfaabi_tls.this_processor );
-         /* paranoid */ verify( ! __preemption_enabled() );
-
-         return true;
-     }
-
-     void __cfa_io_idle( struct processor * proc ) {
-         iovec iov;
-         __atomic_acquire( &proc->io.ctx->cq.lock );
-
-         __attribute__((used)) volatile bool was_reset = false;
-
-         with( proc->idle_wctx) {
-
-             // Do we already have a pending read
-             if(available(*ftr)) {
-                 // There is no pending read, we need to add one
-                 reset(*ftr);
-
-                 iov.iov_base = rdbuf;
-                 iov.iov_len = sizeof(eventfd_t);
-                 __kernel_read(proc, *ftr, iov, evfd );
-                 ftr->result = 0xDEADDEAD;
-                 *((eventfd_t *)rdbuf) = 0xDEADDEADDEADDEAD;
-                 was_reset = true;
-             }
-         }
-
-         if( !__atomic_load_n( &proc->do_terminate, __ATOMIC_SEQ_CST ) ) {
-             __ioarbiter_flush( *proc->io.ctx );
-             proc->idle_wctx.sleep_time = rdtscl();
-             ioring_syscsll( *proc->io.ctx, 1, IORING_ENTER_GETEVENTS);
-         }
-
-         ready_schedule_lock();
-         __cfa_do_drain( proc->io.ctx, proc->cltr );
-         ready_schedule_unlock();
-
-         asm volatile ("" :: "m" (was_reset));
-     }
- #endif
  #endif
libcfa/src/concurrency/io/setup.cfa
rb77f0e1 → r63be3387
  bool __cfa_io_flush( processor * proc ) { return false; }
  bool __cfa_io_drain( processor * proc ) __attribute__((nonnull (1))) { return false; }
- void __cfa_io_idle ( processor * ) __attribute__((nonnull (1))) {}
  void __cfa_io_stop ( processor * proc ) {}

…
  }

- //=============================================================================================
- // I/O Context Sleep
- //=============================================================================================
- // static inline void __epoll_ctl(io_context$ & ctx, int op, const char * error) {
- //     struct epoll_event ev;
- //     ev.events = EPOLLIN | EPOLLONESHOT;
- //     ev.data.u64 = (__u64)&ctx;
- //     int ret = epoll_ctl(iopoll.epollfd, op, ctx.efd, &ev);
- //     if (ret < 0) {
- //         abort( "KERNEL ERROR: EPOLL %s - (%d) %s\n", error, (int)errno, strerror(errno) );
- //     }
- // }
-
- // static void __epoll_register(io_context$ & ctx) {
- //     __epoll_ctl(ctx, EPOLL_CTL_ADD, "ADD");
- // }
-
- // static void __epoll_unregister(io_context$ & ctx) {
- //     // Read the current epoch so we know when to stop
- //     size_t curr = __atomic_load_n(&iopoll.epoch, __ATOMIC_SEQ_CST);
-
- //     // Remove the fd from the iopoller
- //     __epoll_ctl(ctx, EPOLL_CTL_DEL, "REMOVE");
-
- //     // Notify the io poller thread of the shutdown
- //     iopoll.run = false;
- //     sigval val = { 1 };
- //     pthread_sigqueue( iopoll.thrd, SIGUSR1, val );
-
- //     // Make sure all this is done
- //     __atomic_thread_fence(__ATOMIC_SEQ_CST);
-
- //     // Wait for the next epoch
- //     while(curr == iopoll.epoch && !iopoll.stopped) Pause();
- // }
-
- // void __ioctx_prepare_block(io_context$ & ctx) {
- //     __cfadbg_print_safe(io_core, "Kernel I/O - epoll : Re-arming io poller %d (%p)\n", ctx.fd, &ctx);
- //     __epoll_ctl(ctx, EPOLL_CTL_MOD, "REARM");
- // }
-

  //=============================================================================================
libcfa/src/concurrency/kernel.cfa
rb77f0e1 → r63be3387
  extern bool __cfa_io_drain( processor * proc ) __attribute__((nonnull (1)));
  extern bool __cfa_io_flush( processor * ) __attribute__((nonnull (1)));
- extern void __cfa_io_idle( processor * ) __attribute__((nonnull (1)));
-
- #if defined(CFA_WITH_IO_URING_IDLE)
- extern bool __kernel_read(processor * proc, io_future_t & future, iovec &, int fd);
- #endif

  extern void __disable_interrupts_hard();
…
  verify(this);

- /* paranoid */ verify( this->idle_wctx.ftr != 0p );
- /* paranoid */ verify( this->idle_wctx.rdbuf != 0p );
-
- // used for idle sleep when io_uring is present
- // mark it as already fulfilled so we know if there is a pending request or not
- this->idle_wctx.ftr->self.ptr = 1p;
-
  __cfadbg_print_safe(runtime_core, "Kernel : core %p starting\n", this);
  #if !defined(__CFA_NO_STATISTICS__)
…
  /* paranoid */ verify( ! __preemption_enabled() );
  /* paranoid */ verifyf( thrd_dst->state == Ready || thrd_dst->preempted != __NO_PREEMPTION, "state : %d, preempted %d\n", thrd_dst->state, thrd_dst->preempted);
- /* paranoid */ verifyf( thrd_dst->link.next == 0p, "Expected null got %p", thrd_dst->link.next );
+ /* paranoid */ verifyf( thrd_dst->rdy_link.next == 0p, "Expected null got %p", thrd_dst->rdy_link.next );
  __builtin_prefetch( thrd_dst->context.SP );
…
  /* paranoid */ verifyf( ((uintptr_t)thrd_dst->context.SP) < ((uintptr_t)__get_stack(thrd_dst->curr_cor)->base ) || thrd_dst->curr_cor == proc_cor || thrd_dst->corctx_flag, "ERROR : Destination thread$ %p has been corrupted.\n StackPointer too small.\n", thrd_dst ); // add escape condition if we are setting up the processor
  /* paranoid */ verifyf( ((uintptr_t)thrd_dst->context.SP) > ((uintptr_t)__get_stack(thrd_dst->curr_cor)->limit) || thrd_dst->curr_cor == proc_cor || thrd_dst->corctx_flag, "ERROR : Destination thread$ %p has been corrupted.\n StackPointer too large.\n", thrd_dst ); // add escape condition if we are setting up the processor
+ /* paranoid */ verify( __atomic_exchange_n( &thrd_dst->executing, this, __ATOMIC_SEQ_CST) == 0p );
  /* paranoid */ verify( 0x0D15EA5E0D15EA5Ep == thrd_dst->canary );
…

  /* paranoid */ verify( 0x0D15EA5E0D15EA5Ep == thrd_dst->canary );
+ /* paranoid */ verify( __atomic_exchange_n( &thrd_dst->executing, 0p, __ATOMIC_SEQ_CST) == this );
  /* paranoid */ verifyf( ((uintptr_t)thrd_dst->context.SP) > ((uintptr_t)__get_stack(thrd_dst->curr_cor)->limit) || thrd_dst->corctx_flag, "ERROR : Destination thread$ %p has been corrupted.\n StackPointer too large.\n", thrd_dst );
  /* paranoid */ verifyf( ((uintptr_t)thrd_dst->context.SP) < ((uintptr_t)__get_stack(thrd_dst->curr_cor)->base ) || thrd_dst->corctx_flag, "ERROR : Destination thread$ %p has been corrupted.\n StackPointer too small.\n", thrd_dst );
+ /* paranoid */ verify( thrd_dst->state != Halted );
  /* paranoid */ verify( thrd_dst->context.SP );
- /* paranoid */ verify( thrd_dst->curr_cluster == this->cltr );
  /* paranoid */ verify( kernelTLS().this_thread == thrd_dst );
  /* paranoid */ verify( ! __preemption_enabled() );
…
  "Error preempted thread marked as not currently running, state %d, preemption %d\n", thrd->state, thrd->preempted );
  /* paranoid */ #endif
- /* paranoid */ verifyf( thrd->link.next == 0p, "Expected null got %p", thrd->link.next );
+ /* paranoid */ verifyf( thrd->rdy_link.next == 0p, "Expected null got %p", thrd->rdy_link.next );
  /* paranoid */ verify( 0x0D15EA5E0D15EA5Ep == thrd->canary );
…
  /* paranoid */ verifyf( ((uintptr_t)thrd->context.SP) < ((uintptr_t)__get_stack(thrd->curr_cor)->base ), "ERROR : thread$ %p has been corrupted.\n StackPointer too small.\n", thrd );

- thrd->state = Halting;
  if( TICKET_RUNNING != thrd->ticket ) { abort( "Thread terminated with pending unpark" ); }
  if( thrd != this->owner ) { abort( "Thread internal monitor has incorrect owner" ); }
  if( this->recursion != 1) { abort( "Thread internal monitor has unbalanced recursion" ); }
+
+ thrd->state = Halting;
+ thrd->ticket = TICKET_DEAD;

  // Leave the thread
…
  // If that is the case, abandon the preemption.
  bool preempted = false;
- if(thrd->link.next == 0p) {
+ if(thrd->rdy_link.next == 0p) {
      preempted = true;
      thrd->preempted = reason;
…


- #if !defined(CFA_WITH_IO_URING_IDLE)
-     #if !defined(__CFA_NO_STATISTICS__)
-         if(this->print_halts) {
-             __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 0\n", this->unique_id, rdtscl());
+ #if !defined(__CFA_NO_STATISTICS__)
+     if(this->print_halts) {
+         __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 0\n", this->unique_id, rdtscl());
+     }
+ #endif
+
+ __cfadbg_print_safe(runtime_core, "Kernel : core %p waiting on eventfd %d\n", this, this->idle_fd);
+
+ {
+     eventfd_t val;
+     ssize_t ret = read( this->idle_wctx.evfd, &val, sizeof(val) );
+     if(ret < 0) {
+         switch((int)errno) {
+         case EAGAIN:
+         #if EAGAIN != EWOULDBLOCK
+         case EWOULDBLOCK:
+         #endif
+         case EINTR:
+             // No need to do anything special here, just assume it's a legitimate wake-up
+             break;
+         default:
+             abort( "KERNEL : internal error, read failure on idle eventfd, error(%d) %s.", (int)errno, strerror( (int)errno ) );
          }
-     #endif
-
-     __cfadbg_print_safe(runtime_core, "Kernel : core %p waiting on eventfd %d\n", this, this->idle_fd);
-
-     {
-         eventfd_t val;
-         ssize_t ret = read( this->idle_wctx.evfd, &val, sizeof(val) );
-         if(ret < 0) {
-             switch((int)errno) {
-             case EAGAIN:
-             #if EAGAIN != EWOULDBLOCK
-             case EWOULDBLOCK:
-             #endif
-             case EINTR:
-                 // No need to do anything special here, just assume it's a legitimate wake-up
-                 break;
-             default:
-                 abort( "KERNEL : internal error, read failure on idle eventfd, error(%d) %s.", (int)errno, strerror( (int)errno ) );
-             }
-         }
-     }
-
-     #if !defined(__CFA_NO_STATISTICS__)
-         if(this->print_halts) {
-             __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 1\n", this->unique_id, rdtscl());
-         }
-     #endif
- #else
-     __cfa_io_idle( this );
+     }
+ }
+
+ #if !defined(__CFA_NO_STATISTICS__)
+     if(this->print_halts) {
+         __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 1\n", this->unique_id, rdtscl());
+     }
+ #endif
  #endif
  }
…
  insert_first(this.idles, proc);

+ // update the pointer to the head wait context, which should now point to this proc.
  __atomic_store_n(&this.fdw, &proc.idle_wctx, __ATOMIC_SEQ_CST);
  unlock( this );
…

  {
+     // update the pointer to the head wait context
      struct __fd_waitctx * wctx = 0;
      if(!this.idles`isEmpty) wctx = &this.idles`first.idle_wctx;
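With the io_uring idle path removed, an idle processor now parks by blocking in read() on its idle eventfd and is woken by a write to that same descriptor (see the kernel.hfa comment below: "FD - means the proc is going asleep and should be woken by writing to the FD"). A minimal stand-alone C sketch of that sleep/wake protocol, illustrative only and not libcfa code, looks like this:

#include <pthread.h>
#include <stdio.h>
#include <sys/eventfd.h>
#include <unistd.h>

static int evfd;

// the "idle processor": parks by blocking on the eventfd
static void * idle_proc( void * arg ) {
	(void)arg;
	eventfd_t val;
	ssize_t ret = read( evfd, &val, sizeof(val) );   // blocks until someone writes
	if( ret == sizeof(val) ) printf( "woken with value %llu\n", (unsigned long long)val );
	return NULL;
}

int main(void) {
	evfd = eventfd( 0, 0 );           // counter starts at 0, reads block
	pthread_t t;
	pthread_create( &t, NULL, idle_proc, NULL );

	sleep( 1 );                       // pretend work shows up later
	eventfd_write( evfd, 1 );         // wake the parked "processor"

	pthread_join( t, NULL );
	close( evfd );
	return 0;
}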
libcfa/src/concurrency/kernel.hfa
rb77f0e1 → r63be3387
  // 1 - means the proc should wake-up immediately
  // FD - means the proc is going asleep and should be woken by writing to the FD.
+ // The FD value should always be the evfd field just below.
  volatile int sem;

…
  int evfd;

- // buffer into which the proc will read from evfd
- // unused if not using io_uring for idle sleep
- void * rdbuf;
-
- // future use to track the read of the eventfd
- // unused if not using io_uring for idle sleep
- io_future_t * ftr;
-
+ // Used for debugging, should be removed eventually.
  volatile unsigned long long wake__time;
  volatile unsigned long long sleep_time;
…
  // P9_EMBEDDED( processor, dlink(processor) )
  static inline tytagref( dlink(processor), dlink(processor) ) ?`inner( processor & this ) {
-     dlink(processor) & b = this.link;
-     tytagref( dlink(processor), dlink(processor) ) result = { b };
-     return result;
+     dlink(processor) & b = this.link;
+     tytagref( dlink(processor), dlink(processor) ) result = { b };
+     return result;
  }
…
  // List of threads
  __spinlock_t thread_list_lock;
- __dllist_t(struct thread$) threads;
+ dlist(struct thread$, struct __thread_user_link) threads;
  unsigned int nthreads;

…
  io_context_params params;
  } io;
+
+ struct {
+     struct processor ** procs;
+     unsigned cnt;
+ } managed;

  #if !defined(__CFA_NO_STATISTICS__)
…
  static inline struct cluster * active_cluster () { return publicTLS_get( this_processor )->cltr; }

+ // set the number of internal processors
+ // these processors are in addition to any explicitly declared processors
+ unsigned set_concurrency( cluster & this, unsigned new_count );
+
  #if !defined(__CFA_NO_STATISTICS__)
  void print_stats_now( cluster & this, int flags );
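The new managed-processor array and set_concurrency make the number of runtime-owned kernel threads backing a cluster adjustable at run time (the implementation is in kernel/startup.cfa further down, and the cluster destructor calls set_concurrency(this, 0)). A hedged Cforall usage sketch, assuming the declarations from kernel.hfa are visible and that the cluster constructor taking a name behaves as in typical libcfa examples:

// sketch only: grow a cluster to four internal processors, then shrink it back
int main() {
	cluster cl = { "workers" };                 // assumed constructor form
	unsigned prev = set_concurrency( cl, 4 );   // cluster now owns 4 internal processors
	// ... run threads on cl ...
	set_concurrency( cl, prev );                // release the extra processors
}

These internal processors are in addition to any processor objects the program declares explicitly on the same cluster.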
libcfa/src/concurrency/kernel/cluster.cfa
rb77f0e1 → r63be3387

  // We add a boat-load of assertions here because the anchor code is very fragile
- /* paranoid */ _Static_assert( offsetof( thread$, link ) == nested_offsetof(__intrusive_lane_t, l.anchor) );
- /* paranoid */ verify( offsetof( thread$, link ) == nested_offsetof(__intrusive_lane_t, l.anchor) );
- /* paranoid */ verify( ((uintptr_t)( mock_head(this) ) + offsetof( thread$, link )) == (uintptr_t)(&this.l.anchor) );
- /* paranoid */ verify( &mock_head(this)->link.next == &this.l.anchor.next );
- /* paranoid */ verify( &mock_head(this)->link.ts == &this.l.anchor.ts );
- /* paranoid */ verify( mock_head(this)->link.next == 0p );
- /* paranoid */ verify( mock_head(this)->link.ts == MAX );
+ /* paranoid */ _Static_assert( offsetof( thread$, rdy_link ) == nested_offsetof(__intrusive_lane_t, l.anchor) );
+ /* paranoid */ verify( offsetof( thread$, rdy_link ) == nested_offsetof(__intrusive_lane_t, l.anchor) );
+ /* paranoid */ verify( ((uintptr_t)( mock_head(this) ) + offsetof( thread$, rdy_link )) == (uintptr_t)(&this.l.anchor) );
+ /* paranoid */ verify( &mock_head(this)->rdy_link.next == &this.l.anchor.next );
+ /* paranoid */ verify( &mock_head(this)->rdy_link.ts == &this.l.anchor.ts );
+ /* paranoid */ verify( mock_head(this)->rdy_link.next == 0p );
+ /* paranoid */ verify( mock_head(this)->rdy_link.ts == MAX );
  /* paranoid */ verify( mock_head(this) == this.l.prev );
  /* paranoid */ verify( __alignof__(__intrusive_lane_t) == 64 );
libcfa/src/concurrency/kernel/private.hfa
rb77f0e1 → r63be3387
  #endif

+ #include <signal.h>
+
  #include "kernel.hfa"
  #include "thread.hfa"
…
  }

- // Defines whether or not we *want* to use io_uring_enter as the idle_sleep blocking call
- // #define CFA_WANT_IO_URING_IDLE
-
- // Defines whether or not we *can* use io_uring_enter as the idle_sleep blocking call
- #if defined(CFA_WANT_IO_URING_IDLE) && defined(CFA_HAVE_LINUX_IO_URING_H)
-     #if defined(CFA_HAVE_IORING_OP_READ) || (defined(CFA_HAVE_READV) && defined(CFA_HAVE_IORING_OP_READV))
-         #define CFA_WITH_IO_URING_IDLE
-     #endif
- #endif
-
  // #define READYQ_USE_LINEAR_AVG
  #define READYQ_USE_LOGDBL_AVG
…
  #endif

+ extern "C" {
+     __attribute__((visibility("protected"))) int __cfaabi_pthread_create(pthread_t *_thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
+     __attribute__((visibility("protected"))) int __cfaabi_pthread_join(pthread_t _thread, void **retval);
+     __attribute__((visibility("protected"))) pthread_t __cfaabi_pthread_self(void);
+     __attribute__((visibility("protected"))) int __cfaabi_pthread_attr_init(pthread_attr_t *attr);
+     __attribute__((visibility("protected"))) int __cfaabi_pthread_attr_destroy(pthread_attr_t *attr);
+     __attribute__((visibility("protected"))) int __cfaabi_pthread_attr_setstack( pthread_attr_t *attr, void *stackaddr, size_t stacksize );
+     __attribute__((visibility("protected"))) int __cfaabi_pthread_attr_getstacksize( const pthread_attr_t *attr, size_t *stacksize );
+     __attribute__((visibility("protected"))) int __cfaabi_pthread_sigqueue(pthread_t _thread, int sig, const union sigval value);
+     __attribute__((visibility("protected"))) int __cfaabi_pthread_sigmask( int how, const sigset_t *set, sigset_t *oset);
+ }
+
  //-----------------------------------------------------------------------------
  // Scheduler
…
  #define TICKET_RUNNING ( 0) // thread is running
  #define TICKET_UNBLOCK ( 1) // thread should ignore next block
+ #define TICKET_DEAD (0xDEAD) // thread should never be unparked

  //-----------------------------------------------------------------------------
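This changeset only declares the __cfaabi_pthread_* wrappers; their definitions are not part of the diff shown here. One common way to build this kind of forwarding wrapper, shown purely as an assumption-laden illustration and not as what libcfa actually does, is to resolve the underlying libc symbol lazily with dlsym and forward to it:

#define _GNU_SOURCE
#include <dlfcn.h>
#include <pthread.h>
#include <signal.h>
#include <stdlib.h>

// Illustrative only: forward __cfaabi_pthread_sigmask to the real libc
// pthread_sigmask, resolved once on first use via RTLD_NEXT.
__attribute__((visibility("protected")))
int __cfaabi_pthread_sigmask( int how, const sigset_t * set, sigset_t * oset ) {
	typedef int (*sigmask_fn)( int, const sigset_t *, sigset_t * );
	static sigmask_fn real = NULL;
	if( !real ) {
		real = (sigmask_fn)dlsym( RTLD_NEXT, "pthread_sigmask" );
		if( !real ) abort();   // the symbol must exist in libc/libpthread
	}
	return real( how, set, oset );
}

Whether libcfa uses dlsym, symbol aliases, or plain direct calls for these wrappers is not visible in this changeset; the declarations only establish a protected-visibility interface that the rest of the runtime calls instead of the raw pthread API.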
libcfa/src/concurrency/kernel/startup.cfa
rb77f0e1 → r63be3387
  #define __cforall_thread__
  #define _GNU_SOURCE
+
+ // #define __CFA_DEBUG_PRINT_RUNTIME_CORE__

  // C Includes
…
  KERNEL_STORAGE(thread$, mainThread);
  KERNEL_STORAGE(__stack_t, mainThreadCtx);
- // KERNEL_STORAGE(__scheduler_RWLock_t, __scheduler_lock);
- KERNEL_STORAGE(eventfd_t, mainIdleEventFd);
- KERNEL_STORAGE(io_future_t, mainIdleFuture);
  #if !defined(__CFA_NO_STATISTICS__)
  KERNEL_STORAGE(__stats_t, mainProcStats);
…
  ( this.runner ){};
  init( this, "Main Processor", *mainCluster, 0p );
- kernel_thread = pthread_self();
+ kernel_thread = __cfaabi_pthread_self();

  runner{ &this };
…
  mainProcessor = (processor *)&storage_mainProcessor;
  (*mainProcessor){};
-
- mainProcessor->idle_wctx.rdbuf = &storage_mainIdleEventFd;
- mainProcessor->idle_wctx.ftr = (io_future_t*)&storage_mainIdleFuture;
- /* paranoid */ verify( sizeof(storage_mainIdleEventFd) == sizeof(eventfd_t) );

  __cfa_io_start( mainProcessor );
…
  }

+ extern "C"{
+     void pthread_delete_kernel_threads_();
+ }
+
+
  static void __kernel_shutdown(void) {
      if(!cfa_main_returned) return;
+
+     //delete kernel threads for pthread_concurrency
+     pthread_delete_kernel_threads_();
+
      /* paranoid */ verify( __preemption_enabled() );
      disable_interrupts();
…

  /* paranoid */ verify( this.do_terminate == true );
- __cfaabi_dbg_print_safe("Kernel : destroyed main processor context %p\n", &runner);
+ __cfadbg_print_safe(runtime_core, "Kernel : destroyed main processor context %p\n", &runner);
  }
…
  register_tls( proc );

- // used for idle sleep when io_uring is present
- io_future_t future;
- eventfd_t idle_buf;
- proc->idle_wctx.ftr = &future;
- proc->idle_wctx.rdbuf = &idle_buf;
-
-
  // SKULLDUGGERY: We want to create a context for the processor coroutine
  // which is needed for the 2-step context switch. However, there is no reason
…
  (proc->runner){ proc, &info };

- __cfaabi_dbg_print_safe("Coroutine : created stack %p\n", get_coroutine(proc->runner)->stack.storage);
+ __cfadbg_print_safe(runtime_core, "Coroutine : created stack %p\n", get_coroutine(proc->runner)->stack.storage);

  //Set global state
…
  self_mon.recursion = 1;
  self_mon_p = &self_mon;
- link.next = 0p;
- link.ts = MAX;
+ rdy_link.next = 0p;
+ rdy_link.ts = MAX;
  preferred = ready_queue_new_preferred();
  last_proc = 0p;
  random_state = __global_random_mask ? __global_random_prime : __global_random_prime ^ rdtscl();
  #if defined( __CFA_WITH_VERIFY__ )
+     executing = 0p;
      canary = 0x0D15EA5E0D15EA5Ep;
  #endif

- node.next = 0p;
- node.prev = 0p;
  doregister(curr_cluster, this);

…
  #endif

- threads{ __get };
+ threads{};

  io.arbiter = create();
  io.params = io_params;
+
+ managed.procs = 0p;
+ managed.cnt = 0;

  doregister(this);
…

  void ^?{}(cluster & this) libcfa_public {
+     set_concurrency( this, 0 );
+
      destroy(this.io.arbiter);
…
  lock (cltr->thread_list_lock __cfaabi_dbg_ctx2);
  cltr->nthreads += 1;
- push_front(cltr->threads, thrd);
+ insert_first(cltr->threads, thrd);
  unlock (cltr->thread_list_lock);
…
  void unregister( cluster * cltr, thread$ & thrd ) {
      lock (cltr->thread_list_lock __cfaabi_dbg_ctx2);
-     remove(cltr->threads, thrd );
-     cltr->nthreads -= 1;
+     {
+         tytagref( dlink(thread$), dlink(thread$) ) ?`inner( thread$ & this ) = void;
+         with( DLINK_VIA( thread$, struct __thread_user_link ) )
+             remove( thrd );
+         cltr->nthreads -= 1;
+     }
      unlock(cltr->thread_list_lock);
…
  pthread_attr_t attr;

- check( pthread_attr_init( &attr ), "pthread_attr_init" ); // initialize attribute
+ check( __cfaabi_pthread_attr_init( &attr ), "pthread_attr_init" ); // initialize attribute

  size_t stacksize = max( PTHREAD_STACK_MIN, DEFAULT_STACK_SIZE );
…
  #endif

- check( pthread_attr_setstack( &attr, stack, stacksize ), "pthread_attr_setstack" );
- check( pthread_create( pthread, &attr, start, arg ), "pthread_create" );
+ check( __cfaabi_pthread_attr_setstack( &attr, stack, stacksize ), "pthread_attr_setstack" );
+ check( __cfaabi_pthread_create( pthread, &attr, start, arg ), "pthread_create" );
  return stack;
  }

  void __destroy_pthread( pthread_t pthread, void * stack, void ** retval ) {
- int err = pthread_join( pthread, retval );
+ int err = __cfaabi_pthread_join( pthread, retval );
  if( err != 0 ) abort("KERNEL ERROR: joining pthread %p caused error %s\n", (void*)pthread, strerror(err));
…
  pthread_attr_t attr;

- check( pthread_attr_init( &attr ), "pthread_attr_init" ); // initialize attribute
+ check( __cfaabi_pthread_attr_init( &attr ), "pthread_attr_init" ); // initialize attribute

  size_t stacksize;
  // default stack size, normally defined by shell limit
- check( pthread_attr_getstacksize( &attr, &stacksize ), "pthread_attr_getstacksize" );
+ check( __cfaabi_pthread_attr_getstacksize( &attr, &stacksize ), "pthread_attr_getstacksize" );
  assert( stacksize >= PTHREAD_STACK_MIN );
  stacksize += __page_size;
…
  }

+ unsigned set_concurrency( cluster & this, unsigned new ) libcfa_public {
+     unsigned old = this.managed.cnt;
+
+     __cfadbg_print_safe(runtime_core, "Kernel : resizing cluster from %u to %u\n", old, (unsigned)new);
+
+     // Delete all the old unneeded procs
+     if(old > new) for(i; (unsigned)new ~ old) {
+         __cfadbg_print_safe(runtime_core, "Kernel : destroying %u\n", i);
+         delete( this.managed.procs[i] );
+     }
+
+     // Allocate new array (uses realloc and memcpies the data)
+     this.managed.procs = alloc( new, this.managed.procs`realloc );
+     this.managed.cnt = new;
+
+     // Create the desired new procs
+     if(old < new) for(i; old ~ new) {
+         __cfadbg_print_safe(runtime_core, "Kernel : constructing %u\n", i);
+         (*(this.managed.procs[i] = alloc())){ this };
+     }
+
+     // return the old count
+     return old;
+ }
+
  #if defined(__CFA_WITH_VERIFY__)
  static bool verify_fwd_bck_rng(void) {
libcfa/src/concurrency/locks.hfa
rb77f0e1 → r63be3387

  #include "bits/weakso_locks.hfa"
- #include "containers/queueLockFree.hfa"
+ #include "containers/lockfree.hfa"
  #include "containers/list.hfa"

…
  }

- static inline size_t on_wait(simple_owner_lock & this) with(this) {
+ static inline size_t on_wait(simple_owner_lock & this) with(this) {
      lock( lock __cfaabi_dbg_ctx2 );
      /* paranoid */ verifyf( owner != 0p, "Attempt to release lock %p that isn't held", &this );
libcfa/src/concurrency/monitor.cfa
rb77f0e1 → r63be3387

  // Some one else has the monitor, wait in line for it
- /* paranoid */ verify( thrd->link.next == 0p );
+ /* paranoid */ verify( thrd->user_link.next == 0p );
  append( this->entry_queue, thrd );
- /* paranoid */ verify( thrd->link.next == 1p );
+ /* paranoid */ verify( thrd->user_link.next == 1p );

  unlock( this->lock );
…

  // Some one else has the monitor, wait in line for it
- /* paranoid */ verify( thrd->link.next == 0p );
+ /* paranoid */ verify( thrd->user_link.next == 0p );
  append( this->entry_queue, thrd );
- /* paranoid */ verify( thrd->link.next == 1p );
+ /* paranoid */ verify( thrd->user_link.next == 1p );
  unlock( this->lock );

…
  thread$ * new_owner = pop_head( this->entry_queue );
  /* paranoid */ verifyf( !this->owner || active_thread() == this->owner, "Expected owner to be %p, got %p (r: %i, m: %p)", active_thread(), this->owner, this->recursion, this );
- /* paranoid */ verify( !new_owner || new_owner->link.next == 0p );
+ /* paranoid */ verify( !new_owner || new_owner->user_link.next == 0p );
  __set_owner( this, new_owner );

…
  __queue_t(thread$) & entry_queue = monitors[0]->entry_queue;

+ #if defined( __CFA_WITH_VERIFY__ )
+     thread$ * last = 0p;
+ #endif
  // For each thread in the entry-queue
  for( thread$ ** thrd_it = &entry_queue.head;
      (*thrd_it) != 1p;
-     thrd_it = &(*thrd_it)->link.next
+     thrd_it = &get_next(**thrd_it)
  ) {
+     thread$ * curr = *thrd_it;
+
+     /* paranoid */ verifyf( !last || last->user_link.next == curr, "search not making progress, from %p (%p) to %p", last, last->user_link.next, curr );
+     /* paranoid */ verifyf( curr != last, "search not making progress, from %p to %p", last, curr );
+
      // For each acceptable check if it matches
      int i = 0;
…
      for( __acceptable_t * it = begin; it != end; it++, i++ ) {
          // Check if we have a match
-         if( *it == (*thrd_it)->monitors ) {
+         if( *it == curr->monitors ) {

              // If we have a match return it
…
          }
      }
+
+     #if defined( __CFA_WITH_VERIFY__ )
+         last = curr;
+     #endif
  }

…

  // Some one else has the monitor, wait in line for it
- /* paranoid */ verify( thrd->link.next == 0p );
+ /* paranoid */ verify( thrd->user_link.next == 0p );
  append( this->entry_queue, thrd );
- /* paranoid */ verify( thrd->link.next == 1p );
+ /* paranoid */ verify( thrd->user_link.next == 1p );

  unlock( this->lock );
libcfa/src/concurrency/preemption.cfa
rb77f0e1 → r63be3387
  sigset_t oldset;
  int ret;
- ret = pthread_sigmask(0, ( const sigset_t * ) 0p, &oldset); // workaround trac#208: cast should be unnecessary
+ ret = __cfaabi_pthread_sigmask(0, ( const sigset_t * ) 0p, &oldset); // workaround trac#208: cast should be unnecessary
  if(ret != 0) { abort("ERROR sigprocmask returned %d", ret); }

…
  sigaddset( &mask, sig );

- if ( pthread_sigmask( SIG_UNBLOCK, &mask, 0p ) == -1 ) {
+ if ( __cfaabi_pthread_sigmask( SIG_UNBLOCK, &mask, 0p ) == -1 ) {
      abort( "internal error, pthread_sigmask" );
  }
…
  sigaddset( &mask, sig );

- if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
+ if ( __cfaabi_pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
      abort( "internal error, pthread_sigmask" );
  }
…
  static void preempt( processor * this ) {
      sigval_t value = { PREEMPT_NORMAL };
-     pthread_sigqueue( this->kernel_thread, SIGUSR1, value );
+     __cfaabi_pthread_sigqueue( this->kernel_thread, SIGUSR1, value );
  }
…
  sigset_t oldset;
  int ret;
- ret = pthread_sigmask(0, ( const sigset_t * ) 0p, &oldset); // workaround trac#208: cast should be unnecessary
+ ret = __cfaabi_pthread_sigmask(0, ( const sigset_t * ) 0p, &oldset); // workaround trac#208: cast should be unnecessary
  if(ret != 0) { abort("ERROR sigprocmask returned %d", ret); }

…
  sigset_t oldset;
  int ret;
- ret = pthread_sigmask(0, ( const sigset_t * ) 0p, &oldset); // workaround trac#208: cast should be unnecessary
+ ret = __cfaabi_pthread_sigmask(0, ( const sigset_t * ) 0p, &oldset); // workaround trac#208: cast should be unnecessary
  if(ret != 0) { abort("ERROR sigprocmask returned %d", ret); }

…
  sigval val;
  val.sival_int = 0;
- pthread_sigqueue( alarm_thread, SIGALRM, val );
+ __cfaabi_pthread_sigqueue( alarm_thread, SIGALRM, val );

  // Wait for the preemption thread to finish
…
  static_assert( sizeof( sigset_t ) == sizeof( cxt->uc_sigmask ), "Expected cxt->uc_sigmask to be of sigset_t" );
  #endif
- if ( pthread_sigmask( SIG_SETMASK, (sigset_t *)&(cxt->uc_sigmask), 0p ) == -1 ) {
+ if ( __cfaabi_pthread_sigmask( SIG_SETMASK, (sigset_t *)&(cxt->uc_sigmask), 0p ) == -1 ) {
      abort( "internal error, sigprocmask" );
  }
…
  sigset_t mask;
  sigfillset(&mask);
- if ( pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
+ if ( __cfaabi_pthread_sigmask( SIG_BLOCK, &mask, 0p ) == -1 ) {
      abort( "internal error, pthread_sigmask" );
  }
libcfa/src/concurrency/ready_subqueue.hfa
rb77f0e1 → r63be3387
  static inline thread$ * mock_head(const __intrusive_lane_t & this) {
      thread$ * rhead = (thread$ *)(
-         (uintptr_t)( &this.l.anchor ) - __builtin_offsetof( thread$, link )
+         (uintptr_t)( &this.l.anchor ) - __builtin_offsetof( thread$, rdy_link )
      );
      return rhead;
…
  static inline void push( __intrusive_lane_t & this, thread$ * node ) {
      /* paranoid */ verify( this.l.lock );
-     /* paranoid */ verify( node->link.next == 0p );
-     /* paranoid */ verify( __atomic_load_n(&node->link.ts, __ATOMIC_RELAXED) == MAX );
-     /* paranoid */ verify( this.l.prev->link.next == 0p );
-     /* paranoid */ verify( __atomic_load_n(&this.l.prev->link.ts, __ATOMIC_RELAXED) == MAX );
+     /* paranoid */ verify( node->rdy_link.next == 0p );
+     /* paranoid */ verify( __atomic_load_n(&node->rdy_link.ts, __ATOMIC_RELAXED) == MAX );
+     /* paranoid */ verify( this.l.prev->rdy_link.next == 0p );
+     /* paranoid */ verify( __atomic_load_n(&this.l.prev->rdy_link.ts, __ATOMIC_RELAXED) == MAX );
      if( this.l.anchor.next == 0p ) {
          /* paranoid */ verify( this.l.anchor.next == 0p );
…

      // Get the relevant nodes locally
-     this.l.prev->link.next = node;
-     __atomic_store_n(&this.l.prev->link.ts, rdtscl(), __ATOMIC_RELAXED);
+     this.l.prev->rdy_link.next = node;
+     __atomic_store_n(&this.l.prev->rdy_link.ts, rdtscl(), __ATOMIC_RELAXED);
      this.l.prev = node;
      #if !defined(__CFA_NO_STATISTICS__)
…
      // Get the relevant nodes locally
      thread$ * node = this.l.anchor.next;
-     this.l.anchor.next = node->link.next;
-     __atomic_store_n(&this.l.anchor.ts, __atomic_load_n(&node->link.ts, __ATOMIC_RELAXED), __ATOMIC_RELAXED);
+     this.l.anchor.next = node->rdy_link.next;
+     __atomic_store_n(&this.l.anchor.ts, __atomic_load_n(&node->rdy_link.ts, __ATOMIC_RELAXED), __ATOMIC_RELAXED);
      bool is_empty = this.l.anchor.next == 0p;
-     node->link.next = 0p;
-     __atomic_store_n(&node->link.ts, ULLONG_MAX, __ATOMIC_RELAXED);
+     node->rdy_link.next = 0p;
+     __atomic_store_n(&node->rdy_link.ts, ULLONG_MAX, __ATOMIC_RELAXED);
      #if !defined(__CFA_NO_STATISTICS__)
          this.l.cnt--;
…

      unsigned long long ats = __atomic_load_n(&this.l.anchor.ts, __ATOMIC_RELAXED);
-     /* paranoid */ verify( node->link.next == 0p );
-     /* paranoid */ verify( __atomic_load_n(&node->link.ts , __ATOMIC_RELAXED) == MAX );
-     /* paranoid */ verify( __atomic_load_n(&node->link.ts , __ATOMIC_RELAXED) != 0 );
+     /* paranoid */ verify( node->rdy_link.next == 0p );
+     /* paranoid */ verify( __atomic_load_n(&node->rdy_link.ts , __ATOMIC_RELAXED) == MAX );
+     /* paranoid */ verify( __atomic_load_n(&node->rdy_link.ts , __ATOMIC_RELAXED) != 0 );
      /* paranoid */ verify( ats != 0 );
      /* paranoid */ verify( (ats == MAX) == is_empty );
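mock_head relies on the classic intrusive-list trick of treating the lane's embedded anchor as if it were the link field of a fake node, by subtracting the link field's offset from the anchor's address; the _Static_asserts in kernel/cluster.cfa above pin down exactly this layout. A stand-alone C sketch of the pattern, with made-up names:

#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

struct item;
struct rdy_link { struct item * next; unsigned long long ts; };

struct item {
	int             value;
	struct rdy_link rdy_link;   // intrusive link, like thread$.rdy_link
};

struct lane {
	struct rdy_link anchor;     // plays the role of a fake item's rdy_link
};

// recover a pointer to the "fake" item whose rdy_link is the lane's anchor
static struct item * mock_head( struct lane * l ) {
	return (struct item *)( (uintptr_t)&l->anchor - offsetof( struct item, rdy_link ) );
}

int main(void) {
	struct lane l = { .anchor = { .next = NULL, .ts = 0 } };
	struct item a = { .value = 42 };

	// appending to an empty lane can now treat mock_head(&l) like any other
	// predecessor: writing mock_head(&l)->rdy_link.next writes l.anchor.next
	mock_head( &l )->rdy_link.next = &a;
	printf( "head is item %d\n", l.anchor.next->value );
	return 0;
}

This lets push and pop handle the empty and non-empty cases with the same pointer manipulations instead of special-casing the anchor.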
libcfa/src/concurrency/thread.cfa
rb77f0e1 → r63be3387
  self_mon_p = &self_mon;
  curr_cluster = &cl;
- link.next = 0p;
- link.ts = MAX;
+ rdy_link.next = 0p;
+ rdy_link.ts = MAX;
  preferred = ready_queue_new_preferred();
  last_proc = 0p;
  random_state = __global_random_mask ? __global_random_prime : __global_random_prime ^ rdtscl();
  #if defined( __CFA_WITH_VERIFY__ )
+     executing = 0p;
      canary = 0x0D15EA5E0D15EA5Ep;
  #endif

- node.next = 0p;
- node.prev = 0p;

  clh_node = malloc( );
…

  //-----------------------------------------------------------------------------
+ bool migrate( thread$ * thrd, struct cluster & cl ) {
+
+     monitor$ * tmon = get_monitor(thrd);
+     monitor$ * __monitors[] = { tmon };
+     monitor_guard_t __guard = { __monitors, 1 };
+
+
+     {
+         // if nothing needs to be done, return false
+         if( thrd->curr_cluster == &cl ) return false;
+
+         // are we migrating ourself?
+         const bool local = thrd == active_thread();
+
+         /* paranoid */ verify( !local || &cl != active_cluster() );
+         /* paranoid */ verify( !local || thrd->curr_cluster == active_cluster() );
+         /* paranoid */ verify( !local || thrd->curr_cluster == active_processor()->cltr );
+         /* paranoid */ verify( local || tmon->signal_stack.top->owner->waiting_thread == thrd );
+         /* paranoid */ verify( local || tmon->signal_stack.top );
+
+         // make sure we aren't interrupted while doing this
+         // not as important if we aren't local
+         disable_interrupts();
+
+         // actually move the thread
+         unregister( thrd->curr_cluster, *thrd );
+         thrd->curr_cluster = &cl;
+         doregister( thrd->curr_cluster, *thrd );
+
+         // restore interrupts
+         enable_interrupts();
+
+         // if this is the local thread, we are still running on the old cluster
+         if(local) yield();
+
+         /* paranoid */ verify( !local || &cl == active_cluster() );
+         /* paranoid */ verify( !local || thrd->curr_cluster == active_cluster() );
+         /* paranoid */ verify( !local || thrd->curr_cluster == active_processor()->cltr );
+         /* paranoid */ verify( local || tmon->signal_stack.top );
+         /* paranoid */ verify( local || tmon->signal_stack.top->owner->waiting_thread == thrd );
+
+         return true;
+     }
+ }
+
+ //-----------------------------------------------------------------------------
  #define GENERATOR LCG
libcfa/src/concurrency/thread.hfa
rb77f0e1 → r63be3387

  //----------
+ // misc
+ bool migrate( thread$ * thrd, struct cluster & cl );
+
+ forall( T & | is_thread(T) )
+ static inline bool migrate( T & mutex thrd, struct cluster & cl ) { return migrate( &(thread&)thrd, cl ); }
+
+
+ //----------
  // prng
  static inline {
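migrate (implemented in thread.cfa above) moves a thread, possibly the calling thread itself, onto another cluster and returns whether anything actually changed. A hedged Cforall usage sketch, assuming a cluster io_cl with at least one processor already exists elsewhere in the program:

// sketch only: move the calling thread onto io_cl for an I/O-heavy phase,
// then move it back to its original cluster
void run_phase_on( cluster & io_cl ) {
	cluster * home = active_cluster();
	if( migrate( active_thread(), io_cl ) ) {
		// from here on this thread is scheduled by io_cl's processors
		// ... I/O-heavy work ...
		migrate( active_thread(), *home );
	}
}

The mutex overload lets user thread types be migrated by another thread while their monitor is held, matching the guard taken inside the pointer version.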