Parallel for with iteration costs and left product

Parallel for with user-supplied [cumulative] iteration costs allows to
get performance improvements on problems with significantly different
time requirements per parallel loop iteration.

One of those problems is left multiplication with block-sparse matrix.
Using number of non-zero values per column block, we partition column
blocks into contiguous sets with approximately equal number of
operations to be performed.

Change-Id: I4a862a10a586cdfbec22e8168a3423537039abc2
diff --git a/internal/ceres/block_sparse_matrix.cc b/internal/ceres/block_sparse_matrix.cc
index 053b3c0..f0dc11a 100644
--- a/internal/ceres/block_sparse_matrix.cc
+++ b/internal/ceres/block_sparse_matrix.cc
@@ -149,13 +149,64 @@
 }
 
 void BlockSparseMatrix::LeftMultiplyAndAccumulate(const double* x,
+                                                  double* y,
+                                                  ContextImpl* context,
+                                                  int num_threads) const {
+  // While utilizing transposed structure allows to perform parallel
+  // left-multiplication by dense vector, it makes access patterns to matrix
+  // elements scattered. Thus, multiplication using transposed structure
+  // is only useful for parallel execution
+  CHECK(x != nullptr);
+  CHECK(y != nullptr);
+  if (transpose_block_structure_ == nullptr || num_threads == 1) {
+    LeftMultiplyAndAccumulate(x, y);
+    return;
+  }
+
+  auto transpose_bs = transpose_block_structure_.get();
+  const auto values = values_.get();
+  const int num_col_blocks = transpose_bs->rows.size();
+  if (!num_col_blocks) {
+    return;
+  }
+
+  // Use non-zero count as iteration cost for guided parallel-for loop
+  ParallelFor(
+      context,
+      0,
+      num_col_blocks,
+      num_threads,
+      [values, transpose_bs, x, y](int row_block_id) {
+        int row_block_pos = transpose_bs->rows[row_block_id].block.position;
+        int row_block_size = transpose_bs->rows[row_block_id].block.size;
+        auto& cells = transpose_bs->rows[row_block_id].cells;
+
+        for (auto& cell : cells) {
+          const int col_block_id = cell.block_id;
+          const int col_block_size = transpose_bs->cols[col_block_id].size;
+          const int col_block_pos = transpose_bs->cols[col_block_id].position;
+          MatrixTransposeVectorMultiply<Eigen::Dynamic, Eigen::Dynamic, 1>(
+              values + cell.position,
+              col_block_size,
+              row_block_size,
+              x + col_block_pos,
+              y + row_block_pos);
+        }
+      },
+      transpose_bs->rows.data(),
+      [](const CompressedRow& row) { return row.cumulative_nnz; });
+}
+
+void BlockSparseMatrix::LeftMultiplyAndAccumulate(const double* x,
                                                   double* y) const {
   CHECK(x != nullptr);
   CHECK(y != nullptr);
+  // Single-threaded left products are always computed using a non-transpose
+  // block structure, because it has linear acess pattern to matrix elements
   for (int i = 0; i < block_structure_->rows.size(); ++i) {
     int row_block_pos = block_structure_->rows[i].block.position;
     int row_block_size = block_structure_->rows[i].block.size;
-    auto& cells = block_structure_->rows[i].cells;
+    const auto& cells = block_structure_->rows[i].cells;
     for (const auto& cell : cells) {
       int col_block_id = cell.block_id;
       int col_block_size = block_structure_->cols[col_block_id].size;
diff --git a/internal/ceres/block_sparse_matrix.h b/internal/ceres/block_sparse_matrix.h
index b4abdc7..72b1d68 100644
--- a/internal/ceres/block_sparse_matrix.h
+++ b/internal/ceres/block_sparse_matrix.h
@@ -78,6 +78,10 @@
                                   ContextImpl* context,
                                   int num_threads) const final;
   void LeftMultiplyAndAccumulate(const double* x, double* y) const final;
+  void LeftMultiplyAndAccumulate(const double* x,
+                                 double* y,
+                                 ContextImpl* context,
+                                 int num_threads) const final;
   void SquaredColumnNorm(double* x) const final;
   void ScaleColumns(const double* scale) final;
   void ToCompressedRowSparseMatrix(CompressedRowSparseMatrix* matrix) const;
diff --git a/internal/ceres/block_sparse_matrix_test.cc b/internal/ceres/block_sparse_matrix_test.cc
index cfed041..6533a80 100644
--- a/internal/ceres/block_sparse_matrix_test.cc
+++ b/internal/ceres/block_sparse_matrix_test.cc
@@ -186,6 +186,22 @@
   }
 }
 
+TEST_F(BlockSparseMatrixTest, LeftMultiplyAndAccumulateParallelTest) {
+  Vector y_0 = Vector::Random(A_->num_rows());
+  Vector y_s = y_0;
+  Vector y_p = y_0;
+
+  Vector x = Vector::Random(A_->num_cols());
+  A_->LeftMultiplyAndAccumulate(x.data(), y_s.data());
+
+  A_->AddTransposeBlockStructure();
+  A_->LeftMultiplyAndAccumulate(x.data(), y_p.data(), &context_, kNumThreads);
+
+  // Parallel implementation for left products uses a different order of
+  // traversal, thus results might be different
+  EXPECT_LT((y_s - y_p).norm(), 1e-12);
+}
+
 TEST_F(BlockSparseMatrixTest, SquaredColumnNormTest) {
   Vector y_a = Vector::Zero(A_->num_cols());
   Vector y_b = Vector::Zero(A_->num_cols());
diff --git a/internal/ceres/cgnr_solver.cc b/internal/ceres/cgnr_solver.cc
index e1c571d..524356a 100644
--- a/internal/ceres/cgnr_solver.cc
+++ b/internal/ceres/cgnr_solver.cc
@@ -99,7 +99,7 @@
     // y = y + Atz
     z_.setZero();
     A_.RightMultiplyAndAccumulate(x, z_, context_, num_threads_);
-    A_.LeftMultiplyAndAccumulate(z_, y);
+    A_.LeftMultiplyAndAccumulate(z_, y, context_, num_threads_);
 
     // y = y + DtDx
     if (D_ != nullptr) {
@@ -165,7 +165,9 @@
       preconditioner_ = std::make_unique<IdentityPreconditioner>(A->num_cols());
     }
   }
-
+  if (options_.num_threads > 1) {
+    A->AddTransposeBlockStructure();
+  }
   preconditioner_->Update(*A, per_solve_options.D);
 
   ConjugateGradientsSolverOptions cg_options;
@@ -181,7 +183,8 @@
   // rhs = Atb.
   Vector rhs(A->num_cols());
   rhs.setZero();
-  A->LeftMultiplyAndAccumulate(b, rhs.data());
+  A->LeftMultiplyAndAccumulate(
+      b, rhs.data(), options_.context, options_.num_threads);
 
   cg_solution_ = Vector::Zero(A->num_cols());
   for (int i = 0; i < 4; ++i) {
diff --git a/internal/ceres/evaluation_benchmark.cc b/internal/ceres/evaluation_benchmark.cc
index b18f563..56b4f19 100644
--- a/internal/ceres/evaluation_benchmark.cc
+++ b/internal/ceres/evaluation_benchmark.cc
@@ -116,6 +116,16 @@
     return block_sparse_jacobian.get();
   }
 
+  const BlockSparseMatrix* BlockSparseJacobianWithTranspose(
+      ContextImpl* context) {
+    if (!block_sparse_jacobian_with_transpose) {
+      auto regular = BlockSparseJacobian(context);
+      block_sparse_jacobian_with_transpose = CreateBlockSparseJacobian(context);
+      block_sparse_jacobian_with_transpose->AddTransposeBlockStructure();
+    }
+    return block_sparse_jacobian_with_transpose.get();
+  }
+
   const CompressedRowSparseMatrix* CompressedRowSparseJacobian(
       ContextImpl* context) {
     if (!crs_jacobian) {
@@ -127,6 +137,7 @@
   Vector parameters;
   std::unique_ptr<BundleAdjustmentProblem> bal_problem;
   std::unique_ptr<BlockSparseMatrix> block_sparse_jacobian;
+  std::unique_ptr<BlockSparseMatrix> block_sparse_jacobian_with_transpose;
   std::unique_ptr<CompressedRowSparseMatrix> crs_jacobian;
 };
 
@@ -217,13 +228,32 @@
 static void JacobianLeftMultiplyAndAccumulate(benchmark::State& state,
                                               BALData* data,
                                               ContextImpl* context) {
+  const int num_threads = state.range(0);
+
   auto jacobian = data->BlockSparseJacobian(context);
 
   Vector y = Vector::Zero(jacobian->num_cols());
   Vector x = Vector::Random(jacobian->num_rows());
 
   for (auto _ : state) {
-    jacobian->LeftMultiplyAndAccumulate(x.data(), y.data());
+    jacobian->LeftMultiplyAndAccumulate(
+        x.data(), y.data(), context, num_threads);
+  }
+  CHECK_GT(y.squaredNorm(), 0.);
+}
+
+static void JacobianLeftMultiplyAndAccumulateWithTranspose(
+    benchmark::State& state, BALData* data, ContextImpl* context) {
+  const int num_threads = state.range(0);
+
+  auto jacobian = data->BlockSparseJacobianWithTranspose(context);
+
+  Vector y = Vector::Zero(jacobian->num_cols());
+  Vector x = Vector::Random(jacobian->num_rows());
+
+  for (auto _ : state) {
+    jacobian->LeftMultiplyAndAccumulate(
+        x.data(), y.data(), context, num_threads);
   }
   CHECK_GT(y.squaredNorm(), 0.);
 }
@@ -362,11 +392,25 @@
         &context)
         ->Arg(1);
 
+    const std::string name_left_product_transpose =
+        "JacobianLeftMultiplyAndAccumulateWithTranspose<" + path + ">";
+    ::benchmark::RegisterBenchmark(
+        name_left_product_transpose.c_str(),
+        ceres::internal::JacobianLeftMultiplyAndAccumulateWithTranspose,
+        data,
+        &context)
+        ->Arg(1)
+        ->Arg(2)
+        ->Arg(4)
+        ->Arg(8)
+        ->Arg(16)
+        ->Arg(28);
+
 #ifndef CERES_NO_CUDA
     const std::string name_left_product_cuda =
         "JacobianLeftMultiplyAndAccumulateCuda<" + path + ">";
     ::benchmark::RegisterBenchmark(
-        name_right_product_cuda.c_str(),
+        name_left_product_cuda.c_str(),
         ceres::internal::JacobianLeftMultiplyAndAccumulateCuda,
         data,
         &context)
diff --git a/internal/ceres/parallel_for.h b/internal/ceres/parallel_for.h
index 234c7db..107d19c 100644
--- a/internal/ceres/parallel_for.h
+++ b/internal/ceres/parallel_for.h
@@ -32,6 +32,7 @@
 #ifndef CERES_INTERNAL_PARALLEL_FOR_H_
 #define CERES_INTERNAL_PARALLEL_FOR_H_
 
+#include <algorithm>
 #include <functional>
 #include <mutex>
 
@@ -93,6 +94,124 @@
 void Invoke(int thread_id, int i, const F& function) {
   InvokeImpl<F, args_of_t<F>>::Invoke(thread_id, i, function);
 }
+
+// Check if it is possible to split range [start; end) into at most
+// max_num_partitions  contiguous partitions of cost not greater than
+// max_partition_cost. Inclusive integer cumulative costs are provided by
+// cumulative_cost_data objects, with cumulative_cost_offset being a total cost
+// of all indices (starting from zero) preceding start element. Cumulative costs
+// are returned by cumulative_cost_fun called with a reference to
+// cumulative_cost_data element with index from range[start; end), and should be
+// non-decreasing. Partition of the range is returned via partition argument
+template <typename CumulativeCostData, typename CumulativeCostFun>
+bool MaxPartitionCostIsFeasible(int start,
+                                int end,
+                                int max_num_partitions,
+                                int max_partition_cost,
+                                int cumulative_cost_offset,
+                                const CumulativeCostData* cumulative_cost_data,
+                                const CumulativeCostFun& cumulative_cost_fun,
+                                std::vector<int>* partition) {
+  partition->clear();
+  partition->push_back(start);
+  int partition_start = start;
+  int cost_offset = cumulative_cost_offset;
+
+  const CumulativeCostData* const range_end = cumulative_cost_data + end;
+  while (partition_start < end) {
+    // Already have max_num_partitions
+    if (partition->size() > max_num_partitions) {
+      return false;
+    }
+    const int target = max_partition_cost + cost_offset;
+    const int partition_end =
+        std::partition_point(
+            cumulative_cost_data + partition_start,
+            cumulative_cost_data + end,
+            [&cumulative_cost_fun, target](const CumulativeCostData& item) {
+              return cumulative_cost_fun(item) <= target;
+            }) -
+        cumulative_cost_data;
+    // Unable to make a partition from a single element
+    if (partition_end == partition_start) {
+      return false;
+    }
+
+    const int cost_last =
+        cumulative_cost_fun(cumulative_cost_data[partition_end - 1]);
+    partition->push_back(partition_end);
+    partition_start = partition_end;
+    cost_offset = cost_last;
+  }
+  return true;
+}
+
+// Split integer interval [start, end) into at most max_num_partitions
+// contiguous intervals, minimizing maximal total cost of a single interval.
+// Inclusive integer cumulative costs for each (zero-based) index are provided
+// by cumulative_cost_data objects, and are returned by cumulative_cost_fun call
+// with a reference to one of the objects from range [start, end)
+template <typename CumulativeCostData, typename CumulativeCostFun>
+std::vector<int> ComputePartition(
+    int start,
+    int end,
+    int max_num_partitions,
+    const CumulativeCostData* cumulative_cost_data,
+    const CumulativeCostFun& cumulative_cost_fun) {
+  // Given maximal partition cost, it is possible to verify if it is admissible
+  // and obtain corresponding partition using MaxPartitionCostIsFeasible
+  // function. In order to find the lowest admissible value, a binary search
+  // over all potentially optimal cost values is being performed
+  const int cumulative_cost_last =
+      cumulative_cost_fun(cumulative_cost_data[end - 1]);
+  const int cumulative_cost_offset =
+      start ? cumulative_cost_fun(cumulative_cost_data[start - 1]) : 0;
+  const int total_cost = cumulative_cost_last - cumulative_cost_offset;
+
+  // Minimal maximal partition cost is not smaller than the average
+  // We will use non-inclusive lower bound
+  int partition_cost_lower_bound = total_cost / max_num_partitions - 1;
+  // Minimal maximal partition cost is not larger than the total cost
+  // Upper bound is inclusive
+  int partition_cost_upper_bound = total_cost;
+
+  std::vector<int> partition, partition_upper_bound;
+  // Binary search over partition cost, returning the lowest admissible cost
+  while (partition_cost_upper_bound - partition_cost_lower_bound > 1) {
+    partition.reserve(max_num_partitions + 1);
+    const int partition_cost =
+        partition_cost_lower_bound +
+        (partition_cost_upper_bound - partition_cost_lower_bound) / 2;
+    bool admissible = MaxPartitionCostIsFeasible(start,
+                                                 end,
+                                                 max_num_partitions,
+                                                 partition_cost,
+                                                 cumulative_cost_offset,
+                                                 cumulative_cost_data,
+                                                 cumulative_cost_fun,
+                                                 &partition);
+    if (admissible) {
+      partition_cost_upper_bound = partition_cost;
+      std::swap(partition, partition_upper_bound);
+    } else {
+      partition_cost_lower_bound = partition_cost;
+    }
+  }
+
+  // After binary search over partition cost, interval
+  // (partition_cost_lower_bound, partition_cost_upper_bound] contains the only
+  // admissible partition cost value - partition_cost_upper_bound
+  //
+  // Partition for this cost value might have been already computed
+  if (partition_upper_bound.empty() == false) {
+    return partition_upper_bound;
+  }
+  // Partition for upper bound is not computed if and only if upper bound was
+  // never updated This is a simple case of a single interval containing all
+  // values, which we were not able to break into pieces
+  partition = {start, end};
+  return partition;
+}
 }  // namespace parallel_for_details
 
 // Forward declaration of parallel invocation function that is to be
