Akka实现WordCount(Scala):
架构图:

项目结构:
pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.citi.sky</groupId>
<artifactId>AkkaPJ</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>AkkaPJ</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.6</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>2.11.6</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.11.6</version>
</dependency>
<!-- Akka modules must all be on the same version; akka-actor and akka-testkit
     were previously declared as 2.3.3 and 2.3.6 respectively. -->
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-actor_2.11</artifactId>
  <version>2.3.6</version>
</dependency>
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-testkit_2.11</artifactId>
  <version>2.3.6</version>
  <scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.11</artifactId>
<version>3.0.4</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
消息:
/** Map-phase output: one [[WordCount]] entry per word found in a line. */
case class MapData(dataList: List[WordCount])

/** Reduce-phase output: word -> total occurrences for one mapped line. */
case class ReduceData(reduceDataList: Map[String, Int])

/** Request message asking for the totals aggregated so far. */
case class Result()

/** A word (`key`) paired with an occurrence count. */
case class WordCount(key: String, count: Int)
Actors:
MasterActor:
import akka.actor.Actor
import akka.actor.Props
import com.citi.dw.messages.Result
/**
 * Entry point of the word-count pipeline.
 *
 * Creates the three worker actors as children and wires them into a chain
 * (map -> reduce -> aggregate). Text lines are handed to the map stage;
 * a [[Result]] request is forwarded to the aggregate stage so that the
 * reply goes straight back to the original sender.
 */
class MasterActor extends Actor {

  // Children are created downstream-first because each stage needs a
  // reference to the stage that follows it.
  private val aggregateActor =
    context.actorOf(Props(classOf[AggregateActor]), "aggregateActor")
  private val reduceActor =
    context.actorOf(Props(classOf[ReduceActor], aggregateActor), "reduceActor")
  private val mapActor =
    context.actorOf(Props(classOf[MapActor], reduceActor), "mapActor")

  def receive: Actor.Receive = {
    // A line of text to be counted.
    case line: String => mapActor.tell(line, self)
    // A request for the totals; forward keeps the original sender.
    case request: Result => aggregateActor forward request
    // Anything else is reported on stdout.
    case _ => println("MasterActor receive wrong message.")
  }
}
MapActor:
import akka.actor.Actor
import com.citi.dw.messages.MapData
import com.citi.dw.messages.WordCount
import scala.collection.mutable.ListBuffer
import akka.actor.ActorRef
/**
 * Map stage: turns a line of text into a [[MapData]] holding one
 * `WordCount(word, 1)` per word and sends it to `reduceActor`.
 *
 * @param reduceActor the actor that receives the mapped data
 */
class MapActor(val reduceActor: ActorRef) extends Actor {

  def receive: Actor.Receive = {
    case msg: String => reduceActor ! evaluateExpression(msg)
    case _ => println("MapActor receive wrong message.")
  }

  /**
   * Tokenises `line` on whitespace.
   *
   * Splits on runs of whitespace and drops empty tokens, so leading or
   * repeated spaces (and an empty line) no longer produce an empty-string
   * "word" that would then be counted.
   *
   * @param line the raw input line
   * @return one `WordCount(word, 1)` per non-empty token, in input order
   */
  private[this] def evaluateExpression(line: String): MapData = {
    val words = line.split("\\s+").filter(_.nonEmpty)
    MapData(words.map(word => WordCount(word, 1)).toList)
  }
}
ReduceActor:
import akka.actor.Actor
import com.citi.dw.messages.MapData
import com.citi.dw.messages.ReduceData
import com.citi.dw.messages.WordCount
import scala.collection.mutable.HashMap
import akka.actor.ActorRef
/**
 * Reduce stage: collapses the `WordCount` entries of one [[MapData]] into
 * per-word totals and sends them to `aggregateActor` as [[ReduceData]].
 *
 * @param aggregateActor the actor that receives the reduced totals
 */
class ReduceActor(val aggregateActor: ActorRef) extends Actor {

