1
1
Fork 0
mirror of https://github.com/NixOS/nix.git synced 2025-11-08 19:46:02 +01:00

Merge pull request #14390 from NixOS/constant-memory-uploads

libstore: Make HTTP binary cache uploads run in constant memory
This commit is contained in:
John Ericson 2025-10-29 23:14:42 +00:00 committed by GitHub
commit bffbdcfddc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 144 additions and 62 deletions

View file

@ -219,7 +219,9 @@ std::vector<nlohmann::json> Fetch::fetchUrls(const std::vector<Pointer> & pointe
nlohmann::json oidList = pointerToPayload(pointers);
nlohmann::json data = {{"operation", "download"}};
data["objects"] = oidList;
request.data = data.dump();
auto payload = data.dump();
StringSource source{payload};
request.data = {source};
FileTransferResult result = getFileTransfer()->upload(request);
auto responseString = result.data;

View file

@ -79,7 +79,8 @@ std::optional<std::string> BinaryCacheStore::getNixCacheInfo()
void BinaryCacheStore::upsertFile(
const std::string & path, std::string && data, const std::string & mimeType, uint64_t sizeHint)
{
upsertFile(path, std::make_shared<std::stringstream>(std::move(data)), mimeType, sizeHint);
auto source = restartableSourceFromFactory([data = std::move(data)]() { return make_unique<StringSource>(data); });
upsertFile(path, *source, mimeType, sizeHint);
}
void BinaryCacheStore::getFile(const std::string & path, Callback<std::optional<std::string>> callback) noexcept
@ -271,12 +272,19 @@ ref<const ValidPathInfo> BinaryCacheStore::addToStoreCommon(
/* Atomically write the NAR file. */
if (repair || !fileExists(narInfo->url)) {
auto source = restartableSourceFromFactory([fnTemp]() {
struct AutoCloseFDSource : AutoCloseFD, FdSource
{
AutoCloseFDSource(AutoCloseFD fd)
: AutoCloseFD(std::move(fd))
, FdSource(get())
{
}
};
return std::make_unique<AutoCloseFDSource>(toDescriptor(open(fnTemp.c_str(), O_RDONLY)));
});
stats.narWrite++;
upsertFile(
narInfo->url,
std::make_shared<std::fstream>(fnTemp, std::ios_base::in | std::ios_base::binary),
"application/x-nix-nar",
narInfo->fileSize);
upsertFile(narInfo->url, *source, "application/x-nix-nar", narInfo->fileSize);
} else
stats.narWriteAverted++;

View file

@ -295,20 +295,17 @@ struct curlFileTransfer : public FileTransfer
return 0;
}
size_t readOffset = 0;
size_t readCallback(char * buffer, size_t size, size_t nitems)
{
if (readOffset == request.data->length())
return 0;
auto count = std::min(size * nitems, request.data->length() - readOffset);
assert(count);
memcpy(buffer, request.data->data() + readOffset, count);
readOffset += count;
return count;
size_t readCallback(char * buffer, size_t size, size_t nitems) noexcept
try {
auto data = request.data;
return data->source->read(buffer, nitems * size);
} catch (EndOfFile &) {
return 0;
} catch (...) {
return CURL_READFUNC_ABORT;
}
static size_t readCallbackWrapper(char * buffer, size_t size, size_t nitems, void * userp)
static size_t readCallbackWrapper(char * buffer, size_t size, size_t nitems, void * userp) noexcept
{
return ((TransferItem *) userp)->readCallback(buffer, size, nitems);
}
@ -322,19 +319,24 @@ struct curlFileTransfer : public FileTransfer
}
#endif
size_t seekCallback(curl_off_t offset, int origin)
{
size_t seekCallback(curl_off_t offset, int origin) noexcept
try {
auto source = request.data->source;
if (origin == SEEK_SET) {
readOffset = offset;
source->restart();
source->skip(offset);
} else if (origin == SEEK_CUR) {
readOffset += offset;
source->skip(offset);
} else if (origin == SEEK_END) {
readOffset = request.data->length() + offset;
NullSink sink{};
source->drainInto(sink);
}
return CURL_SEEKFUNC_OK;
} catch (...) {
return CURL_SEEKFUNC_FAIL;
}
static size_t seekCallbackWrapper(void * clientp, curl_off_t offset, int origin)
static size_t seekCallbackWrapper(void * clientp, curl_off_t offset, int origin) noexcept
{
return ((TransferItem *) clientp)->seekCallback(offset, origin);
}
@ -393,10 +395,10 @@ struct curlFileTransfer : public FileTransfer
if (request.data) {
if (request.method == HttpMethod::POST) {
curl_easy_setopt(req, CURLOPT_POST, 1L);
curl_easy_setopt(req, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t) request.data->length());
curl_easy_setopt(req, CURLOPT_POSTFIELDSIZE_LARGE, (curl_off_t) request.data->sizeHint);
} else if (request.method == HttpMethod::PUT) {
curl_easy_setopt(req, CURLOPT_UPLOAD, 1L);
curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->length());
curl_easy_setopt(req, CURLOPT_INFILESIZE_LARGE, (curl_off_t) request.data->sizeHint);
} else {
unreachable();
}

View file

@ -135,23 +135,28 @@ bool HttpBinaryCacheStore::fileExists(const std::string & path)
}
void HttpBinaryCacheStore::upsertFile(
const std::string & path,
std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType,
uint64_t sizeHint)
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint)
{
auto req = makeRequest(path);
req.method = HttpMethod::PUT;
auto data = StreamToSourceAdapter(istream).drain();
auto compressionMethod = getCompressionMethod(path);
std::string data;
std::optional<StringSource> stringSource{};
if (compressionMethod) {
data = compress(*compressionMethod, data);
StringSink sink{};
auto compressionSink = makeCompressionSink(*compressionMethod, sink);
source.drainInto(*compressionSink);
compressionSink->finish();
data = std::move(sink.s);
req.headers.emplace_back("Content-Encoding", *compressionMethod);
stringSource = StringSource{data};
req.data = {*stringSource};
} else {
req.data = {sizeHint, source};
}
req.data = std::move(data);
req.mimeType = mimeType;
try {

View file

@ -101,10 +101,7 @@ public:
virtual bool fileExists(const std::string & path) = 0;
virtual void upsertFile(
const std::string & path,
std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType,
uint64_t sizeHint) = 0;
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) = 0;
void upsertFile(
const std::string & path,

View file

@ -115,7 +115,26 @@ struct FileTransferRequest
unsigned int baseRetryTimeMs = RETRY_TIME_MS_DEFAULT;
ActivityId parentAct;
bool decompress = true;
std::optional<std::string> data;
struct UploadData
{
UploadData(StringSource & s)
: sizeHint(s.s.length())
, source(&s)
{
}
UploadData(std::size_t sizeHint, RestartableSource & source)
: sizeHint(sizeHint)
, source(&source)
{
}
std::size_t sizeHint = 0;
RestartableSource * source = nullptr;
};
std::optional<UploadData> data;
std::string mimeType;
std::function<void(std::string_view data)> dataCallback;
/**

View file

@ -81,10 +81,7 @@ protected:
bool fileExists(const std::string & path) override;
void upsertFile(
const std::string & path,
std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType,
uint64_t sizeHint) override;
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) override;
FileTransferRequest makeRequest(std::string_view path);

View file

@ -54,16 +54,12 @@ protected:
bool fileExists(const std::string & path) override;
void upsertFile(
const std::string & path,
std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType,
uint64_t sizeHint) override
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) override
{
auto path2 = config->binaryCacheDir + "/" + path;
static std::atomic<int> counter{0};
Path tmp = fmt("%s.tmp.%d.%d", path2, getpid(), ++counter);
AutoDelete del(tmp, false);
StreamToSourceAdapter source(istream);
writeFile(tmp, source);
std::filesystem::rename(tmp, path2);
del.cancel();

View file

@ -21,10 +21,7 @@ public:
}
void upsertFile(
const std::string & path,
std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType,
uint64_t sizeHint) override;
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint) override;
private:
ref<S3BinaryCacheStoreConfig> s3Config;
@ -70,12 +67,9 @@ private:
};
void S3BinaryCacheStore::upsertFile(
const std::string & path,
std::shared_ptr<std::basic_iostream<char>> istream,
const std::string & mimeType,
uint64_t sizeHint)
const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint)
{
HttpBinaryCacheStore::upsertFile(path, istream, mimeType, sizeHint);
HttpBinaryCacheStore::upsertFile(path, source, mimeType, sizeHint);
}
std::string S3BinaryCacheStore::createMultipartUpload(
@ -92,7 +86,8 @@ std::string S3BinaryCacheStore::createMultipartUpload(
req.uri = VerbatimURL(url);
req.method = HttpMethod::POST;
req.data = "";
StringSource payload{std::string_view("")};
req.data = {payload};
req.mimeType = mimeType;
if (contentEncoding) {
@ -122,7 +117,8 @@ S3BinaryCacheStore::uploadPart(std::string_view key, std::string_view uploadId,
url.query["partNumber"] = std::to_string(partNumber);
url.query["uploadId"] = uploadId;
req.uri = VerbatimURL(url);
req.data = std::move(data);
StringSource payload{data};
req.data = {payload};
req.mimeType = "application/octet-stream";
auto result = getFileTransfer()->enqueueFileTransfer(req).get();
@ -169,7 +165,8 @@ void S3BinaryCacheStore::completeMultipartUpload(
debug("S3 CompleteMultipartUpload XML (%d parts): %s", parts.size(), xml);
req.data = xml;
StringSource payload{xml};
req.data = {payload};
req.mimeType = "text/xml";
getFileTransfer()->enqueueFileTransfer(req).get();

View file

@ -230,10 +230,18 @@ struct StringSink : Sink
void operator()(std::string_view data) override;
};
/**
* Source type that can be restarted.
*/
struct RestartableSource : Source
{
virtual void restart() = 0;
};
/**
* A source that reads data from a string.
*/
struct StringSource : Source
struct StringSource : RestartableSource
{
std::string_view s;
size_t pos;
@ -257,8 +265,22 @@ struct StringSource : Source
size_t read(char * data, size_t len) override;
void skip(size_t len) override;
void restart() override
{
pos = 0;
}
};
/**
* Create a restartable Source from a factory function.
*
* @param factory Factory function that returns a fresh instance of the Source. Gets
* called for each source restart.
* @pre factory must return an equivalent source for each invocation.
*/
std::unique_ptr<RestartableSource> restartableSourceFromFactory(std::function<std::unique_ptr<Source>()> factory);
/**
* A sink that writes all incoming data to two other sinks.
*/

View file

@ -513,4 +513,41 @@ size_t ChainSource::read(char * data, size_t len)
}
}
std::unique_ptr<RestartableSource> restartableSourceFromFactory(std::function<std::unique_ptr<Source>()> factory)
{
struct RestartableSourceImpl : RestartableSource
{
RestartableSourceImpl(decltype(factory) factory_)
: factory_(std::move(factory_))
, impl(this->factory_())
{
}
decltype(factory) factory_;
std::unique_ptr<Source> impl = factory_();
size_t read(char * data, size_t len) override
{
return impl->read(data, len);
}
bool good() override
{
return impl->good();
}
void skip(size_t len) override
{
return impl->skip(len);
}
void restart() override
{
impl = factory_();
}
};
return std::make_unique<RestartableSourceImpl>(std::move(factory));
}
} // namespace nix