#include "nix/store/async-path-writer.hh"
#include "nix/util/archive.hh"

#include <cassert>
#include <condition_variable>
#include <exception>
#include <future>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

namespace nix {

/**
 * `AsyncPathWriter` implementation that queues writes and performs
 * them in batches on a single background thread.
 *
 * `addPath()` computes the store path synchronously and (unless
 * `readOnly`) enqueues the actual write; `waitForPath()` and
 * `waitForAllPaths()` block until the corresponding writes have
 * finished and rethrow any error the worker recorded for them.
 */
struct AsyncPathWriterImpl : AsyncPathWriter
{
    ref<Store> store;

    /**
     * A pending write, together with the promise that is fulfilled
     * once the worker has processed the batch containing it.
     */
    struct Item
    {
        StorePath storePath;
        std::string contents;
        std::string name;
        Hash hash;
        StorePathSet references;
        RepairFlag repair;
        std::promise<void> promise;
    };

    /**
     * Everything shared between callers and the worker thread; only
     * accessed through `state_.lock()`.
     */
    struct State
    {
        /** Writes that have been queued but not yet picked up by the worker. */
        std::vector<Item> items;

        /**
         * Completion futures for queued paths. Shared futures, because
         * `waitForPath()` takes a copy and may be called repeatedly.
         */
        std::unordered_map<StorePath, std::shared_future<void>> futures;

        /** Set by the destructor to ask the worker to exit once the queue is empty. */
        bool quit = false;
    };

    Sync<State> state_;

    std::thread workerThread;

    /** Signalled when items are queued or when `quit` is set. */
    std::condition_variable wakeupCV;

    /**
     * Start the worker thread. The worker repeatedly takes the whole
     * queue, writes it with `writePaths()`, and then resolves every
     * promise of that batch: with a value on success, or with the
     * thrown exception if `writePaths()` failed.
     */
    AsyncPathWriterImpl(ref<Store> store)
        : store(store)
    {
        // Capture `this` explicitly: the lambda only touches members, and
        // must not pick up the constructor parameter by reference.
        workerThread = std::thread([this]() {
            while (true) {
                std::vector<Item> batch;

                {
                    auto state(state_.lock());
                    while (!state->quit && state->items.empty())
                        state.wait(wakeupCV);
                    // Only exit once the queue is drained, so writes queued
                    // before destruction are still performed.
                    if (state->items.empty() && state->quit)
                        return;
                    std::swap(batch, state->items);
                }

                try {
                    writePaths(batch);
                    for (auto & item : batch)
                        item.promise.set_value();
                } catch (...) {
                    // The batch fails as a unit: every item reports the same error.
                    auto error = std::current_exception();
                    for (auto & item : batch)
                        item.promise.set_exception(error);
                }
            }
        });
    }

    /**
     * Ask the worker to quit and wait for it; the worker first
     * processes anything still in the queue.
     */
    ~AsyncPathWriterImpl()
    {
        state_.lock()->quit = true;
        wakeupCV.notify_all();
        workerThread.join();
    }

    /**
     * Compute the store path for `contents` (SHA-256, text
     * content-addressed, with the given `references`) and, unless
     * `readOnly` is set, queue the write for the worker thread.
     *
     * @return The computed store path. The path is not guaranteed to
     * exist in the store until `waitForPath()` / `waitForAllPaths()`
     * has returned for it.
     */
    StorePath
    addPath(std::string contents, std::string name, StorePathSet references, RepairFlag repair, bool readOnly) override
    {
        auto hash = hashString(HashAlgorithm::SHA256, contents);

        auto storePath = store->makeFixedOutputPathFromCA(
            name,
            TextInfo{
                .hash = hash,
                .references = references,
            });

        if (!readOnly) {
            auto state(state_.lock());
            std::promise<void> promise;
            // Register the future before queueing so a waiter can never
            // observe the item without its future.
            state->futures.insert_or_assign(storePath, promise.get_future());
            state->items.push_back(
                Item{
                    .storePath = storePath,
                    .contents = std::move(contents),
                    .name = std::move(name),
                    .hash = hash,
                    .references = std::move(references),
                    .repair = repair,
                    .promise = std::move(promise),
                });
            wakeupCV.notify_all();
        }

        return storePath;
    }

    /**
     * Block until the queued write of `path` has completed, rethrowing
     * the worker's exception if its batch failed. Returns immediately
     * if no future is registered for `path`.
     */
    void waitForPath(const StorePath & path) override
    {
        std::shared_future<void> future;

        {
            auto state(state_.lock());
            auto i = state->futures.find(path);
            if (i == state->futures.end())
                return;
            future = i->second;
        }

        // Wait outside the lock so the worker and other callers can proceed.
        future.get();
    }

    /**
     * Take ownership of all registered futures (leaving none behind)
     * and wait for each of them. The first failed future rethrows its
     * exception; the remaining futures are then not waited for.
     */
    void waitForAllPaths() override
    {
        decltype(State::futures) pending;

        {
            auto state(state_.lock());
            std::swap(pending, state->futures);
        }

        for (auto & entry : pending)
            entry.second.get();
    }

    /**
     * Write every item to the store, one after another, as a flat
     * text-addressed object. Called on the worker thread.
     */
    void writePaths(const std::vector<Item> & items)
    {
        // FIXME: use addMultipleToStore() once it doesn't require a
        // NAR hash from the client for CA objects.

        for (auto & item : items) {
            StringSource source(item.contents);
            auto storePath = store->addToStoreFromDump(
                source,
                item.storePath.name(),
                FileSerialisationMethod::Flat,
                ContentAddressMethod::Raw::Text,
                HashAlgorithm::SHA256,
                item.references,
                item.repair);
            // The store must agree with the path computed up front in addPath().
            assert(storePath == item.storePath);
        }
    }
};

/** Create the background-thread implementation of `AsyncPathWriter` for `store`. */
ref<AsyncPathWriter> AsyncPathWriter::make(ref<Store> store)
{
    return make_ref<AsyncPathWriterImpl>(store);
}

} // namespace nix