//===-- Shared memory RPC client / server interface -------------*- C++ -*-===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
//
// This file implements a remote procedure call mechanism to communicate
// between heterogeneous devices that can share an address space atomically.
// We provide a client and a server to facilitate the remote call. The client
// makes requests to the server using a shared communication channel. We use
// separate atomic signals to indicate which side, the client or the server,
// owns the buffer.
//
//===----------------------------------------------------------------------===//

#ifndef LLVM_LIBC_SRC_SUPPORT_RPC_RPC_H
#define LLVM_LIBC_SRC_SUPPORT_RPC_RPC_H

#include "rpc_util.h"
#include "src/__support/CPP/atomic.h"
#include "src/__support/CPP/functional.h"
#include "src/__support/CPP/optional.h"
#include "src/__support/GPU/utils.h"
#include "src/string/memory_utils/memcpy_implementations.h"

#include <stdint.h>

namespace __llvm_libc {
namespace rpc {

/// A fixed size channel used to communicate between the RPC client and server.
struct Buffer {
  uint64_t data[8];
};

static_assert(sizeof(Buffer) == 64, "Buffer size mismatch");

/// The information associated with a packet. This indicates which operations
/// to perform and which threads are active in the slots.
struct Header {
  uint64_t mask;
  uint16_t opcode;
};

/// The data payload for the associated packet. We provide enough space for
/// each thread in the cooperating lane to have a buffer.
template <uint32_t lane_size = gpu::LANE_SIZE> struct Payload {
  Buffer slot[lane_size];
};

/// A packet used to share data between the client and server across an entire
/// lane. We use a lane as the minimum granularity for execution.
template <uint32_t lane_size = gpu::LANE_SIZE> struct alignas(64) Packet {
  Header header;
  Payload<lane_size> payload;
};

// TODO: This should be configured by the server and passed in. The general
// rule of thumb is that you should have at least as many ports as there can
// be concurrent work items on the GPU to mitigate the lack of forward
// progress guarantees on the GPU.
constexpr uint64_t DEFAULT_PORT_COUNT = 64;

/// A common process used to synchronize communication between a client and a
/// server. The process contains a read-only inbox and a write-only outbox used
/// for signaling ownership of the shared buffer between both sides. We assign
/// ownership of the buffer to the client if the inbox and outbox bits match,
/// otherwise it is owned by the server.
///
/// This process is designed to allow the client and the server to exchange
/// data using a fixed size packet in a mostly arbitrary order using the 'send'
/// and 'recv' operations. The following restrictions to this scheme apply:
///   - The client will always start with a 'send' operation.
///   - The server will always start with a 'recv' operation.
///   - Every 'send' or 'recv' call is mirrored by the other process.
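///
/// As an illustrative sketch only (the opcode value and lane size below are
/// arbitrary examples, not fixed by this interface), a mirrored exchange
/// between the two processes might look like:
///
///   // Client:                            // Server:
///   Client::Port p = client.open<0>();    Server<32>::Port p = server.open();
///   p.send([](Buffer *b) { /*fill*/ });   p.recv([](Buffer *b) { /*use*/ });
///   p.recv([](Buffer *b) { /*use*/ });    p.send([](Buffer *b) { /*fill*/ });
///   p.close();                            p.close();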
template <bool Invert, typename Packet> struct Process {
  LIBC_INLINE Process() = default;
  LIBC_INLINE Process(const Process &) = delete;
  LIBC_INLINE Process &operator=(const Process &) = delete;
  LIBC_INLINE Process(Process &&) = default;
  LIBC_INLINE Process &operator=(Process &&) = default;
  LIBC_INLINE ~Process() = default;

  uint64_t port_count;
  cpp::Atomic<uint32_t> *inbox;
  cpp::Atomic<uint32_t> *outbox;
  Packet *packet;

  cpp::Atomic<uint32_t> lock[DEFAULT_PORT_COUNT] = {0};

  /// Initialize the communication channels.
  LIBC_INLINE void reset(uint64_t port_count, void *buffer) {
    this->port_count = port_count;
    this->inbox = reinterpret_cast<cpp::Atomic<uint32_t> *>(
        advance(buffer, inbox_offset(port_count)));
    this->outbox = reinterpret_cast<cpp::Atomic<uint32_t> *>(
        advance(buffer, outbox_offset(port_count)));
    this->packet =
        reinterpret_cast<Packet *>(advance(buffer, buffer_offset(port_count)));
  }

  /// Returns the beginning of the unified buffer. Intended for initializing
  /// the client after the server has been started.
  LIBC_INLINE void *get_buffer_start() const { return Invert ? outbox : inbox; }

  /// Allocate a memory buffer sufficient to store the following equivalent
  /// representation in memory.
  ///
  /// struct Equivalent {
  ///   Atomic<uint32_t> primary[port_count];
  ///   Atomic<uint32_t> secondary[port_count];
  ///   Packet buffer[port_count];
  /// };
  LIBC_INLINE static constexpr uint64_t allocation_size(uint64_t port_count) {
    return buffer_offset(port_count) + buffer_bytes(port_count);
  }

  /// Retrieve the inbox state from memory shared between processes.
  LIBC_INLINE uint32_t load_inbox(uint64_t index) {
    return inbox[index].load(cpp::MemoryOrder::RELAXED);
  }

  /// Retrieve the outbox state from memory shared between processes.
  LIBC_INLINE uint32_t load_outbox(uint64_t index) {
    return outbox[index].load(cpp::MemoryOrder::RELAXED);
  }

  /// Signal to the other process that this one is finished with the buffer.
  /// Equivalent to loading the outbox followed by a store of the inverted
  /// value. The outbox is only written by this warp, so tracking the value
  /// locally is cheaper than calling load_outbox to get the value to store.
  LIBC_INLINE uint32_t invert_outbox(uint64_t index, uint32_t current_outbox) {
    uint32_t inverted_outbox = !current_outbox;
    atomic_thread_fence(cpp::MemoryOrder::RELEASE);
    outbox[index].store(inverted_outbox, cpp::MemoryOrder::RELAXED);
    return inverted_outbox;
  }

  // Given the current outbox and inbox values, wait until the inbox changes
  // to indicate that this thread owns the buffer element.
  LIBC_INLINE void wait_for_ownership(uint64_t index, uint32_t outbox,
                                      uint32_t in) {
    while (buffer_unavailable(in, outbox)) {
      sleep_briefly();
      in = load_inbox(index);
    }
    atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
  }

  /// Determines if this process needs to wait for ownership of the buffer. We
  /// invert the condition on one of the processes to indicate that if one
  /// process owns the buffer then the other does not.
  LIBC_INLINE static bool buffer_unavailable(uint32_t in, uint32_t out) {
    bool cond = in != out;
    return Invert ? !cond : cond;
  }
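
  // To make the convention concrete, a sketch of the resulting ownership,
  // derived from the condition above (the client is instantiated below with
  // Invert == false and the server with Invert == true):
  //   client (Invert == false): in == out -> available, in != out -> wait
  //   server (Invert == true):  in != out -> available, in == out -> wait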

  /// Attempt to claim the lock at index. Return true on lock taken.
  /// lane_mask is a bitmap of the threads in the warp that would hold the
  /// single lock on success, e.g. the result of gpu::get_lane_mask().
  /// The lock is held when the zeroth bit of the uint32_t at lock[index]
  /// is set, and available when that bit is clear. Bits [1, 32) are zero.
  /// OR'ing with one is a no-op when the lock is already held.
  [[clang::convergent]] LIBC_INLINE bool try_lock(uint64_t lane_mask,
                                                  uint64_t index) {
    // On amdgpu, a test and set to lock[index] and a sync_lane would suffice.
    // On volta, we need to handle differences between the threads running and
    // the threads that were detected in the previous call to get_lane_mask().
    //
    // All threads in lane_mask try to claim the lock. At most one can succeed.
    // There may be threads active which are not in lane_mask which must not
    // succeed in taking the lock, as otherwise it will leak. This is handled
    // by making threads which are not in lane_mask OR with 0, a no-op.
    uint32_t id = gpu::get_lane_id();
    bool id_in_lane_mask = lane_mask & (1ul << id);

    // All threads in the warp call fetch_or. Possibly at the same time.
    bool before =
        lock[index].fetch_or(id_in_lane_mask, cpp::MemoryOrder::RELAXED);
    uint64_t packed = gpu::ballot(lane_mask, before);

    // If every bit set in lane_mask is also set in packed, every single thread
    // in the warp failed to get the lock. Ballot returns unset for threads not
    // in the lane mask.
    //
    // Cases, per thread:
    //   mask==0 -> unspecified before, discarded by ballot -> 0
    //   mask==1 and before==0 (success), set zero by ballot -> 0
    //   mask==1 and before==1 (failure), set one by ballot  -> 1
    //
    // mask != packed implies at least one of the threads got the lock, and
    // the atomic semantics of fetch_or mean at most one of the threads got it.
    //
    // If holding the lock then the caller can load values knowing said loads
    // won't move past the lock. No such guarantee is needed if the lock
    // acquire failed. This conditional branch is expected to fold in the
    // caller after inlining the current function.
    bool holding_lock = lane_mask != packed;
    if (holding_lock)
      atomic_thread_fence(cpp::MemoryOrder::ACQUIRE);
    return holding_lock;
  }

  /// Unlock the lock at index. We need a lane sync to keep this function
  /// convergent, otherwise the compiler will sink the store and deadlock.
  [[clang::convergent]] LIBC_INLINE void unlock(uint64_t lane_mask,
                                                uint64_t index) {
    // Do not move any writes past the unlock.
    atomic_thread_fence(cpp::MemoryOrder::RELEASE);

    // Wait for other threads in the warp to finish using the lock.
    gpu::sync_lane(lane_mask);

    // Use exactly one thread to clear the bit at position 0 in lock[index].
    // Must restrict to a single thread to avoid one thread dropping the lock,
    // then an unrelated warp claiming the lock, then a second thread in this
    // warp dropping the lock again.
    uint32_t and_mask = ~(rpc::is_first_lane(lane_mask) ? 1 : 0);
    lock[index].fetch_and(and_mask, cpp::MemoryOrder::RELAXED);
    gpu::sync_lane(lane_mask);
  }

  /// Number of bytes to allocate for an inbox or outbox.
  LIBC_INLINE static constexpr uint64_t mailbox_bytes(uint64_t port_count) {
    return port_count * sizeof(cpp::Atomic<uint32_t>);
  }

  /// Number of bytes to allocate for the buffer containing the packets.
  LIBC_INLINE static constexpr uint64_t buffer_bytes(uint64_t port_count) {
    return port_count * sizeof(Packet);
  }

  /// Offset of the inbox in memory. This is the same as the outbox if
  /// inverted.
  LIBC_INLINE static constexpr uint64_t inbox_offset(uint64_t port_count) {
    return Invert ? mailbox_bytes(port_count) : 0;
  }

  /// Offset of the outbox in memory. This is the same as the inbox if
  /// inverted.
  LIBC_INLINE static constexpr uint64_t outbox_offset(uint64_t port_count) {
    return Invert ? 0 : mailbox_bytes(port_count);
  }

  /// Offset of the buffer containing the packets after the inbox and outbox.
  LIBC_INLINE static constexpr uint64_t buffer_offset(uint64_t port_count) {
    return align_up(2 * mailbox_bytes(port_count), alignof(Packet));
  }
};
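
// As a worked example of the layout arithmetic above (a sketch, assuming
// port_count == DEFAULT_PORT_COUNT (64) and sizeof(cpp::Atomic<uint32_t>)
// == 4): each mailbox occupies 64 * 4 = 256 bytes, so the two mailboxes end
// at byte 512. Packet is aligned to 64 bytes and align_up(512, 64) == 512, so
// the packet buffer starts at offset 512 and allocation_size(64) returns
// 512 + 64 * sizeof(Packet) bytes in total.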

/// Invokes a function across every active buffer across the total lane size.
template <uint32_t lane_size>
static LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *)> fn,
                                   Packet<lane_size> &packet) {
  if constexpr (is_process_gpu()) {
    fn(&packet.payload.slot[gpu::get_lane_id()]);
  } else {
    for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
      if (packet.header.mask & 1ul << i)
        fn(&packet.payload.slot[i]);
  }
}

/// Alternate version that also provides the index of the current lane.
template <uint32_t lane_size>
static LIBC_INLINE void invoke_rpc(cpp::function<void(Buffer *, uint32_t)> fn,
                                   Packet<lane_size> &packet) {
  if constexpr (is_process_gpu()) {
    fn(&packet.payload.slot[gpu::get_lane_id()], gpu::get_lane_id());
  } else {
    for (uint32_t i = 0; i < lane_size; i += gpu::get_lane_size())
      if (packet.header.mask & 1ul << i)
        fn(&packet.payload.slot[i], i);
  }
}

/// The port provides the interface to communicate between the multiple
/// processes. A port is conceptually an index into the memory provided by the
/// underlying process that is guarded by a lock bit.
template <bool T, typename S> struct Port {
  LIBC_INLINE Port(Process<T, S> &process, uint64_t lane_mask, uint64_t index,
                   uint32_t out)
      : process(process), lane_mask(lane_mask), index(index), out(out),
        receive(false), owns_buffer(true) {}
  LIBC_INLINE ~Port() = default;

private:
  LIBC_INLINE Port(const Port &) = delete;
  LIBC_INLINE Port &operator=(const Port &) = delete;
  LIBC_INLINE Port(Port &&) = default;
  LIBC_INLINE Port &operator=(Port &&) = default;

  friend struct Client;
  template <uint32_t U> friend struct Server;
  friend class cpp::optional<Port<T, S>>;

public:
  template <typename U> LIBC_INLINE void recv(U use);
  template <typename F> LIBC_INLINE void send(F fill);
  template <typename F, typename U>
  LIBC_INLINE void send_and_recv(F fill, U use);
  template <typename W> LIBC_INLINE void recv_and_send(W work);
  LIBC_INLINE void send_n(const void *const *src, uint64_t *size);
  LIBC_INLINE void send_n(const void *src, uint64_t size);
  template <typename A>
  LIBC_INLINE void recv_n(void **dst, uint64_t *size, A &&alloc);

  LIBC_INLINE uint16_t get_opcode() const {
    return process.packet[index].header.opcode;
  }

  LIBC_INLINE void close() {
    // The server is passive, so if it owns the buffer when it closes we need
    // to give ownership back to the client.
    if (owns_buffer && T)
      out = process.invert_outbox(index, out);
    process.unlock(lane_mask, index);
  }

private:
  Process<T, S> &process;
  uint64_t lane_mask;
  uint64_t index;
  uint32_t out;
  bool receive;
  bool owns_buffer;
};

/// The RPC client used to make requests to the server.
struct Client {
  LIBC_INLINE Client() = default;
  LIBC_INLINE Client(const Client &) = delete;
  LIBC_INLINE Client &operator=(const Client &) = delete;
  LIBC_INLINE ~Client() = default;

  using Port = rpc::Port<false, Packet<gpu::LANE_SIZE>>;
  template <uint16_t opcode> LIBC_INLINE cpp::optional<Port> try_open();
  template <uint16_t opcode> LIBC_INLINE Port open();

  LIBC_INLINE void reset(uint64_t port_count, void *buffer) {
    process.reset(port_count, buffer);
  }

private:
  Process<false, Packet<gpu::LANE_SIZE>> process;
};

static_assert(cpp::is_trivially_copyable<Client>::value &&
                  sizeof(Process<false, Packet<1>>) ==
                      sizeof(Process<false, Packet<32>>),
              "The client is not trivially copyable from the server");

/// The RPC server used to respond to the client.
template <uint32_t lane_size> struct Server {
  LIBC_INLINE Server() = default;
  LIBC_INLINE Server(const Server &) = delete;
  LIBC_INLINE Server &operator=(const Server &) = delete;
  LIBC_INLINE ~Server() = default;

  using Port = rpc::Port<true, Packet<lane_size>>;
  LIBC_INLINE cpp::optional<Port> try_open();
  LIBC_INLINE Port open();

  LIBC_INLINE void reset(uint64_t port_count, void *buffer) {
    process.reset(port_count, buffer);
  }

  LIBC_INLINE void *get_buffer_start() const {
    return process.get_buffer_start();
  }

  LIBC_INLINE static uint64_t allocation_size(uint64_t port_count) {
    return Process<true, Packet<lane_size>>::allocation_size(port_count);
  }

private:
  Process<true, Packet<lane_size>> process;
};

/// Applies \p fill to the shared buffer and initiates a send operation.
template <bool T, typename S>
template <typename F>
LIBC_INLINE void Port<T, S>::send(F fill) {
  uint32_t in = owns_buffer ? out ^ T : process.load_inbox(index);

  // We need to wait until we own the buffer before sending.
  process.wait_for_ownership(index, out, in);

  // Apply the \p fill function to initialize the buffer and release the
  // memory.
  invoke_rpc(fill, process.packet[index]);
  out = process.invert_outbox(index, out);
  owns_buffer = false;
  receive = false;
}

/// Applies \p use to the shared buffer and acknowledges the send.
template <bool T, typename S>
template <typename U>
LIBC_INLINE void Port<T, S>::recv(U use) {
  // We only exchange ownership of the buffer during a receive if we are
  // waiting for a previous receive to finish.
  if (receive) {
    out = process.invert_outbox(index, out);
    owns_buffer = false;
  }

  uint32_t in = owns_buffer ? out ^ T : process.load_inbox(index);

  // We need to wait until we own the buffer before receiving.
  process.wait_for_ownership(index, out, in);

  // Apply the \p use function to read the memory out of the buffer.
  invoke_rpc(use, process.packet[index]);
  receive = true;
  owns_buffer = true;
}

/// Combines a send and receive into a single function.
template <bool T, typename S>
template <typename F, typename U>
LIBC_INLINE void Port<T, S>::send_and_recv(F fill, U use) {
  send(fill);
  recv(use);
}

/// Combines a receive and send operation into a single function. The \p work
/// function modifies the buffer in-place and the send is only used to initiate
/// the copy back.
template <bool T, typename S>
template <typename W>
LIBC_INLINE void Port<T, S>::recv_and_send(W work) {
  recv(work);
  send([](Buffer *) { /* no-op */ });
}

/// Helper routine to simplify the interface when sending from the GPU using
/// thread private pointers to the underlying value.
template <bool T, typename S>
LIBC_INLINE void Port<T, S>::send_n(const void *src, uint64_t size) {
  const void **src_ptr = &src;
  uint64_t *size_ptr = &size;
  send_n(src_ptr, size_ptr);
}

/// Sends an arbitrarily sized data buffer \p src across the shared channel in
/// multiples of the packet length.
template <bool T, typename S>
LIBC_INLINE void Port<T, S>::send_n(const void *const *src, uint64_t *size) {
  uint64_t num_sends = 0;
  send([&](Buffer *buffer, uint32_t id) {
    reinterpret_cast<uint64_t *>(buffer->data)[0] = lane_value(size, id);
    num_sends = is_process_gpu() ? lane_value(size, id)
                                 : max(lane_value(size, id), num_sends);
    uint64_t len =
        lane_value(size, id) > sizeof(Buffer::data) - sizeof(uint64_t)
            ? sizeof(Buffer::data) - sizeof(uint64_t)
            : lane_value(size, id);
    inline_memcpy(&buffer->data[1], lane_value(src, id), len);
  });
  uint64_t idx = sizeof(Buffer::data) - sizeof(uint64_t);
  uint64_t mask = process.packet[index].header.mask;
  while (gpu::ballot(mask, idx < num_sends)) {
    send([=](Buffer *buffer, uint32_t id) {
      uint64_t len = lane_value(size, id) - idx > sizeof(Buffer::data)
                         ? sizeof(Buffer::data)
                         : lane_value(size, id) - idx;
      if (idx < lane_value(size, id))
        inline_memcpy(buffer->data, advance(lane_value(src, id), idx), len);
    });
    idx += sizeof(Buffer::data);
  }
}

/// Receives an arbitrarily sized data buffer across the shared channel in
/// multiples of the packet length. The \p alloc function is called with the
/// size of the data so that we can initialize the size of the \p dst buffer.
template <bool T, typename S>
template <typename A>
LIBC_INLINE void Port<T, S>::recv_n(void **dst, uint64_t *size, A &&alloc) {
  uint64_t num_recvs = 0;
  recv([&](Buffer *buffer, uint32_t id) {
    lane_value(size, id) = reinterpret_cast<uint64_t *>(buffer->data)[0];
    lane_value(dst, id) =
        reinterpret_cast<uint8_t *>(alloc(lane_value(size, id)));
    num_recvs = is_process_gpu() ? lane_value(size, id)
                                 : max(lane_value(size, id), num_recvs);
    uint64_t len =
        lane_value(size, id) > sizeof(Buffer::data) - sizeof(uint64_t)
            ? sizeof(Buffer::data) - sizeof(uint64_t)
            : lane_value(size, id);
    inline_memcpy(lane_value(dst, id), &buffer->data[1], len);
  });
  uint64_t idx = sizeof(Buffer::data) - sizeof(uint64_t);
  uint64_t mask = process.packet[index].header.mask;
  while (gpu::ballot(mask, idx < num_recvs)) {
    recv([=](Buffer *buffer, uint32_t id) {
      uint64_t len = lane_value(size, id) - idx > sizeof(Buffer::data)
                         ? sizeof(Buffer::data)
                         : lane_value(size, id) - idx;
      if (idx < lane_value(size, id))
        inline_memcpy(advance(lane_value(dst, id), idx), buffer->data, len);
    });
    idx += sizeof(Buffer::data);
  }
}
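
// As a usage sketch only (the allocator shown is a placeholder, not part of
// this interface): one side can stream a thread-private buffer through a port
// with send_n, and the other side mirrors it with recv_n, whose \p alloc
// callback receives the incoming size and returns the destination storage:
//   port.send_n(src, len);                         // sending side
//   void *dst;
//   uint64_t size;
//   port.recv_n(&dst, &size, [](uint64_t size) {   // receiving side
//     return new char[size];
//   });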

/// Attempts to open a port to use as the client. The client can only open a
/// port if we find an index that is in a valid sending state. That is, there
/// are send operations pending that haven't been serviced on this port. Each
/// port instance uses an associated \p opcode to tell the server what to do.
template <uint16_t opcode>
[[clang::convergent]] LIBC_INLINE cpp::optional<Client::Port>
Client::try_open() {
  // Perform a naive linear scan for a port that can be opened to send data.
  for (uint64_t index = 0; index < process.port_count; ++index) {
    // Attempt to acquire the lock on this index.
    uint64_t lane_mask = gpu::get_lane_mask();
    if (!process.try_lock(lane_mask, index))
      continue;

    uint32_t in = process.load_inbox(index);
    uint32_t out = process.load_outbox(index);

    // Once we acquire the index we need to check if we are in a valid sending
    // state.
    if (process.buffer_unavailable(in, out)) {
      process.unlock(lane_mask, index);
      continue;
    }

    if (is_first_lane(lane_mask)) {
      process.packet[index].header.opcode = opcode;
      process.packet[index].header.mask = lane_mask;
    }
    gpu::sync_lane(lane_mask);
    return Port(process, lane_mask, index, out);
  }
  return cpp::nullopt;
}

template <uint16_t opcode> LIBC_INLINE Client::Port Client::open() {
  for (;;) {
    if (cpp::optional<Client::Port> p = try_open<opcode>())
      return cpp::move(p.value());
    sleep_briefly();
  }
}
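
// As an illustrative sketch of the client-side calling convention (the
// opcode value and buffer contents are arbitrary examples):
//   Client::Port port = client.open<0>();
//   port.send([](Buffer *buffer) { buffer->data[0] = /* request */ 0; });
//   port.recv([](Buffer *buffer) { /* read the response */ });
//   port.close();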

/// Attempts to open a port to use as the server. The server can only open a
/// port if it has a pending receive operation.
template <uint32_t lane_size>
[[clang::convergent]] LIBC_INLINE
    cpp::optional<typename Server<lane_size>::Port>
    Server<lane_size>::try_open() {
  // Perform a naive linear scan for a port that has a pending request.
  for (uint64_t index = 0; index < process.port_count; ++index) {
    uint32_t in = process.load_inbox(index);
    uint32_t out = process.load_outbox(index);

    // The server is passive, if there is no work pending don't bother
    // opening a port.
    if (process.buffer_unavailable(in, out))
      continue;

    // Attempt to acquire the lock on this index.
    uint64_t lane_mask = gpu::get_lane_mask();
    if (!process.try_lock(lane_mask, index))
      continue;

    in = process.load_inbox(index);
    out = process.load_outbox(index);

    if (process.buffer_unavailable(in, out)) {
      process.unlock(lane_mask, index);
      continue;
    }

    return Port(process, lane_mask, index, out);
  }
  return cpp::nullopt;
}

template <uint32_t lane_size>
LIBC_INLINE typename Server<lane_size>::Port Server<lane_size>::open() {
  for (;;) {
    if (cpp::optional<Server::Port> p = try_open())
      return cpp::move(p.value());
    sleep_briefly();
  }
}
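
// As an illustrative sketch of the server-side calling convention (the
// opcode dispatch is an example, not part of this interface):
//   Server<32>::Port port = server.open();
//   switch (port.get_opcode()) {
//   default:
//     port.recv_and_send([](Buffer *buffer) { /* handle in-place */ });
//   }
//   port.close();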

} // namespace rpc
} // namespace __llvm_libc

#endif