From 040d1aae41a3bfda86c29910eb1495d75598fd35 Mon Sep 17 00:00:00 2001
From: Bernardo Meurer Costa
Date: Wed, 22 Oct 2025 08:42:32 +0000
Subject: [PATCH] feat(libstore/s3-binary-cache-store): implement `uploadMultipart()`

Implement `uploadMultipart()`, the main method that orchestrates S3
multipart uploads.
---
 src/libstore/s3-binary-cache-store.cc | 228 ++++++++++++++++++++++----
 1 file changed, 192 insertions(+), 36 deletions(-)

diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc
index 4cf5f987a..37264dfae 100644
--- a/src/libstore/s3-binary-cache-store.cc
+++ b/src/libstore/s3-binary-cache-store.cc
@@ -7,6 +7,7 @@
 #include "nix/util/util.hh"

 #include
+#include
 #include
 #include
 #include
@@ -17,6 +18,7 @@ MakeError(UploadToS3, Error);

 static constexpr uint64_t AWS_MIN_PART_SIZE = 5 * 1024 * 1024; // 5MiB
 static constexpr uint64_t AWS_MAX_PART_SIZE = 5ULL * 1024 * 1024 * 1024; // 5GiB
+static constexpr uint64_t AWS_MAX_PART_COUNT = 10000;

 class S3BinaryCacheStore : public virtual HttpBinaryCacheStore
 {
@@ -51,9 +53,48 @@ private:
         std::optional<std::string> contentEncoding);

     /**
-     * Uploads a file to S3 (CompressedSource overload).
+     * Uploads a file to S3 using multipart upload.
+     *
+     * This method is suitable for large files that exceed the multipart threshold.
+     * It orchestrates the complete multipart upload process: creating the upload,
+     * splitting the data into parts, uploading each part, and completing the upload.
+     * If any error occurs, the multipart upload is automatically aborted.
+     *
+     * @see https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html
      */
-    void upload(std::string_view path, CompressedSource & source, std::string_view mimeType);
+    void uploadMultipart(
+        std::string_view path,
+        RestartableSource & source,
+        uint64_t sizeHint,
+        std::string_view mimeType,
+        std::optional<std::string> contentEncoding);
+
+    /**
+     * A Sink that manages a complete S3 multipart upload lifecycle.
+     * Creates the upload on construction, buffers and uploads chunks as data arrives,
+     * and completes or aborts the upload appropriately.
+     */
+    struct MultipartSink : Sink
+    {
+        S3BinaryCacheStore & store;
+        std::string_view path;
+        std::string uploadId;
+        std::string::size_type chunkSize;
+
+        std::vector<std::string> partEtags;
+        std::string buffer;
+
+        MultipartSink(
+            S3BinaryCacheStore & store,
+            std::string_view path,
+            uint64_t sizeHint,
+            std::string_view mimeType,
+            std::optional<std::string> contentEncoding);
+
+        void operator()(std::string_view data) override;
+        void finish();
+        void uploadChunk(std::string chunk);
+    };

     /**
      * Creates a multipart upload for large objects to S3.
@@ -73,18 +114,13 @@
      */
     std::string uploadPart(std::string_view key, std::string_view uploadId, uint64_t partNumber, std::string data);

-    struct UploadedPart
-    {
-        uint64_t partNumber;
-        std::string etag;
-    };
-
     /**
      * Completes a multipart upload by combining all uploaded parts.
      *
      * @see
      * https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html#API_CompleteMultipartUpload_RequestSyntax
      */
-    void completeMultipartUpload(std::string_view key, std::string_view uploadId, std::span<const UploadedPart> parts);
+    void
+    completeMultipartUpload(std::string_view key, std::string_view uploadId, std::span<const std::string> partEtags);

     /**
      * Abort a multipart upload
@@ -92,17 +128,31 @@
      *
      * @see
      * https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html#API_AbortMultipartUpload_RequestSyntax
      */
-    void abortMultipartUpload(std::string_view key, std::string_view uploadId);
+    void abortMultipartUpload(std::string_view key, std::string_view uploadId) noexcept;
 };

 void S3BinaryCacheStore::upsertFile(
     const std::string & path, RestartableSource & source, const std::string & mimeType, uint64_t sizeHint)
 {
-    if (auto compressionMethod = getCompressionMethod(path)) {
-        CompressedSource compressed(source, *compressionMethod);
-        upload(path, compressed, mimeType);
-    } else {
-        upload(path, source, sizeHint, mimeType, std::nullopt);
+    auto doUpload = [&](RestartableSource & src, uint64_t size, std::optional<std::string> encoding) {
+        if (s3Config->multipartUpload && size > s3Config->multipartThreshold) {
+            uploadMultipart(path, src, size, mimeType, encoding);
+        } else {
+            upload(path, src, size, mimeType, encoding);
+        }
+    };
+
+    try {
+        if (auto compressionMethod = getCompressionMethod(path)) {
+            CompressedSource compressed(source, *compressionMethod);
+            doUpload(compressed, compressed.size(), compressed.getCompressionMethod());
+        } else {
+            doUpload(source, sizeHint, std::nullopt);
+        }
+    } catch (FileTransferError & e) {
+        UploadToS3 err(e.message());
+        err.addTrace({}, "while uploading to S3 binary cache at '%s'", config->cacheUri.to_string());
+        throw err;
     }
 }
@@ -120,18 +170,112 @@ void S3BinaryCacheStore::upload(
             renderSize(sizeHint),
             renderSize(AWS_MAX_PART_SIZE));

-    try {
-        HttpBinaryCacheStore::upload(path, source, sizeHint, mimeType, contentEncoding);
-    } catch (FileTransferError & e) {
-        UploadToS3 err(e.message());
-        err.addTrace({}, "while uploading to S3 binary cache at '%s'", config->cacheUri.to_string());
-        throw err;
-    }
+    HttpBinaryCacheStore::upload(path, source, sizeHint, mimeType, contentEncoding);
 }
+
+void S3BinaryCacheStore::uploadMultipart(
+    std::string_view path,
+    RestartableSource & source,
+    uint64_t sizeHint,
+    std::string_view mimeType,
+    std::optional<std::string> contentEncoding)
+{
+    debug("using S3 multipart upload for '%s' (%d bytes)", path, sizeHint);
+    MultipartSink sink(*this, path, sizeHint, mimeType, contentEncoding);
+    source.drainInto(sink);
+    sink.finish();
+}
+
+S3BinaryCacheStore::MultipartSink::MultipartSink(
+    S3BinaryCacheStore & store,
+    std::string_view path,
+    uint64_t sizeHint,
+    std::string_view mimeType,
+    std::optional<std::string> contentEncoding)
+    : store(store)
+    , path(path)
+{
+    // Calculate chunk size and estimated parts
+    chunkSize = store.s3Config->multipartChunkSize;
+    uint64_t estimatedParts = (sizeHint + chunkSize - 1) / chunkSize; // ceil division
+
+    if (estimatedParts > AWS_MAX_PART_COUNT) {
+        // Equivalent to ceil(sizeHint / AWS_MAX_PART_COUNT)
+        uint64_t minChunkSize = (sizeHint + AWS_MAX_PART_COUNT - 1) / AWS_MAX_PART_COUNT;
+
+        if (minChunkSize > AWS_MAX_PART_SIZE) {
+            throw Error(
+                "file too large for S3 multipart upload: %s would require chunk size of %s "
+                "(max %s) to stay within %d part limit",
+                renderSize(sizeHint),
+                renderSize(minChunkSize),
+                renderSize(AWS_MAX_PART_SIZE),
+                AWS_MAX_PART_COUNT);
+        }
+
+        warn(
+            "adjusting S3 multipart chunk size from %s to %s "
+            "to stay within %d part limit for %s file",
+            renderSize(store.s3Config->multipartChunkSize.get()),
+            renderSize(minChunkSize),
+            AWS_MAX_PART_COUNT,
+            renderSize(sizeHint));
+
+        chunkSize = minChunkSize;
+        estimatedParts = AWS_MAX_PART_COUNT;
+    }
+
+    buffer.reserve(chunkSize);
+    partEtags.reserve(estimatedParts);
+    uploadId = store.createMultipartUpload(path, mimeType, contentEncoding);
+}
+
+void S3BinaryCacheStore::MultipartSink::operator()(std::string_view data)
+{
+    buffer.append(data);
+
+    while (buffer.size() >= chunkSize) {
+        // Move entire buffer, extract excess, copy back remainder
+        auto chunk = std::move(buffer);
+        auto excessSize = chunk.size() > chunkSize ? chunk.size() - chunkSize : 0;
+        if (excessSize > 0) {
+            buffer.resize(excessSize);
+            std::memcpy(buffer.data(), chunk.data() + chunkSize, excessSize);
+        }
+        chunk.resize(std::min(chunkSize, chunk.size()));
+        uploadChunk(std::move(chunk));
+    }
+}

-void S3BinaryCacheStore::upload(std::string_view path, CompressedSource & source, std::string_view mimeType)
+void S3BinaryCacheStore::MultipartSink::finish()
 {
-    upload(path, static_cast<RestartableSource &>(source), source.size(), mimeType, source.getCompressionMethod());
+    if (!buffer.empty()) {
+        uploadChunk(std::move(buffer));
+    }
+
+    try {
+        if (partEtags.empty()) {
+            throw Error("no data read from stream");
+        }
+        store.completeMultipartUpload(path, uploadId, partEtags);
+    } catch (Error & e) {
+        store.abortMultipartUpload(path, uploadId);
+        e.addTrace({}, "while finishing an S3 multipart upload");
+        throw;
+    }
 }
+
+void S3BinaryCacheStore::MultipartSink::uploadChunk(std::string chunk)
+{
+    auto partNumber = partEtags.size() + 1;
+    try {
+        std::string etag = store.uploadPart(path, uploadId, partNumber, std::move(chunk));
+        partEtags.push_back(std::move(etag));
+    } catch (Error & e) {
+        store.abortMultipartUpload(path, uploadId);
+        e.addTrace({}, "while uploading part %d of an S3 multipart upload", partNumber);
+        throw;
+    }
+}

 std::string S3BinaryCacheStore::createMultipartUpload(
@@ -171,6 +315,10 @@
 std::string
 S3BinaryCacheStore::uploadPart(std::string_view key, std::string_view uploadId, uint64_t partNumber, std::string data)
 {
+    if (partNumber > AWS_MAX_PART_COUNT) {
+        throw Error("S3 multipart upload exceeded %d part limit", AWS_MAX_PART_COUNT);
+    }
+
     auto req = makeRequest(key);
     req.method = HttpMethod::PUT;
     req.setupForS3();
@@ -189,24 +337,29 @@ S3BinaryCacheStore::uploadPart(std::string_view key, std::string_view uploadId, uint64_t partNumber, std::string data)
         throw Error("S3 UploadPart response missing ETag for part %d", partNumber);
     }

+    debug("Part %d uploaded, ETag: %s", partNumber, result.etag);
     return std::move(result.etag);
 }

-void S3BinaryCacheStore::abortMultipartUpload(std::string_view key, std::string_view uploadId)
+void S3BinaryCacheStore::abortMultipartUpload(std::string_view key, std::string_view uploadId) noexcept
 {
-    auto req = makeRequest(key);
-    req.setupForS3();
+    try {
+        auto req = makeRequest(key);
+        req.setupForS3();

-    auto url = req.uri.parsed();
-    url.query["uploadId"] = uploadId;
-    req.uri = VerbatimURL(url);
-    req.method = HttpMethod::DELETE;
+        auto url = req.uri.parsed();
+        url.query["uploadId"] = uploadId;
+        req.uri = VerbatimURL(url);
+        req.method = HttpMethod::DELETE;

-    getFileTransfer()->enqueueFileTransfer(req).get();
+        getFileTransfer()->enqueueFileTransfer(req).get();
+    } catch (...) {
+        ignoreExceptionInDestructor();
+    }
 }

 void S3BinaryCacheStore::completeMultipartUpload(
-    std::string_view key, std::string_view uploadId, std::span<const UploadedPart> parts)
+    std::string_view key, std::string_view uploadId, std::span<const std::string> partEtags)
 {
     auto req = makeRequest(key);
     req.setupForS3();
@@ -217,21 +370,24 @@
     req.method = HttpMethod::POST;

     std::string xml = "<CompleteMultipartUpload>";
-    for (const auto & part : parts) {
+    for (const auto & [idx, etag] : enumerate(partEtags)) {
         xml += "<Part>";
-        xml += "<PartNumber>" + std::to_string(part.partNumber) + "</PartNumber>";
-        xml += "<ETag>" + part.etag + "</ETag>";
+        // S3 part numbers are 1-indexed, but vector indices are 0-indexed
+        xml += "<PartNumber>" + std::to_string(idx + 1) + "</PartNumber>";
+        xml += "<ETag>" + etag + "</ETag>";
         xml += "</Part>";
     }
     xml += "</CompleteMultipartUpload>";

-    debug("S3 CompleteMultipartUpload XML (%d parts): %s", parts.size(), xml);
+    debug("S3 CompleteMultipartUpload XML (%d parts): %s", partEtags.size(), xml);

     StringSource payload{xml};
     req.data = {payload};
     req.mimeType = "text/xml";

     getFileTransfer()->enqueueFileTransfer(req).get();
+
+    debug("S3 multipart upload completed: %d parts uploaded for '%s'", partEtags.size(), key);
 }

 StringSet S3BinaryCacheStoreConfig::uriSchemes()
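
For reference, below is a minimal standalone sketch of the chunk-size arithmetic that the MultipartSink constructor performs in this patch. The 100 GiB upload size and the 5 MiB starting chunk size are hypothetical example inputs, not values taken from the patch or from Nix's configuration defaults; only the two AWS_* constants mirror the patch.

#include <cstdint>
#include <cstdio>

// Constants mirroring the patch.
static constexpr uint64_t AWS_MAX_PART_SIZE = 5ULL * 1024 * 1024 * 1024; // 5GiB
static constexpr uint64_t AWS_MAX_PART_COUNT = 10000;

int main()
{
    // Hypothetical inputs: a 100 GiB upload with a 5 MiB configured chunk size.
    uint64_t sizeHint = 100ULL * 1024 * 1024 * 1024;
    uint64_t chunkSize = 5 * 1024 * 1024;

    // Ceiling division, as in the MultipartSink constructor: 20480 estimated parts here.
    uint64_t estimatedParts = (sizeHint + chunkSize - 1) / chunkSize;

    if (estimatedParts > AWS_MAX_PART_COUNT) {
        // Smallest chunk size that keeps the upload within 10000 parts: ceil(sizeHint / 10000).
        uint64_t minChunkSize = (sizeHint + AWS_MAX_PART_COUNT - 1) / AWS_MAX_PART_COUNT;
        if (minChunkSize > AWS_MAX_PART_SIZE) {
            std::fprintf(stderr, "upload too large even at the maximum part size\n");
            return 1;
        }
        chunkSize = minChunkSize; // 10737419 bytes, roughly 10.24 MiB
        estimatedParts = AWS_MAX_PART_COUNT;
    }

    std::printf("chunk size: %llu bytes, estimated parts: %llu\n",
                (unsigned long long) chunkSize,
                (unsigned long long) estimatedParts);
    return 0;
}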