- Timestamp: May 13, 2020, 6:34:09 PM
- Branches: ADT, arm-eh, ast-experimental, enum, forall-pointer-decay, jacob/cs343-translation, master, new-ast, new-ast-unique-expr, pthread-emulation, qualifiedEnum
- Children: 365cb03f, 9c438546
- Parents: 856fe3e (diff), 979df46 (diff)
- Location: libcfa
- Files: 15 edited

Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Use the (diff) links above to see all the changes relative to each parent. In the listings that follow, lines marked "+" were added in this changeset (r2223c80), lines marked "-" were removed relative to the parent (r856fe3e), and "…" elides unchanged code.
libcfa/Makefile.in

  DEFS = @DEFS@
  DEPDIR = @DEPDIR@
+ DIST_BWLIMIT = @DIST_BWLIMIT@
  DLLTOOL = @DLLTOOL@
  DRIVER_DIR = @DRIVER_DIR@
libcfa/configure

  CONFIG_CFLAGS
  ARCH_FLAGS
+ DIST_BWLIMIT
  CFADIR_HASH
  LOCAL_CC1
  …
  enable_silent_rules
  enable_distcc
+ with_bwlimit
  with_cfa_name
  enable_static
  …
  --with-PACKAGE[=ARG]    use PACKAGE [ARG=yes]
  --without-PACKAGE       do not use PACKAGE (same as --with-PACKAGE=no)
+ --with-bwlimit=RATE     RATE the maximum rate at which rsync will be limited when using distributed builds
  --with-cfa-name=NAME    NAME too which cfa will be installed
  --with-pic[=PKGS]       try to use only PIC/non-PIC objects [default=use
  …
  
  
+ 
+ # Check whether --with-bwlimit was given.
+ if test "${with_bwlimit+set}" = set; then :
+   withval=$with_bwlimit; DIST_BWLIMIT=$withval
+ else
+   DIST_BWLIMIT=0
+ fi
+ 
+ 
  echo -n "checking for distributated build... "
  if test x$enable_distcc = xno; then
  …
  ENABLE_DISTCC_FALSE=
  fi
+ 
  
  
libcfa/configure.ac

  enable_distcc=$enableval, enable_distcc=no)
  
+ AC_ARG_WITH(bwlimit,
+     [  --with-bwlimit=RATE     RATE the maximum rate at which rsync will be limited when using distributed builds],
+     DIST_BWLIMIT=$withval, DIST_BWLIMIT=0)
+ 
  echo -n "checking for distributated build... "
  if test x$enable_distcc = xno; then
  …
  AC_SUBST(CFADIR_HASH)
  AC_SUBST(CFA_VERSION)
+ AC_SUBST(DIST_BWLIMIT)
  
  #==============================================================================
libcfa/prelude/Makefile.am

  if ENABLE_DISTCC
  distribution: @LOCAL_CFACC@ @LOCAL_CC1@ @CFACPP@ gcc-builtins.cf builtins.cf extras.cf prelude.cfa bootloader.c $(srcdir)/../../tools/build/push2dist.sh
-     ${AM_V_GEN}$(srcdir)/../../tools/build/push2dist.sh @CFADIR_HASH@
+     ${AM_V_GEN}$(srcdir)/../../tools/build/push2dist.sh @CFADIR_HASH@ @DIST_BWLIMIT@
      @echo "Dummy file to track distribution to remote hosts" > ${@}
libcfa/prelude/Makefile.in

  DEFS = @DEFS@
  DEPDIR = @DEPDIR@
+ DIST_BWLIMIT = @DIST_BWLIMIT@
  DLLTOOL = @DLLTOOL@
  DRIVER_DIR = @DRIVER_DIR@
  …
  
  @ENABLE_DISTCC_TRUE@distribution: @LOCAL_CFACC@ @LOCAL_CC1@ @CFACPP@ gcc-builtins.cf builtins.cf extras.cf prelude.cfa bootloader.c $(srcdir)/../../tools/build/push2dist.sh
- @ENABLE_DISTCC_TRUE@    ${AM_V_GEN}$(srcdir)/../../tools/build/push2dist.sh @CFADIR_HASH@
+ @ENABLE_DISTCC_TRUE@    ${AM_V_GEN}$(srcdir)/../../tools/build/push2dist.sh @CFADIR_HASH@ @DIST_BWLIMIT@
  @ENABLE_DISTCC_TRUE@    @echo "Dummy file to track distribution to remote hosts" > ${@}
