mirror of
https://github.com/NixOS/nix.git
synced 2025-12-16 22:11:05 +01:00
libstore/filetransfer: Replace curl_multi_wait with curl_multi_poll and get rid of CPP
Since 7.68 libcurl already provides curl_multi_wakeup, so we can drop the hacky pipe setup (libcurl does this internally).
This commit is contained in:
parent
ea96e6d07c
commit
46670a7f46
1 changed files with 32 additions and 58 deletions
|
|
@ -46,6 +46,17 @@ namespace {
|
|||
using curlSList = std::unique_ptr<::curl_slist, decltype([](::curl_slist * list) { ::curl_slist_free_all(list); })>;
|
||||
using curlMulti = std::unique_ptr<::CURLM, decltype([](::CURLM * multi) { ::curl_multi_cleanup(multi); })>;
|
||||
|
||||
struct curlMultiError : Error
|
||||
{
|
||||
::CURLMcode code;
|
||||
|
||||
curlMultiError(::CURLMcode code)
|
||||
: Error{"unexpected curl multi error: %s", ::curl_multi_strerror(code)}
|
||||
{
|
||||
assert(code != CURLM_OK);
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace
|
||||
|
||||
struct curlFileTransfer : public FileTransfer
|
||||
|
|
@ -362,7 +373,7 @@ struct curlFileTransfer : public FileTransfer
|
|||
return ((TransferItem *) userp)->readCallback(buffer, size, nitems);
|
||||
}
|
||||
|
||||
#if !defined(_WIN32) && LIBCURL_VERSION_NUM >= 0x071000
|
||||
#if !defined(_WIN32)
|
||||
static int cloexec_callback(void *, curl_socket_t curlfd, curlsocktype purpose)
|
||||
{
|
||||
unix::closeOnExec(curlfd);
|
||||
|
|
@ -425,15 +436,11 @@ struct curlFileTransfer : public FileTransfer
|
|||
("curl/" LIBCURL_VERSION " Nix/" + nixVersion
|
||||
+ (fileTransferSettings.userAgentSuffix != "" ? " " + fileTransferSettings.userAgentSuffix.get() : ""))
|
||||
.c_str());
|
||||
#if LIBCURL_VERSION_NUM >= 0x072b00
|
||||
curl_easy_setopt(req, CURLOPT_PIPEWAIT, 1);
|
||||
#endif
|
||||
#if LIBCURL_VERSION_NUM >= 0x072f00
|
||||
if (fileTransferSettings.enableHttp2)
|
||||
curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2TLS);
|
||||
else
|
||||
curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
|
||||
#endif
|
||||
curl_easy_setopt(req, CURLOPT_WRITEFUNCTION, TransferItem::writeCallbackWrapper);
|
||||
curl_easy_setopt(req, CURLOPT_WRITEDATA, this);
|
||||
curl_easy_setopt(req, CURLOPT_HEADERFUNCTION, TransferItem::headerCallbackWrapper);
|
||||
|
|
@ -473,10 +480,9 @@ struct curlFileTransfer : public FileTransfer
|
|||
if (settings.caFile != "")
|
||||
curl_easy_setopt(req, CURLOPT_CAINFO, settings.caFile.get().c_str());
|
||||
|
||||
#if !defined(_WIN32) && LIBCURL_VERSION_NUM >= 0x071000
|
||||
#if !defined(_WIN32)
|
||||
curl_easy_setopt(req, CURLOPT_SOCKOPTFUNCTION, cloexec_callback);
|
||||
#endif
|
||||
|
||||
curl_easy_setopt(req, CURLOPT_CONNECTTIMEOUT, fileTransferSettings.connectTimeout.get());
|
||||
|
||||
curl_easy_setopt(req, CURLOPT_LOW_SPEED_LIMIT, 1L);
|
||||
|
|
@ -708,13 +714,6 @@ struct curlFileTransfer : public FileTransfer
|
|||
|
||||
Sync<State> state_;
|
||||
|
||||
#ifndef _WIN32 // TODO need graceful async exit support on Windows?
|
||||
/* We can't use a std::condition_variable to wake up the curl
|
||||
thread, because it only monitors file descriptors. So use a
|
||||
pipe instead. */
|
||||
Pipe wakeupPipe;
|
||||
#endif
|
||||
|
||||
std::thread workerThread;
|
||||
|
||||
curlFileTransfer()
|
||||
|
|
@ -725,38 +724,33 @@ struct curlFileTransfer : public FileTransfer
|
|||
|
||||
curlm = curlMulti(curl_multi_init());
|
||||
|
||||
#if LIBCURL_VERSION_NUM >= 0x072b00 // Multiplex requires >= 7.43.0
|
||||
curl_multi_setopt(curlm.get(), CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX);
|
||||
#endif
|
||||
#if LIBCURL_VERSION_NUM >= 0x071e00 // Max connections requires >= 7.30.0
|
||||
curl_multi_setopt(curlm.get(), CURLMOPT_MAX_TOTAL_CONNECTIONS, fileTransferSettings.httpConnections.get());
|
||||
#endif
|
||||
|
||||
#ifndef _WIN32 // TODO need graceful async exit support on Windows?
|
||||
wakeupPipe.create();
|
||||
fcntl(wakeupPipe.readSide.get(), F_SETFL, O_NONBLOCK);
|
||||
#endif
|
||||
|
||||
workerThread = std::thread([&]() { workerThreadEntry(); });
|
||||
}
|
||||
|
||||
~curlFileTransfer()
|
||||
{
|
||||
stopWorkerThread();
|
||||
|
||||
try {
|
||||
stopWorkerThread();
|
||||
} catch (...) {
|
||||
ignoreExceptionInDestructor();
|
||||
}
|
||||
workerThread.join();
|
||||
}
|
||||
|
||||
void stopWorkerThread()
|
||||
{
|
||||
/* Signal the worker thread to exit. */
|
||||
{
|
||||
auto state(state_.lock());
|
||||
state->quit();
|
||||
}
|
||||
#ifndef _WIN32 // TODO need graceful async exit support on Windows?
|
||||
writeFull(wakeupPipe.writeSide.get(), " ", false);
|
||||
#endif
|
||||
state_.lock()->quit();
|
||||
wakeupMulti();
|
||||
}
|
||||
|
||||
void wakeupMulti()
|
||||
{
|
||||
if (auto ec = ::curl_multi_wakeup(curlm.get()))
|
||||
throw curlMultiError(ec);
|
||||
}
|
||||
|
||||
void workerThreadMain()
|
||||
|
|
@ -805,13 +799,6 @@ struct curlFileTransfer : public FileTransfer
|
|||
}
|
||||
|
||||
/* Wait for activity, including wakeup events. */
|
||||
int numfds = 0;
|
||||
struct curl_waitfd extraFDs[1];
|
||||
#ifndef _WIN32 // TODO need graceful async exit support on Windows?
|
||||
extraFDs[0].fd = wakeupPipe.readSide.get();
|
||||
extraFDs[0].events = CURL_WAIT_POLLIN;
|
||||
extraFDs[0].revents = 0;
|
||||
#endif
|
||||
long maxSleepTimeMs = items.empty() ? 10000 : 100;
|
||||
auto sleepTimeMs = nextWakeup != std::chrono::steady_clock::time_point()
|
||||
? std::max(
|
||||
|
|
@ -820,23 +807,14 @@ struct curlFileTransfer : public FileTransfer
|
|||
nextWakeup - std::chrono::steady_clock::now())
|
||||
.count())
|
||||
: maxSleepTimeMs;
|
||||
vomit("download thread waiting for %d ms", sleepTimeMs);
|
||||
mc = curl_multi_wait(curlm.get(), extraFDs, 1, sleepTimeMs, &numfds);
|
||||
|
||||
int numfds = 0;
|
||||
mc = curl_multi_poll(curlm.get(), nullptr, 0, sleepTimeMs, &numfds);
|
||||
if (mc != CURLM_OK)
|
||||
throw nix::Error("unexpected error from curl_multi_wait(): %s", curl_multi_strerror(mc));
|
||||
throw curlMultiError(mc);
|
||||
|
||||
nextWakeup = std::chrono::steady_clock::time_point();
|
||||
|
||||
/* Add new curl requests from the incoming requests queue,
|
||||
except for requests that are embargoed (waiting for a
|
||||
retry timeout to expire). */
|
||||
if (extraFDs[0].revents & CURL_WAIT_POLLIN) {
|
||||
char buf[1024];
|
||||
auto res = read(extraFDs[0].fd, buf, sizeof(buf));
|
||||
if (res == -1 && errno != EINTR)
|
||||
throw SysError("reading curl wakeup socket");
|
||||
}
|
||||
|
||||
std::vector<std::shared_ptr<TransferItem>> incoming;
|
||||
auto now = std::chrono::steady_clock::now();
|
||||
|
||||
|
|
@ -910,10 +888,8 @@ struct curlFileTransfer : public FileTransfer
|
|||
throw nix::Error("cannot enqueue download request because the download thread is shutting down");
|
||||
state->incoming.push(item);
|
||||
}
|
||||
#ifndef _WIN32 // TODO need graceful async exit support on Windows?
|
||||
writeFull(wakeupPipe.writeSide.get(), " ");
|
||||
#endif
|
||||
|
||||
wakeupMulti();
|
||||
return ItemHandle(static_cast<Item &>(*item));
|
||||
}
|
||||
|
||||
|
|
@ -933,9 +909,7 @@ struct curlFileTransfer : public FileTransfer
|
|||
{
|
||||
auto state(state_.lock());
|
||||
state->unpause.push_back(std::move(item));
|
||||
#ifndef _WIN32 // TODO need graceful async exit support on Windows?
|
||||
writeFull(wakeupPipe.writeSide.get(), " ");
|
||||
#endif
|
||||
wakeupMulti();
|
||||
}
|
||||
|
||||
void unpauseTransfer(ItemHandle handle) override
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue