mirror of
https://github.com/NixOS/nix.git
synced 2025-11-16 23:42:43 +01:00
Apply clang-format universally.
* It is tough to contribute to a project that doesn't use a formatter, * It is extra hard to contribute to a project which has configured the formatter, but ignores it for some files * Code formatting makes it harder to hide obscure / weird bugs by accident or on purpose, Let's rip the bandaid off? Note that PRs currently in flight should be able to be merged relatively easily by applying `clang-format` to their tip prior to merge.
This commit is contained in:
parent
41bf87ec70
commit
e4f62e4608
587 changed files with 23258 additions and 23135 deletions
|
|
@ -10,11 +10,11 @@
|
|||
|
||||
#include "store-config-private.hh"
|
||||
#if NIX_WITH_S3_SUPPORT
|
||||
#include <aws/core/client/ClientConfiguration.h>
|
||||
# include <aws/core/client/ClientConfiguration.h>
|
||||
#endif
|
||||
|
||||
#ifdef __linux__
|
||||
# include "nix/util/linux-namespaces.hh"
|
||||
# include "nix/util/linux-namespaces.hh"
|
||||
#endif
|
||||
|
||||
#include <unistd.h>
|
||||
|
|
@ -77,7 +77,7 @@ struct curlFileTransfer : public FileTransfer
|
|||
|
||||
std::chrono::steady_clock::time_point startTime = std::chrono::steady_clock::now();
|
||||
|
||||
inline static const std::set<long> successfulStatuses {200, 201, 204, 206, 304, 0 /* other protocol */};
|
||||
inline static const std::set<long> successfulStatuses{200, 201, 204, 206, 304, 0 /* other protocol */};
|
||||
|
||||
/* Get the HTTP status code, or 0 for other protocols. */
|
||||
long getHTTPStatus()
|
||||
|
|
@ -90,14 +90,18 @@ struct curlFileTransfer : public FileTransfer
|
|||
return httpStatus;
|
||||
}
|
||||
|
||||
TransferItem(curlFileTransfer & fileTransfer,
|
||||
TransferItem(
|
||||
curlFileTransfer & fileTransfer,
|
||||
const FileTransferRequest & request,
|
||||
Callback<FileTransferResult> && callback)
|
||||
: fileTransfer(fileTransfer)
|
||||
, request(request)
|
||||
, act(*logger, lvlTalkative, actFileTransfer,
|
||||
fmt("%sing '%s'", request.verb(), request.uri),
|
||||
{request.uri}, request.parentAct)
|
||||
, act(*logger,
|
||||
lvlTalkative,
|
||||
actFileTransfer,
|
||||
fmt("%sing '%s'", request.verb(), request.uri),
|
||||
{request.uri},
|
||||
request.parentAct)
|
||||
, callback(std::move(callback))
|
||||
, finalSink([this](std::string_view data) {
|
||||
if (errorSink) {
|
||||
|
|
@ -115,7 +119,7 @@ struct curlFileTransfer : public FileTransfer
|
|||
}
|
||||
} else
|
||||
this->result.data.append(data);
|
||||
})
|
||||
})
|
||||
{
|
||||
result.urls.push_back(request.uri);
|
||||
|
||||
|
|
@ -124,7 +128,7 @@ struct curlFileTransfer : public FileTransfer
|
|||
requestHeaders = curl_slist_append(requestHeaders, ("If-None-Match: " + request.expectedETag).c_str());
|
||||
if (!request.mimeType.empty())
|
||||
requestHeaders = curl_slist_append(requestHeaders, ("Content-Type: " + request.mimeType).c_str());
|
||||
for (auto it = request.headers.begin(); it != request.headers.end(); ++it){
|
||||
for (auto it = request.headers.begin(); it != request.headers.end(); ++it) {
|
||||
requestHeaders = curl_slist_append(requestHeaders, fmt("%s: %s", it->first, it->second).c_str());
|
||||
}
|
||||
}
|
||||
|
|
@ -136,7 +140,8 @@ struct curlFileTransfer : public FileTransfer
|
|||
curl_multi_remove_handle(fileTransfer.curlm, req);
|
||||
curl_easy_cleanup(req);
|
||||
}
|
||||
if (requestHeaders) curl_slist_free_all(requestHeaders);
|
||||
if (requestHeaders)
|
||||
curl_slist_free_all(requestHeaders);
|
||||
try {
|
||||
if (!done)
|
||||
fail(FileTransferError(Interrupted, {}, "download of '%s' was interrupted", request.uri));
|
||||
|
|
@ -172,12 +177,12 @@ struct curlFileTransfer : public FileTransfer
|
|||
|
||||
if (!decompressionSink) {
|
||||
decompressionSink = makeDecompressionSink(encoding, finalSink);
|
||||
if (! successfulStatuses.count(getHTTPStatus())) {
|
||||
if (!successfulStatuses.count(getHTTPStatus())) {
|
||||
// In this case we want to construct a TeeSink, to keep
|
||||
// the response around (which we figure won't be big
|
||||
// like an actual download should be) to improve error
|
||||
// messages.
|
||||
errorSink = StringSink { };
|
||||
errorSink = StringSink{};
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -247,7 +252,8 @@ struct curlFileTransfer : public FileTransfer
|
|||
|
||||
else if (name == "link" || name == "x-amz-meta-link") {
|
||||
auto value = trim(line.substr(i + 1));
|
||||
static std::regex linkRegex("<([^>]*)>; rel=\"immutable\"", std::regex::extended | std::regex::icase);
|
||||
static std::regex linkRegex(
|
||||
"<([^>]*)>; rel=\"immutable\"", std::regex::extended | std::regex::icase);
|
||||
if (std::smatch match; std::regex_match(value, match, linkRegex))
|
||||
result.immutableUrl = match.str(1);
|
||||
else
|
||||
|
|
@ -273,7 +279,8 @@ struct curlFileTransfer : public FileTransfer
|
|||
return getInterrupted();
|
||||
}
|
||||
|
||||
static int progressCallbackWrapper(void * userp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow)
|
||||
static int progressCallbackWrapper(
|
||||
void * userp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow)
|
||||
{
|
||||
auto & item = *static_cast<TransferItem *>(userp);
|
||||
auto isUpload = bool(item.request.data);
|
||||
|
|
@ -288,7 +295,8 @@ struct curlFileTransfer : public FileTransfer
|
|||
}
|
||||
|
||||
size_t readOffset = 0;
|
||||
size_t readCallback(char *buffer, size_t size, size_t nitems)
|
||||
|
||||
size_t readCallback(char * buffer, size_t size, size_t nitems)
|
||||
{
|
||||
if (readOffset == request.data->length())
|
||||
return 0;
|
||||
|
|
@ -299,18 +307,19 @@ struct curlFileTransfer : public FileTransfer
|
|||
return count;
|
||||
}
|
||||
|
||||
static size_t readCallbackWrapper(char *buffer, size_t size, size_t nitems, void * userp)
|
||||
static size_t readCallbackWrapper(char * buffer, size_t size, size_t nitems, void * userp)
|
||||
{
|
||||
return ((TransferItem *) userp)->readCallback(buffer, size, nitems);
|
||||
}
|
||||
|
||||
#if !defined(_WIN32) && LIBCURL_VERSION_NUM >= 0x071000
|
||||
static int cloexec_callback(void *, curl_socket_t curlfd, curlsocktype purpose) {
|
||||
#if !defined(_WIN32) && LIBCURL_VERSION_NUM >= 0x071000
|
||||
static int cloexec_callback(void *, curl_socket_t curlfd, curlsocktype purpose)
|
||||
{
|
||||
unix::closeOnExec(curlfd);
|
||||
vomit("cloexec set for fd %i", curlfd);
|
||||
return CURL_SOCKOPT_OK;
|
||||
}
|
||||
#endif
|
||||
#endif
|
||||
|
||||
size_t seekCallback(curl_off_t offset, int origin)
|
||||
{
|
||||
|
|
@ -324,14 +333,15 @@ struct curlFileTransfer : public FileTransfer
|
|||
return CURL_SEEKFUNC_OK;
|
||||
}
|
||||
|
||||
static size_t seekCallbackWrapper(void *clientp, curl_off_t offset, int origin)
|
||||
static size_t seekCallbackWrapper(void * clientp, curl_off_t offset, int origin)
|
||||
{
|
||||
return ((TransferItem *) clientp)->seekCallback(offset, origin);
|
||||
}
|
||||
|
||||
void init()
|
||||
{
|
||||
if (!req) req = curl_easy_init();
|
||||
if (!req)
|
||||
req = curl_easy_init();
|
||||
|
||||
curl_easy_reset(req);
|
||||
|
||||
|
|
@ -344,18 +354,21 @@ struct curlFileTransfer : public FileTransfer
|
|||
curl_easy_setopt(req, CURLOPT_FOLLOWLOCATION, 1L);
|
||||
curl_easy_setopt(req, CURLOPT_MAXREDIRS, 10);
|
||||
curl_easy_setopt(req, CURLOPT_NOSIGNAL, 1);
|
||||
curl_easy_setopt(req, CURLOPT_USERAGENT,
|
||||
("curl/" LIBCURL_VERSION " Nix/" + nixVersion +
|
||||
(fileTransferSettings.userAgentSuffix != "" ? " " + fileTransferSettings.userAgentSuffix.get() : "")).c_str());
|
||||
#if LIBCURL_VERSION_NUM >= 0x072b00
|
||||
curl_easy_setopt(
|
||||
req,
|
||||
CURLOPT_USERAGENT,
|
||||
("curl/" LIBCURL_VERSION " Nix/" + nixVersion
|
||||
+ (fileTransferSettings.userAgentSuffix != "" ? " " + fileTransferSettings.userAgentSuffix.get() : ""))
|
||||
.c_str());
|
||||
#if LIBCURL_VERSION_NUM >= 0x072b00
|
||||
curl_easy_setopt(req, CURLOPT_PIPEWAIT, 1);
|
||||
#endif
|
||||
#if LIBCURL_VERSION_NUM >= 0x072f00
|
||||
#endif
|
||||
#if LIBCURL_VERSION_NUM >= 0x072f00
|
||||
if (fileTransferSettings.enableHttp2)
|
||||
curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2TLS);
|
||||
else
|
||||
curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
|
||||
#endif
|
||||
#endif
|
||||
curl_easy_setopt(req, CURLOPT_WRITEFUNCTION, TransferItem::writeCallbackWrapper);
|
||||
curl_easy_setopt(req, CURLOPT_WRITEDATA, this);
|
||||
curl_easy_setopt(req, CURLOPT_HEADERFUNCTION, TransferItem::headerCallbackWrapper);
|
||||
|
|
@ -393,9 +406,9 @@ struct curlFileTransfer : public FileTransfer
|
|||
curl_easy_setopt(req, CURLOPT_SSL_VERIFYHOST, 0);
|
||||
}
|
||||
|
||||
#if !defined(_WIN32) && LIBCURL_VERSION_NUM >= 0x071000
|
||||
#if !defined(_WIN32) && LIBCURL_VERSION_NUM >= 0x071000
|
||||
curl_easy_setopt(req, CURLOPT_SOCKOPTFUNCTION, cloexec_callback);
|
||||
#endif
|
||||
#endif
|
||||
|
||||
curl_easy_setopt(req, CURLOPT_CONNECTTIMEOUT, fileTransferSettings.connectTimeout.get());
|
||||
|
||||
|
|
@ -425,10 +438,14 @@ struct curlFileTransfer : public FileTransfer
|
|||
|
||||
auto httpStatus = getHTTPStatus();
|
||||
|
||||
debug("finished %s of '%s'; curl status = %d, HTTP status = %d, body = %d bytes, duration = %.2f s",
|
||||
request.verb(), request.uri, code, httpStatus, result.bodySize,
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(finishTime - startTime).count() / 1000.0f
|
||||
);
|
||||
debug(
|
||||
"finished %s of '%s'; curl status = %d, HTTP status = %d, body = %d bytes, duration = %.2f s",
|
||||
request.verb(),
|
||||
request.uri,
|
||||
code,
|
||||
httpStatus,
|
||||
result.bodySize,
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(finishTime - startTime).count() / 1000.0f);
|
||||
|
||||
appendCurrentUrl();
|
||||
|
||||
|
|
@ -448,8 +465,7 @@ struct curlFileTransfer : public FileTransfer
|
|||
if (writeException)
|
||||
failEx(writeException);
|
||||
|
||||
else if (code == CURLE_OK && successfulStatuses.count(httpStatus))
|
||||
{
|
||||
else if (code == CURLE_OK && successfulStatuses.count(httpStatus)) {
|
||||
result.cached = httpStatus == 304;
|
||||
|
||||
// In 2021, GitHub responds to If-None-Match with 304,
|
||||
|
|
@ -487,32 +503,32 @@ struct curlFileTransfer : public FileTransfer
|
|||
// * 511 we're behind a captive portal
|
||||
err = Misc;
|
||||
} else {
|
||||
// Don't bother retrying on certain cURL errors either
|
||||
// Don't bother retrying on certain cURL errors either
|
||||
|
||||
// Allow selecting a subset of enum values
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Wswitch-enum"
|
||||
// Allow selecting a subset of enum values
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Wswitch-enum"
|
||||
switch (code) {
|
||||
case CURLE_FAILED_INIT:
|
||||
case CURLE_URL_MALFORMAT:
|
||||
case CURLE_NOT_BUILT_IN:
|
||||
case CURLE_REMOTE_ACCESS_DENIED:
|
||||
case CURLE_FILE_COULDNT_READ_FILE:
|
||||
case CURLE_FUNCTION_NOT_FOUND:
|
||||
case CURLE_ABORTED_BY_CALLBACK:
|
||||
case CURLE_BAD_FUNCTION_ARGUMENT:
|
||||
case CURLE_INTERFACE_FAILED:
|
||||
case CURLE_UNKNOWN_OPTION:
|
||||
case CURLE_SSL_CACERT_BADFILE:
|
||||
case CURLE_TOO_MANY_REDIRECTS:
|
||||
case CURLE_WRITE_ERROR:
|
||||
case CURLE_UNSUPPORTED_PROTOCOL:
|
||||
err = Misc;
|
||||
break;
|
||||
default: // Shut up warnings
|
||||
break;
|
||||
case CURLE_FAILED_INIT:
|
||||
case CURLE_URL_MALFORMAT:
|
||||
case CURLE_NOT_BUILT_IN:
|
||||
case CURLE_REMOTE_ACCESS_DENIED:
|
||||
case CURLE_FILE_COULDNT_READ_FILE:
|
||||
case CURLE_FUNCTION_NOT_FOUND:
|
||||
case CURLE_ABORTED_BY_CALLBACK:
|
||||
case CURLE_BAD_FUNCTION_ARGUMENT:
|
||||
case CURLE_INTERFACE_FAILED:
|
||||
case CURLE_UNKNOWN_OPTION:
|
||||
case CURLE_SSL_CACERT_BADFILE:
|
||||
case CURLE_TOO_MANY_REDIRECTS:
|
||||
case CURLE_WRITE_ERROR:
|
||||
case CURLE_UNSUPPORTED_PROTOCOL:
|
||||
err = Misc;
|
||||
break;
|
||||
default: // Shut up warnings
|
||||
break;
|
||||
}
|
||||
#pragma GCC diagnostic pop
|
||||
#pragma GCC diagnostic pop
|
||||
}
|
||||
|
||||
attempt++;
|
||||
|
|
@ -520,31 +536,40 @@ struct curlFileTransfer : public FileTransfer
|
|||
std::optional<std::string> response;
|
||||
if (errorSink)
|
||||
response = std::move(errorSink->s);
|
||||
auto exc =
|
||||
code == CURLE_ABORTED_BY_CALLBACK && getInterrupted()
|
||||
? FileTransferError(Interrupted, std::move(response), "%s of '%s' was interrupted", request.verb(), request.uri)
|
||||
: httpStatus != 0
|
||||
? FileTransferError(err,
|
||||
std::move(response),
|
||||
"unable to %s '%s': HTTP error %d%s",
|
||||
request.verb(), request.uri, httpStatus,
|
||||
code == CURLE_OK ? "" : fmt(" (curl error: %s)", curl_easy_strerror(code)))
|
||||
: FileTransferError(err,
|
||||
std::move(response),
|
||||
"unable to %s '%s': %s (%d) %s",
|
||||
request.verb(), request.uri, curl_easy_strerror(code), code, errbuf);
|
||||
auto exc = code == CURLE_ABORTED_BY_CALLBACK && getInterrupted() ? FileTransferError(
|
||||
Interrupted,
|
||||
std::move(response),
|
||||
"%s of '%s' was interrupted",
|
||||
request.verb(),
|
||||
request.uri)
|
||||
: httpStatus != 0
|
||||
? FileTransferError(
|
||||
err,
|
||||
std::move(response),
|
||||
"unable to %s '%s': HTTP error %d%s",
|
||||
request.verb(),
|
||||
request.uri,
|
||||
httpStatus,
|
||||
code == CURLE_OK ? "" : fmt(" (curl error: %s)", curl_easy_strerror(code)))
|
||||
: FileTransferError(
|
||||
err,
|
||||
std::move(response),
|
||||
"unable to %s '%s': %s (%d) %s",
|
||||
request.verb(),
|
||||
request.uri,
|
||||
curl_easy_strerror(code),
|
||||
code,
|
||||
errbuf);
|
||||
|
||||
/* If this is a transient error, then maybe retry the
|
||||
download after a while. If we're writing to a
|
||||
sink, we can only retry if the server supports
|
||||
ranged requests. */
|
||||
if (err == Transient
|
||||
&& attempt < request.tries
|
||||
&& (!this->request.dataCallback
|
||||
|| writtenToSink == 0
|
||||
|| (acceptRanges && encoding.empty())))
|
||||
{
|
||||
int ms = retryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(fileTransfer.mt19937));
|
||||
if (err == Transient && attempt < request.tries
|
||||
&& (!this->request.dataCallback || writtenToSink == 0 || (acceptRanges && encoding.empty()))) {
|
||||
int ms = retryTimeMs
|
||||
* std::pow(
|
||||
2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(fileTransfer.mt19937));
|
||||
if (writtenToSink)
|
||||
warn("%s; retrying from offset %d in %d ms", exc.what(), writtenToSink, ms);
|
||||
else
|
||||
|
|
@ -553,8 +578,7 @@ struct curlFileTransfer : public FileTransfer
|
|||
errorSink.reset();
|
||||
embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms);
|
||||
fileTransfer.enqueueItem(shared_from_this());
|
||||
}
|
||||
else
|
||||
} else
|
||||
fail(std::move(exc));
|
||||
}
|
||||
}
|
||||
|
|
@ -562,23 +586,28 @@ struct curlFileTransfer : public FileTransfer
|
|||
|
||||
struct State
|
||||
{
|
||||
struct EmbargoComparator {
|
||||
bool operator() (const std::shared_ptr<TransferItem> & i1, const std::shared_ptr<TransferItem> & i2) {
|
||||
struct EmbargoComparator
|
||||
{
|
||||
bool operator()(const std::shared_ptr<TransferItem> & i1, const std::shared_ptr<TransferItem> & i2)
|
||||
{
|
||||
return i1->embargo > i2->embargo;
|
||||
}
|
||||
};
|
||||
|
||||
bool quit = false;
|
||||
std::priority_queue<std::shared_ptr<TransferItem>, std::vector<std::shared_ptr<TransferItem>>, EmbargoComparator> incoming;
|
||||
std::
|
||||
priority_queue<std::shared_ptr<TransferItem>, std::vector<std::shared_ptr<TransferItem>>, EmbargoComparator>
|
||||
incoming;
|
||||
};
|
||||
|
||||
Sync<State> state_;
|
||||
|
||||
#ifndef _WIN32 // TODO need graceful async exit support on Windows?
|
||||
#ifndef _WIN32 // TODO need graceful async exit support on Windows?
|
||||
/* We can't use a std::condition_variable to wake up the curl
|
||||
thread, because it only monitors file descriptors. So use a
|
||||
pipe instead. */
|
||||
Pipe wakeupPipe;
|
||||
#endif
|
||||
#endif
|
||||
|
||||
std::thread workerThread;
|
||||
|
||||
|
|
@ -590,18 +619,17 @@ struct curlFileTransfer : public FileTransfer
|
|||
|
||||
curlm = curl_multi_init();
|
||||
|
||||
#if LIBCURL_VERSION_NUM >= 0x072b00 // Multiplex requires >= 7.43.0
|
||||
#if LIBCURL_VERSION_NUM >= 0x072b00 // Multiplex requires >= 7.43.0
|
||||
curl_multi_setopt(curlm, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX);
|
||||
#endif
|
||||
#if LIBCURL_VERSION_NUM >= 0x071e00 // Max connections requires >= 7.30.0
|
||||
curl_multi_setopt(curlm, CURLMOPT_MAX_TOTAL_CONNECTIONS,
|
||||
fileTransferSettings.httpConnections.get());
|
||||
#endif
|
||||
#endif
|
||||
#if LIBCURL_VERSION_NUM >= 0x071e00 // Max connections requires >= 7.30.0
|
||||
curl_multi_setopt(curlm, CURLMOPT_MAX_TOTAL_CONNECTIONS, fileTransferSettings.httpConnections.get());
|
||||
#endif
|
||||
|
||||
#ifndef _WIN32 // TODO need graceful async exit support on Windows?
|
||||
#ifndef _WIN32 // TODO need graceful async exit support on Windows?
|
||||
wakeupPipe.create();
|
||||
fcntl(wakeupPipe.readSide.get(), F_SETFL, O_NONBLOCK);
|
||||
#endif
|
||||
#endif
|
||||
|
||||
workerThread = std::thread([&]() { workerThreadEntry(); });
|
||||
}
|
||||
|
|
@ -612,7 +640,8 @@ struct curlFileTransfer : public FileTransfer
|
|||
|
||||
workerThread.join();
|
||||
|
||||
if (curlm) curl_multi_cleanup(curlm);
|
||||
if (curlm)
|
||||
curl_multi_cleanup(curlm);
|
||||
}
|
||||
|
||||
void stopWorkerThread()
|
||||
|
|
@ -622,28 +651,26 @@ struct curlFileTransfer : public FileTransfer
|
|||
auto state(state_.lock());
|
||||
state->quit = true;
|
||||
}
|
||||
#ifndef _WIN32 // TODO need graceful async exit support on Windows?
|
||||
#ifndef _WIN32 // TODO need graceful async exit support on Windows?
|
||||
writeFull(wakeupPipe.writeSide.get(), " ", false);
|
||||
#endif
|
||||
#endif
|
||||
}
|
||||
|
||||
void workerThreadMain()
|
||||
{
|
||||
/* Cause this thread to be notified on SIGINT. */
|
||||
#ifndef _WIN32 // TODO need graceful async exit support on Windows?
|
||||
auto callback = createInterruptCallback([&]() {
|
||||
stopWorkerThread();
|
||||
});
|
||||
#endif
|
||||
/* Cause this thread to be notified on SIGINT. */
|
||||
#ifndef _WIN32 // TODO need graceful async exit support on Windows?
|
||||
auto callback = createInterruptCallback([&]() { stopWorkerThread(); });
|
||||
#endif
|
||||
|
||||
#ifdef __linux__
|
||||
#ifdef __linux__
|
||||
try {
|
||||
tryUnshareFilesystem();
|
||||
} catch (nix::Error & e) {
|
||||
e.addTrace({}, "in download thread");
|
||||
throw;
|
||||
}
|
||||
#endif
|
||||
#endif
|
||||
|
||||
std::map<CURL *, std::shared_ptr<TransferItem>> items;
|
||||
|
||||
|
|
@ -677,16 +704,19 @@ struct curlFileTransfer : public FileTransfer
|
|||
/* Wait for activity, including wakeup events. */
|
||||
int numfds = 0;
|
||||
struct curl_waitfd extraFDs[1];
|
||||
#ifndef _WIN32 // TODO need graceful async exit support on Windows?
|
||||
#ifndef _WIN32 // TODO need graceful async exit support on Windows?
|
||||
extraFDs[0].fd = wakeupPipe.readSide.get();
|
||||
extraFDs[0].events = CURL_WAIT_POLLIN;
|
||||
extraFDs[0].revents = 0;
|
||||
#endif
|
||||
#endif
|
||||
long maxSleepTimeMs = items.empty() ? 10000 : 100;
|
||||
auto sleepTimeMs =
|
||||
nextWakeup != std::chrono::steady_clock::time_point()
|
||||
? std::max(0, (int) std::chrono::duration_cast<std::chrono::milliseconds>(nextWakeup - std::chrono::steady_clock::now()).count())
|
||||
: maxSleepTimeMs;
|
||||
auto sleepTimeMs = nextWakeup != std::chrono::steady_clock::time_point()
|
||||
? std::max(
|
||||
0,
|
||||
(int) std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
nextWakeup - std::chrono::steady_clock::now())
|
||||
.count())
|
||||
: maxSleepTimeMs;
|
||||
vomit("download thread waiting for %d ms", sleepTimeMs);
|
||||
mc = curl_multi_wait(curlm, extraFDs, 1, sleepTimeMs, &numfds);
|
||||
if (mc != CURLM_OK)
|
||||
|
|
@ -715,8 +745,7 @@ struct curlFileTransfer : public FileTransfer
|
|||
incoming.push_back(item);
|
||||
state->incoming.pop();
|
||||
} else {
|
||||
if (nextWakeup == std::chrono::steady_clock::time_point()
|
||||
|| item->embargo < nextWakeup)
|
||||
if (nextWakeup == std::chrono::steady_clock::time_point() || item->embargo < nextWakeup)
|
||||
nextWakeup = item->embargo;
|
||||
break;
|
||||
}
|
||||
|
|
@ -747,16 +776,15 @@ struct curlFileTransfer : public FileTransfer
|
|||
|
||||
{
|
||||
auto state(state_.lock());
|
||||
while (!state->incoming.empty()) state->incoming.pop();
|
||||
while (!state->incoming.empty())
|
||||
state->incoming.pop();
|
||||
state->quit = true;
|
||||
}
|
||||
}
|
||||
|
||||
void enqueueItem(std::shared_ptr<TransferItem> item)
|
||||
{
|
||||
if (item->request.data
|
||||
&& !hasPrefix(item->request.uri, "http://")
|
||||
&& !hasPrefix(item->request.uri, "https://"))
|
||||
if (item->request.data && !hasPrefix(item->request.uri, "http://") && !hasPrefix(item->request.uri, "https://"))
|
||||
throw nix::Error("uploading to '%s' is not supported", item->request.uri);
|
||||
|
||||
{
|
||||
|
|
@ -765,9 +793,9 @@ struct curlFileTransfer : public FileTransfer
|
|||
throw nix::Error("cannot enqueue download request because the download thread is shutting down");
|
||||
state->incoming.push(item);
|
||||
}
|
||||
#ifndef _WIN32 // TODO need graceful async exit support on Windows?
|
||||
#ifndef _WIN32 // TODO need graceful async exit support on Windows?
|
||||
writeFull(wakeupPipe.writeSide.get(), " ");
|
||||
#endif
|
||||
#endif
|
||||
}
|
||||
|
||||
#if NIX_WITH_S3_SUPPORT
|
||||
|
|
@ -776,8 +804,8 @@ struct curlFileTransfer : public FileTransfer
|
|||
auto [path, params] = splitUriAndParams(uri);
|
||||
|
||||
auto slash = path.find('/', 5); // 5 is the length of "s3://" prefix
|
||||
if (slash == std::string::npos)
|
||||
throw nix::Error("bad S3 URI '%s'", path);
|
||||
if (slash == std::string::npos)
|
||||
throw nix::Error("bad S3 URI '%s'", path);
|
||||
|
||||
std::string bucketName(path, 5, slash - 5);
|
||||
std::string key(path, slash + 1);
|
||||
|
|
@ -786,8 +814,7 @@ struct curlFileTransfer : public FileTransfer
|
|||
}
|
||||
#endif
|
||||
|
||||
void enqueueFileTransfer(const FileTransferRequest & request,
|
||||
Callback<FileTransferResult> callback) override
|
||||
void enqueueFileTransfer(const FileTransferRequest & request, Callback<FileTransferResult> callback) override
|
||||
{
|
||||
/* Ugly hack to support s3:// URIs. */
|
||||
if (hasPrefix(request.uri, "s3://")) {
|
||||
|
|
@ -814,7 +841,9 @@ struct curlFileTransfer : public FileTransfer
|
|||
#else
|
||||
throw nix::Error("cannot download '%s' because Nix is not built with S3 support", request.uri);
|
||||
#endif
|
||||
} catch (...) { callback.rethrow(); }
|
||||
} catch (...) {
|
||||
callback.rethrow();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -845,14 +874,13 @@ ref<FileTransfer> makeFileTransfer()
|
|||
std::future<FileTransferResult> FileTransfer::enqueueFileTransfer(const FileTransferRequest & request)
|
||||
{
|
||||
auto promise = std::make_shared<std::promise<FileTransferResult>>();
|
||||
enqueueFileTransfer(request,
|
||||
{[promise](std::future<FileTransferResult> fut) {
|
||||
try {
|
||||
promise->set_value(fut.get());
|
||||
} catch (...) {
|
||||
promise->set_exception(std::current_exception());
|
||||
}
|
||||
}});
|
||||
enqueueFileTransfer(request, {[promise](std::future<FileTransferResult> fut) {
|
||||
try {
|
||||
promise->set_value(fut.get());
|
||||
} catch (...) {
|
||||
promise->set_exception(std::current_exception());
|
||||
}
|
||||
}});
|
||||
return promise->get_future();
|
||||
}
|
||||
|
||||
|
|
@ -868,9 +896,7 @@ FileTransferResult FileTransfer::upload(const FileTransferRequest & request)
|
|||
}
|
||||
|
||||
void FileTransfer::download(
|
||||
FileTransferRequest && request,
|
||||
Sink & sink,
|
||||
std::function<void(FileTransferResult)> resultCallback)
|
||||
FileTransferRequest && request, Sink & sink, std::function<void(FileTransferResult)> resultCallback)
|
||||
{
|
||||
/* Note: we can't call 'sink' via request.dataCallback, because
|
||||
that would cause the sink to execute on the fileTransfer
|
||||
|
|
@ -880,7 +906,8 @@ void FileTransfer::download(
|
|||
Therefore we use a buffer to communicate data between the
|
||||
download thread and the calling thread. */
|
||||
|
||||
struct State {
|
||||
struct State
|
||||
{
|
||||
bool quit = false;
|
||||
std::exception_ptr exc;
|
||||
std::string data;
|
||||
|
|
@ -898,10 +925,10 @@ void FileTransfer::download(
|
|||
});
|
||||
|
||||
request.dataCallback = [_state](std::string_view data) {
|
||||
|
||||
auto state(_state->lock());
|
||||
|
||||
if (state->quit) return;
|
||||
if (state->quit)
|
||||
return;
|
||||
|
||||
/* If the buffer is full, then go to sleep until the calling
|
||||
thread wakes us up (i.e. when it has removed data from the
|
||||
|
|
@ -921,8 +948,8 @@ void FileTransfer::download(
|
|||
state->avail.notify_one();
|
||||
};
|
||||
|
||||
enqueueFileTransfer(request,
|
||||
{[_state, resultCallback{std::move(resultCallback)}](std::future<FileTransferResult> fut) {
|
||||
enqueueFileTransfer(
|
||||
request, {[_state, resultCallback{std::move(resultCallback)}](std::future<FileTransferResult> fut) {
|
||||
auto state(_state->lock());
|
||||
state->quit = true;
|
||||
try {
|
||||
|
|
@ -949,13 +976,15 @@ void FileTransfer::download(
|
|||
if (state->data.empty()) {
|
||||
|
||||
if (state->quit) {
|
||||
if (state->exc) std::rethrow_exception(state->exc);
|
||||
if (state->exc)
|
||||
std::rethrow_exception(state->exc);
|
||||
return;
|
||||
}
|
||||
|
||||
state.wait(state->avail);
|
||||
|
||||
if (state->data.empty()) continue;
|
||||
if (state->data.empty())
|
||||
continue;
|
||||
}
|
||||
|
||||
chunk = std::move(state->data);
|
||||
|
|
@ -974,8 +1003,11 @@ void FileTransfer::download(
|
|||
}
|
||||
|
||||
template<typename... Args>
|
||||
FileTransferError::FileTransferError(FileTransfer::Error error, std::optional<std::string> response, const Args & ... args)
|
||||
: Error(args...), error(error), response(response)
|
||||
FileTransferError::FileTransferError(
|
||||
FileTransfer::Error error, std::optional<std::string> response, const Args &... args)
|
||||
: Error(args...)
|
||||
, error(error)
|
||||
, response(response)
|
||||
{
|
||||
const auto hf = HintFmt(args...);
|
||||
// FIXME: Due to https://github.com/NixOS/nix/issues/3841 we don't know how
|
||||
|
|
@ -987,4 +1019,4 @@ FileTransferError::FileTransferError(FileTransfer::Error error, std::optional<st
|
|||
err.msg = hf;
|
||||
}
|
||||
|
||||
}
|
||||
} // namespace nix
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue