1
1
Fork 0
mirror of https://github.com/NixOS/nix.git synced 2025-12-23 17:31:08 +01:00

Merge pull request #12087 from DeterminateSystems/multithreaded-git-sink

Make GitFileSystemObjectSink multi-threaded
This commit is contained in:
Eelco Dolstra 2025-12-19 12:20:22 +00:00 committed by GitHub
commit 6b52fa8360
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 244 additions and 215 deletions

View file

@@ -115,9 +115,10 @@ TEST_F(GitUtilsTest, sink_hardlink)
try {
sink->createHardlink(CanonPath("foo-1.1/link"), CanonPath("hello"));
sink->flush();
FAIL() << "Expected an exception";
} catch (const nix::Error & e) {
ASSERT_THAT(e.msg(), testing::HasSubstr("cannot find hard link target"));
ASSERT_THAT(e.msg(), testing::HasSubstr("does not exist"));
ASSERT_THAT(e.msg(), testing::HasSubstr("/hello"));
ASSERT_THAT(e.msg(), testing::HasSubstr("foo-1.1/link"));
}

View file

@@ -369,7 +369,26 @@ struct GitRepoImpl : GitRepo, std::enable_shared_from_this<GitRepoImpl>
{
// TODO: as an optimization, it would be nice to include `this` in the pool.
return Pool<GitRepoImpl>(std::numeric_limits<size_t>::max(), [this]() -> ref<GitRepoImpl> {
return make_ref<GitRepoImpl>(path, options);
auto repo = make_ref<GitRepoImpl>(path, options);
/* Monkey-patching the pack backend to only read the pack directory
once. Otherwise it will do a readdir for each added oid when it's
not found and that translates to ~6 syscalls. Since we are never
writing pack files until flushing we can force the odb backend to
read the directory just once. It's very convenient that the vtable is
semi-public interface and is up for grabs.
This is purely an optimization for our use-case with a tarball cache.
libgit2 calls refresh() if the backend provides it when an oid isn't found.
We are only writing objects to a mempack (it has higher priority) and there isn't
a realistic use-case where a previously missing object would appear from thin air
on the disk (unless another process happens to be unpacking a similar tarball to
the cache at the same time, but that's a very unrealistic scenario).
*/
if (auto * backend = repo->packBackend)
backend->refresh = nullptr;
return repo;
});
}
@@ -668,6 +687,7 @@ struct GitRepoImpl : GitRepo, std::enable_shared_from_this<GitRepoImpl>
keyDecoded = base64::decode(k.key);
} catch (Error & e) {
e.addTrace({}, "while decoding public key '%s' used for git signature", k.key);
throw;
}
auto fingerprint =
trim(hashString(HashAlgorithm::SHA256, keyDecoded).to_string(nix::HashFormat::Base64, false), "=");
@@ -1058,185 +1078,155 @@ struct GitFileSystemObjectSinkImpl : GitFileSystemObjectSink
{
ref<GitRepoImpl> repo;
struct PendingDir
{
std::string name;
TreeBuilder builder;
};
Pool<GitRepoImpl> repoPool;
std::vector<PendingDir> pendingDirs;
unsigned int concurrency = std::min(std::thread::hardware_concurrency(), 10U);
/**
* Temporary buffer used by createRegularFile for storing small file contents.
*/
std::string regularFileContentsBuffer;
ThreadPool workers{concurrency};
/**
* If repo has a non-null packBackend, this has a copy of the refresh function
* from the backend virtual table. This is needed to restore it after we've flushed
* the sink. We modify it to avoid unnecessary I/O on non-existent oids.
*/
decltype(::git_odb_backend::refresh) packfileOdbRefresh = nullptr;
/** Total file contents in flight. */
std::atomic<size_t> totalBufSize{0};
void pushBuilder(std::string name)
{
const git_tree_entry * entry;
Tree prevTree = nullptr;
if (!pendingDirs.empty() && (entry = git_treebuilder_get(pendingDirs.back().builder.get(), name.c_str()))) {
/* Clone a tree that we've already finished. This happens
if a tarball has directory entries that are not
contiguous. */
if (git_tree_entry_type(entry) != GIT_OBJECT_TREE)
throw Error("parent of '%s' is not a directory", name);
if (git_tree_entry_to_object((git_object **) (git_tree **) Setter(prevTree), *repo, entry))
throw Error("looking up parent of '%s': %s", name, git_error_last()->message);
}
git_treebuilder * b;
if (git_treebuilder_new(&b, *repo, prevTree.get()))
throw Error("creating a tree builder: %s", git_error_last()->message);
pendingDirs.push_back({.name = std::move(name), .builder = TreeBuilder(b)});
};
static constexpr std::size_t maxBufSize = 16 * 1024 * 1024;
GitFileSystemObjectSinkImpl(ref<GitRepoImpl> repo)
: repo(repo)
, repoPool(repo->getPool())
{
/* Monkey-patching the pack backend to only read the pack directory
once. Otherwise it will do a readdir for each added oid when it's
not found and that translates to ~6 syscalls. Since we are never
writing pack files until flushing we can force the odb backend to
read the directory just once. It's very convenient that the vtable is
semi-public interface and is up for grabs.
This is purely an optimization for our use-case with a tarball cache.
libgit2 calls refresh() if the backend provides it when an oid isn't found.
We are only writing objects to a mempack (it has higher priority) and there isn't
a realistic use-case where a previously missing object would appear from thin air
on the disk (unless another process happens to be unpacking a similar tarball to
the cache at the same time, but that's a very unrealistic scenario).
*/
if (auto * backend = repo->packBackend) {
if (backend->refresh(backend)) /* Refresh just once manually. */
throw Error("refreshing packfiles: %s", git_error_last()->message);
/* Save the function pointer to restore it later in flush() and
unset it in the vtable. libgit2 does nothing if it's a nullptr:
https://github.com/libgit2/libgit2/blob/58d9363f02f1fa39e46d49b604f27008e75b72f2/src/libgit2/odb.c#L1922
*/
packfileOdbRefresh = std::exchange(backend->refresh, nullptr);
}
pushBuilder("");
}
std::pair<git_oid, std::string> popBuilder()
~GitFileSystemObjectSinkImpl()
{
assert(!pendingDirs.empty());
auto pending = std::move(pendingDirs.back());
git_oid oid;
if (git_treebuilder_write(&oid, pending.builder.get()))
throw Error("creating a tree object: %s", git_error_last()->message);
pendingDirs.pop_back();
return {oid, pending.name};
// Make sure the worker threads are destroyed before any state
// they're referring to.
workers.shutdown();
}
struct Child;
/// A directory to be written as a Git tree.
struct Directory
{
std::map<std::string, Child> children;
std::optional<git_oid> oid;
Child & lookup(const CanonPath & path)
{
assert(!path.isRoot());
auto parent = path.parent();
auto cur = this;
for (auto & name : *parent) {
auto i = cur->children.find(std::string(name));
if (i == cur->children.end())
throw Error("path '%s' does not exist", path);
auto dir = std::get_if<Directory>(&i->second.file);
if (!dir)
throw Error("path '%s' has a non-directory parent", path);
cur = dir;
}
auto i = cur->children.find(std::string(*path.baseName()));
if (i == cur->children.end())
throw Error("path '%s' does not exist", path);
return i->second;
}
};
void addToTree(const std::string & name, const git_oid & oid, git_filemode_t mode)
size_t nextId = 0; // for Child.id
struct Child
{
assert(!pendingDirs.empty());
auto & pending = pendingDirs.back();
if (git_treebuilder_insert(nullptr, pending.builder.get(), name.c_str(), &oid, mode))
throw Error("adding a file to a tree builder: %s", git_error_last()->message);
git_filemode_t mode;
std::variant<Directory, git_oid> file;
/// Sequential numbering of the file in the tarball. This is
/// used to make sure we only import the latest version of a
/// path.
size_t id{0};
};
void updateBuilders(std::span<const std::string> names)
struct State
{
// Find the common prefix of pendingDirs and names.
size_t prefixLen = 0;
for (; prefixLen < names.size() && prefixLen + 1 < pendingDirs.size(); ++prefixLen)
if (names[prefixLen] != pendingDirs[prefixLen + 1].name)
break;
Directory root;
};
// Finish the builders that are not part of the common prefix.
for (auto n = pendingDirs.size(); n > prefixLen + 1; --n) {
auto [oid, name] = popBuilder();
addToTree(name, oid, GIT_FILEMODE_TREE);
Sync<State> _state;
void addNode(State & state, const CanonPath & path, Child && child)
{
assert(!path.isRoot());
auto parent = path.parent();
Directory * cur = &state.root;
for (auto & i : *parent) {
auto child = std::get_if<Directory>(
&cur->children.emplace(std::string(i), Child{GIT_FILEMODE_TREE, {Directory()}}).first->second.file);
assert(child);
cur = child;
}
// Create builders for the new directories.
for (auto n = prefixLen; n < names.size(); ++n)
pushBuilder(names[n]);
};
std::string name(*path.baseName());
bool prepareDirs(const std::vector<std::string> & pathComponents, bool isDir)
{
std::span<const std::string> pathComponents2{pathComponents};
updateBuilders(isDir ? pathComponents2 : pathComponents2.first(pathComponents2.size() - 1));
return true;
if (auto prev = cur->children.find(name); prev == cur->children.end() || prev->second.id < child.id)
cur->children.insert_or_assign(name, std::move(child));
}
void createRegularFile(const CanonPath & path, std::function<void(CreateRegularFileSink &)> func) override
{
auto pathComponents = tokenizeString<std::vector<std::string>>(path.rel(), "/");
if (!prepareDirs(pathComponents, false))
return;
checkInterrupt();
/* Multithreaded blob writing. We read the incoming file data into memory and asynchronously write it to a Git
blob object. However, to avoid unbounded memory usage, if the amount of data in flight exceeds a threshold,
we switch to writing directly to a Git write stream. */
using WriteStream = std::unique_ptr<::git_writestream, decltype([](::git_writestream * stream) {
if (stream)
stream->free(stream);
})>;
/* Maximum file size that gets buffered in memory before flushing to a WriteStream,
that's backed by a temporary objects/streamed_git2_* file. We should avoid that
for common cases, since creating (and deleting) a temporary file for each blob
is insanely expensive. */
static constexpr std::size_t maxBufferSize = 1024 * 1024; /* 1 MiB */
struct CRF : CreateRegularFileSink
{
const CanonPath & path;
GitFileSystemObjectSinkImpl & back;
CanonPath path;
GitFileSystemObjectSinkImpl & parent;
WriteStream stream;
std::string & contents;
std::optional<decltype(parent.repoPool)::Handle> repo;
std::string contents;
bool executable = false;
CRF(const CanonPath & path, GitFileSystemObjectSinkImpl & back, std::string & regularFileContentsBuffer)
: path(path)
, back(back)
, stream(nullptr)
, contents(regularFileContentsBuffer)
CRF(CanonPath path, GitFileSystemObjectSinkImpl & parent)
: path(std::move(path))
, parent(parent)
{
contents.clear();
}
void writeToStream(std::string_view data)
~CRF()
{
/* Lazily create the stream. */
if (!stream) {
::git_writestream * stream2 = nullptr;
if (git_blob_create_from_stream(&stream2, *back.repo, nullptr))
throw Error("creating a blob stream object: %s", git_error_last()->message);
stream = WriteStream{stream2};
assert(stream);
}
if (stream->write(stream.get(), data.data(), data.size()))
throw Error("writing a blob for tarball member '%s': %s", path, git_error_last()->message);
parent.totalBufSize -= contents.size();
}
void operator()(std::string_view data) override
{
/* Already in slow path. Just write to the slow stream. */
if (stream) {
writeToStream(data);
return;
}
if (!stream) {
contents.append(data);
parent.totalBufSize += data.size();
contents += data;
if (contents.size() > maxBufferSize) {
writeToStream(contents); /* Will initialize stream. */
contents.clear();
if (parent.totalBufSize > parent.maxBufSize) {
repo.emplace(parent.repoPool.get());
if (git_blob_create_from_stream(Setter(stream), **repo, nullptr))
throw Error("creating a blob stream object: %s", git_error_last()->message);
if (stream->write(stream.get(), contents.data(), contents.size()))
throw Error("writing a blob for tarball member '%s': %s", path, git_error_last()->message);
parent.totalBufSize -= contents.size();
contents.clear();
}
} else {
if (stream->write(stream.get(), data.data(), data.size()))
throw Error("writing a blob for tarball member '%s': %s", path, git_error_last()->message);
}
}
@@ -1244,112 +1234,140 @@ struct GitFileSystemObjectSinkImpl : GitFileSystemObjectSink
{
executable = true;
}
} crf{path, *this, regularFileContentsBuffer};
};
func(crf);
auto crf = std::make_shared<CRF>(path, *this);
git_oid oid;
if (crf.stream) {
/* Call .release(), since git_blob_create_from_stream_commit
func(*crf);
auto id = nextId++;
if (crf->stream) {
/* Finish the slow path by creating the blob object synchronously.
Call .release(), since git_blob_create_from_stream_commit
acquires ownership and frees the stream. */
if (git_blob_create_from_stream_commit(&oid, crf.stream.release()))
git_oid oid;
if (git_blob_create_from_stream_commit(&oid, crf->stream.release()))
throw Error("creating a blob object for '%s': %s", path, git_error_last()->message);
} else {
if (git_blob_create_from_buffer(&oid, *repo, crf.contents.data(), crf.contents.size()))
throw Error(
"creating a blob object for '%s' from in-memory buffer: %s", path, git_error_last()->message);
addNode(
*_state.lock(),
crf->path,
Child{crf->executable ? GIT_FILEMODE_BLOB_EXECUTABLE : GIT_FILEMODE_BLOB, oid, id});
return;
}
addToTree(*pathComponents.rbegin(), oid, crf.executable ? GIT_FILEMODE_BLOB_EXECUTABLE : GIT_FILEMODE_BLOB);
/* Fast path: create the blob object in a separate thread. */
workers.enqueue([this, crf{std::move(crf)}, id]() {
auto repo(repoPool.get());
git_oid oid;
if (git_blob_create_from_buffer(&oid, *repo, crf->contents.data(), crf->contents.size()))
throw Error(
"creating a blob object for '%s' from in-memory buffer: %s", crf->path, git_error_last()->message);
addNode(
*_state.lock(),
crf->path,
Child{crf->executable ? GIT_FILEMODE_BLOB_EXECUTABLE : GIT_FILEMODE_BLOB, oid, id});
});
}
void createDirectory(const CanonPath & path) override
{
auto pathComponents = tokenizeString<std::vector<std::string>>(path.rel(), "/");
(void) prepareDirs(pathComponents, true);
if (path.isRoot())
return;
auto state(_state.lock());
addNode(*state, path, {GIT_FILEMODE_TREE, Directory()});
}
void createSymlink(const CanonPath & path, const std::string & target) override
{
auto pathComponents = tokenizeString<std::vector<std::string>>(path.rel(), "/");
if (!prepareDirs(pathComponents, false))
return;
workers.enqueue([this, path, target]() {
auto repo(repoPool.get());
git_oid oid;
if (git_blob_create_from_buffer(&oid, *repo, target.c_str(), target.size()))
throw Error("creating a blob object for tarball symlink member '%s': %s", path, git_error_last()->message);
git_oid oid;
if (git_blob_create_from_buffer(&oid, *repo, target.c_str(), target.size()))
throw Error(
"creating a blob object for tarball symlink member '%s': %s", path, git_error_last()->message);
addToTree(*pathComponents.rbegin(), oid, GIT_FILEMODE_LINK);
auto state(_state.lock());
addNode(*state, path, Child{GIT_FILEMODE_LINK, oid});
});
}
std::map<CanonPath, CanonPath> hardLinks;
void createHardlink(const CanonPath & path, const CanonPath & target) override
{
std::vector<std::string> pathComponents;
for (auto & c : path)
pathComponents.emplace_back(c);
if (!prepareDirs(pathComponents, false))
return;
// We can't just look up the path from the start of the root, since
// some parent directories may not have finished yet, so we compute
// a relative path that helps us find the right git_tree_builder or object.
auto relTarget = CanonPath(path).parent()->makeRelative(target);
auto dir = pendingDirs.rbegin();
// For each ../ component at the start, go up one directory.
// CanonPath::makeRelative() always puts all .. elements at the start,
// so they're all handled by this loop:
std::string_view relTargetLeft(relTarget);
while (hasPrefix(relTargetLeft, "../")) {
if (dir == pendingDirs.rend())
throw Error("invalid hard link target '%s' for path '%s'", target, path);
++dir;
relTargetLeft = relTargetLeft.substr(3);
}
if (dir == pendingDirs.rend())
throw Error("invalid hard link target '%s' for path '%s'", target, path);
// Look up the remainder of the target, starting at the
// top-most `git_treebuilder`.
std::variant<git_treebuilder *, git_oid> curDir{dir->builder.get()};
Object tree; // needed to keep `entry` alive
const git_tree_entry * entry = nullptr;
for (auto & c : CanonPath(relTargetLeft)) {
if (auto builder = std::get_if<git_treebuilder *>(&curDir)) {
assert(*builder);
if (!(entry = git_treebuilder_get(*builder, std::string(c).c_str())))
throw Error("cannot find hard link target '%s' for path '%s'", target, path);
curDir = *git_tree_entry_id(entry);
} else if (auto oid = std::get_if<git_oid>(&curDir)) {
tree = lookupObject(*repo, *oid, GIT_OBJECT_TREE);
if (!(entry = git_tree_entry_byname((const git_tree *) &*tree, std::string(c).c_str())))
throw Error("cannot find hard link target '%s' for path '%s'", target, path);
curDir = *git_tree_entry_id(entry);
}
}
assert(entry);
addToTree(*pathComponents.rbegin(), *git_tree_entry_id(entry), git_tree_entry_filemode(entry));
hardLinks.insert_or_assign(path, target);
}
Hash flush() override
{
updateBuilders({});
workers.process();
auto [oid, _name] = popBuilder();
if (auto * backend = repo->packBackend) {
/* We are done writing blobs, can restore refresh functionality. */
backend->refresh = packfileOdbRefresh;
/* Create hard links. */
{
auto state(_state.lock());
for (auto & [path, target] : hardLinks) {
if (target.isRoot())
continue;
try {
auto child = state->root.lookup(target);
auto oid = std::get_if<git_oid>(&child.file);
if (!oid)
throw Error("cannot create a hard link to a directory");
addNode(*state, path, {child.mode, *oid});
} catch (Error & e) {
e.addTrace(nullptr, "while creating a hard link from '%s' to '%s'", path, target);
throw;
}
}
}
// Flush all repo objects to disk.
{
auto repos = repoPool.clear();
ThreadPool workers{repos.size()};
for (auto & repo : repos)
workers.enqueue([repo]() { repo->flush(); });
workers.process();
}
// Write the Git trees to disk. Would be nice to have this multithreaded too, but that's hard because a tree
// can't refer to an object that hasn't been written yet. Also it doesn't make a big difference for performance.
auto repo(repoPool.get());
[&](this const auto & visit, Directory & node) -> void {
checkInterrupt();
// Write the child directories.
for (auto & child : node.children)
if (auto dir = std::get_if<Directory>(&child.second.file))
visit(*dir);
// Write this directory.
git_treebuilder * b;
if (git_treebuilder_new(&b, *repo, nullptr))
throw Error("creating a tree builder: %s", git_error_last()->message);
TreeBuilder builder(b);
for (auto & [name, child] : node.children) {
auto oid_p = std::get_if<git_oid>(&child.file);
auto oid = oid_p ? *oid_p : std::get<Directory>(child.file).oid.value();
if (git_treebuilder_insert(nullptr, builder.get(), name.c_str(), &oid, child.mode))
throw Error("adding a file to a tree builder: %s", git_error_last()->message);
}
git_oid oid;
if (git_treebuilder_write(&oid, builder.get()))
throw Error("creating a tree object: %s", git_error_last()->message);
node.oid = oid;
}(_state.lock()->root);
repo->flush();
return toHash(oid);
return toHash(_state.lock()->root.oid.value());
}
};

View file

@@ -211,6 +211,12 @@ public:
left.push_back(p);
std::swap(state_->idle, left);
}
std::vector<ref<R>> clear()
{
auto state_(state.lock());
return std::exchange(state_->idle, {});
}
};
} // namespace nix

View file

@@ -52,6 +52,12 @@ public:
*/
void process();
/**
* Shut down all worker threads and wait until they've exited.
* Active work items are finished, but any pending work items are discarded.
*/
void shutdown();
private:
size_t maxThreads;
@@ -72,8 +78,6 @@ private:
std::condition_variable work;
void doWork(bool mainThread);
void shutdown();
};
/**