mirror of
https://github.com/NixOS/nix.git
synced 2025-11-21 09:49:36 +01:00
Merge remote-tracking branch 'origin/master' into flakes
This commit is contained in:
commit
4205234f26
8 changed files with 145 additions and 162 deletions
|
|
@ -8,7 +8,6 @@
|
|||
#include "compression.hh"
|
||||
#include "pathlocks.hh"
|
||||
#include "finally.hh"
|
||||
#include "retry.hh"
|
||||
|
||||
#ifdef ENABLE_S3
|
||||
#include <aws/core/client/ClientConfiguration.h>
|
||||
|
|
@ -20,9 +19,11 @@
|
|||
#include <curl/curl.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <cmath>
|
||||
#include <cstring>
|
||||
#include <iostream>
|
||||
#include <queue>
|
||||
#include <random>
|
||||
#include <thread>
|
||||
|
||||
using namespace std::string_literals;
|
||||
|
|
@ -45,6 +46,9 @@ struct CurlDownloader : public Downloader
|
|||
{
|
||||
CURLM * curlm = 0;
|
||||
|
||||
std::random_device rd;
|
||||
std::mt19937 mt19937;
|
||||
|
||||
struct DownloadItem : public std::enable_shared_from_this<DownloadItem>
|
||||
{
|
||||
CurlDownloader & downloader;
|
||||
|
|
@ -57,10 +61,20 @@ struct CurlDownloader : public Downloader
|
|||
bool active = false; // whether the handle has been added to the multi object
|
||||
std::string status;
|
||||
|
||||
unsigned int attempt = 0;
|
||||
|
||||
/* Don't start this download until the specified time point
|
||||
has been reached. */
|
||||
std::chrono::steady_clock::time_point embargo;
|
||||
|
||||
struct curl_slist * requestHeaders = 0;
|
||||
|
||||
std::string encoding;
|
||||
|
||||
bool acceptRanges = false;
|
||||
|
||||
curl_off_t writtenToSink = 0;
|
||||
|
||||
DownloadItem(CurlDownloader & downloader,
|
||||
const DownloadRequest & request,
|
||||
Callback<DownloadResult> callback)
|
||||
|
|
@ -71,9 +85,10 @@ struct CurlDownloader : public Downloader
|
|||
{request.uri}, request.parentAct)
|
||||
, callback(callback)
|
||||
, finalSink([this](const unsigned char * data, size_t len) {
|
||||
if (this->request.dataCallback)
|
||||
if (this->request.dataCallback) {
|
||||
writtenToSink += len;
|
||||
this->request.dataCallback((char *) data, len);
|
||||
else
|
||||
} else
|
||||
this->result.data->append((char *) data, len);
|
||||
})
|
||||
{
|
||||
|
|
@ -151,6 +166,7 @@ struct CurlDownloader : public Downloader
|
|||
status = ss.size() >= 2 ? ss[1] : "";
|
||||
result.data = std::make_shared<std::string>();
|
||||
result.bodySize = 0;
|
||||
acceptRanges = false;
|
||||
encoding = "";
|
||||
} else {
|
||||
auto i = line.find(':');
|
||||
|
|
@ -168,7 +184,9 @@ struct CurlDownloader : public Downloader
|
|||
return 0;
|
||||
}
|
||||
} else if (name == "content-encoding")
|
||||
encoding = trim(string(line, i + 1));;
|
||||
encoding = trim(string(line, i + 1));
|
||||
else if (name == "accept-ranges" && toLower(trim(std::string(line, i + 1))) == "bytes")
|
||||
acceptRanges = true;
|
||||
}
|
||||
}
|
||||
return realSize;
|
||||
|
|
@ -286,6 +304,9 @@ struct CurlDownloader : public Downloader
|
|||
curl_easy_setopt(req, CURLOPT_NETRC_FILE, settings.netrcFile.get().c_str());
|
||||
curl_easy_setopt(req, CURLOPT_NETRC, CURL_NETRC_OPTIONAL);
|
||||
|
||||
if (writtenToSink)
|
||||
curl_easy_setopt(req, CURLOPT_RESUME_FROM_LARGE, writtenToSink);
|
||||
|
||||
result.data = std::make_shared<std::string>();
|
||||
result.bodySize = 0;
|
||||
}
|
||||
|
|
@ -320,7 +341,7 @@ struct CurlDownloader : public Downloader
|
|||
failEx(writeException);
|
||||
|
||||
else if (code == CURLE_OK &&
|
||||
(httpStatus == 200 || httpStatus == 201 || httpStatus == 204 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */))
|
||||
(httpStatus == 200 || httpStatus == 201 || httpStatus == 204 || httpStatus == 206 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */))
|
||||
{
|
||||
result.cached = httpStatus == 304;
|
||||
done = true;
|
||||
|
|
@ -377,7 +398,9 @@ struct CurlDownloader : public Downloader
|
|||
}
|
||||
}
|
||||
|
||||
fail(
|
||||
attempt++;
|
||||
|
||||
auto exc =
|
||||
code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted
|
||||
? DownloadError(Interrupted, fmt("%s of '%s' was interrupted", request.verb(), request.uri))
|
||||
: httpStatus != 0
|
||||
|
|
@ -388,15 +411,41 @@ struct CurlDownloader : public Downloader
|
|||
)
|
||||
: DownloadError(err,
|
||||
fmt("unable to %s '%s': %s (%d)",
|
||||
request.verb(), request.uri, curl_easy_strerror(code), code)));
|
||||
request.verb(), request.uri, curl_easy_strerror(code), code));
|
||||
|
||||
/* If this is a transient error, then maybe retry the
|
||||
download after a while. If we're writing to a
|
||||
sink, we can only retry if the server supports
|
||||
ranged requests. */
|
||||
if (err == Transient
|
||||
&& attempt < request.tries
|
||||
&& (!this->request.dataCallback
|
||||
|| writtenToSink == 0
|
||||
|| (acceptRanges && encoding.empty())))
|
||||
{
|
||||
int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(downloader.mt19937));
|
||||
if (writtenToSink)
|
||||
warn("%s; retrying from offset %d in %d ms", exc.what(), writtenToSink, ms);
|
||||
else
|
||||
warn("%s; retrying in %d ms", exc.what(), ms);
|
||||
embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms);
|
||||
downloader.enqueueItem(shared_from_this());
|
||||
}
|
||||
else
|
||||
fail(exc);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
struct State
|
||||
{
|
||||
struct EmbargoComparator {
|
||||
bool operator() (const std::shared_ptr<DownloadItem> & i1, const std::shared_ptr<DownloadItem> & i2) {
|
||||
return i1->embargo > i2->embargo;
|
||||
}
|
||||
};
|
||||
bool quit = false;
|
||||
std::vector<std::shared_ptr<DownloadItem>> incoming;
|
||||
std::priority_queue<std::shared_ptr<DownloadItem>, std::vector<std::shared_ptr<DownloadItem>>, EmbargoComparator> incoming;
|
||||
};
|
||||
|
||||
Sync<State> state_;
|
||||
|
|
@ -409,6 +458,7 @@ struct CurlDownloader : public Downloader
|
|||
std::thread workerThread;
|
||||
|
||||
CurlDownloader()
|
||||
: mt19937(rd())
|
||||
{
|
||||
static std::once_flag globalInit;
|
||||
std::call_once(globalInit, curl_global_init, CURL_GLOBAL_ALL);
|
||||
|
|
@ -502,7 +552,9 @@ struct CurlDownloader : public Downloader
|
|||
|
||||
nextWakeup = std::chrono::steady_clock::time_point();
|
||||
|
||||
/* Add new curl requests from the incoming requests queue. */
|
||||
/* Add new curl requests from the incoming requests queue,
|
||||
except for requests that are embargoed (waiting for a
|
||||
retry timeout to expire). */
|
||||
if (extraFDs[0].revents & CURL_WAIT_POLLIN) {
|
||||
char buf[1024];
|
||||
auto res = read(extraFDs[0].fd, buf, sizeof(buf));
|
||||
|
|
@ -511,9 +563,22 @@ struct CurlDownloader : public Downloader
|
|||
}
|
||||
|
||||
std::vector<std::shared_ptr<DownloadItem>> incoming;
|
||||
auto now = std::chrono::steady_clock::now();
|
||||
|
||||
{
|
||||
auto state(state_.lock());
|
||||
std::swap(state->incoming, incoming);
|
||||
while (!state->incoming.empty()) {
|
||||
auto item = state->incoming.top();
|
||||
if (item->embargo <= now) {
|
||||
incoming.push_back(item);
|
||||
state->incoming.pop();
|
||||
} else {
|
||||
if (nextWakeup == std::chrono::steady_clock::time_point()
|
||||
|| item->embargo < nextWakeup)
|
||||
nextWakeup = item->embargo;
|
||||
break;
|
||||
}
|
||||
}
|
||||
quit = state->quit;
|
||||
}
|
||||
|
||||
|
|
@ -535,12 +600,12 @@ struct CurlDownloader : public Downloader
|
|||
workerThreadMain();
|
||||
} catch (nix::Interrupted & e) {
|
||||
} catch (std::exception & e) {
|
||||
printError(format("unexpected error in download thread: %s") % e.what());
|
||||
printError("unexpected error in download thread: %s", e.what());
|
||||
}
|
||||
|
||||
{
|
||||
auto state(state_.lock());
|
||||
state->incoming.clear();
|
||||
while (!state->incoming.empty()) state->incoming.pop();
|
||||
state->quit = true;
|
||||
}
|
||||
}
|
||||
|
|
@ -556,7 +621,7 @@ struct CurlDownloader : public Downloader
|
|||
auto state(state_.lock());
|
||||
if (state->quit)
|
||||
throw nix::Error("cannot enqueue download request because the download thread is shutting down");
|
||||
state->incoming.push_back(item);
|
||||
state->incoming.push(item);
|
||||
}
|
||||
writeFull(wakeupPipe.writeSide.get(), " ");
|
||||
}
|
||||
|
|
@ -639,9 +704,7 @@ std::future<DownloadResult> Downloader::enqueueDownload(const DownloadRequest &
|
|||
|
||||
DownloadResult Downloader::download(const DownloadRequest & request)
|
||||
{
|
||||
return retry<DownloadResult>(request.tries, [&]() {
|
||||
return enqueueDownload(request).get();
|
||||
});
|
||||
return enqueueDownload(request).get();
|
||||
}
|
||||
|
||||
void Downloader::download(DownloadRequest && request, Sink & sink)
|
||||
|
|
@ -828,7 +891,7 @@ CachedDownloadResult Downloader::downloadCached(
|
|||
writeFile(dataFile, url + "\n" + res.etag + "\n" + std::to_string(time(0)) + "\n");
|
||||
} catch (DownloadError & e) {
|
||||
if (storePath.empty()) throw;
|
||||
warn("%s; using cached result", e.msg());
|
||||
warn("warning: %s; using cached result", e.msg());
|
||||
result.etag = expectedETag;
|
||||
}
|
||||
}
|
||||
|
|
@ -892,4 +955,5 @@ bool isUri(const string & s)
|
|||
return scheme == "http" || scheme == "https" || scheme == "file" || scheme == "channel" || scheme == "git" || scheme == "s3" || scheme == "ssh";
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue