1 //===------- SimpleEPCServer.cpp - EPC over simple abstract channel -------===//
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
7 //===----------------------------------------------------------------------===//
9 #include "llvm/ExecutionEngine/Orc/TargetProcess/SimpleRemoteEPCServer.h"
11 #include "llvm/ExecutionEngine/Orc/Shared/TargetProcessControlTypes.h"
12 #include "llvm/Support/FormatVariadic.h"
13 #include "llvm/Support/Host.h"
14 #include "llvm/Support/Process.h"
16 #include "OrcRTBootstrap.h"
18 #define DEBUG_TYPE "orc"
20 using namespace llvm::orc::shared
;
25 ExecutorBootstrapService::~ExecutorBootstrapService() {}
27 SimpleRemoteEPCServer::Dispatcher::~Dispatcher() {}
29 #if LLVM_ENABLE_THREADS
30 void SimpleRemoteEPCServer::ThreadDispatcher::dispatch(
31 unique_function
<void()> Work
) {
33 std::lock_guard
<std::mutex
> Lock(DispatchMutex
);
39 std::thread([this, Work
= std::move(Work
)]() mutable {
41 std::lock_guard
<std::mutex
> Lock(DispatchMutex
);
43 OutstandingCV
.notify_all();
47 void SimpleRemoteEPCServer::ThreadDispatcher::shutdown() {
48 std::unique_lock
<std::mutex
> Lock(DispatchMutex
);
50 OutstandingCV
.wait(Lock
, [this]() { return Outstanding
== 0; });
54 StringMap
<ExecutorAddr
> SimpleRemoteEPCServer::defaultBootstrapSymbols() {
55 StringMap
<ExecutorAddr
> DBS
;
56 rt_bootstrap::addTo(DBS
);
60 Expected
<SimpleRemoteEPCTransportClient::HandleMessageAction
>
61 SimpleRemoteEPCServer::handleMessage(SimpleRemoteEPCOpcode OpC
, uint64_t SeqNo
,
63 SimpleRemoteEPCArgBytesVector ArgBytes
) {
66 dbgs() << "SimpleRemoteEPCServer::handleMessage: opc = ";
68 case SimpleRemoteEPCOpcode::Setup
:
70 assert(SeqNo
== 0 && "Non-zero SeqNo for Setup?");
71 assert(TagAddr
.getValue() == 0 && "Non-zero TagAddr for Setup?");
73 case SimpleRemoteEPCOpcode::Hangup
:
75 assert(SeqNo
== 0 && "Non-zero SeqNo for Hangup?");
76 assert(TagAddr
.getValue() == 0 && "Non-zero TagAddr for Hangup?");
78 case SimpleRemoteEPCOpcode::Result
:
80 assert(TagAddr
.getValue() == 0 && "Non-zero TagAddr for Result?");
82 case SimpleRemoteEPCOpcode::CallWrapper
:
83 dbgs() << "CallWrapper";
86 dbgs() << ", seqno = " << SeqNo
87 << ", tag-addr = " << formatv("{0:x}", TagAddr
.getValue())
88 << ", arg-buffer = " << formatv("{0:x}", ArgBytes
.size())
92 using UT
= std::underlying_type_t
<SimpleRemoteEPCOpcode
>;
93 if (static_cast<UT
>(OpC
) > static_cast<UT
>(SimpleRemoteEPCOpcode::LastOpC
))
94 return make_error
<StringError
>("Unexpected opcode",
95 inconvertibleErrorCode());
97 // TODO: Clean detach message?
99 case SimpleRemoteEPCOpcode::Setup
:
100 return make_error
<StringError
>("Unexpected Setup opcode",
101 inconvertibleErrorCode());
102 case SimpleRemoteEPCOpcode::Hangup
:
103 return SimpleRemoteEPCTransportClient::EndSession
;
104 case SimpleRemoteEPCOpcode::Result
:
105 if (auto Err
= handleResult(SeqNo
, TagAddr
, std::move(ArgBytes
)))
106 return std::move(Err
);
108 case SimpleRemoteEPCOpcode::CallWrapper
:
109 handleCallWrapper(SeqNo
, TagAddr
, std::move(ArgBytes
));
112 return ContinueSession
;
115 Error
SimpleRemoteEPCServer::waitForDisconnect() {
116 std::unique_lock
<std::mutex
> Lock(ServerStateMutex
);
117 ShutdownCV
.wait(Lock
, [this]() { return RunState
== ServerShutDown
; });
118 return std::move(ShutdownErr
);
121 void SimpleRemoteEPCServer::handleDisconnect(Error Err
) {
122 PendingJITDispatchResultsMap TmpPending
;
125 std::lock_guard
<std::mutex
> Lock(ServerStateMutex
);
126 std::swap(TmpPending
, PendingJITDispatchResults
);
127 RunState
= ServerShuttingDown
;
130 // Send out-of-band errors to any waiting threads.
131 for (auto &KV
: TmpPending
)
132 KV
.second
->set_value(
133 shared::WrapperFunctionResult::createOutOfBandError("disconnecting"));
135 // Wait for dispatcher to clear.
138 // Shut down services.
139 while (!Services
.empty()) {
141 joinErrors(std::move(ShutdownErr
), Services
.back()->shutdown());
145 std::lock_guard
<std::mutex
> Lock(ServerStateMutex
);
146 ShutdownErr
= joinErrors(std::move(ShutdownErr
), std::move(Err
));
147 RunState
= ServerShutDown
;
148 ShutdownCV
.notify_all();
151 Error
SimpleRemoteEPCServer::sendMessage(SimpleRemoteEPCOpcode OpC
,
152 uint64_t SeqNo
, ExecutorAddr TagAddr
,
153 ArrayRef
<char> ArgBytes
) {
156 dbgs() << "SimpleRemoteEPCServer::sendMessage: opc = ";
158 case SimpleRemoteEPCOpcode::Setup
:
160 assert(SeqNo
== 0 && "Non-zero SeqNo for Setup?");
161 assert(TagAddr
.getValue() == 0 && "Non-zero TagAddr for Setup?");
163 case SimpleRemoteEPCOpcode::Hangup
:
165 assert(SeqNo
== 0 && "Non-zero SeqNo for Hangup?");
166 assert(TagAddr
.getValue() == 0 && "Non-zero TagAddr for Hangup?");
168 case SimpleRemoteEPCOpcode::Result
:
170 assert(TagAddr
.getValue() == 0 && "Non-zero TagAddr for Result?");
172 case SimpleRemoteEPCOpcode::CallWrapper
:
173 dbgs() << "CallWrapper";
176 dbgs() << ", seqno = " << SeqNo
177 << ", tag-addr = " << formatv("{0:x}", TagAddr
.getValue())
178 << ", arg-buffer = " << formatv("{0:x}", ArgBytes
.size())
181 auto Err
= T
->sendMessage(OpC
, SeqNo
, TagAddr
, ArgBytes
);
184 dbgs() << " \\--> SimpleRemoteEPC::sendMessage failed\n";
189 Error
SimpleRemoteEPCServer::sendSetupMessage(
190 StringMap
<ExecutorAddr
> BootstrapSymbols
) {
192 using namespace SimpleRemoteEPCDefaultBootstrapSymbolNames
;
194 std::vector
<char> SetupPacket
;
195 SimpleRemoteEPCExecutorInfo EI
;
196 EI
.TargetTriple
= sys::getProcessTriple();
197 if (auto PageSize
= sys::Process::getPageSize())
198 EI
.PageSize
= *PageSize
;
200 return PageSize
.takeError();
201 EI
.BootstrapSymbols
= std::move(BootstrapSymbols
);
203 assert(!EI
.BootstrapSymbols
.count(ExecutorSessionObjectName
) &&
204 "Dispatch context name should not be set");
205 assert(!EI
.BootstrapSymbols
.count(DispatchFnName
) &&
206 "Dispatch function name should not be set");
207 EI
.BootstrapSymbols
[ExecutorSessionObjectName
] = ExecutorAddr::fromPtr(this);
208 EI
.BootstrapSymbols
[DispatchFnName
] = ExecutorAddr::fromPtr(jitDispatchEntry
);
211 shared::SPSArgList
<shared::SPSSimpleRemoteEPCExecutorInfo
>;
212 auto SetupPacketBytes
=
213 shared::WrapperFunctionResult::allocate(SPSSerialize::size(EI
));
214 shared::SPSOutputBuffer
OB(SetupPacketBytes
.data(), SetupPacketBytes
.size());
215 if (!SPSSerialize::serialize(OB
, EI
))
216 return make_error
<StringError
>("Could not send setup packet",
217 inconvertibleErrorCode());
219 return sendMessage(SimpleRemoteEPCOpcode::Setup
, 0, ExecutorAddr(),
220 {SetupPacketBytes
.data(), SetupPacketBytes
.size()});
223 Error
SimpleRemoteEPCServer::handleResult(
224 uint64_t SeqNo
, ExecutorAddr TagAddr
,
225 SimpleRemoteEPCArgBytesVector ArgBytes
) {
226 std::promise
<shared::WrapperFunctionResult
> *P
= nullptr;
228 std::lock_guard
<std::mutex
> Lock(ServerStateMutex
);
229 auto I
= PendingJITDispatchResults
.find(SeqNo
);
230 if (I
== PendingJITDispatchResults
.end())
231 return make_error
<StringError
>("No call for sequence number " +
233 inconvertibleErrorCode());
235 PendingJITDispatchResults
.erase(I
);
238 auto R
= shared::WrapperFunctionResult::allocate(ArgBytes
.size());
239 memcpy(R
.data(), ArgBytes
.data(), ArgBytes
.size());
240 P
->set_value(std::move(R
));
241 return Error::success();
244 void SimpleRemoteEPCServer::handleCallWrapper(
245 uint64_t RemoteSeqNo
, ExecutorAddr TagAddr
,
246 SimpleRemoteEPCArgBytesVector ArgBytes
) {
247 D
->dispatch([this, RemoteSeqNo
, TagAddr
, ArgBytes
= std::move(ArgBytes
)]() {
249 shared::CWrapperFunctionResult (*)(const char *, size_t);
250 auto *Fn
= TagAddr
.toPtr
<WrapperFnTy
>();
251 shared::WrapperFunctionResult
ResultBytes(
252 Fn(ArgBytes
.data(), ArgBytes
.size()));
253 if (auto Err
= sendMessage(SimpleRemoteEPCOpcode::Result
, RemoteSeqNo
,
255 {ResultBytes
.data(), ResultBytes
.size()}))
256 ReportError(std::move(Err
));
260 shared::WrapperFunctionResult
261 SimpleRemoteEPCServer::doJITDispatch(const void *FnTag
, const char *ArgData
,
264 std::promise
<shared::WrapperFunctionResult
> ResultP
;
265 auto ResultF
= ResultP
.get_future();
267 std::lock_guard
<std::mutex
> Lock(ServerStateMutex
);
268 if (RunState
!= ServerRunning
)
269 return shared::WrapperFunctionResult::createOutOfBandError(
270 "jit_dispatch not available (EPC server shut down)");
272 SeqNo
= getNextSeqNo();
273 assert(!PendingJITDispatchResults
.count(SeqNo
) && "SeqNo already in use");
274 PendingJITDispatchResults
[SeqNo
] = &ResultP
;
277 if (auto Err
= sendMessage(SimpleRemoteEPCOpcode::CallWrapper
, SeqNo
,
278 ExecutorAddr::fromPtr(FnTag
), {ArgData
, ArgSize
}))
279 ReportError(std::move(Err
));
281 return ResultF
.get();
284 shared::CWrapperFunctionResult
285 SimpleRemoteEPCServer::jitDispatchEntry(void *DispatchCtx
, const void *FnTag
,
286 const char *ArgData
, size_t ArgSize
) {
287 return reinterpret_cast<SimpleRemoteEPCServer
*>(DispatchCtx
)
288 ->doJITDispatch(FnTag
, ArgData
, ArgSize
)
292 } // end namespace orc
293 } // end namespace llvm