From 3f8474a62f54fc10d1e215753a4aa462c3a80687 Mon Sep 17 00:00:00 2001 From: Sergei Zimmerman Date: Fri, 21 Nov 2025 23:52:00 +0300 Subject: [PATCH 1/5] libstore/filetransfer: Use ref instead of std::shared_ptr Those can never be nullptr, so we should use the type system to ensure this invariant. --- src/libstore/filetransfer.cc | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index 709cdaffb..76d134093 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -624,7 +624,7 @@ struct curlFileTransfer : public FileTransfer errorSink.reset(); embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms); try { - fileTransfer.enqueueItem(shared_from_this()); + fileTransfer.enqueueItem(ref{shared_from_this()}); } catch (const nix::Error & e) { // If enqueue fails (e.g., during shutdown), fail the transfer properly // instead of letting the exception propagate, which would leave done=false @@ -641,15 +641,13 @@ struct curlFileTransfer : public FileTransfer { struct EmbargoComparator { - bool operator()(const std::shared_ptr & i1, const std::shared_ptr & i2) + bool operator()(const ref & i1, const ref & i2) { return i1->embargo > i2->embargo; } }; - std:: - priority_queue, std::vector>, EmbargoComparator> - incoming; + std::priority_queue, std::vector>, EmbargoComparator> incoming; private: bool quitting = false; public: @@ -851,7 +849,7 @@ struct curlFileTransfer : public FileTransfer } } - void enqueueItem(std::shared_ptr item) + void enqueueItem(ref item) { if (item->request.data && item->request.uri.scheme() != "http" && item->request.uri.scheme() != "https" && item->request.uri.scheme() != "s3") @@ -874,11 +872,11 @@ struct curlFileTransfer : public FileTransfer if (request.uri.scheme() == "s3") { auto modifiedRequest = request; modifiedRequest.setupForS3(); - enqueueItem(std::make_shared(*this, std::move(modifiedRequest), std::move(callback))); + enqueueItem(make_ref(*this, std::move(modifiedRequest), std::move(callback))); return; } - enqueueItem(std::make_shared(*this, request, std::move(callback))); + enqueueItem(make_ref(*this, request, std::move(callback))); } }; From ec0b270c6c154bad1d60832b10ed9676611a8c3e Mon Sep 17 00:00:00 2001 From: Sergei Zimmerman Date: Sat, 22 Nov 2025 00:18:50 +0300 Subject: [PATCH 2/5] libstore/filetransfer: Return an opaque handle from enqueueFileTransfer This is necessary to make pausing/unpausing possible in a follow-up commit. --- src/libstore/filetransfer.cc | 13 ++++++----- .../include/nix/store/filetransfer.hh | 23 ++++++++++++++++++- 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index 76d134093..73fe1bdd9 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -48,7 +48,7 @@ struct curlFileTransfer : public FileTransfer std::random_device rd; std::mt19937 mt19937; - struct TransferItem : public std::enable_shared_from_this + struct TransferItem : public std::enable_shared_from_this, public FileTransfer::Item { curlFileTransfer & fileTransfer; FileTransferRequest request; @@ -849,7 +849,7 @@ struct curlFileTransfer : public FileTransfer } } - void enqueueItem(ref item) + ItemHandle enqueueItem(ref item) { if (item->request.data && item->request.uri.scheme() != "http" && item->request.uri.scheme() != "https" && item->request.uri.scheme() != "s3") @@ -864,19 +864,20 @@ struct curlFileTransfer : public FileTransfer #ifndef _WIN32 // TODO need graceful async exit support on Windows? writeFull(wakeupPipe.writeSide.get(), " "); #endif + + return ItemHandle(static_cast(*item)); } - void enqueueFileTransfer(const FileTransferRequest & request, Callback callback) override + ItemHandle enqueueFileTransfer(const FileTransferRequest & request, Callback callback) override { /* Handle s3:// URIs by converting to HTTPS and optionally adding auth */ if (request.uri.scheme() == "s3") { auto modifiedRequest = request; modifiedRequest.setupForS3(); - enqueueItem(make_ref(*this, std::move(modifiedRequest), std::move(callback))); - return; + return enqueueItem(make_ref(*this, std::move(modifiedRequest), std::move(callback))); } - enqueueItem(make_ref(*this, request, std::move(callback))); + return enqueueItem(make_ref(*this, request, std::move(callback))); } }; diff --git a/src/libstore/include/nix/store/filetransfer.hh b/src/libstore/include/nix/store/filetransfer.hh index 0523f79c7..257c90509 100644 --- a/src/libstore/include/nix/store/filetransfer.hh +++ b/src/libstore/include/nix/store/filetransfer.hh @@ -226,6 +226,26 @@ class Store; struct FileTransfer { +protected: + class Item + {}; + +public: + /** + * An opaque handle to the file transfer. Can be used to reference an in-flight transfer operations. + */ + class ItemHandle + { + std::reference_wrapper item; + friend struct FileTransfer; + + public: + ItemHandle(Item & item) + : item(item) + { + } + }; + virtual ~FileTransfer() {} /** @@ -233,7 +253,8 @@ struct FileTransfer * the download. The future may throw a FileTransferError * exception. */ - virtual void enqueueFileTransfer(const FileTransferRequest & request, Callback callback) = 0; + virtual ItemHandle + enqueueFileTransfer(const FileTransferRequest & request, Callback callback) = 0; std::future enqueueFileTransfer(const FileTransferRequest & request); From 4307420c4473bed25eef8e27d1935f5af0658d03 Mon Sep 17 00:00:00 2001 From: Sergei Zimmerman Date: Sat, 22 Nov 2025 03:51:40 +0300 Subject: [PATCH 3/5] libstore/filetransfer: Pause transfers instead of stalling the download thread Instead of naively stalling the download thread we can instead stop the transfer. This allows the other multiplexed connections to continue downloading (and unpacking), if the result of the download gets piped into a GitFileSystemObjectSink. Prior art in lix project: - https://git.lix.systems/lix-project/lix/commit/4ae6fb5a8f0d456b8d2ba2aaca3712b4e49057fc - https://git.lix.systems/lix-project/lix/commit/12156d3beb8a16c0e2e8cf7180e1fbf27280a669 This patch is very different from the lix one, since we are using a decompression sink in the middle of the pipeline but the co-authored-by is there since I was motivated to implement this by looking at the lix side of things. Co-authored-by: eldritch horrors --- src/libstore/filetransfer.cc | 97 +++++++++++++++---- .../include/nix/store/filetransfer.hh | 22 ++++- src/libstore/meson.build | 9 ++ 3 files changed, 108 insertions(+), 20 deletions(-) diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index 73fe1bdd9..57caec384 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -60,6 +60,7 @@ struct curlFileTransfer : public FileTransfer // buffer to accompany the `req` above char errbuf[CURL_ERROR_SIZE]; bool active = false; // whether the handle has been added to the multi object + bool paused = false; // whether the request has been paused previously std::string statusMsg; unsigned int attempt = 0; @@ -116,7 +117,13 @@ struct curlFileTransfer : public FileTransfer successful response. */ if (successfulStatuses.count(httpStatus)) { writtenToSink += data.size(); - this->request.dataCallback(data); + PauseTransfer needsPause = this->request.dataCallback(data); + if (needsPause == PauseTransfer::Yes) { + /* Smuggle the boolean flag into writeCallback. Note that + the finalSink might get called multiple times if there's + decompression going on. */ + paused = true; + } } } else this->result.data.append(data); @@ -195,6 +202,14 @@ struct curlFileTransfer : public FileTransfer } (*decompressionSink)({(char *) contents, realSize}); + if (paused) { + /* The callback has signaled that the transfer needs to be + paused. Already consumed data won't be returned twice unlike + when returning CURL_WRITEFUNC_PAUSE. + https://curl-library.cool.haxx.narkive.com/larE1cRA/curl-easy-pause-documentation-question + */ + curl_easy_pause(req, CURLPAUSE_RECV); + } return realSize; } catch (...) { @@ -364,6 +379,15 @@ struct curlFileTransfer : public FileTransfer return ((TransferItem *) clientp)->seekCallback(offset, origin); } + void unpause() + { + /* Unpausing an already unpaused transfer is a no-op. */ + if (paused) { + curl_easy_pause(req, CURLPAUSE_CONT); + paused = false; + } + } + void init() { if (!req) @@ -648,15 +672,17 @@ struct curlFileTransfer : public FileTransfer }; std::priority_queue, std::vector>, EmbargoComparator> incoming; + std::vector> unpause; private: bool quitting = false; public: void quit() { quitting = true; - /* We wil not be processing any more incoming requests */ + /* We will not be processing any more incoming requests */ while (!incoming.empty()) incoming.pop(); + unpause.clear(); } bool isQuitting() @@ -825,6 +851,17 @@ struct curlFileTransfer : public FileTransfer item->active = true; items[item->req] = item; } + + /* NOTE: Unpausing may invoke callbacks to flush all buffers. */ + auto unpause = [&]() { + auto state(state_.lock()); + auto res = state->unpause; + state->unpause.clear(); + return res; + }(); + + for (auto & item : unpause) + item->unpause(); } debug("download thread shutting down"); @@ -879,6 +916,20 @@ struct curlFileTransfer : public FileTransfer return enqueueItem(make_ref(*this, request, std::move(callback))); } + + void unpauseTransfer(ref item) + { + auto state(state_.lock()); + state->unpause.push_back(std::move(item)); +#ifndef _WIN32 // TODO need graceful async exit support on Windows? + writeFull(wakeupPipe.writeSide.get(), " "); +#endif + } + + void unpauseTransfer(ItemHandle handle) override + { + unpauseTransfer(ref{static_cast(handle.item.get()).shared_from_this()}); + } }; ref makeCurlFileTransfer() @@ -974,6 +1025,7 @@ void FileTransfer::download( struct State { bool quit = false; + bool paused = false; std::exception_ptr exc; std::string data; std::condition_variable avail, request; @@ -989,31 +1041,38 @@ void FileTransfer::download( state->request.notify_one(); }); - request.dataCallback = [_state](std::string_view data) { + request.dataCallback = [_state, uri = request.uri.to_string()](std::string_view data) -> PauseTransfer { auto state(_state->lock()); if (state->quit) - return; - - /* If the buffer is full, then go to sleep until the calling - thread wakes us up (i.e. when it has removed data from the - buffer). We don't wait forever to prevent stalling the - download thread. (Hopefully sleeping will throttle the - sender.) */ - if (state->data.size() > fileTransferSettings.downloadBufferSize) { - debug("download buffer is full; going to sleep"); - static bool haveWarned = false; - warnOnce(haveWarned, "download buffer is full; consider increasing the 'download-buffer-size' setting"); - state.wait_for(state->request, std::chrono::seconds(10)); - } + return PauseTransfer::No; /* Append data to the buffer and wake up the calling thread. */ state->data.append(data); state->avail.notify_one(); + + if (state->data.size() <= fileTransferSettings.downloadBufferSize) + return PauseTransfer::No; + + /* dataCallback gets called multiple times by an intermediate sink. Only + issue the debug message the first time around. */ + if (!state->paused) + debug( + "pausing transfer for '%s': download buffer is full (%d > %d)", + uri, + state->data.size(), + fileTransferSettings.downloadBufferSize); + + state->paused = true; + + /* Technically the buffer might become larger than + downloadBufferSize, but with sinks there's no way to avoid + consuming data. */ + return PauseTransfer::Yes; }; - enqueueFileTransfer( + auto handle = enqueueFileTransfer( request, {[_state, resultCallback{std::move(resultCallback)}](std::future fut) { auto state(_state->lock()); state->quit = true; @@ -1046,6 +1105,10 @@ void FileTransfer::download( return; } + if (state->paused) { + unpauseTransfer(handle); + state->paused = false; + } state.wait(state->avail); if (state->data.empty()) diff --git a/src/libstore/include/nix/store/filetransfer.hh b/src/libstore/include/nix/store/filetransfer.hh index 257c90509..3862093db 100644 --- a/src/libstore/include/nix/store/filetransfer.hh +++ b/src/libstore/include/nix/store/filetransfer.hh @@ -105,6 +105,11 @@ struct UsernameAuth std::optional password; }; +enum class PauseTransfer : bool { + No = false, + Yes = true, +}; + struct FileTransferRequest { VerbatimURL uri; @@ -136,7 +141,14 @@ struct FileTransferRequest std::optional data; std::string mimeType; - std::function dataCallback; + + /** + * Callbacked invoked with a chunk of received data. + * Can pause the transfer by returning PauseTransfer::Yes. No data must be consumed + * if transfer is paused. + */ + std::function dataCallback; + /** * Optional username and password for HTTP basic authentication. * When provided, these credentials will be used with curl's CURLOPT_USERNAME/PASSWORD option. @@ -234,12 +246,11 @@ public: /** * An opaque handle to the file transfer. Can be used to reference an in-flight transfer operations. */ - class ItemHandle + struct ItemHandle { std::reference_wrapper item; friend struct FileTransfer; - public: ItemHandle(Item & item) : item(item) { @@ -256,6 +267,11 @@ public: virtual ItemHandle enqueueFileTransfer(const FileTransferRequest & request, Callback callback) = 0; + /** + * Unpause a transfer that has been previously paused by a dataCallback. + */ + virtual void unpauseTransfer(ItemHandle handle) = 0; + std::future enqueueFileTransfer(const FileTransferRequest & request); /** diff --git a/src/libstore/meson.build b/src/libstore/meson.build index e3425deb5..2d7217513 100644 --- a/src/libstore/meson.build +++ b/src/libstore/meson.build @@ -114,6 +114,15 @@ boost = dependency( deps_other += boost curl = dependency('libcurl', 'curl', version : '>= 7.75.0') +if curl.version().version_compare('>=8.16.0 <8.17.0') + # Out of precaution, avoid building with libcurl version that suffer from https://github.com/curl/curl/issues/19334. + error( + 'curl @0@ has issues with write pausing, please use libcurl < 8.16 or >= 8.17, see https://github.com/curl/curl/issues/19334'.format( + curl.version(), + ), + ) +endif + deps_private += curl # seccomp only makes sense on Linux From a2d6a69d45d83cc44126b61b8926882cf63edcac Mon Sep 17 00:00:00 2001 From: Sergei Zimmerman Date: Sat, 22 Nov 2025 03:04:16 +0300 Subject: [PATCH 4/5] libstore: Reduce the default download-buffer-size down to 1 MiB Since the root cause (the lack of backpressure control) has been fixed in the previous commit we can revert the change from 8ffea0a018874e60584eabeb620ec3495873c30d and make the default size much smaller. --- src/libstore/include/nix/store/filetransfer.hh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/libstore/include/nix/store/filetransfer.hh b/src/libstore/include/nix/store/filetransfer.hh index 3862093db..76d036a78 100644 --- a/src/libstore/include/nix/store/filetransfer.hh +++ b/src/libstore/include/nix/store/filetransfer.hh @@ -70,12 +70,12 @@ struct FileTransferSettings : Config Setting downloadBufferSize{ this, - 64 * 1024 * 1024, + 1 * 1024 * 1024, "download-buffer-size", R"( The size of Nix's internal download buffer in bytes during `curl` transfers. If data is not processed quickly enough to exceed the size of this buffer, downloads may stall. - The default is 67108864 (64 MiB). + The default is 1048576 (1 MiB). )"}; }; From be28ad92fd27e34873f4493fc8f3287769073ff4 Mon Sep 17 00:00:00 2001 From: Sergei Zimmerman Date: Sat, 22 Nov 2025 04:25:59 +0300 Subject: [PATCH 5/5] rl-next: Add docs for libcurl pausing --- doc/manual/rl-next/libcurl-pausing.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 doc/manual/rl-next/libcurl-pausing.md diff --git a/doc/manual/rl-next/libcurl-pausing.md b/doc/manual/rl-next/libcurl-pausing.md new file mode 100644 index 000000000..68a0b5ecf --- /dev/null +++ b/doc/manual/rl-next/libcurl-pausing.md @@ -0,0 +1,12 @@ +--- +synopsis: Fix "download buffer is full; consider increasing the 'download-buffer-size' setting" warning +prs: [14614] +issues: [11728] +--- + +The underlying issue that led to [#11728](https://github.com/NixOS/nix/issues/11728) has been resolved by utilizing +[libcurl write pausing functionality](https://curl.se/libcurl/c/curl_easy_pause.html) to control backpressure when unpacking to slow destinations like the git-backed tarball cache. The default value of `download-buffer-size` is now 1 MiB and it's no longer recommended to increase it, since the root cause has been fixed. + +This is expected to improve download performance on fast connections, since previously a single slow download consumer would stall the thread and prevent any other transfers from progressing. + +Many thanks go out to the [Lix project](https://lix.systems/) for the [implementation](https://git.lix.systems/lix-project/lix/commit/4ae6fb5a8f0d456b8d2ba2aaca3712b4e49057fc) that served as inspiration for this change and for triaging libcurl [issues with pausing](https://github.com/curl/curl/issues/19334).