mirror of
https://github.com/NixOS/nix.git
synced 2025-12-22 17:01:08 +01:00
Merge pull request #14801 from NixOS/coroutine-child-output-0
Use coroutines for worker child I/O
This commit is contained in:
commit
df7542247e
12 changed files with 252 additions and 86 deletions
|
|
@ -71,14 +71,6 @@ void DerivationBuildingGoal::killChild()
|
|||
#endif
|
||||
}
|
||||
|
||||
void DerivationBuildingGoal::timedOut(Error && ex)
|
||||
{
|
||||
killChild();
|
||||
// We're not inside a coroutine, hence we can't use co_return here.
|
||||
// Thus we ignore the return value.
|
||||
[[maybe_unused]] Done _ = doneFailure({BuildResult::Failure::TimedOut, std::move(ex)});
|
||||
}
|
||||
|
||||
std::string showKnownOutputs(const StoreDirConfig & store, const Derivation & drv)
|
||||
{
|
||||
std::string msg;
|
||||
|
|
@ -443,7 +435,20 @@ Goal::Co DerivationBuildingGoal::tryToBuild()
|
|||
if (useHook) {
|
||||
buildResult.startTime = time(0); // inexact
|
||||
started();
|
||||
co_await Suspend{};
|
||||
|
||||
while (true) {
|
||||
auto event = co_await WaitForChildEvent{};
|
||||
if (auto * output = std::get_if<ChildOutput>(&event)) {
|
||||
co_await processChildOutput(output->fd, output->data);
|
||||
} else if (std::get_if<ChildEOF>(&event)) {
|
||||
if (!currentLogLine.empty())
|
||||
flushLine();
|
||||
break;
|
||||
} else if (auto * timeout = std::get_if<TimedOut>(&event)) {
|
||||
killChild();
|
||||
co_return doneFailure(std::move(*timeout));
|
||||
}
|
||||
}
|
||||
|
||||
#ifndef _WIN32
|
||||
assert(hook);
|
||||
|
|
@ -664,7 +669,20 @@ Goal::Co DerivationBuildingGoal::tryToBuild()
|
|||
worker.childStarted(shared_from_this(), {builderOut}, true, true);
|
||||
|
||||
started();
|
||||
co_await Suspend{};
|
||||
|
||||
while (true) {
|
||||
auto event = co_await WaitForChildEvent{};
|
||||
if (auto * output = std::get_if<ChildOutput>(&event)) {
|
||||
co_await processChildOutput(output->fd, output->data);
|
||||
} else if (std::get_if<ChildEOF>(&event)) {
|
||||
if (!currentLogLine.empty())
|
||||
flushLine();
|
||||
break;
|
||||
} else if (auto * timeout = std::get_if<TimedOut>(&event)) {
|
||||
killChild();
|
||||
co_return doneFailure(std::move(*timeout));
|
||||
}
|
||||
}
|
||||
|
||||
trace("build done");
|
||||
|
||||
|
|
@ -997,7 +1015,7 @@ bool DerivationBuildingGoal::isReadDesc(Descriptor fd)
|
|||
#endif
|
||||
}
|
||||
|
||||
void DerivationBuildingGoal::handleChildOutput(Descriptor fd, std::string_view data)
|
||||
Goal::Co DerivationBuildingGoal::processChildOutput(Descriptor fd, std::string_view data)
|
||||
{
|
||||
// local & `ssh://`-builds are dealt with here.
|
||||
auto isWrittenToLog = isReadDesc(fd);
|
||||
|
|
@ -1005,14 +1023,11 @@ void DerivationBuildingGoal::handleChildOutput(Descriptor fd, std::string_view d
|
|||
logSize += data.size();
|
||||
if (settings.maxLogSize && logSize > settings.maxLogSize) {
|
||||
killChild();
|
||||
// We're not inside a coroutine, hence we can't use co_return here.
|
||||
// Thus we ignore the return value.
|
||||
[[maybe_unused]] Done _ = doneFailure(BuildError(
|
||||
co_return doneFailure(BuildError(
|
||||
BuildResult::Failure::LogLimitExceeded,
|
||||
"%s killed after writing more than %d bytes of log output",
|
||||
getName(),
|
||||
settings.maxLogSize));
|
||||
return;
|
||||
}
|
||||
|
||||
for (auto c : data)
|
||||
|
|
@ -1065,13 +1080,7 @@ void DerivationBuildingGoal::handleChildOutput(Descriptor fd, std::string_view d
|
|||
currentHookLine += c;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
void DerivationBuildingGoal::handleEOF(Descriptor fd)
|
||||
{
|
||||
if (!currentLogLine.empty())
|
||||
flushLine();
|
||||
worker.wakeUp(shared_from_this());
|
||||
co_return Return{};
|
||||
}
|
||||
|
||||
void DerivationBuildingGoal::flushLine()
|
||||
|
|
|
|||
|
|
@ -66,7 +66,16 @@ Goal::Co DrvOutputSubstitutionGoal::init()
|
|||
true,
|
||||
false);
|
||||
|
||||
co_await Suspend{};
|
||||
while (true) {
|
||||
auto event = co_await WaitForChildEvent{};
|
||||
if (std::get_if<ChildOutput>(&event)) {
|
||||
// Doesn't process child output
|
||||
} else if (std::get_if<ChildEOF>(&event)) {
|
||||
break;
|
||||
} else if (std::get_if<TimedOut>(&event)) {
|
||||
unreachable();
|
||||
}
|
||||
}
|
||||
|
||||
worker.childTerminated(this);
|
||||
|
||||
|
|
@ -149,9 +158,4 @@ std::string DrvOutputSubstitutionGoal::key()
|
|||
return "a$" + std::string(id.to_string());
|
||||
}
|
||||
|
||||
void DrvOutputSubstitutionGoal::handleEOF(Descriptor fd)
|
||||
{
|
||||
worker.wakeUp(shared_from_this());
|
||||
}
|
||||
|
||||
} // namespace nix
|
||||
|
|
|
|||
|
|
@ -4,8 +4,58 @@
|
|||
|
||||
namespace nix {
|
||||
|
||||
TimedOut::TimedOut(time_t maxDuration)
|
||||
: BuildError(BuildResult::Failure::TimedOut, "timed out after %1% seconds", maxDuration)
|
||||
, maxDuration(maxDuration)
|
||||
{
|
||||
}
|
||||
|
||||
using Co = nix::Goal::Co;
|
||||
using promise_type = nix::Goal::promise_type;
|
||||
using ChildEvents = decltype(promise_type::childEvents);
|
||||
|
||||
void ChildEvents::pushChildEvent(ChildOutput event)
|
||||
{
|
||||
if (childTimeout)
|
||||
return; // Already timed out, ignore
|
||||
childOutputs.push(std::move(event));
|
||||
}
|
||||
|
||||
void ChildEvents::pushChildEvent(ChildEOF event)
|
||||
{
|
||||
if (childTimeout)
|
||||
return; // Already timed out, ignore
|
||||
assert(!childEOF);
|
||||
childEOF = std::move(event);
|
||||
}
|
||||
|
||||
void ChildEvents::pushChildEvent(TimedOut event)
|
||||
{
|
||||
// Timeout is immediate - flush pending events
|
||||
childOutputs = {};
|
||||
childEOF.reset();
|
||||
childTimeout = std::move(event);
|
||||
}
|
||||
|
||||
bool ChildEvents::hasChildEvent() const
|
||||
{
|
||||
return !childOutputs.empty() || childEOF || childTimeout;
|
||||
}
|
||||
|
||||
Goal::ChildEvent ChildEvents::popChildEvent()
|
||||
{
|
||||
if (!childOutputs.empty()) {
|
||||
auto event = std::move(childOutputs.front());
|
||||
childOutputs.pop();
|
||||
return event;
|
||||
}
|
||||
if (childEOF)
|
||||
return *std::exchange(childEOF, std::nullopt);
|
||||
if (childTimeout)
|
||||
return *std::exchange(childTimeout, std::nullopt);
|
||||
unreachable();
|
||||
}
|
||||
|
||||
using handle_type = nix::Goal::handle_type;
|
||||
using Suspend = nix::Goal::Suspend;
|
||||
|
||||
|
|
@ -206,6 +256,27 @@ void Goal::work()
|
|||
assert(top_co || exitCode != ecBusy);
|
||||
}
|
||||
|
||||
void Goal::handleChildOutput(Descriptor fd, std::string_view data)
|
||||
{
|
||||
assert(top_co);
|
||||
top_co->handle.promise().childEvents.pushChildEvent(ChildOutput{fd, std::string{data}});
|
||||
worker.wakeUp(shared_from_this());
|
||||
}
|
||||
|
||||
void Goal::handleEOF(Descriptor fd)
|
||||
{
|
||||
assert(top_co);
|
||||
top_co->handle.promise().childEvents.pushChildEvent(ChildEOF{fd});
|
||||
worker.wakeUp(shared_from_this());
|
||||
}
|
||||
|
||||
void Goal::timedOut(TimedOut && ex)
|
||||
{
|
||||
assert(top_co);
|
||||
top_co->handle.promise().childEvents.pushChildEvent(std::move(ex));
|
||||
worker.wakeUp(shared_from_this());
|
||||
}
|
||||
|
||||
Goal::Co Goal::yield()
|
||||
{
|
||||
worker.wakeUp(shared_from_this());
|
||||
|
|
|
|||
|
|
@ -258,7 +258,16 @@ Goal::Co PathSubstitutionGoal::tryToRun(
|
|||
true,
|
||||
false);
|
||||
|
||||
co_await Suspend{};
|
||||
while (true) {
|
||||
auto event = co_await WaitForChildEvent{};
|
||||
if (std::get_if<ChildOutput>(&event)) {
|
||||
// Substitution doesn't process child output
|
||||
} else if (std::get_if<ChildEOF>(&event)) {
|
||||
break;
|
||||
} else if (std::get_if<TimedOut>(&event)) {
|
||||
unreachable(); // Substitution doesn't use timeouts
|
||||
}
|
||||
}
|
||||
|
||||
trace("substitute finished");
|
||||
|
||||
|
|
@ -310,11 +319,6 @@ Goal::Co PathSubstitutionGoal::tryToRun(
|
|||
co_return doneSuccess(BuildResult::Success::Substituted);
|
||||
}
|
||||
|
||||
void PathSubstitutionGoal::handleEOF(Descriptor fd)
|
||||
{
|
||||
worker.wakeUp(shared_from_this());
|
||||
}
|
||||
|
||||
void PathSubstitutionGoal::cleanup()
|
||||
{
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -479,14 +479,13 @@ void Worker::waitForInput()
|
|||
|
||||
if (goal->exitCode == Goal::ecBusy && 0 != settings.maxSilentTime && j->respectTimeouts
|
||||
&& after - j->lastOutput >= std::chrono::seconds(settings.maxSilentTime)) {
|
||||
goal->timedOut(
|
||||
Error("%1% timed out after %2% seconds of silence", goal->getName(), settings.maxSilentTime));
|
||||
goal->timedOut(TimedOut(settings.maxSilentTime));
|
||||
}
|
||||
|
||||
else if (
|
||||
goal->exitCode == Goal::ecBusy && 0 != settings.buildTimeout && j->respectTimeouts
|
||||
&& after - j->timeStarted >= std::chrono::seconds(settings.buildTimeout)) {
|
||||
goal->timedOut(Error("%1% timed out after %2% seconds", goal->getName(), settings.buildTimeout));
|
||||
goal->timedOut(TimedOut(settings.buildTimeout));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -100,8 +100,6 @@ private:
|
|||
|
||||
std::map<ActivityId, Activity> builderActivities;
|
||||
|
||||
void timedOut(Error && ex) override;
|
||||
|
||||
std::string key() override;
|
||||
|
||||
/**
|
||||
|
|
@ -129,10 +127,10 @@ private:
|
|||
bool isReadDesc(Descriptor fd);
|
||||
|
||||
/**
|
||||
* Callback used by the worker to write to the log.
|
||||
* Process output from a child process.
|
||||
*/
|
||||
void handleChildOutput(Descriptor fd, std::string_view data) override;
|
||||
void handleEOF(Descriptor fd) override;
|
||||
Co processChildOutput(Descriptor fd, std::string_view data);
|
||||
|
||||
void flushLine();
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -52,11 +52,6 @@ struct DerivationGoal : public Goal
|
|||
bool storeDerivation);
|
||||
~DerivationGoal() = default;
|
||||
|
||||
void timedOut(Error && ex) override
|
||||
{
|
||||
unreachable();
|
||||
};
|
||||
|
||||
std::string key() override;
|
||||
|
||||
JobCategory jobCategory() const override
|
||||
|
|
|
|||
|
|
@ -43,8 +43,6 @@ struct DerivationResolutionGoal : public Goal
|
|||
*/
|
||||
std::unique_ptr<std::pair<StorePath, BasicDerivation>> resolvedDrv;
|
||||
|
||||
void timedOut(Error && ex) override {}
|
||||
|
||||
private:
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -109,8 +109,6 @@ struct DerivationTrampolineGoal : public Goal
|
|||
|
||||
virtual ~DerivationTrampolineGoal();
|
||||
|
||||
void timedOut(Error && ex) override {}
|
||||
|
||||
std::string key() override;
|
||||
|
||||
JobCategory jobCategory() const override
|
||||
|
|
|
|||
|
|
@ -33,15 +33,8 @@ public:
|
|||
|
||||
Co init();
|
||||
|
||||
void timedOut(Error && ex) override
|
||||
{
|
||||
unreachable();
|
||||
};
|
||||
|
||||
std::string key() override;
|
||||
|
||||
void handleEOF(Descriptor fd) override;
|
||||
|
||||
JobCategory jobCategory() const override
|
||||
{
|
||||
return JobCategory::Substitution;
|
||||
|
|
|
|||
|
|
@ -5,9 +5,18 @@
|
|||
#include "nix/store/build-result.hh"
|
||||
|
||||
#include <coroutine>
|
||||
#include <queue>
|
||||
#include <variant>
|
||||
|
||||
namespace nix {
|
||||
|
||||
struct TimedOut : BuildError
|
||||
{
|
||||
time_t maxDuration;
|
||||
|
||||
TimedOut(time_t maxDuration);
|
||||
};
|
||||
|
||||
/**
|
||||
* Forward definition.
|
||||
*/
|
||||
|
|
@ -138,6 +147,29 @@ public:
|
|||
friend Goal;
|
||||
};
|
||||
|
||||
/**
|
||||
* Event types for child process communication, delivered via coroutines.
|
||||
*/
|
||||
struct ChildOutput
|
||||
{
|
||||
Descriptor fd;
|
||||
std::string data;
|
||||
};
|
||||
|
||||
struct ChildEOF
|
||||
{
|
||||
Descriptor fd;
|
||||
};
|
||||
|
||||
using ChildEvent = std::variant<ChildOutput, ChildEOF, TimedOut>;
|
||||
|
||||
/**
|
||||
* Tag type for `co_await`-ing child events.
|
||||
* Returns a `ChildEvent` when resumed.
|
||||
*/
|
||||
struct WaitForChildEvent
|
||||
{};
|
||||
|
||||
// forward declaration of promise_type, see below
|
||||
struct promise_type;
|
||||
|
||||
|
|
@ -276,6 +308,28 @@ public:
|
|||
*/
|
||||
bool alive = true;
|
||||
|
||||
class
|
||||
{
|
||||
/**
|
||||
* Structured queue of child events:
|
||||
* - outputs: stream of data from child
|
||||
* - eof: optional end-of-stream marker
|
||||
* - timeout: optional timeout that flushes/overrides other events
|
||||
*/
|
||||
std::queue<ChildOutput> childOutputs;
|
||||
std::optional<ChildEOF> childEOF;
|
||||
std::optional<TimedOut> childTimeout;
|
||||
|
||||
public:
|
||||
|
||||
void pushChildEvent(ChildOutput event);
|
||||
void pushChildEvent(ChildEOF event);
|
||||
void pushChildEvent(TimedOut event);
|
||||
bool hasChildEvent() const;
|
||||
ChildEvent popChildEvent();
|
||||
|
||||
} childEvents;
|
||||
|
||||
/**
|
||||
* The awaiter used by @ref final_suspend.
|
||||
*/
|
||||
|
|
@ -369,13 +423,66 @@ public:
|
|||
return static_cast<Co &&>(co);
|
||||
}
|
||||
|
||||
/**
|
||||
* Awaiter for @ref Suspend. Always suspends, but asserts
|
||||
* there are no pending child events (those should be
|
||||
* consumed first via @ref WaitForChildEvent).
|
||||
*/
|
||||
struct SuspendAwaiter
|
||||
{
|
||||
promise_type & promise;
|
||||
|
||||
bool await_ready()
|
||||
{
|
||||
assert(!promise.childEvents.hasChildEvent());
|
||||
return false;
|
||||
}
|
||||
|
||||
void await_suspend(handle_type) {}
|
||||
|
||||
void await_resume() {}
|
||||
};
|
||||
|
||||
/**
|
||||
* Allows awaiting a @ref Suspend.
|
||||
* Always suspends.
|
||||
*/
|
||||
std::suspend_always await_transform(Suspend)
|
||||
SuspendAwaiter await_transform(Suspend)
|
||||
{
|
||||
return {};
|
||||
return SuspendAwaiter{*this};
|
||||
};
|
||||
|
||||
/**
|
||||
* Awaiter for child events. Suspends and returns the
|
||||
* pending child event when resumed.
|
||||
*/
|
||||
struct ChildEventAwaiter
|
||||
{
|
||||
handle_type handle;
|
||||
|
||||
bool await_ready()
|
||||
{
|
||||
return handle && handle.promise().childEvents.hasChildEvent();
|
||||
}
|
||||
|
||||
void await_suspend(handle_type h)
|
||||
{
|
||||
handle = h;
|
||||
}
|
||||
|
||||
ChildEvent await_resume()
|
||||
{
|
||||
assert(handle);
|
||||
return handle.promise().childEvents.popChildEvent();
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Allows awaiting child events (output, EOF, timeout).
|
||||
*/
|
||||
ChildEventAwaiter await_transform(WaitForChildEvent)
|
||||
{
|
||||
return ChildEventAwaiter{handle_type::from_promise(*this)};
|
||||
};
|
||||
};
|
||||
|
||||
|
|
@ -432,15 +539,23 @@ public:
|
|||
|
||||
void work();
|
||||
|
||||
virtual void handleChildOutput(Descriptor fd, std::string_view data)
|
||||
{
|
||||
unreachable();
|
||||
}
|
||||
/**
|
||||
* Called by the worker when data is received from a child process.
|
||||
* Stores the event and resumes the coroutine.
|
||||
*/
|
||||
void handleChildOutput(Descriptor fd, std::string_view data);
|
||||
|
||||
virtual void handleEOF(Descriptor fd)
|
||||
{
|
||||
unreachable();
|
||||
}
|
||||
/**
|
||||
* Called by the worker when EOF is received from a child process.
|
||||
* Stores the event and resumes the coroutine.
|
||||
*/
|
||||
void handleEOF(Descriptor fd);
|
||||
|
||||
/**
|
||||
* Called by the worker when a build times out.
|
||||
* Stores the event and resumes the coroutine.
|
||||
*/
|
||||
void timedOut(TimedOut && ex);
|
||||
|
||||
void trace(std::string_view s);
|
||||
|
||||
|
|
@ -449,13 +564,6 @@ public:
|
|||
return name;
|
||||
}
|
||||
|
||||
/**
|
||||
* Callback in case of a timeout. It should wake up its waiters,
|
||||
* get rid of any running child processes that are being monitored
|
||||
* by the worker (important!), etc.
|
||||
*/
|
||||
virtual void timedOut(Error && ex) = 0;
|
||||
|
||||
/**
|
||||
* Used for comparisons. The order matters a bit for scheduling. We
|
||||
* want:
|
||||
|
|
|
|||
|
|
@ -53,11 +53,6 @@ public:
|
|||
std::optional<ContentAddress> ca = std::nullopt);
|
||||
~PathSubstitutionGoal();
|
||||
|
||||
void timedOut(Error && ex) override
|
||||
{
|
||||
unreachable();
|
||||
};
|
||||
|
||||
std::string key() override
|
||||
{
|
||||
return "a$" + std::string(storePath.name()) + "$" + worker.store.printStorePath(storePath);
|
||||
|
|
@ -72,12 +67,6 @@ public:
|
|||
StorePath subPath, nix::ref<Store> sub, std::shared_ptr<const ValidPathInfo> info, bool & substituterFailed);
|
||||
Co finished();
|
||||
|
||||
/**
|
||||
* Callback used by the worker to write to the log.
|
||||
*/
|
||||
void handleChildOutput(Descriptor fd, std::string_view data) override {};
|
||||
void handleEOF(Descriptor fd) override;
|
||||
|
||||
/* Called by destructor, can't be overridden */
|
||||
void cleanup() override final;
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue