Hakataminami Web Service blog

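[Play Framework 2.8.x Server-Sent Events (SSE) sample] Sending messages across multiple servers with Akka Receptionist (Akka Cluster)

I write these posts for people who have just started with Play Framework, to share the points where I got stuck while working through the samples.

This post extends the Server-Sent Events sample from the previous post, which used Play Framework 2.8.x and Akka Typed, so that messages are sent across multiple servers by means of the Akka Receptionist.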

Goal

A client connected to localhost:9001 should receive the same messages that a client connected to localhost:9000 receives from the server.

How it looks

localhost:9000 and localhost:9001 are separate Play instances.

[Figure: SSE across multiple servers]
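For two Play instances on one machine to form an Akka Cluster, each needs its own remoting port. The sketch below collects the settings I would expect, using values that appear in the log later in this post (actor system name application, nodes 127.0.0.1:2551 and 127.0.0.1:2552, Split Brain Resolver enabled). The object ClusterNodeConfig and its method forPort are hypothetical names for illustration. In a Play application these keys would normally live in conf/application.conf, with the port overridden per instance, so treat this as a summary of the keys rather than the configuration used in the sample.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object ClusterNodeConfig {
  // Hypothetical helper: builds the cluster settings for one local node.
  // Host, ports and system name are taken from the log in this post.
  def forPort(port: Int): Config =
    ConfigFactory.parseString(
      s"""
         |akka.actor.provider = cluster
         |akka.remote.artery.canonical.hostname = "127.0.0.1"
         |akka.remote.artery.canonical.port = $port
         |akka.cluster.seed-nodes = [
         |  "akka://application@127.0.0.1:2551",
         |  "akka://application@127.0.0.1:2552"
         |]
         |akka.cluster.downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
         |""".stripMargin)
}
```

Under this assumption, the first instance would use forPort(2551) and the second forPort(2552); only the remoting port differs between the two nodes.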

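Design points

As I understand it, the Receptionist is a mechanism for obtaining references to other actors, and it supports both local and cluster actors.

The actors that push messages (more precisely, their refs) are registered with the Receptionist, and each registration is propagated automatically to the Receptionist on every server. At push time the destinations are looked up from the Receptionist, which is how a message reaches multiple servers.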
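The register-then-find flow described above can be sketched with the Akka Typed Receptionist API as follows. This is a minimal sketch, not the code from the sample: Pusher, Sender, Push, Send, Found, LookupFailed and the key name "pusher" are all hypothetical, and the 3-second timeout is an arbitrary choice. Messages that cross node boundaries also need a serializer binding, which is left out here.

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.Behaviors
import akka.util.Timeout
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Hypothetical message; needs a serializer binding to travel between nodes.
final case class Push(text: String)

object Pusher {
  // Key under which every pushing actor is registered, on any server.
  val Key: ServiceKey[Push] = ServiceKey[Push]("pusher")

  def apply(): Behavior[Push] = Behaviors.setup { context =>
    // Register with the local Receptionist; in a cluster the registration
    // is replicated to the Receptionists on the other nodes.
    context.system.receptionist ! Receptionist.Register(Key, context.self)
    Behaviors.receiveMessage { push =>
      context.log.info("Pushing to clients: {}", push.text)
      Behaviors.same
    }
  }
}

object Sender {
  sealed trait Command
  final case class Send(text: String) extends Command
  private final case class Found(listing: Receptionist.Listing, text: String) extends Command
  private final case class LookupFailed(text: String) extends Command

  def apply(): Behavior[Command] = Behaviors.setup { context =>
    implicit val timeout: Timeout = 3.seconds // arbitrary value for the sketch

    Behaviors.receiveMessage {
      case Send(text) =>
        // Ask the Receptionist for the current destinations at push time.
        context.ask[Receptionist.Command, Receptionist.Listing](
          context.system.receptionist,
          replyTo => Receptionist.Find(Pusher.Key, replyTo)
        ) {
          case Success(listing) => Found(listing, text)
          case Failure(_)       => LookupFailed(text)
        }
        Behaviors.same
      case Found(listing, text) =>
        // The listing contains refs registered on every node of the cluster.
        listing.serviceInstances(Pusher.Key).foreach(_ ! Push(text))
        Behaviors.same
      case LookupFailed(text) =>
        context.log.warn("Receptionist lookup failed, dropping: {}", text)
        Behaviors.same
    }
  }
}
```

Using ask for each Send keeps the text paired with its own lookup result. Subscribing to the key once with Receptionist.Subscribe and caching the refs would be an alternative, but Find matches the "look up at push time" design described here.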

[Figure: Server1 and Server2 each run a Receptionist. The refs of Actor1 to Actor4 are registered and propagated between the two Receptionists. The Sender looks up the refs with Find and sends the message to Actor1 to Actor4, which push it to Client1 to Client4.]

Notes from the implementation

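  • Registering the actors that push to clients (ActorSource.actorRef) directly with the Receptionist did not work, apparently because the messages end up in deadLetters (see the log below).
  • Instead, each server has a Manager actor that manages its own ActorSource.actorRef instances, and that Manager is what gets registered with the Receptionist. At push time the message is relayed Sender -> Manager.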
DEBUG akka.remote.artery.Decoder Decoder(akka://application) Decoded message but unable to record hits for compression as no remoteAddress known. No association yet?
DEBUG a.s.Serialization(akka://application) akka.serialization.Serialization(akka://application) Using serializer [akka.cluster.protobuf.ClusterMessageSerializer] for message [akka.cluster.InternalClusterAction$InitJoin]
DEBUG a.r.a.c.InboundManifestCompression InboundManifestCompression(akka://application) Initializing InboundManifestCompression for originUid [3659362368514754290]
DEBUG a.r.a.c.InboundActorRefCompression InboundActorRefCompression(akka://application) Initializing InboundActorRefCompression for originUid [3659362368514754290]
DEBUG a.s.Serialization(akka://application) akka.serialization.Serialization(akka://application) Using serializer [akka.remote.serialization.ArteryMessageSerializer] for message [akka.remote.artery.OutboundHandshake$HandshakeRsp]
INFO  akka.cluster.Cluster Cluster(akka://application) Cluster Node [akka://application@127.0.0.1:2552] - Received InitJoin message from [Actor[akka://application@127.0.0.1:2551/system/cluster/core/daemon/firstSeedNodeProcess-1#-1187705133]], but this node is not initialized yet
DEBUG a.s.Serialization(akka://application) akka.serialization.Serialization(akka://application) Using serializer [akka.cluster.protobuf.ClusterMessageSerializer] for message [akka.cluster.InternalClusterAction$InitJoinNack]
INFO  akka.cluster.Cluster Cluster(akka://application) Cluster Node [akka://application@127.0.0.1:2552] - Received InitJoin message from [Actor[akka://application@127.0.0.1:2551/system/cluster/core/daemon/firstSeedNodeProcess-1#-1187705133]], but this node is not initialized yet
INFO  akka.cluster.Cluster Cluster(akka://application) Cluster Node [akka://application@127.0.0.1:2552] - Received InitJoin message from [Actor[akka://application@127.0.0.1:2551/system/cluster/core/daemon/firstSeedNodeProcess-1#-1187705133]], but this node is not initialized yet
INFO  akka.cluster.Cluster Cluster(akka://application) Cluster Node [akka://application@127.0.0.1:2552] - Received InitJoin message from [Actor[akka://application@127.0.0.1:2551/system/cluster/core/daemon/firstSeedNodeProcess-1#-1187705133]], but this node is not initialized yet
INFO  akka.cluster.Cluster Cluster(akka://application) Cluster Node [akka://application@127.0.0.1:2552] - Received InitJoin message from [Actor[akka://application@127.0.0.1:2551/system/cluster/core/daemon/firstSeedNodeProcess-1#-1187705133]], but this node is not initialized yet
INFO  akka.cluster.Cluster Cluster(akka://application) Cluster Node [akka://application@127.0.0.1:2552] - Received InitJoinAck message from [Actor[akka://application@127.0.0.1:2551/system/cluster/core/daemon#-104449099]] to [akka://application@127.0.0.1:2552]
DEBUG a.s.Serialization(akka://application) akka.serialization.Serialization(akka://application) Using serializer [akka.cluster.protobuf.ClusterMessageSerializer] for message [akka.cluster.InternalClusterAction$Join]
INFO  akka.cluster.Cluster Cluster(akka://application) Cluster Node [akka://application@127.0.0.1:2552] - Welcome from [akka://application@127.0.0.1:2551]
DEBUG a.s.Serialization(akka://application) akka.serialization.Serialization(akka://application) Using serializer [akka.cluster.protobuf.ClusterMessageSerializer] for message [akka.cluster.GossipEnvelope]
DEBUG akka.cluster.sbr.SplitBrainResolver akka://application/system/cluster/core/daemon/downingProvider SBR add Up [Member(akka://application@127.0.0.1:2551, Up)]
DEBUG akka.cluster.sbr.SplitBrainResolver akka://application/system/cluster/core/daemon/downingProvider SBR reset stable deadline when members/unreachable changed
DEBUG akka.cluster.sbr.SplitBrainResolver akka://application/system/cluster/core/daemon/downingProvider SBR add Joining/WeaklyUp [Member(akka://application@127.0.0.1:2552, Joining)]
DEBUG a.c.t.i.r.ClusterReceptionist akka://application/system/clusterReceptionist ClusterReceptionist [akka://application@127.0.0.1:2552] - Node added [UniqueAddress(akka://application@127.0.0.1:2551,3659362368514754290)]
DEBUG akka.cluster.ddata.Replicator akka://application@127.0.0.1:2552/system/clusterReceptionist/replicator Received gossip status from [akka://application@127.0.0.1:2551], chunk [1] of [1] containing [ReceptionistKey_3].
DEBUG akka.cluster.ddata.Replicator akka://application@127.0.0.1:2552/system/clusterReceptionist/replicator Sending gossip to [akka://application@127.0.0.1:2551], containing [ReceptionistKey_3]
DEBUG akka.remote.artery.Association Association(akka://application) Using priority message stream for akka://application@127.0.0.1:2551/system/cluster/core/daemon/heartbeatSender
DEBUG a.s.Serialization(akka://application) akka.serialization.Serialization(akka://application) Using serializer [akka.cluster.protobuf.ClusterMessageSerializer] for message [akka.cluster.ClusterHeartbeatSender$HeartbeatRsp]
DEBUG akka.cluster.ddata.Replicator akka://application@127.0.0.1:2552/system/clusterReceptionist/replicator Created [1] Gossip messages from [1] data entries.
DEBUG a.s.Serialization(akka://application) akka.serialization.Serialization(akka://application) Using serializer [akka.cluster.ddata.protobuf.ReplicatorMessageSerializer] for message [akka.cluster.ddata.Replicator$Internal$Gossip]
DEBUG a.a.L.Deserialization akka.actor.LocalActorRefProvider.Deserialization Resolve (deserialization) of path [system/Materializers/StreamSupervisor-0/$$d-actorRefSource#1662435177] doesn't match an active actor. It has probably been stopped, using deadLetters.
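The last log line shows a remote node failing to resolve the stream's actorRefSource ref, which is the reason for the Manager relay described in the notes above. A minimal sketch of such a Manager follows. The names Manager, AddClient, RemoveClient, Publish and the key "sse-manager" are hypothetical, and the element type String for the SSE source is an assumption; the actual implementation is in the GitHub repository linked below.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.Behaviors

object Manager {
  sealed trait Command
  // Sent locally when an SSE stream starts or ends. How the end of a
  // stream is detected is left out of this sketch.
  final case class AddClient(client: ActorRef[String]) extends Command
  final case class RemoveClient(client: ActorRef[String]) extends Command
  // Sent by the Sender, possibly from another node, so it needs a
  // serializer binding in a real cluster (omitted here).
  final case class Publish(text: String) extends Command

  // Only Publish is exposed through the Receptionist.
  val Key: ServiceKey[Publish] = ServiceKey[Publish]("sse-manager")

  def apply(): Behavior[Command] = Behaviors.setup { context =>
    // Register the Manager itself, not the per-client stream refs.
    context.system.receptionist ! Receptionist.Register(Key, context.self.narrow[Publish])
    running(Set.empty)
  }

  // The client refs stay inside this node and are never sent over the wire.
  private def running(clients: Set[ActorRef[String]]): Behavior[Command] =
    Behaviors.receiveMessage {
      case AddClient(client) =>
        running(clients + client)
      case RemoveClient(client) =>
        running(clients - client)
      case Publish(text) =>
        // Relay to every ActorSource.actorRef materialized on this server.
        clients.foreach(_ ! text)
        Behaviors.same
    }
}
```

With this shape, only the Manager ref is replicated between nodes, and the refs produced by ActorSource.actorRef are used solely by the Manager on the server where the stream was materialized.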

The code is on GitHub here.

That's all.