mirror of
https://github.com/NixOS/nix.git
synced 2025-11-08 19:46:02 +01:00
libstore: Make uploads with filetransfer.cc consume a RestartableSource
Make uploads run in constant memory. Also change the callbacks to be noexcept, since we really don't want to be unwinding the stack in the curl thread. That will definitely corrupt that stack and make nix/curl crash in very bad ways.
This commit is contained in:
parent
b8d7f551e4
commit
cf75079bd8
9 changed files with 87 additions and 42 deletions
|
|
@ -219,7 +219,9 @@ std::vector<nlohmann::json> Fetch::fetchUrls(const std::vector<Pointer> & pointe
|
||||||
nlohmann::json oidList = pointerToPayload(pointers);
|
nlohmann::json oidList = pointerToPayload(pointers);
|
||||||
nlohmann::json data = {{"operation", "download"}};
|
nlohmann::json data = {{"operation", "download"}};
|
||||||
data["objects"] = oidList;
|
data["objects"] = oidList;
|
||||||
request.data = data.dump();
|
auto payload = data.dump();
|
||||||
|
StringSource source{payload};
|
||||||
|
request.data = {source};
|
||||||
|
|
||||||
FileTransferResult result = getFileTransfer()->upload(request);
|
FileTransferResult result = getFileTransfer()->upload(request);
|
||||||
auto responseString = result.data;
|
auto responseString = result.data;
|
||||||
|
|
|
||||||
|
|
@ -79,8 +79,8 @@ std::optional<std::string> BinaryCacheStore::getNixCacheInfo()
|
||||||
void BinaryCacheStore::upsertFile(
|
void BinaryCacheStore::upsertFile(
|
||||||
const std::string & path, std::string && data, const std::string & mimeType, uint64_t sizeHint)
|
const std::string & path, std::string && data, const std::string & mimeType, uint64_t sizeHint)
|
||||||
{
|
{
|
||||||
StringSource source{data};
|
auto source = restartableSourceFromFactory([data = std::move(data)]() { return make_unique<StringSource>(data); });
|
||||||
upsertFile(path, source, mimeType, sizeHint);
|
upsertFile(path, *source, mimeType, sizeHint);
|
||||||
}
|
}
|
||||||
|
|
||||||
void BinaryCacheStore::getFile(const std::string & path, Callback<std::optional<std::string>> callback) noexcept
|
void BinaryCacheStore::getFile(const std::string & path, Callback<std::optional<std::string>> callback) noexcept
|
||||||
|
|
@ -272,10 +272,19 @@ ref<const ValidPathInfo> BinaryCacheStore::addToStoreCommon(
|
||||||
|
|
||||||
/* Atomically write the NAR file. */
|
/* Atomically write the NAR file. */
|
||||||
if (repair || !fileExists(narInfo->url)) {
|
if (repair || !fileExists(narInfo->url)) {
|
||||||
AutoCloseFD fd = toDescriptor(open(fnTemp.c_str(), O_RDONLY));
|
auto source = restartableSourceFromFactory([fnTemp]() {
|
||||||
FdSource source{fd.get()};
|
struct AutoCloseFDSource : AutoCloseFD, FdSource
|
||||||
|
{
|
||||||
|
AutoCloseFDSource(AutoCloseFD fd)
|
||||||
|
: AutoCloseFD(std::move(fd))
|
||||||
|
, FdSource(get())
|
||||||
|
{
|
||||||
|
}
|
||||||
|
};
|
||||||
|
return std::make_unique<AutoCloseFDSource>(toDescriptor(open(fnTemp.c_str(), O_RDONLY)));
|
||||||
|
});
|
||||||
stats.narWrite++;
|
stats.narWrite++;
|
||||||
upsertFile(narInfo->url, source, "application/x-nix-nar", narInfo->fileSize);
|
upsertFile(narInfo->url, *source, "application/x-nix-nar", narInfo->fileSize);
|
||||||
} else
|
} else
|
||||||
stats.narWriteAverted++;
|
stats.narWriteAverted++;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -295,20 +295,17 @@ struct curlFileTransfer : public FileTransfer
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t readOffset = 0;
|
size_t readCallback(char * buffer, size_t size, size_t nitems) noexcept
|
||||||
|
try {
|
||||||
size_t readCallback(char * buffer, size_t size, size_t nitems)
|
auto data = request.data;
|
||||||
{
|
return data->source->read(buffer, nitems * size);
|
||||||
if (readOffset == request.data->length())
|
} catch (EndOfFile &) {
|
||||||
return 0;
|
return 0;
|
||||||
auto count = std::min(size * nitems, request.data->length() - readOffset);
|
} catch (...) {
|
||||||
assert(count);
|
return CURL_READFUNC_ABORT;
|
||||||
memcpy(buffer, request.data->data() + readOffset, count);
|
|
||||||
readOffset += count;
|
|
||||||
return count;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static size_t readCallbackWrapper(char * buffer, size_t size, size_t nitems, void * userp)
|
static size_t readCallbackWrapper(char * buffer, size_t size, size_t nitems, void * userp) noexcept
|
||||||
{
|
{
|
||||||
return ((TransferItem *) userp)->readCallback(buffer, size, nitems);
|
return ((TransferItem *) userp)->readCallback(buffer, size, nitems);
|
||||||
}
|
}
|
||||||
|
|
@ -322,19 +319,24 @@ struct curlFileTransfer : public FileTransfer
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
size_t seekCallback(curl_off_t offset, int origin)
|
size_t seekCallback(curl_off_t offset, int origin) noexcept
|
||||||
{
|
try {
|
||||||
|
auto source = request.data->source;
|
||||||
if (origin == SEEK_SET) {
|
if (origin == SEEK_SET) {
|
||||||
readOffset = offset;
|
source->restart();
|
||||||
|
source->skip(offset);
|
||||||
} else if (origin == SEEK_CUR) {
|
} else if (origin == SEEK_CUR) {
|
||||||
readOffset += offset;
|
source->skip(offset);
|
||||||
} else if (origin == SEEK_END) {
|
} else if (origin == SEEK_END) {
|
||||||
readOffset = request.data->length() + offset;
|
NullSink sink{};
|
||||||
|
source->drainInto(sink);
|
||||||
}
|
}
|
||||||
return CURL_SEEKFUNC_OK;
|
return CURL_SEEKFUNC_OK;
|
||||||
|
} catch (...) {
|
||||||
|
return CURL_SEEKFUNC_FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static size_t seekCallbackWrapper(void * clientp, curl_off_t offset, int origin)
|
static size_t seekCallbackWrapper(void * clientp, curl_off_t offset, int origin) noexcept
|
||||||
{
|
{
|
||||||
return ((TransferItem *) clientp)->seekCallback(offset, origin);
|
return ((TransferItem *) clientp)->seekCallback(offset, origin);
|
||||||
}
|
}
|
||||||
|
|
@ -393,10 +395,10 @@ struct curlFileTransfer : public FileTransfer
|
||||||
if (request.data) {
|
if (request.data) {
|
||||||
if (request.method == HttpMethod::POST) {
|
if (request.method == HttpMethod::POST) {
|
||||||
curl_easy_setopt(req, CURLOPT_POST, 1L);
|
curl_easy_setopt(req, CURLOPT_POST, 1L);
|
||||||
curl_easy_setopt(req, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t) request.data->length());
|
curl_easy_setopt(req, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t) request.data->sizeHint);
|
||||||
} else if (request.method == HttpMethod::PUT) {
|
} else if (request.method == HttpMethod::PUT) {
|
||||||
curl_easy_setopt(req, CURLOPT_UPLOAD, 1L);
|
curl_easy_setopt(req, CURLOPT_UPLOAD, 1L);
|
||||||
curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->length());
|
curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->sizeHint);
|
||||||
} else {
|
} else {
|
||||||
unreachable();
|
unreachable();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -135,19 +135,28 @@ bool HttpBinaryCacheStore::fileExists(const std::string & path)
|
||||||
}
|
}
|
||||||
|
|
||||||
void HttpBinaryCacheStore::upsertFile(
|
void HttpBinaryCacheStore::upsertFile(
|
||||||
const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint)
|
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint)
|
||||||
{
|
{
|
||||||
auto req = makeRequest(path);
|
auto req = makeRequest(path);
|
||||||
req.method = HttpMethod::PUT;
|
req.method = HttpMethod::PUT;
|
||||||
auto data = source.drain();
|
|
||||||
auto compressionMethod = getCompressionMethod(path);
|
auto compressionMethod = getCompressionMethod(path);
|
||||||
|
|
||||||
|
std::string data;
|
||||||
|
std::optional<StringSource> stringSource{};
|
||||||
|
|
||||||
if (compressionMethod) {
|
if (compressionMethod) {
|
||||||
data = compress(*compressionMethod, data);
|
StringSink sink{};
|
||||||
|
auto compressionSink = makeCompressionSink(*compressionMethod, sink);
|
||||||
|
source.drainInto(*compressionSink);
|
||||||
|
compressionSink->finish();
|
||||||
|
data = std::move(sink.s);
|
||||||
req.headers.emplace_back("Content-Encoding", *compressionMethod);
|
req.headers.emplace_back("Content-Encoding", *compressionMethod);
|
||||||
|
stringSource = StringSource{data};
|
||||||
|
req.data = {*stringSource};
|
||||||
|
} else {
|
||||||
|
req.data = {sizeHint, source};
|
||||||
}
|
}
|
||||||
|
|
||||||
req.data = std::move(data);
|
|
||||||
req.mimeType = mimeType;
|
req.mimeType = mimeType;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|
|
||||||
|
|
@ -100,8 +100,8 @@ public:
|
||||||
|
|
||||||
virtual bool fileExists(const std::string & path) = 0;
|
virtual bool fileExists(const std::string & path) = 0;
|
||||||
|
|
||||||
virtual void
|
virtual void upsertFile(
|
||||||
upsertFile(const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) = 0;
|
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) = 0;
|
||||||
|
|
||||||
void upsertFile(
|
void upsertFile(
|
||||||
const std::string & path,
|
const std::string & path,
|
||||||
|
|
|
||||||
|
|
@ -115,7 +115,26 @@ struct FileTransferRequest
|
||||||
unsigned int baseRetryTimeMs = RETRY_TIME_MS_DEFAULT;
|
unsigned int baseRetryTimeMs = RETRY_TIME_MS_DEFAULT;
|
||||||
ActivityId parentAct;
|
ActivityId parentAct;
|
||||||
bool decompress = true;
|
bool decompress = true;
|
||||||
std::optional<std::string> data;
|
|
||||||
|
struct UploadData
|
||||||
|
{
|
||||||
|
UploadData(StringSource & s)
|
||||||
|
: sizeHint(s.s.length())
|
||||||
|
, source(&s)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
UploadData(std::size_t sizeHint, RestartableSource & source)
|
||||||
|
: sizeHint(sizeHint)
|
||||||
|
, source(&source)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
std::size_t sizeHint = 0;
|
||||||
|
RestartableSource * source = nullptr;
|
||||||
|
};
|
||||||
|
|
||||||
|
std::optional<UploadData> data;
|
||||||
std::string mimeType;
|
std::string mimeType;
|
||||||
std::function<void(std::string_view data)> dataCallback;
|
std::function<void(std::string_view data)> dataCallback;
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -80,8 +80,8 @@ protected:
|
||||||
|
|
||||||
bool fileExists(const std::string & path) override;
|
bool fileExists(const std::string & path) override;
|
||||||
|
|
||||||
void
|
void upsertFile(
|
||||||
upsertFile(const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) override;
|
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) override;
|
||||||
|
|
||||||
FileTransferRequest makeRequest(std::string_view path);
|
FileTransferRequest makeRequest(std::string_view path);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -53,7 +53,8 @@ protected:
|
||||||
|
|
||||||
bool fileExists(const std::string & path) override;
|
bool fileExists(const std::string & path) override;
|
||||||
|
|
||||||
void upsertFile(const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) override
|
void upsertFile(
|
||||||
|
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) override
|
||||||
{
|
{
|
||||||
auto path2 = config->binaryCacheDir + "/" + path;
|
auto path2 = config->binaryCacheDir + "/" + path;
|
||||||
static std::atomic<int> counter{0};
|
static std::atomic<int> counter{0};
|
||||||
|
|
|
||||||
|
|
@ -20,8 +20,8 @@ public:
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void upsertFile(
|
||||||
upsertFile(const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint) override;
|
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
ref<S3BinaryCacheStoreConfig> s3Config;
|
ref<S3BinaryCacheStoreConfig> s3Config;
|
||||||
|
|
@ -67,7 +67,7 @@ private:
|
||||||
};
|
};
|
||||||
|
|
||||||
void S3BinaryCacheStore::upsertFile(
|
void S3BinaryCacheStore::upsertFile(
|
||||||
const std::string & path, Source & source, const std::string & mimeType, uint64_t sizeHint)
|
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint)
|
||||||
{
|
{
|
||||||
HttpBinaryCacheStore::upsertFile(path, source, mimeType, sizeHint);
|
HttpBinaryCacheStore::upsertFile(path, source, mimeType, sizeHint);
|
||||||
}
|
}
|
||||||
|
|
@ -86,7 +86,8 @@ std::string S3BinaryCacheStore::createMultipartUpload(
|
||||||
req.uri = VerbatimURL(url);
|
req.uri = VerbatimURL(url);
|
||||||
|
|
||||||
req.method = HttpMethod::POST;
|
req.method = HttpMethod::POST;
|
||||||
req.data = "";
|
StringSource payload{std::string_view("")};
|
||||||
|
req.data = {payload};
|
||||||
req.mimeType = mimeType;
|
req.mimeType = mimeType;
|
||||||
|
|
||||||
if (contentEncoding) {
|
if (contentEncoding) {
|
||||||
|
|
@ -116,7 +117,8 @@ S3BinaryCacheStore::uploadPart(std::string_view key, std::string_view uploadId,
|
||||||
url.query["partNumber"] = std::to_string(partNumber);
|
url.query["partNumber"] = std::to_string(partNumber);
|
||||||
url.query["uploadId"] = uploadId;
|
url.query["uploadId"] = uploadId;
|
||||||
req.uri = VerbatimURL(url);
|
req.uri = VerbatimURL(url);
|
||||||
req.data = std::move(data);
|
StringSource payload{data};
|
||||||
|
req.data = {payload};
|
||||||
req.mimeType = "application/octet-stream";
|
req.mimeType = "application/octet-stream";
|
||||||
|
|
||||||
auto result = getFileTransfer()->enqueueFileTransfer(req).get();
|
auto result = getFileTransfer()->enqueueFileTransfer(req).get();
|
||||||
|
|
@ -163,7 +165,8 @@ void S3BinaryCacheStore::completeMultipartUpload(
|
||||||
|
|
||||||
debug("S3 CompleteMultipartUpload XML (%d parts): %s", parts.size(), xml);
|
debug("S3 CompleteMultipartUpload XML (%d parts): %s", parts.size(), xml);
|
||||||
|
|
||||||
req.data = xml;
|
StringSource payload{xml};
|
||||||
|
req.data = {payload};
|
||||||
req.mimeType = "text/xml";
|
req.mimeType = "text/xml";
|
||||||
|
|
||||||
getFileTransfer()->enqueueFileTransfer(req).get();
|
getFileTransfer()->enqueueFileTransfer(req).get();
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue