diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index 73fe1bdd9..57caec384 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -60,6 +60,7 @@ struct curlFileTransfer : public FileTransfer // buffer to accompany the `req` above char errbuf[CURL_ERROR_SIZE]; bool active = false; // whether the handle has been added to the multi object + bool paused = false; // whether the request has been paused previously std::string statusMsg; unsigned int attempt = 0; @@ -116,7 +117,13 @@ struct curlFileTransfer : public FileTransfer successful response. */ if (successfulStatuses.count(httpStatus)) { writtenToSink += data.size(); - this->request.dataCallback(data); + PauseTransfer needsPause = this->request.dataCallback(data); + if (needsPause == PauseTransfer::Yes) { + /* Smuggle the boolean flag into writeCallback. Note that + the finalSink might get called multiple times if there's + decompression going on. */ + paused = true; + } } } else this->result.data.append(data); @@ -195,6 +202,14 @@ struct curlFileTransfer : public FileTransfer } (*decompressionSink)({(char *) contents, realSize}); + if (paused) { + /* The callback has signaled that the transfer needs to be + paused. Already consumed data won't be returned twice unlike + when returning CURL_WRITEFUNC_PAUSE. + https://curl-library.cool.haxx.narkive.com/larE1cRA/curl-easy-pause-documentation-question + */ + curl_easy_pause(req, CURLPAUSE_RECV); + } return realSize; } catch (...) { @@ -364,6 +379,15 @@ struct curlFileTransfer : public FileTransfer return ((TransferItem *) clientp)->seekCallback(offset, origin); } + void unpause() + { + /* Unpausing an already unpaused transfer is a no-op. 
*/ + if (paused) { + curl_easy_pause(req, CURLPAUSE_CONT); + paused = false; + } + } + void init() { if (!req) @@ -648,15 +672,17 @@ struct curlFileTransfer : public FileTransfer }; std::priority_queue, std::vector>, EmbargoComparator> incoming; + std::vector> unpause; private: bool quitting = false; public: void quit() { quitting = true; - /* We wil not be processing any more incoming requests */ + /* We will not be processing any more incoming requests */ while (!incoming.empty()) incoming.pop(); + unpause.clear(); } bool isQuitting() @@ -825,6 +851,17 @@ struct curlFileTransfer : public FileTransfer item->active = true; items[item->req] = item; } + + /* NOTE: Unpausing may invoke callbacks to flush all buffers. */ + auto unpause = [&]() { + auto state(state_.lock()); + auto res = state->unpause; + state->unpause.clear(); + return res; + }(); + + for (auto & item : unpause) + item->unpause(); } debug("download thread shutting down"); @@ -879,6 +916,20 @@ struct curlFileTransfer : public FileTransfer return enqueueItem(make_ref(*this, request, std::move(callback))); } + + void unpauseTransfer(ref item) + { + auto state(state_.lock()); + state->unpause.push_back(std::move(item)); +#ifndef _WIN32 // TODO need graceful async exit support on Windows? 
+ writeFull(wakeupPipe.writeSide.get(), " "); +#endif + } + + void unpauseTransfer(ItemHandle handle) override + { + unpauseTransfer(ref{static_cast(handle.item.get()).shared_from_this()}); + } }; ref makeCurlFileTransfer() @@ -974,6 +1025,7 @@ void FileTransfer::download( struct State { bool quit = false; + bool paused = false; std::exception_ptr exc; std::string data; std::condition_variable avail, request; @@ -989,31 +1041,38 @@ void FileTransfer::download( state->request.notify_one(); }); - request.dataCallback = [_state](std::string_view data) { + request.dataCallback = [_state, uri = request.uri.to_string()](std::string_view data) -> PauseTransfer { auto state(_state->lock()); if (state->quit) - return; - - /* If the buffer is full, then go to sleep until the calling - thread wakes us up (i.e. when it has removed data from the - buffer). We don't wait forever to prevent stalling the - download thread. (Hopefully sleeping will throttle the - sender.) */ - if (state->data.size() > fileTransferSettings.downloadBufferSize) { - debug("download buffer is full; going to sleep"); - static bool haveWarned = false; - warnOnce(haveWarned, "download buffer is full; consider increasing the 'download-buffer-size' setting"); - state.wait_for(state->request, std::chrono::seconds(10)); - } + return PauseTransfer::No; /* Append data to the buffer and wake up the calling thread. */ state->data.append(data); state->avail.notify_one(); + + if (state->data.size() <= fileTransferSettings.downloadBufferSize) + return PauseTransfer::No; + + /* dataCallback gets called multiple times by an intermediate sink. Only + issue the debug message the first time around. 
*/ + if (!state->paused) + debug( + "pausing transfer for '%s': download buffer is full (%d > %d)", + uri, + state->data.size(), + fileTransferSettings.downloadBufferSize); + + state->paused = true; + + /* Technically the buffer might become larger than + downloadBufferSize, but with sinks there's no way to avoid + consuming data. */ + return PauseTransfer::Yes; }; - enqueueFileTransfer( + auto handle = enqueueFileTransfer( request, {[_state, resultCallback{std::move(resultCallback)}](std::future fut) { auto state(_state->lock()); state->quit = true; @@ -1046,6 +1105,10 @@ void FileTransfer::download( return; } + if (state->paused) { + unpauseTransfer(handle); + state->paused = false; + } state.wait(state->avail); if (state->data.empty()) diff --git a/src/libstore/include/nix/store/filetransfer.hh b/src/libstore/include/nix/store/filetransfer.hh index 257c90509..3862093db 100644 --- a/src/libstore/include/nix/store/filetransfer.hh +++ b/src/libstore/include/nix/store/filetransfer.hh @@ -105,6 +105,11 @@ struct UsernameAuth std::optional password; }; +enum class PauseTransfer : bool { + No = false, + Yes = true, +}; + struct FileTransferRequest { VerbatimURL uri; @@ -136,7 +141,14 @@ struct FileTransferRequest std::optional data; std::string mimeType; - std::function dataCallback; + + /** + * Callback invoked with a chunk of received data. + * Can pause the transfer by returning PauseTransfer::Yes. No data must be consumed + * if transfer is paused. + */ + std::function dataCallback; + /** * Optional username and password for HTTP basic authentication. * When provided, these credentials will be used with curl's CURLOPT_USERNAME/PASSWORD option. @@ -234,12 +246,11 @@ public: /** * An opaque handle to the file transfer. Can be used to reference an in-flight transfer operations. 
*/ - class ItemHandle + struct ItemHandle { std::reference_wrapper item; friend struct FileTransfer; - public: ItemHandle(Item & item) : item(item) { @@ -256,6 +267,11 @@ public: virtual ItemHandle enqueueFileTransfer(const FileTransferRequest & request, Callback callback) = 0; + /** + * Unpause a transfer that has been previously paused by a dataCallback. + */ + virtual void unpauseTransfer(ItemHandle handle) = 0; + std::future enqueueFileTransfer(const FileTransferRequest & request); /** diff --git a/src/libstore/meson.build b/src/libstore/meson.build index e3425deb5..2d7217513 100644 --- a/src/libstore/meson.build +++ b/src/libstore/meson.build @@ -114,6 +114,15 @@ boost = dependency( deps_other += boost curl = dependency('libcurl', 'curl', version : '>= 7.75.0') +if curl.version().version_compare('>=8.16.0 <8.17.0') + # Out of precaution, avoid building with libcurl versions that suffer from https://github.com/curl/curl/issues/19334. + error( + 'curl @0@ has issues with write pausing, please use libcurl < 8.16 or >= 8.17, see https://github.com/curl/curl/issues/19334'.format( + curl.version(), + ), + ) +endif + deps_private += curl # seccomp only makes sense on Linux