@@ -63,13 +63,13 @@ inline TRetentionsConversionResult ConvertRetentions(std::optional<TString> rete
6363 RETENTION_MS_CONFIG_NAME,
6464 [&result](std::optional<ui64> retention) -> void { result.Ms = retention; }
6565 );
66-
66+
6767 convertRetention (
6868 retentionBytes,
6969 RETENTION_BYTES_CONFIG_NAME,
7070 [&result](std::optional<ui64> retention) -> void { result.Bytes = retention; }
7171 );
72-
72+
7373 return result;
7474}
7575
@@ -107,7 +107,7 @@ inline std::optional<THolder<TEvKafka::TEvTopicModificationResponse>> ValidateTo
107107 } else {
108108 return std::optional<THolder<TEvKafka::TEvTopicModificationResponse>>();
109109 }
110- }
110+ }
111111
112112template <class T >
113113inline std::unordered_set<TString> ExtractDuplicates (
@@ -134,15 +134,15 @@ class TAlterTopicActor : public NKikimr::NGRpcProxy::V1::TUpdateSchemeActor<T, U
134134public:
135135
136136 TAlterTopicActor (
137- TActorId requester,
137+ TActorId requester,
138138 TIntrusiveConstPtr<NACLib::TUserToken> userToken,
139139 TString topicPath,
140140 TString databaseName)
141141 : TBase(new U(
142142 userToken,
143143 topicPath,
144144 databaseName,
145- [this ](const EKafkaErrors status, const std::string& message) {
145+ [this ](const EKafkaErrors status, const std::string& message, const ::google::protobuf::Message& ) {
146146 this ->SendResult (status,TString{message});
147147 })
148148 )
@@ -176,15 +176,15 @@ class TAlterTopicActor : public NKikimr::NGRpcProxy::V1::TUpdateSchemeActor<T, U
176176 const std::shared_ptr<TString> SerializedToken;
177177};
178178
179- class TKafkaTopicModificationRequest : public NKikimr ::NGRpcService::IRequestOpCtx {
179+ class TKafkaTopicRequestCtx : public NKikimr ::NGRpcService::IRequestOpCtx {
180180public:
181- using TRequest = TKafkaTopicModificationRequest ;
181+ using TRequest = TKafkaTopicRequestCtx ;
182182
183- TKafkaTopicModificationRequest (
183+ TKafkaTopicRequestCtx (
184184 TIntrusiveConstPtr<NACLib::TUserToken> userToken,
185185 TString topicPath,
186186 TString databaseName,
187- const std::function<void (const EKafkaErrors, const std::string&)> sendResultCallback)
187+ const std::function<void (const EKafkaErrors, const std::string&, const google::protobuf::Message& result )> sendResultCallback)
188188 : UserToken(userToken)
189189 , TopicPath(topicPath)
190190 , DatabaseName(databaseName)
@@ -239,7 +239,7 @@ class TKafkaTopicModificationRequest : public NKikimr::NGRpcService::IRequestOpC
239239 };
240240
241241 void ReplyWithYdbStatus (Ydb::StatusIds::StatusCode status) override {
242- ProcessYdbStatusCode (status);
242+ ProcessYdbStatusCode (status, google::protobuf::Empty{} );
243243 };
244244
245245 void ReplyWithRpcStatus (grpc::StatusCode code, const TString& msg = " " , const TString& details = " " ) override {
@@ -334,26 +334,24 @@ class TKafkaTopicModificationRequest : public NKikimr::NGRpcService::IRequestOpC
334334 }
335335
336336 void SendResult (const google::protobuf::Message& result, Ydb::StatusIds::StatusCode status) override {
337- Y_UNUSED (result);
338- ProcessYdbStatusCode (status);
337+ ProcessYdbStatusCode (status, result);
339338 };
340339
341340 void SendResult (
342341 const google::protobuf::Message& result,
343342 Ydb::StatusIds::StatusCode status,
344343 const google::protobuf::RepeatedPtrField<NKikimr::NGRpcService::TYdbIssueMessageType>& message) override {
345344
346- Y_UNUSED (result);
347345 Y_UNUSED (message);
348- ProcessYdbStatusCode (status);
346+ ProcessYdbStatusCode (status, result );
349347 };
350348
351349 const Ydb::Operations::OperationParams& operation_params () const {
352350 return DummyParams;
353351 }
354352
355- static TKafkaTopicModificationRequest * GetProtoRequest (std::shared_ptr<IRequestOpCtx> request) {
356- return static_cast <TKafkaTopicModificationRequest *>(&(*request));
353+ static TKafkaTopicRequestCtx * GetProtoRequest (std::shared_ptr<IRequestOpCtx> request) {
354+ return static_cast <TKafkaTopicRequestCtx *>(&(*request));
357355 }
358356
359357protected:
@@ -371,11 +369,12 @@ class TKafkaTopicModificationRequest : public NKikimr::NGRpcService::IRequestOpC
371369 const NKikimr::NGRpcService::TAuditLogParts DummyAuditLogParts;
372370 const TString TopicPath;
373371 const TString DatabaseName;
374- const std::function<void (const EKafkaErrors status, const std::string& message)> SendResultCallback;
372+ const std::function<void (const EKafkaErrors status, const std::string& message, const google::protobuf::Message& result )> SendResultCallback;
375373 NYql::TIssue Issue;
376374
377- void ProcessYdbStatusCode (Ydb::StatusIds::StatusCode& status) {
378- SendResultCallback (Convert (status), Issue.GetMessage ());
375+ void ProcessYdbStatusCode (Ydb::StatusIds::StatusCode& status, const google::protobuf::Message& result ) {
376+ SendResultCallback (Convert (status), Issue.GetMessage (), result );
379377 }
380378};
381- }
379+
380+ } // namespace NKafka
0 commit comments