WebSocket Akka HTTP на практиці

Досить тривалий час існувала лише одна гідна реалізація роботи з HTTP поверх Akka — spray. До цієї бібліотеки пару умільців написали розширення для WebSocket,
яке було цілком зрозуміло у використанні і проблем не виникало. Але роки йшли і spray, в тому чи іншому вигляді, перекочував в Akka HTTP із реалізованою підтримкою WebSocket з коробки.
Для роботи з WebSocket хлопці з Akka пропонують нам використовувати Akka Stream, тим самим спрощуючи нам життя з потоковими даними і, одночасно, ускладнюючи її. Akka Stream не так простий в розумінні. Далі я спробую показати базові практичні приклади використання.

Коротко про Akka Stream
Це своєрідний pipeline обробки даних, кожна ітерація якого що-небудь робить з даними, що потрапляють в нього. Flow ділиться на 3 складові: Source, GraphStage, Sink.
Найкраще це показано на діаграмі документації
image

Для реалізації WebSocket нам потрібно реалізовувати GraphStagе. Source нам надає akka, це якраз і є наш клієнт з летять від нього повідомленнями. А Sink — сама відправка наших повідомлень клієнту.

Actor style
Мабуть один з найбільш неефективних способів обробки, але самий простий для розуміння.
Ідея його полягає в тому, щоб всі вхідні повідомлення потрапляли в актор, і у нього був ActorRef, який відправляв дані безпосередньо клієнтові.

import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props, Terminated}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws._
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl._
import akka.http.scaladsl.server.Directives._

import scala.io.StdIn

