From e947c895ecebc987c7364eaec063fd6383bd4e88 Mon Sep 17 00:00:00 2001 From: Sergei Zimmerman Date: Tue, 28 Oct 2025 03:02:02 +0300 Subject: [PATCH 1/3] binary-cache-store: UpsertFile accept `Source &` instead of `std::istream` --- src/libstore/binary-cache-store.cc | 11 +++++------ src/libstore/http-binary-cache-store.cc | 8 ++------ .../include/nix/store/binary-cache-store.hh | 7 ++----- .../include/nix/store/http-binary-cache-store.hh | 7 ++----- src/libstore/local-binary-cache-store.cc | 7 +------ src/libstore/s3-binary-cache-store.cc | 14 ++++---------- 6 files changed, 16 insertions(+), 38 deletions(-) diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc index 9bb81add7..0133a45e7 100644 --- a/src/libstore/binary-cache-store.cc +++ b/src/libstore/binary-cache-store.cc @@ -79,7 +79,8 @@ std::optional BinaryCacheStore::getNixCacheInfo() void BinaryCacheStore::upsertFile( const std::string & path, std::string && data, const std::string & mimeType, uint64_t sizeHint) { - upsertFile(path, std::make_shared(std::move(data)), mimeType, sizeHint); + StringSource source{data}; + upsertFile(path, source, mimeType, sizeHint); } void BinaryCacheStore::getFile(const std::string & path, Callback> callback) noexcept @@ -271,12 +272,10 @@ ref BinaryCacheStore::addToStoreCommon( /* Atomically write the NAR file. */ if (repair || !fileExists(narInfo->url)) { + AutoCloseFD fd = toDescriptor(open(fnTemp.c_str(), O_RDONLY)); + FdSource source{fd.get()}; stats.narWrite++; - upsertFile( - narInfo->url, - std::make_shared(fnTemp, std::ios_base::in | std::ios_base::binary), - "application/x-nix-nar", - narInfo->fileSize); + upsertFile(narInfo->url, source, "application/x-nix-nar", narInfo->fileSize); } else stats.narWriteAverted++; diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc index 4b8276572..987ceef97 100644 --- a/src/libstore/http-binary-cache-store.cc +++ b/src/libstore/http-binary-cache-store.cc @@ -135,15 +135,11 @@ bool HttpBinaryCacheStore::fileExists(const std::string & path) } void HttpBinaryCacheStore::upsertFile( - const std::string & path, - std::shared_ptr> istream, - const std::string & mimeType, - uint64_t sizeHint) + const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) { auto req = makeRequest(path); req.method = HttpMethod::PUT; - auto data = StreamToSourceAdapter(istream).drain(); - + auto data = source.drain(); auto compressionMethod = getCompressionMethod(path); if (compressionMethod) { diff --git a/src/libstore/include/nix/store/binary-cache-store.hh b/src/libstore/include/nix/store/binary-cache-store.hh index 0bed09aec..e5c88a8d4 100644 --- a/src/libstore/include/nix/store/binary-cache-store.hh +++ b/src/libstore/include/nix/store/binary-cache-store.hh @@ -100,11 +100,8 @@ public: virtual bool fileExists(const std::string & path) = 0; - virtual void upsertFile( - const std::string & path, - std::shared_ptr> istream, - const std::string & mimeType, - uint64_t sizeHint) = 0; + virtual void + upsertFile(const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) = 0; void upsertFile( const std::string & path, diff --git a/src/libstore/include/nix/store/http-binary-cache-store.hh b/src/libstore/include/nix/store/http-binary-cache-store.hh index ecad09975..723034d4d 100644 --- a/src/libstore/include/nix/store/http-binary-cache-store.hh +++ b/src/libstore/include/nix/store/http-binary-cache-store.hh @@ -80,11 +80,8 @@ protected: bool fileExists(const std::string & path) override; - void upsertFile( - const std::string & path, - std::shared_ptr> istream, - const std::string & mimeType, - uint64_t sizeHint) override; + void + upsertFile(const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) override; FileTransferRequest makeRequest(std::string_view path); diff --git a/src/libstore/local-binary-cache-store.cc b/src/libstore/local-binary-cache-store.cc index c1811bf17..8f07286f4 100644 --- a/src/libstore/local-binary-cache-store.cc +++ b/src/libstore/local-binary-cache-store.cc @@ -53,17 +53,12 @@ protected: bool fileExists(const std::string & path) override; - void upsertFile( - const std::string & path, - std::shared_ptr> istream, - const std::string & mimeType, - uint64_t sizeHint) override + void upsertFile(const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) override { auto path2 = config->binaryCacheDir + "/" + path; static std::atomic counter{0}; Path tmp = fmt("%s.tmp.%d.%d", path2, getpid(), ++counter); AutoDelete del(tmp, false); - StreamToSourceAdapter source(istream); writeFile(tmp, source); std::filesystem::rename(tmp, path2); del.cancel(); diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index 8617c9fe9..387efbeb4 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -20,11 +20,8 @@ public: { } - void upsertFile( - const std::string & path, - std::shared_ptr> istream, - const std::string & mimeType, - uint64_t sizeHint) override; + void + upsertFile(const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) override; private: ref s3Config; @@ -70,12 +67,9 @@ private: }; void S3BinaryCacheStore::upsertFile( - const std::string & path, - std::shared_ptr> istream, - const std::string & mimeType, - uint64_t sizeHint) + const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) { - HttpBinaryCacheStore::upsertFile(path, istream, mimeType, sizeHint); + HttpBinaryCacheStore::upsertFile(path, source, mimeType, sizeHint); } std::string S3BinaryCacheStore::createMultipartUpload( From b8d7f551e4135c27a57897301694c839e2729c59 Mon Sep 17 00:00:00 2001 From: Sergei Zimmerman Date: Tue, 28 Oct 2025 01:54:58 +0300 Subject: [PATCH 2/3] libutil: Add RestartableSource This is necessary to make seeking work with libcurl. --- src/libutil/include/nix/util/serialise.hh | 24 ++++++++++++++- src/libutil/serialise.cc | 37 +++++++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/src/libutil/include/nix/util/serialise.hh b/src/libutil/include/nix/util/serialise.hh index d6845a494..1bf06c2e8 100644 --- a/src/libutil/include/nix/util/serialise.hh +++ b/src/libutil/include/nix/util/serialise.hh @@ -230,10 +230,18 @@ struct StringSink : Sink void operator()(std::string_view data) override; }; +/** + * Source type that can be restarted. + */ +struct RestartableSource : Source +{ + virtual void restart() = 0; +}; + /** * A source that reads data from a string. */ -struct StringSource : Source +struct StringSource : RestartableSource { std::string_view s; size_t pos; @@ -257,8 +265,22 @@ struct StringSource : Source size_t read(char * data, size_t len) override; void skip(size_t len) override; + + void restart() override + { + pos = 0; + } }; +/** + * Create a restartable Source from a factory function. + * + * @param factory Factory function that returns a fresh instance of the Source. Gets + * called for each source restart. + * @pre factory must return an equivalent source for each invocation. + */ +std::unique_ptr restartableSourceFromFactory(std::function()> factory); + /** * A sink that writes all incoming data to two other sinks. */ diff --git a/src/libutil/serialise.cc b/src/libutil/serialise.cc index ba153625e..51cf48d9a 100644 --- a/src/libutil/serialise.cc +++ b/src/libutil/serialise.cc @@ -513,4 +513,41 @@ size_t ChainSource::read(char * data, size_t len) } } +std::unique_ptr restartableSourceFromFactory(std::function()> factory) +{ + struct RestartableSourceImpl : RestartableSource + { + RestartableSourceImpl(decltype(factory) factory_) + : factory_(std::move(factory_)) + , impl(this->factory_()) + { + } + + decltype(factory) factory_; + std::unique_ptr impl = factory_(); + + size_t read(char * data, size_t len) override + { + return impl->read(data, len); + } + + bool good() override + { + return impl->good(); + } + + void skip(size_t len) override + { + return impl->skip(len); + } + + void restart() override + { + impl = factory_(); + } + }; + + return std::make_unique(std::move(factory)); +} + } // namespace nix From cf75079bd83a62367e50ee8ac436dbf53a4cea40 Mon Sep 17 00:00:00 2001 From: Sergei Zimmerman Date: Wed, 29 Oct 2025 01:29:09 +0300 Subject: [PATCH 3/3] libstore: Make uploads with filetransfer.cc consume a RestartableSource Make uploads run in constant memory. Also change the callbacks to be noexcept, since we really don't want to be unwinding the stack in the curl thread. That will definitely corrupt that stack and make nix/curl crash in very bad ways. --- src/libfetchers/git-lfs-fetch.cc | 4 +- src/libstore/binary-cache-store.cc | 19 ++++++--- src/libstore/filetransfer.cc | 42 ++++++++++--------- src/libstore/http-binary-cache-store.cc | 17 ++++++-- .../include/nix/store/binary-cache-store.hh | 4 +- .../include/nix/store/filetransfer.hh | 21 +++++++++- .../nix/store/http-binary-cache-store.hh | 4 +- src/libstore/local-binary-cache-store.cc | 3 +- src/libstore/s3-binary-cache-store.cc | 15 ++++--- 9 files changed, 87 insertions(+), 42 deletions(-) diff --git a/src/libfetchers/git-lfs-fetch.cc b/src/libfetchers/git-lfs-fetch.cc index aee1163bb..936976e55 100644 --- a/src/libfetchers/git-lfs-fetch.cc +++ b/src/libfetchers/git-lfs-fetch.cc @@ -219,7 +219,9 @@ std::vector Fetch::fetchUrls(const std::vector & pointe nlohmann::json oidList = pointerToPayload(pointers); nlohmann::json data = {{"operation", "download"}}; data["objects"] = oidList; - request.data = data.dump(); + auto payload = data.dump(); + StringSource source{payload}; + request.data = {source}; FileTransferResult result = getFileTransfer()->upload(request); auto responseString = result.data; diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc index 0133a45e7..274e47271 100644 --- a/src/libstore/binary-cache-store.cc +++ b/src/libstore/binary-cache-store.cc @@ -79,8 +79,8 @@ std::optional BinaryCacheStore::getNixCacheInfo() void BinaryCacheStore::upsertFile( const std::string & path, std::string && data, const std::string & mimeType, uint64_t sizeHint) { - StringSource source{data}; - upsertFile(path, source, mimeType, sizeHint); + auto source = restartableSourceFromFactory([data = std::move(data)]() { return make_unique(data); }); + upsertFile(path, *source, mimeType, sizeHint); } void BinaryCacheStore::getFile(const std::string & path, Callback> callback) noexcept @@ -272,10 +272,19 @@ ref BinaryCacheStore::addToStoreCommon( /* Atomically write the NAR file. */ if (repair || !fileExists(narInfo->url)) { - AutoCloseFD fd = toDescriptor(open(fnTemp.c_str(), O_RDONLY)); - FdSource source{fd.get()}; + auto source = restartableSourceFromFactory([fnTemp]() { + struct AutoCloseFDSource : AutoCloseFD, FdSource + { + AutoCloseFDSource(AutoCloseFD fd) + : AutoCloseFD(std::move(fd)) + , FdSource(get()) + { + } + }; + return std::make_unique(toDescriptor(open(fnTemp.c_str(), O_RDONLY))); + }); stats.narWrite++; - upsertFile(narInfo->url, source, "application/x-nix-nar", narInfo->fileSize); + upsertFile(narInfo->url, *source, "application/x-nix-nar", narInfo->fileSize); } else stats.narWriteAverted++; diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index 304984d99..ae739fcf8 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -295,20 +295,17 @@ struct curlFileTransfer : public FileTransfer return 0; } - size_t readOffset = 0; - - size_t readCallback(char * buffer, size_t size, size_t nitems) - { - if (readOffset == request.data->length()) - return 0; - auto count = std::min(size * nitems, request.data->length() - readOffset); - assert(count); - memcpy(buffer, request.data->data() + readOffset, count); - readOffset += count; - return count; + size_t readCallback(char * buffer, size_t size, size_t nitems) noexcept + try { + auto data = request.data; + return data->source->read(buffer, nitems * size); + } catch (EndOfFile &) { + return 0; + } catch (...) { + return CURL_READFUNC_ABORT; } - static size_t readCallbackWrapper(char * buffer, size_t size, size_t nitems, void * userp) + static size_t readCallbackWrapper(char * buffer, size_t size, size_t nitems, void * userp) noexcept { return ((TransferItem *) userp)->readCallback(buffer, size, nitems); } @@ -322,19 +319,24 @@ struct curlFileTransfer : public FileTransfer } #endif - size_t seekCallback(curl_off_t offset, int origin) - { + size_t seekCallback(curl_off_t offset, int origin) noexcept + try { + auto source = request.data->source; if (origin == SEEK_SET) { - readOffset = offset; + source->restart(); + source->skip(offset); } else if (origin == SEEK_CUR) { - readOffset += offset; + source->skip(offset); } else if (origin == SEEK_END) { - readOffset = request.data->length() + offset; + NullSink sink{}; + source->drainInto(sink); } return CURL_SEEKFUNC_OK; + } catch (...) { + return CURL_SEEKFUNC_FAIL; } - static size_t seekCallbackWrapper(void * clientp, curl_off_t offset, int origin) + static size_t seekCallbackWrapper(void * clientp, curl_off_t offset, int origin) noexcept { return ((TransferItem *) clientp)->seekCallback(offset, origin); } @@ -393,10 +395,10 @@ struct curlFileTransfer : public FileTransfer if (request.data) { if (request.method == HttpMethod::POST) { curl_easy_setopt(req, CURLOPT_POST, 1L); - curl_easy_setopt(req, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t) request.data->length()); + curl_easy_setopt(req, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t) request.data->sizeHint); } else if (request.method == HttpMethod::PUT) { curl_easy_setopt(req, CURLOPT_UPLOAD, 1L); - curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->length()); + curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->sizeHint); } else { unreachable(); } diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc index 987ceef97..8a9f06992 100644 --- a/src/libstore/http-binary-cache-store.cc +++ b/src/libstore/http-binary-cache-store.cc @@ -135,19 +135,28 @@ bool HttpBinaryCacheStore::fileExists(const std::string & path) } void HttpBinaryCacheStore::upsertFile( - const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) + const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) { auto req = makeRequest(path); req.method = HttpMethod::PUT; - auto data = source.drain(); auto compressionMethod = getCompressionMethod(path); + std::string data; + std::optional stringSource{}; + if (compressionMethod) { - data = compress(*compressionMethod, data); + StringSink sink{}; + auto compressionSink = makeCompressionSink(*compressionMethod, sink); + source.drainInto(*compressionSink); + compressionSink->finish(); + data = std::move(sink.s); req.headers.emplace_back("Content-Encoding", *compressionMethod); + stringSource = StringSource{data}; + req.data = {*stringSource}; + } else { + req.data = {sizeHint, source}; } - req.data = std::move(data); req.mimeType = mimeType; try { diff --git a/src/libstore/include/nix/store/binary-cache-store.hh b/src/libstore/include/nix/store/binary-cache-store.hh index e5c88a8d4..e64dc3eae 100644 --- a/src/libstore/include/nix/store/binary-cache-store.hh +++ b/src/libstore/include/nix/store/binary-cache-store.hh @@ -100,8 +100,8 @@ public: virtual bool fileExists(const std::string & path) = 0; - virtual void - upsertFile(const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) = 0; + virtual void upsertFile( + const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) = 0; void upsertFile( const std::string & path, diff --git a/src/libstore/include/nix/store/filetransfer.hh b/src/libstore/include/nix/store/filetransfer.hh index 08a2b6329..6419a686e 100644 --- a/src/libstore/include/nix/store/filetransfer.hh +++ b/src/libstore/include/nix/store/filetransfer.hh @@ -115,7 +115,26 @@ struct FileTransferRequest unsigned int baseRetryTimeMs = RETRY_TIME_MS_DEFAULT; ActivityId parentAct; bool decompress = true; - std::optional data; + + struct UploadData + { + UploadData(StringSource & s) + : sizeHint(s.s.length()) + , source(&s) + { + } + + UploadData(std::size_t sizeHint, RestartableSource & source) + : sizeHint(sizeHint) + , source(&source) + { + } + + std::size_t sizeHint = 0; + RestartableSource * source = nullptr; + }; + + std::optional data; std::string mimeType; std::function dataCallback; /** diff --git a/src/libstore/include/nix/store/http-binary-cache-store.hh b/src/libstore/include/nix/store/http-binary-cache-store.hh index 723034d4d..70ba36feb 100644 --- a/src/libstore/include/nix/store/http-binary-cache-store.hh +++ b/src/libstore/include/nix/store/http-binary-cache-store.hh @@ -80,8 +80,8 @@ protected: bool fileExists(const std::string & path) override; - void - upsertFile(const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) override; + void upsertFile( + const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) override; FileTransferRequest makeRequest(std::string_view path); diff --git a/src/libstore/local-binary-cache-store.cc b/src/libstore/local-binary-cache-store.cc index 8f07286f4..63730a01b 100644 --- a/src/libstore/local-binary-cache-store.cc +++ b/src/libstore/local-binary-cache-store.cc @@ -53,7 +53,8 @@ protected: bool fileExists(const std::string & path) override; - void upsertFile(const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) override + void upsertFile( + const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) override { auto path2 = config->binaryCacheDir + "/" + path; static std::atomic counter{0}; diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index 387efbeb4..d3e6c7baf 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -20,8 +20,8 @@ public: { } - void - upsertFile(const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) override; + void upsertFile( + const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) override; private: ref s3Config; @@ -67,7 +67,7 @@ private: }; void S3BinaryCacheStore::upsertFile( - const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) + const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) { HttpBinaryCacheStore::upsertFile(path, source, mimeType, sizeHint); } @@ -86,7 +86,8 @@ std::string S3BinaryCacheStore::createMultipartUpload( req.uri = VerbatimURL(url); req.method = HttpMethod::POST; - req.data = ""; + StringSource payload{std::string_view("")}; + req.data = {payload}; req.mimeType = mimeType; if (contentEncoding) { @@ -116,7 +117,8 @@ S3BinaryCacheStore::uploadPart(std::string_view key, std::string_view uploadId, url.query["partNumber"] = std::to_string(partNumber); url.query["uploadId"] = uploadId; req.uri = VerbatimURL(url); - req.data = std::move(data); + StringSource payload{data}; + req.data = {payload}; req.mimeType = "application/octet-stream"; auto result = getFileTransfer()->enqueueFileTransfer(req).get(); @@ -163,7 +165,8 @@ void S3BinaryCacheStore::completeMultipartUpload( debug("S3 CompleteMultipartUpload XML (%d parts): %s", parts.size(), xml); - req.data = xml; + StringSource payload{xml}; + req.data = {payload}; req.mimeType = "text/xml"; getFileTransfer()->enqueueFileTransfer(req).get();