diff --git a/src/libfetchers-tests/git-utils.cc b/src/libfetchers-tests/git-utils.cc index cdd920e54..0b21fd0c6 100644 --- a/src/libfetchers-tests/git-utils.cc +++ b/src/libfetchers-tests/git-utils.cc @@ -115,9 +115,10 @@ TEST_F(GitUtilsTest, sink_hardlink) try { sink->createHardlink(CanonPath("foo-1.1/link"), CanonPath("hello")); + sink->flush(); FAIL() << "Expected an exception"; } catch (const nix::Error & e) { - ASSERT_THAT(e.msg(), testing::HasSubstr("cannot find hard link target")); + ASSERT_THAT(e.msg(), testing::HasSubstr("does not exist")); ASSERT_THAT(e.msg(), testing::HasSubstr("/hello")); ASSERT_THAT(e.msg(), testing::HasSubstr("foo-1.1/link")); } diff --git a/src/libfetchers/git-utils.cc b/src/libfetchers/git-utils.cc index c729f98d3..4ad90a636 100644 --- a/src/libfetchers/git-utils.cc +++ b/src/libfetchers/git-utils.cc @@ -369,7 +369,26 @@ struct GitRepoImpl : GitRepo, std::enable_shared_from_this { // TODO: as an optimization, it would be nice to include `this` in the pool. return Pool(std::numeric_limits::max(), [this]() -> ref { - return make_ref(path, options); + auto repo = make_ref(path, options); + + /* Monkey-patching the pack backend to only read the pack directory + once. Otherwise it will do a readdir for each added oid when it's + not found and that translates to ~6 syscalls. Since we are never + writing pack files until flushing we can force the odb backend to + read the directory just once. It's very convenient that the vtable is + semi-public interface and is up for grabs. + + This is purely an optimization for our use-case with a tarball cache. + libgit2 calls refresh() if the backend provides it when an oid isn't found. + We are only writing objects to a mempack (it has higher priority) and there isn't + a realistic use-case where a previously missing object would appear from thin air + on the disk (unless another process happens to be unpacking a similar tarball to + the cache at the same time, but that's a very unrealistic scenario). + */ + if (auto * backend = repo->packBackend) + backend->refresh = nullptr; + + return repo; }); } @@ -668,6 +687,7 @@ struct GitRepoImpl : GitRepo, std::enable_shared_from_this keyDecoded = base64::decode(k.key); } catch (Error & e) { e.addTrace({}, "while decoding public key '%s' used for git signature", k.key); + throw; } auto fingerprint = trim(hashString(HashAlgorithm::SHA256, keyDecoded).to_string(nix::HashFormat::Base64, false), "="); @@ -1058,185 +1078,155 @@ struct GitFileSystemObjectSinkImpl : GitFileSystemObjectSink { ref repo; - struct PendingDir - { - std::string name; - TreeBuilder builder; - }; + Pool repoPool; - std::vector pendingDirs; + unsigned int concurrency = std::min(std::thread::hardware_concurrency(), 10U); - /** - * Temporary buffer used by createRegularFile for storing small file contents. - */ - std::string regularFileContentsBuffer; + ThreadPool workers{concurrency}; - /** - * If repo has a non-null packBackend, this has a copy of the refresh function - * from the backend virtual table. This is needed to restore it after we've flushed - * the sink. We modify it to avoid unnecessary I/O on non-existent oids. - */ - decltype(::git_odb_backend::refresh) packfileOdbRefresh = nullptr; + /** Total file contents in flight. */ + std::atomic totalBufSize{0}; - void pushBuilder(std::string name) - { - const git_tree_entry * entry; - Tree prevTree = nullptr; - - if (!pendingDirs.empty() && (entry = git_treebuilder_get(pendingDirs.back().builder.get(), name.c_str()))) { - /* Clone a tree that we've already finished. This happens - if a tarball has directory entries that are not - contiguous. */ - if (git_tree_entry_type(entry) != GIT_OBJECT_TREE) - throw Error("parent of '%s' is not a directory", name); - - if (git_tree_entry_to_object((git_object **) (git_tree **) Setter(prevTree), *repo, entry)) - throw Error("looking up parent of '%s': %s", name, git_error_last()->message); - } - - git_treebuilder * b; - if (git_treebuilder_new(&b, *repo, prevTree.get())) - throw Error("creating a tree builder: %s", git_error_last()->message); - pendingDirs.push_back({.name = std::move(name), .builder = TreeBuilder(b)}); - }; + static constexpr std::size_t maxBufSize = 16 * 1024 * 1024; GitFileSystemObjectSinkImpl(ref repo) : repo(repo) + , repoPool(repo->getPool()) { - /* Monkey-patching the pack backend to only read the pack directory - once. Otherwise it will do a readdir for each added oid when it's - not found and that translates to ~6 syscalls. Since we are never - writing pack files until flushing we can force the odb backend to - read the directory just once. It's very convenient that the vtable is - semi-public interface and is up for grabs. - - This is purely an optimization for our use-case with a tarball cache. - libgit2 calls refresh() if the backend provides it when an oid isn't found. - We are only writing objects to a mempack (it has higher priority) and there isn't - a realistic use-case where a previously missing object would appear from thin air - on the disk (unless another process happens to be unpacking a similar tarball to - the cache at the same time, but that's a very unrealistic scenario). - */ - if (auto * backend = repo->packBackend) { - if (backend->refresh(backend)) /* Refresh just once manually. */ - throw Error("refreshing packfiles: %s", git_error_last()->message); - /* Save the function pointer to restore it later in flush() and - unset it in the vtable. libgit2 does nothing if it's a nullptr: - https://github.com/libgit2/libgit2/blob/58d9363f02f1fa39e46d49b604f27008e75b72f2/src/libgit2/odb.c#L1922 - */ - packfileOdbRefresh = std::exchange(backend->refresh, nullptr); - } - pushBuilder(""); } - std::pair popBuilder() + ~GitFileSystemObjectSinkImpl() { - assert(!pendingDirs.empty()); - auto pending = std::move(pendingDirs.back()); - git_oid oid; - if (git_treebuilder_write(&oid, pending.builder.get())) - throw Error("creating a tree object: %s", git_error_last()->message); - pendingDirs.pop_back(); - return {oid, pending.name}; + // Make sure the worker threads are destroyed before any state + // they're referring to. + workers.shutdown(); + } + + struct Child; + + /// A directory to be written as a Git tree. + struct Directory + { + std::map children; + std::optional oid; + + Child & lookup(const CanonPath & path) + { + assert(!path.isRoot()); + auto parent = path.parent(); + auto cur = this; + for (auto & name : *parent) { + auto i = cur->children.find(std::string(name)); + if (i == cur->children.end()) + throw Error("path '%s' does not exist", path); + auto dir = std::get_if(&i->second.file); + if (!dir) + throw Error("path '%s' has a non-directory parent", path); + cur = dir; + } + + auto i = cur->children.find(std::string(*path.baseName())); + if (i == cur->children.end()) + throw Error("path '%s' does not exist", path); + return i->second; + } }; - void addToTree(const std::string & name, const git_oid & oid, git_filemode_t mode) + size_t nextId = 0; // for Child.id + + struct Child { - assert(!pendingDirs.empty()); - auto & pending = pendingDirs.back(); - if (git_treebuilder_insert(nullptr, pending.builder.get(), name.c_str(), &oid, mode)) - throw Error("adding a file to a tree builder: %s", git_error_last()->message); + git_filemode_t mode; + std::variant file; + + /// Sequential numbering of the file in the tarball. This is + /// used to make sure we only import the latest version of a + /// path. + size_t id{0}; }; - void updateBuilders(std::span names) + struct State { - // Find the common prefix of pendingDirs and names. - size_t prefixLen = 0; - for (; prefixLen < names.size() && prefixLen + 1 < pendingDirs.size(); ++prefixLen) - if (names[prefixLen] != pendingDirs[prefixLen + 1].name) - break; + Directory root; + }; - // Finish the builders that are not part of the common prefix. - for (auto n = pendingDirs.size(); n > prefixLen + 1; --n) { - auto [oid, name] = popBuilder(); - addToTree(name, oid, GIT_FILEMODE_TREE); + Sync _state; + + void addNode(State & state, const CanonPath & path, Child && child) + { + assert(!path.isRoot()); + auto parent = path.parent(); + + Directory * cur = &state.root; + + for (auto & i : *parent) { + auto child = std::get_if( + &cur->children.emplace(std::string(i), Child{GIT_FILEMODE_TREE, {Directory()}}).first->second.file); + assert(child); + cur = child; } - // Create builders for the new directories. - for (auto n = prefixLen; n < names.size(); ++n) - pushBuilder(names[n]); - }; + std::string name(*path.baseName()); - bool prepareDirs(const std::vector & pathComponents, bool isDir) - { - std::span pathComponents2{pathComponents}; - - updateBuilders(isDir ? pathComponents2 : pathComponents2.first(pathComponents2.size() - 1)); - - return true; + if (auto prev = cur->children.find(name); prev == cur->children.end() || prev->second.id < child.id) + cur->children.insert_or_assign(name, std::move(child)); } void createRegularFile(const CanonPath & path, std::function func) override { - auto pathComponents = tokenizeString>(path.rel(), "/"); - if (!prepareDirs(pathComponents, false)) - return; + checkInterrupt(); + + /* Multithreaded blob writing. We read the incoming file data into memory and asynchronously write it to a Git + blob object. However, to avoid unbounded memory usage, if the amount of data in flight exceeds a threshold, + we switch to writing directly to a Git write stream. */ using WriteStream = std::unique_ptr<::git_writestream, decltype([](::git_writestream * stream) { if (stream) stream->free(stream); })>; - /* Maximum file size that gets buffered in memory before flushing to a WriteStream, - that's backed by a temporary objects/streamed_git2_* file. We should avoid that - for common cases, since creating (and deleting) a temporary file for each blob - is insanely expensive. */ - static constexpr std::size_t maxBufferSize = 1024 * 1024; /* 1 MiB */ - struct CRF : CreateRegularFileSink { - const CanonPath & path; - GitFileSystemObjectSinkImpl & back; + CanonPath path; + GitFileSystemObjectSinkImpl & parent; WriteStream stream; - std::string & contents; + std::optional repo; + + std::string contents; bool executable = false; - CRF(const CanonPath & path, GitFileSystemObjectSinkImpl & back, std::string & regularFileContentsBuffer) - : path(path) - , back(back) - , stream(nullptr) - , contents(regularFileContentsBuffer) + CRF(CanonPath path, GitFileSystemObjectSinkImpl & parent) + : path(std::move(path)) + , parent(parent) { - contents.clear(); } - void writeToStream(std::string_view data) + ~CRF() { - /* Lazily create the stream. */ - if (!stream) { - ::git_writestream * stream2 = nullptr; - if (git_blob_create_from_stream(&stream2, *back.repo, nullptr)) - throw Error("creating a blob stream object: %s", git_error_last()->message); - stream = WriteStream{stream2}; - assert(stream); - } - - if (stream->write(stream.get(), data.data(), data.size())) - throw Error("writing a blob for tarball member '%s': %s", path, git_error_last()->message); + parent.totalBufSize -= contents.size(); } void operator()(std::string_view data) override { - /* Already in slow path. Just write to the slow stream. */ - if (stream) { - writeToStream(data); - return; - } + if (!stream) { + contents.append(data); + parent.totalBufSize += data.size(); - contents += data; - if (contents.size() > maxBufferSize) { - writeToStream(contents); /* Will initialize stream. */ - contents.clear(); + if (parent.totalBufSize > parent.maxBufSize) { + repo.emplace(parent.repoPool.get()); + + if (git_blob_create_from_stream(Setter(stream), **repo, nullptr)) + throw Error("creating a blob stream object: %s", git_error_last()->message); + + if (stream->write(stream.get(), contents.data(), contents.size())) + throw Error("writing a blob for tarball member '%s': %s", path, git_error_last()->message); + + parent.totalBufSize -= contents.size(); + contents.clear(); + } + } else { + if (stream->write(stream.get(), data.data(), data.size())) + throw Error("writing a blob for tarball member '%s': %s", path, git_error_last()->message); } } @@ -1244,112 +1234,140 @@ struct GitFileSystemObjectSinkImpl : GitFileSystemObjectSink { executable = true; } - } crf{path, *this, regularFileContentsBuffer}; + }; - func(crf); + auto crf = std::make_shared(path, *this); - git_oid oid; - if (crf.stream) { - /* Call .release(), since git_blob_create_from_stream_commit + func(*crf); + + auto id = nextId++; + + if (crf->stream) { + /* Finish the slow path by creating the blob object synchronously. + Call .release(), since git_blob_create_from_stream_commit acquires ownership and frees the stream. */ - if (git_blob_create_from_stream_commit(&oid, crf.stream.release())) + git_oid oid; + if (git_blob_create_from_stream_commit(&oid, crf->stream.release())) throw Error("creating a blob object for '%s': %s", path, git_error_last()->message); - } else { - if (git_blob_create_from_buffer(&oid, *repo, crf.contents.data(), crf.contents.size())) - throw Error( - "creating a blob object for '%s' from in-memory buffer: %s", path, git_error_last()->message); + addNode( + *_state.lock(), + crf->path, + Child{crf->executable ? GIT_FILEMODE_BLOB_EXECUTABLE : GIT_FILEMODE_BLOB, oid, id}); + return; } - addToTree(*pathComponents.rbegin(), oid, crf.executable ? GIT_FILEMODE_BLOB_EXECUTABLE : GIT_FILEMODE_BLOB); + /* Fast path: create the blob object in a separate thread. */ + workers.enqueue([this, crf{std::move(crf)}, id]() { + auto repo(repoPool.get()); + + git_oid oid; + if (git_blob_create_from_buffer(&oid, *repo, crf->contents.data(), crf->contents.size())) + throw Error( + "creating a blob object for '%s' from in-memory buffer: %s", crf->path, git_error_last()->message); + + addNode( + *_state.lock(), + crf->path, + Child{crf->executable ? GIT_FILEMODE_BLOB_EXECUTABLE : GIT_FILEMODE_BLOB, oid, id}); + }); } void createDirectory(const CanonPath & path) override { - auto pathComponents = tokenizeString>(path.rel(), "/"); - (void) prepareDirs(pathComponents, true); + if (path.isRoot()) + return; + auto state(_state.lock()); + addNode(*state, path, {GIT_FILEMODE_TREE, Directory()}); } void createSymlink(const CanonPath & path, const std::string & target) override { - auto pathComponents = tokenizeString>(path.rel(), "/"); - if (!prepareDirs(pathComponents, false)) - return; + workers.enqueue([this, path, target]() { + auto repo(repoPool.get()); - git_oid oid; - if (git_blob_create_from_buffer(&oid, *repo, target.c_str(), target.size())) - throw Error("creating a blob object for tarball symlink member '%s': %s", path, git_error_last()->message); + git_oid oid; + if (git_blob_create_from_buffer(&oid, *repo, target.c_str(), target.size())) + throw Error( + "creating a blob object for tarball symlink member '%s': %s", path, git_error_last()->message); - addToTree(*pathComponents.rbegin(), oid, GIT_FILEMODE_LINK); + auto state(_state.lock()); + addNode(*state, path, Child{GIT_FILEMODE_LINK, oid}); + }); } + std::map hardLinks; + void createHardlink(const CanonPath & path, const CanonPath & target) override { - std::vector pathComponents; - for (auto & c : path) - pathComponents.emplace_back(c); - - if (!prepareDirs(pathComponents, false)) - return; - - // We can't just look up the path from the start of the root, since - // some parent directories may not have finished yet, so we compute - // a relative path that helps us find the right git_tree_builder or object. - auto relTarget = CanonPath(path).parent()->makeRelative(target); - - auto dir = pendingDirs.rbegin(); - - // For each ../ component at the start, go up one directory. - // CanonPath::makeRelative() always puts all .. elements at the start, - // so they're all handled by this loop: - std::string_view relTargetLeft(relTarget); - while (hasPrefix(relTargetLeft, "../")) { - if (dir == pendingDirs.rend()) - throw Error("invalid hard link target '%s' for path '%s'", target, path); - ++dir; - relTargetLeft = relTargetLeft.substr(3); - } - if (dir == pendingDirs.rend()) - throw Error("invalid hard link target '%s' for path '%s'", target, path); - - // Look up the remainder of the target, starting at the - // top-most `git_treebuilder`. - std::variant curDir{dir->builder.get()}; - Object tree; // needed to keep `entry` alive - const git_tree_entry * entry = nullptr; - - for (auto & c : CanonPath(relTargetLeft)) { - if (auto builder = std::get_if(&curDir)) { - assert(*builder); - if (!(entry = git_treebuilder_get(*builder, std::string(c).c_str()))) - throw Error("cannot find hard link target '%s' for path '%s'", target, path); - curDir = *git_tree_entry_id(entry); - } else if (auto oid = std::get_if(&curDir)) { - tree = lookupObject(*repo, *oid, GIT_OBJECT_TREE); - if (!(entry = git_tree_entry_byname((const git_tree *) &*tree, std::string(c).c_str()))) - throw Error("cannot find hard link target '%s' for path '%s'", target, path); - curDir = *git_tree_entry_id(entry); - } - } - - assert(entry); - - addToTree(*pathComponents.rbegin(), *git_tree_entry_id(entry), git_tree_entry_filemode(entry)); + hardLinks.insert_or_assign(path, target); } Hash flush() override { - updateBuilders({}); + workers.process(); - auto [oid, _name] = popBuilder(); - - if (auto * backend = repo->packBackend) { - /* We are done writing blobs, can restore refresh functionality. */ - backend->refresh = packfileOdbRefresh; + /* Create hard links. */ + { + auto state(_state.lock()); + for (auto & [path, target] : hardLinks) { + if (target.isRoot()) + continue; + try { + auto child = state->root.lookup(target); + auto oid = std::get_if(&child.file); + if (!oid) + throw Error("cannot create a hard link to a directory"); + addNode(*state, path, {child.mode, *oid}); + } catch (Error & e) { + e.addTrace(nullptr, "while creating a hard link from '%s' to '%s'", path, target); + throw; + } + } } + // Flush all repo objects to disk. + { + auto repos = repoPool.clear(); + ThreadPool workers{repos.size()}; + for (auto & repo : repos) + workers.enqueue([repo]() { repo->flush(); }); + workers.process(); + } + + // Write the Git trees to disk. Would be nice to have this multithreaded too, but that's hard because a tree + // can't refer to an object that hasn't been written yet. Also it doesn't make a big difference for performance. + auto repo(repoPool.get()); + + [&](this const auto & visit, Directory & node) -> void { + checkInterrupt(); + + // Write the child directories. + for (auto & child : node.children) + if (auto dir = std::get_if(&child.second.file)) + visit(*dir); + + // Write this directory. + git_treebuilder * b; + if (git_treebuilder_new(&b, *repo, nullptr)) + throw Error("creating a tree builder: %s", git_error_last()->message); + TreeBuilder builder(b); + + for (auto & [name, child] : node.children) { + auto oid_p = std::get_if(&child.file); + auto oid = oid_p ? *oid_p : std::get(child.file).oid.value(); + if (git_treebuilder_insert(nullptr, builder.get(), name.c_str(), &oid, child.mode)) + throw Error("adding a file to a tree builder: %s", git_error_last()->message); + } + + git_oid oid; + if (git_treebuilder_write(&oid, builder.get())) + throw Error("creating a tree object: %s", git_error_last()->message); + node.oid = oid; + }(_state.lock()->root); + repo->flush(); - return toHash(oid); + return toHash(_state.lock()->root.oid.value()); } }; diff --git a/src/libutil/include/nix/util/pool.hh b/src/libutil/include/nix/util/pool.hh index a9091c2de..952c29ad5 100644 --- a/src/libutil/include/nix/util/pool.hh +++ b/src/libutil/include/nix/util/pool.hh @@ -211,6 +211,12 @@ public: left.push_back(p); std::swap(state_->idle, left); } + + std::vector> clear() + { + auto state_(state.lock()); + return std::exchange(state_->idle, {}); + } }; } // namespace nix diff --git a/src/libutil/include/nix/util/thread-pool.hh b/src/libutil/include/nix/util/thread-pool.hh index a07354146..61fea39b7 100644 --- a/src/libutil/include/nix/util/thread-pool.hh +++ b/src/libutil/include/nix/util/thread-pool.hh @@ -52,6 +52,12 @@ public: */ void process(); + /** + * Shut down all worker threads and wait until they've exited. + * Active work items are finished, but any pending work items are discarded. + */ + void shutdown(); + private: size_t maxThreads; @@ -72,8 +78,6 @@ private: std::condition_variable work; void doWork(bool mainThread); - - void shutdown(); }; /**