diff --git a/doc/manual/rl-next/libcurl-pausing.md b/doc/manual/rl-next/libcurl-pausing.md new file mode 100644 index 000000000..68a0b5ecf --- /dev/null +++ b/doc/manual/rl-next/libcurl-pausing.md @@ -0,0 +1,12 @@ +--- +synopsis: Fix "download buffer is full; consider increasing the 'download-buffer-size' setting" warning +prs: [14614] +issues: [11728] +--- + +The underlying issue that led to [#11728](https://github.com/NixOS/nix/issues/11728) has been resolved by utilizing +[libcurl write pausing functionality](https://curl.se/libcurl/c/curl_easy_pause.html) to control backpressure when unpacking to slow destinations like the git-backed tarball cache. The default value of `download-buffer-size` is now 1 MiB and it's no longer recommended to increase it, since the root cause has been fixed. + +This is expected to improve download performance on fast connections, since previously a single slow download consumer would stall the thread and prevent any other transfers from progressing. + +Many thanks go out to the [Lix project](https://lix.systems/) for the [implementation](https://git.lix.systems/lix-project/lix/commit/4ae6fb5a8f0d456b8d2ba2aaca3712b4e49057fc) that served as inspiration for this change and for triaging libcurl [issues with pausing](https://github.com/curl/curl/issues/19334). diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index 709cdaffb..57caec384 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -48,7 +48,7 @@ struct curlFileTransfer : public FileTransfer std::random_device rd; std::mt19937 mt19937; - struct TransferItem : public std::enable_shared_from_this + struct TransferItem : public std::enable_shared_from_this, public FileTransfer::Item { curlFileTransfer & fileTransfer; FileTransferRequest request; @@ -60,6 +60,7 @@ struct curlFileTransfer : public FileTransfer // buffer to accompany the `req` above char errbuf[CURL_ERROR_SIZE]; bool active = false; // whether the handle has been added to the multi object + bool paused = false; // whether the request has been paused previously std::string statusMsg; unsigned int attempt = 0; @@ -116,7 +117,13 @@ struct curlFileTransfer : public FileTransfer successful response. */ if (successfulStatuses.count(httpStatus)) { writtenToSink += data.size(); - this->request.dataCallback(data); + PauseTransfer needsPause = this->request.dataCallback(data); + if (needsPause == PauseTransfer::Yes) { + /* Smuggle the boolean flag into writeCallback. Note that + the finalSink might get called multiple times if there's + decompression going on. */ + paused = true; + } } } else this->result.data.append(data); @@ -195,6 +202,14 @@ struct curlFileTransfer : public FileTransfer } (*decompressionSink)({(char *) contents, realSize}); + if (paused) { + /* The callback has signaled that the transfer needs to be + paused. Already consumed data won't be returned twice unlike + when returning CURL_WRITEFUNC_PAUSE. + https://curl-library.cool.haxx.narkive.com/larE1cRA/curl-easy-pause-documentation-question + */ + curl_easy_pause(req, CURLPAUSE_RECV); + } return realSize; } catch (...) { @@ -364,6 +379,15 @@ struct curlFileTransfer : public FileTransfer return ((TransferItem *) clientp)->seekCallback(offset, origin); } + void unpause() + { + /* Unpausing an already unpaused transfer is a no-op. */ + if (paused) { + curl_easy_pause(req, CURLPAUSE_CONT); + paused = false; + } + } + void init() { if (!req) @@ -624,7 +648,7 @@ struct curlFileTransfer : public FileTransfer errorSink.reset(); embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms); try { - fileTransfer.enqueueItem(shared_from_this()); + fileTransfer.enqueueItem(ref{shared_from_this()}); } catch (const nix::Error & e) { // If enqueue fails (e.g., during shutdown), fail the transfer properly // instead of letting the exception propagate, which would leave done=false @@ -641,24 +665,24 @@ struct curlFileTransfer : public FileTransfer { struct EmbargoComparator { - bool operator()(const std::shared_ptr & i1, const std::shared_ptr & i2) + bool operator()(const ref & i1, const ref & i2) { return i1->embargo > i2->embargo; } }; - std:: - priority_queue, std::vector>, EmbargoComparator> - incoming; + std::priority_queue, std::vector>, EmbargoComparator> incoming; + std::vector> unpause; private: bool quitting = false; public: void quit() { quitting = true; - /* We wil not be processing any more incoming requests */ + /* We will not be processing any more incoming requests */ while (!incoming.empty()) incoming.pop(); + unpause.clear(); } bool isQuitting() @@ -827,6 +851,17 @@ struct curlFileTransfer : public FileTransfer item->active = true; items[item->req] = item; } + + /* NOTE: Unpausing may invoke callbacks to flush all buffers. */ + auto unpause = [&]() { + auto state(state_.lock()); + auto res = state->unpause; + state->unpause.clear(); + return res; + }(); + + for (auto & item : unpause) + item->unpause(); } debug("download thread shutting down"); @@ -851,7 +886,7 @@ struct curlFileTransfer : public FileTransfer } } - void enqueueItem(std::shared_ptr item) + ItemHandle enqueueItem(ref item) { if (item->request.data && item->request.uri.scheme() != "http" && item->request.uri.scheme() != "https" && item->request.uri.scheme() != "s3") @@ -866,19 +901,34 @@ struct curlFileTransfer : public FileTransfer #ifndef _WIN32 // TODO need graceful async exit support on Windows? writeFull(wakeupPipe.writeSide.get(), " "); #endif + + return ItemHandle(static_cast(*item)); } - void enqueueFileTransfer(const FileTransferRequest & request, Callback callback) override + ItemHandle enqueueFileTransfer(const FileTransferRequest & request, Callback callback) override { /* Handle s3:// URIs by converting to HTTPS and optionally adding auth */ if (request.uri.scheme() == "s3") { auto modifiedRequest = request; modifiedRequest.setupForS3(); - enqueueItem(std::make_shared(*this, std::move(modifiedRequest), std::move(callback))); - return; + return enqueueItem(make_ref(*this, std::move(modifiedRequest), std::move(callback))); } - enqueueItem(std::make_shared(*this, request, std::move(callback))); + return enqueueItem(make_ref(*this, request, std::move(callback))); + } + + void unpauseTransfer(ref item) + { + auto state(state_.lock()); + state->unpause.push_back(std::move(item)); +#ifndef _WIN32 // TODO need graceful async exit support on Windows? + writeFull(wakeupPipe.writeSide.get(), " "); +#endif + } + + void unpauseTransfer(ItemHandle handle) override + { + unpauseTransfer(ref{static_cast(handle.item.get()).shared_from_this()}); } }; @@ -975,6 +1025,7 @@ void FileTransfer::download( struct State { bool quit = false; + bool paused = false; std::exception_ptr exc; std::string data; std::condition_variable avail, request; @@ -990,31 +1041,38 @@ void FileTransfer::download( state->request.notify_one(); }); - request.dataCallback = [_state](std::string_view data) { + request.dataCallback = [_state, uri = request.uri.to_string()](std::string_view data) -> PauseTransfer { auto state(_state->lock()); if (state->quit) - return; - - /* If the buffer is full, then go to sleep until the calling - thread wakes us up (i.e. when it has removed data from the - buffer). We don't wait forever to prevent stalling the - download thread. (Hopefully sleeping will throttle the - sender.) */ - if (state->data.size() > fileTransferSettings.downloadBufferSize) { - debug("download buffer is full; going to sleep"); - static bool haveWarned = false; - warnOnce(haveWarned, "download buffer is full; consider increasing the 'download-buffer-size' setting"); - state.wait_for(state->request, std::chrono::seconds(10)); - } + return PauseTransfer::No; /* Append data to the buffer and wake up the calling thread. */ state->data.append(data); state->avail.notify_one(); + + if (state->data.size() <= fileTransferSettings.downloadBufferSize) + return PauseTransfer::No; + + /* dataCallback gets called multiple times by an intermediate sink. Only + issue the debug message the first time around. */ + if (!state->paused) + debug( + "pausing transfer for '%s': download buffer is full (%d > %d)", + uri, + state->data.size(), + fileTransferSettings.downloadBufferSize); + + state->paused = true; + + /* Technically the buffer might become larger than + downloadBufferSize, but with sinks there's no way to avoid + consuming data. */ + return PauseTransfer::Yes; }; - enqueueFileTransfer( + auto handle = enqueueFileTransfer( request, {[_state, resultCallback{std::move(resultCallback)}](std::future fut) { auto state(_state->lock()); state->quit = true; @@ -1047,6 +1105,10 @@ void FileTransfer::download( return; } + if (state->paused) { + unpauseTransfer(handle); + state->paused = false; + } state.wait(state->avail); if (state->data.empty()) diff --git a/src/libstore/include/nix/store/filetransfer.hh b/src/libstore/include/nix/store/filetransfer.hh index 0523f79c7..76d036a78 100644 --- a/src/libstore/include/nix/store/filetransfer.hh +++ b/src/libstore/include/nix/store/filetransfer.hh @@ -70,12 +70,12 @@ struct FileTransferSettings : Config Setting downloadBufferSize{ this, - 64 * 1024 * 1024, + 1 * 1024 * 1024, "download-buffer-size", R"( The size of Nix's internal download buffer in bytes during `curl` transfers. If data is not processed quickly enough to exceed the size of this buffer, downloads may stall. - The default is 67108864 (64 MiB). + The default is 1048576 (1 MiB). )"}; }; @@ -105,6 +105,11 @@ struct UsernameAuth std::optional password; }; +enum class PauseTransfer : bool { + No = false, + Yes = true, +}; + struct FileTransferRequest { VerbatimURL uri; @@ -136,7 +141,14 @@ struct FileTransferRequest std::optional data; std::string mimeType; - std::function dataCallback; + + /** + * Callbacked invoked with a chunk of received data. + * Can pause the transfer by returning PauseTransfer::Yes. No data must be consumed + * if transfer is paused. + */ + std::function dataCallback; + /** * Optional username and password for HTTP basic authentication. * When provided, these credentials will be used with curl's CURLOPT_USERNAME/PASSWORD option. @@ -226,6 +238,25 @@ class Store; struct FileTransfer { +protected: + class Item + {}; + +public: + /** + * An opaque handle to the file transfer. Can be used to reference an in-flight transfer operations. + */ + struct ItemHandle + { + std::reference_wrapper item; + friend struct FileTransfer; + + ItemHandle(Item & item) + : item(item) + { + } + }; + virtual ~FileTransfer() {} /** @@ -233,7 +264,13 @@ struct FileTransfer * the download. The future may throw a FileTransferError * exception. */ - virtual void enqueueFileTransfer(const FileTransferRequest & request, Callback callback) = 0; + virtual ItemHandle + enqueueFileTransfer(const FileTransferRequest & request, Callback callback) = 0; + + /** + * Unpause a transfer that has been previously paused by a dataCallback. + */ + virtual void unpauseTransfer(ItemHandle handle) = 0; std::future enqueueFileTransfer(const FileTransferRequest & request); diff --git a/src/libstore/meson.build b/src/libstore/meson.build index e3425deb5..2d7217513 100644 --- a/src/libstore/meson.build +++ b/src/libstore/meson.build @@ -114,6 +114,15 @@ boost = dependency( deps_other += boost curl = dependency('libcurl', 'curl', version : '>= 7.75.0') +if curl.version().version_compare('>=8.16.0 <8.17.0') + # Out of precaution, avoid building with libcurl version that suffer from https://github.com/curl/curl/issues/19334. + error( + 'curl @0@ has issues with write pausing, please use libcurl < 8.16 or >= 8.17, see https://github.com/curl/curl/issues/19334'.format( + curl.version(), + ), + ) +endif + deps_private += curl # seccomp only makes sense on Linux