Changes in / [2ae845e9:2295320]
- Files:
-
- 14 edited
-
doc/bibliography/pl.bib (modified) (1 diff)
-
doc/uC++toCFA/uC++toCFA.tex (modified) (7 diffs)
-
libcfa/src/concurrency/actor.hfa (modified) (6 diffs)
-
libcfa/src/concurrency/barrier.hfa (modified) (2 diffs)
-
tests/concurrency/actors/dynamic.cfa (modified) (2 diffs)
-
tests/concurrency/actors/executor.cfa (modified) (1 diff)
-
tests/concurrency/actors/inherit.cfa (modified) (3 diffs)
-
tests/concurrency/actors/inline.cfa (modified) (2 diffs)
-
tests/concurrency/actors/matrixMultiply.cfa (modified) (3 diffs)
-
tests/concurrency/actors/pingpong.cfa (modified) (2 diffs)
-
tests/concurrency/actors/poison.cfa (modified) (2 diffs)
-
tests/concurrency/actors/static.cfa (modified) (1 diff)
-
tests/concurrency/actors/types.cfa (modified) (9 diffs)
-
tests/concurrency/barrier/order.cfa (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
-
doc/bibliography/pl.bib
r2ae845e9 r2295320 3685 3685 address = {Waterloo, Ontario, Canada, N2L 3G1}, 3686 3686 note = {\url{http://uwspace.uwaterloo.ca/bitstream/10012/3501/1/Thesis.pdf}}, 3687 } 3688 3689 @article{Hesselink24, 3690 author = {Wim A. Hesselink and Peter A. Buhr and Colby A. Parsons}, 3691 title = {First-Come-First-Served as a Separate Principle}, 3692 journal = {ACM Trans. Parallel Comput.}, 3693 publisher = {ACM}, 3694 address = {New York, NY, USA}, 3695 volume = 11, 3696 number = 4, 3697 month = nov, 3698 year = 2024, 3687 3699 } 3688 3700 -
doc/uC++toCFA/uC++toCFA.tex
r2ae845e9 r2295320 11 11 %% Created On : Wed Apr 6 14:53:29 2016 12 12 %% Last Modified By : Peter A. Buhr 13 %% Last Modified On : Fri Nov 8 08:22:25202414 %% Update Count : 61 0713 %% Last Modified On : Mon Nov 11 21:51:39 2024 14 %% Update Count : 6144 15 15 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 16 16 … … 498 498 499 499 500 \section{Coroutine s}500 \section{Coroutine} 501 501 502 502 \begin{cquote} … … 591 591 592 592 struct StrMsg : @public uActor::Message@ { 593 593 594 const char * val; // string message 594 595 595 596 596 StrMsg( const char * val ) : … … 600 600 _Actor Hello { ${\color{red}\LstCommentStyle{// : public uActor}}$ 601 601 Allocation receive( Message & msg ) { 602 Case( StrMsg, msg ) { // discriminate 602 Case( @StartMsg@, msg ) { // discriminate 603 604 } else Case( StrMsg, msg ) { 603 605 osacquire( cout ) << msg_d->val << endl; 604 }; 605 return Delete; // delete after use 606 607 } else Case( @StopMsg@, msg ) 608 return Delete; // delete actor 609 return Nodelete; // reuse actor 606 610 } 607 611 }; 608 612 int main() { 609 613 @uActor::start();@ // start actor system 610 *new Hello() | *new StrMsg( "hello" ); 611 *new Hello() | *new StrMsg( "bonjour" ); 612 @uActor::stop();@ // wait for all actors to terminate 614 *new Hello() | uActor::startMsg 615 | *new StrMsg( "hello" ) | uActor::stopMsg; 616 *new Hello() | uActor::startMsg 617 | *new StrMsg( "bonjour" ) | uActor::stopMsg; 618 @uActor::stop();@ // wait for actors to terminate 613 619 } 614 620 \end{uC++} … … 623 629 const char * val; // string message 624 630 }; 625 void ?{}( StrMsg & msg, char * str ) { 631 void ?{}( StrMsg & msg, const char * str ) { 632 @set_allocation( msg, Delete );@ // delete after use 626 633 msg.val = str; 627 @set_allocation( msg, Delete );@ // delete after use 628 } 629 struct Hello{630 @inline actor;@ // derived actor631 } ;634 } 635 struct Hello { @inline actor;@ }; // derived actor 636 allocation receive( Hello & receiver, @start_msg_t@ & ) { 637 return Nodelete; 638 } 632 639 allocation receive( Hello & receiver, StrMsg & msg ) { 633 640 mutex( sout ) sout | msg.val; 634 return Delete; // delete after use 641 return Nodelete; // reuse actor 642 } 643 allocation receive( Hello & receiver, @stop_msg_t@ & ) { 644 return Delete; // delete actor 635 645 } 636 646 637 647 int main() { 638 @start_actor_system();@ // start actor system 639 *(Hello *)new() | *(StrMsg *)new( "hello" ); 640 *(Hello *)new() | *(StrMsg *)new( "bonjour" ); 641 @stop_actor_system();@ // wait for all actors to terminate 642 } 643 \end{cfa} 644 \end{tabular} 645 \end{cquote} 646 647 648 \section{Threads} 648 @actor_start();@ // start actor system 649 *(Hello *)new() | start_msg 650 | *(StrMsg *)new( "hello" ) | stop_msg; 651 *(Hello *)new() | start_msg 652 | *(StrMsg *)new( "bonjour" ) | stop_msg; 653 @actor_stop();@ // wait for actors to terminate 654 } 655 \end{cfa} 656 \end{tabular} 657 \end{cquote} 658 659 660 \section{Thread} 649 661 650 662 \begin{cquote} … … 710 722 711 723 712 \section{Monitor s}724 \section{Monitor} 713 725 714 726 Internal Scheduling … … 776 788 \end{tabular} 777 789 \end{cquote} 778 \enlargethispage{1000pt} 790 791 \newpage 792 779 793 External Scheduling 780 794 \begin{cquote} -
libcfa/src/concurrency/actor.hfa
r2ae845e9 r2295320 398 398 // TODO: update globals in this file to be static fields once the static fields project is done 399 399 static executor * __actor_executor_ = 0p; 400 static bool __actor_executor_passed = false; // was an executor passed to start_actor_system400 static bool __actor_executor_passed = false; // was an executor passed to actor_start 401 401 static size_t __num_actors_ = 0; // number of actor objects in system 402 402 static struct thread$ * __actor_executor_thd = 0p; // used to wake executor after actors finish … … 410 410 // Once an actor is allocated it must be sent a message or the actor system cannot stop. Hence, its receive 411 411 // member must be called to end it 412 DEBUG_ABORT( __actor_executor_ == 0p, "Creating actor before calling start_actor_system() can cause undefined behaviour.\n" );412 DEBUG_ABORT( __actor_executor_ == 0p, "Creating actor before calling actor_start() can cause undefined behaviour.\n" ); 413 413 alloc = Nodelete; 414 414 ticket = __get_next_ticket( *__actor_executor_ ); … … 682 682 } 683 683 684 static inline void start_actor_system( size_t num_thds ) {684 static inline void actor_start( size_t num_thds ) { 685 685 __reset_stats(); 686 686 __actor_executor_thd = active_thread(); … … 689 689 } 690 690 691 static inline void start_actor_system() { start_actor_system( get_proc_count( *active_cluster() ) ); }692 693 static inline void start_actor_system( executor & this ) {691 static inline void actor_start() { actor_start( get_proc_count( *active_cluster() ) ); } 692 693 static inline void actor_start( executor & this ) { 694 694 __reset_stats(); 695 695 __actor_executor_thd = active_thread(); … … 698 698 } 699 699 700 static inline void stop_actor_system() {700 static inline void actor_stop() { 701 701 park(); // unparked when actor system is finished 702 702 … … 715 715 struct finished_msg_t { inline message; } finished_msg = __base_msg_finished; 716 716 717 allocation receive( actor & this, delete_msg_t & msg) { return Delete; }718 allocation receive( actor & this, destroy_msg_t & msg) { return Destroy; }719 allocation receive( actor & this, finished_msg_t & msg) { return Finished; }717 allocation receive( actor & this, delete_msg_t & ) { return Delete; } 718 allocation receive( actor & this, destroy_msg_t & ) { return Destroy; } 719 allocation receive( actor & this, finished_msg_t & ) { return Finished; } 720 720 721 721 // Default messages used all the time. 722 //static struct startmsg_t { inline message; } start_msg; // start actor723 //static struct stopmsg_t { inline message; } stop_msg; // terminate actor722 struct start_msg_t { inline message; } start_msg = __base_msg_finished; // start actor 723 struct stop_msg_t { inline message; } stop_msg = __base_msg_finished; // terminate actor -
libcfa/src/concurrency/barrier.hfa
r2ae845e9 r2295320 1 // 1 // -*- Mode: C -*- 2 // 2 3 // Cforall Version 1.0.0 Copyright (C) 2022 University of Waterloo 3 // 4 // 4 5 // The contents of this file are covered under the licence agreement in the 5 6 // file "LICENCE" distributed with Cforall. 6 7 // 7 // barrier.hfa -- simple barrier implemented from monitors8 // 9 // Author : Thierry Delisle10 // Created On : Thu Mar 31 16:51:35 202211 // Last Modified By : 12 // Last Modified On : 13 // Update Count : 14 // 8 // barrier.hfa -- simple barrier implemented using a monitor 9 // 10 // Author : Peter A. Buhr 11 // Created On : Sun Nov 10 08:07:35 2024 12 // Last Modified By : Peter A. Buhr 13 // Last Modified On : Sun Nov 10 08:11:55 2024 14 // Update Count : 3 15 // 15 16 16 17 #pragma once … … 18 19 #include <monitor.hfa> 19 20 20 // Simple barrier based on a monitor 21 // Plan 9 inheritance does not work with monitors. Two monitor locks are created. 22 21 23 monitor barrier { 22 // Number of threads blocking needed to unblock the barrier 23 // Unsigned should be enough, I don't expect use cases with 2^32 thread barriers. 24 unsigned width; 25 26 // Current count (counting backwards) 27 unsigned count; 28 29 // Barrier uses internal scheduling 30 condition c; 24 unsigned int group, arrivals; // group size, arrival counter 25 condition c; // wait for group to form 31 26 }; 32 27 33 28 // Constructor 34 void ?{}( barrier & this, unsigned width ) { 35 this.width = width; 36 this.count = width; // Count backwards so initialize at width 29 void ?{}( barrier & b, unsigned group ) { 30 b.group = b.arrivals = group; // count backwards 37 31 } 38 32 39 // block until the number of threads needed have blocked 40 // returns an value indicating the reverse order the threads arrived in 41 // i.e. last thread will return 0 (and not block) 42 // second last thread returns 1 43 // etc. 44 // last is an optional hook that will be called by the last thread 45 // before unblocking the others 46 static inline unsigned block(barrier & mutex this, fptr_t last = (fptr_t)0 ) { 47 this.count -= 1; // prefix decrement so we the last is 0 and not 1 48 unsigned arrival = this.count; // Note arrival order 49 if(arrival == 0) { 50 if(last) last(); 51 // If arrived last unblock everyone and reset 52 signal_all(this.c); 53 this.count = this.width; 54 } else { 55 // Otherwise block 56 wait(this.c); 57 } 58 return arrival; // return arrival order 33 // Returns a value indicating the reverse order the threads arrived i.e. last thread returns 0 (and does not block) 34 // last is an optional hook that is called by the last thread before unblocking the others. 35 static inline unsigned block( barrier & mutex b, fptr_t last = (fptr_t)0 ) with( b ) { 36 arrivals -= 1; // prefix decrement so last is 0 not 1 37 unsigned arrived = b.arrivals; // note arrival order 38 if ( arrivals != 0 ) { // wait for group to form 39 wait( b.c ); 40 } else { // group formed 41 if ( last ) last(); // safe to call 42 signal_all( c ); // unblock group 43 arrivals = group; // reset 44 } // if 45 return arrived; // return arrival order 59 46 } -
tests/concurrency/actors/dynamic.cfa
r2ae845e9 r2295320 48 48 49 49 executor e{ 0, 1, 1, false }; 50 start_actor_system( e ); 51 50 actor_start( e ); 52 51 sout | "started"; 53 52 … … 58 57 *d_actor | *d_msg; 59 58 60 stop_actor_system(); 61 59 actor_stop(); 62 60 sout | "stopped"; 63 61 } -
tests/concurrency/actors/executor.cfa
r2ae845e9 r2295320 84 84 85 85 sout | "starting"; 86 87 start_actor_system( e ); 88 86 actor_start( e ); 89 87 sout | "started"; 90 91 88 d_actor actors[ Actors ]; 92 93 89 for ( i; Actors ) { 94 90 actors[i] | shared_msg; 95 91 } // for 96 97 92 sout | "stopping"; 98 99 stop_actor_system(); 100 93 actor_stop(); 101 94 sout | "stopped"; 102 95 } -
tests/concurrency/actors/inherit.cfa
r2ae845e9 r2295320 29 29 sout | "Start"; 30 30 { 31 start_actor_system();31 actor_start(); 32 32 D_msg * dm = alloc(); 33 33 (*dm){}; … … 40 40 *s | *dm; 41 41 *s2 | *dm2; 42 stop_actor_system();42 actor_stop(); 43 43 } 44 44 { 45 start_actor_system();45 actor_start(); 46 46 Server s[2]; 47 47 D_msg * dm = alloc(); … … 51 51 s[0] | *dm; 52 52 s[1] | *dm2; 53 stop_actor_system();53 actor_stop(); 54 54 } 55 55 sout | "Finished"; -
tests/concurrency/actors/inline.cfa
r2ae845e9 r2295320 38 38 processor p; 39 39 { 40 start_actor_system(); // sets up executor40 actor_start(); // sets up executor 41 41 d_actor da; 42 42 d_msg * dm = alloc(); 43 43 (*dm){ 42, 2423 }; 44 44 da | *dm; 45 stop_actor_system(); // waits until actors finish45 actor_stop(); // waits until actors finish 46 46 } 47 47 { 48 start_actor_system(); // sets up executor48 actor_start(); // sets up executor 49 49 d_actor da; 50 50 d_msg2 dm{ 29079 }; … … 54 54 virtual_dtor * v = &dm; 55 55 da | dm; 56 stop_actor_system(); // waits until actors finish56 actor_stop(); // waits until actors finish 57 57 } 58 58 } -
tests/concurrency/actors/matrixMultiply.cfa
r2ae845e9 r2295320 88 88 89 89 sout | "starting"; 90 91 start_actor_system( e ); 92 90 actor_start( e ); 93 91 sout | "started"; 94 95 92 derived_msg messages[xr]; 96 97 93 derived_actor actors[xr]; 98 94 … … 100 96 messages[r]{ Z[r], X[r], Y }; 101 97 } // for 102 103 98 for ( r; xr ) { 104 99 actors[r] | messages[r]; … … 106 101 107 102 sout | "stopping"; 108 109 stop_actor_system(); 110 103 actor_stop(); 111 104 sout | "stopped"; 112 105 -
tests/concurrency/actors/pingpong.cfa
r2ae845e9 r2295320 47 47 processor p[Processors - 1]; 48 48 49 start_actor_system( Processors ); // test passing number of processors49 actor_start( Processors ); // test passing number of processors 50 50 ping pi_actor; 51 51 pong po_actor; … … 54 54 p_msg m; 55 55 pi_actor | m; 56 stop_actor_system();56 actor_stop(); 57 57 58 58 sout | "end"; -
tests/concurrency/actors/poison.cfa
r2ae845e9 r2295320 15 15 sout | "Finished"; 16 16 { 17 start_actor_system();17 actor_start(); 18 18 Server s[10]; 19 19 for ( i; 10 ) { 20 20 s[i] | finished_msg; 21 21 } 22 stop_actor_system();22 actor_stop(); 23 23 } 24 24 25 25 sout | "Delete"; 26 26 { 27 start_actor_system();27 actor_start(); 28 28 for ( i; 10 ) { 29 29 Server * s = alloc(); … … 31 31 (*s) | delete_msg; 32 32 } 33 stop_actor_system();33 actor_stop(); 34 34 } 35 35 36 36 sout | "Destroy"; 37 37 { 38 start_actor_system();38 actor_start(); 39 39 Server s[10]; 40 40 for ( i; 10 ) 41 41 s[i] | destroy_msg; 42 stop_actor_system();42 actor_stop(); 43 43 for ( i; 10 ) 44 44 if (s[i].val != 777) -
tests/concurrency/actors/static.cfa
r2ae845e9 r2295320 45 45 46 46 executor e{ 0, 1, 1, false }; 47 start_actor_system( e ); 48 47 actor_start( e ); 49 48 sout | "started"; 50 51 49 derived_msg msg; 52 53 50 derived_actor actor; 54 55 51 actor | msg; 56 57 stop_actor_system(); 58 52 actor_stop(); 59 53 sout | "stopped"; 60 54 } -
tests/concurrency/actors/types.cfa
r2ae845e9 r2295320 67 67 68 68 sout | "basic test"; 69 start_actor_system( Processors ); // test passing number of processors69 actor_start( Processors ); // test passing number of processors 70 70 derived_actor a; 71 71 d_msg b, c; … … 73 73 c.num = 2; 74 74 a | b | c; 75 stop_actor_system();75 actor_stop(); 76 76 77 77 sout | "same message and different actors test"; 78 start_actor_system(); // let system detect # of processors78 actor_start(); // let system detect # of processors 79 79 derived_actor2 d_ac2_0, d_ac2_1; 80 80 d_msg d_ac2_msg; … … 82 82 d_ac2_0 | d_ac2_msg; 83 83 d_ac2_1 | d_ac2_msg; 84 stop_actor_system();84 actor_stop(); 85 85 86 86 … … 88 88 sout | "same message and different actor types test"; 89 89 executor e{ 0, Processors, Processors == 1 ? 1 : Processors * 4, false }; 90 start_actor_system( e ); // pass an explicit executor90 actor_start( e ); // pass an explicit executor 91 91 derived_actor2 d_ac2_2; 92 92 derived_actor3 d_ac3_0; … … 95 95 d_ac3_0 | d_ac23_msg; 96 96 d_ac2_2 | d_ac23_msg; 97 stop_actor_system();97 actor_stop(); 98 98 } // RAII to clean up executor 99 99 … … 101 101 sout | "different message types, one actor test"; 102 102 executor e{ 1, Processors, Processors == 1 ? 1 : Processors * 4, true }; 103 start_actor_system( Processors );103 actor_start( Processors ); 104 104 derived_actor3 a3; 105 105 d_msg b1; … … 108 108 c2.num = 5; 109 109 a3 | b1 | c2; 110 stop_actor_system();110 actor_stop(); 111 111 } // RAII to clean up executor 112 112 … … 114 114 sout | "nested inheritance actor test"; 115 115 executor e{ 1, Processors, Processors == 1 ? 1 : Processors * 4, true }; 116 start_actor_system( Processors );116 actor_start( Processors ); 117 117 derived_actor4 a4; 118 118 d_msg b1; … … 121 121 c2.num = 5; 122 122 a4 | b1 | c2; 123 stop_actor_system();123 actor_stop(); 124 124 } // RAII to clean up executor 125 125 -
tests/concurrency/barrier/order.cfa
r2ae845e9 r2295320 5 5 // file "LICENCE" distributed with Cforall. 6 6 // 7 // order.cfa -- validates barriers the return value of 8 // barrier block 7 // order.cfa -- validates barrier return value from barrier block 9 8 // 10 9 // Author : Thierry Delisle 11 10 // Created On : Fri Apr 01 11:39:09 2022 12 // Last Modified By : 13 // Last Modified On : 14 // Update Count : 11 // Last Modified By : Peter A. Buhr 12 // Last Modified On : Sun Nov 10 11:22:56 2024 13 // Update Count : 20 15 14 // 16 17 // Test validates barrier and block return value by checking18 // that no more than one thread gets the same return value19 15 20 16 #include <concurrency/barrier.hfa> … … 23 19 #include <thread.hfa> 24 20 25 const unsigned NUM_LAPS = 173; 26 const unsigned NUM_THREADS = 11; 21 enum { NUM_LAPS = 173, NUM_THREADS = 11 }; 27 22 28 // The barrier we are testing29 23 barrier bar = { NUM_THREADS }; 30 24 31 // The return values of the previous generation. 32 volatile unsigned * generation; 25 volatile unsigned generation = 0; // count laps 26 void last() { 27 generation += 1; // last thread at barrier advances 28 } 29 volatile unsigned * generations; // global array pointer 33 30 34 31 thread Tester {}; 35 32 void main( Tester & this ) { 36 // Repeat a few times 37 for(l; NUM_LAPS) { 38 // Yield for chaos 39 yield( prng(this, 10) ); 33 for ( l; NUM_LAPS ) { 34 yield( prng( this, 10 ) ); // yield for chaos 35 unsigned int order = block( bar, last ); // block at barrier 40 36 41 // Block and what order we arrived 42 unsigned ret = block(bar); 43 44 // Check what was the last generation of that last thread in this position 45 unsigned g = generation[ret]; 46 47 // Is it what we expect? 48 if(g != l) { 49 // Complain that they are different 50 sout | "Gen" | l | ": Expeced generation at" | ret | "to be" | l | "was" | g; 51 } 52 53 // Mark the expected next generation 54 generation[ret] = l+1; 55 } 37 // For G == T, no thread should be able to advance generation until current generation finishes. 38 if ( generation - 1 != l || generations[order] != l ) { // generation advanced in block 39 mutex( sout ) sout | "mismatched generation, expected" | l | "got" | generation; 40 } // if 41 generations[order] = l + 1; // every thread advances their current order generation 42 } // for 56 43 } 57 44 58 45 int main() { 59 // Create the data ans zero it.60 46 volatile unsigned gen_data[NUM_THREADS]; 61 for( t; NUM_THREADS)62 gen_data[t] = 0;47 for( t; NUM_THREADS ) gen_data[t] = 0; 48 generations = gen_data; // global points at local 63 49 64 generation = gen_data; 65 66 // Run the experiment 67 processor p[4]; 68 { 50 processor p[4]; // parallelism 51 { // run experiment 69 52 Tester testers[NUM_THREADS]; 70 53 }
Note:
See TracChangeset
for help on using the changeset viewer.