1
1
Fork 0
mirror of https://github.com/NixOS/nix.git synced 2025-12-22 17:01:08 +01:00

Use coroutines for worker child I/O

This will enable way more RAII going forward.
This commit is contained in:
John Ericson 2025-12-13 23:09:11 -05:00
parent 906334686c
commit 92e698426b
11 changed files with 237 additions and 83 deletions

View file

@ -71,14 +71,6 @@ void DerivationBuildingGoal::killChild()
#endif
}
void DerivationBuildingGoal::timedOut(TimedOut && ex)
{
killChild();
// We're not inside a coroutine, hence we can't use co_return here.
// Thus we ignore the return value.
[[maybe_unused]] Done _ = doneFailure(std::move(ex));
}
std::string showKnownOutputs(const StoreDirConfig & store, const Derivation & drv)
{
std::string msg;
@ -443,7 +435,20 @@ Goal::Co DerivationBuildingGoal::tryToBuild()
if (useHook) {
buildResult.startTime = time(0); // inexact
started();
co_await Suspend{};
while (true) {
auto event = co_await WaitForChildEvent{};
if (auto * output = std::get_if<ChildOutput>(&event)) {
co_await processChildOutput(output->fd, output->data);
} else if (std::get_if<ChildEOF>(&event)) {
if (!currentLogLine.empty())
flushLine();
break;
} else if (auto * timeout = std::get_if<TimedOut>(&event)) {
killChild();
co_return doneFailure(std::move(*timeout));
}
}
#ifndef _WIN32
assert(hook);
@ -664,7 +669,20 @@ Goal::Co DerivationBuildingGoal::tryToBuild()
worker.childStarted(shared_from_this(), {builderOut}, true, true);
started();
co_await Suspend{};
while (true) {
auto event = co_await WaitForChildEvent{};
if (auto * output = std::get_if<ChildOutput>(&event)) {
co_await processChildOutput(output->fd, output->data);
} else if (std::get_if<ChildEOF>(&event)) {
if (!currentLogLine.empty())
flushLine();
break;
} else if (auto * timeout = std::get_if<TimedOut>(&event)) {
killChild();
co_return doneFailure(std::move(*timeout));
}
}
trace("build done");
@ -997,7 +1015,7 @@ bool DerivationBuildingGoal::isReadDesc(Descriptor fd)
#endif
}
void DerivationBuildingGoal::handleChildOutput(Descriptor fd, std::string_view data)
Goal::Co DerivationBuildingGoal::processChildOutput(Descriptor fd, std::string_view data)
{
// local & `ssh://`-builds are dealt with here.
auto isWrittenToLog = isReadDesc(fd);
@ -1005,14 +1023,11 @@ void DerivationBuildingGoal::handleChildOutput(Descriptor fd, std::string_view d
logSize += data.size();
if (settings.maxLogSize && logSize > settings.maxLogSize) {
killChild();
// We're not inside a coroutine, hence we can't use co_return here.
// Thus we ignore the return value.
[[maybe_unused]] Done _ = doneFailure(BuildError(
co_return doneFailure(BuildError(
BuildResult::Failure::LogLimitExceeded,
"%s killed after writing more than %d bytes of log output",
getName(),
settings.maxLogSize));
return;
}
for (auto c : data)
@ -1065,13 +1080,7 @@ void DerivationBuildingGoal::handleChildOutput(Descriptor fd, std::string_view d
currentHookLine += c;
}
#endif
}
void DerivationBuildingGoal::handleEOF(Descriptor fd)
{
if (!currentLogLine.empty())
flushLine();
worker.wakeUp(shared_from_this());
co_return Return{};
}
void DerivationBuildingGoal::flushLine()

View file

@ -66,7 +66,16 @@ Goal::Co DrvOutputSubstitutionGoal::init()
true,
false);
co_await Suspend{};
while (true) {
auto event = co_await WaitForChildEvent{};
if (std::get_if<ChildOutput>(&event)) {
// Doesn't process child output
} else if (std::get_if<ChildEOF>(&event)) {
break;
} else if (std::get_if<TimedOut>(&event)) {
unreachable();
}
}
worker.childTerminated(this);
@ -149,9 +158,4 @@ std::string DrvOutputSubstitutionGoal::key()
return "a$" + std::string(id.to_string());
}
void DrvOutputSubstitutionGoal::handleEOF(Descriptor fd)
{
worker.wakeUp(shared_from_this());
}
} // namespace nix

View file

@ -12,6 +12,50 @@ TimedOut::TimedOut(time_t maxDuration)
using Co = nix::Goal::Co;
using promise_type = nix::Goal::promise_type;
using ChildEvents = decltype(promise_type::childEvents);
void ChildEvents::pushChildEvent(ChildOutput event)
{
if (childTimeout)
return; // Already timed out, ignore
childOutputs.push(std::move(event));
}
void ChildEvents::pushChildEvent(ChildEOF event)
{
if (childTimeout)
return; // Already timed out, ignore
assert(!childEOF);
childEOF = std::move(event);
}
void ChildEvents::pushChildEvent(TimedOut event)
{
// Timeout is immediate - flush pending events
childOutputs = {};
childEOF.reset();
childTimeout = std::move(event);
}
bool ChildEvents::hasChildEvent() const
{
return !childOutputs.empty() || childEOF || childTimeout;
}
Goal::ChildEvent ChildEvents::popChildEvent()
{
if (!childOutputs.empty()) {
auto event = std::move(childOutputs.front());
childOutputs.pop();
return event;
}
if (childEOF)
return *std::exchange(childEOF, std::nullopt);
if (childTimeout)
return *std::exchange(childTimeout, std::nullopt);
unreachable();
}
using handle_type = nix::Goal::handle_type;
using Suspend = nix::Goal::Suspend;
@ -212,6 +256,27 @@ void Goal::work()
assert(top_co || exitCode != ecBusy);
}
void Goal::handleChildOutput(Descriptor fd, std::string_view data)
{
assert(top_co);
top_co->handle.promise().childEvents.pushChildEvent(ChildOutput{fd, std::string{data}});
worker.wakeUp(shared_from_this());
}
void Goal::handleEOF(Descriptor fd)
{
assert(top_co);
top_co->handle.promise().childEvents.pushChildEvent(ChildEOF{fd});
worker.wakeUp(shared_from_this());
}
void Goal::timedOut(TimedOut && ex)
{
assert(top_co);
top_co->handle.promise().childEvents.pushChildEvent(std::move(ex));
worker.wakeUp(shared_from_this());
}
Goal::Co Goal::yield()
{
worker.wakeUp(shared_from_this());

View file

@ -258,7 +258,16 @@ Goal::Co PathSubstitutionGoal::tryToRun(
true,
false);
co_await Suspend{};
while (true) {
auto event = co_await WaitForChildEvent{};
if (std::get_if<ChildOutput>(&event)) {
// Substitution doesn't process child output
} else if (std::get_if<ChildEOF>(&event)) {
break;
} else if (std::get_if<TimedOut>(&event)) {
unreachable(); // Substitution doesn't use timeouts
}
}
trace("substitute finished");
@ -310,11 +319,6 @@ Goal::Co PathSubstitutionGoal::tryToRun(
co_return doneSuccess(BuildResult::Success::Substituted);
}
void PathSubstitutionGoal::handleEOF(Descriptor fd)
{
worker.wakeUp(shared_from_this());
}
void PathSubstitutionGoal::cleanup()
{
try {

View file

@ -100,8 +100,6 @@ private:
std::map<ActivityId, Activity> builderActivities;
void timedOut(TimedOut && ex) override;
std::string key() override;
/**
@ -129,10 +127,10 @@ private:
bool isReadDesc(Descriptor fd);
/**
* Callback used by the worker to write to the log.
* Process output from a child process.
*/
void handleChildOutput(Descriptor fd, std::string_view data) override;
void handleEOF(Descriptor fd) override;
Co processChildOutput(Descriptor fd, std::string_view data);
void flushLine();
/**

View file

@ -52,11 +52,6 @@ struct DerivationGoal : public Goal
bool storeDerivation);
~DerivationGoal() = default;
void timedOut(TimedOut && ex) override
{
unreachable();
};
std::string key() override;
JobCategory jobCategory() const override

View file

@ -43,8 +43,6 @@ struct DerivationResolutionGoal : public Goal
*/
std::unique_ptr<std::pair<StorePath, BasicDerivation>> resolvedDrv;
void timedOut(TimedOut && ex) override {}
private:
/**

View file

@ -109,8 +109,6 @@ struct DerivationTrampolineGoal : public Goal
virtual ~DerivationTrampolineGoal();
void timedOut(TimedOut && ex) override {}
std::string key() override;
JobCategory jobCategory() const override

View file

@ -33,15 +33,8 @@ public:
Co init();
void timedOut(TimedOut && ex) override
{
unreachable();
};
std::string key() override;
void handleEOF(Descriptor fd) override;
JobCategory jobCategory() const override
{
return JobCategory::Substitution;

View file

@ -5,6 +5,8 @@
#include "nix/store/build-result.hh"
#include <coroutine>
#include <queue>
#include <variant>
namespace nix {
@ -145,6 +147,29 @@ public:
friend Goal;
};
/**
* Event types for child process communication, delivered via coroutines.
*/
struct ChildOutput
{
Descriptor fd;
std::string data;
};
struct ChildEOF
{
Descriptor fd;
};
using ChildEvent = std::variant<ChildOutput, ChildEOF, TimedOut>;
/**
* Tag type for `co_await`-ing child events.
* Returns a `ChildEvent` when resumed.
*/
struct WaitForChildEvent
{};
// forward declaration of promise_type, see below
struct promise_type;
@ -283,6 +308,28 @@ public:
*/
bool alive = true;
class
{
/**
* Structured queue of child events:
* - outputs: stream of data from child
* - eof: optional end-of-stream marker
* - timeout: optional timeout that flushes/overrides other events
*/
std::queue<ChildOutput> childOutputs;
std::optional<ChildEOF> childEOF;
std::optional<TimedOut> childTimeout;
public:
void pushChildEvent(ChildOutput event);
void pushChildEvent(ChildEOF event);
void pushChildEvent(TimedOut event);
bool hasChildEvent() const;
ChildEvent popChildEvent();
} childEvents;
/**
* The awaiter used by @ref final_suspend.
*/
@ -376,13 +423,66 @@ public:
return static_cast<Co &&>(co);
}
/**
* Awaiter for @ref Suspend. Always suspends, but asserts
* there are no pending child events (those should be
* consumed first via @ref WaitForChildEvent).
*/
struct SuspendAwaiter
{
promise_type & promise;
bool await_ready()
{
assert(!promise.childEvents.hasChildEvent());
return false;
}
void await_suspend(handle_type) {}
void await_resume() {}
};
/**
* Allows awaiting a @ref Suspend.
* Always suspends.
*/
std::suspend_always await_transform(Suspend)
SuspendAwaiter await_transform(Suspend)
{
return {};
return SuspendAwaiter{*this};
};
/**
* Awaiter for child events. Suspends and returns the
* pending child event when resumed.
*/
struct ChildEventAwaiter
{
handle_type handle;
bool await_ready()
{
return handle && handle.promise().childEvents.hasChildEvent();
}
void await_suspend(handle_type h)
{
handle = h;
}
ChildEvent await_resume()
{
assert(handle);
return handle.promise().childEvents.popChildEvent();
}
};
/**
* Allows awaiting child events (output, EOF, timeout).
*/
ChildEventAwaiter await_transform(WaitForChildEvent)
{
return ChildEventAwaiter{handle_type::from_promise(*this)};
};
};
@ -439,15 +539,23 @@ public:
void work();
virtual void handleChildOutput(Descriptor fd, std::string_view data)
{
unreachable();
}
/**
* Called by the worker when data is received from a child process.
* Stores the event and resumes the coroutine.
*/
void handleChildOutput(Descriptor fd, std::string_view data);
virtual void handleEOF(Descriptor fd)
{
unreachable();
}
/**
* Called by the worker when EOF is received from a child process.
* Stores the event and resumes the coroutine.
*/
void handleEOF(Descriptor fd);
/**
* Called by the worker when a build times out.
* Stores the event and resumes the coroutine.
*/
void timedOut(TimedOut && ex);
void trace(std::string_view s);
@ -456,13 +564,6 @@ public:
return name;
}
/**
* Callback in case of a timeout. It should wake up its waiters,
* get rid of any running child processes that are being monitored
* by the worker (important!), etc.
*/
virtual void timedOut(TimedOut && ex) = 0;
/**
* Used for comparisons. The order matters a bit for scheduling. We
* want:

View file

@ -53,11 +53,6 @@ public:
std::optional<ContentAddress> ca = std::nullopt);
~PathSubstitutionGoal();
void timedOut(TimedOut && ex) override
{
unreachable();
};
std::string key() override
{
return "a$" + std::string(storePath.name()) + "$" + worker.store.printStorePath(storePath);
@ -72,12 +67,6 @@ public:
StorePath subPath, nix::ref<Store> sub, std::shared_ptr<const ValidPathInfo> info, bool & substituterFailed);
Co finished();
/**
* Callback used by the worker to write to the log.
*/
void handleChildOutput(Descriptor fd, std::string_view data) override {};
void handleEOF(Descriptor fd) override;
/* Called by destructor, can't be overridden */
void cleanup() override final;