1 //===- RPCUtils.h - Utilities for building RPC APIs -------------*- C++ -*-===//
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
7 //===----------------------------------------------------------------------===//
9 // Utilities to support construction of simple RPC APIs.
11 // The RPC utilities aim for ease of use (minimal conceptual overhead) for C++
12 // programmers, high performance, low memory overhead, and efficient use of the
13 // communications channel.
15 //===----------------------------------------------------------------------===//
17 #ifndef LLVM_EXECUTIONENGINE_ORC_RPCUTILS_H
18 #define LLVM_EXECUTIONENGINE_ORC_RPCUTILS_H
24 #include "llvm/ADT/STLExtras.h"
25 #include "llvm/ExecutionEngine/Orc/OrcError.h"
26 #include "llvm/ExecutionEngine/Orc/RPCSerialization.h"
27 #include "llvm/Support/MSVCErrorWorkarounds.h"
35 /// Base class of all fatal RPC errors (those that necessarily result in the
36 /// termination of the RPC session).
37 class RPCFatalError
: public ErrorInfo
<RPCFatalError
> {
42 /// RPCConnectionClosed is returned from RPC operations if the RPC connection
43 /// has already been closed due to either an error or graceful disconnection.
44 class ConnectionClosed
: public ErrorInfo
<ConnectionClosed
> {
47 std::error_code
convertToErrorCode() const override
;
48 void log(raw_ostream
&OS
) const override
;
51 /// BadFunctionCall is returned from handleOne when the remote makes a call with
52 /// an unrecognized function id.
54 /// This error is fatal because Orc RPC needs to know how to parse a function
55 /// call to know where the next call starts, and if it doesn't recognize the
56 /// function id it cannot parse the call.
57 template <typename FnIdT
, typename SeqNoT
>
59 : public ErrorInfo
<BadFunctionCall
<FnIdT
, SeqNoT
>, RPCFatalError
> {
63 BadFunctionCall(FnIdT FnId
, SeqNoT SeqNo
)
64 : FnId(std::move(FnId
)), SeqNo(std::move(SeqNo
)) {}
66 std::error_code
convertToErrorCode() const override
{
67 return orcError(OrcErrorCode::UnexpectedRPCCall
);
70 void log(raw_ostream
&OS
) const override
{
71 OS
<< "Call to invalid RPC function id '" << FnId
<< "' with "
72 "sequence number " << SeqNo
;
80 template <typename FnIdT
, typename SeqNoT
>
81 char BadFunctionCall
<FnIdT
, SeqNoT
>::ID
= 0;
83 /// InvalidSequenceNumberForResponse is returned from handleOne when a response
84 /// call arrives with a sequence number that doesn't correspond to any in-flight
87 /// This error is fatal because Orc RPC needs to know how to parse the rest of
88 /// the response call to know where the next call starts, and if it doesn't have
89 /// a result parser for this sequence number it can't do that.
90 template <typename SeqNoT
>
91 class InvalidSequenceNumberForResponse
92 : public ErrorInfo
<InvalidSequenceNumberForResponse
<SeqNoT
>, RPCFatalError
> {
96 InvalidSequenceNumberForResponse(SeqNoT SeqNo
)
97 : SeqNo(std::move(SeqNo
)) {}
99 std::error_code
convertToErrorCode() const override
{
100 return orcError(OrcErrorCode::UnexpectedRPCCall
);
103 void log(raw_ostream
&OS
) const override
{
104 OS
<< "Response has unknown sequence number " << SeqNo
;
110 template <typename SeqNoT
>
111 char InvalidSequenceNumberForResponse
<SeqNoT
>::ID
= 0;
113 /// This non-fatal error will be passed to asynchronous result handlers in place
114 /// of a result if the connection goes down before a result returns, or if the
115 /// function to be called cannot be negotiated with the remote.
116 class ResponseAbandoned
: public ErrorInfo
<ResponseAbandoned
> {
120 std::error_code
convertToErrorCode() const override
;
121 void log(raw_ostream
&OS
) const override
;
124 /// This error is returned if the remote does not have a handler installed for
125 /// the given RPC function.
126 class CouldNotNegotiate
: public ErrorInfo
<CouldNotNegotiate
> {
130 CouldNotNegotiate(std::string Signature
);
131 std::error_code
convertToErrorCode() const override
;
132 void log(raw_ostream
&OS
) const override
;
133 const std::string
&getSignature() const { return Signature
; }
135 std::string Signature
;
138 template <typename DerivedFunc
, typename FnT
> class Function
;
140 // RPC Function class.
141 // DerivedFunc should be a user defined class with a static 'getName()' method
142 // returning a const char* representing the function's name.
143 template <typename DerivedFunc
, typename RetT
, typename
... ArgTs
>
144 class Function
<DerivedFunc
, RetT(ArgTs
...)> {
146 /// User defined function type.
147 using Type
= RetT(ArgTs
...);
150 using ReturnType
= RetT
;
152 /// Returns the full function prototype as a string.
153 static const char *getPrototype() {
154 static std::string Name
= [] {
156 raw_string_ostream(Name
)
157 << RPCTypeName
<RetT
>::getName() << " " << DerivedFunc::getName()
158 << "(" << llvm::orc::rpc::RPCTypeNameSequence
<ArgTs
...>() << ")";
/// Allocates RPC function ids during autonegotiation.
/// Specializations of this class must provide four members:
///
/// static T getInvalidId():
///   Should return a reserved id that will be used to represent missing
/// functions during autonegotiation.
///
/// static T getResponseId():
///   Should return a reserved id that will be used to send function responses
/// (return values).
///
/// static T getNegotiateId():
///   Should return a reserved id for the negotiate function, which will be used
/// to negotiate ids for user defined functions.
///
/// template <typename Func> T allocate():
///   Allocate a unique id for function Func.
template <typename T, typename = void> class RPCFunctionIdAllocator;

/// This specialization of RPCFunctionIdAllocator provides a default
/// implementation for integral types.
template <typename T>
class RPCFunctionIdAllocator<
    T, typename std::enable_if<std::is_integral<T>::value>::type> {
public:
  /// Reserved id 0: represents a missing function.
  static T getInvalidId() { return T(0); }

  /// Reserved id 1: used for response messages.
  static T getResponseId() { return T(1); }

  /// Reserved id 2: used for the negotiate function.
  static T getNegotiateId() { return T(2); }

  /// Hands out the next unused id. Func is not inspected; every call returns a
  /// fresh, monotonically increasing value.
  template <typename Func> T allocate() { return NextId++; }

private:
  // First id available to user functions, directly after the reserved ids 0-2.
  // NOTE(review): the initial value was lost in extraction; 3 is the first
  // value that cannot collide with the reserved ids -- confirm upstream.
  T NextId = 3;
};
/// Provides a typedef for a tuple containing the decayed argument types.
template <typename T> class FunctionArgsTuple;

/// Specialization for function types: Type is a std::tuple holding one element
/// per argument, with references and cv-qualifiers stripped from each.
template <typename RetT, typename... ArgTs>
class FunctionArgsTuple<RetT(ArgTs...)> {
public:
  using Type = std::tuple<typename std::decay<
      typename std::remove_reference<ArgTs>::type>::type...>;
};
212 // ResultTraits provides typedefs and utilities specific to the return type
214 template <typename RetT
> class ResultTraits
{
216 // The return type wrapped in llvm::Expected.
217 using ErrorReturnType
= Expected
<RetT
>;
220 // The ErrorReturnType wrapped in a std::promise.
221 using ReturnPromiseType
= std::promise
<MSVCPExpected
<RetT
>>;
223 // The ErrorReturnType wrapped in a std::future.
224 using ReturnFutureType
= std::future
<MSVCPExpected
<RetT
>>;
226 // The ErrorReturnType wrapped in a std::promise.
227 using ReturnPromiseType
= std::promise
<ErrorReturnType
>;
229 // The ErrorReturnType wrapped in a std::future.
230 using ReturnFutureType
= std::future
<ErrorReturnType
>;
233 // Create a 'blank' value of the ErrorReturnType, ready and safe to
235 static ErrorReturnType
createBlankErrorReturnValue() {
236 return ErrorReturnType(RetT());
239 // Consume an abandoned ErrorReturnType.
240 static void consumeAbandoned(ErrorReturnType RetOrErr
) {
241 consumeError(RetOrErr
.takeError());
245 // ResultTraits specialization for void functions.
246 template <> class ResultTraits
<void> {
248 // For void functions, ErrorReturnType is llvm::Error.
249 using ErrorReturnType
= Error
;
252 // The ErrorReturnType wrapped in a std::promise.
253 using ReturnPromiseType
= std::promise
<MSVCPError
>;
255 // The ErrorReturnType wrapped in a std::future.
256 using ReturnFutureType
= std::future
<MSVCPError
>;
258 // The ErrorReturnType wrapped in a std::promise.
259 using ReturnPromiseType
= std::promise
<ErrorReturnType
>;
261 // The ErrorReturnType wrapped in a std::future.
262 using ReturnFutureType
= std::future
<ErrorReturnType
>;
265 // Create a 'blank' value of the ErrorReturnType, ready and safe to
267 static ErrorReturnType
createBlankErrorReturnValue() {
268 return ErrorReturnType::success();
271 // Consume an abandoned ErrorReturnType.
272 static void consumeAbandoned(ErrorReturnType Err
) {
273 consumeError(std::move(Err
));
277 // ResultTraits<Error> is equivalent to ResultTraits<void>. This allows
278 // handlers for void RPC functions to return either void (in which case they
279 // implicitly succeed) or Error (in which case their error return is
280 // propagated). See usage in HandlerTraits::runHandlerHelper.
281 template <> class ResultTraits
<Error
> : public ResultTraits
<void> {};
283 // ResultTraits<Expected<T>> is equivalent to ResultTraits<T>. This allows
284 // handlers for RPC functions returning a T to return either a T (in which
285 // case they implicitly succeed) or Expected<T> (in which case their error
286 // return is propagated). See usage in HandlerTraits::runHandlerHelper.
287 template <typename RetT
>
288 class ResultTraits
<Expected
<RetT
>> : public ResultTraits
<RetT
> {};
290 // Determines whether an RPC function's defined error return type supports
291 // error return value.
292 template <typename T
>
293 class SupportsErrorReturn
{
295 static const bool value
= false;
299 class SupportsErrorReturn
<Error
> {
301 static const bool value
= true;
304 template <typename T
>
305 class SupportsErrorReturn
<Expected
<T
>> {
307 static const bool value
= true;
310 // RespondHelper packages return values based on whether or not the declared
311 // RPC function return type supports error returns.
312 template <bool FuncSupportsErrorReturn
>
315 // RespondHelper specialization for functions that support error returns.
317 class RespondHelper
<true> {
321 template <typename WireRetT
, typename HandlerRetT
, typename ChannelT
,
322 typename FunctionIdT
, typename SequenceNumberT
>
323 static Error
sendResult(ChannelT
&C
, const FunctionIdT
&ResponseId
,
324 SequenceNumberT SeqNo
,
325 Expected
<HandlerRetT
> ResultOrErr
) {
326 if (!ResultOrErr
&& ResultOrErr
.template errorIsA
<RPCFatalError
>())
327 return ResultOrErr
.takeError();
329 // Open the response message.
330 if (auto Err
= C
.startSendMessage(ResponseId
, SeqNo
))
333 // Serialize the result.
335 SerializationTraits
<ChannelT
, WireRetT
,
336 Expected
<HandlerRetT
>>::serialize(
337 C
, std::move(ResultOrErr
)))
340 // Close the response message.
341 return C
.endSendMessage();
344 template <typename ChannelT
, typename FunctionIdT
, typename SequenceNumberT
>
345 static Error
sendResult(ChannelT
&C
, const FunctionIdT
&ResponseId
,
346 SequenceNumberT SeqNo
, Error Err
) {
347 if (Err
&& Err
.isA
<RPCFatalError
>())
349 if (auto Err2
= C
.startSendMessage(ResponseId
, SeqNo
))
351 if (auto Err2
= serializeSeq(C
, std::move(Err
)))
353 return C
.endSendMessage();
358 // RespondHelper specialization for functions that do not support error returns.
360 class RespondHelper
<false> {
363 template <typename WireRetT
, typename HandlerRetT
, typename ChannelT
,
364 typename FunctionIdT
, typename SequenceNumberT
>
365 static Error
sendResult(ChannelT
&C
, const FunctionIdT
&ResponseId
,
366 SequenceNumberT SeqNo
,
367 Expected
<HandlerRetT
> ResultOrErr
) {
368 if (auto Err
= ResultOrErr
.takeError())
371 // Open the response message.
372 if (auto Err
= C
.startSendMessage(ResponseId
, SeqNo
))
375 // Serialize the result.
377 SerializationTraits
<ChannelT
, WireRetT
, HandlerRetT
>::serialize(
381 // Close the response message.
382 return C
.endSendMessage();
385 template <typename ChannelT
, typename FunctionIdT
, typename SequenceNumberT
>
386 static Error
sendResult(ChannelT
&C
, const FunctionIdT
&ResponseId
,
387 SequenceNumberT SeqNo
, Error Err
) {
390 if (auto Err2
= C
.startSendMessage(ResponseId
, SeqNo
))
392 return C
.endSendMessage();
398 // Send a response of the given wire return type (WireRetT) over the
399 // channel, with the given sequence number.
400 template <typename WireRetT
, typename HandlerRetT
, typename ChannelT
,
401 typename FunctionIdT
, typename SequenceNumberT
>
402 Error
respond(ChannelT
&C
, const FunctionIdT
&ResponseId
,
403 SequenceNumberT SeqNo
, Expected
<HandlerRetT
> ResultOrErr
) {
404 return RespondHelper
<SupportsErrorReturn
<WireRetT
>::value
>::
405 template sendResult
<WireRetT
>(C
, ResponseId
, SeqNo
, std::move(ResultOrErr
));
408 // Send an empty response message on the given channel to indicate that
410 template <typename WireRetT
, typename ChannelT
, typename FunctionIdT
,
411 typename SequenceNumberT
>
412 Error
respond(ChannelT
&C
, const FunctionIdT
&ResponseId
, SequenceNumberT SeqNo
,
414 return RespondHelper
<SupportsErrorReturn
<WireRetT
>::value
>::
415 sendResult(C
, ResponseId
, SeqNo
, std::move(Err
));
418 // Converts a given type to the equivalent error return type.
419 template <typename T
> class WrappedHandlerReturn
{
421 using Type
= Expected
<T
>;
424 template <typename T
> class WrappedHandlerReturn
<Expected
<T
>> {
426 using Type
= Expected
<T
>;
429 template <> class WrappedHandlerReturn
<void> {
434 template <> class WrappedHandlerReturn
<Error
> {
439 template <> class WrappedHandlerReturn
<ErrorSuccess
> {
444 // Traits class that strips the response function from the list of handler
446 template <typename FnT
> class AsyncHandlerTraits
;
448 template <typename ResultT
, typename
... ArgTs
>
449 class AsyncHandlerTraits
<Error(std::function
<Error(Expected
<ResultT
>)>, ArgTs
...)> {
451 using Type
= Error(ArgTs
...);
452 using ResultType
= Expected
<ResultT
>;
455 template <typename
... ArgTs
>
456 class AsyncHandlerTraits
<Error(std::function
<Error(Error
)>, ArgTs
...)> {
458 using Type
= Error(ArgTs
...);
459 using ResultType
= Error
;
462 template <typename
... ArgTs
>
463 class AsyncHandlerTraits
<ErrorSuccess(std::function
<Error(Error
)>, ArgTs
...)> {
465 using Type
= Error(ArgTs
...);
466 using ResultType
= Error
;
469 template <typename
... ArgTs
>
470 class AsyncHandlerTraits
<void(std::function
<Error(Error
)>, ArgTs
...)> {
472 using Type
= Error(ArgTs
...);
473 using ResultType
= Error
;
476 template <typename ResponseHandlerT
, typename
... ArgTs
>
477 class AsyncHandlerTraits
<Error(ResponseHandlerT
, ArgTs
...)> :
478 public AsyncHandlerTraits
<Error(typename
std::decay
<ResponseHandlerT
>::type
,
481 // This template class provides utilities related to RPC function handlers.
482 // The base case applies to non-function types (the template class is
483 // specialized for function types) and inherits from the appropriate
484 // specialization for the given non-function type's call operator.
485 template <typename HandlerT
>
486 class HandlerTraits
: public HandlerTraits
<decltype(
487 &std::remove_reference
<HandlerT
>::type::operator())> {
490 // Traits for handlers with a given function type.
491 template <typename RetT
, typename
... ArgTs
>
492 class HandlerTraits
<RetT(ArgTs
...)> {
494 // Function type of the handler.
495 using Type
= RetT(ArgTs
...);
497 // Return type of the handler.
498 using ReturnType
= RetT
;
500 // Call the given handler with the given arguments.
501 template <typename HandlerT
, typename
... TArgTs
>
502 static typename WrappedHandlerReturn
<RetT
>::Type
503 unpackAndRun(HandlerT
&Handler
, std::tuple
<TArgTs
...> &Args
) {
504 return unpackAndRunHelper(Handler
, Args
,
505 std::index_sequence_for
<TArgTs
...>());
508 // Call the given handler with the given arguments.
509 template <typename HandlerT
, typename ResponderT
, typename
... TArgTs
>
510 static Error
unpackAndRunAsync(HandlerT
&Handler
, ResponderT
&Responder
,
511 std::tuple
<TArgTs
...> &Args
) {
512 return unpackAndRunAsyncHelper(Handler
, Responder
, Args
,
513 std::index_sequence_for
<TArgTs
...>());
516 // Call the given handler with the given arguments.
517 template <typename HandlerT
>
518 static typename
std::enable_if
<
519 std::is_void
<typename HandlerTraits
<HandlerT
>::ReturnType
>::value
,
521 run(HandlerT
&Handler
, ArgTs
&&... Args
) {
522 Handler(std::move(Args
)...);
523 return Error::success();
526 template <typename HandlerT
, typename
... TArgTs
>
527 static typename
std::enable_if
<
528 !std::is_void
<typename HandlerTraits
<HandlerT
>::ReturnType
>::value
,
529 typename HandlerTraits
<HandlerT
>::ReturnType
>::type
530 run(HandlerT
&Handler
, TArgTs
... Args
) {
531 return Handler(std::move(Args
)...);
534 // Serialize arguments to the channel.
535 template <typename ChannelT
, typename
... CArgTs
>
536 static Error
serializeArgs(ChannelT
&C
, const CArgTs
... CArgs
) {
537 return SequenceSerialization
<ChannelT
, ArgTs
...>::serialize(C
, CArgs
...);
540 // Deserialize arguments from the channel.
541 template <typename ChannelT
, typename
... CArgTs
>
542 static Error
deserializeArgs(ChannelT
&C
, std::tuple
<CArgTs
...> &Args
) {
543 return deserializeArgsHelper(C
, Args
, std::index_sequence_for
<CArgTs
...>());
547 template <typename ChannelT
, typename
... CArgTs
, size_t... Indexes
>
548 static Error
deserializeArgsHelper(ChannelT
&C
, std::tuple
<CArgTs
...> &Args
,
549 std::index_sequence
<Indexes
...> _
) {
550 return SequenceSerialization
<ChannelT
, ArgTs
...>::deserialize(
551 C
, std::get
<Indexes
>(Args
)...);
554 template <typename HandlerT
, typename ArgTuple
, size_t... Indexes
>
555 static typename WrappedHandlerReturn
<
556 typename HandlerTraits
<HandlerT
>::ReturnType
>::Type
557 unpackAndRunHelper(HandlerT
&Handler
, ArgTuple
&Args
,
558 std::index_sequence
<Indexes
...>) {
559 return run(Handler
, std::move(std::get
<Indexes
>(Args
))...);
562 template <typename HandlerT
, typename ResponderT
, typename ArgTuple
,
564 static typename WrappedHandlerReturn
<
565 typename HandlerTraits
<HandlerT
>::ReturnType
>::Type
566 unpackAndRunAsyncHelper(HandlerT
&Handler
, ResponderT
&Responder
,
567 ArgTuple
&Args
, std::index_sequence
<Indexes
...>) {
568 return run(Handler
, Responder
, std::move(std::get
<Indexes
>(Args
))...);
572 // Handler traits for free functions.
573 template <typename RetT
, typename
... ArgTs
>
574 class HandlerTraits
<RetT(*)(ArgTs
...)>
575 : public HandlerTraits
<RetT(ArgTs
...)> {};
577 // Handler traits for class methods (especially call operators for lambdas).
578 template <typename Class
, typename RetT
, typename
... ArgTs
>
579 class HandlerTraits
<RetT (Class::*)(ArgTs
...)>
580 : public HandlerTraits
<RetT(ArgTs
...)> {};
582 // Handler traits for const class methods (especially call operators for
584 template <typename Class
, typename RetT
, typename
... ArgTs
>
585 class HandlerTraits
<RetT (Class::*)(ArgTs
...) const>
586 : public HandlerTraits
<RetT(ArgTs
...)> {};
588 // Utility to peel the Expected wrapper off a response handler error type.
589 template <typename HandlerT
> class ResponseHandlerArg
;
591 template <typename ArgT
> class ResponseHandlerArg
<Error(Expected
<ArgT
>)> {
593 using ArgType
= Expected
<ArgT
>;
594 using UnwrappedArgType
= ArgT
;
597 template <typename ArgT
>
598 class ResponseHandlerArg
<ErrorSuccess(Expected
<ArgT
>)> {
600 using ArgType
= Expected
<ArgT
>;
601 using UnwrappedArgType
= ArgT
;
604 template <> class ResponseHandlerArg
<Error(Error
)> {
606 using ArgType
= Error
;
609 template <> class ResponseHandlerArg
<ErrorSuccess(Error
)> {
611 using ArgType
= Error
;
614 // ResponseHandler represents a handler for a not-yet-received function call
616 template <typename ChannelT
> class ResponseHandler
{
618 virtual ~ResponseHandler() {}
620 // Reads the function result off the wire and acts on it. The meaning of
621 // "act" will depend on how this method is implemented in any given
622 // ResponseHandler subclass but could, for example, mean running a
623 // user-specified handler or setting a promise value.
624 virtual Error
handleResponse(ChannelT
&C
) = 0;
626 // Abandons this outstanding result.
627 virtual void abandon() = 0;
629 // Create an error instance representing an abandoned response.
630 static Error
createAbandonedResponseError() {
631 return make_error
<ResponseAbandoned
>();
635 // ResponseHandler subclass for RPC functions with non-void returns.
636 template <typename ChannelT
, typename FuncRetT
, typename HandlerT
>
637 class ResponseHandlerImpl
: public ResponseHandler
<ChannelT
> {
639 ResponseHandlerImpl(HandlerT Handler
) : Handler(std::move(Handler
)) {}
641 // Handle the result by deserializing it from the channel then passing it
642 // to the user defined handler.
643 Error
handleResponse(ChannelT
&C
) override
{
644 using UnwrappedArgType
= typename ResponseHandlerArg
<
645 typename HandlerTraits
<HandlerT
>::Type
>::UnwrappedArgType
;
646 UnwrappedArgType Result
;
648 SerializationTraits
<ChannelT
, FuncRetT
,
649 UnwrappedArgType
>::deserialize(C
, Result
))
651 if (auto Err
= C
.endReceiveMessage())
653 return Handler(std::move(Result
));
656 // Abandon this response by calling the handler with an 'abandoned response'
658 void abandon() override
{
659 if (auto Err
= Handler(this->createAbandonedResponseError())) {
660 // Handlers should not fail when passed an abandoned response error.
661 report_fatal_error(std::move(Err
));
669 // ResponseHandler subclass for RPC functions with void returns.
670 template <typename ChannelT
, typename HandlerT
>
671 class ResponseHandlerImpl
<ChannelT
, void, HandlerT
>
672 : public ResponseHandler
<ChannelT
> {
674 ResponseHandlerImpl(HandlerT Handler
) : Handler(std::move(Handler
)) {}
676 // Handle the result (no actual value, just a notification that the function
677 // has completed on the remote end) by calling the user-defined handler with
679 Error
handleResponse(ChannelT
&C
) override
{
680 if (auto Err
= C
.endReceiveMessage())
682 return Handler(Error::success());
685 // Abandon this response by calling the handler with an 'abandoned response'
687 void abandon() override
{
688 if (auto Err
= Handler(this->createAbandonedResponseError())) {
689 // Handlers should not fail when passed an abandoned response error.
690 report_fatal_error(std::move(Err
));
698 template <typename ChannelT
, typename FuncRetT
, typename HandlerT
>
699 class ResponseHandlerImpl
<ChannelT
, Expected
<FuncRetT
>, HandlerT
>
700 : public ResponseHandler
<ChannelT
> {
702 ResponseHandlerImpl(HandlerT Handler
) : Handler(std::move(Handler
)) {}
704 // Handle the result by deserializing it from the channel then passing it
705 // to the user defined handler.
706 Error
handleResponse(ChannelT
&C
) override
{
707 using HandlerArgType
= typename ResponseHandlerArg
<
708 typename HandlerTraits
<HandlerT
>::Type
>::ArgType
;
709 HandlerArgType
Result((typename
HandlerArgType::value_type()));
712 SerializationTraits
<ChannelT
, Expected
<FuncRetT
>,
713 HandlerArgType
>::deserialize(C
, Result
))
715 if (auto Err
= C
.endReceiveMessage())
717 return Handler(std::move(Result
));
720 // Abandon this response by calling the handler with an 'abandoned response'
722 void abandon() override
{
723 if (auto Err
= Handler(this->createAbandonedResponseError())) {
724 // Handlers should not fail when passed an abandoned response error.
725 report_fatal_error(std::move(Err
));
733 template <typename ChannelT
, typename HandlerT
>
734 class ResponseHandlerImpl
<ChannelT
, Error
, HandlerT
>
735 : public ResponseHandler
<ChannelT
> {
737 ResponseHandlerImpl(HandlerT Handler
) : Handler(std::move(Handler
)) {}
739 // Handle the result by deserializing it from the channel then passing it
740 // to the user defined handler.
741 Error
handleResponse(ChannelT
&C
) override
{
742 Error Result
= Error::success();
744 SerializationTraits
<ChannelT
, Error
, Error
>::deserialize(C
, Result
))
746 if (auto Err
= C
.endReceiveMessage())
748 return Handler(std::move(Result
));
751 // Abandon this response by calling the handler with an 'abandoned response'
753 void abandon() override
{
754 if (auto Err
= Handler(this->createAbandonedResponseError())) {
755 // Handlers should not fail when passed an abandoned response error.
756 report_fatal_error(std::move(Err
));
764 // Create a ResponseHandler from a given user handler.
765 template <typename ChannelT
, typename FuncRetT
, typename HandlerT
>
766 std::unique_ptr
<ResponseHandler
<ChannelT
>> createResponseHandler(HandlerT H
) {
767 return std::make_unique
<ResponseHandlerImpl
<ChannelT
, FuncRetT
, HandlerT
>>(
// Helper for wrapping member functions up as functors. This is useful for
// installing methods as result handlers.
//
// The wrapper stores a reference to the target object, so the object must
// outlive the wrapper.
template <typename ClassT, typename RetT, typename... ArgTs>
class MemberFnWrapper {
public:
  using MethodT = RetT (ClassT::*)(ArgTs...);

  // Binds Method to Instance; nothing is called yet.
  MemberFnWrapper(ClassT &Instance, MethodT Method)
      : Instance(Instance), Method(Method) {}

  // Invokes the bound method on the bound instance, moving each argument in.
  RetT operator()(ArgTs &&... Args) {
    return (Instance.*Method)(std::move(Args)...);
  }

private:
  ClassT &Instance;
  MethodT Method;
};
788 // Helper that provides a Functor for deserializing arguments.
789 template <typename
... ArgTs
> class ReadArgs
{
791 Error
operator()() { return Error::success(); }
794 template <typename ArgT
, typename
... ArgTs
>
795 class ReadArgs
<ArgT
, ArgTs
...> : public ReadArgs
<ArgTs
...> {
797 ReadArgs(ArgT
&Arg
, ArgTs
&... Args
)
798 : ReadArgs
<ArgTs
...>(Args
...), Arg(Arg
) {}
800 Error
operator()(ArgT
&ArgVal
, ArgTs
&... ArgVals
) {
801 this->Arg
= std::move(ArgVal
);
802 return ReadArgs
<ArgTs
...>::operator()(ArgVals
...);
// Manage sequence numbers.
//
// Every public method takes SeqNoLock for its whole duration.
template <typename SequenceNumberT> class SequenceNumberManager {
public:
  // Reset, making all sequence numbers available.
  void reset() {
    std::lock_guard<std::mutex> Lock(SeqNoLock);
    NextSequenceNumber = 0;
    FreeSequenceNumbers.clear();
  }

  // Get the next available sequence number. Will re-use numbers that have
  // been released (most recently released first); otherwise hands out the
  // next never-used number.
  SequenceNumberT getSequenceNumber() {
    std::lock_guard<std::mutex> Lock(SeqNoLock);
    if (FreeSequenceNumbers.empty())
      return NextSequenceNumber++;
    auto SequenceNumber = FreeSequenceNumbers.back();
    FreeSequenceNumbers.pop_back();
    return SequenceNumber;
  }

  // Release a sequence number, making it available for re-use.
  void releaseSequenceNumber(SequenceNumberT SequenceNumber) {
    std::lock_guard<std::mutex> Lock(SeqNoLock);
    FreeSequenceNumbers.push_back(SequenceNumber);
  }

private:
  std::mutex SeqNoLock;
  SequenceNumberT NextSequenceNumber = 0;
  std::vector<SequenceNumberT> FreeSequenceNumbers;
};
// Checks that predicate P holds for each corresponding pair of type arguments
// from T1 and T2 tuple.
template <template <class, class> class P, typename T1Tuple, typename T2Tuple>
class RPCArgTypeCheckHelper;

// Base case: two empty tuples trivially satisfy the predicate.
template <template <class, class> class P>
class RPCArgTypeCheckHelper<P, std::tuple<>, std::tuple<>> {
public:
  static const bool value = true;
};

// Recursive case: test the head pair, then recurse on the tails. Tuples of
// different lengths match no specialization and fail to compile.
template <template <class, class> class P, typename T, typename... Ts,
          typename U, typename... Us>
class RPCArgTypeCheckHelper<P, std::tuple<T, Ts...>, std::tuple<U, Us...>> {
public:
  static const bool value =
      P<T, U>::value &&
      RPCArgTypeCheckHelper<P, std::tuple<Ts...>, std::tuple<Us...>>::value;
};
862 template <template <class, class> class P
, typename T1Sig
, typename T2Sig
>
863 class RPCArgTypeCheck
{
865 using T1Tuple
= typename FunctionArgsTuple
<T1Sig
>::Type
;
866 using T2Tuple
= typename FunctionArgsTuple
<T2Sig
>::Type
;
868 static_assert(std::tuple_size
<T1Tuple
>::value
>=
869 std::tuple_size
<T2Tuple
>::value
,
870 "Too many arguments to RPC call");
871 static_assert(std::tuple_size
<T1Tuple
>::value
<=
872 std::tuple_size
<T2Tuple
>::value
,
873 "Too few arguments to RPC call");
875 static const bool value
= RPCArgTypeCheckHelper
<P
, T1Tuple
, T2Tuple
>::value
;
878 template <typename ChannelT
, typename WireT
, typename ConcreteT
>
881 using S
= SerializationTraits
<ChannelT
, WireT
, ConcreteT
>;
883 template <typename T
>
884 static std::true_type
885 check(typename
std::enable_if
<
886 std::is_same
<decltype(T::serialize(std::declval
<ChannelT
&>(),
887 std::declval
<const ConcreteT
&>())),
891 template <typename
> static std::false_type
check(...);
894 static const bool value
= decltype(check
<S
>(0))::value
;
897 template <typename ChannelT
, typename WireT
, typename ConcreteT
>
898 class CanDeserialize
{
900 using S
= SerializationTraits
<ChannelT
, WireT
, ConcreteT
>;
902 template <typename T
>
903 static std::true_type
904 check(typename
std::enable_if
<
905 std::is_same
<decltype(T::deserialize(std::declval
<ChannelT
&>(),
906 std::declval
<ConcreteT
&>())),
910 template <typename
> static std::false_type
check(...);
913 static const bool value
= decltype(check
<S
>(0))::value
;
916 /// Contains primitive utilities for defining, calling and handling calls to
917 /// remote procedures. ChannelT is a bidirectional stream conforming to the
918 /// RPCChannel interface (see RPCChannel.h), FunctionIdT is a procedure
919 /// identifier type that must be serializable on ChannelT, and SequenceNumberT
920 /// is an integral type that will be used to number in-flight function calls.
922 /// These utilities support the construction of very primitive RPC utilities.
923 /// Their intent is to ensure correct serialization and deserialization of
924 /// procedure arguments, and to keep the client and server's view of the API in
926 template <typename ImplT
, typename ChannelT
, typename FunctionIdT
,
927 typename SequenceNumberT
>
928 class RPCEndpointBase
{
930 class OrcRPCInvalid
: public Function
<OrcRPCInvalid
, void()> {
932 static const char *getName() { return "__orc_rpc$invalid"; }
935 class OrcRPCResponse
: public Function
<OrcRPCResponse
, void()> {
937 static const char *getName() { return "__orc_rpc$response"; }
940 class OrcRPCNegotiate
941 : public Function
<OrcRPCNegotiate
, FunctionIdT(std::string
)> {
943 static const char *getName() { return "__orc_rpc$negotiate"; }
946 // Helper predicate for testing for the presence of SerializeTraits
948 template <typename WireT
, typename ConcreteT
>
949 class CanSerializeCheck
: detail::CanSerialize
<ChannelT
, WireT
, ConcreteT
> {
951 using detail::CanSerialize
<ChannelT
, WireT
, ConcreteT
>::value
;
953 static_assert(value
, "Missing serializer for argument (Can't serialize the "
954 "first template type argument of CanSerializeCheck "
958 // Helper predicate for testing for the presence of SerializeTraits
960 template <typename WireT
, typename ConcreteT
>
961 class CanDeserializeCheck
962 : detail::CanDeserialize
<ChannelT
, WireT
, ConcreteT
> {
964 using detail::CanDeserialize
<ChannelT
, WireT
, ConcreteT
>::value
;
966 static_assert(value
, "Missing deserializer for argument (Can't deserialize "
967 "the second template type argument of "
968 "CanDeserializeCheck from the first)");
972 /// Construct an RPC instance on a channel.
973 RPCEndpointBase(ChannelT
&C
, bool LazyAutoNegotiation
)
974 : C(C
), LazyAutoNegotiation(LazyAutoNegotiation
) {
975 // Hold ResponseId in a special variable, since we expect Response to be
976 // called relatively frequently, and want to avoid the map lookup.
977 ResponseId
= FnIdAllocator
.getResponseId();
978 RemoteFunctionIds
[OrcRPCResponse::getPrototype()] = ResponseId
;
980 // Register the negotiate function id and handler.
981 auto NegotiateId
= FnIdAllocator
.getNegotiateId();
982 RemoteFunctionIds
[OrcRPCNegotiate::getPrototype()] = NegotiateId
;
983 Handlers
[NegotiateId
] = wrapHandler
<OrcRPCNegotiate
>(
984 [this](const std::string
&Name
) { return handleNegotiate(Name
); });
988 /// Negotiate a function id for Func with the other end of the channel.
989 template <typename Func
> Error
negotiateFunction(bool Retry
= false) {
990 return getRemoteFunctionId
<Func
>(true, Retry
).takeError();
993 /// Append a call Func, does not call send on the channel.
994 /// The first argument specifies a user-defined handler to be run when the
995 /// function returns. The handler should take an Expected<Func::ReturnType>,
996 /// or an Error (if Func::ReturnType is void). The handler will be called
997 /// with an error if the return value is abandoned due to a channel error.
998 template <typename Func
, typename HandlerT
, typename
... ArgTs
>
999 Error
appendCallAsync(HandlerT Handler
, const ArgTs
&... Args
) {
1002 detail::RPCArgTypeCheck
<CanSerializeCheck
, typename
Func::Type
,
1003 void(ArgTs
...)>::value
,
1006 // Look up the function ID.
1008 if (auto FnIdOrErr
= getRemoteFunctionId
<Func
>(LazyAutoNegotiation
, false))
1011 // Negotiation failed. Notify the handler then return the negotiate-failed
1013 cantFail(Handler(make_error
<ResponseAbandoned
>()));
1014 return FnIdOrErr
.takeError();
1017 SequenceNumberT SeqNo
; // initialized in locked scope below.
1019 // Lock the pending responses map and sequence number manager.
1020 std::lock_guard
<std::mutex
> Lock(ResponsesMutex
);
1022 // Allocate a sequence number.
1023 SeqNo
= SequenceNumberMgr
.getSequenceNumber();
1024 assert(!PendingResponses
.count(SeqNo
) &&
1025 "Sequence number already allocated");
1027 // Install the user handler.
1028 PendingResponses
[SeqNo
] =
1029 detail::createResponseHandler
<ChannelT
, typename
Func::ReturnType
>(
1030 std::move(Handler
));
1033 // Open the function call message.
1034 if (auto Err
= C
.startSendMessage(FnId
, SeqNo
)) {
1035 abandonPendingResponses();
1039 // Serialize the call arguments.
1040 if (auto Err
= detail::HandlerTraits
<typename
Func::Type
>::serializeArgs(
1042 abandonPendingResponses();
1046 // Close the function call messagee.
1047 if (auto Err
= C
.endSendMessage()) {
1048 abandonPendingResponses();
1052 return Error::success();
1055 Error
sendAppendedCalls() { return C
.send(); };
1057 template <typename Func
, typename HandlerT
, typename
... ArgTs
>
1058 Error
callAsync(HandlerT Handler
, const ArgTs
&... Args
) {
1059 if (auto Err
= appendCallAsync
<Func
>(std::move(Handler
), Args
...))
1064 /// Handle one incoming call.
1067 SequenceNumberT SeqNo
;
1068 if (auto Err
= C
.startReceiveMessage(FnId
, SeqNo
)) {
1069 abandonPendingResponses();
1072 if (FnId
== ResponseId
)
1073 return handleResponse(SeqNo
);
1074 auto I
= Handlers
.find(FnId
);
1075 if (I
!= Handlers
.end())
1076 return I
->second(C
, SeqNo
);
1078 // else: No handler found. Report error to client?
1079 return make_error
<BadFunctionCall
<FunctionIdT
, SequenceNumberT
>>(FnId
,
1083 /// Helper for handling setter procedures - this method returns a functor that
1084 /// sets the variables referred to by Args... to values deserialized from the
1088 /// typedef Function<0, bool, int> Func1;
1093 /// if (auto Err = expect<Func1>(Channel, readArgs(B, I)))
1094 /// /* Handle Args */ ;
1096 template <typename
... ArgTs
>
1097 static detail::ReadArgs
<ArgTs
...> readArgs(ArgTs
&... Args
) {
1098 return detail::ReadArgs
<ArgTs
...>(Args
...);
1101 /// Abandon all outstanding result handlers.
1103 /// This will call all currently registered result handlers to receive an
1104 /// "abandoned" error as their argument. This is used internally by the RPC
1105 /// in error situations, but can also be called directly by clients who are
1106 /// disconnecting from the remote and don't or can't expect responses to their
1107 /// outstanding calls. (Especially for outstanding blocking calls, calling
1108 /// this function may be necessary to avoid dead threads).
1109 void abandonPendingResponses() {
1110 // Lock the pending responses map and sequence number manager.
1111 std::lock_guard
<std::mutex
> Lock(ResponsesMutex
);
1113 for (auto &KV
: PendingResponses
)
1114 KV
.second
->abandon();
1115 PendingResponses
.clear();
1116 SequenceNumberMgr
.reset();
1119 /// Remove the handler for the given function.
1120 /// A handler must currently be registered for this function.
1121 template <typename Func
>
1122 void removeHandler() {
1123 auto IdItr
= LocalFunctionIds
.find(Func::getPrototype());
1124 assert(IdItr
!= LocalFunctionIds
.end() &&
1125 "Function does not have a registered handler");
1126 auto HandlerItr
= Handlers
.find(IdItr
->second
);
1127 assert(HandlerItr
!= Handlers
.end() &&
1128 "Function does not have a registered handler");
1129 Handlers
.erase(HandlerItr
);
1132 /// Clear all handlers.
1133 void clearHandlers() {
1139 FunctionIdT
getInvalidFunctionId() const {
1140 return FnIdAllocator
.getInvalidId();
1143 /// Add the given handler to the handler map and make it available for
1144 /// autonegotiation and execution.
1145 template <typename Func
, typename HandlerT
>
1146 void addHandlerImpl(HandlerT Handler
) {
1148 static_assert(detail::RPCArgTypeCheck
<
1149 CanDeserializeCheck
, typename
Func::Type
,
1150 typename
detail::HandlerTraits
<HandlerT
>::Type
>::value
,
1153 FunctionIdT NewFnId
= FnIdAllocator
.template allocate
<Func
>();
1154 LocalFunctionIds
[Func::getPrototype()] = NewFnId
;
1155 Handlers
[NewFnId
] = wrapHandler
<Func
>(std::move(Handler
));
1158 template <typename Func
, typename HandlerT
>
1159 void addAsyncHandlerImpl(HandlerT Handler
) {
1161 static_assert(detail::RPCArgTypeCheck
<
1162 CanDeserializeCheck
, typename
Func::Type
,
1163 typename
detail::AsyncHandlerTraits
<
1164 typename
detail::HandlerTraits
<HandlerT
>::Type
1168 FunctionIdT NewFnId
= FnIdAllocator
.template allocate
<Func
>();
1169 LocalFunctionIds
[Func::getPrototype()] = NewFnId
;
1170 Handlers
[NewFnId
] = wrapAsyncHandler
<Func
>(std::move(Handler
));
1173 Error
handleResponse(SequenceNumberT SeqNo
) {
1174 using Handler
= typename
decltype(PendingResponses
)::mapped_type
;
1178 // Lock the pending responses map and sequence number manager.
1179 std::unique_lock
<std::mutex
> Lock(ResponsesMutex
);
1180 auto I
= PendingResponses
.find(SeqNo
);
1182 if (I
!= PendingResponses
.end()) {
1183 PRHandler
= std::move(I
->second
);
1184 PendingResponses
.erase(I
);
1185 SequenceNumberMgr
.releaseSequenceNumber(SeqNo
);
1187 // Unlock the pending results map to prevent recursive lock.
1189 abandonPendingResponses();
1191 InvalidSequenceNumberForResponse
<SequenceNumberT
>>(SeqNo
);
1196 "If we didn't find a response handler we should have bailed out");
1198 if (auto Err
= PRHandler
->handleResponse(C
)) {
1199 abandonPendingResponses();
1203 return Error::success();
1206 FunctionIdT
handleNegotiate(const std::string
&Name
) {
1207 auto I
= LocalFunctionIds
.find(Name
);
1208 if (I
== LocalFunctionIds
.end())
1209 return getInvalidFunctionId();
1213 // Find the remote FunctionId for the given function.
1214 template <typename Func
>
1215 Expected
<FunctionIdT
> getRemoteFunctionId(bool NegotiateIfNotInMap
,
1216 bool NegotiateIfInvalid
) {
1219 // Check if we already have a function id...
1220 auto I
= RemoteFunctionIds
.find(Func::getPrototype());
1221 if (I
!= RemoteFunctionIds
.end()) {
1222 // If it's valid there's nothing left to do.
1223 if (I
->second
!= getInvalidFunctionId())
1225 DoNegotiate
= NegotiateIfInvalid
;
1227 DoNegotiate
= NegotiateIfNotInMap
;
1229 // We don't have a function id for Func yet, but we're allowed to try to
1232 auto &Impl
= static_cast<ImplT
&>(*this);
1233 if (auto RemoteIdOrErr
=
1234 Impl
.template callB
<OrcRPCNegotiate
>(Func::getPrototype())) {
1235 RemoteFunctionIds
[Func::getPrototype()] = *RemoteIdOrErr
;
1236 if (*RemoteIdOrErr
== getInvalidFunctionId())
1237 return make_error
<CouldNotNegotiate
>(Func::getPrototype());
1238 return *RemoteIdOrErr
;
1240 return RemoteIdOrErr
.takeError();
1243 // No key was available in the map and we weren't allowed to try to
1244 // negotiate one, so return an unknown function error.
1245 return make_error
<CouldNotNegotiate
>(Func::getPrototype());
1248 using WrappedHandlerFn
= std::function
<Error(ChannelT
&, SequenceNumberT
)>;
1250 // Wrap the given user handler in the necessary argument-deserialization code,
1251 // result-serialization code, and call to the launch policy (if present).
1252 template <typename Func
, typename HandlerT
>
1253 WrappedHandlerFn
wrapHandler(HandlerT Handler
) {
1254 return [this, Handler
](ChannelT
&Channel
,
1255 SequenceNumberT SeqNo
) mutable -> Error
{
1256 // Start by deserializing the arguments.
1258 typename
detail::FunctionArgsTuple
<
1259 typename
detail::HandlerTraits
<HandlerT
>::Type
>::Type
;
1260 auto Args
= std::make_shared
<ArgsTuple
>();
1263 detail::HandlerTraits
<typename
Func::Type
>::deserializeArgs(
1267 // GCC 4.7 and 4.8 incorrectly issue a -Wunused-but-set-variable warning
1268 // for RPCArgs. Void cast RPCArgs to work around this for now.
1269 // FIXME: Remove this workaround once we can assume a working GCC version.
1272 // End receieve message, unlocking the channel for reading.
1273 if (auto Err
= Channel
.endReceiveMessage())
1276 using HTraits
= detail::HandlerTraits
<HandlerT
>;
1277 using FuncReturn
= typename
Func::ReturnType
;
1278 return detail::respond
<FuncReturn
>(Channel
, ResponseId
, SeqNo
,
1279 HTraits::unpackAndRun(Handler
, *Args
));
1283 // Wrap the given user handler in the necessary argument-deserialization code,
1284 // result-serialization code, and call to the launch policy (if present).
1285 template <typename Func
, typename HandlerT
>
1286 WrappedHandlerFn
wrapAsyncHandler(HandlerT Handler
) {
1287 return [this, Handler
](ChannelT
&Channel
,
1288 SequenceNumberT SeqNo
) mutable -> Error
{
1289 // Start by deserializing the arguments.
1290 using AHTraits
= detail::AsyncHandlerTraits
<
1291 typename
detail::HandlerTraits
<HandlerT
>::Type
>;
1293 typename
detail::FunctionArgsTuple
<typename
AHTraits::Type
>::Type
;
1294 auto Args
= std::make_shared
<ArgsTuple
>();
1297 detail::HandlerTraits
<typename
Func::Type
>::deserializeArgs(
1301 // GCC 4.7 and 4.8 incorrectly issue a -Wunused-but-set-variable warning
1302 // for RPCArgs. Void cast RPCArgs to work around this for now.
1303 // FIXME: Remove this workaround once we can assume a working GCC version.
1306 // End receieve message, unlocking the channel for reading.
1307 if (auto Err
= Channel
.endReceiveMessage())
1310 using HTraits
= detail::HandlerTraits
<HandlerT
>;
1311 using FuncReturn
= typename
Func::ReturnType
;
1313 [this, SeqNo
](typename
AHTraits::ResultType RetVal
) -> Error
{
1314 return detail::respond
<FuncReturn
>(C
, ResponseId
, SeqNo
,
1318 return HTraits::unpackAndRunAsync(Handler
, Responder
, *Args
);
1324 bool LazyAutoNegotiation
;
1326 RPCFunctionIdAllocator
<FunctionIdT
> FnIdAllocator
;
1328 FunctionIdT ResponseId
;
1329 std::map
<std::string
, FunctionIdT
> LocalFunctionIds
;
1330 std::map
<const char *, FunctionIdT
> RemoteFunctionIds
;
1332 std::map
<FunctionIdT
, WrappedHandlerFn
> Handlers
;
1334 std::mutex ResponsesMutex
;
1335 detail::SequenceNumberManager
<SequenceNumberT
> SequenceNumberMgr
;
1336 std::map
<SequenceNumberT
, std::unique_ptr
<detail::ResponseHandler
<ChannelT
>>>
1340 } // end namespace detail
1342 template <typename ChannelT
, typename FunctionIdT
= uint32_t,
1343 typename SequenceNumberT
= uint32_t>
1344 class MultiThreadedRPCEndpoint
1345 : public detail::RPCEndpointBase
<
1346 MultiThreadedRPCEndpoint
<ChannelT
, FunctionIdT
, SequenceNumberT
>,
1347 ChannelT
, FunctionIdT
, SequenceNumberT
> {
1350 detail::RPCEndpointBase
<
1351 MultiThreadedRPCEndpoint
<ChannelT
, FunctionIdT
, SequenceNumberT
>,
1352 ChannelT
, FunctionIdT
, SequenceNumberT
>;
1355 MultiThreadedRPCEndpoint(ChannelT
&C
, bool LazyAutoNegotiation
)
1356 : BaseClass(C
, LazyAutoNegotiation
) {}
1358 /// Add a handler for the given RPC function.
1359 /// This installs the given handler functor for the given RPC Function, and
1360 /// makes the RPC function available for negotiation/calling from the remote.
1361 template <typename Func
, typename HandlerT
>
1362 void addHandler(HandlerT Handler
) {
1363 return this->template addHandlerImpl
<Func
>(std::move(Handler
));
1366 /// Add a class-method as a handler.
1367 template <typename Func
, typename ClassT
, typename RetT
, typename
... ArgTs
>
1368 void addHandler(ClassT
&Object
, RetT (ClassT::*Method
)(ArgTs
...)) {
1370 detail::MemberFnWrapper
<ClassT
, RetT
, ArgTs
...>(Object
, Method
));
1373 template <typename Func
, typename HandlerT
>
1374 void addAsyncHandler(HandlerT Handler
) {
1375 return this->template addAsyncHandlerImpl
<Func
>(std::move(Handler
));
1378 /// Add a class-method as a handler.
1379 template <typename Func
, typename ClassT
, typename RetT
, typename
... ArgTs
>
1380 void addAsyncHandler(ClassT
&Object
, RetT (ClassT::*Method
)(ArgTs
...)) {
1381 addAsyncHandler
<Func
>(
1382 detail::MemberFnWrapper
<ClassT
, RetT
, ArgTs
...>(Object
, Method
));
1385 /// Return type for non-blocking call primitives.
1386 template <typename Func
>
1387 using NonBlockingCallResult
= typename
detail::ResultTraits
<
1388 typename
Func::ReturnType
>::ReturnFutureType
;
1390 /// Call Func on Channel C. Does not block, does not call send. Returns a pair
1391 /// of a future result and the sequence number assigned to the result.
1393 /// This utility function is primarily used for single-threaded mode support,
1394 /// where the sequence number can be used to wait for the corresponding
1395 /// result. In multi-threaded mode the appendCallNB method, which does not
1396 /// return the sequence numeber, should be preferred.
1397 template <typename Func
, typename
... ArgTs
>
1398 Expected
<NonBlockingCallResult
<Func
>> appendCallNB(const ArgTs
&... Args
) {
1399 using RTraits
= detail::ResultTraits
<typename
Func::ReturnType
>;
1400 using ErrorReturn
= typename
RTraits::ErrorReturnType
;
1401 using ErrorReturnPromise
= typename
RTraits::ReturnPromiseType
;
1403 // FIXME: Stack allocate and move this into the handler once LLVM builds
1405 auto Promise
= std::make_shared
<ErrorReturnPromise
>();
1406 auto FutureResult
= Promise
->get_future();
1408 if (auto Err
= this->template appendCallAsync
<Func
>(
1409 [Promise
](ErrorReturn RetOrErr
) {
1410 Promise
->set_value(std::move(RetOrErr
));
1411 return Error::success();
1414 RTraits::consumeAbandoned(FutureResult
.get());
1415 return std::move(Err
);
1417 return std::move(FutureResult
);
1420 /// The same as appendCallNBWithSeq, except that it calls C.send() to
1421 /// flush the channel after serializing the call.
1422 template <typename Func
, typename
... ArgTs
>
1423 Expected
<NonBlockingCallResult
<Func
>> callNB(const ArgTs
&... Args
) {
1424 auto Result
= appendCallNB
<Func
>(Args
...);
1427 if (auto Err
= this->C
.send()) {
1428 this->abandonPendingResponses();
1429 detail::ResultTraits
<typename
Func::ReturnType
>::consumeAbandoned(
1430 std::move(Result
->get()));
1431 return std::move(Err
);
1436 /// Call Func on Channel C. Blocks waiting for a result. Returns an Error
1437 /// for void functions or an Expected<T> for functions returning a T.
1439 /// This function is for use in threaded code where another thread is
1440 /// handling responses and incoming calls.
1441 template <typename Func
, typename
... ArgTs
,
1442 typename AltRetT
= typename
Func::ReturnType
>
1443 typename
detail::ResultTraits
<AltRetT
>::ErrorReturnType
1444 callB(const ArgTs
&... Args
) {
1445 if (auto FutureResOrErr
= callNB
<Func
>(Args
...))
1446 return FutureResOrErr
->get();
1448 return FutureResOrErr
.takeError();
1451 /// Handle incoming RPC calls.
1452 Error
handlerLoop() {
1454 if (auto Err
= this->handleOne())
1456 return Error::success();
1460 template <typename ChannelT
, typename FunctionIdT
= uint32_t,
1461 typename SequenceNumberT
= uint32_t>
1462 class SingleThreadedRPCEndpoint
1463 : public detail::RPCEndpointBase
<
1464 SingleThreadedRPCEndpoint
<ChannelT
, FunctionIdT
, SequenceNumberT
>,
1465 ChannelT
, FunctionIdT
, SequenceNumberT
> {
1468 detail::RPCEndpointBase
<
1469 SingleThreadedRPCEndpoint
<ChannelT
, FunctionIdT
, SequenceNumberT
>,
1470 ChannelT
, FunctionIdT
, SequenceNumberT
>;
1473 SingleThreadedRPCEndpoint(ChannelT
&C
, bool LazyAutoNegotiation
)
1474 : BaseClass(C
, LazyAutoNegotiation
) {}
1476 template <typename Func
, typename HandlerT
>
1477 void addHandler(HandlerT Handler
) {
1478 return this->template addHandlerImpl
<Func
>(std::move(Handler
));
1481 template <typename Func
, typename ClassT
, typename RetT
, typename
... ArgTs
>
1482 void addHandler(ClassT
&Object
, RetT (ClassT::*Method
)(ArgTs
...)) {
1484 detail::MemberFnWrapper
<ClassT
, RetT
, ArgTs
...>(Object
, Method
));
1487 template <typename Func
, typename HandlerT
>
1488 void addAsyncHandler(HandlerT Handler
) {
1489 return this->template addAsyncHandlerImpl
<Func
>(std::move(Handler
));
1492 /// Add a class-method as a handler.
1493 template <typename Func
, typename ClassT
, typename RetT
, typename
... ArgTs
>
1494 void addAsyncHandler(ClassT
&Object
, RetT (ClassT::*Method
)(ArgTs
...)) {
1495 addAsyncHandler
<Func
>(
1496 detail::MemberFnWrapper
<ClassT
, RetT
, ArgTs
...>(Object
, Method
));
1499 template <typename Func
, typename
... ArgTs
,
1500 typename AltRetT
= typename
Func::ReturnType
>
1501 typename
detail::ResultTraits
<AltRetT
>::ErrorReturnType
1502 callB(const ArgTs
&... Args
) {
1503 bool ReceivedResponse
= false;
1504 using ResultType
= typename
detail::ResultTraits
<AltRetT
>::ErrorReturnType
;
1505 auto Result
= detail::ResultTraits
<AltRetT
>::createBlankErrorReturnValue();
1507 // We have to 'Check' result (which we know is in a success state at this
1508 // point) so that it can be overwritten in the async handler.
1511 if (auto Err
= this->template appendCallAsync
<Func
>(
1513 Result
= std::move(R
);
1514 ReceivedResponse
= true;
1515 return Error::success();
1518 detail::ResultTraits
<typename
Func::ReturnType
>::consumeAbandoned(
1520 return std::move(Err
);
1523 while (!ReceivedResponse
) {
1524 if (auto Err
= this->handleOne()) {
1525 detail::ResultTraits
<typename
Func::ReturnType
>::consumeAbandoned(
1527 return std::move(Err
);
1535 /// Asynchronous dispatch for a function on an RPC endpoint.
1536 template <typename RPCClass
, typename Func
>
1537 class RPCAsyncDispatch
{
1539 RPCAsyncDispatch(RPCClass
&Endpoint
) : Endpoint(Endpoint
) {}
1541 template <typename HandlerT
, typename
... ArgTs
>
1542 Error
operator()(HandlerT Handler
, const ArgTs
&... Args
) const {
1543 return Endpoint
.template appendCallAsync
<Func
>(std::move(Handler
), Args
...);
1550 /// Construct an asynchronous dispatcher from an RPC endpoint and a Func.
1551 template <typename Func
, typename RPCEndpointT
>
1552 RPCAsyncDispatch
<RPCEndpointT
, Func
> rpcAsyncDispatch(RPCEndpointT
&Endpoint
) {
1553 return RPCAsyncDispatch
<RPCEndpointT
, Func
>(Endpoint
);
1556 /// Allows a set of asynchrounous calls to be dispatched, and then
1557 /// waited on as a group.
1558 class ParallelCallGroup
{
1561 ParallelCallGroup() = default;
1562 ParallelCallGroup(const ParallelCallGroup
&) = delete;
1563 ParallelCallGroup
&operator=(const ParallelCallGroup
&) = delete;
1565 /// Make as asynchronous call.
1566 template <typename AsyncDispatcher
, typename HandlerT
, typename
... ArgTs
>
1567 Error
call(const AsyncDispatcher
&AsyncDispatch
, HandlerT Handler
,
1568 const ArgTs
&... Args
) {
1569 // Increment the count of outstanding calls. This has to happen before
1570 // we invoke the call, as the handler may (depending on scheduling)
1571 // be run immediately on another thread, and we don't want the decrement
1572 // in the wrapped handler below to run before the increment.
1574 std::unique_lock
<std::mutex
> Lock(M
);
1575 ++NumOutstandingCalls
;
1578 // Wrap the user handler in a lambda that will decrement the
1579 // outstanding calls count, then poke the condition variable.
1580 using ArgType
= typename
detail::ResponseHandlerArg
<
1581 typename
detail::HandlerTraits
<HandlerT
>::Type
>::ArgType
;
1582 // FIXME: Move handler into wrapped handler once we have C++14.
1583 auto WrappedHandler
= [this, Handler
](ArgType Arg
) {
1584 auto Err
= Handler(std::move(Arg
));
1585 std::unique_lock
<std::mutex
> Lock(M
);
1586 --NumOutstandingCalls
;
1591 return AsyncDispatch(std::move(WrappedHandler
), Args
...);
1594 /// Blocks until all calls have been completed and their return value
1597 std::unique_lock
<std::mutex
> Lock(M
);
1598 while (NumOutstandingCalls
> 0)
1604 std::condition_variable CV
;
1605 uint32_t NumOutstandingCalls
= 0;
1608 /// Convenience class for grouping RPC Functions into APIs that can be
1609 /// negotiated as a block.
1611 template <typename
... Funcs
>
1615 /// Test whether this API contains Function F.
1616 template <typename F
>
1619 static const bool value
= false;
1622 /// Negotiate all functions in this API.
1623 template <typename RPCEndpoint
>
1624 static Error
negotiate(RPCEndpoint
&R
) {
1625 return Error::success();
1629 template <typename Func
, typename
... Funcs
>
1630 class APICalls
<Func
, Funcs
...> {
1633 template <typename F
>
1636 static const bool value
= std::is_same
<F
, Func
>::value
|
1637 APICalls
<Funcs
...>::template Contains
<F
>::value
;
1640 template <typename RPCEndpoint
>
1641 static Error
negotiate(RPCEndpoint
&R
) {
1642 if (auto Err
= R
.template negotiateFunction
<Func
>())
1644 return APICalls
<Funcs
...>::negotiate(R
);
1649 template <typename
... InnerFuncs
, typename
... Funcs
>
1650 class APICalls
<APICalls
<InnerFuncs
...>, Funcs
...> {
1653 template <typename F
>
1656 static const bool value
=
1657 APICalls
<InnerFuncs
...>::template Contains
<F
>::value
|
1658 APICalls
<Funcs
...>::template Contains
<F
>::value
;
1661 template <typename RPCEndpoint
>
1662 static Error
negotiate(RPCEndpoint
&R
) {
1663 if (auto Err
= APICalls
<InnerFuncs
...>::negotiate(R
))
1665 return APICalls
<Funcs
...>::negotiate(R
);
1670 } // end namespace rpc
1671 } // end namespace orc
1672 } // end namespace llvm