Akka-Streams是一个让人激动的Reactive Streams的框架,Akka-Http也是构建在其之上,除了内置背压模式的支持,使用其DSL构建一个Graph也是一个让人惊艳的过程。对于Akka-Streams的介绍会在后续的文章中逐一展开,本文只奉上一段代码,实现的是HTTP代理功能,同时可以对请求和响应做一些修饰。
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.coding.{Deflate, Gzip, NoCoding}
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.HttpEncodings
import akka.stream._
import akka.stream.scaladsl.{Flow, GraphDSL, Sink, Source}
import akka.util.ByteString
import io.circe._
import io.circe.parser._
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
object HttpProxy extends App {
implicit val actorSystem = ActorSystem(name = "system")
implicit val streamMaterializer = ActorMaterializer()
implicit val executionContext = actorSystem.dispatcher
// The handle flow contains 2 sub-flows: forward flow & decorate flow.
// The forward flow will forward http request to destination server.
// The decorate flow will decorate http response then return to clients.
val handleFlow: Flow[HttpRequest, HttpResponse, _] = Flow.fromGraph(GraphDSL.create() {
implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val forwardFlow = Http(actorSystem).outgoingConnection(host = "destination-server", port = 8080)
val forwardRequestFlow: Flow[HttpRequest, (HttpRequest, Future[HttpResponse]), _] = {
Flow[HttpRequest]
.map {
orgRequest => {
val request = orgRequest.copy(
// The uri string value looks like: "http://domain/path",
// this is invalid for some http servers, the acceptance format is: "/path"
// So, we need remove domain, now, The value 21 is the length of
// string: "http://destination-server:8080", it‘s hard code for short-term,
// later, it will be configurable.
uri = orgRequest.uri.toString().substring(30),
headers =
for {
header <- orgRequest.headers
if header.isNot("timeout-access")
} yield {
if (header.is("host")) {
akka.http.scaladsl.model.headers.Host(Uri.Host("destination-server"), 8080)
}
else {
header
}
}
)
val response = Source.single(request).via(forwardFlow).runWith(Sink.head)
(request, response)
}
}
}
val decorateResponseFlow: Flow[(HttpRequest, Future[HttpResponse]), HttpResponse, _] = {
Flow[(HttpRequest, Future[HttpResponse])]
.mapAsync(1) {
(tuple) => {
val request = tuple._1
val responseFuture = tuple._2
val (path, body) = extractPathAndBody(request)
path match {
case "/path/to/decorate" if body.contains("some special key words") =>
responseFuture.flatMap {
response =>
val coder = response.encoding match{
case HttpEncodings.gzip => Gzip
case HttpEncodings.deflate => Deflate
case HttpEncodings.identity => NoCoding
case _ => throw new RuntimeException("akka not supported encoding "+response.encoding.toString())
}
response.entity
.dataBytes.runFold(ByteString(""))(_ ++ _)
.map {
byteString =>
val gzipDecodeFuture = coder.decode(byteString)
val gzipDecodeByteString = Await.result(gzipDecodeFuture, Duration.Inf)
val decodedByteString = gzipDecodeByteString.decodeString("utf-8")
val json: Json = parse(decodedByteString).getOrElse(Json.Null)
// do some transformation jobs based on origin json.
val transformedEntity = ???
val entity = HttpEntity(ContentTypes.`application/json`,
coder.encode(ByteString.fromString(transformedEntity)))
response.copy(entity = entity)
}
}
case _ => responseFuture
}
}
}
}
val forwardRequest: FlowShape[HttpRequest, (HttpRequest, Future[HttpResponse])] = builder.add(forwardRequestFlow)
val decorateResponse: FlowShape[(HttpRequest, Future[HttpResponse]), HttpResponse] = builder.add(decorateResponseFlow)
forwardRequest ~> decorateResponse
FlowShape(forwardRequest.in, decorateResponse.out)
})
def extractPathAndBody(request:HttpRequest):(String,String) ={
val path = request.uri.path.toString()
val body = request.entity.dataBytes.map(_.decodeString("utf-8")).runWith(Sink.head)
val bodyStr = Await.result(body, Duration.Inf)
(path,bodyStr)
}
val serverSource = Http(actorSystem).bind(interface = "0.0.0.0", port = 7000)
serverSource.to(Sink.foreach { connection =>
connection.handleWith(handleFlow)
}).run()
}
原文:http://blog.csdn.net/bluishglc/article/details/55657757