Schedule tasks in ParallelFor from the previous task

As pointed out by several users, the introduction of parallel
operations on vectors severely impacts solver performance on small
problems, with runtime increasing with the number of threads.

The problem is two-fold:
 - Single-threaded execution is faster than multi-threaded
 - The overhead of multi-threaded execution increases dramatically as
   the number of threads grows

Presumably, the second problem occurs because ParallelInvoke schedules
a task for each thread via the ThreadPool.
When the time required to perform the computation is smaller than the
cost of scheduling a task, runtime becomes linear in num_threads.
Moreover, the main thread competes with the worker threads for the
mutex in ConcurrentQueue.
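
Schematically, the previous strategy paid the ThreadPool scheduling
cost once per thread up front (a simplified excerpt of the code
removed from parallel_invoke.h below):

  // Previous strategy, simplified: the main thread enqueues one task
  // per thread before doing any work itself. Each AddTask call goes
  // through the ConcurrentQueue mutex, so for tiny loops the runtime
  // is dominated by num_threads scheduling operations.
  for (int i = 0; i < num_threads; ++i) {
    context->thread_pool.AddTask([task]() { task(); });
  }
  task();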

In order to limit scheduling overhead and minimize lock contention,
each new task is now scheduled from the previous one, but only if:
 - The number of scheduled tasks is less than num_threads
 - Not all of the work had been completed at the moment the new task
   was created

Correctness is guaranteed by the atomicity of the thread_id counter;
a simplified outline of the new strategy follows.
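
In outline (a simplified excerpt of the parallel_invoke.h change
below):

  // New strategy, simplified: each task atomically claims a thread_id
  // and schedules at most one successor, and only while unclaimed
  // work blocks remain; the scheduling chain stops as soon as either
  // limit is reached.
  auto task = [context, shared_state, num_threads, &function](
                  auto& task_copy) {
    const int thread_id = shared_state->thread_id.fetch_add(1);
    if (thread_id >= num_threads) return;
    if (thread_id + 1 < num_threads &&
        shared_state->block_id < num_work_blocks) {
      context->thread_pool.AddTask([task_copy]() { task_copy(task_copy); });
    }
    // ... perform available chunks of work ...
  };
  task(task);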

A SchedulerBenchmark mini-benchmark was added to illustrate the issue.
Each iteration of the parallel loop modifies a single vector element.

With the previous scheduling strategy, increasing the number of
threads leads to a significant increase in runtime:
-----------------------------------------------------
Benchmark                           Time   Iterations
-----------------------------------------------------
SchedulerBenchmark/128/1         14.1 ns     49496153
SchedulerBenchmark/128/2         3965 ns       240173
SchedulerBenchmark/128/4        13162 ns        71478
SchedulerBenchmark/128/8        30643 ns        29614
SchedulerBenchmark/128/16       63694 ns        10000
SchedulerBenchmark/256/1         24.1 ns     28943598
SchedulerBenchmark/256/2         3878 ns       227498
SchedulerBenchmark/256/4        13293 ns        69817
SchedulerBenchmark/256/8        31117 ns        32640
SchedulerBenchmark/256/16       59503 ns        14910
SchedulerBenchmark/1024/1        56.7 ns     12048398
SchedulerBenchmark/1024/2        4346 ns       203140
SchedulerBenchmark/1024/4       13487 ns        66736
SchedulerBenchmark/1024/8       30982 ns        33090
SchedulerBenchmark/1024/16      63199 ns        14762
SchedulerBenchmark/4096/1         189 ns      3633540
SchedulerBenchmark/4096/2        5932 ns       131884
SchedulerBenchmark/4096/4       14784 ns        61236
SchedulerBenchmark/4096/8       35857 ns        29276
SchedulerBenchmark/4096/16      63934 ns        10000

With the new scheduling strategy, increasing the requested number of
threads results in a much smaller increase in runtime:
-----------------------------------------------------
Benchmark                           Time   Iterations
-----------------------------------------------------
SchedulerBenchmark/128/1         14.1 ns     49323498
SchedulerBenchmark/128/2         2411 ns       362916
SchedulerBenchmark/128/4         3556 ns       243026
SchedulerBenchmark/128/8         4346 ns       200626
SchedulerBenchmark/128/16        5066 ns       169698
SchedulerBenchmark/256/1         24.2 ns     28960018
SchedulerBenchmark/256/2         2330 ns       388470
SchedulerBenchmark/256/4         3864 ns       219233
SchedulerBenchmark/256/8         4399 ns       195225
SchedulerBenchmark/256/16        5111 ns       161858
SchedulerBenchmark/1024/1        55.9 ns     12204777
SchedulerBenchmark/1024/2        2541 ns       329807
SchedulerBenchmark/1024/4        3977 ns       222628
SchedulerBenchmark/1024/8        4607 ns       193548
SchedulerBenchmark/1024/16       5031 ns       160285
SchedulerBenchmark/4096/1         188 ns      3714433
SchedulerBenchmark/4096/2        4203 ns       188284
SchedulerBenchmark/4096/4        4832 ns       171811
SchedulerBenchmark/4096/8        5605 ns       159093
SchedulerBenchmark/4096/16       6425 ns       126861

(both runs were executed on a 28-core, 56-thread CPU)

Change-Id: I91eca783280598997bfe6abd28019847731692e4
diff --git a/internal/ceres/CMakeLists.txt b/internal/ceres/CMakeLists.txt
index 0436178..5bd2792 100644
--- a/internal/ceres/CMakeLists.txt
+++ b/internal/ceres/CMakeLists.txt
@@ -589,6 +589,9 @@
   add_executable(parallel_vector_operations_benchmark parallel_vector_operations_benchmark.cc)
   add_dependencies_to_benchmark(parallel_vector_operations_benchmark)
 
+  add_executable(parallel_for_benchmark parallel_for_benchmark.cc)
+  add_dependencies_to_benchmark(parallel_for_benchmark)
+
   add_executable(block_jacobi_preconditioner_benchmark
     block_jacobi_preconditioner_benchmark.cc)
   add_dependencies_to_benchmark(block_jacobi_preconditioner_benchmark)
diff --git a/internal/ceres/parallel_for_benchmark.cc b/internal/ceres/parallel_for_benchmark.cc
new file mode 100644
index 0000000..f1cd0d9
--- /dev/null
+++ b/internal/ceres/parallel_for_benchmark.cc
@@ -0,0 +1,73 @@
+// Ceres Solver - A fast non-linear least squares minimizer
+// Copyright 2023 Google Inc. All rights reserved.
+// http://ceres-solver.org/
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//
+// * Redistributions of source code must retain the above copyright notice,
+//   this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above copyright notice,
+//   this list of conditions and the following disclaimer in the documentation
+//   and/or other materials provided with the distribution.
+// * Neither the name of Google Inc. nor the names of its contributors may be
+//   used to endorse or promote products derived from this software without
+//   specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+// POSSIBILITY OF SUCH DAMAGE.
+
+#include "benchmark/benchmark.h"
+#include "ceres/parallel_for.h"
+
+namespace ceres::internal {
+
+// A parallel for loop with a very small amount of work per iteration and a
+// small number of iterations benchmarks the performance of task scheduling.
+static void SchedulerBenchmark(benchmark::State& state) {
+  const int vector_size = static_cast<int>(state.range(0));
+  const int num_threads = static_cast<int>(state.range(1));
+  ContextImpl context;
+  context.EnsureMinimumThreads(num_threads);
+
+  Vector x = Vector::Random(vector_size);
+  for (auto _ : state) {
+    ParallelFor(
+        &context, 0, vector_size, num_threads, [&x](int id) { x[id] = 0.; });
+  }
+  CHECK_EQ(x.squaredNorm(), 0.);
+}
+BENCHMARK(SchedulerBenchmark)
+    ->Args({128, 1})
+    ->Args({128, 2})
+    ->Args({128, 4})
+    ->Args({128, 8})
+    ->Args({128, 16})
+    ->Args({256, 1})
+    ->Args({256, 2})
+    ->Args({256, 4})
+    ->Args({256, 8})
+    ->Args({256, 16})
+    ->Args({1024, 1})
+    ->Args({1024, 2})
+    ->Args({1024, 4})
+    ->Args({1024, 8})
+    ->Args({1024, 16})
+    ->Args({4096, 1})
+    ->Args({4096, 2})
+    ->Args({4096, 4})
+    ->Args({4096, 8})
+    ->Args({4096, 16});
+
+}  // namespace ceres::internal
+
+BENCHMARK_MAIN();
diff --git a/internal/ceres/parallel_invoke.h b/internal/ceres/parallel_invoke.h
index 3e67e37..707751b 100644
--- a/internal/ceres/parallel_invoke.h
+++ b/internal/ceres/parallel_invoke.h
@@ -185,8 +185,10 @@
   auto shared_state =
       std::make_shared<ParallelInvokeState>(start, end, num_work_blocks);
 
-  // A function which tries to perform several chunks of work.
-  auto task = [shared_state, num_threads, &function]() {
+  // A function which tries to schedule another task in the thread pool and
+  // perform several chunks of work. The function expects itself as an
+  // argument so that it can schedule the next task in the thread pool.
+  auto task = [context, shared_state, num_threads, &function](auto& task_copy) {
     int num_jobs_finished = 0;
     const int thread_id = shared_state->thread_id.fetch_add(1);
     // In order to avoid dead-locks in nested parallel for loops, task() will be
@@ -197,11 +199,21 @@
     //  the last task being executed will be terminated here in order to avoid
     //  having more than num_threads active threads
     if (thread_id >= num_threads) return;
+    const int num_work_blocks = shared_state->num_work_blocks;
+    if (thread_id + 1 < num_threads &&
+        shared_state->block_id < num_work_blocks) {
+      // Add another task to the thread pool.
+      // Note that we take the task by value, so the shared_state shared
+      // pointer (captured by value at the declaration of the task lambda) is
+      // copied and its ref count is increased. This prevents it from being
+      // deleted when the main thread finishes all the work and exits before
+      // the worker threads finish.
+      context->thread_pool.AddTask([task_copy]() { task_copy(task_copy); });
+    }
 
     const int start = shared_state->start;
     const int base_block_size = shared_state->base_block_size;
     const int num_base_p1_sized_blocks = shared_state->num_base_p1_sized_blocks;
-    const int num_work_blocks = shared_state->num_work_blocks;
 
     while (true) {
       // Get the next available chunk of work to be performed. If there is no
@@ -242,20 +254,10 @@
     shared_state->block_until_finished.Finished(num_jobs_finished);
   };
 
-  // Add all the tasks to the thread pool.
-  for (int i = 0; i < num_threads; ++i) {
-    // Note we are taking the task as value so the copy of shared_state shared
-    // pointer (captured by value at declaration of task lambda-function) is
-    // copied and the ref count is increased. This is to prevent it from being
-    // deleted when the main thread finishes all the work and exits before the
-    // threads finish.
-    context->thread_pool.AddTask([task]() { task(); });
-  }
-
-  // Try to do any available work on the main thread. This may steal work from
-  // the thread pool, but when there is no work left the thread pool tasks
-  // will be no-ops.
-  task();
+  // Start scheduling tasks and doing work. We might end up with fewer tasks
+  // scheduled than requested, if the scheduling overhead is larger than the
+  // amount of work to be done.
+  task(task);
 
   // Wait until all tasks have finished.
   shared_state->block_until_finished.Block();