import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.LoggerOps import akka.actor.typed.{ ActorRef, ActorSystem, Behavior } import java.util.concurrent.atomic.AtomicInteger import akka.event.Logging import akka.actor.typed.DispatcherSelector import java.util.concurrent.Semaphore object Server { def apply(): Behavior[Client.MainMessages] = Behaviors.receive { (context, message) => message match { case Client.CharMsg( value ) => Client.system ! Client.CharMsg( 'X' ) case Client.IntMsg( value ) => Client.system ! Client.IntMsg( 7 ) case Client.State() => } // message Behaviors.same } } object Client { sealed trait MainMessages final case class State() extends MainMessages final case class IntMsg( val value: Int ) extends MainMessages final case class CharMsg( val value: Char ) extends MainMessages var Messages = 100_000; var Times = 100; var Processors = 4; var Factor = 1 // default values var times = 0 var results = 0 val servers = new Array[ActorRef[MainMessages]](Messages) val actorCnt = new AtomicInteger(0) val sem = new Semaphore(0) var system : ActorSystem[MainMessages] = null var startTime = System.nanoTime() def process() = { } def apply(): Behavior[MainMessages] = Behaviors.setup { context => for ( id <- 0 until Messages ) { // create actors servers(id) = context.spawn(Server(), "actor_" + id, DispatcherSelector.fromConfig("akka.dispatcher")) } // for Behaviors.receiveMessage { message => message match { case State() => for ( id <- 0 until Messages ) { // start actors servers(id) ! IntMsg( 0 ) servers(id) ! CharMsg( 0 ) } // for Behaviors.same case IntMsg( value ) => results += 1 if ( results == 2 * Messages ) { times += 1 if ( times == Times ) { sem.release() Behaviors.stopped } else { results = 0 system ! State() Behaviors.same } } else Behaviors.same case CharMsg( value ) => results += 1 if ( results == 2 * Messages ) { times += 1 if ( times == Times ) { sem.release() Behaviors.stopped } else { results = 0 system ! State() Behaviors.same } } else Behaviors.same } // match } } def usage() = { println( "Usage: " + s"[ messages (> 0) | 'd' (default ${Messages}) ] " + s"[ processors (> 0) | 'd' (default ${Processors}) ] " + s"[ num times (> 0) | 'd' (default ${Times}) ]" ) System.exit( 1 ) } def main(args: Array[String]): Unit = { if ( args.length > 5 ) usage() // process command-line arguments if ( args.length == 4 ) { // fall through if ( args(3) != "d" ) { // default ? Factor = args(3).toInt if ( Factor < 1 ) usage() } // if } // if if ( args.length >= 3 ) { // fall through if ( args(2) != "d" ) { // default ? Times = args(2).toInt if ( Times < 1 ) usage() } // if } // if if ( args.length >= 2 ) { // fall through if ( args(1) != "d" ) { // default ? Processors = args(1).toInt if ( Processors < 1 ) usage() } // if } // if if ( args.length >= 1 ) { // fall through if ( args(0) != "d" ) { // default ? Messages = args(0).toInt if ( Messages < 1 ) usage() } // if } // if Times = Times / Factor system = ActorSystem( Client(), "Executor" ) system ! State() sem.acquire() println( s"${Processors}\t" + f"${(System.nanoTime() - startTime) * Factor / 1_000_000_000.0}%1.2f" ) } } // Local Variables: // // tab-width: 4 // // mode: java // // compile-command: "sbt --warn -J-Xmx32g \"run\"" // // End: //