
Merge pull request #14330 from lovesegfault/nix-s3-multipart

feat(libstore): add support for multipart s3 uploads
John Ericson 2025-11-03 18:36:48 +00:00 committed by GitHub
commit 34ac1792f9
GPG key ID: B5690EEEBB952194 (no known key found for this signature in database)
4 changed files with 269 additions and 45 deletions

File 1 of 4: release note entry

@@ -1,6 +1,6 @@
 ---
 synopsis: "Improved S3 binary cache support via HTTP"
-prs: [13823, 14026, 14120, 14131, 14135, 14144, 14170, 14190, 14198, 14206, 14209, 14222, 14223, 13752]
+prs: [13752, 13823, 14026, 14120, 14131, 14135, 14144, 14170, 14190, 14198, 14206, 14209, 14222, 14223, 14330, 14333, 14335, 14336, 14337, 14350, 14356, 14357, 14374, 14375, 14376, 14377, 14391, 14393, 14420, 14421]
 issues: [13084, 12671, 11748, 12403]
 ---
@@ -18,9 +18,23 @@ improvements:
 The new implementation requires curl >= 7.75.0 and `aws-crt-cpp` for credential
 management.

-All existing S3 URL formats and parameters remain supported, with the notable
-exception of multi-part uploads, which are no longer supported.
+All existing S3 URL formats and parameters remain supported, however the store
+settings for configuring multipart uploads have changed:
+
+- **`multipart-upload`** (default: `false`): Enable multipart uploads for large
+  files. When enabled, files exceeding the multipart threshold will be uploaded
+  in multiple parts.
+- **`multipart-threshold`** (default: `100 MiB`): Minimum file size for using
+  multipart uploads. Files smaller than this will use regular PUT requests.
+  Only takes effect when `multipart-upload` is enabled.
+- **`multipart-chunk-size`** (default: `5 MiB`): Size of each part in multipart
+  uploads. Must be at least 5 MiB (AWS S3 requirement). Larger chunk sizes
+  reduce the number of requests but use more memory.
+- **`buffer-size`**: Has been replaced by `multipart-chunk-size` and is now an alias to it.

 Note that this change also means Nix now supports S3 binary cache stores even
-if build without `aws-crt-cpp`, but only for public buckets which do not
-require auth.
+if built without `aws-crt-cpp`, but only for public buckets which do not
+require authentication.
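
As a usage illustration (not part of this commit; the bucket, region, and size below are hypothetical), these settings can be passed as query parameters on the store URL, like other store settings:

    nix copy --to 's3://example-bucket?region=eu-west-1&multipart-upload=true&multipart-chunk-size=33554432' ./result

Here 33554432 bytes (32 MiB) raises the part size above the 5 MiB default, trading memory for fewer requests per upload.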

File 2 of 4: S3BinaryCacheStoreConfig settings

@@ -61,6 +61,38 @@ struct S3BinaryCacheStoreConfig : HttpBinaryCacheStoreConfig
           > addressing instead of virtual host based addressing.
         )"};

+    const Setting<bool> multipartUpload{
+        this,
+        false,
+        "multipart-upload",
+        R"(
+          Whether to use multipart uploads for large files. When enabled,
+          files exceeding the multipart threshold will be uploaded in
+          multiple parts, which is required for files larger than 5 GiB and
+          can improve performance and reliability for large uploads.
+        )"};
+
+    const Setting<uint64_t> multipartChunkSize{
+        this,
+        5 * 1024 * 1024,
+        "multipart-chunk-size",
+        R"(
+          The size (in bytes) of each part in multipart uploads. Must be
+          at least 5 MiB (AWS S3 requirement). Larger chunk sizes reduce the
+          number of requests but use more memory. Default is 5 MiB.
+        )",
+        {"buffer-size"}};
+
+    const Setting<uint64_t> multipartThreshold{
+        this,
+        100 * 1024 * 1024,
+        "multipart-threshold",
+        R"(
+          The minimum file size (in bytes) for using multipart uploads.
+          Files smaller than this threshold will use regular PUT requests.
+          Default is 100 MiB. Only takes effect when multipart-upload is enabled.
+        )"};
+
     /**
      * Set of settings that are part of the S3 URI itself.
      * These are needed for region specification and other S3-specific settings.

File 3 of 4: S3BinaryCacheStore implementation

@@ -7,6 +7,7 @@
 #include "nix/util/util.hh"

 #include <cassert>
+#include <cstring>
 #include <ranges>
 #include <regex>
 #include <span>
@@ -15,7 +16,9 @@ namespace nix {

 MakeError(UploadToS3, Error);

+static constexpr uint64_t AWS_MIN_PART_SIZE = 5 * 1024 * 1024;            // 5MiB
 static constexpr uint64_t AWS_MAX_PART_SIZE = 5ULL * 1024 * 1024 * 1024; // 5GiB
+static constexpr uint64_t AWS_MAX_PART_COUNT = 10000;

 class S3BinaryCacheStore : public virtual HttpBinaryCacheStore
 {
@@ -50,9 +53,48 @@
         std::optional<std::string_view> contentEncoding);

     /**
-     * Uploads a file to S3 (CompressedSource overload).
+     * Uploads a file to S3 using multipart upload.
+     *
+     * This method is suitable for large files that exceed the multipart threshold.
+     * It orchestrates the complete multipart upload process: creating the upload,
+     * splitting the data into parts, uploading each part, and completing the upload.
+     * If any error occurs, the multipart upload is automatically aborted.
+     *
+     * @see https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html
      */
-    void upload(std::string_view path, CompressedSource & source, std::string_view mimeType);
+    void uploadMultipart(
+        std::string_view path,
+        RestartableSource & source,
+        uint64_t sizeHint,
+        std::string_view mimeType,
+        std::optional<std::string_view> contentEncoding);
+
+    /**
+     * A Sink that manages a complete S3 multipart upload lifecycle.
+     * Creates the upload on construction, buffers and uploads chunks as data arrives,
+     * and completes or aborts the upload appropriately.
+     */
+    struct MultipartSink : Sink
+    {
+        S3BinaryCacheStore & store;
+        std::string_view path;
+        std::string uploadId;
+        std::string::size_type chunkSize;
+        std::vector<std::string> partEtags;
+        std::string buffer;
+
+        MultipartSink(
+            S3BinaryCacheStore & store,
+            std::string_view path,
+            uint64_t sizeHint,
+            std::string_view mimeType,
+            std::optional<std::string_view> contentEncoding);
+
+        void operator()(std::string_view data) override;
+        void finish();
+        void uploadChunk(std::string chunk);
+    };

     /**
      * Creates a multipart upload for large objects to S3.
@@ -72,18 +114,13 @@
      */
     std::string uploadPart(std::string_view key, std::string_view uploadId, uint64_t partNumber, std::string data);

-    struct UploadedPart
-    {
-        uint64_t partNumber;
-        std::string etag;
-    };
-
     /**
      * Completes a multipart upload by combining all uploaded parts.
      * @see
      * https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html#API_CompleteMultipartUpload_RequestSyntax
      */
-    void completeMultipartUpload(std::string_view key, std::string_view uploadId, std::span<const UploadedPart> parts);
+    void
+    completeMultipartUpload(std::string_view key, std::string_view uploadId, std::span<const std::string> partEtags);

     /**
      * Abort a multipart upload
@@ -91,17 +128,31 @@
      * @see
      * https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html#API_AbortMultipartUpload_RequestSyntax
      */
-    void abortMultipartUpload(std::string_view key, std::string_view uploadId);
+    void abortMultipartUpload(std::string_view key, std::string_view uploadId) noexcept;
 };

 void S3BinaryCacheStore::upsertFile(
     const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint)
 {
-    if (auto compressionMethod = getCompressionMethod(path)) {
-        CompressedSource compressed(source, *compressionMethod);
-        upload(path, compressed, mimeType);
-    } else {
-        upload(path, source, sizeHint, mimeType, std::nullopt);
+    auto doUpload = [&](RestartableSource & src, uint64_t size, std::optional<std::string_view> encoding) {
+        if (s3Config->multipartUpload && size > s3Config->multipartThreshold) {
+            uploadMultipart(path, src, size, mimeType, encoding);
+        } else {
+            upload(path, src, size, mimeType, encoding);
+        }
+    };
+
+    try {
+        if (auto compressionMethod = getCompressionMethod(path)) {
+            CompressedSource compressed(source, *compressionMethod);
+            doUpload(compressed, compressed.size(), compressed.getCompressionMethod());
+        } else {
+            doUpload(source, sizeHint, std::nullopt);
+        }
+    } catch (FileTransferError & e) {
+        UploadToS3 err(e.message());
+        err.addTrace({}, "while uploading to S3 binary cache at '%s'", config->cacheUri.to_string());
+        throw err;
     }
 }
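
An illustrative consequence of the dispatch above (the numbers are mine, not from the commit): with multipart-upload enabled and the default 100 MiB threshold, a NAR that compresses to 40 MiB still goes out as a single PUT, while one that compresses to 2 GiB is routed through uploadMultipart. When compression applies, the threshold is compared against the compressed size, since doUpload receives compressed.size().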
@@ -119,18 +170,112 @@ void S3BinaryCacheStore::upload(
         renderSize(sizeHint),
         renderSize(AWS_MAX_PART_SIZE));

-    try {
-        HttpBinaryCacheStore::upload(path, source, sizeHint, mimeType, contentEncoding);
-    } catch (FileTransferError & e) {
-        UploadToS3 err(e.message());
-        err.addTrace({}, "while uploading to S3 binary cache at '%s'", config->cacheUri.to_string());
-        throw err;
-    }
+    HttpBinaryCacheStore::upload(path, source, sizeHint, mimeType, contentEncoding);
 }

-void S3BinaryCacheStore::upload(std::string_view path, CompressedSource & source, std::string_view mimeType)
+void S3BinaryCacheStore::uploadMultipart(
+    std::string_view path,
+    RestartableSource & source,
+    uint64_t sizeHint,
+    std::string_view mimeType,
+    std::optional<std::string_view> contentEncoding)
 {
-    upload(path, static_cast<RestartableSource &>(source), source.size(), mimeType, source.getCompressionMethod());
+    debug("using S3 multipart upload for '%s' (%d bytes)", path, sizeHint);
+    MultipartSink sink(*this, path, sizeHint, mimeType, contentEncoding);
+    source.drainInto(sink);
+    sink.finish();
+}
+
+S3BinaryCacheStore::MultipartSink::MultipartSink(
+    S3BinaryCacheStore & store,
+    std::string_view path,
+    uint64_t sizeHint,
+    std::string_view mimeType,
+    std::optional<std::string_view> contentEncoding)
+    : store(store)
+    , path(path)
+{
+    // Calculate chunk size and estimated parts
+    chunkSize = store.s3Config->multipartChunkSize;
+    uint64_t estimatedParts = (sizeHint + chunkSize - 1) / chunkSize; // ceil division
+
+    if (estimatedParts > AWS_MAX_PART_COUNT) {
+        // Equivalent to ceil(sizeHint / AWS_MAX_PART_COUNT)
+        uint64_t minChunkSize = (sizeHint + AWS_MAX_PART_COUNT - 1) / AWS_MAX_PART_COUNT;
+
+        if (minChunkSize > AWS_MAX_PART_SIZE) {
+            throw Error(
+                "file too large for S3 multipart upload: %s would require chunk size of %s "
+                "(max %s) to stay within %d part limit",
+                renderSize(sizeHint),
+                renderSize(minChunkSize),
+                renderSize(AWS_MAX_PART_SIZE),
+                AWS_MAX_PART_COUNT);
+        }
+
+        warn(
+            "adjusting S3 multipart chunk size from %s to %s "
+            "to stay within %d part limit for %s file",
+            renderSize(store.s3Config->multipartChunkSize.get()),
+            renderSize(minChunkSize),
+            AWS_MAX_PART_COUNT,
+            renderSize(sizeHint));
+
+        chunkSize = minChunkSize;
+        estimatedParts = AWS_MAX_PART_COUNT;
+    }
+
+    buffer.reserve(chunkSize);
+    partEtags.reserve(estimatedParts);
+    uploadId = store.createMultipartUpload(path, mimeType, contentEncoding);
+}
+
+void S3BinaryCacheStore::MultipartSink::operator()(std::string_view data)
+{
+    buffer.append(data);
+
+    while (buffer.size() >= chunkSize) {
+        // Move entire buffer, extract excess, copy back remainder
+        auto chunk = std::move(buffer);
+        auto excessSize = chunk.size() > chunkSize ? chunk.size() - chunkSize : 0;
+        if (excessSize > 0) {
+            buffer.resize(excessSize);
+            std::memcpy(buffer.data(), chunk.data() + chunkSize, excessSize);
+        }
+        chunk.resize(std::min(chunkSize, chunk.size()));
+        uploadChunk(std::move(chunk));
+    }
+}
+
+void S3BinaryCacheStore::MultipartSink::finish()
+{
+    if (!buffer.empty()) {
+        uploadChunk(std::move(buffer));
+    }
+
+    try {
+        if (partEtags.empty()) {
+            throw Error("no data read from stream");
+        }
+        store.completeMultipartUpload(path, uploadId, partEtags);
+    } catch (Error & e) {
+        store.abortMultipartUpload(path, uploadId);
+        e.addTrace({}, "while finishing an S3 multipart upload");
+        throw;
+    }
+}
+
+void S3BinaryCacheStore::MultipartSink::uploadChunk(std::string chunk)
+{
+    auto partNumber = partEtags.size() + 1;
+    try {
+        std::string etag = store.uploadPart(path, uploadId, partNumber, std::move(chunk));
+        partEtags.push_back(std::move(etag));
+    } catch (Error & e) {
+        store.abortMultipartUpload(path, uploadId);
+        e.addTrace({}, "while uploading part %d of an S3 multipart upload", partNumber);
+        throw;
+    }
 }

 std::string S3BinaryCacheStore::createMultipartUpload(
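
To make the part-count adjustment in the MultipartSink constructor concrete (an illustration; the figures are not from the commit): a hypothetical 100 GiB object at the default 5 MiB chunk size would need ceil(100 GiB / 5 MiB) = 20480 parts, which exceeds the 10000-part limit, so the constructor raises the chunk size to ceil(100 GiB / 10000) ≈ 10.24 MiB, comfortably below the 5 GiB per-part maximum, and caps the estimate at 10000 parts.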
@@ -170,6 +315,10 @@ std::string S3BinaryCacheStore::createMultipartUpload(
 std::string
 S3BinaryCacheStore::uploadPart(std::string_view key, std::string_view uploadId, uint64_t partNumber, std::string data)
 {
+    if (partNumber > AWS_MAX_PART_COUNT) {
+        throw Error("S3 multipart upload exceeded %d part limit", AWS_MAX_PART_COUNT);
+    }
+
     auto req = makeRequest(key);
     req.method = HttpMethod::PUT;
     req.setupForS3();
@@ -188,24 +337,29 @@ S3BinaryCacheStore::uploadPart(std::string_view key, std::string_view uploadId,
         throw Error("S3 UploadPart response missing ETag for part %d", partNumber);
     }

+    debug("Part %d uploaded, ETag: %s", partNumber, result.etag);
     return std::move(result.etag);
 }

-void S3BinaryCacheStore::abortMultipartUpload(std::string_view key, std::string_view uploadId)
+void S3BinaryCacheStore::abortMultipartUpload(std::string_view key, std::string_view uploadId) noexcept
 {
-    auto req = makeRequest(key);
-    req.setupForS3();
-
-    auto url = req.uri.parsed();
-    url.query["uploadId"] = uploadId;
-    req.uri = VerbatimURL(url);
-    req.method = HttpMethod::DELETE;
-
-    getFileTransfer()->enqueueFileTransfer(req).get();
+    try {
+        auto req = makeRequest(key);
+        req.setupForS3();
+
+        auto url = req.uri.parsed();
+        url.query["uploadId"] = uploadId;
+        req.uri = VerbatimURL(url);
+        req.method = HttpMethod::DELETE;
+
+        getFileTransfer()->enqueueFileTransfer(req).get();
+    } catch (...) {
+        ignoreExceptionInDestructor();
+    }
 }

 void S3BinaryCacheStore::completeMultipartUpload(
-    std::string_view key, std::string_view uploadId, std::span<const UploadedPart> parts)
+    std::string_view key, std::string_view uploadId, std::span<const std::string> partEtags)
 {
     auto req = makeRequest(key);
     req.setupForS3();
@@ -216,21 +370,24 @@ void S3BinaryCacheStore::completeMultipartUpload(
     req.method = HttpMethod::POST;

     std::string xml = "<CompleteMultipartUpload>";
-    for (const auto & part : parts) {
+    for (const auto & [idx, etag] : enumerate(partEtags)) {
         xml += "<Part>";
-        xml += "<PartNumber>" + std::to_string(part.partNumber) + "</PartNumber>";
-        xml += "<ETag>" + part.etag + "</ETag>";
+        // S3 part numbers are 1-indexed, but vector indices are 0-indexed
+        xml += "<PartNumber>" + std::to_string(idx + 1) + "</PartNumber>";
+        xml += "<ETag>" + etag + "</ETag>";
         xml += "</Part>";
     }
     xml += "</CompleteMultipartUpload>";

-    debug("S3 CompleteMultipartUpload XML (%d parts): %s", parts.size(), xml);
+    debug("S3 CompleteMultipartUpload XML (%d parts): %s", partEtags.size(), xml);

     StringSource payload{xml};
     req.data = {payload};
     req.mimeType = "text/xml";

     getFileTransfer()->enqueueFileTransfer(req).get();
+
+    debug("S3 multipart upload completed: %d parts uploaded for '%s'", partEtags.size(), key);
 }

 StringSet S3BinaryCacheStoreConfig::uriSchemes()
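
For reference, the request body assembled by completeMultipartUpload for a two-part upload would look like the following (the ETag values are invented):

    <CompleteMultipartUpload><Part><PartNumber>1</PartNumber><ETag>"etag-1"</ETag></Part><Part><PartNumber>2</PartNumber><ETag>"etag-2"</ETag></Part></CompleteMultipartUpload>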
@@ -253,6 +410,28 @@ S3BinaryCacheStoreConfig::S3BinaryCacheStoreConfig(
             cacheUri.query[key] = value;
         }
     }
+
+    if (multipartChunkSize < AWS_MIN_PART_SIZE) {
+        throw UsageError(
+            "multipart-chunk-size must be at least %s, got %s",
+            renderSize(AWS_MIN_PART_SIZE),
+            renderSize(multipartChunkSize.get()));
+    }
+
+    if (multipartChunkSize > AWS_MAX_PART_SIZE) {
+        throw UsageError(
+            "multipart-chunk-size must be at most %s, got %s",
+            renderSize(AWS_MAX_PART_SIZE),
+            renderSize(multipartChunkSize.get()));
+    }
+
+    if (multipartUpload && multipartThreshold < multipartChunkSize) {
+        warn(
+            "multipart-threshold (%s) is less than multipart-chunk-size (%s), "
+            "which may result in single-part multipart uploads",
+            renderSize(multipartThreshold.get()),
+            renderSize(multipartChunkSize.get()));
+    }
 }

 std::string S3BinaryCacheStoreConfig::getHumanReadableURI() const

File 4 of 4: S3 binary cache store tests

@@ -794,10 +794,9 @@ in
 test_compression_disabled()
 test_nix_prefetch_url()
 test_versioned_urls()
-# FIXME: enable when multipart fully lands
-# test_multipart_upload_basic()
-# test_multipart_threshold()
-# test_multipart_with_log_compression()
+test_multipart_upload_basic()
+test_multipart_threshold()
+test_multipart_with_log_compression()

 print("\n" + "="*80)
 print(" All S3 Binary Cache Store Tests Passed!")