mirror of
https://github.com/NixOS/nix.git
synced 2025-11-16 23:42:43 +01:00
Merge remote-tracking branch 'upstream/master' into templated-daemon-protocol
This commit is contained in:
commit
e9fc2031f0
16 changed files with 483 additions and 268 deletions
|
|
@ -295,6 +295,8 @@ struct ConnectionHandle
|
|||
std::rethrow_exception(ex);
|
||||
}
|
||||
}
|
||||
|
||||
void withFramedSink(std::function<void(Sink & sink)> fun);
|
||||
};
|
||||
|
||||
|
||||
|
|
@ -408,11 +410,28 @@ void RemoteStore::querySubstitutablePathInfos(const StorePathCAMap & pathsMap, S
|
|||
}
|
||||
|
||||
|
||||
ref<const ValidPathInfo> RemoteStore::readValidPathInfo(ConnectionHandle & conn, const StorePath & path)
|
||||
{
|
||||
auto deriver = readString(conn->from);
|
||||
auto narHash = Hash::parseAny(readString(conn->from), htSHA256);
|
||||
auto info = make_ref<ValidPathInfo>(path, narHash);
|
||||
if (deriver != "") info->deriver = parseStorePath(deriver);
|
||||
info->references = WorkerProto<StorePathSet>::read(*this, conn->from);
|
||||
conn->from >> info->registrationTime >> info->narSize;
|
||||
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) {
|
||||
conn->from >> info->ultimate;
|
||||
info->sigs = readStrings<StringSet>(conn->from);
|
||||
info->ca = parseContentAddressOpt(readString(conn->from));
|
||||
}
|
||||
return info;
|
||||
}
|
||||
|
||||
|
||||
void RemoteStore::queryPathInfoUncached(const StorePath & path,
|
||||
Callback<std::shared_ptr<const ValidPathInfo>> callback) noexcept
|
||||
{
|
||||
try {
|
||||
std::shared_ptr<ValidPathInfo> info;
|
||||
std::shared_ptr<const ValidPathInfo> info;
|
||||
{
|
||||
auto conn(getConnection());
|
||||
conn->to << wopQueryPathInfo << printStorePath(path);
|
||||
|
|
@ -428,17 +447,7 @@ void RemoteStore::queryPathInfoUncached(const StorePath & path,
|
|||
bool valid; conn->from >> valid;
|
||||
if (!valid) throw InvalidPath("path '%s' is not valid", printStorePath(path));
|
||||
}
|
||||
auto deriver = readString(conn->from);
|
||||
auto narHash = Hash::parseAny(readString(conn->from), htSHA256);
|
||||
info = std::make_shared<ValidPathInfo>(path, narHash);
|
||||
if (deriver != "") info->deriver = parseStorePath(deriver);
|
||||
info->references = WorkerProto<StorePathSet>::read(*this, conn->from);
|
||||
conn->from >> info->registrationTime >> info->narSize;
|
||||
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) {
|
||||
conn->from >> info->ultimate;
|
||||
info->sigs = readStrings<StringSet>(conn->from);
|
||||
info->ca = parseContentAddressOpt(readString(conn->from));
|
||||
}
|
||||
info = readValidPathInfo(conn, path);
|
||||
}
|
||||
callback(std::move(info));
|
||||
} catch (...) { callback.rethrow(); }
|
||||
|
|
@ -512,6 +521,93 @@ std::optional<StorePath> RemoteStore::queryPathFromHashPart(const std::string &
|
|||
}
|
||||
|
||||
|
||||
ref<const ValidPathInfo> RemoteStore::addCAToStore(
|
||||
Source & dump,
|
||||
const string & name,
|
||||
ContentAddressMethod caMethod,
|
||||
const StorePathSet & references,
|
||||
RepairFlag repair)
|
||||
{
|
||||
std::optional<ConnectionHandle> conn_(getConnection());
|
||||
auto & conn = *conn_;
|
||||
|
||||
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 25) {
|
||||
|
||||
conn->to
|
||||
<< wopAddToStore
|
||||
<< name
|
||||
<< renderContentAddressMethod(caMethod);
|
||||
WorkerProto<StorePathSet>::write(*this, conn->to, references);
|
||||
conn->to << repair;
|
||||
|
||||
conn.withFramedSink([&](Sink & sink) {
|
||||
dump.drainInto(sink);
|
||||
});
|
||||
|
||||
auto path = parseStorePath(readString(conn->from));
|
||||
return readValidPathInfo(conn, path);
|
||||
}
|
||||
else {
|
||||
if (repair) throw Error("repairing is not supported when building through the Nix daemon protocol < 1.25");
|
||||
|
||||
std::visit(overloaded {
|
||||
[&](TextHashMethod thm) -> void {
|
||||
std::string s = dump.drain();
|
||||
conn->to << wopAddTextToStore << name << s;
|
||||
WorkerProto<StorePathSet>::write(*this, conn->to, references);
|
||||
conn.processStderr();
|
||||
},
|
||||
[&](FixedOutputHashMethod fohm) -> void {
|
||||
conn->to
|
||||
<< wopAddToStore
|
||||
<< name
|
||||
<< ((fohm.hashType == htSHA256 && fohm.fileIngestionMethod == FileIngestionMethod::Recursive) ? 0 : 1) /* backwards compatibility hack */
|
||||
<< (fohm.fileIngestionMethod == FileIngestionMethod::Recursive ? 1 : 0)
|
||||
<< printHashType(fohm.hashType);
|
||||
|
||||
try {
|
||||
conn->to.written = 0;
|
||||
conn->to.warn = true;
|
||||
connections->incCapacity();
|
||||
{
|
||||
Finally cleanup([&]() { connections->decCapacity(); });
|
||||
if (fohm.fileIngestionMethod == FileIngestionMethod::Recursive) {
|
||||
dump.drainInto(conn->to);
|
||||
} else {
|
||||
std::string contents = dump.drain();
|
||||
dumpString(contents, conn->to);
|
||||
}
|
||||
}
|
||||
conn->to.warn = false;
|
||||
conn.processStderr();
|
||||
} catch (SysError & e) {
|
||||
/* Daemon closed while we were sending the path. Probably OOM
|
||||
or I/O error. */
|
||||
if (e.errNo == EPIPE)
|
||||
try {
|
||||
conn.processStderr();
|
||||
} catch (EndOfFile & e) { }
|
||||
throw;
|
||||
}
|
||||
|
||||
}
|
||||
}, caMethod);
|
||||
auto path = parseStorePath(readString(conn->from));
|
||||
// Release our connection to prevent a deadlock in queryPathInfo().
|
||||
conn_.reset();
|
||||
return queryPathInfo(path);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
StorePath RemoteStore::addToStoreFromDump(Source & dump, const string & name,
|
||||
FileIngestionMethod method, HashType hashType, RepairFlag repair)
|
||||
{
|
||||
StorePathSet references;
|
||||
return addCAToStore(dump, name, FixedOutputHashMethod{ .fileIngestionMethod = method, .hashType = hashType }, references, repair)->path;
|
||||
}
|
||||
|
||||
|
||||
void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
|
||||
RepairFlag repair, CheckSigsFlag checkSigs)
|
||||
{
|
||||
|
|
@ -552,78 +648,9 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
|
|||
<< repair << !checkSigs;
|
||||
|
||||
if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 23) {
|
||||
|
||||
conn->to.flush();
|
||||
|
||||
std::exception_ptr ex;
|
||||
|
||||
struct FramedSink : BufferedSink
|
||||
{
|
||||
ConnectionHandle & conn;
|
||||
std::exception_ptr & ex;
|
||||
|
||||
FramedSink(ConnectionHandle & conn, std::exception_ptr & ex) : conn(conn), ex(ex)
|
||||
{ }
|
||||
|
||||
~FramedSink()
|
||||
{
|
||||
try {
|
||||
conn->to << 0;
|
||||
conn->to.flush();
|
||||
} catch (...) {
|
||||
ignoreException();
|
||||
}
|
||||
}
|
||||
|
||||
void write(const unsigned char * data, size_t len) override
|
||||
{
|
||||
/* Don't send more data if the remote has
|
||||
encountered an error. */
|
||||
if (ex) {
|
||||
auto ex2 = ex;
|
||||
ex = nullptr;
|
||||
std::rethrow_exception(ex2);
|
||||
}
|
||||
conn->to << len;
|
||||
conn->to(data, len);
|
||||
};
|
||||
};
|
||||
|
||||
/* Handle log messages / exceptions from the remote on a
|
||||
separate thread. */
|
||||
std::thread stderrThread([&]()
|
||||
{
|
||||
try {
|
||||
conn.processStderr(nullptr, nullptr, false);
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
});
|
||||
|
||||
Finally joinStderrThread([&]()
|
||||
{
|
||||
if (stderrThread.joinable()) {
|
||||
stderrThread.join();
|
||||
if (ex) {
|
||||
try {
|
||||
std::rethrow_exception(ex);
|
||||
} catch (...) {
|
||||
ignoreException();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
{
|
||||
FramedSink sink(conn, ex);
|
||||
conn.withFramedSink([&](Sink & sink) {
|
||||
copyNAR(source, sink);
|
||||
sink.flush();
|
||||
}
|
||||
|
||||
stderrThread.join();
|
||||
if (ex)
|
||||
std::rethrow_exception(ex);
|
||||
|
||||
});
|
||||
} else if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 21) {
|
||||
conn.processStderr(0, &source);
|
||||
} else {
|
||||
|
|
@ -634,57 +661,11 @@ void RemoteStore::addToStore(const ValidPathInfo & info, Source & source,
|
|||
}
|
||||
|
||||
|
||||
StorePath RemoteStore::addToStore(const string & name, const Path & _srcPath,
|
||||
FileIngestionMethod method, HashType hashAlgo, PathFilter & filter, RepairFlag repair)
|
||||
{
|
||||
if (repair) throw Error("repairing is not supported when building through the Nix daemon");
|
||||
|
||||
auto conn(getConnection());
|
||||
|
||||
Path srcPath(absPath(_srcPath));
|
||||
|
||||
conn->to
|
||||
<< wopAddToStore
|
||||
<< name
|
||||
<< ((hashAlgo == htSHA256 && method == FileIngestionMethod::Recursive) ? 0 : 1) /* backwards compatibility hack */
|
||||
<< (method == FileIngestionMethod::Recursive ? 1 : 0)
|
||||
<< printHashType(hashAlgo);
|
||||
|
||||
try {
|
||||
conn->to.written = 0;
|
||||
conn->to.warn = true;
|
||||
connections->incCapacity();
|
||||
{
|
||||
Finally cleanup([&]() { connections->decCapacity(); });
|
||||
dumpPath(srcPath, conn->to, filter);
|
||||
}
|
||||
conn->to.warn = false;
|
||||
conn.processStderr();
|
||||
} catch (SysError & e) {
|
||||
/* Daemon closed while we were sending the path. Probably OOM
|
||||
or I/O error. */
|
||||
if (e.errNo == EPIPE)
|
||||
try {
|
||||
conn.processStderr();
|
||||
} catch (EndOfFile & e) { }
|
||||
throw;
|
||||
}
|
||||
|
||||
return parseStorePath(readString(conn->from));
|
||||
}
|
||||
|
||||
|
||||
StorePath RemoteStore::addTextToStore(const string & name, const string & s,
|
||||
const StorePathSet & references, RepairFlag repair)
|
||||
{
|
||||
if (repair) throw Error("repairing is not supported when building through the Nix daemon");
|
||||
|
||||
auto conn(getConnection());
|
||||
conn->to << wopAddTextToStore << name << s;
|
||||
WorkerProto<StorePathSet>::write(*this, conn->to, references);
|
||||
|
||||
conn.processStderr();
|
||||
return parseStorePath(readString(conn->from));
|
||||
StringSource source(s);
|
||||
return addCAToStore(source, name, TextHashMethod{}, references, repair)->path;
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -980,6 +961,49 @@ std::exception_ptr RemoteStore::Connection::processStderr(Sink * sink, Source *
|
|||
return nullptr;
|
||||
}
|
||||
|
||||
void ConnectionHandle::withFramedSink(std::function<void(Sink &sink)> fun)
|
||||
{
|
||||
(*this)->to.flush();
|
||||
|
||||
std::exception_ptr ex;
|
||||
|
||||
/* Handle log messages / exceptions from the remote on a
|
||||
separate thread. */
|
||||
std::thread stderrThread([&]()
|
||||
{
|
||||
try {
|
||||
processStderr(nullptr, nullptr, false);
|
||||
} catch (...) {
|
||||
ex = std::current_exception();
|
||||
}
|
||||
});
|
||||
|
||||
Finally joinStderrThread([&]()
|
||||
{
|
||||
if (stderrThread.joinable()) {
|
||||
stderrThread.join();
|
||||
if (ex) {
|
||||
try {
|
||||
std::rethrow_exception(ex);
|
||||
} catch (...) {
|
||||
ignoreException();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
{
|
||||
FramedSink sink((*this)->to, ex);
|
||||
fun(sink);
|
||||
sink.flush();
|
||||
}
|
||||
|
||||
stderrThread.join();
|
||||
if (ex)
|
||||
std::rethrow_exception(ex);
|
||||
|
||||
}
|
||||
|
||||
static RegisterStoreImplementation<UDSRemoteStore, UDSRemoteStoreConfig> regStore;
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue