1
1
Fork 0
mirror of https://github.com/NixOS/nix.git synced 2025-11-09 03:56:01 +01:00

Merge pull request #14420 from lovesegfault/compressed-source

refactor(libutil): add `CompressedSource`
This commit is contained in:
John Ericson 2025-10-30 05:21:41 +00:00 committed by GitHub
commit e6f0dd8df5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 61 additions and 9 deletions

View file

@ -141,18 +141,12 @@ void HttpBinaryCacheStore::upsertFile(
req.method = HttpMethod::PUT;
auto compressionMethod = getCompressionMethod(path);
std::string data;
std::optional<StringSource> stringSource{};
std::optional<CompressedSource> compressed;
if (compressionMethod) {
StringSink sink{};
auto compressionSink = makeCompressionSink(*compressionMethod, sink);
source.drainInto(*compressionSink);
compressionSink->finish();
data = std::move(sink.s);
compressed = CompressedSource(source, *compressionMethod);
req.headers.emplace_back("Content-Encoding", *compressionMethod);
stringSource = StringSource{data};
req.data = {*stringSource};
req.data = {compressed->size(), *compressed};
} else {
req.data = {sizeHint, source};
}

View file

@ -272,6 +272,50 @@ struct StringSource : RestartableSource
}
};
/**
* Compresses a RestartableSource using the specified compression method.
*
* @note currently this buffers the entire compressed data stream in memory. In the future it may instead compress data
* on demand, lazily pulling from the original `RestartableSource`. In that case, the `size()` method would go away
* because we would not in fact know the compressed size in advance.
*/
struct CompressedSource : RestartableSource
{
private:
std::string compressedData;
std::string compressionMethod;
StringSource stringSource;
public:
/**
* Compress a RestartableSource using the specified compression method.
*
* @param source The source data to compress
* @param compressionMethod The compression method to use (e.g., "xz", "br")
*/
CompressedSource(RestartableSource & source, const std::string & compressionMethod);
size_t read(char * data, size_t len) override
{
return stringSource.read(data, len);
}
void restart() override
{
stringSource.restart();
}
uint64_t size() const
{
return compressedData.size();
}
std::string_view getCompressionMethod() const
{
return compressionMethod;
}
};
/**
* Create a restartable Source from a factory function.
*

View file

@ -1,4 +1,5 @@
#include "nix/util/serialise.hh"
#include "nix/util/compression.hh"
#include "nix/util/signals.hh"
#include "nix/util/util.hh"
@ -252,6 +253,19 @@ void StringSource::skip(size_t len)
pos += len;
}
CompressedSource::CompressedSource(RestartableSource & source, const std::string & compressionMethod)
: compressedData([&]() {
StringSink sink;
auto compressionSink = makeCompressionSink(compressionMethod, sink);
source.drainInto(*compressionSink);
compressionSink->finish();
return std::move(sink.s);
}())
, compressionMethod(compressionMethod)
, stringSource(compressedData)
{
}
std::unique_ptr<FinishSink> sourceToSink(std::function<void(Source &)> fun)
{
struct SourceToSink : FinishSink