1 //===--- Server.cpp - gRPC-based Remote Index Server ---------------------===//
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
7 //===----------------------------------------------------------------------===//
11 #include "MonitoringService.grpc.pb.h"
12 #include "MonitoringService.pb.h"
13 #include "Service.grpc.pb.h"
14 #include "Service.pb.h"
15 #include "index/Index.h"
16 #include "index/Serialization.h"
17 #include "index/Symbol.h"
18 #include "index/remote/marshalling/Marshalling.h"
19 #include "support/Context.h"
20 #include "support/Logger.h"
21 #include "support/Shutdown.h"
22 #include "support/ThreadsafeFS.h"
23 #include "support/Trace.h"
24 #include "llvm/ADT/IntrusiveRefCntPtr.h"
25 #include "llvm/ADT/StringRef.h"
26 #include "llvm/Support/Chrono.h"
27 #include "llvm/Support/CommandLine.h"
28 #include "llvm/Support/Error.h"
29 #include "llvm/Support/FileSystem.h"
30 #include "llvm/Support/FormatVariadic.h"
31 #include "llvm/Support/Path.h"
32 #include "llvm/Support/Signals.h"
33 #include "llvm/Support/VirtualFileSystem.h"
36 #include <grpc++/grpc++.h>
37 #include <grpc++/health_check_service_interface.h>
44 #if ENABLE_GRPC_REFLECTION
45 #include <grpc++/ext/proto_server_reflection_plugin.h>
// Usage text passed to llvm::cl::ParseCommandLineOptions() in main().
// Spelled with ordinary literals; the bytes are the same as a raw string that
// starts and ends with a newline.
static constexpr char Overview[] =
    "\n"
    "This is an experimental remote index implementation. The server opens "
    "Dex and\n"
    "awaits gRPC lookup requests from the client.\n";
