From 93fe3354b5e4d925c941abc7397d076eb5c6e553 Mon Sep 17 00:00:00 2001
From: Bernardo Meurer Costa
Date: Thu, 30 Oct 2025 01:19:44 +0000
Subject: [PATCH] refactor(libutil): add `CompressedSource`

Introduce a `CompressedSource` class in libutil's `serialise.hh` that
compresses a `RestartableSource` and owns the compressed data. This is a
general-purpose utility that can be used anywhere compressed data needs
to be treated as a source.

The class keeps only a read offset next to the buffer it owns and builds
a short-lived `StringSource` per `read()`, so it stays safe to copy and
move; the HTTP binary cache store constructs it in place with
`std::optional::emplace`.
---
 src/libstore/http-binary-cache-store.cc   | 12 ++---
 src/libutil/include/nix/util/serialise.hh | 64 +++++++++++++++++++++++
 src/libutil/serialise.cc                  | 15 ++++++
 3 files changed, 82 insertions(+), 9 deletions(-)

diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc
index 8a9f06992..8f37bba3e 100644
--- a/src/libstore/http-binary-cache-store.cc
+++ b/src/libstore/http-binary-cache-store.cc
@@ -141,18 +141,12 @@ void HttpBinaryCacheStore::upsertFile(
     req.method = HttpMethod::PUT;
     auto compressionMethod = getCompressionMethod(path);
 
-    std::string data;
-    std::optional<StringSource> stringSource{};
+    std::optional<CompressedSource> compressed;
 
     if (compressionMethod) {
-        StringSink sink{};
-        auto compressionSink = makeCompressionSink(*compressionMethod, sink);
-        source.drainInto(*compressionSink);
-        compressionSink->finish();
-        data = std::move(sink.s);
+        compressed.emplace(source, *compressionMethod);
         req.headers.emplace_back("Content-Encoding", *compressionMethod);
-        stringSource = StringSource{data};
-        req.data = {*stringSource};
+        req.data = {compressed->size(), *compressed};
     } else {
         req.data = {sizeHint, source};
     }
diff --git a/src/libutil/include/nix/util/serialise.hh b/src/libutil/include/nix/util/serialise.hh
index 1bf06c2e8..09b33bf95 100644
--- a/src/libutil/include/nix/util/serialise.hh
+++ b/src/libutil/include/nix/util/serialise.hh
@@ -272,6 +272,70 @@ struct StringSource : RestartableSource
     }
 };
 
+/**
+ * Compresses a RestartableSource using the specified compression method.
+ *
+ * The compressed bytes are owned by this object and no long-lived view into them is kept: each `read()` builds a
+ * short-lived `StringSource` over the unread tail, so a copied or moved `CompressedSource` never refers to another
+ * object's buffer.
+ *
+ * @note currently this buffers the entire compressed data stream in memory. In the future it may instead compress data
+ * on demand, lazily pulling from the original `RestartableSource`. In that case, the `size()` method would go away
+ * because we would not in fact know the compressed size in advance.
+ */
+struct CompressedSource : RestartableSource
+{
+private:
+    std::string compressedData;
+    std::string compressionMethod;
+    /** Offset of the next unread byte in `compressedData`. */
+    size_t pos = 0;
+
+public:
+    /**
+     * Compress a RestartableSource using the specified compression method.
+     *
+     * @param source The source data to compress
+     * @param compressionMethod The compression method to use (e.g., "xz", "br")
+     */
+    CompressedSource(RestartableSource & source, const std::string & compressionMethod);
+
+    /**
+     * Copy up to `len` compressed bytes into `data` and advance the read position.
+     *
+     * End-of-data behaviour is whatever `StringSource::read()` does on an exhausted view.
+     */
+    size_t read(char * data, size_t len) override
+    {
+        // Clamp the offset: a moved-from object keeps its old `pos` but may hold a shorter buffer.
+        auto offset = pos < compressedData.size() ? pos : compressedData.size();
+        std::string_view unread = compressedData;
+        unread.remove_prefix(offset);
+        StringSource tail(unread);
+        auto n = tail.read(data, len);
+        pos += n;
+        return n;
+    }
+
+    /** Rewind so that the next `read()` starts at the first compressed byte. */
+    void restart() override
+    {
+        pos = 0;
+    }
+
+    /** Number of compressed bytes held in memory. */
+    uint64_t size() const
+    {
+        return compressedData.size();
+    }
+
+    /** The compression method name that was passed to the constructor. */
+    std::string_view getCompressionMethod() const
+    {
+        return compressionMethod;
+    }
+};
+
 /**
  * Create a restartable Source from a factory function.
  *
diff --git a/src/libutil/serialise.cc b/src/libutil/serialise.cc
index 51cf48d9a..fea31fbb6 100644
--- a/src/libutil/serialise.cc
+++ b/src/libutil/serialise.cc
@@ -1,4 +1,5 @@
 #include "nix/util/serialise.hh"
+#include "nix/util/compression.hh"
 #include "nix/util/signals.hh"
 #include "nix/util/util.hh"
 
@@ -252,6 +253,20 @@ void StringSource::skip(size_t len)
     pos += len;
 }
 
+CompressedSource::CompressedSource(RestartableSource & source, const std::string & compressionMethod)
+    : compressedData([&]() {
+        // Push everything `source` yields through the compression sink and keep the output in memory.
+        StringSink sink;
+        auto compressionSink = makeCompressionSink(compressionMethod, sink);
+        source.drainInto(*compressionSink);
+        compressionSink->finish();
+        // `sink.s` is a member, not a local, so it has to be moved out explicitly.
+        return std::move(sink.s);
+    }())
+    , compressionMethod(compressionMethod)
+{
+}
+
 std::unique_ptr<FinishSink> sourceToSink(std::function<void(Source &)> fun)
 {
     struct SourceToSink : FinishSink