mirror of
https://github.com/NixOS/nix.git
synced 2025-11-08 19:46:02 +01:00
refactor(libstore): expose HttpBinaryCacheStore and add S3BinaryCacheStore
Move HttpBinaryCacheStore class from .cc file to header to enable inheritance by S3BinaryCacheStore. Create S3BinaryCacheStore class that overrides upsertFile() to implement multipart upload logic.
This commit is contained in:
parent
1a9ba0d6fe
commit
476c21d5ef
4 changed files with 284 additions and 212 deletions
|
|
@ -51,227 +51,209 @@ std::string HttpBinaryCacheStoreConfig::doc()
|
|||
;
|
||||
}
|
||||
|
||||
class HttpBinaryCacheStore : public virtual BinaryCacheStore
|
||||
HttpBinaryCacheStore::HttpBinaryCacheStore(ref<Config> config)
|
||||
: Store{*config} // TODO it will actually mutate the configuration
|
||||
, BinaryCacheStore{*config}
|
||||
, config{config}
|
||||
{
|
||||
struct State
|
||||
{
|
||||
bool enabled = true;
|
||||
std::chrono::steady_clock::time_point disabledUntil;
|
||||
};
|
||||
diskCache = getNarInfoDiskCache();
|
||||
}
|
||||
|
||||
Sync<State> _state;
|
||||
|
||||
public:
|
||||
|
||||
using Config = HttpBinaryCacheStoreConfig;
|
||||
|
||||
ref<Config> config;
|
||||
|
||||
HttpBinaryCacheStore(ref<Config> config)
|
||||
: Store{*config} // TODO it will actually mutate the configuration
|
||||
, BinaryCacheStore{*config}
|
||||
, config{config}
|
||||
{
|
||||
diskCache = getNarInfoDiskCache();
|
||||
}
|
||||
|
||||
void init() override
|
||||
{
|
||||
// FIXME: do this lazily?
|
||||
// For consistent cache key handling, use the reference without parameters
|
||||
// This matches what's used in Store::queryPathInfo() lookups
|
||||
auto cacheKey = config->getReference().render(/*withParams=*/false);
|
||||
|
||||
if (auto cacheInfo = diskCache->upToDateCacheExists(cacheKey)) {
|
||||
config->wantMassQuery.setDefault(cacheInfo->wantMassQuery);
|
||||
config->priority.setDefault(cacheInfo->priority);
|
||||
} else {
|
||||
try {
|
||||
BinaryCacheStore::init();
|
||||
} catch (UploadToHTTP &) {
|
||||
throw Error("'%s' does not appear to be a binary cache", config->cacheUri.to_string());
|
||||
}
|
||||
diskCache->createCache(cacheKey, config->storeDir, config->wantMassQuery, config->priority);
|
||||
}
|
||||
}
|
||||
|
||||
protected:
|
||||
|
||||
std::optional<std::string> getCompressionMethod(const std::string & path)
|
||||
{
|
||||
if (hasSuffix(path, ".narinfo") && !config->narinfoCompression.get().empty())
|
||||
return config->narinfoCompression;
|
||||
else if (hasSuffix(path, ".ls") && !config->lsCompression.get().empty())
|
||||
return config->lsCompression;
|
||||
else if (hasPrefix(path, "log/") && !config->logCompression.get().empty())
|
||||
return config->logCompression;
|
||||
else
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
void maybeDisable()
|
||||
{
|
||||
auto state(_state.lock());
|
||||
if (state->enabled && settings.tryFallback) {
|
||||
int t = 60;
|
||||
printError("disabling binary cache '%s' for %s seconds", config->getHumanReadableURI(), t);
|
||||
state->enabled = false;
|
||||
state->disabledUntil = std::chrono::steady_clock::now() + std::chrono::seconds(t);
|
||||
}
|
||||
}
|
||||
|
||||
void checkEnabled()
|
||||
{
|
||||
auto state(_state.lock());
|
||||
if (state->enabled)
|
||||
return;
|
||||
if (std::chrono::steady_clock::now() > state->disabledUntil) {
|
||||
state->enabled = true;
|
||||
debug("re-enabling binary cache '%s'", config->getHumanReadableURI());
|
||||
return;
|
||||
}
|
||||
throw SubstituterDisabled("substituter '%s' is disabled", config->getHumanReadableURI());
|
||||
}
|
||||
|
||||
bool fileExists(const std::string & path) override
|
||||
{
|
||||
checkEnabled();
|
||||
void HttpBinaryCacheStore::init()
|
||||
{
|
||||
// FIXME: do this lazily?
|
||||
// For consistent cache key handling, use the reference without parameters
|
||||
// This matches what's used in Store::queryPathInfo() lookups
|
||||
auto cacheKey = config->getReference().render(/*withParams=*/false);
|
||||
|
||||
if (auto cacheInfo = diskCache->upToDateCacheExists(cacheKey)) {
|
||||
config->wantMassQuery.setDefault(cacheInfo->wantMassQuery);
|
||||
config->priority.setDefault(cacheInfo->priority);
|
||||
} else {
|
||||
try {
|
||||
FileTransferRequest request(makeRequest(path));
|
||||
request.method = HttpMethod::HEAD;
|
||||
getFileTransfer()->download(request);
|
||||
return true;
|
||||
} catch (FileTransferError & e) {
|
||||
/* S3 buckets return 403 if a file doesn't exist and the
|
||||
bucket is unlistable, so treat 403 as 404. */
|
||||
if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden)
|
||||
return false;
|
||||
maybeDisable();
|
||||
throw;
|
||||
BinaryCacheStore::init();
|
||||
} catch (UploadToHTTP &) {
|
||||
throw Error("'%s' does not appear to be a binary cache", config->cacheUri.to_string());
|
||||
}
|
||||
diskCache->createCache(cacheKey, config->storeDir, config->wantMassQuery, config->priority);
|
||||
}
|
||||
}
|
||||
|
||||
void upsertFile(
|
||||
const std::string & path,
|
||||
std::shared_ptr<std::basic_iostream<char>> istream,
|
||||
const std::string & mimeType,
|
||||
uint64_t sizeHint) override
|
||||
{
|
||||
auto req = makeRequest(path);
|
||||
|
||||
auto data = StreamToSourceAdapter(istream).drain();
|
||||
|
||||
if (auto compressionMethod = getCompressionMethod(path)) {
|
||||
data = compress(*compressionMethod, data);
|
||||
req.headers.emplace_back("Content-Encoding", *compressionMethod);
|
||||
}
|
||||
|
||||
req.data = std::move(data);
|
||||
req.mimeType = mimeType;
|
||||
|
||||
try {
|
||||
getFileTransfer()->upload(req);
|
||||
} catch (FileTransferError & e) {
|
||||
throw UploadToHTTP(
|
||||
"while uploading to HTTP binary cache at '%s': %s", config->cacheUri.to_string(), e.msg());
|
||||
}
|
||||
}
|
||||
|
||||
FileTransferRequest makeRequest(const std::string & path)
|
||||
{
|
||||
/* Otherwise the last path fragment will get discarded. */
|
||||
auto cacheUriWithTrailingSlash = config->cacheUri;
|
||||
if (!cacheUriWithTrailingSlash.path.empty())
|
||||
cacheUriWithTrailingSlash.path.push_back("");
|
||||
|
||||
/* path is not a path, but a full relative or absolute
|
||||
URL, e.g. we've seen in the wild NARINFO files have a URL
|
||||
field which is
|
||||
`nar/15f99rdaf26k39knmzry4xd0d97wp6yfpnfk1z9avakis7ipb9yg.nar?hash=zphkqn2wg8mnvbkixnl2aadkbn0rcnfj`
|
||||
(note the query param) and that gets passed here. */
|
||||
auto result = parseURLRelative(path, cacheUriWithTrailingSlash);
|
||||
|
||||
/* For S3 URLs, preserve query parameters from the base URL when the
|
||||
relative path doesn't have its own query parameters. This is needed
|
||||
to preserve S3-specific parameters like endpoint and region. */
|
||||
if (config->cacheUri.scheme == "s3" && result.query.empty()) {
|
||||
result.query = config->cacheUri.query;
|
||||
}
|
||||
|
||||
return FileTransferRequest(result);
|
||||
}
|
||||
|
||||
void getFile(const std::string & path, Sink & sink) override
|
||||
{
|
||||
checkEnabled();
|
||||
auto request(makeRequest(path));
|
||||
try {
|
||||
getFileTransfer()->download(std::move(request), sink);
|
||||
} catch (FileTransferError & e) {
|
||||
if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden)
|
||||
throw NoSuchBinaryCacheFile(
|
||||
"file '%s' does not exist in binary cache '%s'", path, config->getHumanReadableURI());
|
||||
maybeDisable();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
void getFile(const std::string & path, Callback<std::optional<std::string>> callback) noexcept override
|
||||
{
|
||||
auto callbackPtr = std::make_shared<decltype(callback)>(std::move(callback));
|
||||
|
||||
try {
|
||||
checkEnabled();
|
||||
|
||||
auto request(makeRequest(path));
|
||||
|
||||
getFileTransfer()->enqueueFileTransfer(
|
||||
request, {[callbackPtr, this](std::future<FileTransferResult> result) {
|
||||
try {
|
||||
(*callbackPtr)(std::move(result.get().data));
|
||||
} catch (FileTransferError & e) {
|
||||
if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden)
|
||||
return (*callbackPtr)({});
|
||||
maybeDisable();
|
||||
callbackPtr->rethrow();
|
||||
} catch (...) {
|
||||
callbackPtr->rethrow();
|
||||
}
|
||||
}});
|
||||
|
||||
} catch (...) {
|
||||
callbackPtr->rethrow();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<std::string> getNixCacheInfo() override
|
||||
{
|
||||
try {
|
||||
auto result = getFileTransfer()->download(makeRequest(cacheInfoFile));
|
||||
return result.data;
|
||||
} catch (FileTransferError & e) {
|
||||
if (e.error == FileTransfer::NotFound)
|
||||
return std::nullopt;
|
||||
maybeDisable();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This isn't actually necessary read only. We support "upsert" now, so we
|
||||
* have a notion of authentication via HTTP POST/PUT.
|
||||
*
|
||||
* For now, we conservatively say we don't know.
|
||||
*
|
||||
* \todo try to expose our HTTP authentication status.
|
||||
*/
|
||||
std::optional<TrustedFlag> isTrustedClient() override
|
||||
{
|
||||
std::optional<std::string> HttpBinaryCacheStore::getCompressionMethod(const std::string & path)
|
||||
{
|
||||
if (hasSuffix(path, ".narinfo") && !config->narinfoCompression.get().empty())
|
||||
return config->narinfoCompression;
|
||||
else if (hasSuffix(path, ".ls") && !config->lsCompression.get().empty())
|
||||
return config->lsCompression;
|
||||
else if (hasPrefix(path, "log/") && !config->logCompression.get().empty())
|
||||
return config->logCompression;
|
||||
else
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
void HttpBinaryCacheStore::maybeDisable()
|
||||
{
|
||||
auto state(_state.lock());
|
||||
if (state->enabled && settings.tryFallback) {
|
||||
int t = 60;
|
||||
printError("disabling binary cache '%s' for %s seconds", config->getHumanReadableURI(), t);
|
||||
state->enabled = false;
|
||||
state->disabledUntil = std::chrono::steady_clock::now() + std::chrono::seconds(t);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
void HttpBinaryCacheStore::checkEnabled()
|
||||
{
|
||||
auto state(_state.lock());
|
||||
if (state->enabled)
|
||||
return;
|
||||
if (std::chrono::steady_clock::now() > state->disabledUntil) {
|
||||
state->enabled = true;
|
||||
debug("re-enabling binary cache '%s'", config->getHumanReadableURI());
|
||||
return;
|
||||
}
|
||||
throw SubstituterDisabled("substituter '%s' is disabled", config->getHumanReadableURI());
|
||||
}
|
||||
|
||||
bool HttpBinaryCacheStore::fileExists(const std::string & path)
|
||||
{
|
||||
checkEnabled();
|
||||
|
||||
try {
|
||||
FileTransferRequest request(makeRequest(path));
|
||||
request.method = HttpMethod::HEAD;
|
||||
getFileTransfer()->download(request);
|
||||
return true;
|
||||
} catch (FileTransferError & e) {
|
||||
/* S3 buckets return 403 if a file doesn't exist and the
|
||||
bucket is unlistable, so treat 403 as 404. */
|
||||
if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden)
|
||||
return false;
|
||||
maybeDisable();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
void HttpBinaryCacheStore::upsertFile(
|
||||
const std::string & path,
|
||||
std::shared_ptr<std::basic_iostream<char>> istream,
|
||||
const std::string & mimeType,
|
||||
uint64_t sizeHint)
|
||||
{
|
||||
auto req = makeRequest(path);
|
||||
|
||||
auto data = StreamToSourceAdapter(istream).drain();
|
||||
|
||||
auto compressionMethod = getCompressionMethod(path);
|
||||
|
||||
if (compressionMethod) {
|
||||
data = compress(*compressionMethod, data);
|
||||
req.headers.emplace_back("Content-Encoding", *compressionMethod);
|
||||
}
|
||||
|
||||
req.data = std::move(data);
|
||||
req.mimeType = mimeType;
|
||||
|
||||
try {
|
||||
getFileTransfer()->upload(req);
|
||||
} catch (FileTransferError & e) {
|
||||
throw UploadToHTTP("while uploading to HTTP binary cache at '%s': %s", config->cacheUri.to_string(), e.msg());
|
||||
}
|
||||
}
|
||||
|
||||
FileTransferRequest HttpBinaryCacheStore::makeRequest(const std::string & path)
|
||||
{
|
||||
/* Otherwise the last path fragment will get discarded. */
|
||||
auto cacheUriWithTrailingSlash = config->cacheUri;
|
||||
if (!cacheUriWithTrailingSlash.path.empty())
|
||||
cacheUriWithTrailingSlash.path.push_back("");
|
||||
|
||||
/* path is not a path, but a full relative or absolute
|
||||
URL, e.g. we've seen in the wild NARINFO files have a URL
|
||||
field which is
|
||||
`nar/15f99rdaf26k39knmzry4xd0d97wp6yfpnfk1z9avakis7ipb9yg.nar?hash=zphkqn2wg8mnvbkixnl2aadkbn0rcnfj`
|
||||
(note the query param) and that gets passed here. */
|
||||
auto result = parseURLRelative(path, cacheUriWithTrailingSlash);
|
||||
|
||||
/* For S3 URLs, preserve query parameters from the base URL when the
|
||||
relative path doesn't have its own query parameters. This is needed
|
||||
to preserve S3-specific parameters like endpoint and region. */
|
||||
if (config->cacheUri.scheme == "s3" && result.query.empty()) {
|
||||
result.query = config->cacheUri.query;
|
||||
}
|
||||
|
||||
return FileTransferRequest(result);
|
||||
}
|
||||
|
||||
void HttpBinaryCacheStore::getFile(const std::string & path, Sink & sink)
|
||||
{
|
||||
checkEnabled();
|
||||
auto request(makeRequest(path));
|
||||
try {
|
||||
getFileTransfer()->download(std::move(request), sink);
|
||||
} catch (FileTransferError & e) {
|
||||
if (e.error == FileTransfer::NotFound || e.error == FileTransfer::Forbidden)
|
||||
throw NoSuchBinaryCacheFile(
|
||||
"file '%s' does not exist in binary cache '%s'", path, config->getHumanReadableURI());
|
||||
maybeDisable();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
void HttpBinaryCacheStore::getFile(const std::string & path, Callback<std::optional<std::string>> callback) noexcept
|
||||
{
|
||||
auto callbackPtr = std::make_shared<decltype(callback)>(std::move(callback));
|
||||
|
||||
try {
|
||||
checkEnabled();
|
||||
|
||||
auto request(makeRequest(path));
|
||||
|
||||
getFileTransfer()->enqueueFileTransfer(request, {[callbackPtr, this](std::future<FileTransferResult> result) {
|
||||
try {
|
||||
(*callbackPtr)(std::move(result.get().data));
|
||||
} catch (FileTransferError & e) {
|
||||
if (e.error == FileTransfer::NotFound
|
||||
|| e.error == FileTransfer::Forbidden)
|
||||
return (*callbackPtr)({});
|
||||
maybeDisable();
|
||||
callbackPtr->rethrow();
|
||||
} catch (...) {
|
||||
callbackPtr->rethrow();
|
||||
}
|
||||
}});
|
||||
|
||||
} catch (...) {
|
||||
callbackPtr->rethrow();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<std::string> HttpBinaryCacheStore::getNixCacheInfo()
|
||||
{
|
||||
try {
|
||||
auto result = getFileTransfer()->download(makeRequest(cacheInfoFile));
|
||||
return result.data;
|
||||
} catch (FileTransferError & e) {
|
||||
if (e.error == FileTransfer::NotFound)
|
||||
return std::nullopt;
|
||||
maybeDisable();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This isn't actually necessary read only. We support "upsert" now, so we
|
||||
* have a notion of authentication via HTTP POST/PUT.
|
||||
*
|
||||
* For now, we conservatively say we don't know.
|
||||
*
|
||||
* \todo try to expose our HTTP authentication status.
|
||||
*/
|
||||
std::optional<TrustedFlag> HttpBinaryCacheStore::isTrustedClient()
|
||||
{
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
ref<Store> HttpBinaryCacheStore::Config::openStore() const
|
||||
{
|
||||
|
|
|
|||
|
|
@ -3,6 +3,10 @@
|
|||
|
||||
#include "nix/util/url.hh"
|
||||
#include "nix/store/binary-cache-store.hh"
|
||||
#include "nix/store/filetransfer.hh"
|
||||
#include "nix/util/sync.hh"
|
||||
|
||||
#include <chrono>
|
||||
|
||||
namespace nix {
|
||||
|
||||
|
|
@ -46,4 +50,51 @@ struct HttpBinaryCacheStoreConfig : std::enable_shared_from_this<HttpBinaryCache
|
|||
StoreReference getReference() const override;
|
||||
};
|
||||
|
||||
class HttpBinaryCacheStore : public virtual BinaryCacheStore
|
||||
{
|
||||
struct State
|
||||
{
|
||||
bool enabled = true;
|
||||
std::chrono::steady_clock::time_point disabledUntil;
|
||||
};
|
||||
|
||||
Sync<State> _state;
|
||||
|
||||
public:
|
||||
|
||||
using Config = HttpBinaryCacheStoreConfig;
|
||||
|
||||
ref<Config> config;
|
||||
|
||||
HttpBinaryCacheStore(ref<Config> config);
|
||||
|
||||
void init() override;
|
||||
|
||||
protected:
|
||||
|
||||
std::optional<std::string> getCompressionMethod(const std::string & path);
|
||||
|
||||
void maybeDisable();
|
||||
|
||||
void checkEnabled();
|
||||
|
||||
bool fileExists(const std::string & path) override;
|
||||
|
||||
void upsertFile(
|
||||
const std::string & path,
|
||||
std::shared_ptr<std::basic_iostream<char>> istream,
|
||||
const std::string & mimeType,
|
||||
uint64_t sizeHint) override;
|
||||
|
||||
FileTransferRequest makeRequest(const std::string & path);
|
||||
|
||||
void getFile(const std::string & path, Sink & sink) override;
|
||||
|
||||
void getFile(const std::string & path, Callback<std::optional<std::string>> callback) noexcept override;
|
||||
|
||||
std::optional<std::string> getNixCacheInfo() override;
|
||||
|
||||
std::optional<TrustedFlag> isTrustedClient() override;
|
||||
};
|
||||
|
||||
} // namespace nix
|
||||
|
|
|
|||
|
|
@ -77,6 +77,8 @@ struct S3BinaryCacheStoreConfig : HttpBinaryCacheStoreConfig
|
|||
static std::string doc();
|
||||
|
||||
std::string getHumanReadableURI() const override;
|
||||
|
||||
ref<Store> openStore() const override;
|
||||
};
|
||||
|
||||
} // namespace nix
|
||||
|
|
|
|||
|
|
@ -7,6 +7,36 @@
|
|||
|
||||
namespace nix {
|
||||
|
||||
class S3BinaryCacheStore : public virtual HttpBinaryCacheStore
|
||||
{
|
||||
public:
|
||||
S3BinaryCacheStore(ref<S3BinaryCacheStoreConfig> config)
|
||||
: Store{*config}
|
||||
, BinaryCacheStore{*config}
|
||||
, HttpBinaryCacheStore{config}
|
||||
, s3Config{config}
|
||||
{
|
||||
}
|
||||
|
||||
void upsertFile(
|
||||
const std::string & path,
|
||||
std::shared_ptr<std::basic_iostream<char>> istream,
|
||||
const std::string & mimeType,
|
||||
uint64_t sizeHint) override;
|
||||
|
||||
private:
|
||||
ref<S3BinaryCacheStoreConfig> s3Config;
|
||||
};
|
||||
|
||||
void S3BinaryCacheStore::upsertFile(
|
||||
const std::string & path,
|
||||
std::shared_ptr<std::basic_iostream<char>> istream,
|
||||
const std::string & mimeType,
|
||||
uint64_t sizeHint)
|
||||
{
|
||||
HttpBinaryCacheStore::upsertFile(path, istream, mimeType, sizeHint);
|
||||
}
|
||||
|
||||
StringSet S3BinaryCacheStoreConfig::uriSchemes()
|
||||
{
|
||||
return {"s3"};
|
||||
|
|
@ -51,6 +81,13 @@ std::string S3BinaryCacheStoreConfig::doc()
|
|||
)";
|
||||
}
|
||||
|
||||
ref<Store> S3BinaryCacheStoreConfig::openStore() const
|
||||
{
|
||||
auto sharedThis = std::const_pointer_cast<S3BinaryCacheStoreConfig>(
|
||||
std::static_pointer_cast<const S3BinaryCacheStoreConfig>(shared_from_this()));
|
||||
return make_ref<S3BinaryCacheStore>(ref{sharedThis});
|
||||
}
|
||||
|
||||
static RegisterStoreImplementation<S3BinaryCacheStoreConfig> registerS3BinaryCacheStore;
|
||||
|
||||
} // namespace nix
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue