From 7b8090810ed5ff10da842c08cfcdb8bfbc00be52 Mon Sep 17 00:00:00 2001 From: Radoslav Gerganov Date: Fri, 28 Mar 2025 08:18:04 +0200 Subject: [PATCH] rpc : send hash when tensor data is above some fixed threshold (llama/12496) * rpc : send hash when tensor data is above some fixed threshold ref #10095 * rpc : put cache under $HOME/.cache/llama.cpp * try to fix win32 build * another try to fix win32 build * remove llama as dependency --- ggml/include/ggml-rpc.h | 4 +- ggml/src/ggml-rpc/ggml-rpc.cpp | 150 +++++++++++++++++++++++++++++++-- 2 files changed, 146 insertions(+), 8 deletions(-) diff --git a/ggml/include/ggml-rpc.h b/ggml/include/ggml-rpc.h index ade6c3b0..4e0d210f 100644 --- a/ggml/include/ggml-rpc.h +++ b/ggml/include/ggml-rpc.h @@ -17,7 +17,9 @@ GGML_BACKEND_API ggml_backend_buffer_type_t ggml_backend_rpc_buffer_type(const c GGML_BACKEND_API void ggml_backend_rpc_get_device_memory(const char * endpoint, size_t * free, size_t * total); -GGML_BACKEND_API void ggml_backend_rpc_start_server(ggml_backend_t backend, const char * endpoint, size_t free_mem, size_t total_mem); +GGML_BACKEND_API void ggml_backend_rpc_start_server(ggml_backend_t backend, const char * endpoint, + const char * cache_dir, + size_t free_mem, size_t total_mem); GGML_BACKEND_API ggml_backend_reg_t ggml_backend_rpc_reg(void); diff --git a/ggml/src/ggml-rpc/ggml-rpc.cpp b/ggml/src/ggml-rpc/ggml-rpc.cpp index 6c3b80b0..862b9b66 100644 --- a/ggml/src/ggml-rpc/ggml-rpc.cpp +++ b/ggml/src/ggml-rpc/ggml-rpc.cpp @@ -26,6 +26,10 @@ # include #endif #include +#include +#include + +namespace fs = std::filesystem; #ifdef _WIN32 typedef SOCKET sockfd_t; @@ -80,6 +84,7 @@ enum rpc_cmd { RPC_CMD_FREE_BUFFER, RPC_CMD_BUFFER_CLEAR, RPC_CMD_SET_TENSOR, + RPC_CMD_SET_TENSOR_HASH, RPC_CMD_GET_TENSOR, RPC_CMD_COPY_TENSOR, RPC_CMD_GRAPH_COMPUTE, @@ -89,6 +94,9 @@ enum rpc_cmd { RPC_CMD_COUNT, }; +// Try RPC_CMD_SET_TENSOR_HASH first when data size is larger than this threshold +const size_t HASH_THRESHOLD = 10 * 1024 * 1024; + struct rpc_msg_get_alloc_size_req { rpc_tensor tensor; }; @@ -135,6 +143,10 @@ struct rpc_msg_buffer_clear_req { uint8_t value; }; +struct rpc_msg_set_tensor_hash_rsp { + uint8_t result; +}; + struct rpc_msg_get_tensor_req { rpc_tensor tensor; uint64_t offset; @@ -187,6 +199,18 @@ struct ggml_backend_rpc_buffer_context { // RPC helper functions +// Computes FNV-1a hash of the data +static uint64_t fnv_hash(const uint8_t * data, size_t len) { + const uint64_t fnv_prime = 0x100000001b3ULL; + uint64_t hash = 0xcbf29ce484222325ULL; + + for (size_t i = 0; i < len; ++i) { + hash ^= data[i]; + hash *= fnv_prime; + } + return hash; +} + static std::shared_ptr make_socket(sockfd_t fd) { #ifdef _WIN32 if (fd == INVALID_SOCKET) { @@ -483,10 +507,26 @@ static enum ggml_status ggml_backend_rpc_buffer_init_tensor(ggml_backend_buffer_ static void ggml_backend_rpc_buffer_set_tensor(ggml_backend_buffer_t buffer, ggml_tensor * tensor, const void * data, size_t offset, size_t size) { ggml_backend_rpc_buffer_context * ctx = (ggml_backend_rpc_buffer_context *)buffer->context; - // input serialization format: | rpc_tensor | offset (8 bytes) | data (size bytes) | + rpc_tensor rpc_tensor = serialize_tensor(tensor); + if (size > HASH_THRESHOLD) { + // input serialization format: | rpc_tensor | offset (8 bytes) | hash (8 bytes) + size_t input_size = sizeof(rpc_tensor) + sizeof(uint64_t) + sizeof(uint64_t); + std::vector input(input_size, 0); + uint64_t hash = fnv_hash((const uint8_t*)data, size); + memcpy(input.data(), &rpc_tensor, sizeof(rpc_tensor)); + memcpy(input.data() + sizeof(rpc_tensor), &offset, sizeof(offset)); + memcpy(input.data() + sizeof(rpc_tensor) + sizeof(offset), &hash, sizeof(hash)); + rpc_msg_set_tensor_hash_rsp response; + bool status = send_rpc_cmd(ctx->sock, RPC_CMD_SET_TENSOR_HASH, input.data(), input.size(), &response, sizeof(response)); + GGML_ASSERT(status); + if (response.result) { + // the server has the same data, no need to send it + return; + } + } + // input serialization format: | rpc_tensor | offset (8 bytes) | data (size bytes) size_t input_size = sizeof(rpc_tensor) + sizeof(uint64_t) + size; std::vector input(input_size, 0); - rpc_tensor rpc_tensor = serialize_tensor(tensor); memcpy(input.data(), &rpc_tensor, sizeof(rpc_tensor)); memcpy(input.data() + sizeof(rpc_tensor), &offset, sizeof(offset)); memcpy(input.data() + sizeof(rpc_tensor) + sizeof(offset), data, size); @@ -772,7 +812,9 @@ void ggml_backend_rpc_get_device_memory(const char * endpoint, size_t * free, si class rpc_server { public: - rpc_server(ggml_backend_t backend) : backend(backend) {} + rpc_server(ggml_backend_t backend, const char * cache_dir) + : backend(backend), cache_dir(cache_dir) { + } ~rpc_server(); void alloc_buffer(const rpc_msg_alloc_buffer_req & request, rpc_msg_alloc_buffer_rsp & response); @@ -782,6 +824,7 @@ public: bool free_buffer(const rpc_msg_free_buffer_req & request); bool buffer_clear(const rpc_msg_buffer_clear_req & request); bool set_tensor(const std::vector & input); + bool set_tensor_hash(const std::vector & input, rpc_msg_set_tensor_hash_rsp & response); bool get_tensor(const rpc_msg_get_tensor_req & request, std::vector & response); bool copy_tensor(const rpc_msg_copy_tensor_req & request, rpc_msg_copy_tensor_rsp & response); bool graph_compute(const std::vector & input, rpc_msg_graph_compute_rsp & response); @@ -789,6 +832,7 @@ public: bool get_alloc_size(const rpc_msg_get_alloc_size_req & request, rpc_msg_get_alloc_size_rsp & response); private: + bool get_cached_file(uint64_t hash, std::vector & data); ggml_tensor * deserialize_tensor(struct ggml_context * ctx, const rpc_tensor * tensor); ggml_tensor * create_node(uint64_t id, struct ggml_context * ctx, @@ -797,6 +841,7 @@ private: ggml_backend_t backend; + const char * cache_dir; std::unordered_set buffers; }; @@ -960,11 +1005,85 @@ bool rpc_server::set_tensor(const std::vector & input) { } const void * data = input.data() + sizeof(rpc_tensor) + sizeof(offset); + if (cache_dir && size > HASH_THRESHOLD) { + uint64_t hash = fnv_hash((const uint8_t*)data, size); + char hash_str[17]; + snprintf(hash_str, sizeof(hash_str), "%016" PRIx64, hash); + // save to cache_dir/hash_str + fs::path cache_file = fs::path(cache_dir) / hash_str; + std::ofstream ofs(cache_file, std::ios::binary); + ofs.write((const char *)data, size); + printf("[%s] saved to '%s'\n", __func__, cache_file.c_str()); + } ggml_backend_tensor_set(tensor, data, offset, size); ggml_free(ctx); return true; } +bool rpc_server::get_cached_file(uint64_t hash, std::vector & data) { + if (!cache_dir) { + return false; + } + char hash_str[17]; + snprintf(hash_str, sizeof(hash_str), "%016" PRIx64, hash); + fs::path cache_file = fs::path(cache_dir) / hash_str; + if (!fs::exists(cache_file)) { + return false; + } + std::ifstream ifs(cache_file, std::ios::binary); + ifs.seekg(0, std::ios::end); + size_t size = ifs.tellg(); + ifs.seekg(0, std::ios::beg); + data.resize(size); + ifs.read((char *)data.data(), size); + return true; +} + +bool rpc_server::set_tensor_hash(const std::vector & input, rpc_msg_set_tensor_hash_rsp & response) +{ + // serialization format: | rpc_tensor | offset (8 bytes) | hash (8 bytes) | + if (input.size() != sizeof(rpc_tensor) + 16) { + return false; + } + const rpc_tensor * in_tensor = (const rpc_tensor *)input.data(); + uint64_t offset; + memcpy(&offset, input.data() + sizeof(rpc_tensor), sizeof(offset)); + const uint64_t * hash = (const uint64_t *)(input.data() + sizeof(rpc_tensor) + sizeof(offset)); + std::vector cached_file; + if (!get_cached_file(*hash, cached_file)) { + response.result = 0; + return true; + } + size_t size = cached_file.size(); + struct ggml_init_params params { + /*.mem_size =*/ ggml_tensor_overhead(), + /*.mem_buffer =*/ NULL, + /*.no_alloc =*/ true, + }; + struct ggml_context * ctx = ggml_init(params); + ggml_tensor * tensor = deserialize_tensor(ctx, in_tensor); + if (tensor == nullptr) { + GGML_LOG_ERROR("[%s] error deserializing tensor\n", __func__); + ggml_free(ctx); + return false; + } + GGML_PRINT_DEBUG("[%s] buffer: %p, data: %p, offset: %" PRIu64 ", size: %zu, hash: %" PRIx64 "\n", __func__, (void*)tensor->buffer, tensor->data, offset, size, *hash); + + // sanitize tensor->data + { + const size_t p0 = (size_t) ggml_backend_buffer_get_base(tensor->buffer); + const size_t p1 = p0 + ggml_backend_buffer_get_size(tensor->buffer); + + if (in_tensor->data + offset < p0 || in_tensor->data + offset >= p1 || size > (p1 - in_tensor->data - offset)) { + GGML_ABORT("[%s] tensor->data out of bounds\n", __func__); + } + } + ggml_backend_tensor_set(tensor, cached_file.data(), offset, size); + response.result = 1; + ggml_free(ctx); + return true; +} + bool rpc_server::init_tensor(const rpc_msg_init_tensor_req & request) { struct ggml_init_params params { /*.mem_size =*/ ggml_tensor_overhead(), @@ -1148,8 +1267,9 @@ rpc_server::~rpc_server() { } } -static void rpc_serve_client(ggml_backend_t backend, sockfd_t sockfd, size_t free_mem, size_t total_mem) { - rpc_server server(backend); +static void rpc_serve_client(ggml_backend_t backend, const char * cache_dir, + sockfd_t sockfd, size_t free_mem, size_t total_mem) { + rpc_server server(backend, cache_dir); while (true) { uint8_t cmd; if (!recv_data(sockfd, &cmd, 1)) { @@ -1260,6 +1380,20 @@ static void rpc_serve_client(ggml_backend_t backend, sockfd_t sockfd, size_t fre } break; } + case RPC_CMD_SET_TENSOR_HASH: { + std::vector input; + if (!recv_msg(sockfd, input)) { + return; + } + rpc_msg_set_tensor_hash_rsp response; + if (!server.set_tensor_hash(input, response)) { + return; + } + if (!send_msg(sockfd, &response, sizeof(response))) { + return; + } + break; + } case RPC_CMD_INIT_TENSOR: { rpc_msg_init_tensor_req request; if (!recv_msg(sockfd, &request,sizeof(request))) { @@ -1335,7 +1469,9 @@ static void rpc_serve_client(ggml_backend_t backend, sockfd_t sockfd, size_t fre } } -void ggml_backend_rpc_start_server(ggml_backend_t backend, const char * endpoint, size_t free_mem, size_t total_mem) { +void ggml_backend_rpc_start_server(ggml_backend_t backend, const char * endpoint, + const char * cache_dir, + size_t free_mem, size_t total_mem) { std::string host; int port; if (!parse_endpoint(endpoint, host, port)) { @@ -1364,7 +1500,7 @@ void ggml_backend_rpc_start_server(ggml_backend_t backend, const char * endpoint } printf("Accepted client connection, free_mem=%zu, total_mem=%zu\n", free_mem, total_mem); fflush(stdout); - rpc_serve_client(backend, client_socket->fd, free_mem, total_mem); + rpc_serve_client(backend, cache_dir, client_socket->fd, free_mem, total_mem); printf("Client connection closed\n"); fflush(stdout); }