libcfa/src/Makefile.in

  DEFS = @DEFS@
  DEPDIR = @DEPDIR@
+ DIST_BWLIMIT = @DIST_BWLIMIT@
  DLLTOOL = @DLLTOOL@
  DRIVER_DIR = @DRIVER_DIR@
libcfa/src/concurrency/io.cfa

  
  #include "kernel.hfa"
+ #include "bitmanip.hfa"
  
  #if !defined(HAVE_LINUX_IO_URING_H)
-     void __kernel_io_startup( cluster &, int, bool ) {
+     void __kernel_io_startup( cluster &, unsigned, bool ) {
          // Nothing to do without io_uring
      }
  …
      struct __io_poller_fast {
          struct __io_data * ring;
-         bool waiting;
          $thread thrd;
      };
  …
      void ?{}( __io_poller_fast & this, struct cluster & cltr ) {
          this.ring = cltr.io;
-         this.waiting = true;
          (this.thrd){ "Fast I/O Poller", cltr };
      }
  …
          // Like head/tail but not seen by the kernel
          volatile uint32_t alloc;
-         volatile uint32_t ready;
+         volatile uint32_t * ready;
+         uint32_t ready_cnt;
  
          __spinlock_t lock;
  …
              volatile unsigned long long int block;
          } submit_avg;
+         struct {
+             volatile unsigned long long int val;
+             volatile unsigned long long int cnt;
+             volatile unsigned long long int block;
+         } look_avg;
      } stats;
  #endif
  …
          void * stack;
          pthread_t kthrd;
+         volatile bool blocked;
      } slow;
      __io_poller_fast fast;
  …
  // I/O Startup / Shutdown logic
  //=============================================================================================
