From ea96e6d07cb920aab79ba3f2fdd089814f8f781e Mon Sep 17 00:00:00 2001 From: Sergei Zimmerman Date: Sat, 13 Dec 2025 02:31:46 +0300 Subject: [PATCH 1/2] libstore/filetransfer: Factor out appendHeaders, use std::unique_ptr to simplify ownership Pretty self-explanatory. More RAII is good and unclutters the already heavily overloaded destructors from ownership logic. Not yet touching CURL *req because that would be too churny. --- src/libstore/filetransfer.cc | 53 ++++++++++++++++++++++-------------- 1 file changed, 32 insertions(+), 21 deletions(-) diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index 26ceba729..14c19dc7b 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -41,9 +41,16 @@ FileTransferSettings fileTransferSettings; static GlobalConfig::Register rFileTransferSettings(&fileTransferSettings); +namespace { + +using curlSList = std::unique_ptr<::curl_slist, decltype([](::curl_slist * list) { ::curl_slist_free_all(list); })>; +using curlMulti = std::unique_ptr<::CURLM, decltype([](::CURLM * multi) { ::curl_multi_cleanup(multi); })>; + +} // namespace + struct curlFileTransfer : public FileTransfer { - CURLM * curlm = 0; + curlMulti curlm; std::random_device rd; std::mt19937 mt19937; @@ -69,7 +76,7 @@ struct curlFileTransfer : public FileTransfer has been reached. */ std::chrono::steady_clock::time_point embargo; - struct curl_slist * requestHeaders = 0; + curlSList requestHeaders; std::string encoding; @@ -92,6 +99,15 @@ struct curlFileTransfer : public FileTransfer return httpStatus; } + void appendHeaders(const std::string & header) + { + curlSList tmpSList = curlSList(::curl_slist_append(requestHeaders.get(), requireCString(header))); + if (!tmpSList) + throw std::bad_alloc(); + requestHeaders.release(); + requestHeaders = std::move(tmpSList); + } + TransferItem( curlFileTransfer & fileTransfer, const FileTransferRequest & request, @@ -131,13 +147,13 @@ struct curlFileTransfer : public FileTransfer { result.urls.push_back(request.uri.to_string()); - requestHeaders = curl_slist_append(requestHeaders, "Accept-Encoding: zstd, br, gzip, deflate, bzip2, xz"); + appendHeaders("Accept-Encoding: zstd, br, gzip, deflate, bzip2, xz"); if (!request.expectedETag.empty()) - requestHeaders = curl_slist_append(requestHeaders, ("If-None-Match: " + request.expectedETag).c_str()); + appendHeaders("If-None-Match: " + request.expectedETag); if (!request.mimeType.empty()) - requestHeaders = curl_slist_append(requestHeaders, ("Content-Type: " + request.mimeType).c_str()); + appendHeaders("Content-Type: " + request.mimeType); for (auto it = request.headers.begin(); it != request.headers.end(); ++it) { - requestHeaders = curl_slist_append(requestHeaders, fmt("%s: %s", it->first, it->second).c_str()); + appendHeaders(fmt("%s: %s", it->first, it->second)); } } @@ -145,11 +161,9 @@ struct curlFileTransfer : public FileTransfer { if (req) { if (active) - curl_multi_remove_handle(fileTransfer.curlm, req); + curl_multi_remove_handle(fileTransfer.curlm.get(), req); curl_easy_cleanup(req); } - if (requestHeaders) - curl_slist_free_all(requestHeaders); try { if (!done) fail(FileTransferError( @@ -429,7 +443,7 @@ struct curlFileTransfer : public FileTransfer curl_easy_setopt(req, CURLOPT_XFERINFODATA, this); curl_easy_setopt(req, CURLOPT_NOPROGRESS, 0); - curl_easy_setopt(req, CURLOPT_HTTPHEADER, requestHeaders); + curl_easy_setopt(req, CURLOPT_HTTPHEADER, requestHeaders.get()); if (settings.downloadSpeed.get() > 0) curl_easy_setopt(req, CURLOPT_MAX_RECV_SPEED_LARGE, (curl_off_t) (settings.downloadSpeed.get() * 1024)); @@ -709,13 +723,13 @@ struct curlFileTransfer : public FileTransfer static std::once_flag globalInit; std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL); - curlm = curl_multi_init(); + curlm = curlMulti(curl_multi_init()); #if LIBCURL_VERSION_NUM >= 0x072b00 // Multiplex requires >= 7.43.0 - curl_multi_setopt(curlm, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX); + curl_multi_setopt(curlm.get(), CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX); #endif #if LIBCURL_VERSION_NUM >= 0x071e00 // Max connections requires >= 7.30.0 - curl_multi_setopt(curlm, CURLMOPT_MAX_TOTAL_CONNECTIONS, fileTransferSettings.httpConnections.get()); + curl_multi_setopt(curlm.get(), CURLMOPT_MAX_TOTAL_CONNECTIONS, fileTransferSettings.httpConnections.get()); #endif #ifndef _WIN32 // TODO need graceful async exit support on Windows? @@ -731,9 +745,6 @@ struct curlFileTransfer : public FileTransfer stopWorkerThread(); workerThread.join(); - - if (curlm) - curl_multi_cleanup(curlm); } void stopWorkerThread() @@ -775,19 +786,19 @@ struct curlFileTransfer : public FileTransfer /* Let curl do its thing. */ int running; - CURLMcode mc = curl_multi_perform(curlm, &running); + CURLMcode mc = curl_multi_perform(curlm.get(), &running); if (mc != CURLM_OK) throw nix::Error("unexpected error from curl_multi_perform(): %s", curl_multi_strerror(mc)); /* Set the promises of any finished requests. */ CURLMsg * msg; int left; - while ((msg = curl_multi_info_read(curlm, &left))) { + while ((msg = curl_multi_info_read(curlm.get(), &left))) { if (msg->msg == CURLMSG_DONE) { auto i = items.find(msg->easy_handle); assert(i != items.end()); i->second->finish(msg->data.result); - curl_multi_remove_handle(curlm, i->second->req); + curl_multi_remove_handle(curlm.get(), i->second->req); i->second->active = false; items.erase(i); } @@ -810,7 +821,7 @@ struct curlFileTransfer : public FileTransfer .count()) : maxSleepTimeMs; vomit("download thread waiting for %d ms", sleepTimeMs); - mc = curl_multi_wait(curlm, extraFDs, 1, sleepTimeMs, &numfds); + mc = curl_multi_wait(curlm.get(), extraFDs, 1, sleepTimeMs, &numfds); if (mc != CURLM_OK) throw nix::Error("unexpected error from curl_multi_wait(): %s", curl_multi_strerror(mc)); @@ -848,7 +859,7 @@ struct curlFileTransfer : public FileTransfer for (auto & item : incoming) { debug("starting %s of %s", item->request.noun(), item->request.uri); item->init(); - curl_multi_add_handle(curlm, item->req); + curl_multi_add_handle(curlm.get(), item->req); item->active = true; items[item->req] = item; } From 46670a7f46399cbc81555178e839a1cada56bd9c Mon Sep 17 00:00:00 2001 From: Sergei Zimmerman Date: Sat, 13 Dec 2025 03:00:58 +0300 Subject: [PATCH 2/2] libstore/filetransfer: Replace curl_multi_wait with curl_multi_poll and get rid of CPP Since 7.68 libcurl already provides curl_multi_wakeup, so we can drop the hacky pipe setup (libcurl does this internally). --- src/libstore/filetransfer.cc | 90 +++++++++++++----------------------- 1 file changed, 32 insertions(+), 58 deletions(-) diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index 14c19dc7b..7f376429e 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -46,6 +46,17 @@ namespace { using curlSList = std::unique_ptr<::curl_slist, decltype([](::curl_slist * list) { ::curl_slist_free_all(list); })>; using curlMulti = std::unique_ptr<::CURLM, decltype([](::CURLM * multi) { ::curl_multi_cleanup(multi); })>; +struct curlMultiError : Error +{ + ::CURLMcode code; + + curlMultiError(::CURLMcode code) + : Error{"unexpected curl multi error: %s", ::curl_multi_strerror(code)} + { + assert(code != CURLM_OK); + } +}; + } // namespace struct curlFileTransfer : public FileTransfer @@ -362,7 +373,7 @@ struct curlFileTransfer : public FileTransfer return ((TransferItem *) userp)->readCallback(buffer, size, nitems); } -#if !defined(_WIN32) && LIBCURL_VERSION_NUM >= 0x071000 +#if !defined(_WIN32) static int cloexec_callback(void *, curl_socket_t curlfd, curlsocktype purpose) { unix::closeOnExec(curlfd); @@ -425,15 +436,11 @@ struct curlFileTransfer : public FileTransfer ("curl/" LIBCURL_VERSION " Nix/" + nixVersion + (fileTransferSettings.userAgentSuffix != "" ? " " + fileTransferSettings.userAgentSuffix.get() : "")) .c_str()); -#if LIBCURL_VERSION_NUM >= 0x072b00 curl_easy_setopt(req, CURLOPT_PIPEWAIT, 1); -#endif -#if LIBCURL_VERSION_NUM >= 0x072f00 if (fileTransferSettings.enableHttp2) curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2TLS); else curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1); -#endif curl_easy_setopt(req, CURLOPT_WRITEFUNCTION, TransferItem::writeCallbackWrapper); curl_easy_setopt(req, CURLOPT_WRITEDATA, this); curl_easy_setopt(req, CURLOPT_HEADERFUNCTION, TransferItem::headerCallbackWrapper); @@ -473,10 +480,9 @@ struct curlFileTransfer : public FileTransfer if (settings.caFile != "") curl_easy_setopt(req, CURLOPT_CAINFO, settings.caFile.get().c_str()); -#if !defined(_WIN32) && LIBCURL_VERSION_NUM >= 0x071000 +#if !defined(_WIN32) curl_easy_setopt(req, CURLOPT_SOCKOPTFUNCTION, cloexec_callback); #endif - curl_easy_setopt(req, CURLOPT_CONNECTTIMEOUT, fileTransferSettings.connectTimeout.get()); curl_easy_setopt(req, CURLOPT_LOW_SPEED_LIMIT, 1L); @@ -708,13 +714,6 @@ struct curlFileTransfer : public FileTransfer Sync state_; -#ifndef _WIN32 // TODO need graceful async exit support on Windows? - /* We can't use a std::condition_variable to wake up the curl - thread, because it only monitors file descriptors. So use a - pipe instead. */ - Pipe wakeupPipe; -#endif - std::thread workerThread; curlFileTransfer() @@ -725,38 +724,33 @@ struct curlFileTransfer : public FileTransfer curlm = curlMulti(curl_multi_init()); -#if LIBCURL_VERSION_NUM >= 0x072b00 // Multiplex requires >= 7.43.0 curl_multi_setopt(curlm.get(), CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX); -#endif -#if LIBCURL_VERSION_NUM >= 0x071e00 // Max connections requires >= 7.30.0 curl_multi_setopt(curlm.get(), CURLMOPT_MAX_TOTAL_CONNECTIONS, fileTransferSettings.httpConnections.get()); -#endif - -#ifndef _WIN32 // TODO need graceful async exit support on Windows? - wakeupPipe.create(); - fcntl(wakeupPipe.readSide.get(), F_SETFL, O_NONBLOCK); -#endif workerThread = std::thread([&]() { workerThreadEntry(); }); } ~curlFileTransfer() { - stopWorkerThread(); - + try { + stopWorkerThread(); + } catch (...) { + ignoreExceptionInDestructor(); + } workerThread.join(); } void stopWorkerThread() { /* Signal the worker thread to exit. */ - { - auto state(state_.lock()); - state->quit(); - } -#ifndef _WIN32 // TODO need graceful async exit support on Windows? - writeFull(wakeupPipe.writeSide.get(), " ", false); -#endif + state_.lock()->quit(); + wakeupMulti(); + } + + void wakeupMulti() + { + if (auto ec = ::curl_multi_wakeup(curlm.get())) + throw curlMultiError(ec); } void workerThreadMain() @@ -805,13 +799,6 @@ struct curlFileTransfer : public FileTransfer } /* Wait for activity, including wakeup events. */ - int numfds = 0; - struct curl_waitfd extraFDs[1]; -#ifndef _WIN32 // TODO need graceful async exit support on Windows? - extraFDs[0].fd = wakeupPipe.readSide.get(); - extraFDs[0].events = CURL_WAIT_POLLIN; - extraFDs[0].revents = 0; -#endif long maxSleepTimeMs = items.empty() ? 10000 : 100; auto sleepTimeMs = nextWakeup != std::chrono::steady_clock::time_point() ? std::max( @@ -820,23 +807,14 @@ struct curlFileTransfer : public FileTransfer nextWakeup - std::chrono::steady_clock::now()) .count()) : maxSleepTimeMs; - vomit("download thread waiting for %d ms", sleepTimeMs); - mc = curl_multi_wait(curlm.get(), extraFDs, 1, sleepTimeMs, &numfds); + + int numfds = 0; + mc = curl_multi_poll(curlm.get(), nullptr, 0, sleepTimeMs, &numfds); if (mc != CURLM_OK) - throw nix::Error("unexpected error from curl_multi_wait(): %s", curl_multi_strerror(mc)); + throw curlMultiError(mc); nextWakeup = std::chrono::steady_clock::time_point(); - /* Add new curl requests from the incoming requests queue, - except for requests that are embargoed (waiting for a - retry timeout to expire). */ - if (extraFDs[0].revents & CURL_WAIT_POLLIN) { - char buf[1024]; - auto res = read(extraFDs[0].fd, buf, sizeof(buf)); - if (res == -1 && errno != EINTR) - throw SysError("reading curl wakeup socket"); - } - std::vector> incoming; auto now = std::chrono::steady_clock::now(); @@ -910,10 +888,8 @@ struct curlFileTransfer : public FileTransfer throw nix::Error("cannot enqueue download request because the download thread is shutting down"); state->incoming.push(item); } -#ifndef _WIN32 // TODO need graceful async exit support on Windows? - writeFull(wakeupPipe.writeSide.get(), " "); -#endif + wakeupMulti(); return ItemHandle(static_cast(*item)); } @@ -933,9 +909,7 @@ struct curlFileTransfer : public FileTransfer { auto state(state_.lock()); state->unpause.push_back(std::move(item)); -#ifndef _WIN32 // TODO need graceful async exit support on Windows? - writeFull(wakeupPipe.writeSide.get(), " "); -#endif + wakeupMulti(); } void unpauseTransfer(ItemHandle handle) override