rocksdb/db/compaction/subcompaction_state.cc
Jay Huh cc487ba367 Fix Compaction Stats for Remote Compaction and Tiered Storage (#13464)
Summary:
## Background

Compaction statistics are collected at various levels across different classes and structs.

* `InternalStats::CompactionStats`: Per-level compaction stats within a job (can be collected at the subcompaction level and later aggregated to the compaction level)
* `InternalStats::CompactionStatsFull`: Contains two per-level compaction stats - `output_level_stats` for primary output level stats and `proximal_level_stats` for proximal level stats. Proximal level statistics are only relevant when using Tiered Storage with the per-key placement feature enabled.
* `InternalStats::CompactionOutputsStats`: Simplified version of `InternalStats::CompactionStats`. Only has a subset of fields from `InternalStats::CompactionStats`
* `CompactionJobStats`: Job-level compaction stats (can be collected at the subcompaction level and later aggregated to the compaction level)

Please note that some fields in Job-level stats are not in Per-level stats and they don't map 1-to-1 today.
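
The relationship between the per-level structs can be sketched as below. This is a simplified, hypothetical model, not the actual RocksDB definitions: `LevelStatsSketch`, `FullStatsSketch`, and `AggregateSubcompaction` are illustrative names, and only three counters are shown.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical, trimmed-down stand-in for InternalStats::CompactionStats.
struct LevelStatsSketch {
  uint64_t num_output_files = 0;
  uint64_t num_output_records = 0;
  uint64_t bytes_written = 0;

  // Accumulate another set of per-level counters into this one.
  void Add(const LevelStatsSketch& other) {
    num_output_files += other.num_output_files;
    num_output_records += other.num_output_records;
    bytes_written += other.bytes_written;
  }
};

// Hypothetical stand-in for InternalStats::CompactionStatsFull.
struct FullStatsSketch {
  LevelStatsSketch output_level_stats;
  bool has_proximal_level_output = false;
  LevelStatsSketch proximal_level_stats;
};

// Fold one subcompaction's counters into the compaction-level totals.
// `proximal` may be null when the subcompaction wrote nothing to the
// proximal level.
inline void AggregateSubcompaction(const LevelStatsSketch& output_level,
                                   const LevelStatsSketch* proximal,
                                   FullStatsSketch* total) {
  assert(total != nullptr);
  total->output_level_stats.Add(output_level);
  if (proximal != nullptr) {
    total->has_proximal_level_output = true;
    total->proximal_level_stats.Add(*proximal);
  }
}
```

Calling `AggregateSubcompaction` once per subcompaction leaves `total` holding the compaction-level view, with the proximal level tracked separately from the primary output level.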

## Issues

* In non-remote compactions, proximal level compaction statistics were not being aggregated into job-level statistics. As a result, job-level statistics were missing the proximal level stats for tiered storage compactions with the per-key placement feature enabled.
* During remote compactions, proximal level compaction statistics were pre-aggregated into job-level statistics on the remote side. However, per-level compaction statistics were not part of the serialized compaction result, so the primary host lost that information and was not able to populate `per_key_placement_comp_stats_` and `internal_stats_.proximal_level_stats` properly during installation.
* `TieredCompactionTest` was only checking whether (expected stats > 0 && actual stats > 0) instead of comparing actual values
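
To illustrate why the old test check was too weak, here is a minimal sketch. The helper names `WeakStatsMatch` and `StrictStatsMatch` are hypothetical, and the exact shape of the old check is an assumption based on the description above.

```cpp
#include <cstdint>

// Assumed shape of the old check: only verifies that both values agree on
// being zero or non-zero, so 10 vs. 3 is accepted.
inline bool WeakStatsMatch(uint64_t expected, uint64_t actual) {
  return (expected > 0) == (actual > 0);
}

// Value comparison: any mismatch in the counter is caught.
inline bool StrictStatsMatch(uint64_t expected, uint64_t actual) {
  return expected == actual;
}
```

With the weak form, a compaction that dropped most of its proximal level records from the stats would still pass as long as at least one record was counted.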

## Fixes

* Renamed `compaction_stats_` to `internal_stats_` for `InternalStats::CompactionStatsFull` in `CompactionJob` for better readability
* Removed the usage of `InternalStats::CompactionOutputsStats` and consolidated it into `InternalStats::CompactionStats`.
* Remote compactions now include the internal stats in the serialized `CompactionServiceResult`. `output_level_stats` and `proximal_level_stats` are later propagated to the sub_compact output stats accordingly.
* `CompactionJob::UpdateCompactionJobStats()` now takes `CompactionStatsFull` and aggregates the `proximal_level_stats` as well
* `TieredCompactionTest` now compares actual values for input/output file counts and record counts. A follow-up is needed to do the same for bytes read / written.
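
The job-level aggregation described in the list above might look roughly like the following. This is a hedged sketch, not the actual `CompactionJob::UpdateCompactionJobStats()`: the struct and function names are hypothetical and only two counters are modeled.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical per-level counters (subset of what RocksDB tracks).
struct PerLevelSketch {
  uint64_t num_output_files = 0;
  uint64_t num_output_records = 0;
};

// Hypothetical stand-in for InternalStats::CompactionStatsFull.
struct StatsFullSketch {
  PerLevelSketch output_level_stats;
  bool has_proximal_level_output = false;
  PerLevelSketch proximal_level_stats;
};

// Hypothetical stand-in for CompactionJobStats.
struct JobStatsSketch {
  uint64_t num_output_files = 0;
  uint64_t num_output_records = 0;
};

// Job-level totals start from the primary output level and add the
// proximal level on top when that level produced output.
inline void UpdateJobStatsSketch(const StatsFullSketch& internal_stats,
                                 JobStatsSketch* job_stats) {
  assert(job_stats != nullptr);
  job_stats->num_output_files =
      internal_stats.output_level_stats.num_output_files;
  job_stats->num_output_records =
      internal_stats.output_level_stats.num_output_records;
  if (internal_stats.has_proximal_level_output) {
    job_stats->num_output_files +=
        internal_stats.proximal_level_stats.num_output_files;
    job_stats->num_output_records +=
        internal_stats.proximal_level_stats.num_output_records;
  }
}
```

Taking the full struct rather than a single per-level struct is what lets the job-level numbers cover both output levels in one place.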

Pull Request resolved: https://github.com/facebook/rocksdb/pull/13464

Test Plan:
Unit Tests updated to verify stats

```
./compaction_service_test
```
```
./tiered_compaction_test
```

Reviewed By: pdillinger

Differential Revision: D71220393

Pulled By: jaykorean

fbshipit-source-id: ad70bffd9614ced683f90c7570a17def9b5c8f3f
2025-03-18 16:28:18 -07:00


// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/compaction/subcompaction_state.h"

#include "rocksdb/sst_partitioner.h"

namespace ROCKSDB_NAMESPACE {
void SubcompactionState::AggregateCompactionOutputStats(
    InternalStats::CompactionStatsFull& internal_stats) const {
  // Outputs should be closed. By extension, any files created just for
  // range deletes have already been written also.
  assert(compaction_outputs_.HasBuilder() == false);
  assert(proximal_level_outputs_.HasBuilder() == false);

  // FIXME: These stats currently include abandoned output files
  // assert(compaction_outputs_.stats_.num_output_files ==
  //        compaction_outputs_.outputs_.size());
  // assert(proximal_level_outputs_.stats_.num_output_files ==
  //        proximal_level_outputs_.outputs_.size());

  internal_stats.output_level_stats.Add(compaction_outputs_.stats_);
  if (proximal_level_outputs_.HasOutput()) {
    internal_stats.has_proximal_level_output = true;
    internal_stats.proximal_level_stats.Add(proximal_level_outputs_.stats_);
  }
}

OutputIterator SubcompactionState::GetOutputs() const {
  return OutputIterator(proximal_level_outputs_.outputs_,
                        compaction_outputs_.outputs_);
}

void SubcompactionState::Cleanup(Cache* cache) {
  proximal_level_outputs_.Cleanup();
  compaction_outputs_.Cleanup();

  if (!status.ok()) {
    for (const auto& out : GetOutputs()) {
      // If this file was inserted into the table cache then remove it here
      // because this compaction was not committed. This is not strictly
      // required because of a backstop TableCache::Evict() in
      // PurgeObsoleteFiles() but is our opportunity to apply
      // uncache_aggressiveness. TODO: instead, put these files into the
      // VersionSet::obsolete_files_ pipeline so that they don't have to
      // be picked up by scanning the DB directory.
      TableCache::ReleaseObsolete(
          cache, out.meta.fd.GetNumber(), nullptr /*handle*/,
          compaction->mutable_cf_options().uncache_aggressiveness);
    }
  }
  // TODO: sub_compact.io_status is not checked like status. Not sure if
  // that's intentional. So ignoring the io_status as of now.
  io_status.PermitUncheckedError();
}

Slice SubcompactionState::SmallestUserKey() const {
  if (proximal_level_outputs_.HasOutput()) {
    Slice a = compaction_outputs_.SmallestUserKey();
    Slice b = proximal_level_outputs_.SmallestUserKey();
    if (a.empty()) {
      return b;
    }
    if (b.empty()) {
      return a;
    }
    const Comparator* user_cmp =
        compaction->column_family_data()->user_comparator();
    if (user_cmp->Compare(a, b) > 0) {
      return b;
    } else {
      return a;
    }
  } else {
    return compaction_outputs_.SmallestUserKey();
  }
}

Slice SubcompactionState::LargestUserKey() const {
  if (proximal_level_outputs_.HasOutput()) {
    Slice a = compaction_outputs_.LargestUserKey();
    Slice b = proximal_level_outputs_.LargestUserKey();
    if (a.empty()) {
      return b;
    }
    if (b.empty()) {
      return a;
    }
    const Comparator* user_cmp =
        compaction->column_family_data()->user_comparator();
    if (user_cmp->Compare(a, b) < 0) {
      return b;
    } else {
      return a;
    }
  } else {
    return compaction_outputs_.LargestUserKey();
  }
}

Status SubcompactionState::AddToOutput(
    const CompactionIterator& iter, bool use_proximal_output,
    const CompactionFileOpenFunc& open_file_func,
    const CompactionFileCloseFunc& close_file_func) {
  // update target output
  current_outputs_ =
      use_proximal_output ? &proximal_level_outputs_ : &compaction_outputs_;
  return current_outputs_->AddToOutput(iter, open_file_func, close_file_func);
}

} // namespace ROCKSDB_NAMESPACE