Skip to content

Commit bd70d1e

Browse files
committed
fix grpc: coredump from ReadRemainingAndFinish & remove PingPongFinish
Tests: CI commit_hash:aaf5e31181ffc08d43627972e73ec9761a36b1a9
1 parent c9c177a commit bd70d1e

4 files changed

Lines changed: 43 additions & 104 deletions

File tree

grpc/include/userver/ugrpc/client/graceful_stream_finish.hpp

Lines changed: 11 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,12 @@ namespace ugrpc::client {
2626
/// or the server returning an error status)
2727
template <typename Response>
2828
std::optional<std::size_t> ReadRemainingAndFinish(Reader<Response>& stream) noexcept {
29-
return impl::ReadRemainingAndFinish<Reader<Response>, Response>(stream);
29+
try {
30+
return impl::ReadRemainingAndFinish<Reader<Response>, Response>(stream);
31+
} catch (const std::exception& e) {
32+
LOG_WARNING() << "Failed to read remaining messages and finish - " << e;
33+
return std::nullopt;
34+
}
3035
}
3136

3237
/// @brief Announce end-of-input to the server, read all
@@ -41,40 +46,14 @@ std::optional<std::size_t> ReadRemainingAndFinish(Reader<Response>& stream) noex
4146
/// either reads or writes, or the server returning an error status)
4247
template <typename Request, typename Response>
4348
std::optional<std::size_t> ReadRemainingAndFinish(ReaderWriter<Request, Response>& stream) noexcept {
44-
const bool writes_done_success = stream.WritesDone();
45-
const std::optional<std::size_t>
46-
messages_remaining = impl::ReadRemainingAndFinish<ReaderWriter<Request, Response>, Response>(stream);
47-
return writes_done_success ? messages_remaining : std::nullopt;
48-
}
49-
50-
/// @brief Gracefully finish a ping-pong style interaction
51-
///
52-
/// 1. Announces end-of-output to the server
53-
/// 2. Ensures there are no more messages to read
54-
///
55-
/// @warning The method will hang indefenitely if the stream
56-
/// is never interrupted or closed by the server
57-
///
58-
/// @return true if the operation was successful; false if there are more messages
59-
/// in the stream to read or if the operation failed (i.e. due to task
60-
/// cancellation, the stream being already closed for writes,
61-
/// or the server returning an error status)
62-
template <typename Request, typename Response>
63-
[[nodiscard]] bool PingPongFinish(ReaderWriter<Request, Response>& stream) noexcept {
6449
try {
6550
const bool writes_done_success = stream.WritesDone();
66-
67-
Response response;
68-
if (!stream.Read(response)) {
69-
// As expected, there are no more messages in the stream
70-
return writes_done_success;
71-
}
72-
73-
LOG_WARNING() << "PingPongFinish: there are more messages to read from the stream";
74-
return false;
51+
const std::optional<std::size_t>
52+
messages_remaining = impl::ReadRemainingAndFinish<ReaderWriter<Request, Response>, Response>(stream);
53+
return writes_done_success ? messages_remaining : std::nullopt;
7554
} catch (const std::exception& e) {
76-
LOG_WARNING() << "PingPongFinish: failed to close the stream - " << e;
77-
return false;
55+
LOG_WARNING() << "Failed to read remaining messages and finish - " << e;
56+
return std::nullopt;
7857
}
7958
}
8059

grpc/include/userver/ugrpc/client/impl/graceful_stream_finish.hpp

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,13 @@ USERVER_NAMESPACE_BEGIN
1111
namespace ugrpc::client::impl {
1212

1313
template <typename Stream, typename Response>
14-
std::optional<std::size_t> ReadRemainingAndFinish(Stream& stream) noexcept {
15-
try {
16-
Response response;
17-
std::size_t num_messages = 0;
18-
while (stream.Read(response)) {
19-
++num_messages;
20-
}
21-
return num_messages;
22-
} catch (const std::exception& e) {
23-
LOG_WARNING() << "Failed to read remaining messages and finish - " << e;
24-
return std::nullopt;
14+
std::optional<std::size_t> ReadRemainingAndFinish(Stream& stream) {
15+
Response response;
16+
std::size_t num_messages = 0;
17+
while (stream.Read(response)) {
18+
++num_messages;
2519
}
20+
return num_messages;
2621
}
2722

2823
} // namespace ugrpc::client::impl

grpc/include/userver/ugrpc/client/stream.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ class [[nodiscard]] Writer final {
146146
/// If destroyed early, the RPC is cancelled. The server gets @ref ugrpc::client::RpcInterruptedError
147147
/// and the `abandoned-error` metric is incremented. The connection stays open for reuse.
148148
/// gRPC provides no way to early-close a server-streaming RPC gracefully.
149-
// See @ref ugrpc::client::ReadRemainingAndFinish and @ref ugrpc::client::PingPongFinish for graceful completion.
149+
// See @ref ugrpc::client::ReadRemainingAndFinish for graceful completion.
150150
///
151151
/// `Read` and `AsyncRead` can throw if error status is received from server.
152152
/// User MUST NOT call `Read` or `AsyncRead` again after failure of any of these

grpc/tests/stream_test.cpp

Lines changed: 25 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -107,66 +107,6 @@ UTEST_F_MT(GrpcBidirectionalStream, BidirectionalStreamTest, 2) {
107107
);
108108
}
109109

110-
UTEST_F(GrpcBidirectionalStream, PingPongFinishOk) {
111-
auto stream = GetClient().Chat();
112-
113-
ASSERT_TRUE(stream.Write(sample::ugrpc::StreamGreetingRequest()));
114-
sample::ugrpc::StreamGreetingResponse response;
115-
ASSERT_TRUE(stream.Read(response));
116-
117-
ASSERT_TRUE(ugrpc::client::PingPongFinish(stream));
118-
119-
ASSERT_FALSE(stream.Write(sample::ugrpc::StreamGreetingRequest()));
120-
ASSERT_FALSE(stream.WritesDone());
121-
ASSERT_THROW(stream.WriteAndCheck(sample::ugrpc::StreamGreetingRequest()), ugrpc::client::RpcError);
122-
ASSERT_FALSE(stream.Read(response));
123-
ASSERT_THROW([[maybe_unused]] auto _ = stream.ReadAsync(response), ugrpc::client::RpcError);
124-
}
125-
126-
UTEST_F(GrpcBidirectionalStream, PingPongFinishNoMessages) {
127-
auto stream = GetClient().Chat();
128-
129-
ASSERT_TRUE(ugrpc::client::PingPongFinish(stream));
130-
131-
ASSERT_FALSE(stream.Write(sample::ugrpc::StreamGreetingRequest()));
132-
ASSERT_FALSE(stream.WritesDone());
133-
ASSERT_THROW(stream.WriteAndCheck(sample::ugrpc::StreamGreetingRequest()), ugrpc::client::RpcError);
134-
sample::ugrpc::StreamGreetingResponse response;
135-
ASSERT_FALSE(stream.Read(response));
136-
ASSERT_THROW([[maybe_unused]] auto _ = stream.ReadAsync(response), ugrpc::client::RpcError);
137-
}
138-
139-
UTEST_F(GrpcBidirectionalStream, PingPongFinishMoreMessages) {
140-
auto stream = GetClient().Chat();
141-
142-
ASSERT_TRUE(stream.Write(sample::ugrpc::StreamGreetingRequest()));
143-
// No 'Read' here
144-
145-
ASSERT_FALSE(ugrpc::client::PingPongFinish(stream));
146-
147-
ASSERT_FALSE(stream.Write(sample::ugrpc::StreamGreetingRequest()));
148-
ASSERT_FALSE(stream.WritesDone());
149-
ASSERT_THROW(stream.WriteAndCheck(sample::ugrpc::StreamGreetingRequest()), ugrpc::client::RpcError);
150-
sample::ugrpc::StreamGreetingResponse response;
151-
ASSERT_FALSE(stream.Read(response));
152-
ASSERT_THROW([[maybe_unused]] auto _ = stream.ReadAsync(response), ugrpc::client::RpcError);
153-
}
154-
155-
UTEST_F(GrpcBidirectionalStream, PingPongFinishAfterWritesDone) {
156-
auto stream = GetClient().Chat();
157-
158-
ASSERT_TRUE(stream.WritesDone());
159-
160-
ASSERT_FALSE(ugrpc::client::PingPongFinish(stream));
161-
162-
ASSERT_FALSE(stream.Write(sample::ugrpc::StreamGreetingRequest()));
163-
ASSERT_FALSE(stream.WritesDone());
164-
ASSERT_THROW(stream.WriteAndCheck(sample::ugrpc::StreamGreetingRequest()), ugrpc::client::RpcError);
165-
sample::ugrpc::StreamGreetingResponse response;
166-
ASSERT_FALSE(stream.Read(response));
167-
ASSERT_THROW([[maybe_unused]] auto _ = stream.ReadAsync(response), ugrpc::client::RpcError);
168-
}
169-
170110
UTEST_F(GrpcBidirectionalStream, BidirectionalStreamReadRemaining) {
171111
auto stream = GetClient().Chat();
172112

@@ -240,6 +180,31 @@ UTEST_F(GrpcBidirectionalStream, BidirectionalStreamDestroy) {
240180
EXPECT_EQ(get_metric(kStatus, {{"grpc_code", "UNKNOWN"}}), 0);
241181
}
242182

183+
namespace {
184+
185+
class UnitTestServiceChatAutoFinish final : public sample::ugrpc::UnitTestServiceBase {
186+
public:
187+
ChatResult Chat(CallContext& /*context*/, ChatReaderWriter& stream) override {
188+
stream.Write(sample::ugrpc::StreamGreetingResponse{});
189+
return grpc::Status::OK;
190+
}
191+
};
192+
193+
using GrpcBidirectionalStreamChatAutoFinish =
194+
ugrpc::tests::ServiceWithClientFixture<UnitTestServiceChatAutoFinish, sample::ugrpc::UnitTestServiceClient>;
195+
196+
} // namespace
197+
198+
UTEST_F(GrpcBidirectionalStreamChatAutoFinish, BidirectionalStreamWritesDoneThrowOnClosedStream) {
199+
auto stream = GetClient().Chat();
200+
201+
sample::ugrpc::StreamGreetingResponse response;
202+
ASSERT_TRUE(stream.Read(response));
203+
ASSERT_FALSE(stream.Read(response));
204+
205+
UASSERT_NO_THROW(ugrpc::client::ReadRemainingAndFinish(stream));
206+
}
207+
243208
UTEST_F(GrpcInputStream, InputStreamDestroy) {
244209
{
245210
const sample::ugrpc::StreamGreetingRequest request;

0 commit comments

Comments
 (0)