//===--- Server.cpp - gRPC-based Remote Index Server ---------------------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
11 #include "MonitoringService.grpc.pb.h"
12 #include "MonitoringService.pb.h"
13 #include "Service.grpc.pb.h"
14 #include "Service.pb.h"
15 #include "index/Index.h"
16 #include "index/Serialization.h"
17 #include "index/Symbol.h"
18 #include "index/remote/marshalling/Marshalling.h"
19 #include "support/Context.h"
20 #include "support/Logger.h"
21 #include "support/Shutdown.h"
22 #include "support/ThreadsafeFS.h"
23 #include "support/Trace.h"
24 #include "llvm/ADT/IntrusiveRefCntPtr.h"
25 #include "llvm/ADT/StringRef.h"
26 #include "llvm/Support/Chrono.h"
27 #include "llvm/Support/CommandLine.h"
28 #include "llvm/Support/Error.h"
29 #include "llvm/Support/FileSystem.h"
30 #include "llvm/Support/FormatVariadic.h"
31 #include "llvm/Support/Path.h"
32 #include "llvm/Support/Signals.h"
33 #include "llvm/Support/VirtualFileSystem.h"
36 #include <grpc++/grpc++.h>
37 #include <grpc++/health_check_service_interface.h>
44 #if ENABLE_GRPC_REFLECTION
45 #include <grpc++/ext/proto_server_reflection_plugin.h>
// Tool description handed to llvm::cl::ParseCommandLineOptions in main().
static constexpr char Overview[] =
    "\n"
    "This is an experimental remote index implementation. The server opens "
    "Dex and\n"
    "awaits gRPC lookup requests from the client.\n";
