Skip to content

Commit b4fe421

Browse files
authored
Attempt to solve problem with verify while ydb is shutting down pqv0 25-1-3 (#21729)
2 parents 442bd39 + 5c32f94 commit b4fe421

File tree

5 files changed

+30
-7
lines changed

5 files changed

+30
-7
lines changed

ydb/library/grpc/server/grpc_server.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,7 @@ void TGRpcServer::Start() {
273273
}
274274

275275
void TGRpcServer::Stop() {
276+
NYdbGrpc::GrpcDead = 1;
276277
for (auto& service : Services_) {
277278
service->StopService();
278279
}
@@ -304,8 +305,8 @@ void TGRpcServer::Stop() {
304305
}
305306

306307
if (!unsafe && spent > Options_.GRpcShutdownDeadline.SecondsFloat()) {
307-
Cerr << "GRpc shutdown warning: failed to shutdown all connections, left infly: " << infly << ", spent: " << spent << " sec"
308-
<< Endl;
308+
Cerr << "GRpc shutdown warning: failed to shutdown all connections, left infly: " << infly << ", spent: " << spent
309+
<< " sec. GRpcShutdownDeadline: " << Options_.GRpcShutdownDeadline.SecondsFloat() << Endl;
309310
break;
310311
}
311312
Sleep(TDuration::MilliSeconds(10));

ydb/library/grpc/server/grpc_server.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ namespace NMonitoring {
2424

2525
namespace NYdbGrpc {
2626

27+
static std::atomic<int> GrpcDead = 0;
28+
2729
struct TSslData {
2830
TString Cert;
2931
TString Key;

ydb/services/deprecated/persqueue_v0/grpc_pq_read.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@ class TPQReadService : public IPQClustersUpdaterCallback, public std::enable_sha
7979

8080
void StopService() {
8181
AtomicSet(ShuttingDown_, 1);
82+
auto g(Guard(Lock));
83+
for (auto it = Sessions.begin(); it != Sessions.end();) {
84+
auto jt = it++;
85+
jt->second->DestroyStream("Grpc server is dead", NPersQueue::NErrorCode::BAD_REQUEST);
86+
}
8287
}
8388

8489
bool IsShuttingDown() const {

ydb/services/deprecated/persqueue_v0/grpc_pq_session.h

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -118,14 +118,18 @@ class ISession : public ISessionHandler<TResponse>
118118
Session->HaveWriteInflight = false;
119119
if (Session->NeedFinish) {
120120
lock.Release();
121-
Session->Stream.Finish(Status::OK, new TFinishDone(Session));
121+
if (!NYdbGrpc::GrpcDead) {
122+
Session->Stream.Finish(Status::OK, new TFinishDone(Session));
123+
}
122124
}
123125
} else {
124126
auto resp = std::move(Session->Responses.front());
125127
Session->Responses.pop();
126128
lock.Release();
127129
ui64 sz = resp.ByteSize();
128-
Session->Stream.Write(resp, new TWriteDone(Session, sz));
130+
if (!NYdbGrpc::GrpcDead) {
131+
Session->Stream.Write(resp, new TWriteDone(Session, sz));
132+
}
129133
}
130134

131135
return false;
@@ -271,7 +275,9 @@ class ISession : public ISessionHandler<TResponse>
271275
HaveWriteInflight = true;
272276
}
273277

274-
Stream.Finish(Status::OK, new TFinishDone(this));
278+
if (!NYdbGrpc::GrpcDead) {
279+
Stream.Finish(Status::OK, new TFinishDone(this));
280+
}
275281
}
276282

277283
/// Send reply to client.
@@ -289,7 +295,9 @@ class ISession : public ISessionHandler<TResponse>
289295
}
290296

291297
ui64 size = resp.ByteSize();
292-
Stream.Write(resp, new TWriteDone(this, size));
298+
if (!NYdbGrpc::GrpcDead) {
299+
Stream.Write(resp, new TWriteDone(this, size));
300+
}
293301
}
294302

295303
void ReadyForNextRead() override {
@@ -301,7 +309,9 @@ class ISession : public ISessionHandler<TResponse>
301309
}
302310

303311
auto read = new TReadDone(this);
304-
Stream.Read(&read->Request, read);
312+
if (!NYdbGrpc::GrpcDead) {
313+
Stream.Read(&read->Request, read);
314+
}
305315
}
306316

307317
protected:

ydb/services/deprecated/persqueue_v0/grpc_pq_write.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ class TPQWriteServiceImpl : public IPQClustersUpdaterCallback, public std::enabl
6969

7070
void StopService() {
7171
AtomicSet(ShuttingDown_, 1);
72+
auto g(Guard(Lock));
73+
for (auto it = Sessions.begin(); it != Sessions.end();) {
74+
auto jt = it++;
75+
jt->second->DestroyStream("Grpc server is dead", NPersQueue::NErrorCode::BAD_REQUEST);
76+
}
7277
}
7378

7479
bool IsShuttingDown() const {

0 commit comments

Comments
 (0)