rocksdb/utilities/secondary_index/secondary_index_mixin.h
Levi Tamasi 1f96e652b3 Support using secondary indices with write-committed transactions (#13180)
Summary:
Pull Request resolved: https://github.com/facebook/rocksdb/pull/13180

The patch adds initial support for secondary indices using write-committed transactions. Currently, only the `PutEntity` API is supported; other APIs like `Put` and `Delete` will be added separately. Applications can set up secondary indices using the new configuration option `TransactionDBOptions::secondary_indices`. When secondary indices are enabled, calling `PutEntity` via an (explicit or implicit) transaction performs the following steps:
1) It retrieves the current value (if any) of the primary key using `GetEntityForUpdate`.
2) If there is an existing primary key-value, it removes any existing secondary index entries using `SingleDelete`. (Note: as a later optimization, we can avoid removing and recreating secondary index entries when neither the secondary key nor the value changes during an update.)
3) It invokes `UpdatePrimaryColumnValue` for all applicable `SecondaryIndex` objects, that is, those for which the primary column family matches the column family from the `PutEntity` call and for which the primary column appears in the new wide-column structure.
4) It writes the new primary key-value. Note that the values of the indexing columns might have been changed in step 3 above.
5) It builds the secondary key-value for each applicable secondary index using `GetSecondaryKeyPrefix` and `GetSecondaryValue`, and writes it to the appropriate secondary column family.

All the above operations are performed as part of the same transaction. The logic uses `SavePoint`s to roll back any earlier operations related to a primary key if a subsequent step fails.

Implementation-wise, the code uses a mixin template `SecondaryIndexMixin` that can inherit from any kind of transaction and use the write APIs and concurrency control mechanisms of the base class to implement the index maintenance logic. The mixin will enable us to later extend secondary indices to optimistic or write-prepared/write-unprepared pessimistic transactions as well.

Reviewed By: jowlyzhang

Differential Revision: D66672931

fbshipit-source-id: cdf6ef9c40dec46d928156bad0a3cc546aa8b887
2024-12-05 19:05:56 -08:00

441 lines
14 KiB
C++

// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <cassert>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <variant>
#include <vector>

