source: doc/theses/colby_parsons_MMath/benchmarks/actors/akka/Matrix/AkkaMatrix.scala

Last change on this file was f945fa7, checked in by Peter A. Buhr <pabuhr@…>, 13 hours ago

fix spelling mistake in directory name

  • Property mode set to 100644
File size: 3.9 KB
Line 
1import akka.actor.typed.scaladsl.Behaviors
2import akka.actor.typed.scaladsl.LoggerOps
3import akka.actor.typed.{ ActorRef, ActorSystem, Behavior }
4import java.util.concurrent.atomic.AtomicInteger
5import akka.event.Logging
6import akka.actor.typed.DispatcherSelector
7import java.util.concurrent.Semaphore
8
9object MatrixMult {
10 final case class WorkMsg( var Z: Array[Int], val X: Array[Int], val Y: Array[Array[Int]] )
11
12 def apply(): Behavior[WorkMsg] = Behaviors.receive { (context, message) =>
13 // message match {
14 // case WorkMsg( Z, X, Y ) =>
15 for ( i <- 0 until MatrixMain.yc ) {
16 message.Z(i) = 0
17 for ( j <- 0 until MatrixMain.xc ) {
18 message.Z(i) = message.Z(i) + message.X(j) * message.Y(j)(i)
19 }
20 }
21 if ( MatrixMain.actorCnt.incrementAndGet() == MatrixMain.xr ) {
22 MatrixMain.system ! MatrixMain.Stop()
23 }
24 // } // message
25 Behaviors.same
26 }
27}
28
29object MatrixMain {
30 sealed trait MainMessages
31 final case class Start() extends MainMessages
32 final case class Stop() extends MainMessages
33
34 var xr = 3_072; var xc = 3_072; var yc = 3_072; var Processors = 1; var MaxProcs = 48; // default values
35
36 val actors = new Array[ActorRef[MatrixMult.WorkMsg]](xr)
37 val messages = new Array[MatrixMult.WorkMsg](xr);
38
39 val actorCnt = new AtomicInteger(0)
40 val sem = new Semaphore(0)
41
42 var system : ActorSystem[MatrixMain.MainMessages] = null
43 var startTime = System.nanoTime()
44
45 var X = Array.ofDim[Int](0, 0) // set type, set size below
46 var Y = Array.ofDim[Int](0, 0)
47 var Z = Array.ofDim[Int](0, 0)
48
49 def apply(): Behavior[MainMessages] = Behaviors.setup { context =>
50 for ( id <- 0 until xr ) { // create actors
51 actors(id) = context.spawn(MatrixMult(), "actor_" + id, DispatcherSelector.fromConfig("akka.dispatcher"))
52 messages(id) = new MatrixMult.WorkMsg(Z(id), X(id), Y)
53 } // for
54
55 Behaviors.receiveMessage { message =>
56 message match {
57 case Start() =>
58 for ( id <- 0 until xr ) { // start actors
59 actors(id) ! messages(id)
60 } // for
61 Behaviors.same
62 case Stop() =>
63 sem.release()
64 Behaviors.stopped
65 } // match
66 }
67 }
68
69 def usage() = {
70 println( "Usage: " +
71 s"[ yc (> 0) | 'd' (default ${xr}) ] " +
72 s"[ xc (> 0) | 'd' (default ${xc}) ] " +
73 s"[ xr (> 0) | 'd' (default ${yc}) ] " +
74 s"[ processors (> 0) | 'd' (default ${Processors}) ]"
75 )
76 System.exit( 1 )
77 }
78
79 def main(args: Array[String]): Unit = {
80 if ( args.length > 4 ) usage() // process command-line arguments
81 if ( args.length == 5 ) {
82 if ( args(4) != "d" ) { // default ?
83 MaxProcs = args(4).toInt
84 if ( MaxProcs < 1 ) usage()
85 } // if
86 } // if
87 if ( args.length >= 4 ) {
88 if ( args(3) != "d" ) { // default ?
89 Processors = args(3).toInt
90 if ( Processors < 1 ) usage()
91 } // if
92 } // if
93 if ( args.length >= 3 ) { // fall through
94 if ( args(2) != "d" ) { // default ?
95 xr = args(2).toInt
96 if ( xr < 1 ) usage()
97 } // if
98 } // if
99 if ( args.length >= 2 ) { // fall through
100 if ( args(1) != "d" ) { // default ?
101 xc = args(1).toInt
102 if ( xc < 1 ) usage()
103 } // if
104 } // if
105 if ( args.length >= 1 ) { // fall through
106 if ( args(0) != "d" ) { // default ?
107 yc = args(0).toInt
108 if ( yc < 1 ) usage()
109 } // if
110 } // if
111
112 xr = xr / (MaxProcs / Processors)
113
114 X = Array.ofDim[Int](xr, xc)
115 Y = Array.ofDim[Int](xc, yc)
116 Z = Array.ofDim[Int](xr, yc)
117
118 for ( r <- 0 until xr ) {
119 for ( c <- 0 until xc ) {
120 X(r)(c) = r * c % 37
121 }
122 }
123
124 for ( r <- 0 until xc ) {
125 for ( c <- 0 until yc ) {
126 Y(r)(c) = r * c % 37
127 }
128 }
129
130 startTime = System.nanoTime()
131 system = ActorSystem( MatrixMain(), "Matrix" )
132 system ! Start()
133
134 sem.acquire()
135
136 println( s"${Processors}\t" + f"${(System.nanoTime() - startTime) * (MaxProcs / Processors) / 1_000_000_000.0}%1.2f" )
137 }
138}
139
140// Local Variables: //
141// tab-width: 4 //
142// mode: java //
143// compile-command: "sbt --warn -J-Xmx32g \"run\"" //
144// End: //
Note: See TracBrowser for help on using the repository browser.