@@ -131,6 +250,55 @@
   CHECK(context != nullptr);
   ParallelInvoke<F>(context, start, end, num_threads, function);
 }
+
+// Execute function for every element in the range [start, end) with at most
+// num_threads, taking into account user-provided integer cumulative costs of
+// iterations. Cumulative costs of iteration for indices in range [0, end) are
+// stored in objects from cumulative_cost_data. User-provided
+// cumulative_cost_fun returns non-decreasing integer values corresponding to
+// inclusive cumulative cost of loop iterations, provided with a reference to
+// user-defined object. Only indices from [start, end) will be referenced. This
+// routine assumes that cumulative_cost_fun is non-decreasing (in other words,
+// all costs are non-negative);
+template <typename F, typename CumulativeCostData, typename CumulativeCostFun>
+void ParallelFor(ContextImpl* context,
+                 int start,
+                 int end,
+                 int num_threads,
+                 const F& function,
+                 const CumulativeCostData* cumulative_cost_data,
+                 const CumulativeCostFun& cumulative_cost_fun) {
+  using namespace parallel_for_details;
+  CHECK_GT(num_threads, 0);
+  if (start >= end) {
+    return;
+  }
+  if (num_threads == 1 || end - start <= num_threads) {
+    ParallelFor(context, start, end, num_threads, function);
+    return;
+  }
+  // Creating several partitions allows us to tolerate imperfections of
+  // partitioning and user-supplied iteration costs up to a certain extent
+  const int kNumPartitionsPerThread = 4;
+  const int kMaxPartitions = num_threads * kNumPartitionsPerThread;
+  const std::vector<int> partitions = ComputePartition(
+      start, end, kMaxPartitions, cumulative_cost_data, cumulative_cost_fun);
+  CHECK_GT(partitions.size(), 1);
+  const int num_partitions = partitions.size() - 1;
+  ParallelFor(context,
+              0,
+              num_partitions,
+              num_threads,
+              [&function, &partitions](int thread_id, int partition_id) {
+                const int partition_start = partitions[partition_id];
+                const int partition_end = partitions[partition_id + 1];
+
+                for (int i = partition_start; i < partition_end; ++i) {
+                  Invoke<F>(thread_id, i, function);
+                }
+              });
+}
+
 }  // namespace ceres::internal
 
 // Backend-specific implementations of ParallelInvoke
diff --git a/internal/ceres/parallel_for_test.cc b/internal/ceres/parallel_for_test.cc
index 451d80e..b74165e 100644
--- a/internal/ceres/parallel_for_test.cc
+++ b/internal/ceres/parallel_for_test.cc
@@ -38,6 +38,8 @@
 #include <cmath>
 #include <condition_variable>
 #include <mutex>
+#include <numeric>
+#include <random>
 #include <thread>
 #include <vector>
 
@@ -166,4 +168,251 @@
 }
 #endif  // CERES_NO_THREADS
 
+// Helper function for partition tests
+bool BruteForcePartition(
+    int* costs, int start, int end, int max_cost, int max_partitions);
+// Basic test if MaxPartitionCostIsFeasible and BruteForcePartition agree on
+// simple test-cases
+TEST(GuidedParallelFor, MaxPartitionCostIsFeasible) {
+  using namespace parallel_for_details;
+  std::vector<int> costs, cumulative_costs, partition;
+  costs = {1, 2, 3, 5, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0};
+  cumulative_costs.resize(costs.size());
+  std::partial_sum(costs.begin(), costs.end(), cumulative_costs.begin());
+  const auto dummy_getter = [](const int v) { return v; };
+
+  // [1, 2, 3] [5], [0 ... 0, 7, 0, ... 0]
+  EXPECT_TRUE(MaxPartitionCostIsFeasible(0,
+                                         costs.size(),
+                                         3,
+                                         7,
+                                         0,
+                                         cumulative_costs.data(),
+                                         dummy_getter,
+                                         &partition));
+  EXPECT_TRUE(BruteForcePartition(costs.data(), 0, costs.size(), 3, 7));
+  // [1, 2, 3, 5, 0 ... 0, 7, 0, ... 0]
+  EXPECT_TRUE(MaxPartitionCostIsFeasible(0,
+                                         costs.size(),
+                                         3,
+                                         18,
+                                         0,
+                                         cumulative_costs.data(),
+                                         dummy_getter,
+                                         &partition));
+  EXPECT_TRUE(BruteForcePartition(costs.data(), 0, costs.size(), 3, 18));
+  // Impossible since there is item of cost 7
+  EXPECT_FALSE(MaxPartitionCostIsFeasible(0,
+                                          costs.size(),
+                                          3,
+                                          6,
+                                          0,
+                                          cumulative_costs.data(),
+                                          dummy_getter,
+                                          &partition));
+  EXPECT_FALSE(BruteForcePartition(costs.data(), 0, costs.size(), 3, 6));
+  // Impossible
+  EXPECT_FALSE(MaxPartitionCostIsFeasible(0,
+                                          costs.size(),
+                                          2,
+                                          10,
+                                          0,
+                                          cumulative_costs.data(),
+                                          dummy_getter,
+                                          &partition));
+  EXPECT_FALSE(BruteForcePartition(costs.data(), 0, costs.size(), 2, 10));
+}
+
+// Randomized tests for MaxPartitionCostIsFeasible
+TEST(GuidedParallelFor, MaxPartitionCostIsFeasibleRandomized) {
+  using namespace parallel_for_details;
+  std::vector<int> costs, cumulative_costs, partition;
+  const auto dummy_getter = [](const int v) { return v; };
+
+  // Random tests
+  const int kNumTests = 1000;
+  const int kMaxElements = 32;
+  const int kMaxPartitions = 16;
+  const int kMaxElCost = 8;
+  std::mt19937 rng;
+  std::uniform_int_distribution<int> rng_N(1, kMaxElements);
+  std::uniform_int_distribution<int> rng_M(1, kMaxPartitions);
+  std::uniform_int_distribution<int> rng_e(0, kMaxElCost);
+  for (int t = 0; t < kNumTests; ++t) {
+    const int N = rng_N(rng);
+    const int M = rng_M(rng);
+    int total = 0;
+    costs.clear();
+    for (int i = 0; i < N; ++i) {
+      costs.push_back(rng_e(rng));
+      total += costs.back();
+    }
+
+    cumulative_costs.resize(N);
+    std::partial_sum(costs.begin(), costs.end(), cumulative_costs.begin());
+
+    std::uniform_int_distribution<int> rng_seg(0, N - 1);
+    int start = rng_seg(rng);
+    int end = rng_seg(rng);
+    if (start > end) std::swap(start, end);
+    ++end;
+
+    int first_admissible = 0;
+    for (int threshold = 1; threshold <= total; ++threshold) {
+      const bool bruteforce =
+          BruteForcePartition(costs.data(), start, end, M, threshold);
+      if (bruteforce && !first_admissible) {
+        first_admissible = threshold;
+      }
+      const bool binary_search =
+          MaxPartitionCostIsFeasible(start,
+                                     end,
+                                     M,
+                                     threshold,
+                                     start ? cumulative_costs[start - 1] : 0,
+                                     cumulative_costs.data(),
+                                     dummy_getter,
+                                     &partition);
+      EXPECT_EQ(bruteforce, binary_search);
+      EXPECT_LE(partition.size(), M + 1);
+      // check partition itself
+      if (binary_search) {
+        ASSERT_GT(partition.size(), 1);
+        EXPECT_EQ(partition.front(), start);
+        EXPECT_EQ(partition.back(), end);
+
+        const int num_partitions = partition.size() - 1;
+        EXPECT_LE(num_partitions, M);
+        for (int j = 0; j < num_partitions; ++j) {
+          int total = 0;
+          for (int k = partition[j]; k < partition[j + 1]; ++k) {
+            EXPECT_LT(k, end);
+            EXPECT_GE(k, start);
+            total += costs[k];
+          }
+          EXPECT_LE(total, threshold);
+        }
+      }
+    }
+  }
+}
+
+// Randomized tests for CreatePartition
+TEST(GuidedParallelFor, CreatePartition) {
+  using namespace parallel_for_details;
+  std::vector<int> costs, cumulative_costs, partition;
+  const auto dummy_getter = [](const int v) { return v; };
+
+  // Random tests
+  const int kNumTests = 1000;
+  const int kMaxElements = 32;
+  const int kMaxPartitions = 16;
+  const int kMaxElCost = 8;
+  std::mt19937 rng;
+  std::uniform_int_distribution<int> rng_N(1, kMaxElements);
+  std::uniform_int_distribution<int> rng_M(1, kMaxPartitions);
+  std::uniform_int_distribution<int> rng_e(0, kMaxElCost);
+  for (int t = 0; t < kNumTests; ++t) {
+    const int N = rng_N(rng);
+    const int M = rng_M(rng);
+    int total = 0;
+    costs.clear();
+    for (int i = 0; i < N; ++i) {
+      costs.push_back(rng_e(rng));
+      total += costs.back();
+    }
+
+    cumulative_costs.resize(N);
+    std::partial_sum(costs.begin(), costs.end(), cumulative_costs.begin());
+
+    std::uniform_int_distribution<int> rng_seg(0, N - 1);
+    int start = rng_seg(rng);
+    int end = rng_seg(rng);
+    if (start > end) std::swap(start, end);
+    ++end;
+
+    int first_admissible = 0;
+    for (int threshold = 1; threshold <= total; ++threshold) {
+      const bool bruteforce =
+          BruteForcePartition(costs.data(), start, end, M, threshold);
+      if (bruteforce) {
+        first_admissible = threshold;
+        break;
+      }
+    }
+    EXPECT_TRUE(first_admissible != 0 || total == 0);
+    partition =
+        ComputePartition(start, end, M, cumulative_costs.data(), dummy_getter);
+    ASSERT_GT(partition.size(), 1);
+    EXPECT_EQ(partition.front(), start);
+    EXPECT_EQ(partition.back(), end);
+
+    const int num_partitions = partition.size() - 1;
+    EXPECT_LE(num_partitions, M);
+    for (int j = 0; j < num_partitions; ++j) {
+      int total = 0;
+      for (int k = partition[j]; k < partition[j + 1]; ++k) {
+        EXPECT_LT(k, end);
+        EXPECT_GE(k, start);
+        total += costs[k];
+      }
+      EXPECT_LE(total, first_admissible);
+    }
+  }
+}
+
+// Recursively try to partition range into segements of total cost
+// less than max_cost
+bool BruteForcePartition(
+    int* costs, int start, int end, int max_partitions, int max_cost) {
+  if (start == end) return true;
+  if (start < end && max_partitions == 0) return false;
+  int total_cost = 0;
+  for (int last_curr = start + 1; last_curr <= end; ++last_curr) {
+    total_cost += costs[last_curr - 1];
+    if (total_cost > max_cost) break;
+    if (BruteForcePartition(
+            costs, last_curr, end, max_partitions - 1, max_cost))
+      return true;
+  }
+  return false;
+}
+
+// Tests if guided parallel for loop computes the correct result for various
+// number of threads.
+TEST(GuidedParallelFor, NumThreads) {
+  ContextImpl context;
+  context.EnsureMinimumThreads(/*num_threads=*/2);
+
+  const int size = 16;
+  std::vector<int> expected_results(size, 0);
+  for (int i = 0; i < size; ++i) {
+    expected_results[i] = std::sqrt(i);
+  }
+
+  std::vector<int> costs, cumulative_costs;
+  for (int i = 1; i <= size; ++i) {
+    int cost = i * i;
+    costs.push_back(cost);
+    if (i == 1) {
+      cumulative_costs.push_back(cost);
+    } else {
+      cumulative_costs.push_back(cost + cumulative_costs.back());
+    }
+  }
+
+  for (int num_threads = 1; num_threads <= 8; ++num_threads) {
+    std::vector<int> values(size, 0);
+    ParallelFor(
+        &context,
+        0,
+        size,
+        num_threads,
+        [&values](int i) { values[i] = std::sqrt(i); },
+        cumulative_costs.data(),
+        [](const int v) { return v; });
+    EXPECT_THAT(values, ElementsAreArray(expected_results));
+  }
+}
+
 }  // namespace ceres::internal