#include "db/wide/wide_columns_helper.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/secondary_index.h"
#include "rocksdb/wide_columns.h"
#include "util/autovector.h"
#include "util/overload.h"
namespace ROCKSDB_NAMESPACE {
template <typename Txn>
class SecondaryIndexMixin : public Txn {
public:
template <typename... Args>
explicit SecondaryIndexMixin(
const std::vector<std::shared_ptr<SecondaryIndex>>* secondary_indices,
Args&&... args)
: Txn(std::forward<Args>(args)...),
secondary_indices_(secondary_indices) {
assert(secondary_indices_);
assert(!secondary_indices_->empty());
}
using Txn::Put;
Status Put(ColumnFamilyHandle* /* column_family */, const Slice& /* key */,
const Slice& /* value */,
const bool /* assume_tracked */ = false) override {
return Status::NotSupported("Put with secondary indices not yet supported");
}
Status Put(ColumnFamilyHandle* /* column_family */,
const SliceParts& /* key */, const SliceParts& /* value */,
const bool /* assume_tracked */ = false) override {
return Status::NotSupported("Put with secondary indices not yet supported");
}
Status PutEntity(ColumnFamilyHandle* column_family, const Slice& key,
const WideColumns& columns,
bool assume_tracked = false) override {
return PerformWithSavePoint([&]() {
const bool do_validate = !assume_tracked;
return PutEntityWithSecondaryIndices(column_family, key, columns,
do_validate);
});
}
using Txn::Merge;
Status Merge(ColumnFamilyHandle* /* column_family */, const Slice& /* key */,
const Slice& /* value */,
const bool /* assume_tracked */ = false) override {
return Status::NotSupported(
"Merge with secondary indices not yet supported");
}
using Txn::Delete;
Status Delete(ColumnFamilyHandle* /* column_family */, const Slice& /* key */,
const bool /* assume_tracked */ = false) override {
return Status::NotSupported(
"Delete with secondary indices not yet supported");
}
Status Delete(ColumnFamilyHandle* /* column_family */,
const SliceParts& /* key */,
const bool /* assume_tracked */ = false) override {
return Status::NotSupported(
"Delete with secondary indices not yet supported");
}
using Txn::SingleDelete;
Status SingleDelete(ColumnFamilyHandle* /* column_family */,
const Slice& /* key */,
const bool /* assume_tracked */ = false) override {
return Status::NotSupported(
"SingleDelete with secondary indices not yet supported");
}
Status SingleDelete(ColumnFamilyHandle* /* column_family */,
const SliceParts& /* key */,
const bool /* assume_tracked */ = false) override {
return Status::NotSupported(
"SingleDelete with secondary indices not yet supported");
}
using Txn::PutUntracked;
Status PutUntracked(ColumnFamilyHandle* /* column_family */,
const Slice& /* key */,
const Slice& /* value */) override {
return Status::NotSupported(
"PutUntracked with secondary indices not yet supported");
}
Status PutUntracked(ColumnFamilyHandle* /* column_family */,
const SliceParts& /* key */,
const SliceParts& /* value */) override {
return Status::NotSupported(
"PutUntracked with secondary indices not yet supported");
}
Status PutEntityUntracked(ColumnFamilyHandle* column_family, const Slice& key,
const WideColumns& columns) override {
return PerformWithSavePoint([&]() {
constexpr bool do_validate = false;
return PutEntityWithSecondaryIndices(column_family, key, columns,
do_validate);
});
}
using Txn::MergeUntracked;
Status MergeUntracked(ColumnFamilyHandle* /* column_family */,
const Slice& /* key */,
const Slice& /* value */) override {
return Status::NotSupported(
"MergeUntracked with secondary indices not yet supported");
}
using Txn::DeleteUntracked;
Status DeleteUntracked(ColumnFamilyHandle* /* column_family */,
const Slice& /* key */) override {
return Status::NotSupported(
"DeleteUntracked with secondary indices not yet supported");
}
Status DeleteUntracked(ColumnFamilyHandle* /* column_family */,
const SliceParts& /* key */) override {
return Status::NotSupported(
"DeleteUntracked with secondary indices not yet supported");
}
using Txn::SingleDeleteUntracked;
Status SingleDeleteUntracked(ColumnFamilyHandle* /* column_family */,
const Slice& /* key */) override {
return Status::NotSupported(
"SingleDeleteUntracked with secondary indices not yet supported");
}
private:
class IndexData {
public:
IndexData(const SecondaryIndex* index, const Slice& previous_column_value)
: index_(index), previous_column_value_(previous_column_value) {
assert(index_);
}
const SecondaryIndex* index() const { return index_; }
const Slice& previous_column_value() const {
return previous_column_value_;
}
std::optional<std::variant<Slice, std::string>>& updated_column_value() {
return updated_column_value_;
}
Slice primary_column_value() const {
return updated_column_value_.has_value() ? AsSlice(*updated_column_value_)
: previous_column_value_;
}
private:
const SecondaryIndex* index_;
Slice previous_column_value_;
std::optional<std::variant<Slice, std::string>> updated_column_value_;
};
static Slice AsSlice(const std::variant<Slice, std::string>& var) {
return std::visit([](const auto& value) -> Slice { return value; }, var);
}
static std::string AsString(const std::variant<Slice, std::string>& var) {
return std::visit(
overload{
[](const Slice& value) -> std::string { return value.ToString(); },
[](const std::string& value) -> std::string { return value; }},
var);
}
template <typename Iterator>
static Iterator FindPrimaryColumn(const SecondaryIndex* secondary_index,
ColumnFamilyHandle* column_family,
Iterator begin, Iterator end) {
assert(secondary_index);
assert(column_family);
if (column_family != secondary_index->GetPrimaryColumnFamily()) {
return end;
}
return WideColumnsHelper::Find(begin, end,
secondary_index->GetPrimaryColumnName());
}
template <typename Operation>
Status PerformWithSavePoint(Operation&& operation) {
Txn::SetSavePoint();
const Status s = operation();
if (!s.ok()) {
[[maybe_unused]] const Status st = Txn::RollbackToSavePoint();
assert(st.ok());
return s;
}
[[maybe_unused]] const Status st = Txn::PopSavePoint();
assert(st.ok());
return Status::OK();
}
Status GetPrimaryEntryForUpdate(ColumnFamilyHandle* column_family,
const Slice& primary_key,
PinnableWideColumns* existing_primary_columns,
bool do_validate) {
assert(column_family);
assert(existing_primary_columns);
constexpr bool exclusive = true;
return Txn::GetEntityForUpdate(ReadOptions(), column_family, primary_key,
existing_primary_columns, exclusive,
do_validate);
}
Status RemoveSecondaryEntry(const SecondaryIndex* secondary_index,
const Slice& primary_key,
const Slice& existing_primary_column_value) {
assert(secondary_index);
std::variant<Slice, std::string> secondary_key_prefix;
const Status s = secondary_index->GetSecondaryKeyPrefix(
primary_key, existing_primary_column_value, &secondary_key_prefix);
if (!s.ok()) {
return s;
}
const std::string secondary_key =
AsString(secondary_key_prefix) + primary_key.ToString();
return Txn::SingleDelete(secondary_index->GetSecondaryColumnFamily(),
secondary_key);
}
Status AddPrimaryEntry(ColumnFamilyHandle* column_family,
const Slice& primary_key,
const WideColumns& primary_columns) {
assert(column_family);
constexpr bool assume_tracked = true;
return Txn::PutEntity(column_family, primary_key, primary_columns,
assume_tracked);
}
Status AddSecondaryEntry(const SecondaryIndex* secondary_index,
const Slice& primary_key,
const Slice& primary_column_value,
const Slice& previous_column_value) {
assert(secondary_index);
std::variant<Slice, std::string> secondary_key_prefix;
{
const Status s = secondary_index->GetSecondaryKeyPrefix(
primary_key, primary_column_value, &secondary_key_prefix);
if (!s.ok()) {
return s;
}
}
std::optional<std::variant<Slice, std::string>> secondary_value;
{
const Status s = secondary_index->GetSecondaryValue(
primary_key, primary_column_value, previous_column_value,
&secondary_value);
if (!s.ok()) {
return s;
}
}
{
const std::string secondary_key =
AsString(secondary_key_prefix) + primary_key.ToString();
const Status s = Txn::Put(
secondary_index->GetSecondaryColumnFamily(), secondary_key,
secondary_value.has_value() ? AsSlice(*secondary_value) : Slice());
if (!s.ok()) {
return s;
}
}
return Status::OK();
}
Status RemoveSecondaryEntries(ColumnFamilyHandle* column_family,
const Slice& primary_key, bool do_validate) {
assert(column_family);
PinnableWideColumns existing_primary_columns;
const Status s = GetPrimaryEntryForUpdate(
column_family, primary_key, &existing_primary_columns, do_validate);
if (s.IsNotFound()) {
return Status::OK();
}
if (!s.ok()) {
return s;
}
const auto& existing_columns = existing_primary_columns.columns();
for (const auto& secondary_index : *secondary_indices_) {
const auto it =
FindPrimaryColumn(secondary_index.get(), column_family,
existing_columns.cbegin(), existing_columns.cend());
if (it == existing_columns.cend()) {
continue;
}
const Status st =
RemoveSecondaryEntry(secondary_index.get(), primary_key, it->value());
if (!st.ok()) {
return st;
}
}
return Status::OK();
}
Status UpdatePrimaryColumnValues(ColumnFamilyHandle* column_family,
const Slice& primary_key,
WideColumns& primary_columns,
autovector<IndexData>& applicable_indices) {
assert(applicable_indices.empty());
applicable_indices.reserve(secondary_indices_->size());
for (const auto& secondary_index : *secondary_indices_) {
const auto it =
FindPrimaryColumn(secondary_index.get(), column_family,
primary_columns.begin(), primary_columns.end());
if (it == primary_columns.end()) {
continue;
}
applicable_indices.emplace_back(
IndexData(secondary_index.get(), it->value()));
auto& index_data = applicable_indices.back();
const Status s = secondary_index->UpdatePrimaryColumnValue(
primary_key, index_data.previous_column_value(),
&index_data.updated_column_value());
if (!s.ok()) {
return s;
}
it->value() = index_data.primary_column_value();
}
return Status::OK();
}
Status AddSecondaryEntries(const Slice& primary_key,
const autovector<IndexData>& applicable_indices) {
for (const auto& index_data : applicable_indices) {
const Status s = AddSecondaryEntry(index_data.index(), primary_key,
index_data.primary_column_value(),
index_data.previous_column_value());
if (!s.ok()) {
return s;
}
}
return Status::OK();
}
Status PutEntityWithSecondaryIndices(ColumnFamilyHandle* column_family,
const Slice& key,
const WideColumns& columns,
bool do_validate) {
// TODO: we could avoid removing and recreating secondary entries for
// which neither the secondary key prefix nor the value has changed
if (!column_family) {
column_family = Txn::DefaultColumnFamily();
}
const Slice& primary_key = key;
{
const Status s =
RemoveSecondaryEntries(column_family, primary_key, do_validate);
if (!s.ok()) {
return s;
}
}
autovector<IndexData> applicable_indices;
WideColumns primary_columns(columns);
WideColumnsHelper::SortColumns(primary_columns);
{
const Status s = UpdatePrimaryColumnValues(
column_family, primary_key, primary_columns, applicable_indices);
if (!s.ok()) {
return s;
}
}
{
const Status s =
AddPrimaryEntry(column_family, primary_key, primary_columns);
if (!s.ok()) {
return s;
}
}
{
const Status s = AddSecondaryEntries(primary_key, applicable_indices);
if (!s.ok()) {
return s;
}
}
return Status::OK();
}
const std::vector<std::shared_ptr<SecondaryIndex>>* secondary_indices_;
};
} // namespace ROCKSDB_NAMESPACE