1 //===----------------------- Queue.h - RPC Queue ------------------*-c++-*-===//
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
7 //===----------------------------------------------------------------------===//
9 #ifndef LLVM_UNITTESTS_EXECUTIONENGINE_ORC_QUEUECHANNEL_H
10 #define LLVM_UNITTESTS_EXECUTIONENGINE_ORC_QUEUECHANNEL_H
12 #include "llvm/ExecutionEngine/Orc/RawByteChannel.h"
13 #include "llvm/Support/Error.h"
16 #include <condition_variable>
21 class QueueChannelError
: public ErrorInfo
<QueueChannelError
> {
26 class QueueChannelClosedError
27 : public ErrorInfo
<QueueChannelClosedError
, QueueChannelError
> {
30 std::error_code
convertToErrorCode() const override
{
31 return inconvertibleErrorCode();
34 void log(raw_ostream
&OS
) const override
{
39 class Queue
: public std::queue
<char> {
41 using ErrorInjector
= std::function
<Error()>;
44 : ReadError([]() { return Error::success(); }),
45 WriteError([]() { return Error::success(); }) {}
47 Queue(const Queue
&) = delete;
48 Queue
& operator=(const Queue
&) = delete;
49 Queue(Queue
&&) = delete;
50 Queue
& operator=(Queue
&&) = delete;
52 std::mutex
&getMutex() { return M
; }
53 std::condition_variable
&getCondVar() { return CV
; }
54 Error
checkReadError() { return ReadError(); }
55 Error
checkWriteError() { return WriteError(); }
56 void setReadError(ErrorInjector NewReadError
) {
58 std::lock_guard
<std::mutex
> Lock(M
);
59 ReadError
= std::move(NewReadError
);
63 void setWriteError(ErrorInjector NewWriteError
) {
64 std::lock_guard
<std::mutex
> Lock(M
);
65 WriteError
= std::move(NewWriteError
);
69 std::condition_variable CV
;
70 std::function
<Error()> ReadError
, WriteError
;
73 class QueueChannel
: public orc::rpc::RawByteChannel
{
75 QueueChannel(std::shared_ptr
<Queue
> InQueue
,
76 std::shared_ptr
<Queue
> OutQueue
)
77 : InQueue(InQueue
), OutQueue(OutQueue
) {}
79 QueueChannel(const QueueChannel
&) = delete;
80 QueueChannel
& operator=(const QueueChannel
&) = delete;
81 QueueChannel(QueueChannel
&&) = delete;
82 QueueChannel
& operator=(QueueChannel
&&) = delete;
84 template <typename FunctionIdT
, typename SequenceIdT
>
85 Error
startSendMessage(const FunctionIdT
&FnId
, const SequenceIdT
&SeqNo
) {
86 ++InFlightOutgoingMessages
;
87 return orc::rpc::RawByteChannel::startSendMessage(FnId
, SeqNo
);
90 Error
endSendMessage() {
91 --InFlightOutgoingMessages
;
92 ++CompletedOutgoingMessages
;
93 return orc::rpc::RawByteChannel::endSendMessage();
96 template <typename FunctionIdT
, typename SequenceNumberT
>
97 Error
startReceiveMessage(FunctionIdT
&FnId
, SequenceNumberT
&SeqNo
) {
98 ++InFlightIncomingMessages
;
99 return orc::rpc::RawByteChannel::startReceiveMessage(FnId
, SeqNo
);
102 Error
endReceiveMessage() {
103 --InFlightIncomingMessages
;
104 ++CompletedIncomingMessages
;
105 return orc::rpc::RawByteChannel::endReceiveMessage();
108 Error
readBytes(char *Dst
, unsigned Size
) override
{
109 std::unique_lock
<std::mutex
> Lock(InQueue
->getMutex());
112 Error Err
= InQueue
->checkReadError();
113 while (!Err
&& InQueue
->empty()) {
114 InQueue
->getCondVar().wait(Lock
);
115 Err
= InQueue
->checkReadError();
120 *Dst
++ = InQueue
->front();
125 return Error::success();
128 Error
appendBytes(const char *Src
, unsigned Size
) override
{
129 std::unique_lock
<std::mutex
> Lock(OutQueue
->getMutex());
131 if (Error Err
= OutQueue
->checkWriteError())
133 OutQueue
->push(*Src
++);
136 OutQueue
->getCondVar().notify_one();
137 return Error::success();
140 Error
send() override
{
142 return Error::success();
146 auto ChannelClosed
= []() { return make_error
<QueueChannelClosedError
>(); };
147 InQueue
->setReadError(ChannelClosed
);
148 InQueue
->setWriteError(ChannelClosed
);
149 OutQueue
->setReadError(ChannelClosed
);
150 OutQueue
->setWriteError(ChannelClosed
);
153 uint64_t NumWritten
= 0;
154 uint64_t NumRead
= 0;
155 std::atomic
<size_t> InFlightIncomingMessages
{0};
156 std::atomic
<size_t> CompletedIncomingMessages
{0};
157 std::atomic
<size_t> InFlightOutgoingMessages
{0};
158 std::atomic
<size_t> CompletedOutgoingMessages
{0};
159 std::atomic
<size_t> SendCalls
{0};
163 std::shared_ptr
<Queue
> InQueue
;
164 std::shared_ptr
<Queue
> OutQueue
;
167 inline std::pair
<std::unique_ptr
<QueueChannel
>, std::unique_ptr
<QueueChannel
>>
168 createPairedQueueChannels() {
169 auto Q1
= std::make_shared
<Queue
>();
170 auto Q2
= std::make_shared
<Queue
>();
171 auto C1
= std::make_unique
<QueueChannel
>(Q1
, Q2
);
172 auto C2
= std::make_unique
<QueueChannel
>(Q2
, Q1
);
173 return std::make_pair(std::move(C1
), std::move(C2
));