diff --git a/src/libfetchers/git-lfs-fetch.cc b/src/libfetchers/git-lfs-fetch.cc index aee1163bb..936976e55 100644 --- a/src/libfetchers/git-lfs-fetch.cc +++ b/src/libfetchers/git-lfs-fetch.cc @@ -219,7 +219,9 @@ std::vector Fetch::fetchUrls(const std::vector & pointe nlohmann::json oidList = pointerToPayload(pointers); nlohmann::json data = {{"operation", "download"}}; data["objects"] = oidList; - request.data = data.dump(); + auto payload = data.dump(); + StringSource source{payload}; + request.data = {source}; FileTransferResult result = getFileTransfer()->upload(request); auto responseString = result.data; diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc index 0133a45e7..274e47271 100644 --- a/src/libstore/binary-cache-store.cc +++ b/src/libstore/binary-cache-store.cc @@ -79,8 +79,8 @@ std::optional BinaryCacheStore::getNixCacheInfo() void BinaryCacheStore::upsertFile( const std::string & path, std::string && data, const std::string & mimeType, uint64_t sizeHint) { - StringSource source{data}; - upsertFile(path, source, mimeType, sizeHint); + auto source = restartableSourceFromFactory([data = std::move(data)]() { return make_unique(data); }); + upsertFile(path, *source, mimeType, sizeHint); } void BinaryCacheStore::getFile(const std::string & path, Callback> callback) noexcept @@ -272,10 +272,19 @@ ref BinaryCacheStore::addToStoreCommon( /* Atomically write the NAR file. */ if (repair || !fileExists(narInfo->url)) { - AutoCloseFD fd = toDescriptor(open(fnTemp.c_str(), O_RDONLY)); - FdSource source{fd.get()}; + auto source = restartableSourceFromFactory([fnTemp]() { + struct AutoCloseFDSource : AutoCloseFD, FdSource + { + AutoCloseFDSource(AutoCloseFD fd) + : AutoCloseFD(std::move(fd)) + , FdSource(get()) + { + } + }; + return std::make_unique(toDescriptor(open(fnTemp.c_str(), O_RDONLY))); + }); stats.narWrite++; - upsertFile(narInfo->url, source, "application/x-nix-nar", narInfo->fileSize); + upsertFile(narInfo->url, *source, "application/x-nix-nar", narInfo->fileSize); } else stats.narWriteAverted++; diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index 304984d99..ae739fcf8 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -295,20 +295,17 @@ struct curlFileTransfer : public FileTransfer return 0; } - size_t readOffset = 0; - - size_t readCallback(char * buffer, size_t size, size_t nitems) - { - if (readOffset == request.data->length()) - return 0; - auto count = std::min(size * nitems, request.data->length() - readOffset); - assert(count); - memcpy(buffer, request.data->data() + readOffset, count); - readOffset += count; - return count; + size_t readCallback(char * buffer, size_t size, size_t nitems) noexcept + try { + auto data = request.data; + return data->source->read(buffer, nitems * size); + } catch (EndOfFile &) { + return 0; + } catch (...) { + return CURL_READFUNC_ABORT; } - static size_t readCallbackWrapper(char * buffer, size_t size, size_t nitems, void * userp) + static size_t readCallbackWrapper(char * buffer, size_t size, size_t nitems, void * userp) noexcept { return ((TransferItem *) userp)->readCallback(buffer, size, nitems); } @@ -322,19 +319,24 @@ struct curlFileTransfer : public FileTransfer } #endif - size_t seekCallback(curl_off_t offset, int origin) - { + size_t seekCallback(curl_off_t offset, int origin) noexcept + try { + auto source = request.data->source; if (origin == SEEK_SET) { - readOffset = offset; + source->restart(); + source->skip(offset); } else if (origin == SEEK_CUR) { - readOffset += offset; + source->skip(offset); } else if (origin == SEEK_END) { - readOffset = request.data->length() + offset; + NullSink sink{}; + source->drainInto(sink); } return CURL_SEEKFUNC_OK; + } catch (...) { + return CURL_SEEKFUNC_FAIL; } - static size_t seekCallbackWrapper(void * clientp, curl_off_t offset, int origin) + static size_t seekCallbackWrapper(void * clientp, curl_off_t offset, int origin) noexcept { return ((TransferItem *) clientp)->seekCallback(offset, origin); } @@ -393,10 +395,10 @@ struct curlFileTransfer : public FileTransfer if (request.data) { if (request.method == HttpMethod::POST) { curl_easy_setopt(req, CURLOPT_POST, 1L); - curl_easy_setopt(req, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t) request.data->length()); + curl_easy_setopt(req, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t) request.data->sizeHint); } else if (request.method == HttpMethod::PUT) { curl_easy_setopt(req, CURLOPT_UPLOAD, 1L); - curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->length()); + curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->sizeHint); } else { unreachable(); } diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc index 987ceef97..8a9f06992 100644 --- a/src/libstore/http-binary-cache-store.cc +++ b/src/libstore/http-binary-cache-store.cc @@ -135,19 +135,28 @@ bool HttpBinaryCacheStore::fileExists(const std::string & path) } void HttpBinaryCacheStore::upsertFile( - const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) + const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) { auto req = makeRequest(path); req.method = HttpMethod::PUT; - auto data = source.drain(); auto compressionMethod = getCompressionMethod(path); + std::string data; + std::optional stringSource{}; + if (compressionMethod) { - data = compress(*compressionMethod, data); + StringSink sink{}; + auto compressionSink = makeCompressionSink(*compressionMethod, sink); + source.drainInto(*compressionSink); + compressionSink->finish(); + data = std::move(sink.s); req.headers.emplace_back("Content-Encoding", *compressionMethod); + stringSource = StringSource{data}; + req.data = {*stringSource}; + } else { + req.data = {sizeHint, source}; } - req.data = std::move(data); req.mimeType = mimeType; try { diff --git a/src/libstore/include/nix/store/binary-cache-store.hh b/src/libstore/include/nix/store/binary-cache-store.hh index e5c88a8d4..e64dc3eae 100644 --- a/src/libstore/include/nix/store/binary-cache-store.hh +++ b/src/libstore/include/nix/store/binary-cache-store.hh @@ -100,8 +100,8 @@ public: virtual bool fileExists(const std::string & path) = 0; - virtual void - upsertFile(const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) = 0; + virtual void upsertFile( + const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) = 0; void upsertFile( const std::string & path, diff --git a/src/libstore/include/nix/store/filetransfer.hh b/src/libstore/include/nix/store/filetransfer.hh index 08a2b6329..6419a686e 100644 --- a/src/libstore/include/nix/store/filetransfer.hh +++ b/src/libstore/include/nix/store/filetransfer.hh @@ -115,7 +115,26 @@ struct FileTransferRequest unsigned int baseRetryTimeMs = RETRY_TIME_MS_DEFAULT; ActivityId parentAct; bool decompress = true; - std::optional data; + + struct UploadData + { + UploadData(StringSource & s) + : sizeHint(s.s.length()) + , source(&s) + { + } + + UploadData(std::size_t sizeHint, RestartableSource & source) + : sizeHint(sizeHint) + , source(&source) + { + } + + std::size_t sizeHint = 0; + RestartableSource * source = nullptr; + }; + + std::optional data; std::string mimeType; std::function dataCallback; /** diff --git a/src/libstore/include/nix/store/http-binary-cache-store.hh b/src/libstore/include/nix/store/http-binary-cache-store.hh index 723034d4d..70ba36feb 100644 --- a/src/libstore/include/nix/store/http-binary-cache-store.hh +++ b/src/libstore/include/nix/store/http-binary-cache-store.hh @@ -80,8 +80,8 @@ protected: bool fileExists(const std::string & path) override; - void - upsertFile(const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) override; + void upsertFile( + const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) override; FileTransferRequest makeRequest(std::string_view path); diff --git a/src/libstore/local-binary-cache-store.cc b/src/libstore/local-binary-cache-store.cc index 8f07286f4..63730a01b 100644 --- a/src/libstore/local-binary-cache-store.cc +++ b/src/libstore/local-binary-cache-store.cc @@ -53,7 +53,8 @@ protected: bool fileExists(const std::string & path) override; - void upsertFile(const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) override + void upsertFile( + const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) override { auto path2 = config->binaryCacheDir + "/" + path; static std::atomic counter{0}; diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index 387efbeb4..d3e6c7baf 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -20,8 +20,8 @@ public: { } - void - upsertFile(const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) override; + void upsertFile( + const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) override; private: ref s3Config; @@ -67,7 +67,7 @@ private: }; void S3BinaryCacheStore::upsertFile( - const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) + const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) { HttpBinaryCacheStore::upsertFile(path, source, mimeType, sizeHint); } @@ -86,7 +86,8 @@ std::string S3BinaryCacheStore::createMultipartUpload( req.uri = VerbatimURL(url); req.method = HttpMethod::POST; - req.data = ""; + StringSource payload{std::string_view("")}; + req.data = {payload}; req.mimeType = mimeType; if (contentEncoding) { @@ -116,7 +117,8 @@ S3BinaryCacheStore::uploadPart(std::string_view key, std::string_view uploadId, url.query["partNumber"] = std::to_string(partNumber); url.query["uploadId"] = uploadId; req.uri = VerbatimURL(url); - req.data = std::move(data); + StringSource payload{data}; + req.data = {payload}; req.mimeType = "application/octet-stream"; auto result = getFileTransfer()->enqueueFileTransfer(req).get(); @@ -163,7 +165,8 @@ void S3BinaryCacheStore::completeMultipartUpload( debug("S3 CompleteMultipartUpload XML (%d parts): %s", parts.size(), xml); - req.data = xml; + StringSource payload{xml}; + req.data = {payload}; req.mimeType = "text/xml"; getFileTransfer()->enqueueFileTransfer(req).get();