#include using namespace std; #include using namespace chrono; #include "caf/actor_ostream.hpp" #include "caf/caf_main.hpp" #include "caf/event_based_actor.hpp" #include "caf/all.hpp" using namespace caf; struct IntMsg; struct CharMsg; CAF_BEGIN_TYPE_ID_BLOCK(custom_types_1, first_custom_type_id) CAF_ADD_TYPE_ID(custom_types_1, (IntMsg)) CAF_ADD_TYPE_ID(custom_types_1, (CharMsg)) CAF_END_TYPE_ID_BLOCK(custom_types_1) // --(rst-foo-begin)-- struct IntMsg { int val; }; struct CharMsg { char val; }; template bool inspect(Inspector& f, IntMsg& x) { return f.object(x).fields(f.field("val", x.val)); } template bool inspect(Inspector& f, CharMsg& x) { return f.object(x).fields(f.field("val", x.val)); } size_t Messages = 100000, Processors = 4, Times = 100, Factor = 20; time_point starttime; actor * client; actor * servers; unsigned int actorCnt = 0; class Client : public event_based_actor { IntMsg * intmsg; CharMsg * charmsg; size_t results = 0, times = 0; void reset() { times += 1; if ( times == Times ) { for ( unsigned int i = 0; i < Messages; i += 1 ) { this->send( servers[i], 0 ); } // for if ( __atomic_add_fetch( &actorCnt, 1, __ATOMIC_SEQ_CST ) == Messages + 1 ) { aout(this) << (steady_clock::now() - starttime).count() * Factor / 1'000'000'000.0 << endl; } // if this->quit(); return; } results = 0; this->send( this, 0 ); } behavior make_behavior() override { return { [=]( IntMsg & msg ) -> void { results++; if ( results == 2 * Messages ) reset(); }, [=]( CharMsg & msg ) -> void { results++; if ( results == 2 * Messages ) reset(); }, [=]( int & ) -> void { for ( size_t i = 0; i < Messages; i += 1 ) { // send out work this->send( servers[i], intmsg[i] ); this->send( servers[i], charmsg[i] ); } } }; } public: Client( caf::actor_config & cfg ) : event_based_actor( cfg ) { intmsg = new IntMsg[Messages]; charmsg = new CharMsg[Messages]; } ~Client() { delete [] charmsg; delete [] intmsg; } }; // Client class Server : public event_based_actor { behavior make_behavior() override { return { [=]( IntMsg & msg ) -> void { msg.val = 7; send( *client, msg ); }, [=]( CharMsg & msg ) -> void { msg.val = 'x'; this->send( *client, msg ); }, [=]( int & ) -> void { if ( __atomic_add_fetch( &actorCnt, 1, __ATOMIC_SEQ_CST ) == Messages + 1 ) { aout(this) << (steady_clock::now() - starttime).count() / 1'000'000'000.0 << endl; } // if this->quit(); return; } }; } public: Server( caf::actor_config & cfg ) : event_based_actor( cfg ) {} }; // Server void caf_main( actor_system & sys ) { starttime = steady_clock::now(); *client = sys.spawn(); for ( unsigned int i = 0; i < Messages; i += 1 ) { // create actors servers[i] = sys.spawn(); } // for caf::scoped_actor self{sys}; self->send( *client, 0 ); } // caf_main int main( int argc, char * argv[] ) { switch ( argc ) { case 5: if ( strcmp( argv[4], "d" ) != 0 ) { // default ? Factor = stoi( argv[4] ); if ( Factor < 1 ) goto Usage; } // if case 4: if ( strcmp( argv[3], "d" ) != 0 ) { // default ? Times = stoi( argv[3] ); if ( Times < 1 ) goto Usage; } // if case 3: if ( strcmp( argv[2], "d" ) != 0 ) { // default ? Processors = stoi( argv[2] ); if ( Processors < 1 ) goto Usage; } // if case 2: if ( strcmp( argv[1], "d" ) != 0 ) { // default ? Messages = stoi( argv[1] ); if ( Messages < 1 ) goto Usage; } // if case 1: // use defaults break; default: Usage: cerr << "Usage: " << argv[0] << ") ] [ messages (> 0) | 'd' (default " << Messages << ") ] [ processors (> 0) | 'd' (default " << Processors << ") ] [ Times (> 0) | 'd' (default " << Times << ") ]" << endl; exit( EXIT_FAILURE ); } // switch Times = Times / Factor; //cout << Actors << " " << Set << " " << Rounds << " " << Processors << endl; servers = new actor[Messages]; client = new actor(); caf::core::init_global_meta_objects(); caf::exec_main_init_meta_objects(); // caf::exec_main_init_meta_objects<>(); caf::actor_system_config cfg; cfg.set( "caf.scheduler.max-threads", Processors ); caf::actor_system system { cfg }; caf::exec_main<>(caf_main, argc, argv); } // main // /usr/bin/time -f "%Uu %Ss %Er %Mkb" a.out // Local Variables: // // compile-command: "g++-10 -Wall -O3 -std=c++17 -ICAF/actor-framework/libcaf_core -ICAF/actor-framework/libcaf_core/caf -ICAF/actor-framework/build/libcaf_core -LCAF/actor-framework/build/libcaf_core -LCAF/actor-framework/build/libcaf_io CAFMatrix.cpp -lcaf_io -lcaf_core -Wl,-rpath=CAF/actor-framework/build/libcaf_core" // // End: //