- Timestamp: Sep 24, 2021, 6:13:46 PM (4 years ago)
- Branches: ADT, ast-experimental, enum, forall-pointer-decay, master, pthread-emulation, qualifiedEnum
- Children: 166b384
- Parents: 7e7a076 (diff), 9411cf0 (diff)

Note: this is a merge changeset; the changes displayed below correspond to the merge itself. Use the (diff) links above to see all the changes relative to each parent.

- Location: libcfa/src
- Files: 16 edited
libcfa/src/concurrency/clib/cfathread.cfa
r7e7a076 rf93c50a 14 14 // 15 15 16 // #define EPOLL_FOR_SOCKETS 17 16 18 #include "fstream.hfa" 17 19 #include "locks.hfa" … … 23 25 #include "cfathread.h" 24 26 27 extern "C" { 28 #include <string.h> 29 #include <errno.h> 30 } 31 25 32 extern void ?{}(processor &, const char[], cluster &, thread$ *); 26 33 extern "C" { 27 34 extern void __cfactx_invoke_thread(void (*main)(void *), void * this); 35 extern int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags); 28 36 } 29 37 30 38 extern Time __kernel_get_time(); 39 extern unsigned register_proc_id( void ); 31 40 32 41 //================================================================================ 33 // Thread run y the C Interface 42 // Epoll support for sockets 43 44 #if defined(EPOLL_FOR_SOCKETS) 45 extern "C" { 46 #include <sys/epoll.h> 47 #include <sys/resource.h> 48 } 49 50 static pthread_t master_poller; 51 static int master_epollfd = 0; 52 static size_t poller_cnt = 0; 53 static int * poller_fds = 0p; 54 static struct leaf_poller * pollers = 0p; 55 56 struct __attribute__((aligned)) fd_info_t { 57 int pollid; 58 size_t rearms; 59 }; 60 rlim_t fd_limit = 0; 61 static fd_info_t * volatile * fd_map = 0p; 62 63 void * master_epoll( __attribute__((unused)) void * args ) { 64 unsigned id = register_proc_id(); 65 66 enum { MAX_EVENTS = 5 }; 67 struct epoll_event events[MAX_EVENTS]; 68 for() { 69 int ret = epoll_wait(master_epollfd, events, MAX_EVENTS, -1); 70 if ( ret < 0 ) { 71 abort | "Master epoll error: " | strerror(errno); 72 } 73 74 for(i; ret) { 75 thread$ * thrd = (thread$ *)events[i].data.u64; 76 unpark( thrd ); 77 } 78 } 79 80 return 0p; 81 } 82 83 static inline int epoll_rearm(int epollfd, int fd, uint32_t event) { 84 struct epoll_event eevent; 85 eevent.events = event | EPOLLET | EPOLLONESHOT; 86 eevent.data.u64 = (uint64_t)active_thread(); 87 88 if(0 != epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &eevent)) 89 { 90 if(errno == ENOENT) return -1; 91 abort | acquire | "epoll" | epollfd | "ctl rearm" | fd | "error: " | errno | strerror(errno); 92 } 93 94 park(); 95 return 0; 96 } 97 98 thread leaf_poller { 99 int epollfd; 100 }; 101 102 void ?{}(leaf_poller & this, int fd) { this.epollfd = fd; } 103 104 void main(leaf_poller & this) { 105 enum { MAX_EVENTS = 1024 }; 106 struct epoll_event events[MAX_EVENTS]; 107 const int max_retries = 5; 108 int retries = max_retries; 109 110 struct epoll_event event; 111 event.events = EPOLLIN | EPOLLET | EPOLLONESHOT; 112 event.data.u64 = (uint64_t)&(thread&)this; 113 114 if(0 != epoll_ctl(master_epollfd, EPOLL_CTL_ADD, this.epollfd, &event)) 115 { 116 abort | "master epoll ctl add leaf: " | errno | strerror(errno); 117 } 118 119 park(); 120 121 for() { 122 yield(); 123 int ret = epoll_wait(this.epollfd, events, MAX_EVENTS, 0); 124 if ( ret < 0 ) { 125 abort | "Leaf epoll error: " | errno | strerror(errno); 126 } 127 128 if(ret) { 129 for(i; ret) { 130 thread$ * thrd = (thread$ *)events[i].data.u64; 131 unpark( thrd, UNPARK_REMOTE ); 132 } 133 } 134 else if(0 >= --retries) { 135 epoll_rearm(master_epollfd, this.epollfd, EPOLLIN); 136 } 137 } 138 } 139 140 void setup_epoll( void ) __attribute__(( constructor )); 141 void setup_epoll( void ) { 142 if(master_epollfd) abort | "Master epoll already setup"; 143 144 master_epollfd = epoll_create1(0); 145 if(master_epollfd == -1) { 146 abort | "failed to create master epoll: " | errno | strerror(errno); 147 } 148 149 struct rlimit rlim; 150 if(int ret = getrlimit(RLIMIT_NOFILE, &rlim); 0 != ret) { 151 abort | "failed to get nofile 
limit: " | errno | strerror(errno); 152 } 153 154 fd_limit = rlim.rlim_cur; 155 fd_map = alloc(fd_limit); 156 for(i;fd_limit) { 157 fd_map[i] = 0p; 158 } 159 160 poller_cnt = 2; 161 poller_fds = alloc(poller_cnt); 162 pollers = alloc(poller_cnt); 163 for(i; poller_cnt) { 164 poller_fds[i] = epoll_create1(0); 165 if(poller_fds[i] == -1) { 166 abort | "failed to create leaf epoll [" | i | "]: " | errno | strerror(errno); 167 } 168 169 (pollers[i]){ poller_fds[i] }; 170 } 171 172 pthread_attr_t attr; 173 if (int ret = pthread_attr_init(&attr); 0 != ret) { 174 abort | "failed to create master epoll thread attr: " | ret | strerror(ret); 175 } 176 177 if (int ret = pthread_create(&master_poller, &attr, master_epoll, 0p); 0 != ret) { 178 abort | "failed to create master epoll thread: " | ret | strerror(ret); 179 } 180 } 181 182 static inline int epoll_wait(int fd, uint32_t event) { 183 if(fd_map[fd] >= 1p) { 184 fd_map[fd]->rearms++; 185 epoll_rearm(poller_fds[fd_map[fd]->pollid], fd, event); 186 return 0; 187 } 188 189 for() { 190 fd_info_t * expected = 0p; 191 fd_info_t * sentinel = 1p; 192 if(__atomic_compare_exchange_n( &(fd_map[fd]), &expected, sentinel, true, __ATOMIC_SEQ_CST, __ATOMIC_RELAXED)) { 193 struct epoll_event eevent; 194 eevent.events = event | EPOLLET | EPOLLONESHOT; 195 eevent.data.u64 = (uint64_t)active_thread(); 196 197 int id = thread_rand() % poller_cnt; 198 if(0 != epoll_ctl(poller_fds[id], EPOLL_CTL_ADD, fd, &eevent)) 199 { 200 abort | "epoll ctl add" | poller_fds[id] | fd | fd_map[fd] | expected | "error: " | errno | strerror(errno); 201 } 202 203 fd_info_t * ninfo = alloc(); 204 ninfo->pollid = id; 205 ninfo->rearms = 0; 206 __atomic_store_n( &fd_map[fd], ninfo, __ATOMIC_SEQ_CST); 207 208 park(); 209 return 0; 210 } 211 212 if(expected >= 0) { 213 fd_map[fd]->rearms++; 214 epoll_rearm(poller_fds[fd_map[fd]->pollid], fd, event); 215 return 0; 216 } 217 218 Pause(); 219 } 220 } 221 #endif 222 223 //================================================================================ 224 // Thread run by the C Interface 34 225 35 226 struct cfathread_object { … … 245 436 // Mutex 246 437 struct cfathread_mutex { 247 fast_lock impl;438 linear_backoff_then_block_lock impl; 248 439 }; 249 440 int cfathread_mutex_init(cfathread_mutex_t *restrict mut, const cfathread_mutexattr_t *restrict) __attribute__((nonnull (1))) { *mut = new(); return 0; } … … 260 451 // Condition 261 452 struct cfathread_condition { 262 condition_variable( fast_lock) impl;453 condition_variable(linear_backoff_then_block_lock) impl; 263 454 }; 264 455 int cfathread_cond_init(cfathread_cond_t *restrict cond, const cfathread_condattr_t *restrict) __attribute__((nonnull (1))) { *cond = new(); return 0; } … … 288 479 // IO operations 289 480 int cfathread_socket(int domain, int type, int protocol) { 290 return socket(domain, type, protocol); 481 return socket(domain, type 482 #if defined(EPOLL_FOR_SOCKETS) 483 | SOCK_NONBLOCK 484 #endif 485 , protocol); 291 486 } 292 487 int cfathread_bind(int socket, const struct sockaddr *address, socklen_t address_len) { … … 299 494 300 495 int cfathread_accept(int socket, struct sockaddr *restrict address, socklen_t *restrict address_len) { 301 return cfa_accept4(socket, address, address_len, 0, CFA_IO_LAZY); 496 #if defined(EPOLL_FOR_SOCKETS) 497 int ret; 498 for() { 499 yield(); 500 ret = accept4(socket, address, address_len, SOCK_NONBLOCK); 501 if(ret >= 0) break; 502 if(errno != EAGAIN && errno != EWOULDBLOCK) break; 503 504 epoll_wait(socket, EPOLLIN); 505 } 506 return 
ret; 507 #else 508 return cfa_accept4(socket, address, address_len, 0, CFA_IO_LAZY); 509 #endif 302 510 } 303 511 304 512 int cfathread_connect(int socket, const struct sockaddr *address, socklen_t address_len) { 305 return cfa_connect(socket, address, address_len, CFA_IO_LAZY); 513 #if defined(EPOLL_FOR_SOCKETS) 514 int ret; 515 for() { 516 ret = connect(socket, address, address_len); 517 if(ret >= 0) break; 518 if(errno != EAGAIN && errno != EWOULDBLOCK) break; 519 520 epoll_wait(socket, EPOLLIN); 521 } 522 return ret; 523 #else 524 return cfa_connect(socket, address, address_len, CFA_IO_LAZY); 525 #endif 306 526 } 307 527 … … 315 535 316 536 ssize_t cfathread_sendmsg(int socket, const struct msghdr *message, int flags) { 317 return cfa_sendmsg(socket, message, flags, CFA_IO_LAZY); 537 #if defined(EPOLL_FOR_SOCKETS) 538 ssize_t ret; 539 __STATS__( false, io.ops.sockwrite++; ) 540 for() { 541 ret = sendmsg(socket, message, flags); 542 if(ret >= 0) break; 543 if(errno != EAGAIN && errno != EWOULDBLOCK) break; 544 545 __STATS__( false, io.ops.epllwrite++; ) 546 epoll_wait(socket, EPOLLOUT); 547 } 548 #else 549 ssize_t ret = cfa_sendmsg(socket, message, flags, CFA_IO_LAZY); 550 #endif 551 return ret; 318 552 } 319 553 320 554 ssize_t cfathread_write(int fildes, const void *buf, size_t nbyte) { 321 555 // Use send rather then write for socket since it's faster 322 return cfa_send(fildes, buf, nbyte, 0, CFA_IO_LAZY); 556 #if defined(EPOLL_FOR_SOCKETS) 557 ssize_t ret; 558 // __STATS__( false, io.ops.sockwrite++; ) 559 for() { 560 ret = send(fildes, buf, nbyte, 0); 561 if(ret >= 0) break; 562 if(errno != EAGAIN && errno != EWOULDBLOCK) break; 563 564 // __STATS__( false, io.ops.epllwrite++; ) 565 epoll_wait(fildes, EPOLLOUT); 566 } 567 #else 568 ssize_t ret = cfa_send(fildes, buf, nbyte, 0, CFA_IO_LAZY); 569 #endif 570 return ret; 323 571 } 324 572 … … 336 584 msg.msg_controllen = 0; 337 585 338 ssize_t ret = cfa_recvmsg(socket, &msg, flags, CFA_IO_LAZY); 586 #if defined(EPOLL_FOR_SOCKETS) 587 ssize_t ret; 588 yield(); 589 for() { 590 ret = recvmsg(socket, &msg, flags); 591 if(ret >= 0) break; 592 if(errno != EAGAIN && errno != EWOULDBLOCK) break; 593 594 epoll_wait(socket, EPOLLIN); 595 } 596 #else 597 ssize_t ret = cfa_recvmsg(socket, &msg, flags, CFA_IO_LAZY); 598 #endif 339 599 340 600 if(address_len) *address_len = msg.msg_namelen; … … 344 604 ssize_t cfathread_read(int fildes, void *buf, size_t nbyte) { 345 605 // Use recv rather then read for socket since it's faster 346 return cfa_recv(fildes, buf, nbyte, 0, CFA_IO_LAZY); 347 } 348 349 } 606 #if defined(EPOLL_FOR_SOCKETS) 607 ssize_t ret; 608 __STATS__( false, io.ops.sockread++; ) 609 yield(); 610 for() { 611 ret = recv(fildes, buf, nbyte, 0); 612 if(ret >= 0) break; 613 if(errno != EAGAIN && errno != EWOULDBLOCK) break; 614 615 __STATS__( false, io.ops.epllread++; ) 616 epoll_wait(fildes, EPOLLIN); 617 } 618 #else 619 ssize_t ret = cfa_recv(fildes, buf, nbyte, 0, CFA_IO_LAZY); 620 #endif 621 return ret; 622 } 623 624 } -
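The EPOLL_FOR_SOCKETS path above makes every socket non-blocking and, on EAGAIN, parks the calling thread until a leaf poller re-arms the descriptor and wakes it. Below is a minimal stand-alone sketch of that retry shape in plain C: no CFA park()/unpark(), wait_readable simply blocks in epoll_wait, and the epollfd/listenfd names are illustrative, not part of the library.

#define _GNU_SOURCE                             // accept4
#include <sys/epoll.h>
#include <sys/socket.h>
#include <errno.h>

// Block until fd is readable, using an edge-triggered, one-shot registration
// that is re-armed on every wait, mirroring epoll_rearm()/epoll_wait() above.
static int wait_readable( int epollfd, int fd ) {
	struct epoll_event ev = { 0 };
	ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
	ev.data.fd = fd;
	if ( epoll_ctl( epollfd, EPOLL_CTL_MOD, fd, &ev ) != 0 ) {              // re-arm if already registered
		if ( errno != ENOENT ) return -1;
		if ( epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &ev ) != 0 ) return -1; // first registration
	}
	struct epoll_event out;
	return epoll_wait( epollfd, &out, 1, -1 ) < 0 ? -1 : 0;
}

// Same shape as cfathread_accept: retry the non-blocking accept4 until it
// succeeds or fails with something other than EAGAIN/EWOULDBLOCK.
static int accept_nonblocking( int epollfd, int listenfd ) {
	for ( ;; ) {
		int fd = accept4( listenfd, 0, 0, SOCK_NONBLOCK );
		if ( fd >= 0 ) return fd;
		if ( errno != EAGAIN && errno != EWOULDBLOCK ) return -1;
		if ( wait_readable( epollfd, listenfd ) < 0 ) return -1;
	}
}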
libcfa/src/concurrency/invoke.h
r7e7a076 rf93c50a 170 170 bool corctx_flag; 171 171 172 int last_cpu;173 174 172 //SKULLDUGGERY errno is not save in the thread data structure because returnToKernel appears to be the only function to require saving and restoring it 175 173 … … 177 175 struct cluster * curr_cluster; 178 176 179 // preferred ready-queue 177 // preferred ready-queue or CPU 180 178 unsigned preferred; 181 179 -
libcfa/src/concurrency/io.cfa
r7e7a076 rf93c50a 90 90 static inline unsigned __flush( struct $io_context & ); 91 91 static inline __u32 __release_sqes( struct $io_context & ); 92 extern void __kernel_unpark( thread$ * thrd );92 extern void __kernel_unpark( thread$ * thrd, unpark_hint ); 93 93 94 94 bool __cfa_io_drain( processor * proc ) { … … 118 118 __cfadbg_print_safe( io, "Kernel I/O : Syscall completed : cqe %p, result %d for %p\n", &cqe, cqe.res, future ); 119 119 120 __kernel_unpark( fulfil( *future, cqe.res, false ) );120 __kernel_unpark( fulfil( *future, cqe.res, false ), UNPARK_LOCAL ); 121 121 } 122 122 -
libcfa/src/concurrency/kernel.cfa
r7e7a076 rf93c50a 341 341 } 342 342 343 343 __STATS( if(this->print_halts) __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 0\n", this->unique_id, rdtscl()); ) 344 344 __cfadbg_print_safe(runtime_core, "Kernel : core %p waiting on eventfd %d\n", this, this->idle); 345 345 346 // __disable_interrupts_hard(); 347 eventfd_t val; 348 eventfd_read( this->idle, &val ); 349 // __enable_interrupts_hard(); 346 { 347 eventfd_t val; 348 ssize_t ret = read( this->idle, &val, sizeof(val) ); 349 if(ret < 0) { 350 switch((int)errno) { 351 case EAGAIN: 352 #if EAGAIN != EWOULDBLOCK 353 case EWOULDBLOCK: 354 #endif 355 case EINTR: 356 // No need to do anything special here, just assume it's a legitimate wake-up 357 break; 358 default: 359 abort( "KERNEL : internal error, read failure on idle eventfd, error(%d) %s.", (int)errno, strerror( (int)errno ) ); 360 } 361 } 362 } 350 363 351 364 __STATS( if(this->print_halts) __cfaabi_bits_print_safe( STDOUT_FILENO, "PH:%d - %lld 1\n", this->unique_id, rdtscl()); ) … … 409 422 /* paranoid */ verifyf( thrd_dst->link.next == 0p, "Expected null got %p", thrd_dst->link.next ); 410 423 __builtin_prefetch( thrd_dst->context.SP ); 411 412 int curr = __kernel_getcpu();413 if(thrd_dst->last_cpu != curr) {414 int64_t l = thrd_dst->last_cpu;415 int64_t c = curr;416 int64_t v = (l << 32) | c;417 __push_stat( __tls_stats(), v, false, "Processor", this );418 }419 420 thrd_dst->last_cpu = curr;421 424 422 425 __cfadbg_print_safe(runtime_core, "Kernel : core %p running thread %p (%s)\n", this, thrd_dst, thrd_dst->self_cor.name); … … 473 476 if(unlikely(thrd_dst->preempted != __NO_PREEMPTION)) { 474 477 // The thread was preempted, reschedule it and reset the flag 475 schedule_thread$( thrd_dst );478 schedule_thread$( thrd_dst, UNPARK_LOCAL ); 476 479 break RUNNING; 477 480 } … … 557 560 // Scheduler routines 558 561 // KERNEL ONLY 559 static void __schedule_thread( thread$ * thrd ) {562 static void __schedule_thread( thread$ * thrd, unpark_hint hint ) { 560 563 /* paranoid */ verify( ! __preemption_enabled() ); 561 564 /* paranoid */ verify( ready_schedule_islocked()); … … 577 580 // Dereference the thread now because once we push it, there is not guaranteed it's still valid. 578 581 struct cluster * cl = thrd->curr_cluster; 579 __STATS(bool outside = thrd->last_proc && thrd->last_proc != kernelTLS().this_processor; )582 __STATS(bool outside = hint == UNPARK_LOCAL && thrd->last_proc && thrd->last_proc != kernelTLS().this_processor; ) 580 583 581 584 // push the thread to the cluster ready-queue 582 push( cl, thrd, local);585 push( cl, thrd, hint ); 583 586 584 587 // variable thrd is no longer safe to use … … 605 608 } 606 609 607 void schedule_thread$( thread$ * thrd ) {610 void schedule_thread$( thread$ * thrd, unpark_hint hint ) { 608 611 ready_schedule_lock(); 609 __schedule_thread( thrd );612 __schedule_thread( thrd, hint ); 610 613 ready_schedule_unlock(); 611 614 } … … 658 661 } 659 662 660 void __kernel_unpark( thread$ * thrd ) {663 void __kernel_unpark( thread$ * thrd, unpark_hint hint ) { 661 664 /* paranoid */ verify( ! 
__preemption_enabled() ); 662 665 /* paranoid */ verify( ready_schedule_islocked()); … … 666 669 if(__must_unpark(thrd)) { 667 670 // Wake lost the race, 668 __schedule_thread( thrd );671 __schedule_thread( thrd, hint ); 669 672 } 670 673 … … 673 676 } 674 677 675 void unpark( thread$ * thrd ) {678 void unpark( thread$ * thrd, unpark_hint hint ) { 676 679 if( !thrd ) return; 677 680 … … 679 682 disable_interrupts(); 680 683 // Wake lost the race, 681 schedule_thread$( thrd );684 schedule_thread$( thrd, hint ); 682 685 enable_interrupts(false); 683 686 } -
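The idle-sleep change above replaces eventfd_read with a raw read so that EAGAIN/EWOULDBLOCK/EINTR can be treated as legitimate (spurious) wake-ups instead of aborting the kernel. A small stand-alone sketch of that error handling in plain C, assuming idlefd is an eventfd created elsewhere:

#include <sys/eventfd.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>

// Sleep until another processor writes the eventfd; tolerate spurious wake-ups.
static void idle_sleep( int idlefd ) {
	eventfd_t val;
	ssize_t ret = read( idlefd, &val, sizeof(val) );
	if ( ret < 0 ) {
		switch ( errno ) {
		  case EAGAIN:
		#if EAGAIN != EWOULDBLOCK
		  case EWOULDBLOCK:
		#endif
		  case EINTR:
			break;                                  // legitimate wake-up, nothing to do
		  default:
			fprintf( stderr, "read failure on idle eventfd: %s\n", strerror( errno ) );
			abort();
		}
	}
}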
libcfa/src/concurrency/kernel.hfa
r7e7a076 rf93c50a 151 151 struct __attribute__((aligned(128))) __timestamp_t { 152 152 volatile unsigned long long tv; 153 }; 154 155 static inline void ?{}(__timestamp_t & this) { this.tv = 0; } 153 volatile unsigned long long ma; 154 }; 155 156 // Aligned timestamps which are used by the relaxed ready queue 157 struct __attribute__((aligned(128))) __help_cnts_t { 158 volatile unsigned long long src; 159 volatile unsigned long long dst; 160 volatile unsigned long long tri; 161 }; 162 163 static inline void ?{}(__timestamp_t & this) { this.tv = 0; this.ma = 0; } 156 164 static inline void ^?{}(__timestamp_t & this) {} 157 165 … … 169 177 // Array of times 170 178 __timestamp_t * volatile tscs; 179 180 // Array of stats 181 __help_cnts_t * volatile help; 171 182 172 183 // Number of lanes (empty or not) -
libcfa/src/concurrency/kernel/fwd.hfa
r7e7a076 rf93c50a 119 119 120 120 extern "Cforall" { 121 enum unpark_hint { UNPARK_LOCAL, UNPARK_REMOTE }; 122 121 123 extern void park( void ); 122 extern void unpark( struct thread$ * this ); 124 extern void unpark( struct thread$ *, unpark_hint ); 125 static inline void unpark( struct thread$ * thrd ) { unpark(thrd, UNPARK_LOCAL); } 123 126 static inline struct thread$ * active_thread () { 124 127 struct thread$ * t = publicTLS_get( this_thread ); -
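The new unpark_hint lets the waker say whether the unparked thread should be pushed onto the waker's local ready-queue shard (UNPARK_LOCAL, the default) or onto the shard near the thread's preferred CPU (UNPARK_REMOTE, which the leaf pollers in cfathread.cfa use). A minimal plain-C rendering of the same plumbing follows; CFA supplies the default through an overload, and unpark_hinted/thread_desc here are illustrative stand-ins, not library names.

enum unpark_hint { UNPARK_LOCAL, UNPARK_REMOTE };

struct thread_desc;                                         // stand-in for CFA's thread$

extern void unpark_hinted( struct thread_desc *, enum unpark_hint );

// Plain C has no overloading, so the LOCAL default becomes a thin wrapper.
static inline void unpark( struct thread_desc * thrd ) {
	unpark_hinted( thrd, UNPARK_LOCAL );
}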
libcfa/src/concurrency/kernel/startup.cfa
r7e7a076 rf93c50a 200 200 __cfadbg_print_safe(runtime_core, "Kernel : Main cluster ready\n"); 201 201 202 // Construct the processor context of the main processor 203 void ?{}(processorCtx_t & this, processor * proc) { 204 (this.__cor){ "Processor" }; 205 this.__cor.starter = 0p; 206 this.proc = proc; 207 } 208 209 void ?{}(processor & this) with( this ) { 210 ( this.terminated ){}; 211 ( this.runner ){}; 212 init( this, "Main Processor", *mainCluster, 0p ); 213 kernel_thread = pthread_self(); 214 215 runner{ &this }; 216 __cfadbg_print_safe(runtime_core, "Kernel : constructed main processor context %p\n", &runner); 217 } 218 219 // Initialize the main processor and the main processor ctx 220 // (the coroutine that contains the processing control flow) 221 mainProcessor = (processor *)&storage_mainProcessor; 222 (*mainProcessor){}; 223 224 register_tls( mainProcessor ); 225 202 226 // Start by initializing the main thread 203 227 // SKULLDUGGERY: the mainThread steals the process main thread … … 210 234 __cfadbg_print_safe(runtime_core, "Kernel : Main thread ready\n"); 211 235 212 213 214 // Construct the processor context of the main processor215 void ?{}(processorCtx_t & this, processor * proc) {216 (this.__cor){ "Processor" };217 this.__cor.starter = 0p;218 this.proc = proc;219 }220 221 void ?{}(processor & this) with( this ) {222 ( this.terminated ){};223 ( this.runner ){};224 init( this, "Main Processor", *mainCluster, 0p );225 kernel_thread = pthread_self();226 227 runner{ &this };228 __cfadbg_print_safe(runtime_core, "Kernel : constructed main processor context %p\n", &runner);229 }230 231 // Initialize the main processor and the main processor ctx232 // (the coroutine that contains the processing control flow)233 mainProcessor = (processor *)&storage_mainProcessor;234 (*mainProcessor){};235 236 register_tls( mainProcessor );237 mainThread->last_cpu = __kernel_getcpu();238 239 236 //initialize the global state variables 240 237 __cfaabi_tls.this_processor = mainProcessor; … … 252 249 // Add the main thread to the ready queue 253 250 // once resume is called on mainProcessor->runner the mainThread needs to be scheduled like any normal thread 254 schedule_thread$(mainThread );251 schedule_thread$(mainThread, UNPARK_LOCAL); 255 252 256 253 // SKULLDUGGERY: Force a context switch to the main processor to set the main thread's context to the current UNIX … … 486 483 link.next = 0p; 487 484 link.ts = -1llu; 488 preferred = -1u;485 preferred = ready_queue_new_preferred(); 489 486 last_proc = 0p; 490 487 #if defined( __CFA_WITH_VERIFY__ ) -
libcfa/src/concurrency/kernel_private.hfa
r7e7a076 rf93c50a 46 46 } 47 47 48 void schedule_thread$( thread$ * ) __attribute__((nonnull (1)));48 void schedule_thread$( thread$ *, unpark_hint hint ) __attribute__((nonnull (1))); 49 49 50 50 extern bool __preemption_enabled(); … … 300 300 // push thread onto a ready queue for a cluster 301 301 // returns true if the list was previously empty, false otherwise 302 __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, bool local);302 __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint); 303 303 304 304 //----------------------------------------------------------------------- … … 321 321 322 322 //----------------------------------------------------------------------- 323 // get preferred ready for new thread 324 unsigned ready_queue_new_preferred(); 325 326 //----------------------------------------------------------------------- 323 327 // Increase the width of the ready queue (number of lanes) by 4 324 328 void ready_queue_grow (struct cluster * cltr); -
libcfa/src/concurrency/ready_queue.cfa
r7e7a076 rf93c50a 100 100 #define __kernel_rseq_unregister rseq_unregister_current_thread 101 101 #elif defined(CFA_HAVE_LINUX_RSEQ_H) 102 void __kernel_raw_rseq_register (void);103 void __kernel_raw_rseq_unregister(void);102 static void __kernel_raw_rseq_register (void); 103 static void __kernel_raw_rseq_unregister(void); 104 104 105 105 #define __kernel_rseq_register __kernel_raw_rseq_register … … 246 246 // Cforall Ready Queue used for scheduling 247 247 //======================================================================= 248 unsigned long long moving_average(unsigned long long nval, unsigned long long oval) { 249 const unsigned long long tw = 16; 250 const unsigned long long nw = 4; 251 const unsigned long long ow = tw - nw; 252 return ((nw * nval) + (ow * oval)) / tw; 253 } 254 248 255 void ?{}(__ready_queue_t & this) with (this) { 249 256 #if defined(USE_CPU_WORK_STEALING) … … 251 258 lanes.data = alloc( lanes.count ); 252 259 lanes.tscs = alloc( lanes.count ); 260 lanes.help = alloc( cpu_info.hthrd_count ); 253 261 254 262 for( idx; (size_t)lanes.count ) { 255 263 (lanes.data[idx]){}; 256 264 lanes.tscs[idx].tv = rdtscl(); 265 lanes.tscs[idx].ma = rdtscl(); 266 } 267 for( idx; (size_t)cpu_info.hthrd_count ) { 268 lanes.help[idx].src = 0; 269 lanes.help[idx].dst = 0; 270 lanes.help[idx].tri = 0; 257 271 } 258 272 #else 259 273 lanes.data = 0p; 260 274 lanes.tscs = 0p; 275 lanes.help = 0p; 261 276 lanes.count = 0; 262 277 #endif … … 270 285 free(lanes.data); 271 286 free(lanes.tscs); 287 free(lanes.help); 272 288 } 273 289 274 290 //----------------------------------------------------------------------- 275 291 #if defined(USE_CPU_WORK_STEALING) 276 __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, bool push_local) with (cltr->ready_queue) {292 __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) { 277 293 __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr); 278 294 279 295 processor * const proc = kernelTLS().this_processor; 280 const bool external = !push_local || (!proc) || (cltr != proc->cltr); 281 296 const bool external = (!proc) || (cltr != proc->cltr); 297 298 // Figure out the current cpu and make sure it is valid 282 299 const int cpu = __kernel_getcpu(); 283 300 /* paranoid */ verify(cpu >= 0); … … 285 302 /* paranoid */ verify(cpu * READYQ_SHARD_FACTOR < lanes.count); 286 303 287 const cpu_map_entry_t & map = cpu_info.llc_map[cpu]; 304 // Figure out where thread was last time and make sure it's 305 /* paranoid */ verify(thrd->preferred >= 0); 306 /* paranoid */ verify(thrd->preferred < cpu_info.hthrd_count); 307 /* paranoid */ verify(thrd->preferred * READYQ_SHARD_FACTOR < lanes.count); 308 const int prf = thrd->preferred * READYQ_SHARD_FACTOR; 309 310 const cpu_map_entry_t & map; 311 choose(hint) { 312 case UNPARK_LOCAL : &map = &cpu_info.llc_map[cpu]; 313 case UNPARK_REMOTE: &map = &cpu_info.llc_map[prf]; 314 } 288 315 /* paranoid */ verify(map.start * READYQ_SHARD_FACTOR < lanes.count); 289 316 /* paranoid */ verify(map.self * READYQ_SHARD_FACTOR < lanes.count); … … 296 323 if(unlikely(external)) { r = __tls_rand(); } 297 324 else { r = proc->rdq.its++; } 298 i = start + (r % READYQ_SHARD_FACTOR); 325 choose(hint) { 326 case UNPARK_LOCAL : i = start + (r % READYQ_SHARD_FACTOR); 327 case UNPARK_REMOTE: i = prf + (r % READYQ_SHARD_FACTOR); 328 } 299 329 // If we can't lock it retry 300 330 } while( !__atomic_try_acquire( &lanes.data[i].lock ) 
); … … 332 362 processor * const proc = kernelTLS().this_processor; 333 363 const int start = map.self * READYQ_SHARD_FACTOR; 364 const unsigned long long ctsc = rdtscl(); 334 365 335 366 // Did we already have a help target 336 367 if(proc->rdq.target == -1u) { 337 // if We don't have a 338 unsigned long long min = ts(lanes.data[start]); 368 unsigned long long max = 0; 339 369 for(i; READYQ_SHARD_FACTOR) { 340 unsigned long long tsc = ts(lanes.data[start + i]); 341 if(tsc < min) min = tsc; 342 } 343 proc->rdq.cutoff = min; 344 370 unsigned long long tsc = moving_average(ctsc - ts(lanes.data[start + i]), lanes.tscs[start + i].ma); 371 if(tsc > max) max = tsc; 372 } 373 proc->rdq.cutoff = (max + 2 * max) / 2; 345 374 /* paranoid */ verify(lanes.count < 65536); // The following code assumes max 65536 cores. 346 375 /* paranoid */ verify(map.count < 65536); // The following code assumes max 65536 cores. 347 376 348 if(0 == (__tls_rand() % 10 _000)) {377 if(0 == (__tls_rand() % 100)) { 349 378 proc->rdq.target = __tls_rand() % lanes.count; 350 379 } else { … … 358 387 } 359 388 else { 360 const unsigned long long bias = 0; //2_500_000_000; 361 const unsigned long long cutoff = proc->rdq.cutoff > bias ? proc->rdq.cutoff - bias : proc->rdq.cutoff; 389 unsigned long long max = 0; 390 for(i; READYQ_SHARD_FACTOR) { 391 unsigned long long tsc = moving_average(ctsc - ts(lanes.data[start + i]), lanes.tscs[start + i].ma); 392 if(tsc > max) max = tsc; 393 } 394 const unsigned long long cutoff = (max + 2 * max) / 2; 362 395 { 363 396 unsigned target = proc->rdq.target; 364 397 proc->rdq.target = -1u; 365 if(lanes.tscs[target].tv < cutoff && ts(lanes.data[target]) < cutoff) { 398 lanes.help[target / READYQ_SHARD_FACTOR].tri++; 399 if(moving_average(ctsc - lanes.tscs[target].tv, lanes.tscs[target].ma) > cutoff) { 366 400 thread$ * t = try_pop(cltr, target __STATS(, __tls_stats()->ready.pop.help)); 367 401 proc->rdq.last = target; 368 402 if(t) return t; 403 else proc->rdq.target = -1u; 369 404 } 405 else proc->rdq.target = -1u; 370 406 } 371 407 … … 428 464 } 429 465 430 __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, bool push_local) with (cltr->ready_queue) {466 __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) { 431 467 __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr); 432 468 433 const bool external = !push_local|| (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);469 const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr); 434 470 /* paranoid */ verify(external || kernelTLS().this_processor->rdq.id < lanes.count ); 435 471 … … 515 551 #endif 516 552 #if defined(USE_WORK_STEALING) 517 __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, bool push_local) with (cltr->ready_queue) {553 __attribute__((hot)) void push(struct cluster * cltr, struct thread$ * thrd, unpark_hint hint) with (cltr->ready_queue) { 518 554 __cfadbg_print_safe(ready_queue, "Kernel : Pushing %p on cluster %p\n", thrd, cltr); 519 555 520 556 // #define USE_PREFERRED 521 557 #if !defined(USE_PREFERRED) 522 const bool external = !push_local|| (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr);558 const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || (cltr != kernelTLS().this_processor->cltr); 523 559 /* paranoid */ verify(external || 
kernelTLS().this_processor->rdq.id < lanes.count ); 524 560 #else 525 561 unsigned preferred = thrd->preferred; 526 const bool external = push_local|| (!kernelTLS().this_processor) || preferred == -1u || thrd->curr_cluster != cltr;562 const bool external = (hint != UNPARK_LOCAL) || (!kernelTLS().this_processor) || preferred == -1u || thrd->curr_cluster != cltr; 527 563 /* paranoid */ verifyf(external || preferred < lanes.count, "Invalid preferred queue %u for %u lanes", preferred, lanes.count ); 528 564 … … 645 681 // Actually pop the list 646 682 struct thread$ * thrd; 683 unsigned long long tsc_before = ts(lane); 647 684 unsigned long long tsv; 648 685 [thrd, tsv] = pop(lane); … … 658 695 __STATS( stats.success++; ) 659 696 660 #if defined(USE_WORK_STEALING) 697 #if defined(USE_WORK_STEALING) || defined(USE_CPU_WORK_STEALING) 698 unsigned long long now = rdtscl(); 661 699 lanes.tscs[w].tv = tsv; 700 lanes.tscs[w].ma = moving_average(now > tsc_before ? now - tsc_before : 0, lanes.tscs[w].ma); 662 701 #endif 663 702 664 thrd->preferred = w; 703 #if defined(USE_CPU_WORK_STEALING) 704 thrd->preferred = w / READYQ_SHARD_FACTOR; 705 #else 706 thrd->preferred = w; 707 #endif 665 708 666 709 // return the popped thread … … 688 731 689 732 //----------------------------------------------------------------------- 733 // get preferred ready for new thread 734 unsigned ready_queue_new_preferred() { 735 unsigned pref = 0; 736 if(struct thread$ * thrd = publicTLS_get( this_thread )) { 737 pref = thrd->preferred; 738 } 739 else { 740 #if defined(USE_CPU_WORK_STEALING) 741 pref = __kernel_getcpu(); 742 #endif 743 } 744 745 #if defined(USE_CPU_WORK_STEALING) 746 /* paranoid */ verify(pref >= 0); 747 /* paranoid */ verify(pref < cpu_info.hthrd_count); 748 #endif 749 750 return pref; 751 } 752 753 //----------------------------------------------------------------------- 690 754 // Check that all the intrusive queues in the data structure are still consistent 691 755 static void check( __ready_queue_t & q ) with (q) { … … 915 979 extern void __enable_interrupts_hard(); 916 980 917 void __kernel_raw_rseq_register (void) {981 static void __kernel_raw_rseq_register (void) { 918 982 /* paranoid */ verify( __cfaabi_rseq.cpu_id == RSEQ_CPU_ID_UNINITIALIZED ); 919 983 … … 933 997 } 934 998 935 void __kernel_raw_rseq_unregister(void) {999 static void __kernel_raw_rseq_unregister(void) { 936 1000 /* paranoid */ verify( __cfaabi_rseq.cpu_id >= 0 ); 937 1001 -
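The moving_average helper added above is an integer exponential moving average with a new-sample weight of 4/16, and the steal cutoff is then computed as (max + 2 * max) / 2, i.e. 1.5x the largest per-lane average. A small self-contained check of the arithmetic in plain C (the sample values are illustrative cycle counts):

#include <stdio.h>

// Same weights as the helper added above: (4 * new + 12 * old) / 16.
static unsigned long long moving_average( unsigned long long nval, unsigned long long oval ) {
	const unsigned long long tw = 16, nw = 4, ow = tw - nw;
	return ((nw * nval) + (ow * oval)) / tw;
}

int main( void ) {
	unsigned long long ma = 0;
	unsigned long long samples[] = { 1000, 1000, 1000, 4000, 1000 };
	for ( int i = 0; i < 5; i += 1 ) {
		ma = moving_average( samples[i], ma );
		printf( "sample %llu -> average %llu\n", samples[i], ma );
	}
}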
libcfa/src/concurrency/ready_subqueue.hfa
r7e7a076 rf93c50a 98 98 99 99 // Get the relevant nodes locally 100 unsigned long long ts = this.anchor.ts;101 100 thread$ * node = this.anchor.next; 102 101 this.anchor.next = node->link.next; … … 116 115 /* paranoid */ verify( node->link.ts != 0 ); 117 116 /* paranoid */ verify( this.anchor.ts != 0 ); 118 return [node, t s];117 return [node, this.anchor.ts]; 119 118 } 120 119 -
libcfa/src/concurrency/stats.cfa
r7e7a076 rf93c50a 48 48 stats->io.calls.completed = 0; 49 49 stats->io.calls.errors.busy = 0; 50 stats->io.ops.sockread = 0; 51 stats->io.ops.epllread = 0; 52 stats->io.ops.sockwrite = 0; 53 stats->io.ops.epllwrite = 0; 50 54 #endif 51 55 … … 104 108 tally_one( &cltr->io.calls.completed , &proc->io.calls.completed ); 105 109 tally_one( &cltr->io.calls.errors.busy, &proc->io.calls.errors.busy ); 110 tally_one( &cltr->io.ops.sockread , &proc->io.ops.sockread ); 111 tally_one( &cltr->io.ops.epllread , &proc->io.ops.epllread ); 112 tally_one( &cltr->io.ops.sockwrite , &proc->io.ops.sockwrite ); 113 tally_one( &cltr->io.ops.epllwrite , &proc->io.ops.epllwrite ); 106 114 #endif 107 115 } … … 179 187 | " - cmp " | eng3(io.calls.drain) | "/" | eng3(io.calls.completed) | "(" | ws(3, 3, avgcomp) | "/drain)" 180 188 | " - " | eng3(io.calls.errors.busy) | " EBUSY"; 189 sstr | "- ops blk: " 190 | " sk rd: " | eng3(io.ops.sockread) | "epll: " | eng3(io.ops.epllread) 191 | " sk wr: " | eng3(io.ops.sockwrite) | "epll: " | eng3(io.ops.epllwrite); 181 192 sstr | nl; 182 193 } -
libcfa/src/concurrency/stats.hfa
r7e7a076 rf93c50a 102 102 volatile uint64_t sleeps; 103 103 } poller; 104 struct { 105 volatile uint64_t sockread; 106 volatile uint64_t epllread; 107 volatile uint64_t sockwrite; 108 volatile uint64_t epllwrite; 109 } ops; 104 110 }; 105 111 #endif -
libcfa/src/concurrency/thread.cfa
r7e7a076 rf93c50a 25 25 #include "invoke.h" 26 26 27 uint64_t thread_rand(); 28 27 29 //----------------------------------------------------------------------------- 28 30 // Thread ctors and dtors … … 34 36 preempted = __NO_PREEMPTION; 35 37 corctx_flag = false; 36 disable_interrupts();37 last_cpu = __kernel_getcpu();38 enable_interrupts();39 38 curr_cor = &self_cor; 40 39 self_mon.owner = &this; … … 44 43 link.next = 0p; 45 44 link.ts = -1llu; 46 preferred = -1u;45 preferred = ready_queue_new_preferred(); 47 46 last_proc = 0p; 48 47 #if defined( __CFA_WITH_VERIFY__ ) … … 141 140 /* paranoid */ verify( this_thrd->context.SP ); 142 141 143 schedule_thread$( this_thrd );142 schedule_thread$( this_thrd, UNPARK_LOCAL ); 144 143 enable_interrupts(); 145 144 } -
libcfa/src/containers/string_res.cfa
r7e7a076 rf93c50a 21 21 22 22 23 #ifdef VbyteDebug 24 extern HandleNode *HeaderPtr; 23 24 25 26 27 28 29 // DON'T COMMIT: 30 // #define VbyteDebug 31 32 33 34 35 36 #ifdef VbyteDebug 37 HandleNode *HeaderPtr; 25 38 #endif // VbyteDebug 26 39 … … 140 153 141 154 VbyteHeap HeapArea; 155 156 VbyteHeap * DEBUG_string_heap = & HeapArea; 157 158 size_t DEBUG_string_bytes_avail_until_gc( VbyteHeap * heap ) { 159 return ((char*)heap->ExtVbyte) - heap->EndVbyte; 160 } 161 162 const char * DEBUG_string_heap_start( VbyteHeap * heap ) { 163 return heap->StartVbyte; 164 } 165 142 166 143 167 // Returns the size of the string in bytes … … 225 249 void assign(string_res &this, const char* buffer, size_t bsize) { 226 250 251 // traverse the incumbent share-edit set (SES) to recover the range of a base string to which `this` belongs 252 string_res * shareEditSetStartPeer = & this; 253 string_res * shareEditSetEndPeer = & this; 254 for (string_res * editPeer = this.shareEditSet_next; editPeer != &this; editPeer = editPeer->shareEditSet_next) { 255 if ( editPeer->Handle.s < shareEditSetStartPeer->Handle.s ) { 256 shareEditSetStartPeer = editPeer; 257 } 258 if ( shareEditSetEndPeer->Handle.s + shareEditSetEndPeer->Handle.lnth < editPeer->Handle.s + editPeer->Handle.lnth) { 259 shareEditSetEndPeer = editPeer; 260 } 261 } 262 263 // full string is from start of shareEditSetStartPeer thru end of shareEditSetEndPeer 264 // `this` occurs in the middle of it, to be replaced 265 // build up the new text in `pasting` 266 267 string_res pasting = { 268 shareEditSetStartPeer->Handle.s, // start of SES 269 this.Handle.s - shareEditSetStartPeer->Handle.s }; // length of SES, before this 270 append( pasting, 271 buffer, // start of replacement for this 272 bsize ); // length of replacement for this 273 append( pasting, 274 this.Handle.s + this.Handle.lnth, // start of SES after this 275 shareEditSetEndPeer->Handle.s + shareEditSetEndPeer->Handle.lnth - 276 (this.Handle.s + this.Handle.lnth) ); // length of SES, after this 277 278 // The above string building can trigger compaction. 279 // The reference points (that are arguments of the string building) may move during that building. 280 // From this point on, they are stable. 281 // So now, capture their values for use in the overlap cases, below. 282 // Do not factor these definitions with the arguments used above. 
283 284 char * beforeBegin = shareEditSetStartPeer->Handle.s; 285 size_t beforeLen = this.Handle.s - beforeBegin; 286 227 287 char * afterBegin = this.Handle.s + this.Handle.lnth; 228 229 char * shareEditSetStart = this.Handle.s; 230 char * shareEditSetEnd = afterBegin; 231 for (string_res * editPeer = this.shareEditSet_next; editPeer != &this; editPeer = editPeer->shareEditSet_next) { 232 shareEditSetStart = min( shareEditSetStart, editPeer->Handle.s ); 233 shareEditSetEnd = max( shareEditSetStart, editPeer->Handle.s + editPeer->Handle.lnth); 234 } 235 236 char * beforeBegin = shareEditSetStart; 237 size_t beforeLen = this.Handle.s - shareEditSetStart; 238 size_t afterLen = shareEditSetEnd - afterBegin; 239 240 string_res pasting = { beforeBegin, beforeLen }; 241 append(pasting, buffer, bsize); 242 string_res after = { afterBegin, afterLen }; // juxtaposed with in-progress pasting 243 pasting += after; // optimized case 288 size_t afterLen = shareEditSetEndPeer->Handle.s + shareEditSetEndPeer->Handle.lnth - afterBegin; 244 289 245 290 size_t oldLnth = this.Handle.lnth; … … 253 298 for (string_res * p = this.shareEditSet_next; p != &this; p = p->shareEditSet_next) { 254 299 assert (p->Handle.s >= beforeBegin); 255 if ( p->Handle.s < beforeBegin + beforeLen ) { 256 // p starts before the edit 257 if ( p->Handle.s + p->Handle.lnth < beforeBegin + beforeLen ) { 300 if ( p->Handle.s >= afterBegin ) { 301 assert ( p->Handle.s <= afterBegin + afterLen ); 302 assert ( p->Handle.s + p->Handle.lnth <= afterBegin + afterLen ); 303 // p starts after the edit 304 // take start and end as end-anchored 305 size_t startOffsetFromEnd = afterBegin + afterLen - p->Handle.s; 306 p->Handle.s = limit - startOffsetFromEnd; 307 // p->Handle.lnth unaffected 308 } else if ( p->Handle.s <= beforeBegin + beforeLen ) { 309 // p starts before, or at the start of, the edit 310 if ( p->Handle.s + p->Handle.lnth <= beforeBegin + beforeLen ) { 258 311 // p ends before the edit 259 312 // take end as start-anchored too 260 313 // p->Handle.lnth unaffected 261 314 } else if ( p->Handle.s + p->Handle.lnth < afterBegin ) { 262 // p ends during the edit 315 // p ends during the edit; p does not include the last character replaced 263 316 // clip end of p to end at start of edit 264 317 p->Handle.lnth = beforeLen - ( p->Handle.s - beforeBegin ); … … 274 327 size_t startOffsetFromStart = p->Handle.s - beforeBegin; 275 328 p->Handle.s = pasting.Handle.s + startOffsetFromStart; 276 } else if ( p->Handle.s < afterBegin ) { 329 } else { 330 assert ( p->Handle.s < afterBegin ); 277 331 // p starts during the edit 278 332 assert( p->Handle.s + p->Handle.lnth >= beforeBegin + beforeLen ); 279 333 if ( p->Handle.s + p->Handle.lnth < afterBegin ) { 280 // p ends during the edit 334 // p ends during the edit; p does not include the last character replaced 281 335 // set p to empty string at start of edit 282 336 p->Handle.s = this.Handle.s; 283 337 p->Handle.lnth = 0; 284 338 } else { 285 // p ends afterthe edit339 // p includes the end of the edit 286 340 // clip start of p to start at end of edit 341 int charsToClip = afterBegin - p->Handle.s; 287 342 p->Handle.s = this.Handle.s + this.Handle.lnth; 288 p->Handle.lnth += this.Handle.lnth; 289 p->Handle.lnth -= oldLnth; 343 p->Handle.lnth -= charsToClip; 290 344 } 291 } else {292 assert ( p->Handle.s <= afterBegin + afterLen );293 assert ( p->Handle.s + p->Handle.lnth <= afterBegin + afterLen );294 // p starts after the edit295 // take start and end as end-anchored296 size_t 
startOffsetFromEnd = afterBegin + afterLen - p->Handle.s;297 p->Handle.s = limit - startOffsetFromEnd;298 // p->Handle.lnth unaffected299 345 } 300 346 MoveThisAfter( p->Handle, pasting.Handle ); // move substring handle to maintain sorted order by string position … … 641 687 } // if 642 688 #ifdef VbyteDebug 643 serr | "exit:MoveThisAfter";644 689 { 645 690 serr | "HandleList:"; … … 650 695 serr | n->s[i]; 651 696 } // for 652 serr | "\" flink:" | n->flink | " blink:" | n->blink ;697 serr | "\" flink:" | n->flink | " blink:" | n->blink | nl; 653 698 } // for 654 699 serr | nlOn; 655 700 } 701 serr | "exit:MoveThisAfter"; 656 702 #endif // VbyteDebug 657 703 } // MoveThisAfter … … 662 708 663 709 //######################### VbyteHeap ######################### 664 665 #ifdef VbyteDebug666 HandleNode *HeaderPtr = 0p;667 #endif // VbyteDebug668 710 669 711 // Move characters from one location in the byte-string area to another. The routine handles the following situations: -
libcfa/src/containers/string_res.hfa
r7e7a076 rf93c50a 36 36 void ?{}( HandleNode &, VbyteHeap & ); // constructor for nodes in the handle list 37 37 void ^?{}( HandleNode & ); // destructor for handle nodes 38 39 extern VbyteHeap * DEBUG_string_heap; 40 size_t DEBUG_string_bytes_avail_until_gc( VbyteHeap * heap ); 41 const char * DEBUG_string_heap_start( VbyteHeap * heap ); 38 42 39 43 -
libcfa/src/fstream.cfa
r7e7a076 rf93c50a 10 10 // Created On : Wed May 27 17:56:53 2015 11 11 // Last Modified By : Peter A. Buhr 12 // Last Modified On : T hu Jul 29 22:34:10202113 // Update Count : 4 5412 // Last Modified On : Tue Sep 21 21:51:38 2021 13 // Update Count : 460 14 14 // 15 15 … … 28 28 #define IO_MSG "I/O error: " 29 29 30 void ?{}( ofstream & os, void * file ) {31 os.file$ = file;32 os.sepDefault$ = true;33 os.sepOnOff$ = false;34 os.nlOnOff$ = true;35 os.prt$ = false;36 os.sawNL$ = false;37 os.acquired$ = false;30 void ?{}( ofstream & os, void * file ) with(os) { 31 file$ = file; 32 sepDefault$ = true; 33 sepOnOff$ = false; 34 nlOnOff$ = true; 35 prt$ = false; 36 sawNL$ = false; 37 acquired$ = false; 38 38 sepSetCur$( os, sepGet( os ) ); 39 39 sepSet( os, " " ); … … 124 124 void open( ofstream & os, const char name[], const char mode[] ) { 125 125 FILE * file = fopen( name, mode ); 126 // #ifdef __CFA_DEBUG__127 126 if ( file == 0p ) { 128 127 throw (Open_Failure){ os }; 129 128 // abort | IO_MSG "open output file \"" | name | "\"" | nl | strerror( errno ); 130 129 } // if 131 // #endif // __CFA_DEBUG__ 132 (os){ file }; 130 (os){ file }; // initialize 133 131 } // open 134 132 … … 137 135 } // open 138 136 139 void close( ofstream & os ) {140 if ( (FILE *)( os.file$) == 0p ) return;141 if ( (FILE *)( os.file$) == (FILE *)stdout || (FILE *)(os.file$) == (FILE *)stderr ) return;142 143 if ( fclose( (FILE *)( os.file$) ) == EOF ) {137 void close( ofstream & os ) with(os) { 138 if ( (FILE *)(file$) == 0p ) return; 139 if ( (FILE *)(file$) == (FILE *)stdout || (FILE *)(file$) == (FILE *)stderr ) return; 140 141 if ( fclose( (FILE *)(file$) ) == EOF ) { 144 142 throw (Close_Failure){ os }; 145 143 // abort | IO_MSG "close output" | nl | strerror( errno ); 146 144 } // if 147 os.file$ = 0p;145 file$ = 0p; 148 146 } // close 149 147 … … 177 175 } // fmt 178 176 179 inline void acquire( ofstream & os ) {180 lock( os.lock$ );181 if ( ! os.acquired$ ) os.acquired$ = true;182 else unlock( os.lock$ );177 inline void acquire( ofstream & os ) with(os) { 178 lock( lock$ ); // may increase recursive lock 179 if ( ! acquired$ ) acquired$ = true; // not locked ? 
180 else unlock( lock$ ); // unwind recursive lock at start 183 181 } // acquire 184 182 … … 187 185 } // release 188 186 189 inline void lock( ofstream & os ) { acquire( os ); } 190 inline void unlock( ofstream & os ) { release( os ); } 191 192 void ?{}( osacquire & acq, ofstream & os ) { &acq.os = &os; lock( os.lock$ ); } 187 void ?{}( osacquire & acq, ofstream & os ) { lock( os.lock$ ); &acq.os = &os; } 193 188 void ^?{}( osacquire & acq ) { release( acq.os ); } 194 189 … … 222 217 223 218 // private 224 void ?{}( ifstream & is, void * file ) {225 is.file$ = file;226 is.nlOnOff$ = false;227 is.acquired$ = false;219 void ?{}( ifstream & is, void * file ) with(is) { 220 file$ = file; 221 nlOnOff$ = false; 222 acquired$ = false; 228 223 } // ?{} 229 224 … … 265 260 void open( ifstream & is, const char name[], const char mode[] ) { 266 261 FILE * file = fopen( name, mode ); 267 // #ifdef __CFA_DEBUG__268 262 if ( file == 0p ) { 269 263 throw (Open_Failure){ is }; 270 264 // abort | IO_MSG "open input file \"" | name | "\"" | nl | strerror( errno ); 271 265 } // if 272 // #endif // __CFA_DEBUG__ 273 is.file$ = file; 266 (is){ file }; // initialize 274 267 } // open 275 268 … … 278 271 } // open 279 272 280 void close( ifstream & is ) {281 if ( (FILE *)( is.file$) == 0p ) return;282 if ( (FILE *)( is.file$) == (FILE *)stdin ) return;283 284 if ( fclose( (FILE *)( is.file$) ) == EOF ) {273 void close( ifstream & is ) with(is) { 274 if ( (FILE *)(file$) == 0p ) return; 275 if ( (FILE *)(file$) == (FILE *)stdin ) return; 276 277 if ( fclose( (FILE *)(file$) ) == EOF ) { 285 278 throw (Close_Failure){ is }; 286 279 // abort | IO_MSG "close input" | nl | strerror( errno ); 287 280 } // if 288 is.file$ = 0p;281 file$ = 0p; 289 282 } // close 290 283 … … 327 320 } // fmt 328 321 329 inline void acquire( ifstream & is ) {330 lock( is.lock$ );331 if ( ! is.acquired$ ) is.acquired$ = true;332 else unlock( is.lock$ );322 inline void acquire( ifstream & is ) with(is) { 323 lock( lock$ ); // may increase recursive lock 324 if ( ! acquired$ ) acquired$ = true; // not locked ? 
325 else unlock( lock$ ); // unwind recursive lock at start 333 326 } // acquire 334 327 … … 337 330 } // release 338 331 339 void ?{}( isacquire & acq, ifstream & is ) { &acq.is = &is; lock( is.lock$ ); }332 void ?{}( isacquire & acq, ifstream & is ) { lock( is.lock$ ); &acq.is = &is; } 340 333 void ^?{}( isacquire & acq ) { release( acq.is ); } 341 334 … … 350 343 351 344 // exception I/O constructors 352 void ?{}( Open_Failure & this, ofstream & ostream) {353 this.virtual_table = &Open_Failure_vt;354 this.ostream = &ostream;355 t his.tag = 1;356 } // ?{} 357 358 void ?{}( Open_Failure & this, ifstream & istream) {359 this.virtual_table = &Open_Failure_vt;360 this.istream = &istream;361 t his.tag = 0;345 void ?{}( Open_Failure & ex, ofstream & ostream ) with(ex) { 346 virtual_table = &Open_Failure_vt; 347 ostream = &ostream; 348 tag = 1; 349 } // ?{} 350 351 void ?{}( Open_Failure & ex, ifstream & istream ) with(ex) { 352 virtual_table = &Open_Failure_vt; 353 istream = &istream; 354 tag = 0; 362 355 } // ?{} 363 356 … … 366 359 367 360 // exception I/O constructors 368 void ?{}( Close_Failure & this, ofstream & ostream) {369 this.virtual_table = &Close_Failure_vt;370 this.ostream = &ostream;371 t his.tag = 1;372 } // ?{} 373 374 void ?{}( Close_Failure & this, ifstream & istream) {375 this.virtual_table = &Close_Failure_vt;376 this.istream = &istream;377 t his.tag = 0;361 void ?{}( Close_Failure & ex, ofstream & ostream ) with(ex) { 362 virtual_table = &Close_Failure_vt; 363 ostream = &ostream; 364 tag = 1; 365 } // ?{} 366 367 void ?{}( Close_Failure & ex, ifstream & istream ) with(ex) { 368 virtual_table = &Close_Failure_vt; 369 istream = &istream; 370 tag = 0; 378 371 } // ?{} 379 372 … … 382 375 383 376 // exception I/O constructors 384 void ?{}( Write_Failure & this, ofstream & ostream) {385 this.virtual_table = &Write_Failure_vt;386 this.ostream = &ostream;387 t his.tag = 1;388 } // ?{} 389 390 void ?{}( Write_Failure & this, ifstream & istream) {391 this.virtual_table = &Write_Failure_vt;392 this.istream = &istream;393 t his.tag = 0;377 void ?{}( Write_Failure & ex, ofstream & ostream ) with(ex) { 378 virtual_table = &Write_Failure_vt; 379 ostream = &ostream; 380 tag = 1; 381 } // ?{} 382 383 void ?{}( Write_Failure & ex, ifstream & istream ) with(ex) { 384 virtual_table = &Write_Failure_vt; 385 istream = &istream; 386 tag = 0; 394 387 } // ?{} 395 388 … … 398 391 399 392 // exception I/O constructors 400 void ?{}( Read_Failure & this, ofstream & ostream) {401 this.virtual_table = &Read_Failure_vt;402 this.ostream = &ostream;403 t his.tag = 1;404 } // ?{} 405 406 void ?{}( Read_Failure & this, ifstream & istream) {407 this.virtual_table = &Read_Failure_vt;408 this.istream = &istream;409 t his.tag = 0;393 void ?{}( Read_Failure & ex, ofstream & ostream ) with(ex) { 394 virtual_table = &Read_Failure_vt; 395 ostream = &ostream; 396 tag = 1; 397 } // ?{} 398 399 void ?{}( Read_Failure & ex, ifstream & istream ) with(ex) { 400 virtual_table = &Read_Failure_vt; 401 istream = &istream; 402 tag = 0; 410 403 } // ?{} 411 404