1
1
Fork 0
mirror of https://github.com/NixOS/nix.git synced 2025-11-24 11:19:35 +01:00

Merge pull request #14614 from NixOS/libcurl-pause

libstore/filetransfer: Pause transfers instead of stalling the download thread
This commit is contained in:
John Ericson 2025-11-22 05:41:18 +00:00 committed by GitHub
commit 79dcc094b0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 152 additions and 32 deletions

View file

@ -0,0 +1,12 @@
---
synopsis: Fix "download buffer is full; consider increasing the 'download-buffer-size' setting" warning
prs: [14614]
issues: [11728]
---
The underlying issue that led to [#11728](https://github.com/NixOS/nix/issues/11728) has been resolved by utilizing
[libcurl write pausing functionality](https://curl.se/libcurl/c/curl_easy_pause.html) to control backpressure when unpacking to slow destinations like the git-backed tarball cache. The default value of `download-buffer-size` is now 1 MiB and it's no longer recommended to increase it, since the root cause has been fixed.
This is expected to improve download performance on fast connections, since previously a single slow download consumer would stall the thread and prevent any other transfers from progressing.
Many thanks go out to the [Lix project](https://lix.systems/) for the [implementation](https://git.lix.systems/lix-project/lix/commit/4ae6fb5a8f0d456b8d2ba2aaca3712b4e49057fc) that served as inspiration for this change and for triaging libcurl [issues with pausing](https://github.com/curl/curl/issues/19334).

View file

@ -48,7 +48,7 @@ struct curlFileTransfer : public FileTransfer
std::random_device rd; std::random_device rd;
std::mt19937 mt19937; std::mt19937 mt19937;
struct TransferItem : public std::enable_shared_from_this<TransferItem> struct TransferItem : public std::enable_shared_from_this<TransferItem>, public FileTransfer::Item
{ {
curlFileTransfer & fileTransfer; curlFileTransfer & fileTransfer;
FileTransferRequest request; FileTransferRequest request;
@ -60,6 +60,7 @@ struct curlFileTransfer : public FileTransfer
// buffer to accompany the `req` above // buffer to accompany the `req` above
char errbuf[CURL_ERROR_SIZE]; char errbuf[CURL_ERROR_SIZE];
bool active = false; // whether the handle has been added to the multi object bool active = false; // whether the handle has been added to the multi object
bool paused = false; // whether the request has been paused previously
std::string statusMsg; std::string statusMsg;
unsigned int attempt = 0; unsigned int attempt = 0;
@ -116,7 +117,13 @@ struct curlFileTransfer : public FileTransfer
successful response. */ successful response. */
if (successfulStatuses.count(httpStatus)) { if (successfulStatuses.count(httpStatus)) {
writtenToSink += data.size(); writtenToSink += data.size();
this->request.dataCallback(data); PauseTransfer needsPause = this->request.dataCallback(data);
if (needsPause == PauseTransfer::Yes) {
/* Smuggle the boolean flag into writeCallback. Note that
the finalSink might get called multiple times if there's
decompression going on. */
paused = true;
}
} }
} else } else
this->result.data.append(data); this->result.data.append(data);
@ -195,6 +202,14 @@ struct curlFileTransfer : public FileTransfer
} }
(*decompressionSink)({(char *) contents, realSize}); (*decompressionSink)({(char *) contents, realSize});
if (paused) {
/* The callback has signaled that the transfer needs to be
paused. Already consumed data won't be returned twice unlike
when returning CURL_WRITEFUNC_PAUSE.
https://curl-library.cool.haxx.narkive.com/larE1cRA/curl-easy-pause-documentation-question
*/
curl_easy_pause(req, CURLPAUSE_RECV);
}
return realSize; return realSize;
} catch (...) { } catch (...) {
@ -364,6 +379,15 @@ struct curlFileTransfer : public FileTransfer
return ((TransferItem *) clientp)->seekCallback(offset, origin); return ((TransferItem *) clientp)->seekCallback(offset, origin);
} }
void unpause()
{
/* Unpausing an already unpaused transfer is a no-op. */
if (paused) {
curl_easy_pause(req, CURLPAUSE_CONT);
paused = false;
}
}
void init() void init()
{ {
if (!req) if (!req)
@ -624,7 +648,7 @@ struct curlFileTransfer : public FileTransfer
errorSink.reset(); errorSink.reset();
embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms); embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms);
try { try {
fileTransfer.enqueueItem(shared_from_this()); fileTransfer.enqueueItem(ref{shared_from_this()});
} catch (const nix::Error & e) { } catch (const nix::Error & e) {
// If enqueue fails (e.g., during shutdown), fail the transfer properly // If enqueue fails (e.g., during shutdown), fail the transfer properly
// instead of letting the exception propagate, which would leave done=false // instead of letting the exception propagate, which would leave done=false
@ -641,24 +665,24 @@ struct curlFileTransfer : public FileTransfer
{ {
struct EmbargoComparator struct EmbargoComparator
{ {
bool operator()(const std::shared_ptr<TransferItem> & i1, const std::shared_ptr<TransferItem> & i2) bool operator()(const ref<TransferItem> & i1, const ref<TransferItem> & i2)
{ {
return i1->embargo > i2->embargo; return i1->embargo > i2->embargo;
} }
}; };
std:: std::priority_queue<ref<TransferItem>, std::vector<ref<TransferItem>>, EmbargoComparator> incoming;
priority_queue<std::shared_ptr<TransferItem>, std::vector<std::shared_ptr<TransferItem>>, EmbargoComparator> std::vector<ref<TransferItem>> unpause;
incoming;
private: private:
bool quitting = false; bool quitting = false;
public: public:
void quit() void quit()
{ {
quitting = true; quitting = true;
/* We wil not be processing any more incoming requests */ /* We will not be processing any more incoming requests */
while (!incoming.empty()) while (!incoming.empty())
incoming.pop(); incoming.pop();
unpause.clear();
} }
bool isQuitting() bool isQuitting()
@ -827,6 +851,17 @@ struct curlFileTransfer : public FileTransfer
item->active = true; item->active = true;
items[item->req] = item; items[item->req] = item;
} }
/* NOTE: Unpausing may invoke callbacks to flush all buffers. */
auto unpause = [&]() {
auto state(state_.lock());
auto res = state->unpause;
state->unpause.clear();
return res;
}();
for (auto & item : unpause)
item->unpause();
} }
debug("download thread shutting down"); debug("download thread shutting down");
@ -851,7 +886,7 @@ struct curlFileTransfer : public FileTransfer
} }
} }
void enqueueItem(std::shared_ptr<TransferItem> item) ItemHandle enqueueItem(ref<TransferItem> item)
{ {
if (item->request.data && item->request.uri.scheme() != "http" && item->request.uri.scheme() != "https" if (item->request.data && item->request.uri.scheme() != "http" && item->request.uri.scheme() != "https"
&& item->request.uri.scheme() != "s3") && item->request.uri.scheme() != "s3")
@ -866,19 +901,34 @@ struct curlFileTransfer : public FileTransfer
#ifndef _WIN32 // TODO need graceful async exit support on Windows? #ifndef _WIN32 // TODO need graceful async exit support on Windows?
writeFull(wakeupPipe.writeSide.get(), " "); writeFull(wakeupPipe.writeSide.get(), " ");
#endif #endif
return ItemHandle(static_cast<Item &>(*item));
} }
void enqueueFileTransfer(const FileTransferRequest & request, Callback<FileTransferResult> callback) override ItemHandle enqueueFileTransfer(const FileTransferRequest & request, Callback<FileTransferResult> callback) override
{ {
/* Handle s3:// URIs by converting to HTTPS and optionally adding auth */ /* Handle s3:// URIs by converting to HTTPS and optionally adding auth */
if (request.uri.scheme() == "s3") { if (request.uri.scheme() == "s3") {
auto modifiedRequest = request; auto modifiedRequest = request;
modifiedRequest.setupForS3(); modifiedRequest.setupForS3();
enqueueItem(std::make_shared<TransferItem>(*this, std::move(modifiedRequest), std::move(callback))); return enqueueItem(make_ref<TransferItem>(*this, std::move(modifiedRequest), std::move(callback)));
return;
} }
enqueueItem(std::make_shared<TransferItem>(*this, request, std::move(callback))); return enqueueItem(make_ref<TransferItem>(*this, request, std::move(callback)));
}
void unpauseTransfer(ref<TransferItem> item)
{
auto state(state_.lock());
state->unpause.push_back(std::move(item));
#ifndef _WIN32 // TODO need graceful async exit support on Windows?
writeFull(wakeupPipe.writeSide.get(), " ");
#endif
}
void unpauseTransfer(ItemHandle handle) override
{
unpauseTransfer(ref{static_cast<TransferItem &>(handle.item.get()).shared_from_this()});
} }
}; };
@ -975,6 +1025,7 @@ void FileTransfer::download(
struct State struct State
{ {
bool quit = false; bool quit = false;
bool paused = false;
std::exception_ptr exc; std::exception_ptr exc;
std::string data; std::string data;
std::condition_variable avail, request; std::condition_variable avail, request;
@ -990,31 +1041,38 @@ void FileTransfer::download(
state->request.notify_one(); state->request.notify_one();
}); });
request.dataCallback = [_state](std::string_view data) { request.dataCallback = [_state, uri = request.uri.to_string()](std::string_view data) -> PauseTransfer {
auto state(_state->lock()); auto state(_state->lock());
if (state->quit) if (state->quit)
return; return PauseTransfer::No;
/* If the buffer is full, then go to sleep until the calling
thread wakes us up (i.e. when it has removed data from the
buffer). We don't wait forever to prevent stalling the
download thread. (Hopefully sleeping will throttle the
sender.) */
if (state->data.size() > fileTransferSettings.downloadBufferSize) {
debug("download buffer is full; going to sleep");
static bool haveWarned = false;
warnOnce(haveWarned, "download buffer is full; consider increasing the 'download-buffer-size' setting");
state.wait_for(state->request, std::chrono::seconds(10));
}
/* Append data to the buffer and wake up the calling /* Append data to the buffer and wake up the calling
thread. */ thread. */
state->data.append(data); state->data.append(data);
state->avail.notify_one(); state->avail.notify_one();
if (state->data.size() <= fileTransferSettings.downloadBufferSize)
return PauseTransfer::No;
/* dataCallback gets called multiple times by an intermediate sink. Only
issue the debug message the first time around. */
if (!state->paused)
debug(
"pausing transfer for '%s': download buffer is full (%d > %d)",
uri,
state->data.size(),
fileTransferSettings.downloadBufferSize);
state->paused = true;
/* Technically the buffer might become larger than
downloadBufferSize, but with sinks there's no way to avoid
consuming data. */
return PauseTransfer::Yes;
}; };
enqueueFileTransfer( auto handle = enqueueFileTransfer(
request, {[_state, resultCallback{std::move(resultCallback)}](std::future<FileTransferResult> fut) { request, {[_state, resultCallback{std::move(resultCallback)}](std::future<FileTransferResult> fut) {
auto state(_state->lock()); auto state(_state->lock());
state->quit = true; state->quit = true;
@ -1047,6 +1105,10 @@ void FileTransfer::download(
return; return;
} }
if (state->paused) {
unpauseTransfer(handle);
state->paused = false;
}
state.wait(state->avail); state.wait(state->avail);
if (state->data.empty()) if (state->data.empty())

View file

@ -70,12 +70,12 @@ struct FileTransferSettings : Config
Setting<size_t> downloadBufferSize{ Setting<size_t> downloadBufferSize{
this, this,
64 * 1024 * 1024, 1 * 1024 * 1024,
"download-buffer-size", "download-buffer-size",
R"( R"(
The size of Nix's internal download buffer in bytes during `curl` transfers. If data is The size of Nix's internal download buffer in bytes during `curl` transfers. If data is
not processed quickly enough to exceed the size of this buffer, downloads may stall. not processed quickly enough to exceed the size of this buffer, downloads may stall.
The default is 67108864 (64 MiB). The default is 1048576 (1 MiB).
)"}; )"};
}; };
@ -105,6 +105,11 @@ struct UsernameAuth
std::optional<std::string> password; std::optional<std::string> password;
}; };
enum class PauseTransfer : bool {
No = false,
Yes = true,
};
struct FileTransferRequest struct FileTransferRequest
{ {
VerbatimURL uri; VerbatimURL uri;
@ -136,7 +141,14 @@ struct FileTransferRequest
std::optional<UploadData> data; std::optional<UploadData> data;
std::string mimeType; std::string mimeType;
std::function<void(std::string_view data)> dataCallback;
/**
* Callback invoked with a chunk of received data.
* Can pause the transfer by returning PauseTransfer::Yes. No data must be consumed
* if transfer is paused.
*/
std::function<PauseTransfer(std::string_view data)> dataCallback;
/** /**
* Optional username and password for HTTP basic authentication. * Optional username and password for HTTP basic authentication.
* When provided, these credentials will be used with curl's CURLOPT_USERNAME/PASSWORD option. * When provided, these credentials will be used with curl's CURLOPT_USERNAME/PASSWORD option.
@ -226,6 +238,25 @@ class Store;
struct FileTransfer struct FileTransfer
{ {
protected:
class Item
{};
public:
/**
* An opaque handle to the file transfer. Can be used to reference an in-flight transfer operation.
*/
struct ItemHandle
{
std::reference_wrapper<Item> item;
friend struct FileTransfer;
ItemHandle(Item & item)
: item(item)
{
}
};
virtual ~FileTransfer() {} virtual ~FileTransfer() {}
/** /**
@ -233,7 +264,13 @@ struct FileTransfer
* the download. The future may throw a FileTransferError * the download. The future may throw a FileTransferError
* exception. * exception.
*/ */
virtual void enqueueFileTransfer(const FileTransferRequest & request, Callback<FileTransferResult> callback) = 0; virtual ItemHandle
enqueueFileTransfer(const FileTransferRequest & request, Callback<FileTransferResult> callback) = 0;
/**
* Unpause a transfer that has been previously paused by a dataCallback.
*/
virtual void unpauseTransfer(ItemHandle handle) = 0;
std::future<FileTransferResult> enqueueFileTransfer(const FileTransferRequest & request); std::future<FileTransferResult> enqueueFileTransfer(const FileTransferRequest & request);

View file

@ -114,6 +114,15 @@ boost = dependency(
deps_other += boost deps_other += boost
curl = dependency('libcurl', 'curl', version : '>= 7.75.0') curl = dependency('libcurl', 'curl', version : '>= 7.75.0')
if curl.version().version_compare('>=8.16.0 <8.17.0')
# As a precaution, avoid building with libcurl versions that suffer from https://github.com/curl/curl/issues/19334.
error(
'curl @0@ has issues with write pausing, please use libcurl < 8.16 or >= 8.17, see https://github.com/curl/curl/issues/19334'.format(
curl.version(),
),
)
endif
deps_private += curl deps_private += curl
# seccomp only makes sense on Linux # seccomp only makes sense on Linux