source: doc/theses/colby_parsons_MMAth/benchmarks/actors/akka/Executor/AkkaExecutor.scala

Last change on this file was 5adf4f4, checked in by caparson <caparson@…>, 16 months ago

added caf/uC++/proto benchmarks

  • Property mode set to 100644
File size: 5.6 KB
Line 
1import akka.actor.typed.scaladsl.Behaviors
2import akka.actor.typed.scaladsl.LoggerOps
3import akka.actor.typed.{ ActorRef, ActorSystem, Behavior }
4import java.util.concurrent.atomic.AtomicInteger
5import akka.event.Logging
6import akka.actor.typed.DispatcherSelector
7import java.util.concurrent.Semaphore
8
9object ExecutorActor {
10        sealed trait MessageTypes
11        final case class Dummy( id: Int ) extends MessageTypes
12        final case class StartMsg( id: Int ) extends MessageTypes
13
14        val rounds = ExecutorMain.Set * ExecutorMain.Rounds
15
16        def apply(): Behavior[MessageTypes] = Behaviors.receive { (context, message) =>
17                message match {
18                        case StartMsg( id ) =>
19                                ExecutorMain.groups(id) = id / ExecutorMain.Set * ExecutorMain.Set
20                                if (id == ExecutorMain.Actors - 1) {
21                                        ExecutorMain.system ! ExecutorMain.Continue()
22                                } // if
23                        case Dummy( id ) =>
24                                if ( ExecutorMain.recs(id) >= rounds ) {
25                                        if ( ExecutorMain.actorCnt.incrementAndGet() == ExecutorMain.Actors ) {
26                                                if ( ExecutorMain.trials.get() > 0 ) { // ignore trial 1
27                                                        println( s"${ExecutorMain.Processors}" + "\t" + f"${(System.nanoTime() - ExecutorMain.startTime) * ExecutorMain.Factor / 1_000_000_000.0}%1.2f" )
28                                                } // if
29                                                if ( ExecutorMain.trials.incrementAndGet() == ExecutorMain.Numtimes + 1 ) {
30                                                        ExecutorMain.system ! ExecutorMain.Stop()
31                                                } else {
32                                                        ExecutorMain.actorCnt.set( 0 )
33                                                        ExecutorMain.startTime = System.nanoTime()
34                                                        for ( r <- 0 until ExecutorMain.Actors ) { // start actors again
35                                                                ExecutorMain.actors(r) ! Dummy( r )
36                                                        } // for
37                                                } // if
38                                        } // if
39                                        ExecutorMain.recs(id) = 0
40                                        ExecutorMain.sends(id) = 0                      // reset for next trial
41                                } else {
42                                        if ( ExecutorMain.recs(id) % ExecutorMain.Batch == 0 ) {
43                                                for ( r <- 0 until ExecutorMain.Batch ) {  // start actors again
44                                                        val sendId = ExecutorMain.groups(id) + ExecutorMain.sends(id) % ExecutorMain.Set
45                                                        ExecutorMain.actors( sendId ) ! Dummy( sendId ) // cycle through group
46                                                        ExecutorMain.sends(id) += 1
47                                                } // for
48                                        } // if
49                                        ExecutorMain.recs(id) += 1
50                                } // if
51                } // message
52                Behaviors.same
53        }
54}
55
56object ExecutorMain {
57        sealed trait MainMessages
58        final case class Start() extends MainMessages
59        final case class Stop() extends MainMessages
60        final case class Continue() extends MainMessages
61
62        var Actors = 40_000; var Set = 100; var Rounds = 400; var Processors = 1; var Batch = 1; var Numtimes = 1; var Factor = 2 // default values
63
64        val actors = new Array[ActorRef[ExecutorActor.MessageTypes]](Actors)
65        val recs = new Array[Int](Actors)
66        val sends = new Array[Int](Actors)
67        val groups = new Array[Int](Actors)
68
69        val actorCnt = new AtomicInteger(0)
70        val trials = new AtomicInteger(0)
71    val sem = new Semaphore(0)
72
73        var system : ActorSystem[ExecutorMain.MainMessages] = null
74        var startTime = System.nanoTime()
75
76        def apply(): Behavior[MainMessages] = Behaviors.setup { context =>
77                for ( id <- 0 until Actors ) {                                          // create actors
78                        recs(id) = 0
79                        sends(id) = 0
80                        actors(id) = context.spawn(ExecutorActor(), "actor_" + id, DispatcherSelector.fromConfig("akka.dispatcher"))
81                } // for
82
83        Behaviors.receiveMessage { message =>
84                        message match {
85                                case Start() =>
86                                        for ( id <- 0 until Actors ) {                                          // start actors
87                                                actors(id) ! ExecutorActor.StartMsg( id )
88                                        } // for
89                                        Behaviors.same
90                                case Continue() =>
91                                        for ( id <- 0 until Actors ) {                                          // start actors
92                                                actors(id) ! ExecutorActor.Dummy( id )
93                                        } // for
94                                        Behaviors.same
95                                case Stop() =>
96                    sem.release()
97                                        Behaviors.stopped
98                        } // match
99        }
100    }
101
102        def usage() = {
103                println( "Usage: " +
104                        s"[ actors (> 0 && > set && actors % set == 0 ) | 'd' (default ${Actors}) ] " +
105                        s"[ set (> 0) | 'd' (default ${Set}) ] " +
106                        s"[ rounds (> 0) | 'd' (default ${Rounds}) ] " +
107                        s"[ processors (> 0) | 'd' (default ${Processors}) ] " +
108            s"[ batch (> 0) | 'd' (default ${Batch}) ] " +
109                        s"[ num times (> 0) | 'd' (default ${Numtimes}) ]"
110                )
111                System.exit( 1 )
112        }
113
114        def main(args: Array[String]): Unit = {
115                if ( args.length > 7 ) usage()                                          // process command-line arguments
116        if ( args.length == 7 ) {
117                        if ( args(6) != "d" ) {                                                 // default ?
118                                Factor = args(6).toInt
119                                if ( Factor < 1 ) usage()
120                        } // if
121                } // if
122        if ( args.length >= 6 ) {
123                        if ( args(5) != "d" ) {                                                 // default ?
124                                Numtimes = args(5).toInt
125                                if ( Numtimes < 1 ) usage()
126                        } // if
127                } // if
128                if ( args.length >= 5 ) {
129                        if ( args(4) != "d" ) {                                                 // default ?
130                                Batch = args(4).toInt
131                                if ( Batch < 1 ) usage()
132                        } // if
133                } // if
134                if ( args.length >= 4 ) {
135                        if ( args(3) != "d" ) {                                                 // default ?
136                                Processors = args(3).toInt
137                                if ( Processors < 1 ) usage()
138                        } // if
139                } // if
140                if ( args.length >= 3 ) {                                                       // fall through
141                        if ( args(2) != "d" ) {                                                 // default ?
142                                Rounds = args(2).toInt
143                                if ( Rounds < 1 ) usage()
144                        } // if
145                } // if
146                if ( args.length >= 2 ) {                                                       // fall through
147                        if ( args(1) != "d" ) {                                                 // default ?
148                                Set = args(1).toInt
149                                if ( Set < 1 ) usage()
150                        } // if
151                } // if
152                if ( args.length >= 1 ) {                                                       // fall through
153                        if ( args(0) != "d" ) {                                                 // default ?
154                                Actors = args(0).toInt
155                                if ( Actors < 1 || Actors <= Set || Actors % Set != 0 ) usage()
156                        } // if
157                } // if
158
159        Rounds = Rounds / Factor;
160
161                //println( "Actors " + Actors + " Set " + Set + " Rounds " + Rounds + " Processors " + Processors + " Batch " + Batch )
162
163                system = ActorSystem( ExecutorMain(), "Executor" )
164                system ! Start()
165        sem.acquire()
166        }
167}
168
169// Local Variables: //
170// tab-width: 4 //
171// mode: java //
172// compile-command: "sbt --warn -J-Xmx32g \"run\"" //
173// End: //
Note: See TracBrowser for help on using the repository browser.