62 llvm::cl::opt
<std::string
> IndexPath(llvm::cl::desc("<INDEX FILE>"),
63 llvm::cl::Positional
, llvm::cl::Required
);
65 llvm::cl::opt
<std::string
> IndexRoot(llvm::cl::desc("<PROJECT ROOT>"),
66 llvm::cl::Positional
, llvm::cl::Required
);
68 llvm::cl::opt
<Logger::Level
> LogLevel
{
70 llvm::cl::desc("Verbosity of log messages written to stderr"),
71 values(clEnumValN(Logger::Error
, "error", "Error messages only"),
72 clEnumValN(Logger::Info
, "info", "High level execution tracing"),
73 clEnumValN(Logger::Debug
, "verbose", "Low level details")),
74 llvm::cl::init(Logger::Info
),
77 llvm::cl::opt
<bool> LogPublic
{
79 llvm::cl::desc("Avoid logging potentially-sensitive request details"),
80 llvm::cl::init(false),
83 llvm::cl::opt
<std::string
> LogPrefix
{
85 llvm::cl::desc("A string that'll be prepended to all log statements. "
86 "Useful when running multiple instances on same host."),
89 llvm::cl::opt
<std::string
> TraceFile(
91 llvm::cl::desc("Path to the file where tracer logs will be stored"));
93 llvm::cl::opt
<bool> PrettyPrint
{
95 llvm::cl::desc("Pretty-print JSON output in the trace"),
96 llvm::cl::init(false),
99 llvm::cl::opt
<std::string
> ServerAddress(
100 "server-address", llvm::cl::init("0.0.0.0:50051"),
101 llvm::cl::desc("Address of the invoked server. Defaults to 0.0.0.0:50051"));
103 llvm::cl::opt
<size_t> IdleTimeoutSeconds(
104 "idle-timeout", llvm::cl::init(8 * 60),
105 llvm::cl::desc("Maximum time a channel may stay idle until server closes "
106 "the connection, in seconds. Defaults to 480."));
108 llvm::cl::opt
<size_t> LimitResults(
109 "limit-results", llvm::cl::init(10000),
110 llvm::cl::desc("Maximum number of results to stream as a response to "
111 "single request. Limit is to keep the server from being "
112 "DOS'd. Defaults to 10000."));
114 static Key
<grpc::ServerContext
*> CurrentRequest
;
116 class RemoteIndexServer final
: public v1::SymbolIndex::Service
{
118 RemoteIndexServer(clangd::SymbolIndex
&Index
, llvm::StringRef IndexRoot
)
120 llvm::SmallString
<256> NativePath
= IndexRoot
;
121 llvm::sys::path::native(NativePath
);
122 ProtobufMarshaller
= std::unique_ptr
<Marshaller
>(new Marshaller(
123 /*RemoteIndexRoot=*/llvm::StringRef(NativePath
),
124 /*LocalIndexRoot=*/""));
128 using stopwatch
= std::chrono::steady_clock
;
130 grpc::Status
Lookup(grpc::ServerContext
*Context
,
131 const LookupRequest
*Request
,
132 grpc::ServerWriter
<LookupReply
> *Reply
) override
{
133 auto StartTime
= stopwatch::now();
134 WithContextValue
WithRequestContext(CurrentRequest
, Context
);
135 logRequest(*Request
);
136 trace::Span
Tracer("LookupRequest");
137 auto Req
= ProtobufMarshaller
->fromProtobuf(Request
);
139 elog("Can not parse LookupRequest from protobuf: {0}", Req
.takeError());
140 return grpc::Status::CANCELLED
;
143 unsigned FailedToSend
= 0;
144 bool HasMore
= false;
145 Index
.lookup(*Req
, [&](const clangd::Symbol
&Item
) {
146 if (Sent
>= LimitResults
) {
150 auto SerializedItem
= ProtobufMarshaller
->toProtobuf(Item
);
151 if (!SerializedItem
) {
152 elog("Unable to convert Symbol to protobuf: {0}",
153 SerializedItem
.takeError());
157 LookupReply NextMessage
;
158 *NextMessage
.mutable_stream_result() = *SerializedItem
;
159 logResponse(NextMessage
);
160 Reply
->Write(NextMessage
);
164 log("[public] Limiting result size for Lookup request.");
165 LookupReply LastMessage
;
166 LastMessage
.mutable_final_result()->set_has_more(HasMore
);
167 logResponse(LastMessage
);
168 Reply
->Write(LastMessage
);
169 SPAN_ATTACH(Tracer
, "Sent", Sent
);
170 SPAN_ATTACH(Tracer
, "Failed to send", FailedToSend
);
171 logRequestSummary("v1/Lookup", Sent
, StartTime
);
172 return grpc::Status::OK
;
175 grpc::Status
FuzzyFind(grpc::ServerContext
*Context
,
176 const FuzzyFindRequest
*Request
,
177 grpc::ServerWriter
<FuzzyFindReply
> *Reply
) override
{
178 auto StartTime
= stopwatch::now();
179 WithContextValue
WithRequestContext(CurrentRequest
, Context
);
180 logRequest(*Request
);
181 trace::Span
Tracer("FuzzyFindRequest");
182 auto Req
= ProtobufMarshaller
->fromProtobuf(Request
);
184 elog("Can not parse FuzzyFindRequest from protobuf: {0}",
186 return grpc::Status::CANCELLED
;
188 if (!Req
->Limit
|| *Req
->Limit
> LimitResults
) {
189 log("[public] Limiting result size for FuzzyFind request from {0} to {1}",
190 Req
->Limit
, LimitResults
);
191 Req
->Limit
= LimitResults
;
194 unsigned FailedToSend
= 0;
195 bool HasMore
= Index
.fuzzyFind(*Req
, [&](const clangd::Symbol
&Item
) {
196 auto SerializedItem
= ProtobufMarshaller
->toProtobuf(Item
);
197 if (!SerializedItem
) {
198 elog("Unable to convert Symbol to protobuf: {0}",
199 SerializedItem
.takeError());
203 FuzzyFindReply NextMessage
;
204 *NextMessage
.mutable_stream_result() = *SerializedItem
;
205 logResponse(NextMessage
);
206 Reply
->Write(NextMessage
);
209 FuzzyFindReply LastMessage
;
210 LastMessage
.mutable_final_result()->set_has_more(HasMore
);
211 logResponse(LastMessage
);
212 Reply
->Write(LastMessage
);
213 SPAN_ATTACH(Tracer
, "Sent", Sent
);
214 SPAN_ATTACH(Tracer
, "Failed to send", FailedToSend
);
215 logRequestSummary("v1/FuzzyFind", Sent
, StartTime
);
216 return grpc::Status::OK
;
219 grpc::Status
Refs(grpc::ServerContext
*Context
, const RefsRequest
*Request
,
220 grpc::ServerWriter
<RefsReply
> *Reply
) override
{
221 auto StartTime
= stopwatch::now();
222 WithContextValue
WithRequestContext(CurrentRequest
, Context
);
223 logRequest(*Request
);
224 trace::Span
Tracer("RefsRequest");
225 auto Req
= ProtobufMarshaller
->fromProtobuf(Request
);
227 elog("Can not parse RefsRequest from protobuf: {0}", Req
.takeError());
228 return grpc::Status::CANCELLED
;
230 if (!Req
->Limit
|| *Req
->Limit
> LimitResults
) {
231 log("[public] Limiting result size for Refs request from {0} to {1}.",
232 Req
->Limit
, LimitResults
);
233 Req
->Limit
= LimitResults
;
236 unsigned FailedToSend
= 0;
237 bool HasMore
= Index
.refs(*Req
, [&](const clangd::Ref
&Item
) {
238 auto SerializedItem
= ProtobufMarshaller
->toProtobuf(Item
);
239 if (!SerializedItem
) {
240 elog("Unable to convert Ref to protobuf: {0}",
241 SerializedItem
.takeError());
245 RefsReply NextMessage
;
246 *NextMessage
.mutable_stream_result() = *SerializedItem
;
247 logResponse(NextMessage
);
248 Reply
->Write(NextMessage
);
251 RefsReply LastMessage
;
252 LastMessage
.mutable_final_result()->set_has_more(HasMore
);
253 logResponse(LastMessage
);
254 Reply
->Write(LastMessage
);
255 SPAN_ATTACH(Tracer
, "Sent", Sent
);
256 SPAN_ATTACH(Tracer
, "Failed to send", FailedToSend
);
257 logRequestSummary("v1/Refs", Sent
, StartTime
);
258 return grpc::Status::OK
;
261 grpc::Status
Relations(grpc::ServerContext
*Context
,
262 const RelationsRequest
*Request
,
263 grpc::ServerWriter
<RelationsReply
> *Reply
) override
{
264 auto StartTime
= stopwatch::now();
265 WithContextValue
WithRequestContext(CurrentRequest
, Context
);
266 logRequest(*Request
);
267 trace::Span
Tracer("RelationsRequest");
268 auto Req
= ProtobufMarshaller
->fromProtobuf(Request
);
270 elog("Can not parse RelationsRequest from protobuf: {0}",
272 return grpc::Status::CANCELLED
;
274 if (!Req
->Limit
|| *Req
->Limit
> LimitResults
) {
275 log("[public] Limiting result size for Relations request from {0} to "
277 Req
->Limit
, LimitResults
);
278 Req
->Limit
= LimitResults
;
281 unsigned FailedToSend
= 0;
283 *Req
, [&](const SymbolID
&Subject
, const clangd::Symbol
&Object
) {
284 auto SerializedItem
= ProtobufMarshaller
->toProtobuf(Subject
, Object
);
285 if (!SerializedItem
) {
286 elog("Unable to convert Relation to protobuf: {0}",
287 SerializedItem
.takeError());
291 RelationsReply NextMessage
;
292 *NextMessage
.mutable_stream_result() = *SerializedItem
;
293 logResponse(NextMessage
);
294 Reply
->Write(NextMessage
);
297 RelationsReply LastMessage
;
298 LastMessage
.mutable_final_result()->set_has_more(true);
299 logResponse(LastMessage
);
300 Reply
->Write(LastMessage
);
301 SPAN_ATTACH(Tracer
, "Sent", Sent
);
302 SPAN_ATTACH(Tracer
, "Failed to send", FailedToSend
);
303 logRequestSummary("v1/Relations", Sent
, StartTime
);
304 return grpc::Status::OK
;
307 // Proxy object to allow proto messages to be lazily serialized as text.
309 const google::protobuf::Message
&M
;
310 friend llvm::raw_ostream
&operator<<(llvm::raw_ostream
&OS
,
311 const TextProto
&P
) {
312 return OS
<< P
.M
.DebugString();
316 void logRequest(const google::protobuf::Message
&M
) {
317 vlog("<<< {0}\n{1}", M
.GetDescriptor()->name(), TextProto
{M
});
319 void logResponse(const google::protobuf::Message
&M
) {
320 vlog(">>> {0}\n{1}", M
.GetDescriptor()->name(), TextProto
{M
});
322 void logRequestSummary(llvm::StringLiteral RequestName
, unsigned Sent
,
323 stopwatch::time_point StartTime
) {
324 auto Duration
= stopwatch::now() - StartTime
;
326 std::chrono::duration_cast
<std::chrono::milliseconds
>(Duration
).count();
327 log("[public] request {0} => OK: {1} results in {2}ms", RequestName
, Sent
,
331 std::unique_ptr
<Marshaller
> ProtobufMarshaller
;
332 clangd::SymbolIndex
&Index
;
335 class Monitor final
: public v1::Monitor::Service
{
337 Monitor(llvm::sys::TimePoint
<> IndexAge
)
338 : StartTime(std::chrono::system_clock::now()), IndexBuildTime(IndexAge
) {}
340 void updateIndex(llvm::sys::TimePoint
<> UpdateTime
) {
341 IndexBuildTime
.exchange(UpdateTime
);
345 // FIXME(kirillbobyrev): Most fields should be populated when the index
346 // reloads (probably in adjacent metadata.txt file next to loaded .idx) but
347 // they aren't right now.
348 grpc::Status
MonitoringInfo(grpc::ServerContext
*Context
,
349 const v1::MonitoringInfoRequest
*Request
,
350 v1::MonitoringInfoReply
*Reply
) override
{
351 Reply
->set_uptime_seconds(std::chrono::duration_cast
<std::chrono::seconds
>(
352 std::chrono::system_clock::now() - StartTime
)
354 // FIXME(kirillbobyrev): We are currently making use of the last
355 // modification time of the index artifact to deduce its age. This is wrong
356 // as it doesn't account for the indexing delay. Propagate some metadata
357 // with the index artifacts to indicate time of the commit we indexed.
358 Reply
->set_index_age_seconds(
359 std::chrono::duration_cast
<std::chrono::seconds
>(
360 std::chrono::system_clock::now() - IndexBuildTime
.load())
362 return grpc::Status::OK
;
365 const llvm::sys::TimePoint
<> StartTime
;
366 std::atomic
<llvm::sys::TimePoint
<>> IndexBuildTime
;
// Asks glibc to return free heap memory to the system. Compiles to a no-op
// unless building against glibc with CLANGD_MALLOC_TRIM enabled.
// NOTE(review): the body was not visible in the reviewed text; malloc_trim(0)
// is what upstream clangd calls here - confirm.
void maybeTrimMemory() {
#ifdef __GLIBC__
#if CLANGD_MALLOC_TRIM
  malloc_trim(0);
#endif
#endif
}
375 // Detect changes in \p IndexPath file and load new versions of the index
376 // whenever they become available.
377 void hotReload(clangd::SwapIndex
&Index
, llvm::StringRef IndexPath
,
378 llvm::vfs::Status
&LastStatus
,
379 llvm::IntrusiveRefCntPtr
<llvm::vfs::FileSystem
> &FS
,
381 // glibc malloc doesn't shrink an arena if there are items living at the end,
382 // which might happen since we destroy the old index after building new one.
383 // Trim more aggresively to keep memory usage of the server low.
384 // Note that we do it deliberately here rather than after Index.reset(),
385 // because old index might still be kept alive after the reset call if we are
388 auto Status
= FS
->status(IndexPath
);
389 // Requested file is same as loaded index: no reload is needed.
390 if (!Status
|| (Status
->getLastModificationTime() ==
391 LastStatus
.getLastModificationTime() &&
392 Status
->getSize() == LastStatus
.getSize()))
394 vlog("Found different index version: existing index was modified at "
395 "{0}, new index was modified at {1}. Attempting to reload.",
396 LastStatus
.getLastModificationTime(), Status
->getLastModificationTime());
397 LastStatus
= *Status
;
398 std::unique_ptr
<clang::clangd::SymbolIndex
> NewIndex
=
399 loadIndex(IndexPath
, SymbolOrigin::Static
);
401 elog("Failed to load new index. Old index will be served.");
404 Index
.reset(std::move(NewIndex
));
405 Monitor
.updateIndex(Status
->getLastModificationTime());
406 log("New index version loaded. Last modification time: {0}, size: {1} bytes.",
407 Status
->getLastModificationTime(), Status
->getSize());
410 void runServerAndWait(clangd::SymbolIndex
&Index
, llvm::StringRef ServerAddress
,
411 llvm::StringRef IndexPath
, Monitor
&Monitor
) {
412 RemoteIndexServer
Service(Index
, IndexRoot
);
414 grpc::EnableDefaultHealthCheckService(true);
415 #if ENABLE_GRPC_REFLECTION
416 grpc::reflection::InitProtoReflectionServerBuilderPlugin();
418 grpc::ServerBuilder Builder
;
419 Builder
.AddListeningPort(ServerAddress
.str(),
420 grpc::InsecureServerCredentials());
421 Builder
.AddChannelArgument(GRPC_ARG_MAX_CONNECTION_IDLE_MS
,
422 IdleTimeoutSeconds
* 1000);
423 Builder
.RegisterService(&Service
);
424 Builder
.RegisterService(&Monitor
);
425 std::unique_ptr
<grpc::Server
> Server(Builder
.BuildAndStart());
426 log("Server listening on {0}", ServerAddress
);
428 std::thread
ServerShutdownWatcher([&]() {
429 static constexpr auto WatcherFrequency
= std::chrono::seconds(5);
430 while (!clang::clangd::shutdownRequested())
431 std::this_thread::sleep_for(WatcherFrequency
);
436 ServerShutdownWatcher
.join();
439 std::unique_ptr
<Logger
> makeLogger(llvm::StringRef LogPrefix
,
440 llvm::raw_ostream
&OS
) {
441 std::unique_ptr
<Logger
> Base
;
444 // - messages outside the scope of a request: log fully
445 // - messages tagged [public]: log fully
446 // - errors: log the format string
448 class RedactedLogger
: public StreamLogger
{
450 using StreamLogger::StreamLogger
;
451 void log(Level L
, const char *Fmt
,
452 const llvm::formatv_object_base
&Message
) override
{
453 if (Context::current().get(CurrentRequest
) == nullptr ||
454 llvm::StringRef(Fmt
).startswith("[public]"))
455 return StreamLogger::log(L
, Fmt
, Message
);
457 return StreamLogger::log(L
, Fmt
,
458 llvm::formatv("[redacted] {0}", Fmt
));
461 Base
= std::make_unique
<RedactedLogger
>(OS
, LogLevel
);
463 Base
= std::make_unique
<StreamLogger
>(OS
, LogLevel
);
466 if (LogPrefix
.empty())
468 class PrefixedLogger
: public Logger
{
469 std::string LogPrefix
;
470 std::unique_ptr
<Logger
> Base
;
473 PrefixedLogger(llvm::StringRef LogPrefix
, std::unique_ptr
<Logger
> Base
)
474 : LogPrefix(LogPrefix
.str()), Base(std::move(Base
)) {}
475 void log(Level L
, const char *Fmt
,
476 const llvm::formatv_object_base
&Message
) override
{
477 Base
->log(L
, Fmt
, llvm::formatv("[{0}] {1}", LogPrefix
, Message
));
480 return std::make_unique
<PrefixedLogger
>(LogPrefix
, std::move(Base
));
484 } // namespace remote
485 } // namespace clangd
488 using clang::clangd::elog
;
490 int main(int argc
, char *argv
[]) {
491 using namespace clang::clangd::remote
;
492 llvm::cl::ParseCommandLineOptions(argc
, argv
, Overview
);
493 llvm::sys::PrintStackTraceOnErrorSignal(argv
[0]);
494 llvm::sys::SetInterruptFunction(&clang::clangd::requestShutdown
);
496 if (!llvm::sys::path::is_absolute(IndexRoot
)) {
497 llvm::errs() << "Index root should be an absolute path.\n";
501 llvm::errs().SetBuffered();
502 // Don't flush stdout when logging for thread safety.
503 llvm::errs().tie(nullptr);
504 auto Logger
= makeLogger(LogPrefix
.getValue(), llvm::errs());
505 clang::clangd::LoggingSession
LoggingSession(*Logger
);
507 std::optional
<llvm::raw_fd_ostream
> TracerStream
;
508 std::unique_ptr
<clang::clangd::trace::EventTracer
> Tracer
;
509 if (!TraceFile
.empty()) {
511 TracerStream
.emplace(TraceFile
, EC
,
512 llvm::sys::fs::FA_Read
| llvm::sys::fs::FA_Write
);
514 TracerStream
.reset();
515 elog("Error while opening trace file {0}: {1}", TraceFile
, EC
.message());
517 // FIXME(kirillbobyrev): Also create metrics tracer to track latency and
518 // accumulate other request statistics.
519 Tracer
= clang::clangd::trace::createJSONTracer(*TracerStream
,
520 /*PrettyPrint=*/false);
521 clang::clangd::vlog("Successfully created a tracer.");
525 std::optional
<clang::clangd::trace::Session
> TracingSession
;
527 TracingSession
.emplace(*Tracer
);
529 clang::clangd::RealThreadsafeFS TFS
;
530 auto FS
= TFS
.view(std::nullopt
);
531 auto Status
= FS
->status(IndexPath
);
533 elog("{0} does not exist.", IndexPath
);
534 return Status
.getError().value();
538 clang::clangd::loadIndex(IndexPath
, clang::clangd::SymbolOrigin::Static
);
540 llvm::errs() << "Failed to open the index.\n";
543 clang::clangd::SwapIndex
Index(std::move(SymIndex
));
545 Monitor
Monitor(Status
->getLastModificationTime());
547 std::thread
HotReloadThread([&Index
, &Status
, &FS
, &Monitor
]() {
548 llvm::vfs::Status LastStatus
= *Status
;
549 static constexpr auto RefreshFrequency
= std::chrono::seconds(30);
550 while (!clang::clangd::shutdownRequested()) {
551 hotReload(Index
, llvm::StringRef(IndexPath
), LastStatus
, FS
, Monitor
);
552 std::this_thread::sleep_for(RefreshFrequency
);
556 runServerAndWait(Index
, ServerAddress
, IndexPath
, Monitor
);
558 HotReloadThread
.join();