Skip to content

Commit de70eb2

Browse files
committed
feat grpc: perform ProcessCall in a subtask
This allows Congestion Control to kill grpc handler tasks in case of service overload. commit_hash:910e8a82b62d60d415d0237fe02162d3d4bbe1b0
1 parent 6a6e036 commit de70eb2

9 files changed

Lines changed: 283 additions & 111 deletions

File tree

.mapping.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2500,6 +2500,7 @@
25002500
"grpc/include/userver/ugrpc/server/impl/completion_queue_pool.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/server/impl/completion_queue_pool.hpp",
25012501
"grpc/include/userver/ugrpc/server/impl/error_code.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/server/impl/error_code.hpp",
25022502
"grpc/include/userver/ugrpc/server/impl/exceptions.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/server/impl/exceptions.hpp",
2503+
"grpc/include/userver/ugrpc/server/impl/ratelimit_metadata.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/server/impl/ratelimit_metadata.hpp",
25032504
"grpc/include/userver/ugrpc/server/impl/request_async_call.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/server/impl/request_async_call.hpp",
25042505
"grpc/include/userver/ugrpc/server/impl/rpc.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/server/impl/rpc.hpp",
25052506
"grpc/include/userver/ugrpc/server/impl/service_internals.hpp":"taxi/uservices/userver/grpc/include/userver/ugrpc/server/impl/service_internals.hpp",
@@ -2651,6 +2652,8 @@
26512652
"grpc/src/ugrpc/server/impl/generic_service_worker.hpp":"taxi/uservices/userver/grpc/src/ugrpc/server/impl/generic_service_worker.hpp",
26522653
"grpc/src/ugrpc/server/impl/parse_config.cpp":"taxi/uservices/userver/grpc/src/ugrpc/server/impl/parse_config.cpp",
26532654
"grpc/src/ugrpc/server/impl/parse_config.hpp":"taxi/uservices/userver/grpc/src/ugrpc/server/impl/parse_config.hpp",
2655+
"grpc/src/ugrpc/server/impl/ratelimit_metadata.cpp":"taxi/uservices/userver/grpc/src/ugrpc/server/impl/ratelimit_metadata.cpp",
2656+
"grpc/src/ugrpc/server/impl/request_async_call.cpp":"taxi/uservices/userver/grpc/src/ugrpc/server/impl/request_async_call.cpp",
26542657
"grpc/src/ugrpc/server/impl/rpc.cpp":"taxi/uservices/userver/grpc/src/ugrpc/server/impl/rpc.cpp",
26552658
"grpc/src/ugrpc/server/impl/service_defaults.hpp":"taxi/uservices/userver/grpc/src/ugrpc/server/impl/service_defaults.hpp",
26562659
"grpc/src/ugrpc/server/impl/service_worker.cpp":"taxi/uservices/userver/grpc/src/ugrpc/server/impl/service_worker.cpp",
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
#pragma once
2+
3+
#include <grpcpp/server_context.h>
4+
5+
USERVER_NAMESPACE_BEGIN
6+
7+
namespace ugrpc::server::impl {
8+
9+
void AddRatelimitMetadata(grpc::ServerContext& server_context);
10+
11+
} // namespace ugrpc::server::impl
12+
13+
USERVER_NAMESPACE_END
Lines changed: 56 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,60 +1,78 @@
11
#pragma once
22

3+
#include <grpcpp/support/async_stream.h>
4+
#include <grpcpp/support/async_unary_call.h>
5+
36
#include <userver/ugrpc/impl/async_service.hpp>
4-
#include <userver/ugrpc/server/impl/call_traits.hpp>
57

68
USERVER_NAMESPACE_BEGIN
79

810
namespace ugrpc::server::impl {
911

10-
template <typename CallTraits, typename Service>
12+
template <typename GrpcppService, typename Request, typename Response>
1113
void RequestAsyncCall(
12-
ugrpc::impl::AsyncService<Service>& async_service,
14+
ugrpc::impl::AsyncService<GrpcppService>& async_service,
1315
int method_id,
14-
grpc::ServerContext& server_context,
15-
typename CallTraits::InitialRequest& initial_request,
16-
typename CallTraits::RawResponder& stream,
17-
grpc::CompletionQueue& call_cq,
18-
grpc::ServerCompletionQueue& notification_cq,
16+
grpc::ServerContext* server_context,
17+
Request* request,
18+
grpc::ServerAsyncResponseWriter<Response>* stream,
19+
grpc::CompletionQueue* call_cq,
20+
grpc::ServerCompletionQueue* notification_cq,
1921
void* tag
2022
) {
21-
if constexpr (CallTraits::kRpcType == RpcType::kUnary) {
22-
async_service
23-
.RequestAsyncUnary(method_id, &server_context, &initial_request, &stream, &call_cq, &notification_cq, tag);
24-
} else if constexpr (CallTraits::kRpcType == RpcType::kClientStreaming) {
25-
async_service.RequestAsyncClientStreaming(method_id, &server_context, &stream, &call_cq, &notification_cq, tag);
26-
} else if constexpr (CallTraits::kRpcType == RpcType::kServerStreaming) {
27-
async_service.RequestAsyncServerStreaming(
28-
method_id,
29-
&server_context,
30-
&initial_request,
31-
&stream,
32-
&call_cq,
33-
&notification_cq,
34-
tag
35-
);
36-
} else if constexpr (CallTraits::kRpcType == RpcType::kBidiStreaming) {
37-
async_service.RequestAsyncBidiStreaming(method_id, &server_context, &stream, &call_cq, &notification_cq, tag);
38-
} else {
39-
static_assert(!sizeof(CallTraits), "Invalid kCallCategory");
40-
}
23+
async_service.RequestAsyncUnary(method_id, server_context, request, stream, call_cq, notification_cq, tag);
4124
}
4225

43-
template <typename CallTraits>
26+
template <typename GrpcppService, typename Request, typename Response>
4427
void RequestAsyncCall(
45-
ugrpc::impl::AsyncGenericService& async_generic_service,
46-
int /*method_id*/,
47-
grpc::GenericServerContext& server_context,
48-
NoInitialRequest& /*initial_request*/,
49-
grpc::GenericServerAsyncReaderWriter& stream,
50-
grpc::CompletionQueue& call_cq,
51-
grpc::ServerCompletionQueue& notification_cq,
28+
ugrpc::impl::AsyncService<GrpcppService>& async_service,
29+
int method_id,
30+
grpc::ServerContext* server_context,
31+
grpc::ServerAsyncReader<Response, Request>* stream,
32+
grpc::CompletionQueue* call_cq,
33+
grpc::ServerCompletionQueue* notification_cq,
34+
void* tag
35+
) {
36+
async_service.RequestAsyncClientStreaming(method_id, server_context, stream, call_cq, notification_cq, tag);
37+
}
38+
39+
template <typename GrpcppService, typename Request, typename Response>
40+
void RequestAsyncCall(
41+
ugrpc::impl::AsyncService<GrpcppService>& async_service,
42+
int method_id,
43+
grpc::ServerContext* server_context,
44+
Request* request,
45+
grpc::ServerAsyncWriter<Response>* stream,
46+
grpc::CompletionQueue* call_cq,
47+
grpc::ServerCompletionQueue* notification_cq,
5248
void* tag
5349
) {
54-
async_generic_service.GetAsyncGenericService()
55-
.RequestCall(&server_context, &stream, &call_cq, &notification_cq, tag);
50+
async_service
51+
.RequestAsyncServerStreaming(method_id, server_context, request, stream, call_cq, notification_cq, tag);
5652
}
5753

54+
template <typename GrpcppService, typename Request, typename Response>
55+
void RequestAsyncCall(
56+
ugrpc::impl::AsyncService<GrpcppService>& async_service,
57+
int method_id,
58+
grpc::ServerContext* server_context,
59+
grpc::ServerAsyncReaderWriter<Response, Request>* stream,
60+
grpc::CompletionQueue* call_cq,
61+
grpc::ServerCompletionQueue* notification_cq,
62+
void* tag
63+
) {
64+
async_service.RequestAsyncBidiStreaming(method_id, server_context, stream, call_cq, notification_cq, tag);
65+
}
66+
67+
void RequestAsyncCall(
68+
ugrpc::impl::AsyncGenericService& async_generic_service,
69+
grpc::GenericServerContext* server_context,
70+
grpc::GenericServerAsyncReaderWriter* stream,
71+
grpc::CompletionQueue* call_cq,
72+
grpc::ServerCompletionQueue* notification_cq,
73+
void* tag
74+
);
75+
5876
} // namespace ugrpc::server::impl
5977

6078
USERVER_NAMESPACE_END

grpc/include/userver/ugrpc/server/impl/rpc.hpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -181,10 +181,10 @@ template <typename CallTraits>
181181
// to ensure compliance with gRPC spec
182182
impl::NormalizeStatus(status);
183183

184-
if constexpr (!IsSingleResponseMethod(CallTraits::kRpcType)) {
185-
return impl::Finish(raw_responder_, status);
186-
} else {
184+
if constexpr (IsSingleResponseMethod(CallTraits::kRpcType)) {
187185
return impl::FinishWithError(raw_responder_, status);
186+
} else {
187+
return impl::Finish(raw_responder_, status);
188188
}
189189
}
190190

@@ -193,13 +193,13 @@ template <typename CallTraits>
193193
UINVARIANT(!is_finished_, "'Finish' called on a finished stream");
194194
is_finished_ = true;
195195

196-
if constexpr (!IsSingleResponseMethod(CallTraits::kRpcType)) {
196+
if constexpr (IsSingleResponseMethod(CallTraits::kRpcType)) {
197+
return impl::Finish(raw_responder_, response, grpc::Status::OK);
198+
} else {
197199
// Don't buffer writes, optimize for ping-pong-style interaction.
198200
const grpc::WriteOptions write_options{};
199201

200202
return impl::WriteAndFinish(raw_responder_, response, write_options, grpc::Status::OK);
201-
} else {
202-
return impl::Finish(raw_responder_, response, grpc::Status::OK);
203203
}
204204
}
205205

0 commit comments

Comments
 (0)