mirror of
https://github.com/NixOS/nix.git
synced 2025-11-24 11:19:35 +01:00
libstore/filetransfer: Pause transfers instead of stalling the download thread
Instead of naively stalling the download thread, we can stop the transfer. This allows the other multiplexed connections to continue downloading (and unpacking) if the result of the download gets piped into a GitFileSystemObjectSink. Prior art in the lix project: 4ae6fb5a8f and 12156d3beb. This patch is very different from the lix one, since we are using a decompression sink in the middle of the pipeline, but the Co-authored-by is there since I was motivated to implement this by looking at the lix side of things. Co-authored-by: eldritch horrors <pennae@lix.systems>
This commit is contained in:
parent
ec0b270c6c
commit
4307420c44
3 changed files with 108 additions and 20 deletions
|
|
@ -60,6 +60,7 @@ struct curlFileTransfer : public FileTransfer
|
|||
// buffer to accompany the `req` above
|
||||
char errbuf[CURL_ERROR_SIZE];
|
||||
bool active = false; // whether the handle has been added to the multi object
|
||||
bool paused = false; // whether the request has been paused previously
|
||||
std::string statusMsg;
|
||||
|
||||
unsigned int attempt = 0;
|
||||
|
|
@ -116,7 +117,13 @@ struct curlFileTransfer : public FileTransfer
|
|||
successful response. */
|
||||
if (successfulStatuses.count(httpStatus)) {
|
||||
writtenToSink += data.size();
|
||||
this->request.dataCallback(data);
|
||||
PauseTransfer needsPause = this->request.dataCallback(data);
|
||||
if (needsPause == PauseTransfer::Yes) {
|
||||
/* Smuggle the boolean flag into writeCallback. Note that
|
||||
the finalSink might get called multiple times if there's
|
||||
decompression going on. */
|
||||
paused = true;
|
||||
}
|
||||
}
|
||||
} else
|
||||
this->result.data.append(data);
|
||||
|
|
@ -195,6 +202,14 @@ struct curlFileTransfer : public FileTransfer
|
|||
}
|
||||
|
||||
(*decompressionSink)({(char *) contents, realSize});
|
||||
if (paused) {
|
||||
/* The callback has signaled that the transfer needs to be
|
||||
paused. Already consumed data won't be returned twice unlike
|
||||
when returning CURL_WRITEFUNC_PAUSE.
|
||||
https://curl-library.cool.haxx.narkive.com/larE1cRA/curl-easy-pause-documentation-question
|
||||
*/
|
||||
curl_easy_pause(req, CURLPAUSE_RECV);
|
||||
}
|
||||
|
||||
return realSize;
|
||||
} catch (...) {
|
||||
|
|
@ -364,6 +379,15 @@ struct curlFileTransfer : public FileTransfer
|
|||
return ((TransferItem *) clientp)->seekCallback(offset, origin);
|
||||
}
|
||||
|
||||
/* Resume this transfer if it is currently flagged as paused: calls
   curl_easy_pause() on `req` with CURLPAUSE_CONT and clears `paused`.
   The return code of curl_easy_pause() is not inspected here. */
void unpause()
|
||||
{
|
||||
/* Unpausing an already unpaused transfer is a no-op. */
|
||||
if (paused) {
|
||||
curl_easy_pause(req, CURLPAUSE_CONT);
|
||||
paused = false;
|
||||
}
|
||||
}
|
||||
|
||||
void init()
|
||||
{
|
||||
if (!req)
|
||||
|
|
@ -648,15 +672,17 @@ struct curlFileTransfer : public FileTransfer
|
|||
};
|
||||
|
||||
std::priority_queue<ref<TransferItem>, std::vector<ref<TransferItem>>, EmbargoComparator> incoming;
|
||||
std::vector<ref<TransferItem>> unpause;
|
||||
private:
|
||||
bool quitting = false;
|
||||
public:
|
||||
/* Set the `quitting` flag, pop every entry off the `incoming` queue and
   clear the list of pending `unpause` requests. */
