Reorganize ParallelFor source files

Change-Id: Ic4941919e59210b48e447cbb61e539200c8c89df
diff --git a/bazel/ceres.bzl b/bazel/ceres.bzl index a1117aa..dc92e0b 100644 --- a/bazel/ceres.bzl +++ b/bazel/ceres.bzl
@@ -95,7 +95,8 @@ "manifold.cc", "minimizer.cc", "normal_prior.cc", - "parallel_for_cxx.cc", + "parallel_for.cc", + "parallel_invoke.cc", "parallel_utils.cc", "parameter_block_ordering.cc", "partitioned_matrix_view.cc", @@ -205,7 +206,6 @@ "CERES_NO_EXPORT=", "CERES_NO_LAPACK", "CERES_NO_SUITESPARSE", - "CERES_USE_CXX_THREADS", "CERES_USE_EIGEN_SPARSE", ], includes = [
diff --git a/docs/source/features.rst b/docs/source/features.rst index d905415..ba22b9e 100644 --- a/docs/source/features.rst +++ b/docs/source/features.rst
@@ -47,7 +47,7 @@ linear system. To this end Ceres ships with a variety of linear solvers - dense QR and dense Cholesky factorization (using `Eigen`_, `LAPACK`_ or `CUDA`_) for dense problems, sparse - Cholesky factorization (`SuiteSparse`_, `Accelerate`_, `Eigen`_) + Cholesky factorization (`SuiteSparse`_, `Apple's Accelerate`_, `Eigen`_) for large sparse problems, custom Schur complement based dense, sparse, and iterative linear solvers for `bundle adjustment`_ problems. @@ -59,9 +59,8 @@ of Non-linear Conjugate Gradients, BFGS and LBFGS. * **Speed** - Ceres Solver has been extensively optimized, with C++ - templating, hand written linear algebra routines and OpenMP or - modern C++ threads based multithreading of the Jacobian evaluation - and the linear solvers. + templating, hand written linear algebra routines and modern C++ threads + based multithreading of the Jacobian evaluation and the linear solvers. * **GPU Acceleration** If your system supports `CUDA`_ then Ceres Solver can use the Nvidia GPU on your system to speed up the solver.
diff --git a/docs/source/installation.rst b/docs/source/installation.rst index d9e338f..3838ccb 100644 --- a/docs/source/installation.rst +++ b/docs/source/installation.rst
@@ -663,11 +663,6 @@ gains in the ``SPARSE_SCHUR`` solver, you can disable some of the template specializations by turning this ``OFF``. -#. ``CERES_THREADING_MODEL [Default: CXX_THREADS > OPENMP > NO_THREADS]``: - Multi-threading backend Ceres should be compiled with. This will - automatically be set to only accept the available subset of threading - options in the CMake GUI. - #. ``BUILD_SHARED_LIBS [Default: OFF]``: By default Ceres is built as a static library, turn this ``ON`` to instead build Ceres as a shared library. @@ -860,11 +855,6 @@ #. ``SchurSpecializations``: Ceres built with Schur specializations (``SCHUR_SPECIALIZATIONS=ON``). -#. ``OpenMP``: Ceres built with OpenMP (``CERES_THREADING_MODEL=OPENMP``). - -#. ``Multithreading``: Ceres built with *a* multithreading library. - This is equivalent to (``CERES_THREAD != NO_THREADS``). - To specify one/multiple Ceres components use the ``COMPONENTS`` argument to `find_package() <http://www.cmake.org/cmake/help/v3.5/command/find_package.html>`_ like so:
diff --git a/docs/source/nnls_solving.rst b/docs/source/nnls_solving.rst index e281b28..7f7340b 100644 --- a/docs/source/nnls_solving.rst +++ b/docs/source/nnls_solving.rst
@@ -2298,9 +2298,7 @@ .. member:: int Solver::Summary::num_threads_used Number of threads actually used by the solver for Jacobian and - residual evaluation. This number is not equal to - :member:`Solver::Summary::num_threads_given` if none of `OpenMP` - or `CXX_THREADS` is available. + residual evaluation. .. member:: LinearSolverType Solver::Summary::linear_solver_type_given
diff --git a/include/ceres/solver.h b/include/ceres/solver.h index 09c6525..52249c5 100644 --- a/include/ceres/solver.h +++ b/include/ceres/solver.h
@@ -991,8 +991,7 @@ int num_threads_given = -1; // Number of threads actually used by the solver for Jacobian and - // residual evaluation. This number is not equal to - // num_threads_given if OpenMP is not available. + // residual evaluation. int num_threads_used = -1; // Type of the linear solver requested by the user.
diff --git a/internal/ceres/CMakeLists.txt b/internal/ceres/CMakeLists.txt index 23dbbd6..9729ef6 100644 --- a/internal/ceres/CMakeLists.txt +++ b/internal/ceres/CMakeLists.txt
@@ -219,7 +219,8 @@ linear_solver.cc low_rank_inverse_hessian.cc minimizer.cc - parallel_for_cxx.cc + parallel_for.cc + parallel_invoke.cc parallel_utils.cc parameter_block_ordering.cc partitioned_matrix_view.cc
diff --git a/internal/ceres/block_random_access_dense_matrix.cc b/internal/ceres/block_random_access_dense_matrix.cc index deb0a80..30d73f5 100644 --- a/internal/ceres/block_random_access_dense_matrix.cc +++ b/internal/ceres/block_random_access_dense_matrix.cc
@@ -34,7 +34,7 @@ #include <vector> #include "ceres/internal/eigen.h" -#include "ceres/parallel_for.h" +#include "ceres/parallel_vector_ops.h" #include "glog/logging.h" namespace ceres::internal {
diff --git a/internal/ceres/block_random_access_diagonal_matrix.cc b/internal/ceres/block_random_access_diagonal_matrix.cc index fd109ba..daabf8b 100644 --- a/internal/ceres/block_random_access_diagonal_matrix.cc +++ b/internal/ceres/block_random_access_diagonal_matrix.cc
@@ -40,6 +40,7 @@ #include "ceres/compressed_row_sparse_matrix.h" #include "ceres/internal/export.h" #include "ceres/parallel_for.h" +#include "ceres/parallel_vector_ops.h" #include "ceres/stl_util.h" #include "ceres/types.h" #include "glog/logging.h"
diff --git a/internal/ceres/block_random_access_sparse_matrix.cc b/internal/ceres/block_random_access_sparse_matrix.cc index 36a3188..bd9f1ba 100644 --- a/internal/ceres/block_random_access_sparse_matrix.cc +++ b/internal/ceres/block_random_access_sparse_matrix.cc
@@ -37,7 +37,7 @@ #include <vector> #include "ceres/internal/export.h" -#include "ceres/parallel_for.h" +#include "ceres/parallel_vector_ops.h" #include "ceres/triplet_sparse_matrix.h" #include "ceres/types.h" #include "glog/logging.h"
diff --git a/internal/ceres/block_sparse_matrix.cc b/internal/ceres/block_sparse_matrix.cc index f49f229..a60ee89 100644 --- a/internal/ceres/block_sparse_matrix.cc +++ b/internal/ceres/block_sparse_matrix.cc
@@ -40,6 +40,7 @@ #include "ceres/crs_matrix.h" #include "ceres/internal/eigen.h" #include "ceres/parallel_for.h" +#include "ceres/parallel_vector_ops.h" #include "ceres/small_blas.h" #include "ceres/triplet_sparse_matrix.h" #include "glog/logging.h"
diff --git a/internal/ceres/eigen_vector_ops.h b/internal/ceres/eigen_vector_ops.h index 91f2946..8b519e6 100644 --- a/internal/ceres/eigen_vector_ops.h +++ b/internal/ceres/eigen_vector_ops.h
@@ -35,6 +35,7 @@ #include "ceres/internal/eigen.h" #include "ceres/parallel_for.h" +#include "ceres/parallel_vector_ops.h" namespace ceres::internal {
diff --git a/internal/ceres/implicit_schur_complement.cc b/internal/ceres/implicit_schur_complement.cc index dbba3a7..9c73de6 100644 --- a/internal/ceres/implicit_schur_complement.cc +++ b/internal/ceres/implicit_schur_complement.cc
@@ -36,6 +36,7 @@ #include "ceres/internal/eigen.h" #include "ceres/linear_solver.h" #include "ceres/parallel_for.h" +#include "ceres/parallel_vector_ops.h" #include "ceres/types.h" #include "glog/logging.h"
diff --git a/internal/ceres/levenberg_marquardt_strategy.cc b/internal/ceres/levenberg_marquardt_strategy.cc index 81ced2c..b37f74e 100644 --- a/internal/ceres/levenberg_marquardt_strategy.cc +++ b/internal/ceres/levenberg_marquardt_strategy.cc
@@ -38,7 +38,7 @@ #include "ceres/internal/eigen.h" #include "ceres/linear_least_squares_problems.h" #include "ceres/linear_solver.h" -#include "ceres/parallel_for.h" +#include "ceres/parallel_vector_ops.h" #include "ceres/sparse_matrix.h" #include "ceres/trust_region_strategy.h" #include "ceres/types.h"
diff --git a/internal/ceres/parallel_for_cxx.cc b/internal/ceres/parallel_for.cc similarity index 71% copy from internal/ceres/parallel_for_cxx.cc copy to internal/ceres/parallel_for.cc index 3c0671e..a92276d 100644 --- a/internal/ceres/parallel_for_cxx.cc +++ b/internal/ceres/parallel_for.cc
@@ -28,6 +28,8 @@ // // Author: vitus@google.com (Michael Vitus) +#include "ceres/parallel_for.h" + #include <algorithm> #include <atomic> #include <cmath> @@ -37,40 +39,11 @@ #include <tuple> #include "ceres/internal/config.h" -#include "ceres/parallel_for.h" +#include "ceres/parallel_vector_ops.h" #include "glog/logging.h" namespace ceres::internal { -BlockUntilFinished::BlockUntilFinished(int num_total_jobs) - : num_total_jobs_finished_(0), num_total_jobs_(num_total_jobs) {} - -void BlockUntilFinished::Finished(int num_jobs_finished) { - if (num_jobs_finished == 0) return; - std::lock_guard<std::mutex> lock(mutex_); - num_total_jobs_finished_ += num_jobs_finished; - CHECK_LE(num_total_jobs_finished_, num_total_jobs_); - if (num_total_jobs_finished_ == num_total_jobs_) { - condition_.notify_one(); - } -} - -void BlockUntilFinished::Block() { - std::unique_lock<std::mutex> lock(mutex_); - condition_.wait( - lock, [&]() { return num_total_jobs_finished_ == num_total_jobs_; }); -} - -ThreadPoolState::ThreadPoolState(int start, int end, int num_work_blocks) - : start(start), - end(end), - num_work_blocks(num_work_blocks), - base_block_size((end - start) / num_work_blocks), - num_base_p1_sized_blocks((end - start) % num_work_blocks), - block_id(0), - thread_id(0), - block_until_finished(num_work_blocks) {} - int MaxNumThreadsAvailable() { return ThreadPool::MaxNumThreadsAvailable(); } void ParallelSetZero(ContextImpl* context,
diff --git a/internal/ceres/parallel_for.h b/internal/ceres/parallel_for.h index bf38603..7536c74 100644 --- a/internal/ceres/parallel_for.h +++ b/internal/ceres/parallel_for.h
@@ -32,14 +32,14 @@ #ifndef CERES_INTERNAL_PARALLEL_FOR_H_ #define CERES_INTERNAL_PARALLEL_FOR_H_ -#include <algorithm> -#include <functional> #include <mutex> +#include <vector> #include "ceres/context_impl.h" -#include "ceres/internal/disable_warnings.h" #include "ceres/internal/eigen.h" #include "ceres/internal/export.h" +#include "ceres/parallel_invoke.h" +#include "ceres/partition_range_for_parallel_for.h" #include "glog/logging.h" namespace ceres::internal { @@ -51,237 +51,82 @@ : std::unique_lock<std::mutex>{m}; } -// Returns the maximum number of threads supported by the threading backend -// Ceres was compiled with. -CERES_NO_EXPORT -int MaxNumThreadsAvailable(); - -// Parallel for implementations share a common set of routines in order -// to enforce inlining of loop bodies, ensuring that single-threaded -// performance is equivalent to a simple for loop -namespace parallel_for_details { -// Get arguments of callable as a tuple -template <typename F, typename... Args> -std::tuple<std::decay_t<Args>...> args_of(void (F::*)(Args...) const); - -template <typename F> -using args_of_t = decltype(args_of(&F::operator())); - -// Parallelizable functions might require passing thread_id as the first -// argument. This class supplies thread_id argument to functions that -// support it and ignores it otherwise. -template <typename F, typename Args> -struct InvokeImpl; - -// For parallel for iterations of type [](int i) -> void -template <typename F> -struct InvokeImpl<F, std::tuple<int>> { - static void InvokeOnSegment(int thread_id, - int start, - int end, - const F& function) { - (void)thread_id; - for (int i = start; i < end; ++i) { - function(i); - } - } -}; - -// For parallel for iterations of type [](int thread_id, int i) -> void -template <typename F> -struct InvokeImpl<F, std::tuple<int, int>> { - static void InvokeOnSegment(int thread_id, - int start, - int end, - const F& function) { - for (int i = start; i < end; ++i) { - function(thread_id, i); - } - } -}; - -// For parallel for iterations of type [](tuple<int, int> range) -> void -template <typename F> -struct InvokeImpl<F, std::tuple<std::tuple<int, int>>> { - static void InvokeOnSegment(int thread_id, - int start, - int end, - const F& function) { - (void)thread_id; - function(std::make_tuple(start, end)); - } -}; - -// For parallel for iterations of type [](int thread_id, tuple<int, int> range) -// -> void -template <typename F> -struct InvokeImpl<F, std::tuple<int, std::tuple<int, int>>> { - static void InvokeOnSegment(int thread_id, - int start, - int end, - const F& function) { - function(thread_id, std::make_tuple(start, end)); - } -}; - -// Invoke function passing thread_id only if required -template <typename F> -void InvokeOnSegment(int thread_id, int start, int end, const F& function) { - InvokeImpl<F, args_of_t<F>>::InvokeOnSegment(thread_id, start, end, function); -} - -// Check if it is possible to split range [start; end) into at most -// max_num_partitions contiguous partitions of cost not greater than -// max_partition_cost. Inclusive integer cumulative costs are provided by -// cumulative_cost_data objects, with cumulative_cost_offset being a total cost -// of all indices (starting from zero) preceding start element. Cumulative costs -// are returned by cumulative_cost_fun called with a reference to -// cumulative_cost_data element with index from range[start; end), and should be -// non-decreasing. 
Partition of the range is returned via partition argument -template <typename CumulativeCostData, typename CumulativeCostFun> -bool MaxPartitionCostIsFeasible(int start, - int end, - int max_num_partitions, - int max_partition_cost, - int cumulative_cost_offset, - const CumulativeCostData* cumulative_cost_data, - const CumulativeCostFun& cumulative_cost_fun, - std::vector<int>* partition) { - partition->clear(); - partition->push_back(start); - int partition_start = start; - int cost_offset = cumulative_cost_offset; - - while (partition_start < end) { - // Already have max_num_partitions - if (partition->size() > max_num_partitions) { - return false; - } - const int target = max_partition_cost + cost_offset; - const int partition_end = - std::partition_point( - cumulative_cost_data + partition_start, - cumulative_cost_data + end, - [&cumulative_cost_fun, target](const CumulativeCostData& item) { - return cumulative_cost_fun(item) <= target; - }) - - cumulative_cost_data; - // Unable to make a partition from a single element - if (partition_end == partition_start) { - return false; - } - - const int cost_last = - cumulative_cost_fun(cumulative_cost_data[partition_end - 1]); - partition->push_back(partition_end); - partition_start = partition_end; - cost_offset = cost_last; - } - return true; -} - -// Split integer interval [start, end) into at most max_num_partitions -// contiguous intervals, minimizing maximal total cost of a single interval. -// Inclusive integer cumulative costs for each (zero-based) index are provided -// by cumulative_cost_data objects, and are returned by cumulative_cost_fun call -// with a reference to one of the objects from range [start, end) -template <typename CumulativeCostData, typename CumulativeCostFun> -std::vector<int> ComputePartition( - int start, - int end, - int max_num_partitions, - const CumulativeCostData* cumulative_cost_data, - const CumulativeCostFun& cumulative_cost_fun) { - // Given maximal partition cost, it is possible to verify if it is admissible - // and obtain corresponding partition using MaxPartitionCostIsFeasible - // function. In order to find the lowest admissible value, a binary search - // over all potentially optimal cost values is being performed - const int cumulative_cost_last = - cumulative_cost_fun(cumulative_cost_data[end - 1]); - const int cumulative_cost_offset = - start ? 
cumulative_cost_fun(cumulative_cost_data[start - 1]) : 0; - const int total_cost = cumulative_cost_last - cumulative_cost_offset; - - // Minimal maximal partition cost is not smaller than the average - // We will use non-inclusive lower bound - int partition_cost_lower_bound = total_cost / max_num_partitions - 1; - // Minimal maximal partition cost is not larger than the total cost - // Upper bound is inclusive - int partition_cost_upper_bound = total_cost; - - std::vector<int> partition, partition_upper_bound; - // Binary search over partition cost, returning the lowest admissible cost - while (partition_cost_upper_bound - partition_cost_lower_bound > 1) { - partition.reserve(max_num_partitions + 1); - const int partition_cost = - partition_cost_lower_bound + - (partition_cost_upper_bound - partition_cost_lower_bound) / 2; - bool admissible = MaxPartitionCostIsFeasible(start, - end, - max_num_partitions, - partition_cost, - cumulative_cost_offset, - cumulative_cost_data, - cumulative_cost_fun, - &partition); - if (admissible) { - partition_cost_upper_bound = partition_cost; - std::swap(partition, partition_upper_bound); - } else { - partition_cost_lower_bound = partition_cost; - } - } - - // After binary search over partition cost, interval - // (partition_cost_lower_bound, partition_cost_upper_bound] contains the only - // admissible partition cost value - partition_cost_upper_bound - // - // Partition for this cost value might have been already computed - if (partition_upper_bound.empty() == false) { - return partition_upper_bound; - } - // Partition for upper bound is not computed if and only if upper bound was - // never updated This is a simple case of a single interval containing all - // values, which we were not able to break into pieces - partition = {start, end}; - return partition; -} -} // namespace parallel_for_details - -// Forward declaration of parallel invocation function that is to be -// implemented by each threading backend -template <typename F> -void ParallelInvoke(ContextImpl* context, - int start, - int end, - int num_threads, - const F& function); +// Returns the maximum supported number of threads +CERES_NO_EXPORT int MaxNumThreadsAvailable(); // Execute the function for every element in the range [start, end) with at most // num_threads. It will execute all the work on the calling thread if // num_threads or (end - start) is equal to 1. +// Depending on the function signature, it will be supplied with either a +// loop index or a range of loop indices; the function can also be supplied +// with a thread_id. The following function signatures are supported: +// - Functions accepting a single loop index: +// - [](int index) { ... } +// - [](int thread_id, int index) { ... } +// - Functions accepting a range of loop indices: +// - [](std::tuple<int, int> index) { ... } +// - [](int thread_id, std::tuple<int, int> index) { ... } // +// When distributing the workload between threads, it is assumed that each loop +// iteration takes approximately equal time to complete.
template <typename F> -void ParallelFor(ContextImpl* context, - int start, - int end, - int num_threads, - const F& function) { - using namespace parallel_for_details; +void ParallelFor( + ContextImpl* context, int start, int end, int num_threads, F&& function) { CHECK_GT(num_threads, 0); if (start >= end) { return; } if (num_threads == 1 || end - start == 1) { - InvokeOnSegment<F>(0, start, end, function); + InvokeOnSegment(0, std::make_tuple(start, end), std::forward<F>(function)); return; } CHECK(context != nullptr); - ParallelInvoke<F>(context, start, end, num_threads, function); + ParallelInvoke(context, start, end, num_threads, std::forward<F>(function)); +} + +// Execute function for every element in the range [start, end) with at most +// num_threads, using a user-provided partitions array. +// When distributing the workload between threads, it is assumed that each +// segment bounded by adjacent elements of the partitions array takes +// approximately equal time to process. +template <typename F> +void ParallelFor(ContextImpl* context, + int start, + int end, + int num_threads, + F&& function, + const std::vector<int>& partitions) { + CHECK_GT(num_threads, 0); + if (start >= end) { + return; + } + CHECK_EQ(partitions.front(), start); + CHECK_EQ(partitions.back(), end); + if (num_threads == 1 || end - start <= num_threads) { + ParallelFor(context, start, end, num_threads, std::forward<F>(function)); + return; + } + CHECK_GT(partitions.size(), 1); + const int num_partitions = partitions.size() - 1; + ParallelFor(context, + 0, + num_partitions, + num_threads, + [&function, &partitions](int thread_id, + std::tuple<int, int> partition_ids) { + // partition_ids is a range of partition indices + const auto [partition_start, partition_end] = partition_ids; + // Execution over several adjacent segments is equivalent + // to execution over the union of those segments (which is + // also a contiguous segment) + const int range_start = partitions[partition_start]; + const int range_end = partitions[partition_end]; + // Range of original loop indices + const auto range = std::make_tuple(range_start, range_end); + InvokeOnSegment(thread_id, range, function); + }); } // Execute function for every element in the range [start, end) with at most @@ -293,123 +138,43 @@ // user-defined object. Only indices from [start, end) will be referenced. This // routine assumes that cumulative_cost_fun is non-decreasing (in other words, // all costs are non-negative); +// When distributing the workload between threads, the input range of loop +// indices will be partitioned into disjoint contiguous intervals, with the +// maximal cost being minimized. +// For example, with iteration costs of [1, 1, 5, 3, 1, 4], cumulative_cost_fun +// should return [1, 2, 7, 10, 11, 15], and with num_threads = 4 this range +// will be split into segments [0, 2) [2, 3) [3, 5) [5, 6) with costs +// [2, 5, 4, 4].
template <typename F, typename CumulativeCostData, typename CumulativeCostFun> void ParallelFor(ContextImpl* context, int start, int end, int num_threads, - const F& function, + F&& function, const CumulativeCostData* cumulative_cost_data, - const CumulativeCostFun& cumulative_cost_fun) { - using namespace parallel_for_details; + CumulativeCostFun&& cumulative_cost_fun) { CHECK_GT(num_threads, 0); if (start >= end) { return; } if (num_threads == 1 || end - start <= num_threads) { - ParallelFor(context, start, end, num_threads, function); + ParallelFor(context, start, end, num_threads, std::forward<F>(function)); return; } // Creating several partitions allows us to tolerate imperfections of // partitioning and user-supplied iteration costs up to a certain extent - const int kNumPartitionsPerThread = 4; + constexpr int kNumPartitionsPerThread = 4; const int kMaxPartitions = num_threads * kNumPartitionsPerThread; - const std::vector<int> partitions = ComputePartition( - start, end, kMaxPartitions, cumulative_cost_data, cumulative_cost_fun); + const auto& partitions = PartitionRangeForParallelFor( + start, + end, + kMaxPartitions, + cumulative_cost_data, + std::forward<CumulativeCostFun>(cumulative_cost_fun)); CHECK_GT(partitions.size(), 1); - const int num_partitions = partitions.size() - 1; - ParallelFor(context, - 0, - num_partitions, - num_threads, - [&function, &partitions](int thread_id, int partition_id) { - const int partition_start = partitions[partition_id]; - const int partition_end = partitions[partition_id + 1]; - - InvokeOnSegment<F>( - thread_id, partition_start, partition_end, function); - }); + ParallelFor( + context, start, end, num_threads, std::forward<F>(function), partitions); } - -// Execute function for every element in the range [start, end) with at most -// num_threads, using the user provided partitioning. taking into account -// user-provided integer cumulative costs of iterations. -template <typename F> -void ParallelFor(ContextImpl* context, - int start, - int end, - int num_threads, - const F& function, - const std::vector<int>& partitions) { - using namespace parallel_for_details; - CHECK_GT(num_threads, 0); - if (start >= end) { - return; - } - CHECK_EQ(partitions.front(), start); - CHECK_EQ(partitions.back(), end); - if (num_threads == 1 || end - start <= num_threads) { - ParallelFor(context, start, end, num_threads, function); - return; - } - CHECK_GT(partitions.size(), 1); - const int num_partitions = partitions.size() - 1; - ParallelFor(context, - 0, - num_partitions, - num_threads, - [&function, &partitions](int thread_id, int partition_id) { - const int partition_start = partitions[partition_id]; - const int partition_end = partitions[partition_id + 1]; - - InvokeOnSegment<F>( - thread_id, partition_start, partition_end, function); - }); -} - -// Evaluate vector expression in parallel -// Assuming LhsExpression and RhsExpression are some sort of -// column-vector expression, assignment lhs = rhs -// is eavluated over a set of contiguous blocks in parallel. -// This is expected to work well in the case of vector-based -// expressions (since they typically do not result into -// temporaries). 
-// This method expects lhs to be size-compatible with rhs -template <typename LhsExpression, typename RhsExpression> -void ParallelAssign(ContextImpl* context, - int num_threads, - LhsExpression& lhs, - const RhsExpression& rhs) { - static_assert(LhsExpression::ColsAtCompileTime == 1); - static_assert(RhsExpression::ColsAtCompileTime == 1); - CHECK_EQ(lhs.rows(), rhs.rows()); - const int num_rows = lhs.rows(); - ParallelFor(context, - 0, - num_rows, - num_threads, - [&lhs, &rhs](const std::tuple<int, int>& range) { - auto [start, end] = range; - lhs.segment(start, end - start) = - rhs.segment(start, end - start); - }); -} - -// Set vector to zero using num_threads -template <typename VectorType> -void ParallelSetZero(ContextImpl* context, - int num_threads, - VectorType& vector) { - ParallelSetZero(context, num_threads, vector.data(), vector.rows()); -} -void ParallelSetZero(ContextImpl* context, - int num_threads, - double* values, - int num_values); } // namespace ceres::internal -// Backend-specific implementations of ParallelInvoke -#include "ceres/internal/disable_warnings.h" -#include "ceres/parallel_for_cxx.h" - #endif // CERES_INTERNAL_PARALLEL_FOR_H_
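[Editor's note: the four callable signatures accepted by the reorganized ParallelFor are easiest to see from caller code. Below is a minimal usage sketch, assuming a ContextImpl whose thread pool has already been initialized; the Example function and its arguments are hypothetical, not part of this change.]

#include <tuple>

#include "ceres/context_impl.h"
#include "ceres/parallel_for.h"

namespace ceres::internal {
void Example(ContextImpl* context, double* x, int n) {
  const int num_threads = 4;
  // Per-index form: the body receives one loop index at a time.
  ParallelFor(context, 0, n, num_threads, [x](int i) { x[i] *= 2.0; });
  // Range form: the body receives a contiguous [start, end) block, which
  // keeps the per-iteration dispatch out of the inner loop.
  ParallelFor(context, 0, n, num_threads, [x](std::tuple<int, int> range) {
    const auto [start, end] = range;
    for (int i = start; i != end; ++i) x[i] += 1.0;
  });
  // Either form can additionally take a leading thread_id, e.g. for
  // addressing per-thread scratch storage.
  ParallelFor(context, 0, n, num_threads,
              [x](int thread_id, int i) { x[i] += thread_id; });
}
}  // namespace ceres::internal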
diff --git a/internal/ceres/parallel_for_test.cc b/internal/ceres/parallel_for_test.cc index 871bb52..b3c210f 100644 --- a/internal/ceres/parallel_for_test.cc +++ b/internal/ceres/parallel_for_test.cc
@@ -41,6 +41,7 @@ #include "ceres/context_impl.h" #include "ceres/internal/config.h" +#include "ceres/parallel_vector_ops.h" #include "glog/logging.h" #include "gmock/gmock.h" #include "gtest/gtest.h" @@ -190,8 +191,6 @@ // Basic test if MaxPartitionCostIsFeasible and BruteForcePartition agree on // simple test-cases TEST(GuidedParallelFor, MaxPartitionCostIsFeasible) { - using parallel_for_details::MaxPartitionCostIsFeasible; - std::vector<int> costs, cumulative_costs, partition; costs = {1, 2, 3, 5, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0}; cumulative_costs.resize(costs.size()); @@ -242,8 +241,6 @@ // Randomized tests for MaxPartitionCostIsFeasible TEST(GuidedParallelFor, MaxPartitionCostIsFeasibleRandomized) { - using parallel_for_details::MaxPartitionCostIsFeasible; - std::vector<int> costs, cumulative_costs, partition; const auto dummy_getter = [](const int v) { return v; }; @@ -315,9 +312,7 @@ } } -TEST(GuidedParallelFor, ComputePartition) { - using parallel_for_details::ComputePartition; - +TEST(GuidedParallelFor, PartitionRangeForParallelFor) { std::vector<int> costs, cumulative_costs, partition; const auto dummy_getter = [](const int v) { return v; }; @@ -359,8 +354,8 @@ } } EXPECT_TRUE(first_admissible != 0 || total == 0); - partition = - ComputePartition(start, end, M, cumulative_costs.data(), dummy_getter); + partition = PartitionRangeForParallelFor( + start, end, M, cumulative_costs.data(), dummy_getter); ASSERT_GT(partition.size(), 1); EXPECT_EQ(partition.front(), start); EXPECT_EQ(partition.back(), end);
diff --git a/internal/ceres/parallel_for_cxx.cc b/internal/ceres/parallel_invoke.cc similarity index 79% rename from internal/ceres/parallel_for_cxx.cc rename to internal/ceres/parallel_invoke.cc index 3c0671e..0e387c5 100644 --- a/internal/ceres/parallel_for_cxx.cc +++ b/internal/ceres/parallel_invoke.cc
@@ -1,5 +1,5 @@ // Ceres Solver - A fast non-linear least squares minimizer -// Copyright 2018 Google Inc. All rights reserved. +// Copyright 2023 Google Inc. All rights reserved. // http://ceres-solver.org/ // // Redistribution and use in source and binary forms, with or without @@ -38,6 +38,7 @@ #include "ceres/internal/config.h" #include "ceres/parallel_for.h" +#include "ceres/parallel_vector_ops.h" #include "glog/logging.h" namespace ceres::internal { @@ -58,10 +59,12 @@ void BlockUntilFinished::Block() { std::unique_lock<std::mutex> lock(mutex_); condition_.wait( - lock, [&]() { return num_total_jobs_finished_ == num_total_jobs_; }); + lock, [this]() { return num_total_jobs_finished_ == num_total_jobs_; }); } -ThreadPoolState::ThreadPoolState(int start, int end, int num_work_blocks) +ParallelInvokeState::ParallelInvokeState(int start, + int end, + int num_work_blocks) : start(start), end(end), num_work_blocks(num_work_blocks), @@ -71,20 +74,4 @@ thread_id(0), block_until_finished(num_work_blocks) {} -int MaxNumThreadsAvailable() { return ThreadPool::MaxNumThreadsAvailable(); } - -void ParallelSetZero(ContextImpl* context, - int num_threads, - double* values, - int num_values) { - ParallelFor(context, - 0, - num_values, - num_threads, - [values](std::tuple<int, int> range) { - auto [start, end] = range; - std::fill(values + start, values + end, 0.); - }); -} - } // namespace ceres::internal
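[Editor's note: the BlockUntilFinished barrier that stays behind in parallel_invoke.cc is small enough to restate whole. The sketch below mirrors the diff above (same member names, num_total_jobs_ now const) and shows the contract: workers report completed jobs via Finished(), and the submitting thread waits in Block() until the count reaches the total.]

#include <condition_variable>
#include <mutex>

class BlockUntilFinishedSketch {
 public:
  explicit BlockUntilFinishedSketch(int num_total_jobs)
      : num_total_jobs_(num_total_jobs) {}

  // Called by workers; the thread that completes the last job wakes the
  // blocked submitter.
  void Finished(int num_jobs_finished) {
    if (num_jobs_finished == 0) return;
    std::lock_guard<std::mutex> lock(mutex_);
    num_total_jobs_finished_ += num_jobs_finished;
    if (num_total_jobs_finished_ == num_total_jobs_) {
      condition_.notify_one();
    }
  }

  // Called by the submitting thread; returns once all jobs are done.
  void Block() {
    std::unique_lock<std::mutex> lock(mutex_);
    condition_.wait(
        lock, [this] { return num_total_jobs_finished_ == num_total_jobs_; });
  }

 private:
  std::mutex mutex_;
  std::condition_variable condition_;
  int num_total_jobs_finished_ = 0;
  const int num_total_jobs_;
};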
diff --git a/internal/ceres/parallel_for_cxx.h b/internal/ceres/parallel_invoke.h similarity index 85% rename from internal/ceres/parallel_for_cxx.h rename to internal/ceres/parallel_invoke.h index fb87156..3e67e37 100644 --- a/internal/ceres/parallel_for_cxx.h +++ b/internal/ceres/parallel_invoke.h
@@ -1,5 +1,5 @@ // Ceres Solver - A fast non-linear least squares minimizer -// Copyright 2018 Google Inc. All rights reserved. +// Copyright 2023 Google Inc. All rights reserved. // http://ceres-solver.org/ // // Redistribution and use in source and binary forms, with or without @@ -29,19 +29,47 @@ // Authors: vitus@google.com (Michael Vitus), // dmitriy.korchemkin@gmail.com (Dmitriy Korchemkin) -#ifndef CERES_INTERNAL_PARALLEL_FOR_CXX_H_ -#define CERES_INTERNAL_PARALLEL_FOR_CXX_H_ +#ifndef CERES_INTERNAL_PARALLEL_INVOKE_H_ +#define CERES_INTERNAL_PARALLEL_INVOKE_H_ #include <atomic> -#include <cmath> #include <condition_variable> #include <memory> #include <mutex> - -#include "ceres/internal/config.h" -#include "glog/logging.h" +#include <tuple> +#include <type_traits> namespace ceres::internal { + +// InvokeWithThreadId handles passing thread_id to the function +template <typename F, typename... Args> +void InvokeWithThreadId(int thread_id, F&& function, Args&&... args) { + constexpr bool kPassThreadId = std::is_invocable_v<F, int, Args...>; + + if constexpr (kPassThreadId) { + function(thread_id, std::forward<Args>(args)...); + } else { + function(std::forward<Args>(args)...); + } +} + +// InvokeOnSegment either runs a loop over segment indices or passes the whole +// segment to the function +template <typename F> +void InvokeOnSegment(int thread_id, std::tuple<int, int> range, F&& function) { + constexpr bool kExplicitLoop = + std::is_invocable_v<F, int> || std::is_invocable_v<F, int, int>; + + if constexpr (kExplicitLoop) { + const auto [start, end] = range; + for (int i = start; i != end; ++i) { + InvokeWithThreadId(thread_id, std::forward<F>(function), i); + } + } else { + InvokeWithThreadId(thread_id, std::forward<F>(function), range); + } +} + // This class creates a thread safe barrier which will block until a // pre-specified number of threads call Finished. This allows us to block the // main thread until all the parallel threads are finished processing all the @@ -62,18 +90,17 @@ std::mutex mutex_; std::condition_variable condition_; int num_total_jobs_finished_; - int num_total_jobs_; + const int num_total_jobs_; }; // Shared state between the parallel tasks. Each thread will use this // information to get the next block of work to be performed. -struct ThreadPoolState { +struct ParallelInvokeState { // The entire range [start, end) is split into num_work_blocks contiguous // disjoint intervals (blocks), which are as equal as possible given // total index count and requested number of blocks. // - // Those num_work_blocks blocks are then processed by num_workers - workers + // Those num_work_blocks blocks are then processed in parallel. // // Total number of integer indices in interval [start, end) is // end - start, and when splitting them into num_work_blocks blocks @@ -87,7 +114,7 @@ // Note that this splitting is optimal in the sense of maximal difference // between block sizes, since splitting into equal blocks is possible // if and only if number of indices is divisible by number of blocks. - ThreadPoolState(int start, int end, int num_work_blocks); + ParallelInvokeState(int start, int end, int num_work_blocks); // The start and end index of the for loop. const int start; @@ -135,12 +162,8 @@ // A performance analysis has shown this implementation is on par with OpenMP // and TBB.
template <typename F> -void ParallelInvoke(ContextImpl* context, - int start, - int end, - int num_threads, - const F& function) { - using namespace parallel_for_details; +void ParallelInvoke( + ContextImpl* context, int start, int end, int num_threads, F&& function) { CHECK(context != nullptr); // Maximal number of work items scheduled for a single thread @@ -159,8 +182,8 @@ // We use a std::shared_ptr because the main thread can finish all // the work before the tasks have been popped off the queue. So the // shared state needs to exist for the duration of all the tasks. - std::shared_ptr<ThreadPoolState> shared_state( - new ThreadPoolState(start, end, num_work_blocks)); + auto shared_state = + std::make_shared<ParallelInvokeState>(start, end, num_work_blocks); // A function which tries to perform several chunks of work. auto task = [shared_state, num_threads, &function]() { @@ -213,7 +236,8 @@ const int curr_end = curr_start + base_block_size + (block_id < num_base_p1_sized_blocks ? 1 : 0); // Perform each task in current block - InvokeOnSegment<F>(thread_id, curr_start, curr_end, function); + const auto range = std::make_tuple(curr_start, curr_end); + InvokeOnSegment(thread_id, range, function); } shared_state->block_until_finished.Finished(num_jobs_finished); }; @@ -239,4 +263,4 @@ } // namespace ceres::internal -#endif // CERES_INTERNAL_PARALLEL_FOR_CXX_H_ +#endif // CERES_INTERNAL_PARALLEL_INVOKE_H_
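[Editor's note: the compile-time dispatch in InvokeWithThreadId/InvokeOnSegment is the core trick of this header: std::is_invocable_v selects the matching call form, so in the single-threaded path the per-index forms compile down to a plain loop. Below is a self-contained sketch of the same technique, collapsed into one function for exposition; the real header splits it across the two templates above.]

#include <tuple>
#include <type_traits>

template <typename F>
void InvokeOnSegmentSketch(int thread_id, std::tuple<int, int> range, F&& f) {
  if constexpr (std::is_invocable_v<F, int, int>) {
    // [](int thread_id, int index)
    const auto [start, end] = range;
    for (int i = start; i != end; ++i) f(thread_id, i);
  } else if constexpr (std::is_invocable_v<F, int>) {
    // [](int index)
    const auto [start, end] = range;
    for (int i = start; i != end; ++i) f(i);
  } else if constexpr (std::is_invocable_v<F, int, std::tuple<int, int>>) {
    // [](int thread_id, std::tuple<int, int> range)
    f(thread_id, range);
  } else {
    // [](std::tuple<int, int> range)
    f(range);
  }
}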
diff --git a/internal/ceres/parallel_vector_ops.h b/internal/ceres/parallel_vector_ops.h new file mode 100644 index 0000000..20d6dbc --- /dev/null +++ b/internal/ceres/parallel_vector_ops.h
@@ -0,0 +1,87 @@ +// Ceres Solver - A fast non-linear least squares minimizer +// Copyright 2023 Google Inc. All rights reserved. +// http://ceres-solver.org/ +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// * Neither the name of Google Inc. nor the names of its contributors may be +// used to endorse or promote products derived from this software without +// specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// +// Authors: vitus@google.com (Michael Vitus), +// dmitriy.korchemkin@gmail.com (Dmitriy Korchemkin) + +#ifndef CERES_INTERNAL_PARALLEL_VECTOR_OPS_H_ +#define CERES_INTERNAL_PARALLEL_VECTOR_OPS_H_ + +#include <mutex> +#include <vector> + +#include "ceres/context_impl.h" +#include "ceres/internal/eigen.h" +#include "ceres/internal/export.h" +#include "ceres/parallel_for.h" + +namespace ceres::internal { + +// Evaluate a vector expression in parallel +// Assuming LhsExpression and RhsExpression are some sort of +// column-vector expression, assignment lhs = rhs +// is evaluated over a set of contiguous blocks in parallel. +// This is expected to work well in the case of vector-based +// expressions (since they typically do not result in +// temporaries). +// This method expects lhs to be size-compatible with rhs +template <typename LhsExpression, typename RhsExpression> +void ParallelAssign(ContextImpl* context, + int num_threads, + LhsExpression& lhs, + const RhsExpression& rhs) { + static_assert(LhsExpression::ColsAtCompileTime == 1); + static_assert(RhsExpression::ColsAtCompileTime == 1); + CHECK_EQ(lhs.rows(), rhs.rows()); + const int num_rows = lhs.rows(); + ParallelFor(context, + 0, + num_rows, + num_threads, + [&lhs, &rhs](const std::tuple<int, int>& range) { + auto [start, end] = range; + lhs.segment(start, end - start) = + rhs.segment(start, end - start); + }); +} + +// Set vector to zero using num_threads +template <typename VectorType> +void ParallelSetZero(ContextImpl* context, + int num_threads, + VectorType& vector) { + ParallelSetZero(context, num_threads, vector.data(), vector.rows()); +} +void ParallelSetZero(ContextImpl* context, + int num_threads, + double* values, + int num_values); + +} // namespace ceres::internal + +#endif // CERES_INTERNAL_PARALLEL_VECTOR_OPS_H_
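[Editor's note: a short usage sketch of the vector operations now isolated in parallel_vector_ops.h, assuming Vector is the Eigen column-vector alias from ceres/internal/eigen.h; the UpdateStep function and step_size parameter are hypothetical.]

#include "ceres/internal/eigen.h"
#include "ceres/parallel_vector_ops.h"

namespace ceres::internal {
void UpdateStep(ContextImpl* context, int num_threads,
                Vector& x, const Vector& gradient, double step_size) {
  // The Eigen expression x - step_size * gradient is evaluated segment by
  // segment across threads, so no full-size temporary is materialized.
  Vector x_new(x.rows());
  ParallelAssign(context, num_threads, x_new, x - step_size * gradient);
  // Zero a whole vector via the template overload (which forwards to the
  // raw-pointer overload defined in parallel_for.cc).
  ParallelSetZero(context, num_threads, x_new);
}
}  // namespace ceres::internal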
diff --git a/internal/ceres/partition_range_for_parallel_for.h b/internal/ceres/partition_range_for_parallel_for.h new file mode 100644 index 0000000..309d7a8 --- /dev/null +++ b/internal/ceres/partition_range_for_parallel_for.h
@@ -0,0 +1,150 @@ +// Ceres Solver - A fast non-linear least squares minimizer +// Copyright 2023 Google Inc. All rights reserved. +// http://ceres-solver.org/ +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// * Neither the name of Google Inc. nor the names of its contributors may be +// used to endorse or promote products derived from this software without +// specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// +// Authors: vitus@google.com (Michael Vitus), +// dmitriy.korchemkin@gmail.com (Dmitriy Korchemkin) + +#ifndef CERES_INTERNAL_PARTITION_RANGE_FOR_PARALLEL_FOR_H_ +#define CERES_INTERNAL_PARTITION_RANGE_FOR_PARALLEL_FOR_H_ + +#include <algorithm> +#include <vector> + +namespace ceres::internal { +// Check if it is possible to split range [start, end) into at most +// max_num_partitions contiguous partitions of cost not greater than +// max_partition_cost. Inclusive integer cumulative costs are provided by +// cumulative_cost_data objects, with cumulative_cost_offset being the total cost +// of all indices (starting from zero) preceding the start element. Cumulative costs +// are returned by cumulative_cost_fun called with a reference to a +// cumulative_cost_data element with index from range [start, end), and should be +// non-decreasing.
The partition of the range is returned via the partition argument +template <typename CumulativeCostData, typename CumulativeCostFun> +bool MaxPartitionCostIsFeasible(int start, + int end, + int max_num_partitions, + int max_partition_cost, + int cumulative_cost_offset, + const CumulativeCostData* cumulative_cost_data, + CumulativeCostFun&& cumulative_cost_fun, + std::vector<int>* partition) { + partition->clear(); + partition->push_back(start); + int partition_start = start; + int cost_offset = cumulative_cost_offset; + + while (partition_start < end) { + // Already have max_num_partitions + if (partition->size() > max_num_partitions) { + return false; + } + const int target = max_partition_cost + cost_offset; + const int partition_end = + std::partition_point( + cumulative_cost_data + partition_start, + cumulative_cost_data + end, + [&cumulative_cost_fun, target](const CumulativeCostData& item) { + return cumulative_cost_fun(item) <= target; + }) - + cumulative_cost_data; + // Unable to make a partition from a single element + if (partition_end == partition_start) { + return false; + } + + const int cost_last = + cumulative_cost_fun(cumulative_cost_data[partition_end - 1]); + partition->push_back(partition_end); + partition_start = partition_end; + cost_offset = cost_last; + } + return true; +} + +// Split integer interval [start, end) into at most max_num_partitions +// contiguous intervals, minimizing the maximal total cost of a single interval. +// Inclusive integer cumulative costs for each (zero-based) index are provided +// by cumulative_cost_data objects, and are returned by a cumulative_cost_fun call +// with a reference to one of the objects from range [start, end) +template <typename CumulativeCostData, typename CumulativeCostFun> +std::vector<int> PartitionRangeForParallelFor( + int start, + int end, + int max_num_partitions, + const CumulativeCostData* cumulative_cost_data, + CumulativeCostFun&& cumulative_cost_fun) { + // Given a maximal partition cost, it is possible to verify whether it is + // admissible and obtain the corresponding partition using the + // MaxPartitionCostIsFeasible function. In order to find the lowest + // admissible value, a binary search over all potentially optimal cost + // values is performed + const int cumulative_cost_last = + cumulative_cost_fun(cumulative_cost_data[end - 1]); + const int cumulative_cost_offset = + start ? cumulative_cost_fun(cumulative_cost_data[start - 1]) : 0; + const int total_cost = cumulative_cost_last - cumulative_cost_offset; + + // Minimal maximal partition cost is not smaller than the average + // We will use non-inclusive lower bound + int partition_cost_lower_bound = total_cost / max_num_partitions - 1; + // Minimal maximal partition cost is not larger than the total cost + // Upper bound is inclusive + int partition_cost_upper_bound = total_cost; + + std::vector<int> partition; + // Range partition corresponding to the latest evaluated upper bound. + // A single segment covering the whole input interval [start, end) corresponds + // to a maximal partition cost of total_cost.
+ std::vector<int> partition_upper_bound = {start, end}; + // Binary search over partition cost, returning the lowest admissible cost + while (partition_cost_upper_bound - partition_cost_lower_bound > 1) { + partition.reserve(max_num_partitions + 1); + const int partition_cost = + partition_cost_lower_bound + + (partition_cost_upper_bound - partition_cost_lower_bound) / 2; + bool admissible = MaxPartitionCostIsFeasible( + start, + end, + max_num_partitions, + partition_cost, + cumulative_cost_offset, + cumulative_cost_data, + std::forward<CumulativeCostFun>(cumulative_cost_fun), + &partition); + if (admissible) { + partition_cost_upper_bound = partition_cost; + std::swap(partition, partition_upper_bound); + } else { + partition_cost_lower_bound = partition_cost; + } + } + + return partition_upper_bound; +} +} // namespace ceres::internal + +#endif // CERES_INTERNAL_PARTITION_RANGE_FOR_PARALLEL_FOR_H_
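[Editor's note: to make the binary search concrete, here is the [1, 1, 5, 3, 1, 4] example from the ParallelFor documentation driven through PartitionRangeForParallelFor; the ExamplePartition driver is hypothetical, not part of this change.]

#include <vector>

#include "ceres/partition_range_for_parallel_for.h"

std::vector<int> ExamplePartition() {
  // Inclusive cumulative costs of iterations with costs [1, 1, 5, 3, 1, 4].
  const std::vector<int> cumulative_costs = {1, 2, 7, 10, 11, 15};
  const auto getter = [](const int v) { return v; };
  // The search runs over maximal partition costs in (total/4 - 1, total] =
  // (2, 15]. Cost 4 is infeasible (the third iteration alone costs 5), while
  // cost 5 is feasible and yields {0, 2, 3, 5, 6}, i.e. the segments
  // [0, 2) [2, 3) [3, 5) [5, 6) with costs [2, 5, 4, 4].
  return ceres::internal::PartitionRangeForParallelFor(
      0, 6, /*max_num_partitions=*/4, cumulative_costs.data(), getter);
}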
diff --git a/internal/ceres/partitioned_matrix_view_impl.h b/internal/ceres/partitioned_matrix_view_impl.h index e685f5b..385ceb1 100644 --- a/internal/ceres/partitioned_matrix_view_impl.h +++ b/internal/ceres/partitioned_matrix_view_impl.h
@@ -37,6 +37,7 @@ #include "ceres/block_structure.h" #include "ceres/internal/eigen.h" #include "ceres/parallel_for.h" +#include "ceres/partition_range_for_parallel_for.h" #include "ceres/partitioned_matrix_view.h" #include "ceres/small_blas.h" #include "glog/logging.h" @@ -87,14 +88,14 @@ const int num_threads = options_.num_threads; if (transpose_bs != nullptr && num_threads > 1) { int kMaxPartitions = num_threads * 4; - e_cols_partition_ = parallel_for_details::ComputePartition( + e_cols_partition_ = PartitionRangeForParallelFor( 0, num_col_blocks_e_, kMaxPartitions, transpose_bs->rows.data(), [](const CompressedRow& row) { return row.cumulative_nnz; }); - f_cols_partition_ = parallel_for_details::ComputePartition( + f_cols_partition_ = PartitionRangeForParallelFor( num_col_blocks_e_, num_col_blocks_e_ + num_col_blocks_f_, kMaxPartitions,
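[Editor's note: PartitionedMatrixView is the motivating consumer of the partitions overload of ParallelFor: the column-block partition is computed once from cumulative non-zero counts and reused on every product. A condensed sketch of that pattern follows; Row and MultiplyRepeatedly are hypothetical stand-ins for CompressedRow and the real multiplication kernels.]

#include <tuple>
#include <vector>

#include "ceres/parallel_for.h"
#include "ceres/partition_range_for_parallel_for.h"

namespace ceres::internal {
struct Row { int cumulative_nnz; };  // stand-in for CompressedRow

void MultiplyRepeatedly(ContextImpl* context, const Row* rows, int num_rows,
                        int num_threads) {
  // Computed once; the cost of a block is proportional to its non-zero count.
  const std::vector<int> partitions = PartitionRangeForParallelFor(
      0, num_rows, 4 * num_threads, rows,
      [](const Row& row) { return row.cumulative_nnz; });
  for (int iteration = 0; iteration < 10; ++iteration) {
    ParallelFor(
        context, 0, num_rows, num_threads,
        [rows](std::tuple<int, int> range) {
          const auto [start, end] = range;
          for (int i = start; i != end; ++i) {
            // ProcessRowBlock(rows[i]);  // hypothetical per-block kernel
            (void)rows;
          }
        },
        partitions);
  }
}
}  // namespace ceres::internal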
diff --git a/internal/ceres/program_evaluator.h b/internal/ceres/program_evaluator.h index 7ccbbd1..372b260 100644 --- a/internal/ceres/program_evaluator.h +++ b/internal/ceres/program_evaluator.h
@@ -43,7 +43,7 @@ // residual jacobians are written directly into their final position in the // block sparse matrix by the user's CostFunction; there is no copying. // -// The evaluation is threaded with OpenMP or C++ threads. +// The evaluation is threaded with C++ threads. // // The EvaluatePreparer and JacobianWriter interfaces are as follows: // @@ -96,6 +96,7 @@ #include "ceres/execution_summary.h" #include "ceres/internal/eigen.h" #include "ceres/parallel_for.h" +#include "ceres/parallel_vector_ops.h" #include "ceres/parameter_block.h" #include "ceres/program.h" #include "ceres/residual_block.h"
diff --git a/internal/ceres/wall_time.h b/internal/ceres/wall_time.h index 36936ee..d503493 100644 --- a/internal/ceres/wall_time.h +++ b/internal/ceres/wall_time.h
@@ -41,10 +41,8 @@ namespace ceres::internal { -// Returns time, in seconds, from some arbitrary starting point. If -// OpenMP is available then the high precision openmp_get_wtime() -// function is used. Otherwise on unixes, gettimeofday is used. The -// granularity is in seconds on windows systems. +// Returns time, in seconds, from some arbitrary starting point. On Unix-like +// systems, gettimeofday is used; its granularity is microseconds. CERES_NO_EXPORT double WallTimeInSeconds(); // Log a series of events, recording for each event the time elapsed