本文来自 fair-jm.iteye.com 转截请注明出处 ?
??
play的官方文档(for scala) 第二章的笔记 有兴趣的可以看下 有错误欢迎指出 ?
?
?
?
1、Handling asynchronous results
来自 <https://www.playframework.com/documentation/2.3.x/ScalaAsync>
?
Play自下而上都是异步的。Play以异步非阻塞的方式处理每个请求。
?
在controllers理应该避免阻塞。常见的阻塞操作包括JDBC调用流API HTTP请求和长时间的计算
?
当然也可以通过增加默认的线程数来允许controllers里的阻塞代码承受更多的并发访问遵循以下的推荐可以保持controllers的异步以此来保持更高的扩展性和高负载下的可用性
?
?
创建非阻塞的actions
Play的工作方式需要action越快越好(无阻塞)。具体的做法是通过返回一个future的结果(Future[Result])作为回应。
Play将会在承诺被取回(一个Future对应一个Promise承诺取回也就是Future得到结果)时服务结果。
?
如何创建Future[Result]
所有的Play异步API都会给你Future
play.api.libs.WS
play.api.libs.Akka
?
例子:
import play.api.libs.concurrent.Execution.Implicits.defaultContext
val futureInt:Future[Int]= scala.concurrent.Future{
? intensiveComputation()
}
注意上面用的线程池是默认的线程池也就是如果要处理很耗时的任务会阻塞线程那最好用其他的线程池而不是play的默认线程池
也可以使用Actor
?
返回Futures
import play.api.libs.concurrent.Execution.Implicits.defaultContext
def index =Action.async {
? val futureInt = scala.concurrent.Future{ intensiveComputation()}
? futureInt.map(i =>Ok("Got result: "+ i))
}
?
来自 <https://www.playframework.com/documentation/2.3.x/ScalaAsync>
?
?
Action默认就是异步的
Action.apply和Action.async的处理机制是一样的只有一种Action类型(异步),两者的差别只是返回值不同.
?
?
超时机制
超时机制的实现是相当简单的:
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import scala.concurrent.duration._
def index =Action.async {
? val futureInt = scala.concurrent.Future{ intensiveComputation()}
? val timeoutFuture = play.api.libs.concurrent.Promise.timeout("Oops",1.second)
? Future.firstCompletedOf(Seq(futureInt, timeoutFuture)).map {
??? case i:Int=>Ok("Got result: "+ i)
??? case t:String=>InternalServerError(t)
? }
}
?
来自 <https://www.playframework.com/documentation/2.3.x/ScalaAsync>
?
?
可见scala本身的Future提供的功能还是很完善的基础设施完善所需的额外代码就少了 ?
?
------------------------------------------------------------------------------------------------------------------------------------
?
2 Streaming HTTP responses
?
自HTTP 1.1以来,保持单个连接开放来处理多个HTTP请求和回应的话,服务器必须在回应中包含一个合适的Content-Length HTTP头
?
def index =Action{
? Ok("Hello World")
}
?
默认下,返回一个简单的结果是不会指定Content-Length的,当然简单的回应的发送信息是已知的,play也会计算这个长度并放在头中。(要注意基于文本的内容计算长度可不是看上去那么简单,因为他要根据特定的字符集来计算)?
?
上面的代码是下面的简化写法,实际用到了play.api.libs.iteratee.Enumerator:
?
def index =Action{
? Result(
??? header =ResponseHeader(200),
??? body =Enumerator("Hello World")
? )
}
这意味着要正确地计算Content-Length Play必须消费整个enumerator并且把内容载入内存中计算?
?
?
发送大数据量的内容
将小的数据量的内容全部读入当然没问题
那大数据量的怎么办呢?
?
一个错误的例子:
def index =Action{
val file =new java.io.File("/tmp/fileToServe.pdf")
? val fileContent:Enumerator[Array[Byte]]=Enumerator.fromFile(file)???
???
? Result(
??? header =ResponseHeader(200),
??? body = fileContent
? )
}
为什么错? 因为没设置Content-Length 然后play就要把整个吃进去计算实在是浪费资源…
?
正确的姿势:
def index =Action{
val file =new java.io.File("/tmp/fileToServe.pdf")
? val fileContent:Enumerator[Array[Byte]]=Enumerator.fromFile(file)???
???
? Result(
??? header =ResponseHeader(200,Map(CONTENT_LENGTH -> file.length.toString)),
??? body = fileContent
? )
}
接着Play就会用惰加载的方式消费内容了
?
?
提供文件(Serving files)
play也提供了一些简便的方式来实现上面的需求:
def index =Action{
? Ok.sendFile(new java.io.File("/tmp/fileToServe.pdf"))
}
回应中增加的内容:
?Content-Disposition: attachment;filename=fileToServe.pdf
(关于Content-Disposition:http://www.w3.org/Protocols/rfc2616/rfc2616-sec19.html)
?
提供自定义的文件名:
def index =Action{
? Ok.sendFile(
??? content =new java.io.File("/tmp/fileToServe.pdf"),
??? fileName = _ =>"termsOfService.pdf"
? )
}
?
浏览器直接显示(inline) 而不是下载:
def index =Action{
? Ok.sendFile(
??? content =new java.io.File("/tmp/fileToServe.pdf"),
??? inline =true
? )
}
?
分块响应(Chunked responses)
上面的例子是对于可以在传输文件之前得到文件长度的
那么如果对于动态计算的内容事先不能知道内容长度怎么处理呢?
?
要使用Chunked transfer encoding
Chunked transfer encoding是HTTP 1.1的一种数据传输机制
web服务器提供以一系列的块的方式来提供文件
他使用Transfer-Encoding回应头而不是Content-Length
因为没有使用Content-Length 所以服务器不需要在传输前知道内容的长度可以用于传输动态生成的文件
?
数据的结束是通过最后传一个长度为0的chunk结束的
?
好处: 可以处理活数据(数据可用就立马传输)
坏处: 浏览器就显示不了下载进度条了?
?
play中要实现:
def index =Action{
val data = getDataStream
? val dataContent:Enumerator[Array[Byte]]=Enumerator.fromStream(data)
?
? ChunkedResult(
??? header =ResponseHeader(200),
??? chunks = dataContent
? )
}
和上面一样也有种简便的方式:
def index =Action{
val data = getDataStream
? val dataContent:Enumerator[Array[Byte]]=Enumerator.fromStream(data)
?
? Ok.chunked(dataContent)
}
当然里面的Enumerator是可以自己定义的:
def index =Action{
? Ok.chunked(
??? Enumerator("kiki","foo","bar").andThen(Enumerator.eof)
? )
}
?
HTTP的回应:
HTTP/1.1200 OK 4 |
?
------------------------------------------------------------------------------------------------------------------------------------
?
3 Comet sockets
Comet socket仅仅是内容为text/html只包含<script>元素的chunked回应
我们可以向浏览器发送事件
?
def comet =Action{
? val events =Enumerator(
??? """<script>console.log(‘kiki‘)</script>""",
??? """<script>console.log(‘foo‘)</script>""",
??? """<script>console.log(‘bar‘)</script>"""
? )
? Ok.chunked(events).as(HTML)
}
?
也可以用play.api.libs.iteratee.Enumeratee
这个是一个适配器从Enumerator[A]转换到Enumerator[B]
?
import play.twirl.api.Html
// Transform a String message into an Html script tag
val toCometMessage =Enumeratee.map[String]{ data =>
? Html("""<script>console.log(‘"""+ data +"""‘)</script>""")
}
def comet =Action{
? val events =Enumerator("kiki","foo","bar")
? Ok.chunked(events &> toCometMessage)
}
?
使用 play.api.libs.Comet
上面的代码可以简单地写成:
def comet =Action{
? val events =Enumerator("kiki","foo","bar")
? Ok.chunked(events &>Comet(callback ="console.log"))
}
实际上这个帮助类干了更多的事比如为了浏览器的兼容性发送一个初始化的空白缓冲数据
支持String和JSON可以扩展类型类来支持更多的消息类型
?
?
永远的iframe技术(The forever iframe technique)
标准的输出Comet socket的技术是载入一个无限的chunked comet回应在iframe里
并且调用父框架上的回调函数
?
def comet =Action{
? val events =Enumerator("kiki","foo","bar")
? Ok.chunked(events &>Comet(callback ="parent.cometMessage"))
}
?
html页:
<scripttype="text/javascript">
? var cometMessage =function(event){
??? console.log(‘Received event: ‘+ event)
? }
</script>
<iframesrc="/comet"></iframe>
?
------------------------------------------------------------------------------------------------------------------------------------
?
?
4 websocket
允许浏览器进行全双工通信
使用webSocket可以复用存在的play使用的TCP端口
?
处理 WebSocket
webSocket是个完全不同的野兽不能用标准的Action来驾驭
play提供了两种方式来驾驭WebSocket
第一种是使用actors
第二种是使用iteratees
两种方式都可以通过WebSocket这个构造者获得
?
通过Actor处理WebSocket
机制比较简单 Play会给用来发送数据的akka.actor.ActorRef
我们使用这个来创建一个akka.actor.Props
?
import play.api.mvc._
import play.api.Play.current
def socket =WebSocket.acceptWithActor[String,String]{ request => out =>
? MyWebSocketActor.props(out)
}
actor:
import akka.actor._
objectMyWebSocketActor{
? def props(out:ActorRef)=Props(newMyWebSocketActor(out))
}
classMyWebSocketActor(out:ActorRef)extendsActor{
? def receive ={
??? case msg:String=>
????? out !("I received your message: "+ msg)
? }
}
所有从客户端接收的数据都会发送给上面这个actor
任何发送给out那个ActorRef的数据都会发送给客户端
?
当WebSocket关闭时 Play会自动停止那个actor
想主动关闭一个websocket连接时也只需要关闭那个actor就行了
一颗毒药丸:
import akka.actor.PoisonPill
self !PoisonPill
(注:这个是触发 ActorKilledException默认的策略是Stop来实现的)
?
?
拒绝操作也比较简单改变就是方法从acceptWithActor变为了tryAcceptWithActor:
import scala.concurrent.Future
import play.api.mvc._
import play.api.Play.current
def socket =WebSocket.tryAcceptWithActor[String,String]{ request =>
? Future.successful(request.session.get("user")match{
??? caseNone=>Left(Forbidden)
??? caseSome(_)=>Right(MyWebSocketActor.props)
? })
}
?
上面的代码如果在session中没有user这个属性那么返回Forbidden否则建立连接
?
处理不同格式的消息
上面的例子全部都是基于String的 Play内置支持Array[Byte]和JsValue
比如:
import play.api.mvc._
import play.api.libs.json._
import play.api.Play.current
def socket =WebSocket.acceptWithActor[JsValue,JsValue]{ request => out =>
? MyWebSocketActor.props(out)
}
?
也可以自定义需要处理的格式
比如下面这个例子i我们接收JSON消息并且将其转化为InEvent 返回的消息转化为OutEvent
第一件事是完成 InEvent和OutEvent的JSON转换:(PS:能直接用format说明InEvent和OutEvent是case class)
import play.api.libs.json._
implicitval inEventFormat =Json.format[InEvent]
implicitval outEventFormat =Json.format[OutEvent]
?
第二件事是创建FrameFormatter:
import play.api.mvc.WebSocket.FrameFormatter
implicitval inEventFrameFormatter =FrameFormatter.jsonFrame[InEvent]
implicitval outEventFrameFormatter =FrameFormatter.jsonFrame[OutEvent]
?
最后可以用于WebSocket:
import play.api.mvc._
import play.api.Play.current
def socket =WebSocket.acceptWithActor[InEvent,OutEvent]{ request => out =>
? MyWebSocketActor.props(out)
}
?
使用Iteratees来处理WebSocket
Actors对于处理消息来说是个更好的抽象
Iteratee对于处理流来说是个更好的抽象
?
例子:
import play.api.mvc._
import play.api.libs.iteratee._
import play.api.libs.concurrent.Execution.Implicits.defaultContext
def socket =WebSocket.using[String]{ request =>
// Log events to the console
? val in =Iteratee.foreach[String](println).map { _ =>
??? println("Disconnected")
? }
// Send a single ‘Hello!‘ message
? val out =Enumerator("Hello!")
(in, out)
}
这种方式最后返回的是那两个channel
in是Iteratee[A,Unit]? 当接收到EOF时说明客户端那边关闭了socket
out是Enumerator[B]? 当发送EOF时说明服务端这边关闭了Socket
?
第二个例子是忽视输入直接输出后关闭:
import play.api.mvc._
import play.api.libs.iteratee._
def socket =WebSocket.using[String]{ request =>
// Just ignore the input
? val in =Iteratee.ignore[String]
// Send a single ‘Hello!‘ message and close
? val out =Enumerator("Hello!").andThen(Enumerator.eof)
(in, out)
}
?
Concurrent.broadcase可以用于广播:
import play.api.mvc._
import play.api.libs.iteratee._
import play.api.libs.concurrent.Execution.Implicits.defaultContext
def socket =? WebSocket.using[String]{ request =>
// Concurrent.broadcast returns (Enumerator, Concurrent.Channel)
? val(out, channel)=Concurrent.broadcast[String]
?
// log the message to stdout and send response back to client
? val in =Iteratee.foreach[String]{
??? msg => println(msg)
????? // the Enumerator returned by Concurrent.broadcast subscribes to the channel and will
????? // receive the pushed messages
????? channel push("I received your message: "+ msg)
? }
? (in,out)
原文:http://fair-jm.iteye.com/blog/2213321