object Boot extends App {
implicit val system = ActorSystem("example")
implicit val materializer = ActorMaterializer()

def flow: Flow[Message, Message, Any] = {
val client = system.actorOf(Props(classOf[ClientConnectionActor]))
val in = Sink.actorRef(client, 'sinkclose)
val out = Source.actorRef(8, OverflowStrategy.fail).mapMaterializedValue { a ⇒
client ! ('income → a)
a
}
Flow.fromSinkAndSource(in, out)
}

val route = path("ws")(handleWebSocketMessages(flow))
val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)

println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
StdIn.readLine()

import system.dispatcher
bindingFuture
.flatMap(_.unbind())
.onComplete(_ ⇒ system.terminate())
}

class ClientConnectionActor extends Actor {
var connection: Option[ActorRef] = None

val receive: Receive = {
case ('income, a: ActorRef) ⇒ connection = Some(a); context.watch(a)
case Terminated(a) if connection.contains(a) ⇒ connection = None; context.stop(self)
case 'sinkclose ⇒ context.stop(self)

case TextMessage.Strict(t) ⇒ connection.foreach(_ ! TextMessage.Strict(s"echo $t"))
case _ ⇒ // ingone
}

override def postStop(): Unit = connection.foreach(context.stop)
}

На кожне підключення клієнта ми створюємо актор ClientConnectionActor. А також Source, який буде представляти із себе ще один актор, направляючий отримані повідомлення у flow. Після його створення через метод mapMaterializedValue ми отримаємо на нього посилання. Крім цього ми створюємо Sink, який все повідомлення буде відправляти в ClientConnectionActor.
Таким чином ClientConnectionActor буде одержувати всі повідомлення з сокета. Відправляти ми їх будемо через прилетів йому ActorRef, який буде доставляти їх клієнту.
Мінуси: необхідно стежити за побічними акторами; бути акуратним OverflowStrategy; для обробки всіх повідомлень у нас всього один актор, він, відповідно, однопотоковий, з-за чого можуть виникнути проблеми з продуктивністю.

Похідний варіант з використанням ActorPublisher ActorSubscriber ми розглядати не будемо, так як, судячи з офіційної документації, він в змозі deprecated.

Flow style
Ідея даного підходу полягає в повному використанні Akka Stream для досягнення цілей. Загальний вигляд його зводиться до побудови pipeline обробки вхідних повідомлень клієнта.
Скелет
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.server.Directives._

import scala.io.StdIn

object Boot extends App {
implicit val system = ActorSystem("example")
implicit val materializer = ActorMaterializer()

def flow: Flow[Message, Message, Any] = {
Flow[Message].collect {
case TextMessage.Strict(t) ⇒ t
}.map { text ⇒
TextMessage.Strict(s"echo: $text")
}
}

val route = path("ws")(handleWebSocketMessages(flow))
val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)

println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
StdIn.readLine()

import system.dispatcher
bindingFuture
.flatMap(_.unbind())
.onComplete(_ ⇒ system.terminate())
}

В даному випадку ми обробляємо тільки текстові повідомлення і змінюємо їх. Далі TextMessage відправляється клієнтові.


Тепер трохи ускладнимо скелет і додамо парсинг і серіалізацію JSON.
Класи для серіалізації
trait WsIncome
trait WsOutgoing
@JsonCodec case class Say(name: String) extends WsIncome with WsOutgoing

implicit val WsIncomeDecoder: Decoder[WsIncome] = Decoder[Say].map[WsIncome](identity)
implicit val WsOutgoingEncoder: Encoder[WsOutgoing] = {
case s: Say ⇒ s.asJson
}


Модифікуємо flow
Flow[Message]
.collect {
case tm: TextMessage ⇒ tm.textStream
}
.mapAsync(CORE_COUNT * 2 - 1)(in ⇒ in.runFold("")(_ + _).flatMap(in ⇒ Future.fromTry(parse(in).toTry.flatMap(_.as[WsIncome].toTry))))
.collect {
case Say(name) ⇒ Say(s"hello: $name")
}
.mapAsync(CORE_COUNT * 2 - 1)(out ⇒ Future(TextMessage(out.asJson.noSpaces)))

Спершу ми відкидаємо всі двійкові повідомлення, далі парсим вхідний потік в JSON, обробляємо його і сериализуем в текст для відправки клієнтові.

Ускладнимо конструкцію, додавши контекст для кожного клієнта. В цьому нам допоможе statefulMapConcat.
ClientContext
class ClientContext {
@volatile var userName: Option[String] = None
}
object ClientContext {
def unapply(arg: ClientContext): Option[String] = arg.userName
}

@JsonCodec case class SetName(name: String) extends WsIncome
@JsonCodec case class Say(text: String) extends WsIncome with WsOutgoing

implicit val WsIncomeDecoder: Decoder[WsIncome] =
Decoder[Say].map[WsIncome](identity)
.or(Decoder[SetName].map[WsIncome](identity))


def flow: Flow[Message, Message, Any] = {
Flow[Message]
.collect {
case tm: TextMessage ⇒ tm.textStream
}
.mapAsync(CORE_COUNT * 2 - 1)(in ⇒ in.runFold("")(_ + _).flatMap(in ⇒ Future.fromTry(parse(in).toTry.flatMap(_.as[WsIncome].toTry))))
.statefulMapConcat(() ⇒ {
val context = new ClientContext
m ⇒ (context → m) :: Nil
})
.mapConcat {
case (c: ClientContext, SetName(name)) ⇒
c.userName = Some(name)
Nil
case a ⇒ a :: Nil
}
.collect {
case (ClientContext(userName), Say(text)) ⇒ Say(s"$userName: $text")
case (_, Say(text)) ⇒ Say(s"unknown: $text")
}
.mapAsync(CORE_COUNT * 2 - 1)(out ⇒ Future(TextMessage(out.asJson.noSpaces)))
}

Є й інший спосіб: можна реалізувати свій filter/map успадкувавши GraphStage[FlowShape[A, A]].
Приклад (не адаптовано під попередній код)
class AuthFilter(auth: ws.AuthMessage ⇒ Future[Option[UserProfile]])(implicit ec: контексті виконання) extends GraphStage[FlowShape[ws.WsIncomeMessage, ws.WsContextIncomeMessage]] {

val in = Inlet[ws.WsIncomeMessage]("AuthFilter.in")
val out = Outlet[ws.WsContextIncomeMessage]("AuthFilter.out")

val shape = FlowShape.of(in, out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) {
@volatile var profile: Option[UserProfile] = None
setHandler(in, new InHandler {
override def onPush(): Unit = profile match {
case Some(p) ⇒ push(out, ws.WsContextIncomeMessage(p, grab(in)))
case _ ⇒ grab(in) match {
case a: ws.AuthMessage ⇒ auth(a) onComplete {
case Success(p) ⇒
profile = p
pull(in)
case Failure(e) ⇒ fail(out, e)
}
case _ ⇒ pull(in)
}
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in)
})
}
}
}

В даному варіанті фільтрується всі повідомлення до тих пір, поки не отримано повідомлення на авторизацію. Якщо авторизація пройде успішно, повідомлення проходять далі спільно з профілем користувача.

І наостанок зробимо так, щоб усім підключеним користувачам кожну секунду відправлялося поточний час:
case object Tick extends WsOutgoing
implicit val WsOutgoingEncoder: Encoder[WsOutgoing] = {
case s: Say ⇒ s.asJson
case Tick ⇒ Json.obj("time" → DateTime.now.toIsoDateTimeString().asJson)
}

...

val broadcast = Source.tick[WsOutgoing](1.second, 1.second, Tick)

...

.collect {
case (ClientContext(userName), Say(text)) ⇒ Say(s"$userName: $text")
case (_, Say(text)) ⇒ Say(s"unknown: $text")
}
.merge(broadcast)
.mapAsync(CORE_COUNT * 2 - 1)(out ⇒ Future(TextMessage(out.asJson.noSpaces)))

Це базові приклади того, як можна реалізувати підтримку WebSocket у своєму проекті. Пакет Akka Stream великий і разннобразный, він допоможе вирішити досить великий пласт завдань, не переживаючи за масштабування і параллелизацию.

PS: Використовуючи нову для вас технологію в більш-менш навантаженому проекті, не забувайте проводити навантажувальне тестування, стежити за пам'яттю і гарячими ділянками коду (у цьому вам може допомогти gatling). Всім добра.
Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.