1
1
Fork 0
mirror of https://github.com/NixOS/nix.git synced 2025-12-01 14:41:00 +01:00

DataTransfer -> FileTransfer

This commit is contained in:
Nikola Knezevic 2020-04-06 23:43:43 +02:00
parent 7848372b0f
commit c330109bfa
13 changed files with 107 additions and 107 deletions

View file

@ -27,9 +27,9 @@ using namespace std::string_literals;
namespace nix {
DataTransferSettings dataTransferSettings;
FileTransferSettings fileTransferSettings;
static GlobalConfig::Register r1(&dataTransferSettings);
static GlobalConfig::Register r1(&fileTransferSettings);
std::string resolveUri(const std::string & uri)
{
@ -39,7 +39,7 @@ std::string resolveUri(const std::string & uri)
return uri;
}
struct curlDataTransfer : public DataTransfer
struct curlFileTransfer : public FileTransfer
{
CURLM * curlm = 0;
@ -48,12 +48,12 @@ struct curlDataTransfer : public DataTransfer
struct TransferItem : public std::enable_shared_from_this<TransferItem>
{
curlDataTransfer & dataTransfer;
DataTransferRequest request;
DataTransferResult result;
curlFileTransfer & fileTransfer;
FileTransferRequest request;
FileTransferResult result;
Activity act;
bool done = false; // whether either the success or failure function has been called
Callback<DataTransferResult> callback;
Callback<FileTransferResult> callback;
CURL * req = 0;
bool active = false; // whether the handle has been added to the multi object
std::string status;
@ -72,12 +72,12 @@ struct curlDataTransfer : public DataTransfer
curl_off_t writtenToSink = 0;
TransferItem(curlDataTransfer & dataTransfer,
const DataTransferRequest & request,
Callback<DataTransferResult> && callback)
: dataTransfer(dataTransfer)
TransferItem(curlFileTransfer & fileTransfer,
const FileTransferRequest & request,
Callback<FileTransferResult> && callback)
: fileTransfer(fileTransfer)
, request(request)
, act(*logger, lvlTalkative, actDataTransfer,
, act(*logger, lvlTalkative, actFileTransfer,
fmt(request.data ? "uploading '%s'" : "downloading '%s'", request.uri),
{request.uri}, request.parentAct)
, callback(std::move(callback))
@ -106,13 +106,13 @@ struct curlDataTransfer : public DataTransfer
{
if (req) {
if (active)
curl_multi_remove_handle(dataTransfer.curlm, req);
curl_multi_remove_handle(fileTransfer.curlm, req);
curl_easy_cleanup(req);
}
if (requestHeaders) curl_slist_free_all(requestHeaders);
try {
if (!done)
fail(DataTransferError(Interrupted, format("download of '%s' was interrupted") % request.uri));
fail(FileTransferError(Interrupted, format("download of '%s' was interrupted") % request.uri));
} catch (...) {
ignoreException();
}
@ -257,12 +257,12 @@ struct curlDataTransfer : public DataTransfer
curl_easy_setopt(req, CURLOPT_NOSIGNAL, 1);
curl_easy_setopt(req, CURLOPT_USERAGENT,
("curl/" LIBCURL_VERSION " Nix/" + nixVersion +
(dataTransferSettings.userAgentSuffix != "" ? " " + dataTransferSettings.userAgentSuffix.get() : "")).c_str());
(fileTransferSettings.userAgentSuffix != "" ? " " + fileTransferSettings.userAgentSuffix.get() : "")).c_str());
#if LIBCURL_VERSION_NUM >= 0x072b00
curl_easy_setopt(req, CURLOPT_PIPEWAIT, 1);
#endif
#if LIBCURL_VERSION_NUM >= 0x072f00
if (dataTransferSettings.enableHttp2)
if (fileTransferSettings.enableHttp2)
curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_2TLS);
else
curl_easy_setopt(req, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1);
@ -297,10 +297,10 @@ struct curlDataTransfer : public DataTransfer
curl_easy_setopt(req, CURLOPT_SSL_VERIFYHOST, 0);
}
curl_easy_setopt(req, CURLOPT_CONNECTTIMEOUT, dataTransferSettings.connectTimeout.get());
curl_easy_setopt(req, CURLOPT_CONNECTTIMEOUT, fileTransferSettings.connectTimeout.get());
curl_easy_setopt(req, CURLOPT_LOW_SPEED_LIMIT, 1L);
curl_easy_setopt(req, CURLOPT_LOW_SPEED_TIME, dataTransferSettings.stalledDownloadTimeout.get());
curl_easy_setopt(req, CURLOPT_LOW_SPEED_TIME, fileTransferSettings.stalledDownloadTimeout.get());
/* If no file exist in the specified path, curl continues to work
anyway as if netrc support was disabled. */
@ -401,14 +401,14 @@ struct curlDataTransfer : public DataTransfer
auto exc =
code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted
? DataTransferError(Interrupted, fmt("%s of '%s' was interrupted", request.verb(), request.uri))
? FileTransferError(Interrupted, fmt("%s of '%s' was interrupted", request.verb(), request.uri))
: httpStatus != 0
? DataTransferError(err,
? FileTransferError(err,
fmt("unable to %s '%s': HTTP error %d",
request.verb(), request.uri, httpStatus)
+ (code == CURLE_OK ? "" : fmt(" (curl error: %s)", curl_easy_strerror(code)))
)
: DataTransferError(err,
: FileTransferError(err,
fmt("unable to %s '%s': %s (%d)",
request.verb(), request.uri, curl_easy_strerror(code), code));
@ -422,13 +422,13 @@ struct curlDataTransfer : public DataTransfer
|| writtenToSink == 0
|| (acceptRanges && encoding.empty())))
{
int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(dataTransfer.mt19937));
int ms = request.baseRetryTimeMs * std::pow(2.0f, attempt - 1 + std::uniform_real_distribution<>(0.0, 0.5)(fileTransfer.mt19937));
if (writtenToSink)
warn("%s; retrying from offset %d in %d ms", exc.what(), writtenToSink, ms);
else
warn("%s; retrying in %d ms", exc.what(), ms);
embargo = std::chrono::steady_clock::now() + std::chrono::milliseconds(ms);
dataTransfer.enqueueItem(shared_from_this());
fileTransfer.enqueueItem(shared_from_this());
}
else
fail(exc);
@ -456,7 +456,7 @@ struct curlDataTransfer : public DataTransfer
std::thread workerThread;
curlDataTransfer()
curlFileTransfer()
: mt19937(rd())
{
static std::once_flag globalInit;
@ -469,7 +469,7 @@ struct curlDataTransfer : public DataTransfer
#endif
#if LIBCURL_VERSION_NUM >= 0x071e00 // Max connections requires >= 7.30.0
curl_multi_setopt(curlm, CURLMOPT_MAX_TOTAL_CONNECTIONS,
dataTransferSettings.httpConnections.get());
fileTransferSettings.httpConnections.get());
#endif
wakeupPipe.create();
@ -478,7 +478,7 @@ struct curlDataTransfer : public DataTransfer
workerThread = std::thread([&]() { workerThreadEntry(); });
}
~curlDataTransfer()
~curlFileTransfer()
{
stopWorkerThread();
@ -641,8 +641,8 @@ struct curlDataTransfer : public DataTransfer
}
#endif
void enqueueDataTransfer(const DataTransferRequest & request,
Callback<DataTransferResult> callback) override
void enqueueFileTransfer(const FileTransferRequest & request,
Callback<FileTransferResult> callback) override
{
/* Ugly hack to support s3:// URIs. */
if (hasPrefix(request.uri, "s3://")) {
@ -660,9 +660,9 @@ struct curlDataTransfer : public DataTransfer
// FIXME: implement ETag
auto s3Res = s3Helper.getObject(bucketName, key);
DataTransferResult res;
FileTransferResult res;
if (!s3Res.data)
throw DataTransferError(NotFound, fmt("S3 object '%s' does not exist", request.uri));
throw FileTransferError(NotFound, fmt("S3 object '%s' does not exist", request.uri));
res.data = s3Res.data;
callback(std::move(res));
#else
@ -676,22 +676,22 @@ struct curlDataTransfer : public DataTransfer
}
};
ref<DataTransfer> getDataTransfer()
ref<FileTransfer> getFileTransfer()
{
static ref<DataTransfer> dataTransfer = makeDataTransfer();
return dataTransfer;
static ref<FileTransfer> fileTransfer = makeFileTransfer();
return fileTransfer;
}
ref<DataTransfer> makeDataTransfer()
ref<FileTransfer> makeFileTransfer()
{
return make_ref<curlDataTransfer>();
return make_ref<curlFileTransfer>();
}
std::future<DataTransferResult> DataTransfer::enqueueDataTransfer(const DataTransferRequest & request)
std::future<FileTransferResult> FileTransfer::enqueueFileTransfer(const FileTransferRequest & request)
{
auto promise = std::make_shared<std::promise<DataTransferResult>>();
enqueueDataTransfer(request,
{[promise](std::future<DataTransferResult> fut) {
auto promise = std::make_shared<std::promise<FileTransferResult>>();
enqueueFileTransfer(request,
{[promise](std::future<FileTransferResult> fut) {
try {
promise->set_value(fut.get());
} catch (...) {
@ -701,21 +701,21 @@ std::future<DataTransferResult> DataTransfer::enqueueDataTransfer(const DataTran
return promise->get_future();
}
DataTransferResult DataTransfer::download(const DataTransferRequest & request)
FileTransferResult FileTransfer::download(const FileTransferRequest & request)
{
return enqueueDataTransfer(request).get();
return enqueueFileTransfer(request).get();
}
DataTransferResult DataTransfer::upload(const DataTransferRequest & request)
FileTransferResult FileTransfer::upload(const FileTransferRequest & request)
{
/* Note: this method is the same as download, but helps in readability */
return enqueueDataTransfer(request).get();
return enqueueFileTransfer(request).get();
}
void DataTransfer::download(DataTransferRequest && request, Sink & sink)
void FileTransfer::download(FileTransferRequest && request, Sink & sink)
{
/* Note: we can't call 'sink' via request.dataCallback, because
that would cause the sink to execute on the dataTransfer
that would cause the sink to execute on the fileTransfer
thread. If 'sink' is a coroutine, this will fail. Also, if the
sink is expensive (e.g. one that does decompression and writing
to the Nix store), it would stall the download thread too much.
@ -761,8 +761,8 @@ void DataTransfer::download(DataTransferRequest && request, Sink & sink)
state->avail.notify_one();
};
enqueueDataTransfer(request,
{[_state](std::future<DataTransferResult> fut) {
enqueueFileTransfer(request,
{[_state](std::future<FileTransferResult> fut) {
auto state(_state->lock());
state->quit = true;
try {