void quit()
|
||||
{
|
||||
quitting = true;
|
||||
/* We wil not be processing any more incoming requests */
|
||||
/* We will not be processing any more incoming requests */
|
||||
while (!incoming.empty())
|
||||
incoming.pop();
|
||||
unpause.clear();
|
||||
}
|
||||
|
||||
bool isQuitting()
|
||||
|
|
@ -825,6 +851,17 @@ struct curlFileTransfer : public FileTransfer
|
|||
item->active = true;
|
||||
items[item->req] = item;
|
||||
}
|
||||
|
||||
/* NOTE: Unpausing may invoke callbacks to flush all buffers. */
|
||||
auto unpause = [&]() {
|
||||
auto state(state_.lock());
|
||||
auto res = state->unpause;
|
||||
state->unpause.clear();
|
||||
return res;
|
||||
}();
|
||||
|
||||
for (auto & item : unpause)
|
||||
item->unpause();
|
||||
}
|
||||
|
||||
debug("download thread shutting down");
|
||||
|
|
@ -879,6 +916,20 @@ struct curlFileTransfer : public FileTransfer
|
|||
|
||||
return enqueueItem(make_ref<TransferItem>(*this, request, std::move(callback)));
|
||||
}
|
||||
|
||||
/* Request that `item` be unpaused: appends it to `state->unpause` and,
   on non-Windows platforms, writes a single " " to the write side of
   `wakeupPipe`.
   NOTE(review): `state` (obtained from `state_.lock()`) is still in scope
   during the pipe write, so presumably the lock is held across it —
   confirm this is intended. */
void unpauseTransfer(ref<TransferItem> item)
|
||||
{
|
||||
auto state(state_.lock());
|
||||
state->unpause.push_back(std::move(item));
|
||||
#ifndef _WIN32 // TODO need graceful async exit support on Windows?
|
||||
writeFull(wakeupPipe.writeSide.get(), " ");
|
||||
#endif
|
||||
}
|
||||
|
||||
/* Handle-based overload: recovers the TransferItem behind the handle via
   shared_from_this() and forwards to the overload taking a
   ref<TransferItem>.
   NOTE(review): the downcast is an unchecked static_cast, so this assumes
   `handle.item` really refers to a TransferItem — confirm against callers. */
