1 //===-- Shared memory RPC client / server interface -------------*- C++ -*-===//
3 // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
4 // See https://llvm.org/LICENSE.txt for license information.
5 // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
7 //===----------------------------------------------------------------------===//
9 // This file implements a remote procedure call mechanism to communicate between
10 // heterogeneous devices that can share an address space atomically. We provide
11 // a client and a server to facilitate the remote call. The client makes request
12 // to the server using a shared communication channel. We use separate atomic
13 // signals to indicate which side, the client or the server is in ownership of
16 //===----------------------------------------------------------------------===//
18 #ifndef LLVM_LIBC_SRC_SUPPORT_RPC_RPC_H
19 #define LLVM_LIBC_SRC_SUPPORT_RPC_RPC_H
22 #include "src/__support/CPP/atomic.h"
23 #include "src/__support/CPP/functional.h"
24 #include "src/__support/CPP/optional.h"
25 #include "src/__support/GPU/utils.h"
26 #include "src/string/memory_utils/memcpy_implementations.h"
30 namespace __llvm_libc
{
33 /// A fixed size channel used to communicate between the RPC client and server.
37 static_assert(sizeof(Buffer
) == 64, "Buffer size mismatch");
39 /// The information associated with a packet. This indicates which operations to
40 /// perform and which threads are active in the slots.
46 /// The data payload for the associated packet. We provide enough space for each
47 /// thread in the cooperating lane to have a buffer.
48 template <uint32_t lane_size
= gpu::LANE_SIZE
> struct Payload
{
49 Buffer slot
[lane_size
];
52 /// A packet used to share data between the client and server across an entire
53 /// lane. We use a lane as the minimum granularity for execution.
54 template <uint32_t lane_size
= gpu::LANE_SIZE
> struct alignas(64) Packet
{
56 Payload
<lane_size
> payload
;
59 // TODO: This should be configured by the server and passed in. The general rule
60 // of thumb is that you should have at least as many ports as possible
61 // concurrent work items on the GPU to mitigate the lack offorward
62 // progress guarantees on the GPU.
63 constexpr uint64_t DEFAULT_PORT_COUNT
= 64;
65 /// A common process used to synchronize communication between a client and a
66 /// server. The process contains a read-only inbox and a write-only outbox used
67 /// for signaling ownership of the shared buffer between both sides. We assign
68 /// ownership of the buffer to the client if the inbox and outbox bits match,
69 /// otherwise it is owned by the server.
71 /// This process is designed to allow the client and the server to exchange data
72 /// using a fixed size packet in a mostly arbitrary order using the 'send' and
73 /// 'recv' operations. The following restrictions to this scheme apply:
74 /// - The client will always start with a 'send' operation.
75 /// - The server will always start with a 'recv' operation.
76 /// - Every 'send' or 'recv' call is mirrored by the other process.
77 template <bool Invert
, typename Packet
> struct Process
{
78 LIBC_INLINE
Process() = default;
79 LIBC_INLINE
Process(const Process
&) = delete;
80 LIBC_INLINE Process
&operator=(const Process
&) = delete;
81 LIBC_INLINE
Process(Process
&&) = default;
82 LIBC_INLINE Process
&operator=(Process
&&) = default;
83 LIBC_INLINE
~Process() = default;
86 cpp::Atomic
<uint32_t> *inbox
;
87 cpp::Atomic
<uint32_t> *outbox
;
90 cpp::Atomic
<uint32_t> lock
[DEFAULT_PORT_COUNT
] = {0};
92 /// Initialize the communication channels.
93 LIBC_INLINE
void reset(uint64_t port_count
, void *buffer
) {
94 this->port_count
= port_count
;
95 this->inbox
= reinterpret_cast<cpp::Atomic
<uint32_t> *>(
96 advance(buffer
, inbox_offset(port_count
)));
97 this->outbox
= reinterpret_cast<cpp::Atomic
<uint32_t> *>(
98 advance(buffer
, outbox_offset(port_count
)));
100 reinterpret_cast<Packet
*>(advance(buffer
, buffer_offset(port_count
)));
103 /// Returns the beginning of the unified buffer. Intended for initializing the
104 /// client after the server has been started.
105 LIBC_INLINE
void *get_buffer_start() const { return Invert
? outbox
: inbox
; }
107 /// Allocate a memory buffer sufficient to store the following equivalent
108 /// representation in memory.
110 /// struct Equivalent {
111 /// Atomic<uint32_t> primary[port_count];
112 /// Atomic<uint32_t> secondary[port_count];
113 /// Packet buffer[port_count];
115 LIBC_INLINE
static constexpr uint64_t allocation_size(uint64_t port_count
) {
116 return buffer_offset(port_count
) + buffer_bytes(port_count
);
119 /// Retrieve the inbox state from memory shared between processes.
120 LIBC_INLINE
uint32_t load_inbox(uint64_t index
) {
121 return inbox
[index
].load(cpp::MemoryOrder::RELAXED
);
124 /// Retrieve the outbox state from memory shared between processes.
125 LIBC_INLINE
uint32_t load_outbox(uint64_t index
) {
126 return outbox
[index
].load(cpp::MemoryOrder::RELAXED
);
129 /// Signal to the other process that this one is finished with the buffer.
130 /// Equivalent to loading outbox followed by store of the inverted value
131 /// The outbox is write only by this warp and tracking the value locally is
132 /// cheaper than calling load_outbox to get the value to store.
133 LIBC_INLINE
uint32_t invert_outbox(uint64_t index
, uint32_t current_outbox
) {
134 uint32_t inverted_outbox
= !current_outbox
;
135 atomic_thread_fence(cpp::MemoryOrder::RELEASE
);
136 outbox
[index
].store(inverted_outbox
, cpp::MemoryOrder::RELAXED
);
137 return inverted_outbox
;
140 // Given the current outbox and inbox values, wait until the inbox changes
141 // to indicate that this thread owns the buffer element.
142 LIBC_INLINE
void wait_for_ownership(uint64_t index
, uint32_t outbox
,
144 while (buffer_unavailable(in
, outbox
)) {
146 in
= load_inbox(index
);
148 atomic_thread_fence(cpp::MemoryOrder::ACQUIRE
);
151 /// Determines if this process needs to wait for ownership of the buffer. We
152 /// invert the condition on one of the processes to indicate that if one
153 /// process owns the buffer then the other does not.
154 LIBC_INLINE
static bool buffer_unavailable(uint32_t in
, uint32_t out
) {
155 bool cond
= in
!= out
;
156 return Invert
? !cond
: cond
;
159 /// Attempt to claim the lock at index. Return true on lock taken.
160 /// lane_mask is a bitmap of the threads in the warp that would hold the
161 /// single lock on success, e.g. the result of gpu::get_lane_mask()
162 /// The lock is held when the zeroth bit of the uint32_t at lock[index]
163 /// is set, and available when that bit is clear. Bits [1, 32) are zero.
164 /// Or with one is a no-op when the lock is already held.
165 [[clang::convergent
]] LIBC_INLINE
bool try_lock(uint64_t lane_mask
,
167 // On amdgpu, test and set to lock[index] and a sync_lane would suffice
168 // On volta, need to handle differences between the threads running and
169 // the threads that were detected in the previous call to get_lane_mask()
171 // All threads in lane_mask try to claim the lock. At most one can succeed.
172 // There may be threads active which are not in lane mask which must not
173 // succeed in taking the lock, as otherwise it will leak. This is handled
174 // by making threads which are not in lane_mask or with 0, a no-op.
175 uint32_t id
= gpu::get_lane_id();
176 bool id_in_lane_mask
= lane_mask
& (1ul << id
);
178 // All threads in the warp call fetch_or. Possibly at the same time.
180 lock
[index
].fetch_or(id_in_lane_mask
, cpp::MemoryOrder::RELAXED
);
181 uint64_t packed
= gpu::ballot(lane_mask
, before
);
183 // If every bit set in lane_mask is also set in packed, every single thread
184 // in the warp failed to get the lock. Ballot returns unset for threads not
187 // Cases, per thread:
188 // mask==0 -> unspecified before, discarded by ballot -> 0
189 // mask==1 and before==0 (success), set zero by ballot -> 0
190 // mask==1 and before==1 (failure), set one by ballot -> 1
192 // mask != packed implies at least one of the threads got the lock
193 // atomic semantics of fetch_or mean at most one of the threads for the lock
195 // If holding the lock then the caller can load values knowing said loads
196 // won't move past the lock. No such guarantee is needed if the lock acquire
197 // failed. This conditional branch is expected to fold in the caller after
198 // inlining the current function.
199 bool holding_lock
= lane_mask
!= packed
;
201 atomic_thread_fence(cpp::MemoryOrder::ACQUIRE
);
205 /// Unlock the lock at index. We need a lane sync to keep this function
206 /// convergent, otherwise the compiler will sink the store and deadlock.
207 [[clang::convergent
]] LIBC_INLINE
void unlock(uint64_t lane_mask
,
209 // Do not move any writes past the unlock
210 atomic_thread_fence(cpp::MemoryOrder::RELEASE
);
212 // Wait for other threads in the warp to finish using the lock
213 gpu::sync_lane(lane_mask
);
215 // Use exactly one thread to clear the bit at position 0 in lock[index]
216 // Must restrict to a single thread to avoid one thread dropping the lock,
217 // then an unrelated warp claiming the lock, then a second thread in this
218 // warp dropping the lock again.
219 uint32_t and_mask
= ~(rpc::is_first_lane(lane_mask
) ? 1 : 0);
220 lock
[index
].fetch_and(and_mask
, cpp::MemoryOrder::RELAXED
);
221 gpu::sync_lane(lane_mask
);
224 /// Number of bytes to allocate for an inbox or outbox.
225 LIBC_INLINE
static constexpr uint64_t mailbox_bytes(uint64_t port_count
) {
226 return port_count
* sizeof(cpp::Atomic
<uint32_t>);
229 /// Number of bytes to allocate for the buffer containing the packets.
230 LIBC_INLINE
static constexpr uint64_t buffer_bytes(uint64_t port_count
) {
231 return port_count
* sizeof(Packet
);
234 /// Offset of the inbox in memory. This is the same as the outbox if inverted.
235 LIBC_INLINE
static constexpr uint64_t inbox_offset(uint64_t port_count
) {
236 return Invert
? mailbox_bytes(port_count
) : 0;
239 /// Offset of the outbox in memory. This is the same as the inbox if inverted.
240 LIBC_INLINE
static constexpr uint64_t outbox_offset(uint64_t port_count
) {
241 return Invert
? 0 : mailbox_bytes(port_count
);
244 /// Offset of the buffer containing the packets after the inbox and outbox.
245 LIBC_INLINE
static constexpr uint64_t buffer_offset(uint64_t port_count
) {
246 return align_up(2 * mailbox_bytes(port_count
), alignof(Packet
));
250 /// Invokes a function accross every active buffer across the total lane size.
251 template <uint32_t lane_size
>
252 static LIBC_INLINE
void invoke_rpc(cpp::function
<void(Buffer
*)> fn
,
253 Packet
<lane_size
> &packet
) {
254 if constexpr (is_process_gpu()) {
255 fn(&packet
.payload
.slot
[gpu::get_lane_id()]);
257 for (uint32_t i
= 0; i
< lane_size
; i
+= gpu::get_lane_size())
258 if (packet
.header
.mask
& 1ul << i
)
259 fn(&packet
.payload
.slot
[i
]);
263 /// Alternate version that also provides the index of the current lane.
264 template <uint32_t lane_size
>
265 static LIBC_INLINE
void invoke_rpc(cpp::function
<void(Buffer
*, uint32_t)> fn
,
266 Packet
<lane_size
> &packet
) {
267 if constexpr (is_process_gpu()) {
268 fn(&packet
.payload
.slot
[gpu::get_lane_id()], gpu::get_lane_id());
270 for (uint32_t i
= 0; i
< lane_size
; i
+= gpu::get_lane_size())
271 if (packet
.header
.mask
& 1ul << i
)
272 fn(&packet
.payload
.slot
[i
], i
);
276 /// The port provides the interface to communicate between the multiple
277 /// processes. A port is conceptually an index into the memory provided by the
278 /// underlying process that is guarded by a lock bit.
279 template <bool T
, typename S
> struct Port
{
280 LIBC_INLINE
Port(Process
<T
, S
> &process
, uint64_t lane_mask
, uint64_t index
,
282 : process(process
), lane_mask(lane_mask
), index(index
), out(out
),
283 receive(false), owns_buffer(true) {}
284 LIBC_INLINE
~Port() = default;
287 LIBC_INLINE
Port(const Port
&) = delete;
288 LIBC_INLINE Port
&operator=(const Port
&) = delete;
289 LIBC_INLINE
Port(Port
&&) = default;
290 LIBC_INLINE Port
&operator=(Port
&&) = default;
292 friend struct Client
;
293 template <uint32_t U
> friend struct Server
;
294 friend class cpp::optional
<Port
<T
, S
>>;
297 template <typename U
> LIBC_INLINE
void recv(U use
);
298 template <typename F
> LIBC_INLINE
void send(F fill
);
299 template <typename F
, typename U
>
300 LIBC_INLINE
void send_and_recv(F fill
, U use
);
301 template <typename W
> LIBC_INLINE
void recv_and_send(W work
);
302 LIBC_INLINE
void send_n(const void *const *src
, uint64_t *size
);
303 LIBC_INLINE
void send_n(const void *src
, uint64_t size
);
304 template <typename A
>
305 LIBC_INLINE
void recv_n(void **dst
, uint64_t *size
, A
&&alloc
);
307 LIBC_INLINE
uint16_t get_opcode() const {
308 return process
.packet
[index
].header
.opcode
;
311 LIBC_INLINE
void close() {
312 // The server is passive, if it own the buffer when it closes we need to
313 // give ownership back to the client.
314 if (owns_buffer
&& T
)
315 out
= process
.invert_outbox(index
, out
);
316 process
.unlock(lane_mask
, index
);
320 Process
<T
, S
> &process
;
328 /// The RPC client used to make requests to the server.
330 LIBC_INLINE
Client() = default;
331 LIBC_INLINE
Client(const Client
&) = delete;
332 LIBC_INLINE Client
&operator=(const Client
&) = delete;
333 LIBC_INLINE
~Client() = default;
335 using Port
= rpc::Port
<false, Packet
<gpu::LANE_SIZE
>>;
336 template <uint16_t opcode
> LIBC_INLINE
cpp::optional
<Port
> try_open();
337 template <uint16_t opcode
> LIBC_INLINE Port
open();
339 LIBC_INLINE
void reset(uint64_t port_count
, void *buffer
) {
340 process
.reset(port_count
, buffer
);
344 Process
<false, Packet
<gpu::LANE_SIZE
>> process
;
346 static_assert(cpp::is_trivially_copyable
<Client
>::value
&&
347 sizeof(Process
<false, Packet
<1>>) ==
348 sizeof(Process
<false, Packet
<32>>),
349 "The client is not trivially copyable from the server");
351 /// The RPC server used to respond to the client.
352 template <uint32_t lane_size
> struct Server
{
353 LIBC_INLINE
Server() = default;
354 LIBC_INLINE
Server(const Server
&) = delete;
355 LIBC_INLINE Server
&operator=(const Server
&) = delete;
356 LIBC_INLINE
~Server() = default;
358 using Port
= rpc::Port
<true, Packet
<lane_size
>>;
359 LIBC_INLINE
cpp::optional
<Port
> try_open();
360 LIBC_INLINE Port
open();
362 LIBC_INLINE
void reset(uint64_t port_count
, void *buffer
) {
363 process
.reset(port_count
, buffer
);
366 LIBC_INLINE
void *get_buffer_start() const {
367 return process
.get_buffer_start();
370 LIBC_INLINE
static uint64_t allocation_size(uint64_t port_count
) {
371 return Process
<true, Packet
<lane_size
>>::allocation_size(port_count
);
375 Process
<true, Packet
<lane_size
>> process
;
378 /// Applies \p fill to the shared buffer and initiates a send operation.
379 template <bool T
, typename S
>
380 template <typename F
>
381 LIBC_INLINE
void Port
<T
, S
>::send(F fill
) {
382 uint32_t in
= owns_buffer
? out
^ T
: process
.load_inbox(index
);
384 // We need to wait until we own the buffer before sending.
385 process
.wait_for_ownership(index
, out
, in
);
387 // Apply the \p fill function to initialize the buffer and release the memory.
388 invoke_rpc(fill
, process
.packet
[index
]);
389 out
= process
.invert_outbox(index
, out
);
394 /// Applies \p use to the shared buffer and acknowledges the send.
395 template <bool T
, typename S
>
396 template <typename U
>
397 LIBC_INLINE
void Port
<T
, S
>::recv(U use
) {
398 // We only exchange ownership of the buffer during a receive if we are waiting
399 // for a previous receive to finish.
401 out
= process
.invert_outbox(index
, out
);
405 uint32_t in
= owns_buffer
? out
^ T
: process
.load_inbox(index
);
407 // We need to wait until we own the buffer before receiving.
408 process
.wait_for_ownership(index
, out
, in
);
410 // Apply the \p use function to read the memory out of the buffer.
411 invoke_rpc(use
, process
.packet
[index
]);
416 /// Combines a send and receive into a single function.
417 template <bool T
, typename S
>
418 template <typename F
, typename U
>
419 LIBC_INLINE
void Port
<T
, S
>::send_and_recv(F fill
, U use
) {
424 /// Combines a receive and send operation into a single function. The \p work
425 /// function modifies the buffer in-place and the send is only used to initiate
427 template <bool T
, typename S
>
428 template <typename W
>
429 LIBC_INLINE
void Port
<T
, S
>::recv_and_send(W work
) {
431 send([](Buffer
*) { /* no-op */ });
434 /// Helper routine to simplify the interface when sending from the GPU using
435 /// thread private pointers to the underlying value.
436 template <bool T
, typename S
>
437 LIBC_INLINE
void Port
<T
, S
>::send_n(const void *src
, uint64_t size
) {
438 const void **src_ptr
= &src
;
439 uint64_t *size_ptr
= &size
;
440 send_n(src_ptr
, size_ptr
);
443 /// Sends an arbitrarily sized data buffer \p src across the shared channel in
444 /// multiples of the packet length.
445 template <bool T
, typename S
>
446 LIBC_INLINE
void Port
<T
, S
>::send_n(const void *const *src
, uint64_t *size
) {
447 uint64_t num_sends
= 0;
448 send([&](Buffer
*buffer
, uint32_t id
) {
449 reinterpret_cast<uint64_t *>(buffer
->data
)[0] = lane_value(size
, id
);
450 num_sends
= is_process_gpu() ? lane_value(size
, id
)
451 : max(lane_value(size
, id
), num_sends
);
453 lane_value(size
, id
) > sizeof(Buffer::data
) - sizeof(uint64_t)
454 ? sizeof(Buffer::data
) - sizeof(uint64_t)
455 : lane_value(size
, id
);
456 inline_memcpy(&buffer
->data
[1], lane_value(src
, id
), len
);
458 uint64_t idx
= sizeof(Buffer::data
) - sizeof(uint64_t);
459 uint64_t mask
= process
.packet
[index
].header
.mask
;
460 while (gpu::ballot(mask
, idx
< num_sends
)) {
461 send([=](Buffer
*buffer
, uint32_t id
) {
462 uint64_t len
= lane_value(size
, id
) - idx
> sizeof(Buffer::data
)
463 ? sizeof(Buffer::data
)
464 : lane_value(size
, id
) - idx
;
465 if (idx
< lane_value(size
, id
))
466 inline_memcpy(buffer
->data
, advance(lane_value(src
, id
), idx
), len
);
468 idx
+= sizeof(Buffer::data
);
472 /// Receives an arbitrarily sized data buffer across the shared channel in
473 /// multiples of the packet length. The \p alloc function is called with the
474 /// size of the data so that we can initialize the size of the \p dst buffer.
475 template <bool T
, typename S
>
476 template <typename A
>
477 LIBC_INLINE
void Port
<T
, S
>::recv_n(void **dst
, uint64_t *size
, A
&&alloc
) {
478 uint64_t num_recvs
= 0;
479 recv([&](Buffer
*buffer
, uint32_t id
) {
480 lane_value(size
, id
) = reinterpret_cast<uint64_t *>(buffer
->data
)[0];
481 lane_value(dst
, id
) =
482 reinterpret_cast<uint8_t *>(alloc(lane_value(size
, id
)));
483 num_recvs
= is_process_gpu() ? lane_value(size
, id
)
484 : max(lane_value(size
, id
), num_recvs
);
486 lane_value(size
, id
) > sizeof(Buffer::data
) - sizeof(uint64_t)
487 ? sizeof(Buffer::data
) - sizeof(uint64_t)
488 : lane_value(size
, id
);
489 inline_memcpy(lane_value(dst
, id
), &buffer
->data
[1], len
);
491 uint64_t idx
= sizeof(Buffer::data
) - sizeof(uint64_t);
492 uint64_t mask
= process
.packet
[index
].header
.mask
;
493 while (gpu::ballot(mask
, idx
< num_recvs
)) {
494 recv([=](Buffer
*buffer
, uint32_t id
) {
495 uint64_t len
= lane_value(size
, id
) - idx
> sizeof(Buffer::data
)
496 ? sizeof(Buffer::data
)
497 : lane_value(size
, id
) - idx
;
498 if (idx
< lane_value(size
, id
))
499 inline_memcpy(advance(lane_value(dst
, id
), idx
), buffer
->data
, len
);
501 idx
+= sizeof(Buffer::data
);
505 /// Attempts to open a port to use as the client. The client can only open a
506 /// port if we find an index that is in a valid sending state. That is, there
507 /// are send operations pending that haven't been serviced on this port. Each
508 /// port instance uses an associated \p opcode to tell the server what to do.
509 template <uint16_t opcode
>
510 [[clang::convergent
]] LIBC_INLINE
cpp::optional
<Client::Port
>
512 // Perform a naive linear scan for a port that can be opened to send data.
513 for (uint64_t index
= 0; index
< process
.port_count
; ++index
) {
514 // Attempt to acquire the lock on this index.
515 uint64_t lane_mask
= gpu::get_lane_mask();
516 if (!process
.try_lock(lane_mask
, index
))
519 uint32_t in
= process
.load_inbox(index
);
520 uint32_t out
= process
.load_outbox(index
);
522 // Once we acquire the index we need to check if we are in a valid sending
524 if (process
.buffer_unavailable(in
, out
)) {
525 process
.unlock(lane_mask
, index
);
529 if (is_first_lane(lane_mask
)) {
530 process
.packet
[index
].header
.opcode
= opcode
;
531 process
.packet
[index
].header
.mask
= lane_mask
;
533 gpu::sync_lane(lane_mask
);
534 return Port(process
, lane_mask
, index
, out
);
539 template <uint16_t opcode
> LIBC_INLINE
Client::Port
Client::open() {
541 if (cpp::optional
<Client::Port
> p
= try_open
<opcode
>())
542 return cpp::move(p
.value());
547 /// Attempts to open a port to use as the server. The server can only open a
548 /// port if it has a pending receive operation
549 template <uint32_t lane_size
>
550 [[clang::convergent
]] LIBC_INLINE
551 cpp::optional
<typename Server
<lane_size
>::Port
>
552 Server
<lane_size
>::try_open() {
553 // Perform a naive linear scan for a port that has a pending request.
554 for (uint64_t index
= 0; index
< process
.port_count
; ++index
) {
555 uint32_t in
= process
.load_inbox(index
);
556 uint32_t out
= process
.load_outbox(index
);
558 // The server is passive, if there is no work pending don't bother
560 if (process
.buffer_unavailable(in
, out
))
563 // Attempt to acquire the lock on this index.
564 uint64_t lane_mask
= gpu::get_lane_mask();
565 if (!process
.try_lock(lane_mask
, index
))
568 in
= process
.load_inbox(index
);
569 out
= process
.load_outbox(index
);
571 if (process
.buffer_unavailable(in
, out
)) {
572 process
.unlock(lane_mask
, index
);
576 return Port(process
, lane_mask
, index
, out
);
581 template <uint32_t lane_size
>
582 LIBC_INLINE typename Server
<lane_size
>::Port Server
<lane_size
>::open() {
584 if (cpp::optional
<Server::Port
> p
= try_open())
585 return cpp::move(p
.value());
591 } // namespace __llvm_libc