I'm writing this for people who have just started with Play Framework, to share the places where I got stuck while working through the samples.
Building on the previous post's Server-Sent Events implementation with Play Framework 2.8.x and Akka Typed, this post implements message delivery across multiple servers using the Akka Receptionist.
What I want to do
A client connected to localhost:9001 should receive the same messages that a client connected to localhost:9000 receives from the server.
What it looks like
localhost:9000 and localhost:9001 are separate Play instances.
Design points
The Receptionist is (as I understand it) a mechanism for obtaining references to other actors, and it supports both local and cluster actors.
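The actor (its ref) that pushes messages is registered with the Receptionist, and the registration is propagated automatically to the Receptionist on each server. At push time, the destinations are obtained from the Receptionist, which is how a message reaches multiple servers.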
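The register-then-look-up flow described above can be sketched roughly as follows. This is a minimal sketch, not the code from my project: the names `Pusher`, `Sender`, `Push`, `Broadcast`, and the key id "pusher" are hypothetical, and I assume the actor system is already running as a cluster.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.Behaviors

// Hypothetical actor that pushes messages to the clients of its own server.
object Pusher {
  final case class Push(text: String)

  // Every server registers its Pusher under this same key.
  val Key: ServiceKey[Push] = ServiceKey[Push]("pusher")

  def apply(): Behavior[Push] = Behaviors.setup { ctx =>
    // With a cluster, the registration is propagated to the other nodes.
    ctx.system.receptionist ! Receptionist.Register(Key, ctx.self)
    Behaviors.receiveMessage { case Push(text) =>
      ctx.log.info("push to local clients: {}", text)
      Behaviors.same
    }
  }
}

// Hypothetical sender that obtains its destinations from the Receptionist.
object Sender {
  sealed trait Command
  final case class Broadcast(text: String) extends Command
  private final case class PushersUpdated(refs: Set[ActorRef[Pusher.Push]]) extends Command

  def apply(): Behavior[Command] = Behaviors.setup { ctx =>
    // Translate Receptionist listings into this actor's own protocol.
    val adapter = ctx.messageAdapter[Receptionist.Listing] {
      case Pusher.Key.Listing(refs) => PushersUpdated(refs)
    }
    // Subscribe so that the set of known Pushers follows cluster changes.
    ctx.system.receptionist ! Receptionist.Subscribe(Pusher.Key, adapter)
    running(Set.empty)
  }

  private def running(pushers: Set[ActorRef[Pusher.Push]]): Behavior[Command] =
    Behaviors.receiveMessage {
      case PushersUpdated(refs) => running(refs)
      case Broadcast(text) =>
        // Send to every registered Pusher, local or on another server.
        pushers.foreach(_ ! Pusher.Push(text))
        Behaviors.same
    }
}
```

Since Akka 2.6 disables Java serialization by default, a message that crosses nodes (here `Push`) also needs a serializer binding in the configuration; that setup is not shown in this sketch.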
Notes from implementing it
- Registering the actor that pushes to the client (ActorSource.actorRef) directly with the Receptionist does not work, possibly because deadLetters occur (see the log below).
- So instead, each server registers with the Receptionist a Manager actor that manages the ActorSource.actorRef instances on that server, and at push time the message is relayed as Sender -> Manager.
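The Manager approach in the list above might look something like this. It is only a sketch under my own assumptions: `SseManager`, `AddClient`, `RemoveClient`, `Push`, the event types, the key id "sseManager", and the buffer size are all hypothetical names and values, not taken from the actual project. The point is that only the Manager's ref goes to the Receptionist, while the refs materialized by ActorSource.actorRef stay local to the server.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source
import akka.stream.typed.scaladsl.ActorSource
import scala.concurrent.ExecutionContext

object SseManager {
  // Events delivered to a single client's stream (hypothetical protocol).
  sealed trait Event
  final case class Message(text: String) extends Event
  case object Complete extends Event
  final case class Fail(cause: Throwable) extends Event

  sealed trait Command
  final case class AddClient(client: ActorRef[Event]) extends Command
  final case class RemoveClient(client: ActorRef[Event]) extends Command
  final case class Push(text: String) extends Command

  // Only Push is exposed to other servers through the Receptionist.
  val Key: ServiceKey[Push] = ServiceKey[Push]("sseManager")

  def apply(): Behavior[Command] = Behaviors.setup { ctx =>
    // Register the Manager itself, not the per-client stream refs.
    ctx.system.receptionist ! Receptionist.Register(Key, ctx.self.narrow[Push])
    running(Set.empty)
  }

  private def running(clients: Set[ActorRef[Event]]): Behavior[Command] =
    Behaviors.receiveMessage {
      case AddClient(c)    => running(clients + c)
      case RemoveClient(c) => running(clients - c)
      case Push(text) =>
        // Relay to every client stream connected to this server.
        clients.foreach(_ ! Message(text))
        Behaviors.same
    }

  // One source per connected client; its ref is tracked by the local Manager only.
  def clientSource(manager: ActorRef[Command])(
      implicit ec: ExecutionContext): Source[Event, ActorRef[Event]] =
    ActorSource
      .actorRef[Event](
        completionMatcher = { case Complete => },
        failureMatcher = { case Fail(cause) => cause },
        bufferSize = 16,
        overflowStrategy = OverflowStrategy.dropHead
      )
      .watchTermination() { (ref, done) =>
        manager ! AddClient(ref)
        // Forget the client once its stream has finished.
        done.onComplete(_ => manager ! RemoveClient(ref))
        ref
      }
}
```

A sender on any server would then look up `SseManager.Key` through the Receptionist and send `Push` to each Manager it finds, so the stream refs themselves never have to be resolved on another node.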
DEBUG akka.remote.artery.Decoder Decoder(akka://application) Decoded message but unable to record hits for compression as no remoteAddress known. No association yet?
DEBUG a.s.Serialization(akka://application) akka.serialization.Serialization(akka://application) Using serializer [akka.cluster.protobuf.ClusterMessageSerializer] for message [akka.cluster.InternalClusterAction$InitJoin]
DEBUG a.r.a.c.InboundManifestCompression InboundManifestCompression(akka://application) Initializing InboundManifestCompression for originUid [3659362368514754290]
DEBUG a.r.a.c.InboundActorRefCompression InboundActorRefCompression(akka://application) Initializing InboundActorRefCompression for originUid [3659362368514754290]
DEBUG a.s.Serialization(akka://application) akka.serialization.Serialization(akka://application) Using serializer [akka.remote.serialization.ArteryMessageSerializer] for message [akka.remote.artery.OutboundHandshake$HandshakeRsp]
INFO akka.cluster.Cluster Cluster(akka://application) Cluster Node [akka://application@127.0.0.1:2552] - Received InitJoin message from [Actor[akka://application@127.0.0.1:2551/system/cluster/core/daemon/firstSeedNodeProcess-1#-1187705133]], but this node is not initialized yet
DEBUG a.s.Serialization(akka://application) akka.serialization.Serialization(akka://application) Using serializer [akka.cluster.protobuf.ClusterMessageSerializer] for message [akka.cluster.InternalClusterAction$InitJoinNack]
INFO akka.cluster.Cluster Cluster(akka://application) Cluster Node [akka://application@127.0.0.1:2552] - Received InitJoin message from [Actor[akka://application@127.0.0.1:2551/system/cluster/core/daemon/firstSeedNodeProcess-1#-1187705133]], but this node is not initialized yet
INFO akka.cluster.Cluster Cluster(akka://application) Cluster Node [akka://application@127.0.0.1:2552] - Received InitJoin message from [Actor[akka://application@127.0.0.1:2551/system/cluster/core/daemon/firstSeedNodeProcess-1#-1187705133]], but this node is not initialized yet
INFO akka.cluster.Cluster Cluster(akka://application) Cluster Node [akka://application@127.0.0.1:2552] - Received InitJoin message from [Actor[akka://application@127.0.0.1:2551/system/cluster/core/daemon/firstSeedNodeProcess-1#-1187705133]], but this node is not initialized yet
INFO akka.cluster.Cluster Cluster(akka://application) Cluster Node [akka://application@127.0.0.1:2552] - Received InitJoin message from [Actor[akka://application@127.0.0.1:2551/system/cluster/core/daemon/firstSeedNodeProcess-1#-1187705133]], but this node is not initialized yet
INFO akka.cluster.Cluster Cluster(akka://application) Cluster Node [akka://application@127.0.0.1:2552] - Received InitJoinAck message from [Actor[akka://application@127.0.0.1:2551/system/cluster/core/daemon#-104449099]] to [akka://application@127.0.0.1:2552]
DEBUG a.s.Serialization(akka://application) akka.serialization.Serialization(akka://application) Using serializer [akka.cluster.protobuf.ClusterMessageSerializer] for message [akka.cluster.InternalClusterAction$Join]
INFO akka.cluster.Cluster Cluster(akka://application) Cluster Node [akka://application@127.0.0.1:2552] - Welcome from [akka://application@127.0.0.1:2551]
DEBUG a.s.Serialization(akka://application) akka.serialization.Serialization(akka://application) Using serializer [akka.cluster.protobuf.ClusterMessageSerializer] for message [akka.cluster.GossipEnvelope]
DEBUG akka.cluster.sbr.SplitBrainResolver akka://application/system/cluster/core/daemon/downingProvider SBR add Up [Member(akka://application@127.0.0.1:2551, Up)]
DEBUG akka.cluster.sbr.SplitBrainResolver akka://application/system/cluster/core/daemon/downingProvider SBR reset stable deadline when members/unreachable changed
DEBUG akka.cluster.sbr.SplitBrainResolver akka://application/system/cluster/core/daemon/downingProvider SBR add Joining/WeaklyUp [Member(akka://application@127.0.0.1:2552, Joining)]
DEBUG a.c.t.i.r.ClusterReceptionist akka://application/system/clusterReceptionist ClusterReceptionist [akka://application@127.0.0.1:2552] - Node added [UniqueAddress(akka://application@127.0.0.1:2551,3659362368514754290)]
DEBUG akka.cluster.ddata.Replicator akka://application@127.0.0.1:2552/system/clusterReceptionist/replicator Received gossip status from [akka://application@127.0.0.1:2551], chunk [1] of [1] containing [ReceptionistKey_3].
DEBUG akka.cluster.ddata.Replicator akka://application@127.0.0.1:2552/system/clusterReceptionist/replicator Sending gossip to [akka://application@127.0.0.1:2551], containing [ReceptionistKey_3]
DEBUG akka.remote.artery.Association Association(akka://application) Using priority message stream for akka://application@127.0.0.1:2551/system/cluster/core/daemon/heartbeatSender
DEBUG a.s.Serialization(akka://application) akka.serialization.Serialization(akka://application) Using serializer [akka.cluster.protobuf.ClusterMessageSerializer] for message [akka.cluster.ClusterHeartbeatSender$HeartbeatRsp]
DEBUG akka.cluster.ddata.Replicator akka://application@127.0.0.1:2552/system/clusterReceptionist/replicator Created [1] Gossip messages from [1] data entries.
DEBUG a.s.Serialization(akka://application) akka.serialization.Serialization(akka://application) Using serializer [akka.cluster.ddata.protobuf.ReplicatorMessageSerializer] for message [akka.cluster.ddata.Replicator$Internal$Gossip]
DEBUG a.a.L.Deserialization akka.actor.LocalActorRefProvider.Deserialization Resolve (deserialization) of path [system/Materializers/StreamSupervisor-0/$$d-actorRefSource#1662435177] doesn't match an active actor. It has probably been stopped, using deadLetters.
That's all.