blob: f0766317f8cd944a483a1c95154e70cd7d3474bd [file] [log] [blame]
// Ceres Solver - A fast non-linear least squares minimizer
// Copyright 2023 Google Inc. All rights reserved.
// http://ceres-solver.org/
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
// * Neither the name of Google Inc. nor the names of its contributors may be
// used to endorse or promote products derived from this software without
// specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.
//
// Authors: vitus@google.com (Michael Vitus),
// dmitriy.korchemkin@gmail.com (Dmitriy Korchemkin)
#ifndef CERES_INTERNAL_PARALLEL_INVOKE_H_
#define CERES_INTERNAL_PARALLEL_INVOKE_H_
#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <tuple>
#include <type_traits>
#include "absl/log/check.h"
namespace ceres::internal {
// InvokeWithThreadId handles passing thread_id to the function
template <typename F, typename... Args>
void InvokeWithThreadId(int thread_id, F&& function, Args&&... args) {
constexpr bool kPassThreadId = std::is_invocable_v<F, int, Args...>;
if constexpr (kPassThreadId) {
function(thread_id, std::forward<Args>(args)...);
} else {
function(std::forward<Args>(args)...);
}
}
// InvokeOnSegment either runs a loop over segment indices or passes it to the
// function
template <typename F>
void InvokeOnSegment(int thread_id, std::tuple<int, int> range, F&& function) {
constexpr bool kExplicitLoop =
std::is_invocable_v<F, int> || std::is_invocable_v<F, int, int>;
if constexpr (kExplicitLoop) {
const auto [start, end] = range;
for (int i = start; i != end; ++i) {
InvokeWithThreadId(thread_id, std::forward<F>(function), i);
}
} else {
InvokeWithThreadId(thread_id, std::forward<F>(function), range);
}
}
// This class creates a thread safe barrier which will block until a
// pre-specified number of threads call Finished. This allows us to block the
// main thread until all the parallel threads are finished processing all the
// work.
class BlockUntilFinished {
public:
explicit BlockUntilFinished(int num_total_jobs);
// Increment the number of jobs that have been processed by the number of
// jobs processed by caller and signal the blocking thread if all jobs
// have finished.
void Finished(int num_jobs_finished);
// Block until receiving confirmation of all jobs being finished.
void Block();
private:
std::mutex mutex_;
std::condition_variable condition_;
int num_total_jobs_finished_;
const int num_total_jobs_;
};
// Shared state between the parallel tasks. Each thread will use this
// information to get the next block of work to be performed.
struct ParallelInvokeState {
// The entire range [start, end) is split into num_work_blocks contiguous
// disjoint intervals (blocks), which are as equal as possible given
// total index count and requested number of blocks.
//
// Those num_work_blocks blocks are then processed in parallel.
//
// Total number of integer indices in interval [start, end) is
// end - start, and when splitting them into num_work_blocks blocks
// we can either
// - Split into equal blocks when (end - start) is divisible by
// num_work_blocks
// - Split into blocks with size difference at most 1:
// - Size of the smallest block(s) is (end - start) / num_work_blocks
// - (end - start) % num_work_blocks will need to be 1 index larger
//
// Note that this splitting is optimal in the sense of maximal difference
// between block sizes, since splitting into equal blocks is possible
// if and only if number of indices is divisible by number of blocks.
ParallelInvokeState(int start, int end, int num_work_blocks);
// The start and end index of the for loop.
const int start;
const int end;
// The number of blocks that need to be processed.
const int num_work_blocks;
// Size of the smallest block
const int base_block_size;
// Number of blocks of size base_block_size + 1
const int num_base_p1_sized_blocks;
// The next block of work to be assigned to a worker. The parallel for loop
// range is split into num_work_blocks blocks of work, with a single block of
// work being of size
// - base_block_size + 1 for the first num_base_p1_sized_blocks blocks
// - base_block_size for the rest of the blocks
// blocks of indices are contiguous and disjoint
std::atomic<int> block_id;
// Provides a unique thread ID among all active threads
// We do not schedule more than num_threads threads via thread pool
// and caller thread might steal one ID
std::atomic<int> thread_id;
// Used to signal when all the work has been completed. Thread safe.
BlockUntilFinished block_until_finished;
};
// This implementation uses a fixed size max worker pool with a shared task
// queue. The problem of executing the function for the interval of [start, end)
// is broken up into at most num_threads * kWorkBlocksPerThread blocks (each of
// size at least min_block_size) and added to the thread pool. To avoid
// deadlocks, the calling thread is allowed to steal work from the worker pool.
// This is implemented via a shared state between the tasks. In order for
// the calling thread or thread pool to get a block of work, it will query the
// shared state for the next block of work to be done. If there is nothing left,
// it will return. We will exit the ParallelFor call when all of the work has
// been done, not when all of the tasks have been popped off the task queue.
//
// A unique thread ID among all active tasks will be acquired once for each
// block of work. This avoids the significant performance penalty for acquiring
// it on every iteration of the for loop. The thread ID is guaranteed to be in
// [0, num_threads).
//
// A performance analysis has shown this implementation is on par with OpenMP
// and TBB.
template <typename F>
void ParallelInvoke(ContextImpl* context,
int start,
int end,
int num_threads,
F&& function,
int min_block_size) {
CHECK(context != nullptr);
// Maximal number of work items scheduled for a single thread
// - Lower number of work items results in larger runtimes on unequal tasks
// - Higher number of work items results in larger losses for synchronization
constexpr int kWorkBlocksPerThread = 4;
// Interval [start, end) is being split into
// num_threads * kWorkBlocksPerThread contiguous disjoint blocks.
//
// In order to avoid creating empty blocks of work, we need to limit
// number of work blocks by a total number of indices.
const int num_work_blocks = std::min((end - start) / min_block_size,
num_threads * kWorkBlocksPerThread);
// We use a std::shared_ptr because the main thread can finish all
// the work before the tasks have been popped off the queue. So the
// shared state needs to exist for the duration of all the tasks.
auto shared_state =
std::make_shared<ParallelInvokeState>(start, end, num_work_blocks);
// A function which tries to schedule another task in the thread pool and
// perform several chunks of work. Function expects itself as the argument in
// order to schedule next task in the thread pool.
auto task = [context, shared_state, num_threads, &function](auto& task_copy) {
int num_jobs_finished = 0;
const int thread_id = shared_state->thread_id.fetch_add(1);
// In order to avoid dead-locks in nested parallel for loops, task() will be
// invoked num_threads + 1 times:
// - num_threads times via enqueueing task into thread pool
// - one more time in the main thread
// Tasks enqueued to thread pool might take some time before execution, and
// the last task being executed will be terminated here in order to avoid
// having more than num_threads active threads
if (thread_id >= num_threads) return;
const int num_work_blocks = shared_state->num_work_blocks;
if (thread_id + 1 < num_threads &&
shared_state->block_id < num_work_blocks) {
// Add another thread to the thread pool.
// Note we are taking the task as value so the copy of shared_state shared
// pointer (captured by value at declaration of task lambda-function) is
// copied and the ref count is increased. This is to prevent it from being
// deleted when the main thread finishes all the work and exits before the
// threads finish.
context->thread_pool.AddTask([task_copy]() { task_copy(task_copy); });
}
const int start = shared_state->start;
const int base_block_size = shared_state->base_block_size;
const int num_base_p1_sized_blocks = shared_state->num_base_p1_sized_blocks;
while (true) {
// Get the next available chunk of work to be performed. If there is no
// work, return.
int block_id = shared_state->block_id.fetch_add(1);
if (block_id >= num_work_blocks) {
break;
}
++num_jobs_finished;
// For-loop interval [start, end) was split into num_work_blocks,
// with num_base_p1_sized_blocks of size base_block_size + 1 and remaining
// num_work_blocks - num_base_p1_sized_blocks of size base_block_size
//
// Then, start index of the block #block_id is given by a total
// length of preceeding blocks:
// * Total length of preceeding blocks of size base_block_size + 1:
// min(block_id, num_base_p1_sized_blocks) * (base_block_size + 1)
//
// * Total length of preceeding blocks of size base_block_size:
// (block_id - min(block_id, num_base_p1_sized_blocks)) *
// base_block_size
//
// Simplifying sum of those quantities yields a following
// expression for start index of the block #block_id
const int curr_start = start + block_id * base_block_size +
std::min(block_id, num_base_p1_sized_blocks);
// First num_base_p1_sized_blocks have size base_block_size + 1
//
// Note that it is guaranteed that all blocks are within
// [start, end) interval
const int curr_end = curr_start + base_block_size +
(block_id < num_base_p1_sized_blocks ? 1 : 0);
// Perform each task in current block
const auto range = std::make_tuple(curr_start, curr_end);
InvokeOnSegment(thread_id, range, function);
}
shared_state->block_until_finished.Finished(num_jobs_finished);
};
// Start scheduling threads and doing work. We might end up with less threads
// scheduled than expected, if scheduling overhead is larger than the amount
// of work to be done.
task(task);
// Wait until all tasks have finished.
shared_state->block_until_finished.Block();
}
} // namespace ceres::internal
#endif