diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index 14c19dc7b..7f376429e 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -46,6 +46,17 @@ namespace { using curlSList = std::unique_ptr<::curl_slist, decltype([](::curl_slist * list) { ::curl_slist_free_all(list); })>; using curlMulti = std::unique_ptr<::CURLM, decltype([](::CURLM * multi) { ::curl_multi_cleanup(multi); })>; +struct curlMultiError : Error +{ + ::CURLMcode code; + + curlMultiError(::CURLMcode code) + : Error{"unexpected curl multi error: %s", ::curl_multi_strerror(code)} + { + assert(code != CURLM_OK); + } +}; + } // namespace struct curlFileTransfer : public FileTransfer @@ -362,7 +373,7 @@ struct curlFileTransfer : public FileTransfer return ((TransferItem *) userp)->readCallback(buffer, size, nitems); } -#if !defined(_WIN32) && LIBCURL_VERSION_NUM >= 0x071000 +#if !defined(_WIN32) static int cloexec_callback(void *, curl_socket_t curlfd, curlsocktype purpose) { unix::closeOnExec(curlfd); @@ -425,15 +436,11 @@ struct curlFileTransfer : public FileTransfer ("curl/" LIBCURL_VERSION " Nix/" + nixVersion + (fileTransferSettings.userAgentSuffix != "" ? " " + fileTransferSettings.userAgentSuffix.get() : "")) .c_str()); -#if LIBCURL_VERSION_NUM >= 0x072b00 curl_easy_setopt(req, CURLOPT_PIPEWAIT, 1); -#endif -#if LIBCURL_VERSION_NUM >= 0x072f00 if (fileTransferSettings.enableHttp2) curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2TLS); else curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1); -#endif curl_easy_setopt(req, CURLOPT_WRITEFUNCTION, TransferItem::writeCallbackWrapper); curl_easy_setopt(req, CURLOPT_WRITEDATA, this); curl_easy_setopt(req, CURLOPT_HEADERFUNCTION, TransferItem::headerCallbackWrapper); @@ -473,10 +480,9 @@ struct curlFileTransfer : public FileTransfer if (settings.caFile != "") curl_easy_setopt(req, CURLOPT_CAINFO, settings.caFile.get().c_str()); -#if !defined(_WIN32) && LIBCURL_VERSION_NUM >= 0x071000 +#if !defined(_WIN32) curl_easy_setopt(req, CURLOPT_SOCKOPTFUNCTION, cloexec_callback); #endif - curl_easy_setopt(req, CURLOPT_CONNECTTIMEOUT, fileTransferSettings.connectTimeout.get()); curl_easy_setopt(req, CURLOPT_LOW_SPEED_LIMIT, 1L); @@ -708,13 +714,6 @@ struct curlFileTransfer : public FileTransfer Sync state_; -#ifndef _WIN32 // TODO need graceful async exit support on Windows? - /* We can't use a std::condition_variable to wake up the curl - thread, because it only monitors file descriptors. So use a - pipe instead. */ - Pipe wakeupPipe; -#endif - std::thread workerThread; curlFileTransfer() @@ -725,38 +724,33 @@ struct curlFileTransfer : public FileTransfer curlm = curlMulti(curl_multi_init()); -#if LIBCURL_VERSION_NUM >= 0x072b00 // Multiplex requires >= 7.43.0 curl_multi_setopt(curlm.get(), CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX); -#endif -#if LIBCURL_VERSION_NUM >= 0x071e00 // Max connections requires >= 7.30.0 curl_multi_setopt(curlm.get(), CURLMOPT_MAX_TOTAL_CONNECTIONS, fileTransferSettings.httpConnections.get()); -#endif - -#ifndef _WIN32 // TODO need graceful async exit support on Windows? - wakeupPipe.create(); - fcntl(wakeupPipe.readSide.get(), F_SETFL, O_NONBLOCK); -#endif workerThread = std::thread([&]() { workerThreadEntry(); }); } ~curlFileTransfer() { - stopWorkerThread(); - + try { + stopWorkerThread(); + } catch (...) { + ignoreExceptionInDestructor(); + } workerThread.join(); } void stopWorkerThread() { /* Signal the worker thread to exit. */ - { - auto state(state_.lock()); - state->quit(); - } -#ifndef _WIN32 // TODO need graceful async exit support on Windows? - writeFull(wakeupPipe.writeSide.get(), " ", false); -#endif + state_.lock()->quit(); + wakeupMulti(); + } + + void wakeupMulti() + { + if (auto ec = ::curl_multi_wakeup(curlm.get())) + throw curlMultiError(ec); } void workerThreadMain() @@ -805,13 +799,6 @@ struct curlFileTransfer : public FileTransfer } /* Wait for activity, including wakeup events. */ - int numfds = 0; - struct curl_waitfd extraFDs[1]; -#ifndef _WIN32 // TODO need graceful async exit support on Windows? - extraFDs[0].fd = wakeupPipe.readSide.get(); - extraFDs[0].events = CURL_WAIT_POLLIN; - extraFDs[0].revents = 0; -#endif long maxSleepTimeMs = items.empty() ? 10000 : 100; auto sleepTimeMs = nextWakeup != std::chrono::steady_clock::time_point() ? std::max( @@ -820,23 +807,14 @@ struct curlFileTransfer : public FileTransfer nextWakeup - std::chrono::steady_clock::now()) .count()) : maxSleepTimeMs; - vomit("download thread waiting for %d ms", sleepTimeMs); - mc = curl_multi_wait(curlm.get(), extraFDs, 1, sleepTimeMs, &numfds); + + int numfds = 0; + mc = curl_multi_poll(curlm.get(), nullptr, 0, sleepTimeMs, &numfds); if (mc != CURLM_OK) - throw nix::Error("unexpected error from curl_multi_wait(): %s", curl_multi_strerror(mc)); + throw curlMultiError(mc); nextWakeup = std::chrono::steady_clock::time_point(); - /* Add new curl requests from the incoming requests queue, - except for requests that are embargoed (waiting for a - retry timeout to expire). */ - if (extraFDs[0].revents & CURL_WAIT_POLLIN) { - char buf[1024]; - auto res = read(extraFDs[0].fd, buf, sizeof(buf)); - if (res == -1 && errno != EINTR) - throw SysError("reading curl wakeup socket"); - } - std::vector> incoming; auto now = std::chrono::steady_clock::now(); @@ -910,10 +888,8 @@ struct curlFileTransfer : public FileTransfer throw nix::Error("cannot enqueue download request because the download thread is shutting down"); state->incoming.push(item); } -#ifndef _WIN32 // TODO need graceful async exit support on Windows? - writeFull(wakeupPipe.writeSide.get(), " "); -#endif + wakeupMulti(); return ItemHandle(static_cast(*item)); } @@ -933,9 +909,7 @@ struct curlFileTransfer : public FileTransfer { auto state(state_.lock()); state->unpause.push_back(std::move(item)); -#ifndef _WIN32 // TODO need graceful async exit support on Windows? - writeFull(wakeupPipe.writeSide.get(), " "); -#endif + wakeupMulti(); } void unpauseTransfer(ItemHandle handle) override