  def receive: Actor.Receive = {
    case msg: MapData => aggregateActor ! reduce(msg.dataList)
    case _ => println("ReduceActor receive wrong message.")
  }

  /**
   * Sums the counts of all entries that share the same key.
   *
   * The first occurrence of a key contributes its own `count` (previously
   * it was hard-coded to 1, which miscounted any entry whose count was
   * not 1).
   *
   * @param dataList the entries to combine
   * @return a [[ReduceData]] mapping each distinct key to its summed count
   */
  private[this] def reduce(dataList: List[WordCount]): ReduceData = {
    val totals = dataList.foldLeft(Map.empty[String, Int]) {
      case (acc, WordCount(key, count)) =>
        acc.updated(key, acc.getOrElse(key, 0) + count)
    }
    ReduceData(totals)
  }
}
AggregateActor:
import akka.actor.Actor
import com.citi.dw.messages.ReduceData
import scala.collection.mutable.HashMap
import com.citi.dw.messages.Result
import akka.actor.ActorRef
/**
 * Final stage: accumulates the per-line totals received as [[ReduceData]]
 * and, on a [[Result]] request, replies to the sender with an immutable
 * snapshot of the totals collected so far.
 */
class AggregateActor extends Actor {

  // Running word -> count totals. The map itself is mutable, so the
  // reference does not need to be reassignable.
  private[this] val finalReduceMap = HashMap[String, Int]()

  def receive: Actor.Receive = {
    case msg: ReduceData => aggregateAndReduce(msg.reduceDataList)
    // Reply with a copy so the sender never sees the internal mutable map.
    case msg: Result => sender ! finalReduceMap.toMap
    case _ => println("AggregateActor receive wrong message.")
  }

  /**
   * Adds every count in `reduceList` to the running totals.
   *
   * @param reduceList word -> count pairs to merge in
   */
  private[this] def aggregateAndReduce(reduceList: Map[String, Int]): Unit = {
    for ((word, count) <- reduceList) {
      finalReduceMap(word) = finalReduceMap.getOrElse(word, 0) + count
    }
  }
}
主程序:
import akka.actor.ActorSystem
import akka.actor.Props
import com.citi.dw.actors.MasterActor
import com.citi.dw.messages.Result
import akka.pattern.ask
import scala.concurrent.duration._
import akka.util.Timeout
import scala.util._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Await
/**
 * Demo driver: starts the actor system, feeds a few lines of text to the
 * master actor, asks for the aggregated totals and prints one
 * `(word,count)` tuple per line.
 */
object AkkaWordCount extends App {
  // Used both as the ask timeout and as the maximum time to block on the reply.
  implicit val timeout: Timeout = Timeout(5.seconds)

  val system = ActorSystem("WordCountAkka")
  val master = system.actorOf(Props(classOf[MasterActor]), "master")

  master ! "Hi! Hi!"
  master ! ("My name is Sky. I am so so so happy to be here ")
  master ! ("Today, I am going to introduce word count for Akka ")
  master ! ("I hope hope It is helpful to you ")
  master ! ("Thank you ")

  // The Result request is forwarded directly to the aggregate actor while
  // the lines travel through the map and reduce actors first, so give the
  // pipeline time to drain before asking for the totals.
  Thread.sleep(1000)

  val future = master ? Result()

  // Always shut the actor system down, even when the ask times out;
  // otherwise the exception would leave the system running.
  val result: Map[String, Int] =
    try Await.result(future.mapTo[Map[String, Int]], timeout.duration)
    finally system.shutdown()

  // Each entry is printed as a tuple, e.g. "(so,3)".
  result.foreach(entry => println(entry))
}
运行结果:
(for,1)
(name,1)
(count,1)
(is,2)
(am,2)
(My,1)
(going,1)
(so,3)
(introduce,1)
(Sky.,1)
(I,3)
(to,3)
(Hi!,2)
(you,2)
(here,1)
(happy,1)
(Thank,1)
(hope,2)
(Today,,1)
(helpful,1)
(Akka,1)
(It,1)
(be,1)
(word,1)