diff --git a/src/libfetchers/git-lfs-fetch.cc b/src/libfetchers/git-lfs-fetch.cc index aee1163bb..936976e55 100644 --- a/src/libfetchers/git-lfs-fetch.cc +++ b/src/libfetchers/git-lfs-fetch.cc @@ -219,7 +219,9 @@ std::vector Fetch::fetchUrls(const std::vector & pointe nlohmann::json oidList = pointerToPayload(pointers); nlohmann::json data = {{"operation", "download"}}; data["objects"] = oidList; - request.data = data.dump(); + auto payload = data.dump(); + StringSource source{payload}; + request.data = {source}; FileTransferResult result = getFileTransfer()->upload(request); auto responseString = result.data; diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc index 9bb81add7..274e47271 100644 --- a/src/libstore/binary-cache-store.cc +++ b/src/libstore/binary-cache-store.cc @@ -79,7 +79,8 @@ std::optional BinaryCacheStore::getNixCacheInfo() void BinaryCacheStore::upsertFile( const std::string & path, std::string && data, const std::string & mimeType, uint64_t sizeHint) { - upsertFile(path, std::make_shared(std::move(data)), mimeType, sizeHint); + auto source = restartableSourceFromFactory([data = std::move(data)]() { return make_unique(data); }); + upsertFile(path, *source, mimeType, sizeHint); } void BinaryCacheStore::getFile(const std::string & path, Callback> callback) noexcept @@ -271,12 +272,19 @@ ref BinaryCacheStore::addToStoreCommon( /* Atomically write the NAR file. */ if (repair || !fileExists(narInfo->url)) { + auto source = restartableSourceFromFactory([fnTemp]() { + struct AutoCloseFDSource : AutoCloseFD, FdSource + { + AutoCloseFDSource(AutoCloseFD fd) + : AutoCloseFD(std::move(fd)) + , FdSource(get()) + { + } + }; + return std::make_unique(toDescriptor(open(fnTemp.c_str(), O_RDONLY))); + }); stats.narWrite++; - upsertFile( - narInfo->url, - std::make_shared(fnTemp, std::ios_base::in | std::ios_base::binary), - "application/x-nix-nar", - narInfo->fileSize); + upsertFile(narInfo->url, *source, "application/x-nix-nar", narInfo->fileSize); } else stats.narWriteAverted++; diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index 304984d99..ae739fcf8 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -295,20 +295,17 @@ struct curlFileTransfer : public FileTransfer return 0; } - size_t readOffset = 0; - - size_t readCallback(char * buffer, size_t size, size_t nitems) - { - if (readOffset == request.data->length()) - return 0; - auto count = std::min(size * nitems, request.data->length() - readOffset); - assert(count); - memcpy(buffer, request.data->data() + readOffset, count); - readOffset += count; - return count; + size_t readCallback(char * buffer, size_t size, size_t nitems) noexcept + try { + auto data = request.data; + return data->source->read(buffer, nitems * size); + } catch (EndOfFile &) { + return 0; + } catch (...) { + return CURL_READFUNC_ABORT; } - static size_t readCallbackWrapper(char * buffer, size_t size, size_t nitems, void * userp) + static size_t readCallbackWrapper(char * buffer, size_t size, size_t nitems, void * userp) noexcept { return ((TransferItem *) userp)->readCallback(buffer, size, nitems); } @@ -322,19 +319,24 @@ struct curlFileTransfer : public FileTransfer } #endif - size_t seekCallback(curl_off_t offset, int origin) - { + size_t seekCallback(curl_off_t offset, int origin) noexcept + try { + auto source = request.data->source; if (origin == SEEK_SET) { - readOffset = offset; + source->restart(); + source->skip(offset); } else if (origin == SEEK_CUR) { - readOffset += offset; + source->skip(offset); } else if (origin == SEEK_END) { - readOffset = request.data->length() + offset; + NullSink sink{}; + source->drainInto(sink); } return CURL_SEEKFUNC_OK; + } catch (...) { + return CURL_SEEKFUNC_FAIL; } - static size_t seekCallbackWrapper(void * clientp, curl_off_t offset, int origin) + static size_t seekCallbackWrapper(void * clientp, curl_off_t offset, int origin) noexcept { return ((TransferItem *) clientp)->seekCallback(offset, origin); } @@ -393,10 +395,10 @@ struct curlFileTransfer : public FileTransfer if (request.data) { if (request.method == HttpMethod::POST) { curl_easy_setopt(req, CURLOPT_POST, 1L); - curl_easy_setopt(req, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t) request.data->length()); + curl_easy_setopt(req, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t) request.data->sizeHint); } else if (request.method == HttpMethod::PUT) { curl_easy_setopt(req, CURLOPT_UPLOAD, 1L); - curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->length()); + curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->sizeHint); } else { unreachable(); } diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc index 4b8276572..8a9f06992 100644 --- a/src/libstore/http-binary-cache-store.cc +++ b/src/libstore/http-binary-cache-store.cc @@ -135,23 +135,28 @@ bool HttpBinaryCacheStore::fileExists(const std::string & path) } void HttpBinaryCacheStore::upsertFile( - const std::string & path, - std::shared_ptr> istream, - const std::string & mimeType, - uint64_t sizeHint) + const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) { auto req = makeRequest(path); req.method = HttpMethod::PUT; - auto data = StreamToSourceAdapter(istream).drain(); - auto compressionMethod = getCompressionMethod(path); + std::string data; + std::optional stringSource{}; + if (compressionMethod) { - data = compress(*compressionMethod, data); + StringSink sink{}; + auto compressionSink = makeCompressionSink(*compressionMethod, sink); + source.drainInto(*compressionSink); + compressionSink->finish(); + data = std::move(sink.s); req.headers.emplace_back("Content-Encoding", *compressionMethod); + stringSource = StringSource{data}; + req.data = {*stringSource}; + } else { + req.data = {sizeHint, source}; } - req.data = std::move(data); req.mimeType = mimeType; try { diff --git a/src/libstore/include/nix/store/binary-cache-store.hh b/src/libstore/include/nix/store/binary-cache-store.hh index 0bed09aec..e64dc3eae 100644 --- a/src/libstore/include/nix/store/binary-cache-store.hh +++ b/src/libstore/include/nix/store/binary-cache-store.hh @@ -101,10 +101,7 @@ public: virtual bool fileExists(const std::string & path) = 0; virtual void upsertFile( - const std::string & path, - std::shared_ptr> istream, - const std::string & mimeType, - uint64_t sizeHint) = 0; + const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) = 0; void upsertFile( const std::string & path, diff --git a/src/libstore/include/nix/store/filetransfer.hh b/src/libstore/include/nix/store/filetransfer.hh index 08a2b6329..6419a686e 100644 --- a/src/libstore/include/nix/store/filetransfer.hh +++ b/src/libstore/include/nix/store/filetransfer.hh @@ -115,7 +115,26 @@ struct FileTransferRequest unsigned int baseRetryTimeMs = RETRY_TIME_MS_DEFAULT; ActivityId parentAct; bool decompress = true; - std::optional data; + + struct UploadData + { + UploadData(StringSource & s) + : sizeHint(s.s.length()) + , source(&s) + { + } + + UploadData(std::size_t sizeHint, RestartableSource & source) + : sizeHint(sizeHint) + , source(&source) + { + } + + std::size_t sizeHint = 0; + RestartableSource * source = nullptr; + }; + + std::optional data; std::string mimeType; std::function dataCallback; /** diff --git a/src/libstore/include/nix/store/http-binary-cache-store.hh b/src/libstore/include/nix/store/http-binary-cache-store.hh index ecad09975..70ba36feb 100644 --- a/src/libstore/include/nix/store/http-binary-cache-store.hh +++ b/src/libstore/include/nix/store/http-binary-cache-store.hh @@ -81,10 +81,7 @@ protected: bool fileExists(const std::string & path) override; void upsertFile( - const std::string & path, - std::shared_ptr> istream, - const std::string & mimeType, - uint64_t sizeHint) override; + const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) override; FileTransferRequest makeRequest(std::string_view path); diff --git a/src/libstore/local-binary-cache-store.cc b/src/libstore/local-binary-cache-store.cc index c1811bf17..63730a01b 100644 --- a/src/libstore/local-binary-cache-store.cc +++ b/src/libstore/local-binary-cache-store.cc @@ -54,16 +54,12 @@ protected: bool fileExists(const std::string & path) override; void upsertFile( - const std::string & path, - std::shared_ptr> istream, - const std::string & mimeType, - uint64_t sizeHint) override + const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) override { auto path2 = config->binaryCacheDir + "/" + path; static std::atomic counter{0}; Path tmp = fmt("%s.tmp.%d.%d", path2, getpid(), ++counter); AutoDelete del(tmp, false); - StreamToSourceAdapter source(istream); writeFile(tmp, source); std::filesystem::rename(tmp, path2); del.cancel(); diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index 8617c9fe9..d3e6c7baf 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -21,10 +21,7 @@ public: } void upsertFile( - const std::string & path, - std::shared_ptr> istream, - const std::string & mimeType, - uint64_t sizeHint) override; + const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) override; private: ref s3Config; @@ -70,12 +67,9 @@ private: }; void S3BinaryCacheStore::upsertFile( - const std::string & path, - std::shared_ptr> istream, - const std::string & mimeType, - uint64_t sizeHint) + const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) { - HttpBinaryCacheStore::upsertFile(path, istream, mimeType, sizeHint); + HttpBinaryCacheStore::upsertFile(path, source, mimeType, sizeHint); } std::string S3BinaryCacheStore::createMultipartUpload( @@ -92,7 +86,8 @@ std::string S3BinaryCacheStore::createMultipartUpload( req.uri = VerbatimURL(url); req.method = HttpMethod::POST; - req.data = ""; + StringSource payload{std::string_view("")}; + req.data = {payload}; req.mimeType = mimeType; if (contentEncoding) { @@ -122,7 +117,8 @@ S3BinaryCacheStore::uploadPart(std::string_view key, std::string_view uploadId, url.query["partNumber"] = std::to_string(partNumber); url.query["uploadId"] = uploadId; req.uri = VerbatimURL(url); - req.data = std::move(data); + StringSource payload{data}; + req.data = {payload}; req.mimeType = "application/octet-stream"; auto result = getFileTransfer()->enqueueFileTransfer(req).get(); @@ -169,7 +165,8 @@ void S3BinaryCacheStore::completeMultipartUpload( debug("S3 CompleteMultipartUpload XML (%d parts): %s", parts.size(), xml); - req.data = xml; + StringSource payload{xml}; + req.data = {payload}; req.mimeType = "text/xml"; getFileTransfer()->enqueueFileTransfer(req).get(); diff --git a/src/libutil/include/nix/util/serialise.hh b/src/libutil/include/nix/util/serialise.hh index d6845a494..1bf06c2e8 100644 --- a/src/libutil/include/nix/util/serialise.hh +++ b/src/libutil/include/nix/util/serialise.hh @@ -230,10 +230,18 @@ struct StringSink : Sink void operator()(std::string_view data) override; }; +/** + * Source type that can be restarted. + */ +struct RestartableSource : Source +{ + virtual void restart() = 0; +}; + /** * A source that reads data from a string. */ -struct StringSource : Source +struct StringSource : RestartableSource { std::string_view s; size_t pos; @@ -257,8 +265,22 @@ struct StringSource : Source size_t read(char * data, size_t len) override; void skip(size_t len) override; + + void restart() override + { + pos = 0; + } }; +/** + * Create a restartable Source from a factory function. + * + * @param factory Factory function that returns a fresh instance of the Source. Gets + * called for each source restart. + * @pre factory must return an equivalent source for each invocation. + */ +std::unique_ptr restartableSourceFromFactory(std::function()> factory); + /** * A sink that writes all incoming data to two other sinks. */ diff --git a/src/libutil/serialise.cc b/src/libutil/serialise.cc index ba153625e..51cf48d9a 100644 --- a/src/libutil/serialise.cc +++ b/src/libutil/serialise.cc @@ -513,4 +513,41 @@ size_t ChainSource::read(char * data, size_t len) } } +std::unique_ptr restartableSourceFromFactory(std::function()> factory) +{ + struct RestartableSourceImpl : RestartableSource + { + RestartableSourceImpl(decltype(factory) factory_) + : factory_(std::move(factory_)) + , impl(this->factory_()) + { + } + + decltype(factory) factory_; + std::unique_ptr impl = factory_(); + + size_t read(char * data, size_t len) override + { + return impl->read(data, len); + } + + bool good() override + { + return impl->good(); + } + + void skip(size_t len) override + { + return impl->skip(len); + } + + void restart() override + { + impl = factory_(); + } + }; + + return std::make_unique(std::move(factory)); +} + } // namespace nix