@@ -25,7 +25,7 @@ void TKafkaFindCoordinatorActor::Bootstrap(const NActors::TActorContext& ctx) {
2525
2626 bool withProxy = Context->Config .HasProxy () && !Context->Config .GetProxy ().GetHostname ().Empty ();
2727 if (withProxy) {
28- SendResponseOkAndDie (Context->Config .GetProxy ().GetHostname (), Context->Config .GetProxy ().GetPort (), - 1 , ctx);
28+ SendResponseOkAndDie (Context->Config .GetProxy ().GetHostname (), Context->Config .GetProxy ().GetPort (), NKafka::ProxyNodeId , ctx);
2929 return ;
3030 }
3131
@@ -54,6 +54,8 @@ void TKafkaFindCoordinatorActor::SendResponseOkAndDie(const TString& host, i32 p
5454 response->Port = port;
5555 response->NodeId = nodeId;
5656
57+ KAFKA_LOG_D (" FIND_COORDINATOR response. Host#: " << host << " , Port#: " << port << " , NodeId# " << nodeId);
58+
5759 Send (Context->ConnectionId , new TEvKafka::TEvResponse (CorrelationId, response, static_cast <EKafkaErrors>(response->ErrorCode )));
5860 Die (ctx);
5961}
@@ -71,15 +73,21 @@ void TKafkaFindCoordinatorActor::SendResponseFailAndDie(EKafkaErrors error, cons
7173
7274 response->Coordinators .push_back (coordinator);
7375 }
74-
76+
77+ response->ErrorCode = error;
78+
7579 Send (Context->ConnectionId , new TEvKafka::TEvResponse (CorrelationId, response, static_cast <EKafkaErrors>(response->ErrorCode )));
7680 Die (ctx);
7781}
7882
7983void TKafkaFindCoordinatorActor::Handle (NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev, const NActors::TActorContext& ctx) {
8084 auto iter = ev->Get ()->NodeIdsMapping ->find (ctx.SelfID .NodeId ());
8185 Y_ABORT_UNLESS (!iter.IsEnd ());
86+
8287auto host = (*ev->Get ()->Nodes )[iter->second ].Host ;
88+ if (host.StartsWith (UnderlayPrefix)) {
89+ host = host.substr (sizeof (UnderlayPrefix) - 1 );
90+ }
8391 KAFKA_LOG_D (" FIND_COORDINATOR incoming TEvGetAllNodesInfoResponse. Host#: " << host);
8492 SendResponseOkAndDie (host, Context->Config .GetListeningPort (), ctx.SelfID .NodeId (), ctx);
8593}
0 commit comments