void unpauseTransfer(ItemHandle handle) override
|
||||
{
|
||||
unpauseTransfer(ref{static_cast<TransferItem &>(handle.item.get()).shared_from_this()});
|
||||
}
|
||||
};
|
||||
|
||||
ref<curlFileTransfer> makeCurlFileTransfer()
|
||||
|
|
@ -974,6 +1025,7 @@ void FileTransfer::download(
|
|||
struct State
|
||||
{
|
||||
bool quit = false;
|
||||
bool paused = false;
|
||||
std::exception_ptr exc;
|
||||
std::string data;
|
||||
std::condition_variable avail, request;
|
||||
|
|
@ -989,31 +1041,38 @@ void FileTransfer::download(
|
|||
state->request.notify_one();
|
||||
});
|
||||
|
||||
request.dataCallback = [_state](std::string_view data) {
|
||||
request.dataCallback = [_state, uri = request.uri.to_string()](std::string_view data) -> PauseTransfer {
|
||||
auto state(_state->lock());
|
||||
|
||||
if (state->quit)
|
||||
return;
|
||||
|
||||
/* If the buffer is full, then go to sleep until the calling
|
||||
thread wakes us up (i.e. when it has removed data from the
|
||||
buffer). We don't wait forever to prevent stalling the
|
||||
download thread. (Hopefully sleeping will throttle the
|
||||
sender.) */
|
||||
if (state->data.size() > fileTransferSettings.downloadBufferSize) {
|
||||
debug("download buffer is full; going to sleep");
|
||||
static bool haveWarned = false;
|
||||
warnOnce(haveWarned, "download buffer is full; consider increasing the 'download-buffer-size' setting");
|
||||
state.wait_for(state->request, std::chrono::seconds(10));
|
||||
}
|
||||
return PauseTransfer::No;
|
||||
|
||||
/* Append data to the buffer and wake up the calling
|
||||
thread. */
|
||||
state->data.append(data);
|
||||
state->avail.notify_one();
|
||||
|
||||
if (state->data.size() <= fileTransferSettings.downloadBufferSize)
|
||||
return PauseTransfer::No;
|
||||
|
||||
/* dataCallback gets called multiple times by an intermediate sink. Only
|
||||
issue the debug message the first time around. */
|
||||
if (!state->paused)
|
||||
debug(
|
||||
"pausing transfer for '%s': download buffer is full (%d > %d)",
|
||||
uri,
|
||||
state->data.size(),
|
||||
fileTransferSettings.downloadBufferSize);
|
||||
|
||||
state->paused = true;
|
||||
|
||||
/* Technically the buffer might become larger than
|
||||
downloadBufferSize, but with sinks there's no way to avoid
|
||||
consuming data. */
|
||||
return PauseTransfer::Yes;
|
||||
};
|
||||
|
||||
enqueueFileTransfer(
|
||||
auto handle = enqueueFileTransfer(
|
||||
request, {[_state, resultCallback{std::move(resultCallback)}](std::future<FileTransferResult> fut) {
|
||||
auto state(_state->lock());
|
||||
state->quit = true;
|
||||
|
|
@ -1046,6 +1105,10 @@ void FileTransfer::download(
|
|||
return;
|
||||
}
|
||||
|
||||
if (state->paused) {
|
||||
unpauseTransfer(handle);
|
||||
state->paused = false;
|
||||
}
|
||||
state.wait(state->avail);
|
||||
|
||||
if (state->data.empty())
|
||||
|
|
|
|||
|
|
@ -105,6 +105,11 @@ struct UsernameAuth
|
|||
std::optional<std::string> password;
|
||||
};
|
||||
|
||||
/**
 * Value returned by FileTransferRequest::dataCallback: `Yes` asks for the
 * transfer to be paused, `No` lets it continue.
 */
enum class PauseTransfer : bool {
|
||||
No = false,
|
||||
Yes = true,
|
||||
};
|
||||
|
||||
struct FileTransferRequest
|
||||
{
|
||||
VerbatimURL uri;
|
||||
|
|
@ -136,7 +141,14 @@ struct FileTransferRequest
|
|||
|
||||
std::optional<UploadData> data;
|
||||
std::string mimeType;
|
||||
std::function<void(std::string_view data)> dataCallback;
|
||||
|
||||
/**
|
||||
* Callback invoked with a chunk of received data.
|
||||
* Can pause the transfer by returning PauseTransfer::Yes. No data must be consumed
|
||||
* if transfer is paused.
|
||||
*/
|
||||
std::function<PauseTransfer(std::string_view data)> dataCallback;
|
||||
|
||||
/**
|
||||
* Optional username and password for HTTP basic authentication.
|
||||
* When provided, these credentials will be used with curl's CURLOPT_USERNAME/PASSWORD option.
|
||||
|
|
@ -234,12 +246,11 @@ public:
|
|||
/**
|
||||
* An opaque handle to the file transfer. Can be used to reference an in-flight transfer operation.
|
||||
*/
|
||||
class ItemHandle
|
||||
struct ItemHandle
|
||||
{
|
||||
std::reference_wrapper<Item> item;
|
||||
friend struct FileTransfer;
|
||||
|
||||
public:
|
||||
ItemHandle(Item & item)
|
||||
: item(item)
|
||||
{
|
||||
|
|
@ -256,6 +267,11 @@ public:
|
|||
virtual ItemHandle
|
||||
enqueueFileTransfer(const FileTransferRequest & request, Callback<FileTransferResult> callback) = 0;
|
||||
|
||||
/**
|
||||
* Unpause a transfer that has been previously paused by a dataCallback.
|
||||
*/
|
||||
virtual void unpauseTransfer(ItemHandle handle) = 0;
|
||||
|
||||
std::future<FileTransferResult> enqueueFileTransfer(const FileTransferRequest & request);
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -114,6 +114,15 @@ boost = dependency(
|
|||
deps_other += boost
|
||||
|
||||
curl = dependency('libcurl', 'curl', version : '>= 7.75.0')
|
||||
if curl.version().version_compare('>=8.16.0 <8.17.0')
|
||||
# Out of precaution, avoid building with libcurl versions that suffer from https://github.com/curl/curl/issues/19334.
|
||||
error(
|
||||
'curl @0@ has issues with write pausing, please use libcurl < 8.16 or >= 8.17, see https://github.com/curl/curl/issues/19334'.format(
|
||||
curl.version(),
|
||||
),
|
||||
)
|
||||
endif
|
||||
|
||||
deps_private += curl
|
||||
|
||||
# seccomp only makes sense on Linux
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue