decadence

個人のメモ帳

PlayのNettyServer

PlayのNettyに対するアダプターとなるNettyServer、こいつが何をするのかと、ここでリクエストを受け取って処理するとこを見る。 play.api...で行うリクエスト受け取ってレスポンス返すとこは、Qiitaの以下らへんの記事見れば良いと思うので省略。ContentTypeを決定するあたりにアドホック多相とか使っててScalaっぽくて好き。

簡単にまとめると、Nettyから受け取ってplay.apiまで渡すまでに↓の過程を踏む

  • play.core.server.NettyServerが↓の持ってる
  • play.core.server.netty.PlayDefaultUpstreamHandler.messageReceivedで↓の呼び出す
  • play.core.server.Server.getHandlerForが↓の持ってる
    • play.core.server.Server.sendHandlerで↓の呼び出す
  • play.api.GlobalSettings.onRequestReceived: (RequestHeader, Handler)では↓の3つの処理が行われる
    • onRouteRequest(request)
    • router.handleFor(request)
    • doFilter(rh => handler)(routedRequest)

以下で触れるのは2.3.xブランチのこのcommit上の話、ただコードを読むだけ

play.core.server.Server

NettyServerが持つtraitの1つ

  • bodyParserTimeout: どこでも使われて無くてウケる
  • mode: Dev, Test, Prodとか
  • applicationProvider
  • stop(): Logger落とす
  • getHandlerFor: 大事
    • applicationProvider.handleWebCommand
      • ブラウザ上でevolutionの実行をクリックするとかそういう処理
      • 失敗時はその様子をGlobal.onError上に載せて出す
    • sendHandler: リクエストの処理
      • application.global.onRequestReceived(request)

play.core.server.NettyServer

object NettyServer
  • main持っててPlaySetting内ではmainClassがこいつを指定してる(runは下のmainDevOnly~が使われ、こちらは使われない)
  • mainから呼ばれるcreateServerでは以下の事が行われPlayアプリケーションが起動する
    • pidの取得(java.lang.management.ManagementFactory.getRuntimeMXBean.getName.split('@').headOption)
    • pidファイルの指定を基本はアプリケーションルートのRUNNNING_PIDとする
    • runtimeのshutdownHookにpidファイルの削除を追加
    • StaticApplicationを用いたNettyServerインスタンスの生成
    • runtimeのshutdownHookにserver.stop()を追加
  • mainDevOnlyHttpModemainDevOnlyHttpsModeも持ってて、run時にリフレクション使ってここが呼び出されてる(参考: 前の記事のPlayRunのとこ)
class NettyServer
  • 先に述べたServerServerWithStopのtraitがmixinされてる

PlayのApplicationの情報を提供するApplicationProviderやportなどを引数にNettyとのアダプターとなるクラス。インスタンス生成時に大まかな設定が全て用意される。

  • NettyのBootstrapを用意
    • netty-bossnetty-workerに用いられるスレッドプールを用意
    • Netty用のオプション(http.netty.option.で始まるもの)をNettyに渡す
    • Nettyで用いるpipelineを提供するPlayPipelineFactoryを設定する
  • Nettyはpipelineに様々な処理を積んでく実装を行うため、Playでは以下のpipelineを順に積む
    • SSL接続の場合はSslHandler
    • HttpRequestDecoder
    • HttpResponseEncoder
    • HttpContentDecompressor
    • HttpPipeliningHandler
    • defaultUpStreamHandler: ここでplay.api内の処理等を行う
      • 下のPlayDefaultUpstreamHandler.messageReceivedへ続く
  • stop()時の挙動
    • Play.stop()
    • Server.stop()
    • リクエストを送るチャンネルを閉じる
    • 外部リソースを開放

PlayDefaultUpstreamHandler

Nettyで使われるHandlerの1つになり、リクエストが来た際のNettyからPlayへの入り口及び出口となる

以下のメソッドをoverrideしてる

  • exceptionCaught: 例外発生時に呼ばれるもの
    • Logger("play.nettyException")で例外の内容に応じてログを書き出し、チャンネルを閉じたりする
  • channelConnected: チャンネルへの接続を行った時に呼ばれるもの
    • ssl接続の場合、SSLハンドシェイクを行う
  • channelDisconnected: チャンネルの接続がはずれた時に呼ばれるもの
    • ChannelHandlerContextの中身を空にする
  • channelOpen: チャンネルを開いた時に呼ばれるもの
    • 保持するチャンネルグループにイベントを伝播させるように設定する
  • messageReceiveed: リクエストなどが飛んできた時に呼ばれるもの(今回の本題)
    • 下の方で別に見る

後で困るので先に、EssentialActionはこういうやつ。

trait EssentialAction extends (RequestHeader => Iteratee[Array[Byte], Result]) with Handler
object EssentialAction {
  def apply(f: RequestHeader => Iteratee[Array[Byte], Result]): EssentialAction = new EssentialAction {
    def apply(rh: RequestHeader) = f(rh)
  }
}
.messageReceived

リクエストの処理等を行う箇所

HttpRequest以外のイベントが飛んできたらログに吐いて終わり。HttpRequestに対しては以下の処理が行われる

  • 来たリクエストを基に、新しいrh: RequestHeaderの生成(Netty用のRequestHeaderからPlay用のものへ)
  • rhを基に、play.core.server.Server.getHandlerForからhandlerの取得
  • handler: HandlerがEssentialActionWebsocketにより処理の振り分け
  • EssentialActionの場合
    • "rh: RequestHeaderを引数にhandlerに食わせIteratee[Array[Byte], Result]をunflattenして、InterateeからStepを取り、結果としてIteratee[Array[Byte], Result]を返す関数"を引数に、EssentialActionオブジェクトからEssentialActionクラスのインスタンスを取得(下に実際のコード置いた)。エラー吐く際はapp.handleErrorの結果がEssentialActionに渡される
    • handleActionにこいつ(action: EssencialAction)と、app: Applicationを渡す: 以下handleActionの話
      • 結局play.api....で定義した内容はここで処理される
    • actionに事前にPlay用に変換したrequestHeaderを与え、リクエストの中身であるArray[Byte]から結果Resultを生成するbodyParser: Iteratee[Array[Byte], Result]を用意
    • Chunkでデータが来る場合には、bodyParserを使い回し、RequestBodyHandlerにより順に処理してResultを得る
    • データがまとめて送られて来る場合には、リクエストから中身を読み込み、Enumerator[Array[Byte]]した上で先ほどのbodyParserに食わせResultを得る
    • Expect: 100-continueヘッダーの処理もここに存在
    • play.core.server.netty.NettyResultStream.sendResult: resultの種類により以下のように処理を振り分けつつ、各々の処理が終わるとチャンネル接続が閉じてる場合にはDownstreamへその情報を送る(このあたりは他のNettyの記事などを参考に)
      • ヘッダの値にnullが含まれている場合: InternalServerErrorとして結果を返し、接続を閉じる
      • Transfer-Encodingヘッダがあり、HTTP1.0の場合: HTTP1.1を使うように505を返す
      • コネクションが閉じられる場合: チャンネルのコネクションが閉じられるような状態にしつつ、レスポンスのデータを順にEOFまでdownstreamへ送る
      • Transfer-Encodingヘッダがある or Content-Lengthヘッダがある場合: チャンネルのコネクションが閉じられるような状態にしつつ、レスポンスのデータを順にEOFまでdownstreamへ送る(ここEndOfBodyInProtocolってなっててよくわからん)
      • それ以外: bufferingしながらnettyResponseに内容入れてDonwstreamへ送る(途中HTTP1.1の場合はTransfer-Encoding: Chunkedを設定しつつchunkを順に操作する)
    • 送った後はglobal.onRequestCompletionAtomicBooleanで縛って一度だけ実行
val a = EssentialAction { rh =>
  import play.api.libs.iteratee.Execution.Implicits.trampoline
  Iteratee.flatten(action(rh).unflatten.map(_.it).recover {
  case error =>
    Iteratee.flatten(
      app.handleError(requestHeader, error).map(result => Done(result, Input.Empty))
      ): Iteratee[Array[Byte], Result]
  })
}
handleAction(a, Some(app))