62 llvm::cl::opt
<std::string
> IndexPath(llvm::cl::desc("<INDEX FILE>"),
63 llvm::cl::Positional
, llvm::cl::Required
);
65 llvm::cl::opt
<std::string
> IndexRoot(llvm::cl::desc("<PROJECT ROOT>"),
66 llvm::cl::Positional
, llvm::cl::Required
);
68 llvm::cl::opt
<Logger::Level
> LogLevel
{
70 llvm::cl::desc("Verbosity of log messages written to stderr"),
71 values(clEnumValN(Logger::Error
, "error", "Error messages only"),
72 clEnumValN(Logger::Info
, "info", "High level execution tracing"),
73 clEnumValN(Logger::Debug
, "verbose", "Low level details")),
74 llvm::cl::init(Logger::Info
),
77 llvm::cl::opt
<bool> LogPublic
{
79 llvm::cl::desc("Avoid logging potentially-sensitive request details"),
80 llvm::cl::init(false),
83 llvm::cl::opt
<std::string
> LogPrefix
{
85 llvm::cl::desc("A string that'll be prepended to all log statements. "
86 "Useful when running multiple instances on same host."),
89 llvm::cl::opt
<std::string
> TraceFile(
91 llvm::cl::desc("Path to the file where tracer logs will be stored"));
93 llvm::cl::opt
<bool> PrettyPrint
{
95 llvm::cl::desc("Pretty-print JSON output in the trace"),
96 llvm::cl::init(false),
99 llvm::cl::opt
<std::string
> ServerAddress(
100 "server-address", llvm::cl::init("0.0.0.0:50051"),
101 llvm::cl::desc("Address of the invoked server. Defaults to 0.0.0.0:50051"));
103 llvm::cl::opt
<size_t> IdleTimeoutSeconds(
104 "idle-timeout", llvm::cl::init(8 * 60),
105 llvm::cl::desc("Maximum time a channel may stay idle until server closes "
106 "the connection, in seconds. Defaults to 480."));
108 llvm::cl::opt
<size_t> LimitResults(
109 "limit-results", llvm::cl::init(10000),
110 llvm::cl::desc("Maximum number of results to stream as a response to "
111 "single request. Limit is to keep the server from being "
112 "DOS'd. Defaults to 10000."));
114 static Key
<grpc::ServerContext
*> CurrentRequest
;
116 class RemoteIndexServer final
: public v1::SymbolIndex::Service
{
118 RemoteIndexServer(clangd::SymbolIndex
&Index
, llvm::StringRef IndexRoot
)
120 llvm::SmallString
<256> NativePath
= IndexRoot
;
121 llvm::sys::path::native(NativePath
);
122 ProtobufMarshaller
= std::unique_ptr
<Marshaller
>(new Marshaller(
123 /*RemoteIndexRoot=*/llvm::StringRef(NativePath
),
124 /*LocalIndexRoot=*/""));
128 using stopwatch
= std::chrono::steady_clock
;
130 grpc::Status
Lookup(grpc::ServerContext
*Context
,
131 const LookupRequest
*Request
,
132 grpc::ServerWriter
<LookupReply
> *Reply
) override
{
133 auto StartTime
= stopwatch::now();
134 WithContextValue
WithRequestContext(CurrentRequest
, Context
);
135 logRequest(*Request
);
136 trace::Span
Tracer("LookupRequest");
137 auto Req
= ProtobufMarshaller
->fromProtobuf(Request
);
139 elog("Can not parse LookupRequest from protobuf: {0}", Req
.takeError());
140 return grpc::Status::CANCELLED
;
143 unsigned FailedToSend
= 0;
144 bool HasMore
= false;
145 Index
.lookup(*Req
, [&](const clangd::Symbol
&Item
) {
146 if (Sent
>= LimitResults
) {
150 auto SerializedItem
= ProtobufMarshaller
->toProtobuf(Item
);
151 if (!SerializedItem
) {
152 elog("Unable to convert Symbol to protobuf: {0}",
153 SerializedItem
.takeError());
157 LookupReply NextMessage
;
158 *NextMessage
.mutable_stream_result() = *SerializedItem
;
159 logResponse(NextMessage
);
160 Reply
->Write(NextMessage
);
164 log("[public] Limiting result size for Lookup request.");
165 LookupReply LastMessage
;
166 LastMessage
.mutable_final_result()->set_has_more(HasMore
);
167 logResponse(LastMessage
);
168 Reply
->Write(LastMessage
);
169 SPAN_ATTACH(Tracer
, "Sent", Sent
);
170 SPAN_ATTACH(Tracer
, "Failed to send", FailedToSend
);
171 logRequestSummary("v1/Lookup", Sent
, StartTime
);
172 return grpc::Status::OK
;
175 grpc::Status
FuzzyFind(grpc::ServerContext
*Context
,
176 const FuzzyFindRequest
*Request
,
177 grpc::ServerWriter
<FuzzyFindReply
> *Reply
) override
{
178 auto StartTime
= stopwatch::now();
179 WithContextValue
WithRequestContext(CurrentRequest
, Context
);
180 logRequest(*Request
);
181 trace::Span
Tracer("FuzzyFindRequest");
182 auto Req
= ProtobufMarshaller
->fromProtobuf(Request
);
184 elog("Can not parse FuzzyFindRequest from protobuf: {0}",
186 return grpc::Status::CANCELLED
;
188 if (!Req
->Limit
|| *Req
->Limit
> LimitResults
) {
189 log("[public] Limiting result size for FuzzyFind request from {0} to {1}",
190 Req
->Limit
, LimitResults
);
191 Req
->Limit
= LimitResults
;
194 unsigned FailedToSend
= 0;
195 bool HasMore
= Index
.fuzzyFind(*Req
, [&](const clangd::Symbol
&Item
) {
196 auto SerializedItem
= ProtobufMarshaller
->toProtobuf(Item
);
197 if (!SerializedItem
) {
198 elog("Unable to convert Symbol to protobuf: {0}",
199 SerializedItem
.takeError());
203 FuzzyFindReply NextMessage
;
204 *NextMessage
.mutable_stream_result() = *SerializedItem
;
205 logResponse(NextMessage
);
206 Reply
->Write(NextMessage
);
209 FuzzyFindReply LastMessage
;
210 LastMessage
.mutable_final_result()->set_has_more(HasMore
);
211 logResponse(LastMessage
);
212 Reply
->Write(LastMessage
);
213 SPAN_ATTACH(Tracer
, "Sent", Sent
);
214 SPAN_ATTACH(Tracer
, "Failed to send", FailedToSend
);
215 logRequestSummary("v1/FuzzyFind", Sent
, StartTime
);
216 return grpc::Status::OK
;
219 grpc::Status
Refs(grpc::ServerContext
*Context
, const RefsRequest
*Request
,
220 grpc::ServerWriter
<RefsReply
> *Reply
) override
{
221 auto StartTime
= stopwatch::now();
222 WithContextValue
WithRequestContext(CurrentRequest
, Context
);
223 logRequest(*Request
);
224 trace::Span
Tracer("RefsRequest");
225 auto Req
= ProtobufMarshaller
->fromProtobuf(Request
);
227 elog("Can not parse RefsRequest from protobuf: {0}", Req
.takeError());
228 return grpc::Status::CANCELLED
;
230 if (!Req
->Limit
|| *Req
->Limit
> LimitResults
) {
231 log("[public] Limiting result size for Refs request from {0} to {1}.",
232 Req
->Limit
, LimitResults
);
233 Req
->Limit
= LimitResults
;
236 unsigned FailedToSend
= 0;
237 bool HasMore
= Index
.refs(*Req
, [&](const clangd::Ref
&Item
) {
238 auto SerializedItem
= ProtobufMarshaller
->toProtobuf(Item
);
239 if (!SerializedItem
) {
240 elog("Unable to convert Ref to protobuf: {0}",
241 SerializedItem
.takeError());
245 RefsReply NextMessage
;
246 *NextMessage
.mutable_stream_result() = *SerializedItem
;
247 logResponse(NextMessage
);
248 Reply
->Write(NextMessage
);
251 RefsReply LastMessage
;
252 LastMessage
.mutable_final_result()->set_has_more(HasMore
);
253 logResponse(LastMessage
);
254 Reply
->Write(LastMessage
);
255 SPAN_ATTACH(Tracer
, "Sent", Sent
);
256 SPAN_ATTACH(Tracer
, "Failed to send", FailedToSend
);
257 logRequestSummary("v1/Refs", Sent
, StartTime
);
258 return grpc::Status::OK
;
262 ContainedRefs(grpc::ServerContext
*Context
,
263 const ContainedRefsRequest
*Request
,
264 grpc::ServerWriter
<ContainedRefsReply
> *Reply
) override
{
265 auto StartTime
= stopwatch::now();
266 WithContextValue
WithRequestContext(CurrentRequest
, Context
);
267 logRequest(*Request
);
268 trace::Span
Tracer("ContainedRefsRequest");
269 auto Req
= ProtobufMarshaller
->fromProtobuf(Request
);
271 elog("Can not parse ContainedRefsRequest from protobuf: {0}",
273 return grpc::Status::CANCELLED
;
275 if (!Req
->Limit
|| *Req
->Limit
> LimitResults
) {
276 log("[public] Limiting result size for ContainedRefs request from {0} to "
278 Req
->Limit
, LimitResults
);
279 Req
->Limit
= LimitResults
;
282 unsigned FailedToSend
= 0;
284 Index
.containedRefs(*Req
, [&](const clangd::ContainedRefsResult
&Item
) {
285 auto SerializedItem
= ProtobufMarshaller
->toProtobuf(Item
);
286 if (!SerializedItem
) {
287 elog("Unable to convert ContainedRefsResult to protobuf: {0}",
288 SerializedItem
.takeError());
292 ContainedRefsReply NextMessage
;
293 *NextMessage
.mutable_stream_result() = *SerializedItem
;
294 logResponse(NextMessage
);
295 Reply
->Write(NextMessage
);
298 ContainedRefsReply LastMessage
;
299 LastMessage
.mutable_final_result()->set_has_more(HasMore
);
300 logResponse(LastMessage
);
301 Reply
->Write(LastMessage
);
302 SPAN_ATTACH(Tracer
, "Sent", Sent
);
303 SPAN_ATTACH(Tracer
, "Failed to send", FailedToSend
);
304 logRequestSummary("v1/ContainedRefs", Sent
, StartTime
);
305 return grpc::Status::OK
;
308 grpc::Status
Relations(grpc::ServerContext
*Context
,
309 const RelationsRequest
*Request
,
310 grpc::ServerWriter
<RelationsReply
> *Reply
) override
{
311 auto StartTime
= stopwatch::now();
312 WithContextValue
WithRequestContext(CurrentRequest
, Context
);
313 logRequest(*Request
);
314 trace::Span
Tracer("RelationsRequest");
315 auto Req
= ProtobufMarshaller
->fromProtobuf(Request
);
317 elog("Can not parse RelationsRequest from protobuf: {0}",
319 return grpc::Status::CANCELLED
;
321 if (!Req
->Limit
|| *Req
->Limit
> LimitResults
) {
322 log("[public] Limiting result size for Relations request from {0} to "
324 Req
->Limit
, LimitResults
);
325 Req
->Limit
= LimitResults
;
328 unsigned FailedToSend
= 0;
330 *Req
, [&](const SymbolID
&Subject
, const clangd::Symbol
&Object
) {
331 auto SerializedItem
= ProtobufMarshaller
->toProtobuf(Subject
, Object
);
332 if (!SerializedItem
) {
333 elog("Unable to convert Relation to protobuf: {0}",
334 SerializedItem
.takeError());
338 RelationsReply NextMessage
;
339 *NextMessage
.mutable_stream_result() = *SerializedItem
;
340 logResponse(NextMessage
);
341 Reply
->Write(NextMessage
);
344 RelationsReply LastMessage
;
345 LastMessage
.mutable_final_result()->set_has_more(true);
346 logResponse(LastMessage
);
347 Reply
->Write(LastMessage
);
348 SPAN_ATTACH(Tracer
, "Sent", Sent
);
349 SPAN_ATTACH(Tracer
, "Failed to send", FailedToSend
);
350 logRequestSummary("v1/Relations", Sent
, StartTime
);
351 return grpc::Status::OK
;
354 // Proxy object to allow proto messages to be lazily serialized as text.
356 const google::protobuf::Message
&M
;
357 friend llvm::raw_ostream
&operator<<(llvm::raw_ostream
&OS
,
358 const TextProto
&P
) {
359 return OS
<< P
.M
.DebugString();
363 void logRequest(const google::protobuf::Message
&M
) {
364 vlog("<<< {0}\n{1}", M
.GetDescriptor()->name(), TextProto
{M
});
366 void logResponse(const google::protobuf::Message
&M
) {
367 vlog(">>> {0}\n{1}", M
.GetDescriptor()->name(), TextProto
{M
});
369 void logRequestSummary(llvm::StringLiteral RequestName
, unsigned Sent
,
370 stopwatch::time_point StartTime
) {
371 auto Duration
= stopwatch::now() - StartTime
;
373 std::chrono::duration_cast
<std::chrono::milliseconds
>(Duration
).count();
374 log("[public] request {0} => OK: {1} results in {2}ms", RequestName
, Sent
,
378 std::unique_ptr
<Marshaller
> ProtobufMarshaller
;
379 clangd::SymbolIndex
&Index
;
382 class Monitor final
: public v1::Monitor::Service
{
384 Monitor(llvm::sys::TimePoint
<> IndexAge
)
385 : StartTime(std::chrono::system_clock::now()), IndexBuildTime(IndexAge
) {}
387 void updateIndex(llvm::sys::TimePoint
<> UpdateTime
) {
388 IndexBuildTime
.exchange(UpdateTime
);
392 // FIXME(kirillbobyrev): Most fields should be populated when the index
393 // reloads (probably in adjacent metadata.txt file next to loaded .idx) but
394 // they aren't right now.
395 grpc::Status
MonitoringInfo(grpc::ServerContext
*Context
,
396 const v1::MonitoringInfoRequest
*Request
,
397 v1::MonitoringInfoReply
*Reply
) override
{
398 Reply
->set_uptime_seconds(std::chrono::duration_cast
<std::chrono::seconds
>(
399 std::chrono::system_clock::now() - StartTime
)
401 // FIXME(kirillbobyrev): We are currently making use of the last
402 // modification time of the index artifact to deduce its age. This is wrong
403 // as it doesn't account for the indexing delay. Propagate some metadata
404 // with the index artifacts to indicate time of the commit we indexed.
405 Reply
->set_index_age_seconds(
406 std::chrono::duration_cast
<std::chrono::seconds
>(
407 std::chrono::system_clock::now() - IndexBuildTime
.load())
409 return grpc::Status::OK
;
412 const llvm::sys::TimePoint
<> StartTime
;
413 std::atomic
<llvm::sys::TimePoint
<>> IndexBuildTime
;
416 void maybeTrimMemory() {
417 #if defined(__GLIBC__) && CLANGD_MALLOC_TRIM
422 // Detect changes in \p IndexPath file and load new versions of the index
423 // whenever they become available.
424 void hotReload(clangd::SwapIndex
&Index
, llvm::StringRef IndexPath
,
425 llvm::vfs::Status
&LastStatus
,
426 llvm::IntrusiveRefCntPtr
<llvm::vfs::FileSystem
> &FS
,
428 // glibc malloc doesn't shrink an arena if there are items living at the end,
429 // which might happen since we destroy the old index after building new one.
430 // Trim more aggresively to keep memory usage of the server low.
431 // Note that we do it deliberately here rather than after Index.reset(),
432 // because old index might still be kept alive after the reset call if we are
435 auto Status
= FS
->status(IndexPath
);
436 // Requested file is same as loaded index: no reload is needed.
437 if (!Status
|| (Status
->getLastModificationTime() ==
438 LastStatus
.getLastModificationTime() &&
439 Status
->getSize() == LastStatus
.getSize()))
441 vlog("Found different index version: existing index was modified at "
442 "{0}, new index was modified at {1}. Attempting to reload.",
443 LastStatus
.getLastModificationTime(), Status
->getLastModificationTime());
444 LastStatus
= *Status
;
445 std::unique_ptr
<clang::clangd::SymbolIndex
> NewIndex
=
446 loadIndex(IndexPath
, SymbolOrigin::Static
, /*UseDex=*/true,
447 /*SupportContainedRefs=*/true);
449 elog("Failed to load new index. Old index will be served.");
452 Index
.reset(std::move(NewIndex
));
453 Monitor
.updateIndex(Status
->getLastModificationTime());
454 log("New index version loaded. Last modification time: {0}, size: {1} bytes.",
455 Status
->getLastModificationTime(), Status
->getSize());
458 void runServerAndWait(clangd::SymbolIndex
&Index
, llvm::StringRef ServerAddress
,
459 llvm::StringRef IndexPath
, Monitor
&Monitor
) {
460 RemoteIndexServer
Service(Index
, IndexRoot
);
462 grpc::EnableDefaultHealthCheckService(true);
463 #if ENABLE_GRPC_REFLECTION
464 grpc::reflection::InitProtoReflectionServerBuilderPlugin();
466 grpc::ServerBuilder Builder
;
467 Builder
.AddListeningPort(ServerAddress
.str(),
468 grpc::InsecureServerCredentials());
469 Builder
.AddChannelArgument(GRPC_ARG_MAX_CONNECTION_IDLE_MS
,
470 IdleTimeoutSeconds
* 1000);
471 Builder
.RegisterService(&Service
);
472 Builder
.RegisterService(&Monitor
);
473 std::unique_ptr
<grpc::Server
> Server(Builder
.BuildAndStart());
474 log("Server listening on {0}", ServerAddress
);
476 std::thread
ServerShutdownWatcher([&]() {
477 static constexpr auto WatcherFrequency
= std::chrono::seconds(5);
478 while (!clang::clangd::shutdownRequested())
479 std::this_thread::sleep_for(WatcherFrequency
);
484 ServerShutdownWatcher
.join();
487 std::unique_ptr
<Logger
> makeLogger(llvm::StringRef LogPrefix
,
488 llvm::raw_ostream
&OS
) {
489 std::unique_ptr
<Logger
> Base
;
492 // - messages outside the scope of a request: log fully
493 // - messages tagged [public]: log fully
494 // - errors: log the format string
496 class RedactedLogger
: public StreamLogger
{
498 using StreamLogger::StreamLogger
;
499 void log(Level L
, const char *Fmt
,
500 const llvm::formatv_object_base
&Message
) override
{
501 if (Context::current().get(CurrentRequest
) == nullptr ||
502 llvm::StringRef(Fmt
).starts_with("[public]"))
503 return StreamLogger::log(L
, Fmt
, Message
);
505 return StreamLogger::log(L
, Fmt
,
506 llvm::formatv("[redacted] {0}", Fmt
));
509 Base
= std::make_unique
<RedactedLogger
>(OS
, LogLevel
);
511 Base
= std::make_unique
<StreamLogger
>(OS
, LogLevel
);
514 if (LogPrefix
.empty())
516 class PrefixedLogger
: public Logger
{
517 std::string LogPrefix
;
518 std::unique_ptr
<Logger
> Base
;
521 PrefixedLogger(llvm::StringRef LogPrefix
, std::unique_ptr
<Logger
> Base
)
522 : LogPrefix(LogPrefix
.str()), Base(std::move(Base
)) {}
523 void log(Level L
, const char *Fmt
,
524 const llvm::formatv_object_base
&Message
) override
{
525 Base
->log(L
, Fmt
, llvm::formatv("[{0}] {1}", LogPrefix
, Message
));
528 return std::make_unique
<PrefixedLogger
>(LogPrefix
, std::move(Base
));
532 } // namespace remote
533 } // namespace clangd
536 using clang::clangd::elog
;
538 int main(int argc
, char *argv
[]) {
539 using namespace clang::clangd::remote
;
540 llvm::cl::ParseCommandLineOptions(argc
, argv
, Overview
);
541 llvm::sys::PrintStackTraceOnErrorSignal(argv
[0]);
542 llvm::sys::SetInterruptFunction(&clang::clangd::requestShutdown
);
544 if (!llvm::sys::path::is_absolute(IndexRoot
)) {
545 llvm::errs() << "Index root should be an absolute path.\n";
549 llvm::errs().SetBuffered();
550 auto Logger
= makeLogger(LogPrefix
.getValue(), llvm::errs());
551 clang::clangd::LoggingSession
LoggingSession(*Logger
);
553 std::optional
<llvm::raw_fd_ostream
> TracerStream
;
554 std::unique_ptr
<clang::clangd::trace::EventTracer
> Tracer
;
555 if (!TraceFile
.empty()) {
557 TracerStream
.emplace(TraceFile
, EC
,
558 llvm::sys::fs::FA_Read
| llvm::sys::fs::FA_Write
);
560 TracerStream
.reset();
561 elog("Error while opening trace file {0}: {1}", TraceFile
, EC
.message());
563 // FIXME(kirillbobyrev): Also create metrics tracer to track latency and
564 // accumulate other request statistics.
565 Tracer
= clang::clangd::trace::createJSONTracer(*TracerStream
,
566 /*PrettyPrint=*/false);
567 clang::clangd::vlog("Successfully created a tracer.");
571 std::optional
<clang::clangd::trace::Session
> TracingSession
;
573 TracingSession
.emplace(*Tracer
);
575 clang::clangd::RealThreadsafeFS TFS
;
576 auto FS
= TFS
.view(std::nullopt
);
577 auto Status
= FS
->status(IndexPath
);
579 elog("{0} does not exist.", IndexPath
);
580 return Status
.getError().value();
583 auto SymIndex
= clang::clangd::loadIndex(
584 IndexPath
, clang::clangd::SymbolOrigin::Static
, /*UseDex=*/true,
585 /*SupportContainedRefs=*/true);
587 llvm::errs() << "Failed to open the index.\n";
590 clang::clangd::SwapIndex
Index(std::move(SymIndex
));
592 Monitor
Monitor(Status
->getLastModificationTime());
594 std::thread
HotReloadThread([&Index
, &Status
, &FS
, &Monitor
]() {
595 llvm::vfs::Status LastStatus
= *Status
;
596 static constexpr auto RefreshFrequency
= std::chrono::seconds(30);
597 while (!clang::clangd::shutdownRequested()) {
598 hotReload(Index
, llvm::StringRef(IndexPath
), LastStatus
, FS
, Monitor
);
599 std::this_thread::sleep_for(RefreshFrequency
);
603 runServerAndWait(Index
, ServerAddress
, IndexPath
, Monitor
);
605 HotReloadThread
.join();