diff --git a/src/libstore/build/derivation-building-goal.cc b/src/libstore/build/derivation-building-goal.cc index 8221e12c6..9e4605f07 100644 --- a/src/libstore/build/derivation-building-goal.cc +++ b/src/libstore/build/derivation-building-goal.cc @@ -71,14 +71,6 @@ void DerivationBuildingGoal::killChild() #endif } -void DerivationBuildingGoal::timedOut(Error && ex) -{ - killChild(); - // We're not inside a coroutine, hence we can't use co_return here. - // Thus we ignore the return value. - [[maybe_unused]] Done _ = doneFailure({BuildResult::Failure::TimedOut, std::move(ex)}); -} - std::string showKnownOutputs(const StoreDirConfig & store, const Derivation & drv) { std::string msg; @@ -443,7 +435,20 @@ Goal::Co DerivationBuildingGoal::tryToBuild() if (useHook) { buildResult.startTime = time(0); // inexact started(); - co_await Suspend{}; + + while (true) { + auto event = co_await WaitForChildEvent{}; + if (auto * output = std::get_if(&event)) { + co_await processChildOutput(output->fd, output->data); + } else if (std::get_if(&event)) { + if (!currentLogLine.empty()) + flushLine(); + break; + } else if (auto * timeout = std::get_if(&event)) { + killChild(); + co_return doneFailure(std::move(*timeout)); + } + } #ifndef _WIN32 assert(hook); @@ -664,7 +669,20 @@ Goal::Co DerivationBuildingGoal::tryToBuild() worker.childStarted(shared_from_this(), {builderOut}, true, true); started(); - co_await Suspend{}; + + while (true) { + auto event = co_await WaitForChildEvent{}; + if (auto * output = std::get_if(&event)) { + co_await processChildOutput(output->fd, output->data); + } else if (std::get_if(&event)) { + if (!currentLogLine.empty()) + flushLine(); + break; + } else if (auto * timeout = std::get_if(&event)) { + killChild(); + co_return doneFailure(std::move(*timeout)); + } + } trace("build done"); @@ -997,7 +1015,7 @@ bool DerivationBuildingGoal::isReadDesc(Descriptor fd) #endif } -void DerivationBuildingGoal::handleChildOutput(Descriptor fd, std::string_view data) +Goal::Co DerivationBuildingGoal::processChildOutput(Descriptor fd, std::string_view data) { // local & `ssh://`-builds are dealt with here. auto isWrittenToLog = isReadDesc(fd); @@ -1005,14 +1023,11 @@ void DerivationBuildingGoal::handleChildOutput(Descriptor fd, std::string_view d logSize += data.size(); if (settings.maxLogSize && logSize > settings.maxLogSize) { killChild(); - // We're not inside a coroutine, hence we can't use co_return here. - // Thus we ignore the return value. - [[maybe_unused]] Done _ = doneFailure(BuildError( + co_return doneFailure(BuildError( BuildResult::Failure::LogLimitExceeded, "%s killed after writing more than %d bytes of log output", getName(), settings.maxLogSize)); - return; } for (auto c : data) @@ -1065,13 +1080,7 @@ void DerivationBuildingGoal::handleChildOutput(Descriptor fd, std::string_view d currentHookLine += c; } #endif -} - -void DerivationBuildingGoal::handleEOF(Descriptor fd) -{ - if (!currentLogLine.empty()) - flushLine(); - worker.wakeUp(shared_from_this()); + co_return Return{}; } void DerivationBuildingGoal::flushLine() diff --git a/src/libstore/build/drv-output-substitution-goal.cc b/src/libstore/build/drv-output-substitution-goal.cc index 8d0a307be..03ebb38bb 100644 --- a/src/libstore/build/drv-output-substitution-goal.cc +++ b/src/libstore/build/drv-output-substitution-goal.cc @@ -66,7 +66,16 @@ Goal::Co DrvOutputSubstitutionGoal::init() true, false); - co_await Suspend{}; + while (true) { + auto event = co_await WaitForChildEvent{}; + if (std::get_if(&event)) { + // Doesn't process child output + } else if (std::get_if(&event)) { + break; + } else if (std::get_if(&event)) { + unreachable(); + } + } worker.childTerminated(this); @@ -149,9 +158,4 @@ std::string DrvOutputSubstitutionGoal::key() return "a$" + std::string(id.to_string()); } -void DrvOutputSubstitutionGoal::handleEOF(Descriptor fd) -{ - worker.wakeUp(shared_from_this()); -} - } // namespace nix diff --git a/src/libstore/build/goal.cc b/src/libstore/build/goal.cc index 6266329e7..8e2b6199a 100644 --- a/src/libstore/build/goal.cc +++ b/src/libstore/build/goal.cc @@ -4,8 +4,58 @@ namespace nix { +TimedOut::TimedOut(time_t maxDuration) + : BuildError(BuildResult::Failure::TimedOut, "timed out after %1% seconds", maxDuration) + , maxDuration(maxDuration) +{ +} + using Co = nix::Goal::Co; using promise_type = nix::Goal::promise_type; +using ChildEvents = decltype(promise_type::childEvents); + +void ChildEvents::pushChildEvent(ChildOutput event) +{ + if (childTimeout) + return; // Already timed out, ignore + childOutputs.push(std::move(event)); +} + +void ChildEvents::pushChildEvent(ChildEOF event) +{ + if (childTimeout) + return; // Already timed out, ignore + assert(!childEOF); + childEOF = std::move(event); +} + +void ChildEvents::pushChildEvent(TimedOut event) +{ + // Timeout is immediate - flush pending events + childOutputs = {}; + childEOF.reset(); + childTimeout = std::move(event); +} + +bool ChildEvents::hasChildEvent() const +{ + return !childOutputs.empty() || childEOF || childTimeout; +} + +Goal::ChildEvent ChildEvents::popChildEvent() +{ + if (!childOutputs.empty()) { + auto event = std::move(childOutputs.front()); + childOutputs.pop(); + return event; + } + if (childEOF) + return *std::exchange(childEOF, std::nullopt); + if (childTimeout) + return *std::exchange(childTimeout, std::nullopt); + unreachable(); +} + using handle_type = nix::Goal::handle_type; using Suspend = nix::Goal::Suspend; @@ -206,6 +256,27 @@ void Goal::work() assert(top_co || exitCode != ecBusy); } +void Goal::handleChildOutput(Descriptor fd, std::string_view data) +{ + assert(top_co); + top_co->handle.promise().childEvents.pushChildEvent(ChildOutput{fd, std::string{data}}); + worker.wakeUp(shared_from_this()); +} + +void Goal::handleEOF(Descriptor fd) +{ + assert(top_co); + top_co->handle.promise().childEvents.pushChildEvent(ChildEOF{fd}); + worker.wakeUp(shared_from_this()); +} + +void Goal::timedOut(TimedOut && ex) +{ + assert(top_co); + top_co->handle.promise().childEvents.pushChildEvent(std::move(ex)); + worker.wakeUp(shared_from_this()); +} + Goal::Co Goal::yield() { worker.wakeUp(shared_from_this()); diff --git a/src/libstore/build/substitution-goal.cc b/src/libstore/build/substitution-goal.cc index ac18de304..51dc87e8d 100644 --- a/src/libstore/build/substitution-goal.cc +++ b/src/libstore/build/substitution-goal.cc @@ -258,7 +258,16 @@ Goal::Co PathSubstitutionGoal::tryToRun( true, false); - co_await Suspend{}; + while (true) { + auto event = co_await WaitForChildEvent{}; + if (std::get_if(&event)) { + // Substitution doesn't process child output + } else if (std::get_if(&event)) { + break; + } else if (std::get_if(&event)) { + unreachable(); // Substitution doesn't use timeouts + } + } trace("substitute finished"); @@ -310,11 +319,6 @@ Goal::Co PathSubstitutionGoal::tryToRun( co_return doneSuccess(BuildResult::Success::Substituted); } -void PathSubstitutionGoal::handleEOF(Descriptor fd) -{ - worker.wakeUp(shared_from_this()); -} - void PathSubstitutionGoal::cleanup() { try { diff --git a/src/libstore/build/worker.cc b/src/libstore/build/worker.cc index 3663a2c91..754f4b1cc 100644 --- a/src/libstore/build/worker.cc +++ b/src/libstore/build/worker.cc @@ -479,14 +479,13 @@ void Worker::waitForInput() if (goal->exitCode == Goal::ecBusy && 0 != settings.maxSilentTime && j->respectTimeouts && after - j->lastOutput >= std::chrono::seconds(settings.maxSilentTime)) { - goal->timedOut( - Error("%1% timed out after %2% seconds of silence", goal->getName(), settings.maxSilentTime)); + goal->timedOut(TimedOut(settings.maxSilentTime)); } else if ( goal->exitCode == Goal::ecBusy && 0 != settings.buildTimeout && j->respectTimeouts && after - j->timeStarted >= std::chrono::seconds(settings.buildTimeout)) { - goal->timedOut(Error("%1% timed out after %2% seconds", goal->getName(), settings.buildTimeout)); + goal->timedOut(TimedOut(settings.buildTimeout)); } } diff --git a/src/libstore/include/nix/store/build/derivation-building-goal.hh b/src/libstore/include/nix/store/build/derivation-building-goal.hh index be95c796b..eb94df88b 100644 --- a/src/libstore/include/nix/store/build/derivation-building-goal.hh +++ b/src/libstore/include/nix/store/build/derivation-building-goal.hh @@ -100,8 +100,6 @@ private: std::map builderActivities; - void timedOut(Error && ex) override; - std::string key() override; /** @@ -129,10 +127,10 @@ private: bool isReadDesc(Descriptor fd); /** - * Callback used by the worker to write to the log. + * Process output from a child process. */ - void handleChildOutput(Descriptor fd, std::string_view data) override; - void handleEOF(Descriptor fd) override; + Co processChildOutput(Descriptor fd, std::string_view data); + void flushLine(); /** diff --git a/src/libstore/include/nix/store/build/derivation-goal.hh b/src/libstore/include/nix/store/build/derivation-goal.hh index 0fe610987..aaded7551 100644 --- a/src/libstore/include/nix/store/build/derivation-goal.hh +++ b/src/libstore/include/nix/store/build/derivation-goal.hh @@ -52,11 +52,6 @@ struct DerivationGoal : public Goal bool storeDerivation); ~DerivationGoal() = default; - void timedOut(Error && ex) override - { - unreachable(); - }; - std::string key() override; JobCategory jobCategory() const override diff --git a/src/libstore/include/nix/store/build/derivation-resolution-goal.hh b/src/libstore/include/nix/store/build/derivation-resolution-goal.hh index fb4c2a346..b79e6bbb7 100644 --- a/src/libstore/include/nix/store/build/derivation-resolution-goal.hh +++ b/src/libstore/include/nix/store/build/derivation-resolution-goal.hh @@ -43,8 +43,6 @@ struct DerivationResolutionGoal : public Goal */ std::unique_ptr> resolvedDrv; - void timedOut(Error && ex) override {} - private: /** diff --git a/src/libstore/include/nix/store/build/derivation-trampoline-goal.hh b/src/libstore/include/nix/store/build/derivation-trampoline-goal.hh index bfed67f63..33276723a 100644 --- a/src/libstore/include/nix/store/build/derivation-trampoline-goal.hh +++ b/src/libstore/include/nix/store/build/derivation-trampoline-goal.hh @@ -109,8 +109,6 @@ struct DerivationTrampolineGoal : public Goal virtual ~DerivationTrampolineGoal(); - void timedOut(Error && ex) override {} - std::string key() override; JobCategory jobCategory() const override diff --git a/src/libstore/include/nix/store/build/drv-output-substitution-goal.hh b/src/libstore/include/nix/store/build/drv-output-substitution-goal.hh index 6310e0d2c..2d42169dd 100644 --- a/src/libstore/include/nix/store/build/drv-output-substitution-goal.hh +++ b/src/libstore/include/nix/store/build/drv-output-substitution-goal.hh @@ -33,15 +33,8 @@ public: Co init(); - void timedOut(Error && ex) override - { - unreachable(); - }; - std::string key() override; - void handleEOF(Descriptor fd) override; - JobCategory jobCategory() const override { return JobCategory::Substitution; diff --git a/src/libstore/include/nix/store/build/goal.hh b/src/libstore/include/nix/store/build/goal.hh index 4d57afc0f..703b14bd8 100644 --- a/src/libstore/include/nix/store/build/goal.hh +++ b/src/libstore/include/nix/store/build/goal.hh @@ -5,9 +5,18 @@ #include "nix/store/build-result.hh" #include +#include +#include namespace nix { +struct TimedOut : BuildError +{ + time_t maxDuration; + + TimedOut(time_t maxDuration); +}; + /** * Forward definition. */ @@ -138,6 +147,29 @@ public: friend Goal; }; + /** + * Event types for child process communication, delivered via coroutines. + */ + struct ChildOutput + { + Descriptor fd; + std::string data; + }; + + struct ChildEOF + { + Descriptor fd; + }; + + using ChildEvent = std::variant; + + /** + * Tag type for `co_await`-ing child events. + * Returns a `ChildEvent` when resumed. + */ + struct WaitForChildEvent + {}; + // forward declaration of promise_type, see below struct promise_type; @@ -276,6 +308,28 @@ public: */ bool alive = true; + class + { + /** + * Structured queue of child events: + * - outputs: stream of data from child + * - eof: optional end-of-stream marker + * - timeout: optional timeout that flushes/overrides other events + */ + std::queue childOutputs; + std::optional childEOF; + std::optional childTimeout; + + public: + + void pushChildEvent(ChildOutput event); + void pushChildEvent(ChildEOF event); + void pushChildEvent(TimedOut event); + bool hasChildEvent() const; + ChildEvent popChildEvent(); + + } childEvents; + /** * The awaiter used by @ref final_suspend. */ @@ -369,13 +423,66 @@ public: return static_cast(co); } + /** + * Awaiter for @ref Suspend. Always suspends, but asserts + * there are no pending child events (those should be + * consumed first via @ref WaitForChildEvent). + */ + struct SuspendAwaiter + { + promise_type & promise; + + bool await_ready() + { + assert(!promise.childEvents.hasChildEvent()); + return false; + } + + void await_suspend(handle_type) {} + + void await_resume() {} + }; + /** * Allows awaiting a @ref Suspend. * Always suspends. */ - std::suspend_always await_transform(Suspend) + SuspendAwaiter await_transform(Suspend) { - return {}; + return SuspendAwaiter{*this}; + }; + + /** + * Awaiter for child events. Suspends and returns the + * pending child event when resumed. + */ + struct ChildEventAwaiter + { + handle_type handle; + + bool await_ready() + { + return handle && handle.promise().childEvents.hasChildEvent(); + } + + void await_suspend(handle_type h) + { + handle = h; + } + + ChildEvent await_resume() + { + assert(handle); + return handle.promise().childEvents.popChildEvent(); + } + }; + + /** + * Allows awaiting child events (output, EOF, timeout). + */ + ChildEventAwaiter await_transform(WaitForChildEvent) + { + return ChildEventAwaiter{handle_type::from_promise(*this)}; }; }; @@ -432,15 +539,23 @@ public: void work(); - virtual void handleChildOutput(Descriptor fd, std::string_view data) - { - unreachable(); - } + /** + * Called by the worker when data is received from a child process. + * Stores the event and resumes the coroutine. + */ + void handleChildOutput(Descriptor fd, std::string_view data); - virtual void handleEOF(Descriptor fd) - { - unreachable(); - } + /** + * Called by the worker when EOF is received from a child process. + * Stores the event and resumes the coroutine. + */ + void handleEOF(Descriptor fd); + + /** + * Called by the worker when a build times out. + * Stores the event and resumes the coroutine. + */ + void timedOut(TimedOut && ex); void trace(std::string_view s); @@ -449,13 +564,6 @@ public: return name; } - /** - * Callback in case of a timeout. It should wake up its waiters, - * get rid of any running child processes that are being monitored - * by the worker (important!), etc. - */ - virtual void timedOut(Error && ex) = 0; - /** * Used for comparisons. The order matters a bit for scheduling. We * want: diff --git a/src/libstore/include/nix/store/build/substitution-goal.hh b/src/libstore/include/nix/store/build/substitution-goal.hh index 5f33b9aa5..9f5315b46 100644 --- a/src/libstore/include/nix/store/build/substitution-goal.hh +++ b/src/libstore/include/nix/store/build/substitution-goal.hh @@ -53,11 +53,6 @@ public: std::optional ca = std::nullopt); ~PathSubstitutionGoal(); - void timedOut(Error && ex) override - { - unreachable(); - }; - std::string key() override { return "a$" + std::string(storePath.name()) + "$" + worker.store.printStorePath(storePath); @@ -72,12 +67,6 @@ public: StorePath subPath, nix::ref sub, std::shared_ptr info, bool & substituterFailed); Co finished(); - /** - * Callback used by the worker to write to the log. - */ - void handleChildOutput(Descriptor fd, std::string_view data) override {}; - void handleEOF(Descriptor fd) override; - /* Called by destructor, can't be overridden */ void cleanup() override final;