1
1
Fork 0
mirror of https://github.com/NixOS/nix.git synced 2025-12-16 22:11:05 +01:00

Merge pull request #14781 from NixOS/curl-cleanup

libstore: Clean up cruft from filetransfer
This commit is contained in:
John Ericson 2025-12-13 03:40:01 +00:00 committed by GitHub
commit 3b3bd018a5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -41,9 +41,27 @@ FileTransferSettings fileTransferSettings;
static GlobalConfig::Register rFileTransferSettings(&fileTransferSettings);
namespace {
using curlSList = std::unique_ptr<::curl_slist, decltype([](::curl_slist * list) { ::curl_slist_free_all(list); })>;
using curlMulti = std::unique_ptr<::CURLM, decltype([](::CURLM * multi) { ::curl_multi_cleanup(multi); })>;
struct curlMultiError : Error
{
::CURLMcode code;
curlMultiError(::CURLMcode code)
: Error{"unexpected curl multi error: %s", ::curl_multi_strerror(code)}
{
assert(code != CURLM_OK);
}
};
} // namespace
struct curlFileTransfer : public FileTransfer
{
CURLM * curlm = 0;
curlMulti curlm;
std::random_device rd;
std::mt19937 mt19937;
@ -69,7 +87,7 @@ struct curlFileTransfer : public FileTransfer
has been reached. */
std::chrono::steady_clock::time_point embargo;
struct curl_slist * requestHeaders = 0;
curlSList requestHeaders;
std::string encoding;
@ -92,6 +110,15 @@ struct curlFileTransfer : public FileTransfer
return httpStatus;
}
void appendHeaders(const std::string & header)
{
curlSList tmpSList = curlSList(::curl_slist_append(requestHeaders.get(), requireCString(header)));
if (!tmpSList)
throw std::bad_alloc();
requestHeaders.release();
requestHeaders = std::move(tmpSList);
}
TransferItem(
curlFileTransfer & fileTransfer,
const FileTransferRequest & request,
@ -131,13 +158,13 @@ struct curlFileTransfer : public FileTransfer
{
result.urls.push_back(request.uri.to_string());
requestHeaders = curl_slist_append(requestHeaders, "Accept-Encoding: zstd, br, gzip, deflate, bzip2, xz");
appendHeaders("Accept-Encoding: zstd, br, gzip, deflate, bzip2, xz");
if (!request.expectedETag.empty())
requestHeaders = curl_slist_append(requestHeaders, ("If-None-Match: " + request.expectedETag).c_str());
appendHeaders("If-None-Match: " + request.expectedETag);
if (!request.mimeType.empty())
requestHeaders = curl_slist_append(requestHeaders, ("Content-Type: " + request.mimeType).c_str());
appendHeaders("Content-Type: " + request.mimeType);
for (auto it = request.headers.begin(); it != request.headers.end(); ++it) {
requestHeaders = curl_slist_append(requestHeaders, fmt("%s: %s", it->first, it->second).c_str());
appendHeaders(fmt("%s: %s", it->first, it->second));
}
}
@ -145,11 +172,9 @@ struct curlFileTransfer : public FileTransfer
{
if (req) {
if (active)
curl_multi_remove_handle(fileTransfer.curlm, req);
curl_multi_remove_handle(fileTransfer.curlm.get(), req);
curl_easy_cleanup(req);
}
if (requestHeaders)
curl_slist_free_all(requestHeaders);
try {
if (!done)
fail(FileTransferError(
@ -348,7 +373,7 @@ struct curlFileTransfer : public FileTransfer
return ((TransferItem *) userp)->readCallback(buffer, size, nitems);
}
#if !defined(_WIN32) && LIBCURL_VERSION_NUM >= 0x071000
#if !defined(_WIN32)
static int cloexec_callback(void *, curl_socket_t curlfd, curlsocktype purpose)
{
unix::closeOnExec(curlfd);
@ -411,15 +436,11 @@ struct curlFileTransfer : public FileTransfer
("curl/" LIBCURL_VERSION " Nix/" + nixVersion
+ (fileTransferSettings.userAgentSuffix != "" ? " " + fileTransferSettings.userAgentSuffix.get() : ""))
.c_str());
#if LIBCURL_VERSION_NUM >= 0x072b00
curl_easy_setopt(req, CURLOPT_PIPEWAIT, 1);
#endif
#if LIBCURL_VERSION_NUM >= 0x072f00
if (fileTransferSettings.enableHttp2)
curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2TLS);
else
curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
#endif
curl_easy_setopt(req, CURLOPT_WRITEFUNCTION, TransferItem::writeCallbackWrapper);
curl_easy_setopt(req, CURLOPT_WRITEDATA, this);
curl_easy_setopt(req, CURLOPT_HEADERFUNCTION, TransferItem::headerCallbackWrapper);
@ -429,7 +450,7 @@ struct curlFileTransfer : public FileTransfer
curl_easy_setopt(req, CURLOPT_XFERINFODATA, this);
curl_easy_setopt(req, CURLOPT_NOPROGRESS, 0);
curl_easy_setopt(req, CURLOPT_HTTPHEADER, requestHeaders);
curl_easy_setopt(req, CURLOPT_HTTPHEADER, requestHeaders.get());
if (settings.downloadSpeed.get() > 0)
curl_easy_setopt(req, CURLOPT_MAX_RECV_SPEED_LARGE, (curl_off_t) (settings.downloadSpeed.get() * 1024));
@ -459,10 +480,9 @@ struct curlFileTransfer : public FileTransfer
if (settings.caFile != "")
curl_easy_setopt(req, CURLOPT_CAINFO, settings.caFile.get().c_str());
#if !defined(_WIN32) && LIBCURL_VERSION_NUM >= 0x071000
#if !defined(_WIN32)
curl_easy_setopt(req, CURLOPT_SOCKOPTFUNCTION, cloexec_callback);
#endif
curl_easy_setopt(req, CURLOPT_CONNECTTIMEOUT, fileTransferSettings.connectTimeout.get());
curl_easy_setopt(req, CURLOPT_LOW_SPEED_LIMIT, 1L);
@ -694,13 +714,6 @@ struct curlFileTransfer : public FileTransfer
Sync<State> state_;
#ifndef _WIN32 // TODO need graceful async exit support on Windows?
/* We can't use a std::condition_variable to wake up the curl
thread, because it only monitors file descriptors. So use a
pipe instead. */
Pipe wakeupPipe;
#endif
std::thread workerThread;
curlFileTransfer()
@ -709,43 +722,35 @@ struct curlFileTransfer : public FileTransfer
static std::once_flag globalInit;
std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL);
curlm = curl_multi_init();
curlm = curlMulti(curl_multi_init());
#if LIBCURL_VERSION_NUM >= 0x072b00 // Multiplex requires >= 7.43.0
curl_multi_setopt(curlm, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX);
#endif
#if LIBCURL_VERSION_NUM >= 0x071e00 // Max connections requires >= 7.30.0
curl_multi_setopt(curlm, CURLMOPT_MAX_TOTAL_CONNECTIONS, fileTransferSettings.httpConnections.get());
#endif
#ifndef _WIN32 // TODO need graceful async exit support on Windows?
wakeupPipe.create();
fcntl(wakeupPipe.readSide.get(), F_SETFL, O_NONBLOCK);
#endif
curl_multi_setopt(curlm.get(), CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX);
curl_multi_setopt(curlm.get(), CURLMOPT_MAX_TOTAL_CONNECTIONS, fileTransferSettings.httpConnections.get());
workerThread = std::thread([&]() { workerThreadEntry(); });
}
~curlFileTransfer()
{
stopWorkerThread();
try {
stopWorkerThread();
} catch (...) {
ignoreExceptionInDestructor();
}
workerThread.join();
if (curlm)
curl_multi_cleanup(curlm);
}
void stopWorkerThread()
{
/* Signal the worker thread to exit. */
{
auto state(state_.lock());
state->quit();
}
#ifndef _WIN32 // TODO need graceful async exit support on Windows?
writeFull(wakeupPipe.writeSide.get(), " ", false);
#endif
state_.lock()->quit();
wakeupMulti();
}
void wakeupMulti()
{
if (auto ec = ::curl_multi_wakeup(curlm.get()))
throw curlMultiError(ec);
}
void workerThreadMain()
@ -775,32 +780,25 @@ struct curlFileTransfer : public FileTransfer
/* Let curl do its thing. */
int running;
CURLMcode mc = curl_multi_perform(curlm, &running);
CURLMcode mc = curl_multi_perform(curlm.get(), &running);
if (mc != CURLM_OK)
throw nix::Error("unexpected error from curl_multi_perform(): %s", curl_multi_strerror(mc));
/* Set the promises of any finished requests. */
CURLMsg * msg;
int left;
while ((msg = curl_multi_info_read(curlm, &left))) {
while ((msg = curl_multi_info_read(curlm.get(), &left))) {
if (msg->msg == CURLMSG_DONE) {
auto i = items.find(msg->easy_handle);
assert(i != items.end());
i->second->finish(msg->data.result);
curl_multi_remove_handle(curlm, i->second->req);
curl_multi_remove_handle(curlm.get(), i->second->req);
i->second->active = false;
items.erase(i);
}
}
/* Wait for activity, including wakeup events. */
int numfds = 0;
struct curl_waitfd extraFDs[1];
#ifndef _WIN32 // TODO need graceful async exit support on Windows?
extraFDs[0].fd = wakeupPipe.readSide.get();
extraFDs[0].events = CURL_WAIT_POLLIN;
extraFDs[0].revents = 0;
#endif
long maxSleepTimeMs = items.empty() ? 10000 : 100;
auto sleepTimeMs = nextWakeup != std::chrono::steady_clock::time_point()
? std::max(
@ -809,23 +807,14 @@ struct curlFileTransfer : public FileTransfer
nextWakeup - std::chrono::steady_clock::now())
.count())
: maxSleepTimeMs;
vomit("download thread waiting for %d ms", sleepTimeMs);
mc = curl_multi_wait(curlm, extraFDs, 1, sleepTimeMs, &numfds);
int numfds = 0;
mc = curl_multi_poll(curlm.get(), nullptr, 0, sleepTimeMs, &numfds);
if (mc != CURLM_OK)
throw nix::Error("unexpected error from curl_multi_wait(): %s", curl_multi_strerror(mc));
throw curlMultiError(mc);
nextWakeup = std::chrono::steady_clock::time_point();
/* Add new curl requests from the incoming requests queue,
except for requests that are embargoed (waiting for a
retry timeout to expire). */
if (extraFDs[0].revents & CURL_WAIT_POLLIN) {
char buf[1024];
auto res = read(extraFDs[0].fd, buf, sizeof(buf));
if (res == -1 && errno != EINTR)
throw SysError("reading curl wakeup socket");
}
std::vector<std::shared_ptr<TransferItem>> incoming;
auto now = std::chrono::steady_clock::now();
@ -848,7 +837,7 @@ struct curlFileTransfer : public FileTransfer
for (auto & item : incoming) {
debug("starting %s of %s", item->request.noun(), item->request.uri);
item->init();
curl_multi_add_handle(curlm, item->req);
curl_multi_add_handle(curlm.get(), item->req);
item->active = true;
items[item->req] = item;
}
@ -899,10 +888,8 @@ struct curlFileTransfer : public FileTransfer
throw nix::Error("cannot enqueue download request because the download thread is shutting down");
state->incoming.push(item);
}
#ifndef _WIN32 // TODO need graceful async exit support on Windows?
writeFull(wakeupPipe.writeSide.get(), " ");
#endif
wakeupMulti();
return ItemHandle(static_cast<Item &>(*item));
}
@ -922,9 +909,7 @@ struct curlFileTransfer : public FileTransfer
{
auto state(state_.lock());
state->unpause.push_back(std::move(item));
#ifndef _WIN32 // TODO need graceful async exit support on Windows?
writeFull(wakeupPipe.writeSide.get(), " ");
#endif
wakeupMulti();
}
void unpauseTransfer(ItemHandle handle) override