From cf75079bd83a62367e50ee8ac436dbf53a4cea40 Mon Sep 17 00:00:00 2001 From: Sergei Zimmerman Date: Wed, 29 Oct 2025 01:29:09 +0300 Subject: [PATCH] libstore: Make uploads with filetransfer.cc consume a RestartableSource Make uploads run in constant memory. Also change the callbacks to be noexcept, since we really don't want to be unwinding the stack in the curl thread. That will definitely corrupt that stack and make nix/curl crash in very bad ways. --- src/libfetchers/git-lfs-fetch.cc | 4 +- src/libstore/binary-cache-store.cc | 19 ++++++--- src/libstore/filetransfer.cc | 42 ++++++++++--------- src/libstore/http-binary-cache-store.cc | 17 ++++++-- .../include/nix/store/binary-cache-store.hh | 4 +- .../include/nix/store/filetransfer.hh | 21 +++++++++- .../nix/store/http-binary-cache-store.hh | 4 +- src/libstore/local-binary-cache-store.cc | 3 +- src/libstore/s3-binary-cache-store.cc | 15 ++++--- 9 files changed, 87 insertions(+), 42 deletions(-) diff --git a/src/libfetchers/git-lfs-fetch.cc b/src/libfetchers/git-lfs-fetch.cc index aee1163bb..936976e55 100644 --- a/src/libfetchers/git-lfs-fetch.cc +++ b/src/libfetchers/git-lfs-fetch.cc @@ -219,7 +219,9 @@ std::vector Fetch::fetchUrls(const std::vector & pointe nlohmann::json oidList = pointerToPayload(pointers); nlohmann::json data = {{"operation", "download"}}; data["objects"] = oidList; - request.data = data.dump(); + auto payload = data.dump(); + StringSource source{payload}; + request.data = {source}; FileTransferResult result = getFileTransfer()->upload(request); auto responseString = result.data; diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc index 0133a45e7..274e47271 100644 --- a/src/libstore/binary-cache-store.cc +++ b/src/libstore/binary-cache-store.cc @@ -79,8 +79,8 @@ std::optional BinaryCacheStore::getNixCacheInfo() void BinaryCacheStore::upsertFile( const std::string & path, std::string && data, const std::string & mimeType, uint64_t sizeHint) { - StringSource source{data}; - upsertFile(path, source, mimeType, sizeHint); + auto source = restartableSourceFromFactory([data = std::move(data)]() { return make_unique(data); }); + upsertFile(path, *source, mimeType, sizeHint); } void BinaryCacheStore::getFile(const std::string & path, Callback> callback) noexcept @@ -272,10 +272,19 @@ ref BinaryCacheStore::addToStoreCommon( /* Atomically write the NAR file. */ if (repair || !fileExists(narInfo->url)) { - AutoCloseFD fd = toDescriptor(open(fnTemp.c_str(), O_RDONLY)); - FdSource source{fd.get()}; + auto source = restartableSourceFromFactory([fnTemp]() { + struct AutoCloseFDSource : AutoCloseFD, FdSource + { + AutoCloseFDSource(AutoCloseFD fd) + : AutoCloseFD(std::move(fd)) + , FdSource(get()) + { + } + }; + return std::make_unique(toDescriptor(open(fnTemp.c_str(), O_RDONLY))); + }); stats.narWrite++; - upsertFile(narInfo->url, source, "application/x-nix-nar", narInfo->fileSize); + upsertFile(narInfo->url, *source, "application/x-nix-nar", narInfo->fileSize); } else stats.narWriteAverted++; diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index 304984d99..ae739fcf8 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -295,20 +295,17 @@ struct curlFileTransfer : public FileTransfer return 0; } - size_t readOffset = 0; - - size_t readCallback(char * buffer, size_t size, size_t nitems) - { - if (readOffset == request.data->length()) - return 0; - auto count = std::min(size * nitems, request.data->length() - readOffset); - assert(count); - memcpy(buffer, request.data->data() + readOffset, count); - readOffset += count; - return count; + size_t readCallback(char * buffer, size_t size, size_t nitems) noexcept + try { + auto data = request.data; + return data->source->read(buffer, nitems * size); + } catch (EndOfFile &) { + return 0; + } catch (...) { + return CURL_READFUNC_ABORT; } - static size_t readCallbackWrapper(char * buffer, size_t size, size_t nitems, void * userp) + static size_t readCallbackWrapper(char * buffer, size_t size, size_t nitems, void * userp) noexcept { return ((TransferItem *) userp)->readCallback(buffer, size, nitems); } @@ -322,19 +319,24 @@ struct curlFileTransfer : public FileTransfer } #endif - size_t seekCallback(curl_off_t offset, int origin) - { + size_t seekCallback(curl_off_t offset, int origin) noexcept + try { + auto source = request.data->source; if (origin == SEEK_SET) { - readOffset = offset; + source->restart(); + source->skip(offset); } else if (origin == SEEK_CUR) { - readOffset += offset; + source->skip(offset); } else if (origin == SEEK_END) { - readOffset = request.data->length() + offset; + NullSink sink{}; + source->drainInto(sink); } return CURL_SEEKFUNC_OK; + } catch (...) { + return CURL_SEEKFUNC_FAIL; } - static size_t seekCallbackWrapper(void * clientp, curl_off_t offset, int origin) + static size_t seekCallbackWrapper(void * clientp, curl_off_t offset, int origin) noexcept { return ((TransferItem *) clientp)->seekCallback(offset, origin); } @@ -393,10 +395,10 @@ struct curlFileTransfer : public FileTransfer if (request.data) { if (request.method == HttpMethod::POST) { curl_easy_setopt(req, CURLOPT_POST, 1L); - curl_easy_setopt(req, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t) request.data->length()); + curl_easy_setopt(req, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t) request.data->sizeHint); } else if (request.method == HttpMethod::PUT) { curl_easy_setopt(req, CURLOPT_UPLOAD, 1L); - curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->length()); + curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->sizeHint); } else { unreachable(); } diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc index 987ceef97..8a9f06992 100644 --- a/src/libstore/http-binary-cache-store.cc +++ b/src/libstore/http-binary-cache-store.cc @@ -135,19 +135,28 @@ bool HttpBinaryCacheStore::fileExists(const std::string & path) } void HttpBinaryCacheStore::upsertFile( - const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) + const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) { auto req = makeRequest(path); req.method = HttpMethod::PUT; - auto data = source.drain(); auto compressionMethod = getCompressionMethod(path); + std::string data; + std::optional stringSource{}; + if (compressionMethod) { - data = compress(*compressionMethod, data); + StringSink sink{}; + auto compressionSink = makeCompressionSink(*compressionMethod, sink); + source.drainInto(*compressionSink); + compressionSink->finish(); + data = std::move(sink.s); req.headers.emplace_back("Content-Encoding", *compressionMethod); + stringSource = StringSource{data}; + req.data = {*stringSource}; + } else { + req.data = {sizeHint, source}; } - req.data = std::move(data); req.mimeType = mimeType; try { diff --git a/src/libstore/include/nix/store/binary-cache-store.hh b/src/libstore/include/nix/store/binary-cache-store.hh index e5c88a8d4..e64dc3eae 100644 --- a/src/libstore/include/nix/store/binary-cache-store.hh +++ b/src/libstore/include/nix/store/binary-cache-store.hh @@ -100,8 +100,8 @@ public: virtual bool fileExists(const std::string & path) = 0; - virtual void - upsertFile(const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) = 0; + virtual void upsertFile( + const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) = 0; void upsertFile( const std::string & path, diff --git a/src/libstore/include/nix/store/filetransfer.hh b/src/libstore/include/nix/store/filetransfer.hh index 08a2b6329..6419a686e 100644 --- a/src/libstore/include/nix/store/filetransfer.hh +++ b/src/libstore/include/nix/store/filetransfer.hh @@ -115,7 +115,26 @@ struct FileTransferRequest unsigned int baseRetryTimeMs = RETRY_TIME_MS_DEFAULT; ActivityId parentAct; bool decompress = true; - std::optional data; + + struct UploadData + { + UploadData(StringSource & s) + : sizeHint(s.s.length()) + , source(&s) + { + } + + UploadData(std::size_t sizeHint, RestartableSource & source) + : sizeHint(sizeHint) + , source(&source) + { + } + + std::size_t sizeHint = 0; + RestartableSource * source = nullptr; + }; + + std::optional data; std::string mimeType; std::function dataCallback; /** diff --git a/src/libstore/include/nix/store/http-binary-cache-store.hh b/src/libstore/include/nix/store/http-binary-cache-store.hh index 723034d4d..70ba36feb 100644 --- a/src/libstore/include/nix/store/http-binary-cache-store.hh +++ b/src/libstore/include/nix/store/http-binary-cache-store.hh @@ -80,8 +80,8 @@ protected: bool fileExists(const std::string & path) override; - void - upsertFile(const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) override; + void upsertFile( + const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) override; FileTransferRequest makeRequest(std::string_view path); diff --git a/src/libstore/local-binary-cache-store.cc b/src/libstore/local-binary-cache-store.cc index 8f07286f4..63730a01b 100644 --- a/src/libstore/local-binary-cache-store.cc +++ b/src/libstore/local-binary-cache-store.cc @@ -53,7 +53,8 @@ protected: bool fileExists(const std::string & path) override; - void upsertFile(const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) override + void upsertFile( + const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) override { auto path2 = config->binaryCacheDir + "/" + path; static std::atomic counter{0}; diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index 387efbeb4..d3e6c7baf 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -20,8 +20,8 @@ public: { } - void - upsertFile(const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) override; + void upsertFile( + const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) override; private: ref s3Config; @@ -67,7 +67,7 @@ private: }; void S3BinaryCacheStore::upsertFile( - const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) + const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) { HttpBinaryCacheStore::upsertFile(path, source, mimeType, sizeHint); } @@ -86,7 +86,8 @@ std::string S3BinaryCacheStore::createMultipartUpload( req.uri = VerbatimURL(url); req.method = HttpMethod::POST; - req.data = ""; + StringSource payload{std::string_view("")}; + req.data = {payload}; req.mimeType = mimeType; if (contentEncoding) { @@ -116,7 +117,8 @@ S3BinaryCacheStore::uploadPart(std::string_view key, std::string_view uploadId, url.query["partNumber"] = std::to_string(partNumber); url.query["uploadId"] = uploadId; req.uri = VerbatimURL(url); - req.data = std::move(data); + StringSource payload{data}; + req.data = {payload}; req.mimeType = "application/octet-stream"; auto result = getFileTransfer()->enqueueFileTransfer(req).get(); @@ -163,7 +165,8 @@ void S3BinaryCacheStore::completeMultipartUpload( debug("S3 CompleteMultipartUpload XML (%d parts): %s", parts.size(), xml); - req.data = xml; + StringSource payload{xml}; + req.data = {payload}; req.mimeType = "text/xml"; getFileTransfer()->enqueueFileTransfer(req).get();