- void __kernel_io_startup( cluster & this, int io_flags, bool main_cluster ) {
+ void __kernel_io_startup( cluster & this, unsigned io_flags, bool main_cluster ) {
      this.io = malloc();
  …
      sq.array = ( uint32_t *)(((intptr_t)sq.ring_ptr) + params.sq_off.array);
      sq.alloc = *sq.tail;
-     sq.ready = *sq.tail;
+ 
+     if( io_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
+         /* paranoid */ verify( is_pow2( io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET ) || ((io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET) < 8) );
+         sq.ready_cnt = max(io_flags >> CFA_CLUSTER_IO_BUFFLEN_OFFSET, 8);
+         sq.ready = alloc_align( 64, sq.ready_cnt );
+         for(i; sq.ready_cnt) {
+             sq.ready[i] = -1ul32;
+         }
+     }
+     else {
+         sq.ready_cnt = 0;
+         sq.ready = 0p;
+     }
  
      // completion queue
  …
          this.io->submit_q.stats.submit_avg.cnt   = 0;
          this.io->submit_q.stats.submit_avg.block = 0;
+         this.io->submit_q.stats.look_avg.val   = 0;
+         this.io->submit_q.stats.look_avg.cnt   = 0;
+         this.io->submit_q.stats.look_avg.block = 0;
          this.io->completion_q.stats.completed_avg.val = 0;
          this.io->completion_q.stats.completed_avg.slow_cnt = 0;
  …
      // Create the poller thread
      __cfadbg_print_safe(io_core, "Kernel I/O : Creating slow poller for cluter %p\n", &this);
+     this.io->poller.slow.blocked = false;
      this.io->poller.slow.stack = __create_pthread( &this.io->poller.slow.kthrd, __io_poller_slow, &this );
  }
  …
      if( this.io->cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
          with( this.io->poller.fast ) {
-             /* paranoid */ verify( waiting ); // The thread shouldn't be in a system call
              /* paranoid */ verify( this.procs.head == 0p || &this == mainCluster );
              /* paranoid */ verify( this.idles.head == 0p || &this == mainCluster );
  
              // We need to adjust the clean-up based on where the thread is
-             if( thrd.preempted != __NO_PREEMPTION ) {
+             if( thrd.state == Ready || thrd.preempted != __NO_PREEMPTION ) {
  
                  // This is the tricky case
                  // The thread was preempted and now it is on the ready queue
-                 /* paranoid */ verify( thrd.state == Active ); // The thread better be in this state
                  /* paranoid */ verify( thrd.next == 1p ); // The thread should be the last on the list
                  /* paranoid */ verify( this.ready_queue.head == &thrd ); // The thread should be the only thing on the list
  …
      if(this.print_stats) {
          with(this.io->submit_q.stats, this.io->completion_q.stats) {
+             double lavgv = 0;
+             double lavgb = 0;
+             if(look_avg.cnt != 0) {
+                 lavgv = ((double)look_avg.val  ) / look_avg.cnt;
+                 lavgb = ((double)look_avg.block) / look_avg.cnt;
+             }
+ 
              __cfaabi_bits_print_safe( STDERR_FILENO,
                  "----- I/O uRing Stats -----\n"
-                 "- total submit calls   : %'15llu\n"
-                 "- avg submit           : %'18.2lf\n"
-                 "- pre-submit block %%   : %'18.2lf\n"
-                 "- total wait calls     : %'15llu (%'llu slow, %'llu fast)\n"
-                 "- avg completion/wait  : %'18.2lf\n",
+                 "- total submit calls     : %'15llu\n"
+                 "- avg submit             : %'18.2lf\n"
+                 "- pre-submit block %%     : %'18.2lf\n"
+                 "- total ready search     : %'15llu\n"
+                 "- avg ready search len   : %'18.2lf\n"
+                 "- avg ready search block : %'18.2lf\n"
+                 "- total wait calls       : %'15llu (%'llu slow, %'llu fast)\n"
+                 "- avg completion/wait    : %'18.2lf\n",
                  submit_avg.cnt,
                  ((double)submit_avg.val) / submit_avg.cnt,
                  (100.0 * submit_avg.block) / submit_avg.cnt,
+                 look_avg.cnt,
+                 lavgv,
+                 lavgb,
                  completed_avg.slow_cnt + completed_avg.fast_cnt,
                  completed_avg.slow_cnt,  completed_avg.fast_cnt,
  …
      close(this.io->fd);
  
+     free( this.io->submit_q.ready ); // Maybe null, doesn't matter
      free( this.io );
  }
  …
  // Process a single completion message from the io_uring
  // This is NOT thread-safe
- static int __drain_io( struct __io_data & ring, sigset_t * mask, int waitcnt, bool in_kernel ) {
-     int ret = syscall( __NR_io_uring_enter, ring.fd, 0, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
+ static [int, bool] __drain_io( & struct __io_data ring, * sigset_t mask, int waitcnt, bool in_kernel ) {
+     unsigned to_submit = 0;
+     if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
+ 
+         // If the poller thread also submits, then we need to aggregate the submissions which are ready
+         uint32_t * tail = ring.submit_q.tail;
+         const uint32_t mask = *ring.submit_q.mask;
+ 
+         // Go through the list of ready submissions
+         for( i; ring.submit_q.ready_cnt ) {
+             // replace any submission with the sentinel, to consume it.
+             uint32_t idx = __atomic_exchange_n( &ring.submit_q.ready[i], -1ul32, __ATOMIC_RELAXED);
+ 
+             // If it was already the sentinel, then we are done
+             if( idx == -1ul32 ) continue;
+ 
+             // If we got a real submission, append it to the list
+             ring.submit_q.array[ ((*tail) + to_submit) & mask ] = idx & mask;
+             to_submit++;
+         }
+ 
+         // Increment the tail based on how many we are ready to submit
+         __atomic_fetch_add(tail, to_submit, __ATOMIC_SEQ_CST);
+ 
+         // update statistics
+         #if !defined(__CFA_NO_STATISTICS__)
+             ring.submit_q.stats.submit_avg.val += to_submit;
+             ring.submit_q.stats.submit_avg.cnt += 1;
+         #endif
+     }
+ 
+     int ret = syscall( __NR_io_uring_enter, ring.fd, to_submit, waitcnt, IORING_ENTER_GETEVENTS, mask, _NSIG / 8);
      if( ret < 0 ) {
          switch((int)errno) {
  …
      __atomic_fetch_add( ring.completion_q.head, count, __ATOMIC_RELAXED );
  
-     return count;
+     return [count, count > 0 || to_submit > 0];
  }
  …
      if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_USER_THREAD ) {
          while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
+ 
+             __atomic_store_n( &ring.poller.slow.blocked, true, __ATOMIC_SEQ_CST );
+ 
              // In the user-thread approach drain and if anything was drained,
              // batton pass to the user-thread
-             int count = __drain_io( ring, &mask, 1, true );
+             int count;
+             bool again;
+             [count, again] = __drain_io( ring, &mask, 1, true );
+ 
+             __atomic_store_n( &ring.poller.slow.blocked, false, __ATOMIC_SEQ_CST );
  
              // Update statistics
  …
              #endif
  
-             if( count > 0) {
+             if(again) {
                  __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to fast poller\n", &ring);
                  __unpark( &ring.poller.fast.thrd __cfaabi_dbg_ctx2 );
  …
          while(!__atomic_load_n(&ring.done, __ATOMIC_SEQ_CST)) {
              //In the naive approach, just poll the io completion queue directly
-             int count = __drain_io( ring, &mask, 1, true );
+             int count;
+             bool again;
+             [count, again] = __drain_io( ring, &mask, 1, true );
  
              // Update statistics
  …
      // Then loop until we need to start
      while(!__atomic_load_n(&this.ring->done, __ATOMIC_SEQ_CST)) {
+ 
          // Drain the io
-         this.waiting = false;
-         int count = __drain_io( *this.ring, 0p, 0, false );
-         reset += count > 0 ? 1 : 0;
+         int count;
+         bool again;
+         [count, again] = __drain_io( *this.ring, 0p, 0, false );
+ 
+         if(!again) reset++;
  
          // Update statistics
  …
          #endif
  
-         this.waiting = true;
+         // If we got something, just yield and check again
          if(reset < 5) {
-             // If we got something, just yield and check again
              yield();
          }
+         // We didn't get anything baton pass to the slow poller
          else {
-             // We didn't get anything baton pass to the slow poller
              __cfadbg_print_safe(io_core, "Kernel I/O : Moving to ring %p to slow poller\n", &this.ring);
+             reset = 0;
+ 
+             // wake up the slow poller
              post( this.ring->poller.sem );
+ 
+             // park this thread
              park( __cfaabi_dbg_ctx );
-             reset = 0;
          }
      }
  
      __cfadbg_print_safe(io_core, "Kernel I/O : Fast poller for ring %p stopping\n", &this.ring);
+ }
+ 
+ static inline void __wake_poller( struct __io_data & ring ) __attribute__((artificial));
+ static inline void __wake_poller( struct __io_data & ring ) {
+     if(!__atomic_load_n( &ring.poller.slow.blocked, __ATOMIC_SEQ_CST)) return;
+ 
+     sigval val = { 1 };
+     pthread_sigqueue( ring.poller.slow.kthrd, SIGUSR1, val );
  }
  …
      uint32_t idx = __atomic_fetch_add(&ring.submit_q.alloc, 1ul32, __ATOMIC_SEQ_CST);
  
-     // Validate that we didn't overflow anything
-     // Check that nothing overflowed
-     /* paranoid */ verify( true );
- 
-     // Check that it goes head -> tail -> alloc and never head -> alloc -> tail
-     /* paranoid */ verify( true );
+     // Mask the idx now to allow make everything easier to check
+     idx &= *ring.submit_q.mask;
  
      // Return the sqe
-     return [&ring.submit_q.sqes[ idx & (*ring.submit_q.mask)], idx];
+     return [&ring.submit_q.sqes[ idx ], idx];
  }
  
  static inline void __submit( struct __io_data & ring, uint32_t idx ) {
-     // get mutual exclusion
-     lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
- 
-     // Append to the list of ready entries
-     uint32_t * tail = ring.submit_q.tail;
+     // Get now the data we definetely need
+     uint32_t * const tail = ring.submit_q.tail;
      const uint32_t mask = *ring.submit_q.mask;
  
-     ring.submit_q.array[ (*tail) & mask ] = idx & mask;
-     __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
- 
-     // Submit however, many entries need to be submitted
-     int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
-     if( ret < 0 ) {
-         switch((int)errno) {
-             default:
-                 abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
-         }
-     }
- 
-     // update statistics
-     #if !defined(__CFA_NO_STATISTICS__)
-         ring.submit_q.stats.submit_avg.val += 1;
-         ring.submit_q.stats.submit_avg.cnt += 1;
-     #endif
- 
-     unlock(ring.submit_q.lock);
-     // Make sure that idx was submitted
-     // Be careful to not get false positive if we cycled the entire list or that someone else submitted for us
-     __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
+     // There are 2 submission schemes, check which one we are using
+     if( ring.cltr_flags & CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS ) {
+         // If the poller thread submits, then we just need to add this to the ready array
+ 
+         /* paranoid */ verify( idx <= mask );
+         /* paranoid */ verify( idx != -1ul32 );
+ 
+         // We need to find a spot in the ready array
+         __attribute((unused)) int len   = 0;
+         __attribute((unused)) int block = 0;
+         uint32_t expected = -1ul32;
+         uint32_t ready_mask = ring.submit_q.ready_cnt - 1;
+         uint32_t off = __tls_rand();
+         LOOKING: for() {
+             for(i; ring.submit_q.ready_cnt) {
+                 uint32_t ii = (i + off) & ready_mask;
+                 if( __atomic_compare_exchange_n( &ring.submit_q.ready[ii], &expected, idx, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED ) ) {
+                     break LOOKING;
+                 }
+ 
+                 len ++;
+             }
+ 
+             block++;
+             yield();
+         }
+ 
+         __wake_poller( ring );
+ 
+         // update statistics
+         #if !defined(__CFA_NO_STATISTICS__)
+             __atomic_fetch_add( &ring.submit_q.stats.look_avg.val,   len,   __ATOMIC_RELAXED );
+             __atomic_fetch_add( &ring.submit_q.stats.look_avg.block, block, __ATOMIC_RELAXED );
+             __atomic_fetch_add( &ring.submit_q.stats.look_avg.cnt,   1,     __ATOMIC_RELAXED );
+         #endif
+ 
+         __cfadbg_print_safe( io, "Kernel I/O : Added %u to ready for %p\n", idx, active_thread() );
+     }
+     else {
+         // get mutual exclusion
+         lock(ring.submit_q.lock __cfaabi_dbg_ctx2);
+ 
+         // Append to the list of ready entries
+ 
+         /* paranoid */ verify( idx <= mask );
+ 
+         ring.submit_q.array[ (*tail) & mask ] = idx & mask;
+         __atomic_fetch_add(tail, 1ul32, __ATOMIC_SEQ_CST);
+ 
+         // Submit however, many entries need to be submitted
+         int ret = syscall( __NR_io_uring_enter, ring.fd, 1, 0, 0, 0p, 0);
+         if( ret < 0 ) {
+             switch((int)errno) {
+                 default:
+                     abort( "KERNEL ERROR: IO_URING SUBMIT - %s\n", strerror(errno) );
+             }
+         }
+ 
+         // update statistics
+         #if !defined(__CFA_NO_STATISTICS__)
+             ring.submit_q.stats.submit_avg.val += 1;
+             ring.submit_q.stats.submit_avg.cnt += 1;
+         #endif
+ 
+         unlock(ring.submit_q.lock);
+ 
+         __cfadbg_print_safe( io, "Kernel I/O : Performed io_submit for %p, returned %d\n", active_thread(), ret );
+     }
  }
libcfa/src/concurrency/kernel.cfa

  }
  
- void ?{}(cluster & this, const char name[], Duration preemption_rate, int io_flags) with( this ) {
+ void ?{}(cluster & this, const char name[], Duration preemption_rate, unsigned io_flags) with( this ) {
      this.name = name;
      this.preemption_rate = preemption_rate;
  …
  
  /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
+ /* paranoid */ verify( kernelTLS.this_thread == thrd_dst );
  /* paranoid */ verifyf( ((uintptr_t)thrd_dst->context.SP) < ((uintptr_t)__get_stack(thrd_dst->curr_cor)->base ) || thrd_dst->curr_cor == proc_cor, "ERROR : Destination $thread %p has been corrupted.\n StackPointer too small.\n", thrd_dst ); // add escape condition if we are setting up the processor
  /* paranoid */ verifyf( ((uintptr_t)thrd_dst->context.SP) > ((uintptr_t)__get_stack(thrd_dst->curr_cor)->limit) || thrd_dst->curr_cor == proc_cor, "ERROR : Destination $thread %p has been corrupted.\n StackPointer too large.\n", thrd_dst ); // add escape condition if we are setting up the processor
  …
  /* paranoid */ verifyf( ((uintptr_t)thrd_dst->context.SP) > ((uintptr_t)__get_stack(thrd_dst->curr_cor)->limit), "ERROR : Destination $thread %p has been corrupted.\n StackPointer too large.\n", thrd_dst );
  /* paranoid */ verifyf( ((uintptr_t)thrd_dst->context.SP) < ((uintptr_t)__get_stack(thrd_dst->curr_cor)->base ), "ERROR : Destination $thread %p has been corrupted.\n StackPointer too small.\n", thrd_dst );
+ /* paranoid */ verify( kernelTLS.this_thread == thrd_dst );
  /* paranoid */ verify( ! kernelTLS.preemption_state.enabled );
libcfa/src/concurrency/kernel.hfa

  struct __io_data;
  
- #define CFA_CLUSTER_IO_POLLER_USER_THREAD    1 << 0
- // #define CFA_CLUSTER_IO_POLLER_KERNEL_SIDE 1 << 1
+ #define CFA_CLUSTER_IO_POLLER_USER_THREAD    1 << 0 // 0x1
+ #define CFA_CLUSTER_IO_POLLER_THREAD_SUBMITS 1 << 1 // 0x2
+ // #define CFA_CLUSTER_IO_POLLER_KERNEL_SIDE 1 << 2 // 0x4
+ #define CFA_CLUSTER_IO_BUFFLEN_OFFSET        16
  
  //-----------------------------------------------------------------------------
  …
  extern Duration default_preemption();
  
- void ?{} (cluster & this, const char name[], Duration preemption_rate, int flags);
+ void ?{} (cluster & this, const char name[], Duration preemption_rate, unsigned flags);
  void ^?{}(cluster & this);
  
- static inline void ?{} (cluster & this)                                           { this{"Anonymous Cluster", default_preemption(), 0}; }
- static inline void ?{} (cluster & this, Duration preemption_rate)                 { this{"Anonymous Cluster", preemption_rate, 0}; }
- static inline void ?{} (cluster & this, const char name[])                        { this{name, default_preemption(), 0}; }
- static inline void ?{} (cluster & this, int flags)                                { this{"Anonymous Cluster", default_preemption(), flags}; }
- static inline void ?{} (cluster & this, Duration preemption_rate, int flags)      { this{"Anonymous Cluster", preemption_rate, flags}; }
- static inline void ?{} (cluster & this, const char name[], int flags)             { this{name, default_preemption(), flags}; }
+ static inline void ?{} (cluster & this)                                           { this{"Anonymous Cluster", default_preemption(), 0}; }
+ static inline void ?{} (cluster & this, Duration preemption_rate)                 { this{"Anonymous Cluster", preemption_rate, 0}; }
+ static inline void ?{} (cluster & this, const char name[])                        { this{name, default_preemption(), 0}; }
+ static inline void ?{} (cluster & this, unsigned flags)                           { this{"Anonymous Cluster", default_preemption(), flags}; }
+ static inline void ?{} (cluster & this, Duration preemption_rate, unsigned flags) { this{"Anonymous Cluster", preemption_rate, flags}; }
+ static inline void ?{} (cluster & this, const char name[], unsigned flags)        { this{name, default_preemption(), flags}; }
  
  static inline [cluster *&, cluster *& ] __get( cluster & this ) __attribute__((const)) { return this.node.[next, prev]; }
libcfa/src/concurrency/kernel_private.hfa

  //-----------------------------------------------------------------------------
  // I/O
- void __kernel_io_startup     ( cluster &, int, bool );
+ void __kernel_io_startup     ( cluster &, unsigned, bool );
  void __kernel_io_finish_start( cluster & );
  void __kernel_io_prepare_stop( cluster & );
libcfa/src/containers/list.hfa

      $prev_link(list_pos) = (Telem*) 0p;
  }
+ 
+ static inline bool ?`is_empty(dlist(Tnode, Telem) &list) {
+     assert( &list != 0p );
+     $dlinks(Telem) *listLinks = & list.$links;
+     if (listLinks->next.is_terminator) {
+         assert(listLinks->prev.is_terminator);
+         assert(listLinks->next.terminator);
+         assert(listLinks->prev.terminator);
+         return true;
+     } else {
+         assert(!listLinks->prev.is_terminator);
+         assert(listLinks->next.elem);
+         assert(listLinks->prev.elem);
+         return false;
+     }
+ }
+ 
+ static inline Telem & pop_first(dlist(Tnode, Telem) &list) {
+     assert( &list != 0p );
+     assert( !list`is_empty );
+     $dlinks(Telem) *listLinks = & list.$links;
+     Telem & first = *listLinks->next.elem;
+     Tnode & list_pos_first = $tempcv_e2n( first );
+     remove(list_pos_first);
+     return first;
+ }
+ 
+ static inline Telem & pop_last(dlist(Tnode, Telem) &list) {
+     assert( &list != 0p );
+     assert( !list`is_empty );
+     $dlinks(Telem) *listLinks = & list.$links;
+     Telem & last = *listLinks->prev.elem;
+     Tnode & list_pos_last = $tempcv_e2n( last );
+     remove(list_pos_last);
+     return last;
+ }
+ 
  }
libcfa/src/exception.c

  
  
- // TERMINATION ===============================================================
- 
- // MEMORY MANAGEMENT (still for integers)
- // May have to move to cfa for constructors and destructors (references).
+ // MEMORY MANAGEMENT =========================================================
  
  // How to clean up an exception in various situations.
  …
  }
  
- // If this isn't a rethrow (*except==0), delete the provided exception.
- void __cfaehm_cleanup_terminate( void * except ) {
-     if ( *(void**)except ) __cfaehm_delete_exception( *(exception_t **)except );
- }
+ // CANCELLATION ==============================================================
  
  // Function needed by force unwind
  …
  }
  
+ // Cancel the current stack, prefroming approprate clean-up and messaging.
+ void __cfaehm_cancel_stack( exception_t * exception ) {
+     // TODO: Detect current stack and pick a particular stop-function.
+     _Unwind_Reason_Code ret;
+     ret = _Unwind_ForcedUnwind( &this_exception_storage, _Stop_Fn, (void*)0x22 );
+     printf("UNWIND ERROR %d after force unwind\n", ret);
+     abort();
+ }
+ 
+ 
+ // TERMINATION ===============================================================
+ 
+ // If this isn't a rethrow (*except==0), delete the provided exception.
+ void __cfaehm_cleanup_terminate( void * except ) {
+     if ( *(void**)except ) __cfaehm_delete_exception( *(exception_t **)except );
+ }
+ 
  // The exception that is being thrown must already be stored.
  static __attribute__((noreturn)) void __cfaehm_begin_unwind(void) {
  …
  // the whole stack.
  
+     // No handler found, go to the default operation.
+     // Currently this will always be a cancellation.
      if ( ret == _URC_END_OF_STACK ) {
-         // No proper handler was found. This can be handled in many ways, C++ calls std::terminate.
-         // Here we force unwind the stack, basically raising a cancellation.
-         printf("Uncaught exception %p\n", &this_exception_storage);
- 
-         ret = _Unwind_ForcedUnwind( &this_exception_storage, _Stop_Fn, (void*)0x22 );
-         printf("UNWIND ERROR %d after force unwind\n", ret);
-         abort();
+         __cfadbg_print_safe(exception, "Uncaught exception %p\n", &this_exception_storage);
+ 
+         __cfaehm_cancel_stack(this_exception_context()->current_exception);
      }
libcfa/src/exception.h

  
  
+ void __cfaehm_cancel_stack(exception_t * except) __attribute__((noreturn));
+ 
  // Used in throw statement translation.
  void __cfaehm_throw_terminate(exception_t * except) __attribute__((noreturn));
libcfa/src/exception.hfa

  // Created On       : Thu Apr  7 10:25:00 2020
  // Last Modified By : Andrew Beach
- // Last Modified On : Thu Apr  7 10:25:00 2020
- // Update Count     : 0
+ // Last Modified On : Wed Apr 13 15:42:00 2020
+ // Update Count     : 1
  //
+ 
+ // WARNING: This is for documentation as it will match ANY type.
+ trait is_exception(dtype T) {
+     /* The first field must be a pointer to a virtual table.
+      * That virtual table must be a decendent of the base exception virtual table.
+      */
+ };
+ 
+ forall(dtype T | is_exception(T))
+ inline void cancel_stack(T & except) __attribute__((noreturn)) {
+     __cfaehm_cancel_stack( (exception_t *)&except );
+ }
  
  // Everything below this line should be considered a patch while the exception
libcfa/src/executor.cfa

  // buffer.
  
- #include <bits/containers.hfa>
+ #include <containers/list.hfa>
  #include <thread.hfa>
  #include <stdio.h>
  
- forall( dtype T )
- monitor Buffer {                            // unbounded buffer
-     __queue_t( T ) queue;                   // unbounded list of work requests
-     condition delay;
- }; // Buffer
- forall( dtype T | is_node(T) ) {
-     void insert( Buffer( T ) & mutex buf, T * elem ) with(buf) {
-         append( queue, elem );              // insert element into buffer
-         signal( delay );                    // restart
-     } // insert
- 
-     T * remove( Buffer( T ) & mutex buf ) with(buf) {
-         if ( queue.head != 0 ) wait( delay ); // no request to process ? => wait
-         // return pop_head( queue );
-     } // remove
- } // distribution
- 
- 
  struct WRequest {                           // client request, no return
      void (* action)( void );
-     WRequest * next;                        // intrusive queue field
+     DLISTED_MGD_IMPL_IN(WRequest)
  }; // WRequest
+ DLISTED_MGD_IMPL_OUT(WRequest)
  
- WRequest *& get_next( WRequest & this ) { return this.next; }
- void ?{}( WRequest & req ) with(req) { action = 0; next = 0; }
- void ?{}( WRequest & req, void (* action)( void ) ) with(req) { req.action = action; next = 0; }
+ void ?{}( WRequest & req ) with(req) { action = 0; }
+ void ?{}( WRequest & req, void (* action)( void ) ) with(req) { req.action = action; }
  bool stop( WRequest & req ) { return req.action == 0; }
  void doit( WRequest & req ) { req.action(); }
+ 
+ monitor WRBuffer {                          // unbounded buffer
+     dlist( WRequest, WRequest ) queue;      // unbounded list of work requests
+     condition delay;
+ }; // WRBuffer
+ 
+ void insert( WRBuffer & mutex buf, WRequest * elem ) with(buf) {
+     insert_last( queue, *elem );            // insert element into buffer
+     signal( delay );                        // restart
+ } // insert
+ 
+ WRequest * remove( WRBuffer & mutex buf ) with(buf) {
+     if ( queue`is_empty ) wait( delay );    // no request to process ? => wait
+     return & pop_first( queue );
+ } // remove
  
  // Each worker has its own work buffer to reduce contention between client and server. Hence, work requests arrive and
  …
  thread Worker {
-     Buffer( WRequest ) * requests;
+     WRBuffer * requests;
      unsigned int start, range;
  }; // Worker
  …
  } // Worker::main
  
- void ?{}( Worker & worker, cluster * wc, Buffer( WRequest ) * requests, unsigned int start, unsigned int range ) {
+ void ?{}( Worker & worker, cluster * wc, WRBuffer * requests, unsigned int start, unsigned int range ) {
      (*get_thread(worker)){ *wc };           // create on given cluster
      worker.[requests, start, range] = [requests, start, range];
  …
      cluster * cluster;                      // if workers execute on separate cluster
      processor ** processors;                // array of virtual processors adding parallelism for workers
-     Buffer( WRequest ) * requests;          // list of work requests
+     WRBuffer * requests;                    // list of work requests
      Worker ** workers;                      // array of workers executing work requests
      unsigned int nprocessors, nworkers, nmailboxes; // number of mailboxes/workers/processor tasks
  …
      cluster = sepClus ? new( "Executor" ) : active_cluster();
      processors = (processor **)anew( nprocessors );
-     requests = anew( nmailboxes );
+     requests = (WRBuffer *)anew( nmailboxes );
      workers = (Worker **)anew( nworkers );
  …
      for ( i; 3000 ) {
          send( exector, workie );
-         if ( i % 100 ) yield();
+         if ( i % 100 == 0 ) {
+             // fprintf( stderr, "%d\n", i );
+             yield();
+         }
      } // for
  }