Increases the performance of the C++11 threading.

Previously, the thread ID was acquired and released on every iteration
of the for loop.  The C++11 concurrent queue implementation is much
slower than TBB's version and consequently this was a huge bottleneck.

This introduces another ParallelFor API which takes the thread ID as a
parameter in the evaluation function.  This allows us to acquire and
release the thread ID for each block of work which drastically improves
the performance.

This change brings us on par with OpenMP and TBB.  See below for a
timing comparison.  Note: in this example this CLs C++11 version is
faster to compute the residuals because TBB still must acquire the
thread ID on every iteration, which has some overhead.

Tested by building and running tests for no threading, OpenMP, TBB, and
C++11 threads.  Also ran bazel tests.

./bin/bundle_adjuster --input=problem-744-543562-pre.txt --num_threads=8

C++11 @Head
Time (in seconds):
  Residual only evaluation           7.819692 (5)
  Jacobian & residual evaluation    11.606063 (6)
  Linear solver                     47.860195 (5)
Minimizer                           70.877072

Total                               90.806338
---------------------------------------------------

C++11 (This CL)
Time (in seconds):
  Residual only evaluation           1.217500 (5)
  Jacobian & residual evaluation     5.796112 (6)
  Linear solver                     44.080873 (5)
Minimizer                           54.635524

Total                               77.640072

---------------------------------------------------
OpenMP
Time (in seconds):
  Residual only evaluation           0.797023 (5)
  Jacobian & residual evaluation     5.633916 (6)
  Linear solver                     43.280020 (5)
Minimizer                           53.199058

Total                               76.250861

---------------------------------------------------
TBB
Time (in seconds):

  Residual only evaluation           1.911095 (5)
  Jacobian & residual evaluation     5.557807 (6)
  Linear solver                     44.074680 (5)
Minimizer                           55.002688

Total                               78.052687

---------------------------------------------------
No Threads

Time (in seconds):

  Residual only evaluation           2.939212 (5)
  Jacobian & residual evaluation    18.519874 (6)
  Linear solver                     74.017837 (5)
Minimizer                           98.980080

Total                              122.216391

Change-Id: I3af959b0771bbdfe8cad8c13896191d6ac903181
diff --git a/internal/ceres/covariance_impl.cc b/internal/ceres/covariance_impl.cc
index 8b34bc1..d77e5e1 100644
--- a/internal/ceres/covariance_impl.cc
+++ b/internal/ceres/covariance_impl.cc
@@ -342,7 +342,9 @@
 
   bool success = true;
 
+#if !(defined(CERES_USE_TBB) || defined(CERES_USE_CXX11_THREADS))
   ThreadTokenProvider thread_token_provider(num_threads);
+#endif // !(defined(CERES_USE_TBB) || defined(CERES_USE_CXX11_THREADS))
 
   // Technically the following code is a double nested loop where
   // i = 1:n, j = i:n.
@@ -361,7 +363,7 @@
               0,
               num_parameters * num_parameters,
               num_threads,
-              [&](int k) {
+              [&](int thread_id, int k) {
 #endif // !(defined(CERES_USE_TBB) || defined(CERES_USE_CXX11_THREADS))
       int i = k / num_parameters;
       int j = k % num_parameters;
@@ -377,8 +379,10 @@
       int covariance_col_idx = cum_parameter_size[j];
       int size_i = parameter_sizes[i];
       int size_j = parameter_sizes[j];
+#if !(defined(CERES_USE_TBB) || defined(CERES_USE_CXX11_THREADS))
       const ScopedThreadToken scoped_thread_token(&thread_token_provider);
       const int thread_id = scoped_thread_token.token();
+#endif // !(defined(CERES_USE_TBB) || defined(CERES_USE_CXX11_THREADS))
       double* covariance_block =
           workspace.get() +
           thread_id * max_covariance_block_size * max_covariance_block_size;
@@ -716,7 +720,9 @@
   const int num_threads = options_.num_threads;
   scoped_array<double> workspace(new double[num_threads * num_cols]);
 
+#if !(defined(CERES_USE_TBB) || defined(CERES_USE_CXX11_THREADS))
   ThreadTokenProvider thread_token_provider(num_threads);
+#endif // !(defined(CERES_USE_TBB) || defined(CERES_USE_CXX11_THREADS))
 
 #ifdef CERES_USE_OPENMP
 #pragma omp parallel for num_threads(num_threads) schedule(dynamic)
@@ -726,14 +732,20 @@
   for (int r = 0; r < num_cols; ++r) {
 #else
   problem_->context()->EnsureMinimumThreads(num_threads);
-  ParallelFor(problem_->context(), 0, num_cols, num_threads, [&](int r) {
+  ParallelFor(problem_->context(),
+              0,
+              num_cols,
+              num_threads,
+              [&](int thread_id, int r) {
 #endif // !(defined(CERES_USE_TBB) || defined(CERES_USE_CXX11_THREADS))
 
     const int row_begin = rows[r];
     const int row_end = rows[r + 1];
     if (row_end != row_begin) {
+#if !(defined(CERES_USE_TBB) || defined(CERES_USE_CXX11_THREADS))
       const ScopedThreadToken scoped_thread_token(&thread_token_provider);
       const int thread_id = scoped_thread_token.token();
+#endif // !(defined(CERES_USE_TBB) || defined(CERES_USE_CXX11_THREADS))
 
       double* solution = workspace.get() + thread_id * num_cols;
       SolveRTRWithSparseRHS<SuiteSparse_long>(
@@ -917,7 +929,9 @@
   const int num_threads = options_.num_threads;
   scoped_array<double> workspace(new double[num_threads * num_cols]);
 
+#if !(defined(CERES_USE_TBB) || defined(CERES_USE_CXX11_THREADS))
   ThreadTokenProvider thread_token_provider(num_threads);
+#endif // !(defined(CERES_USE_TBB) || defined(CERES_USE_CXX11_THREADS))
 
 #ifdef CERES_USE_OPENMP
 #pragma omp parallel for num_threads(num_threads) schedule(dynamic)
@@ -927,14 +941,20 @@
   for (int r = 0; r < num_cols; ++r) {
 #else
   problem_->context()->EnsureMinimumThreads(num_threads);
-  ParallelFor(problem_->context(), 0, num_cols, num_threads, [&](int r) {
+  ParallelFor(problem_->context(),
+              0,
+              num_cols,
+              num_threads,
+              [&](int thread_id, int r) {
 #endif // !(defined(CERES_USE_TBB) || defined(CERES_USE_CXX11_THREADS))
 
     const int row_begin = rows[r];
     const int row_end = rows[r + 1];
     if (row_end != row_begin) {
+#if !(defined(CERES_USE_TBB) || defined(CERES_USE_CXX11_THREADS))
       const ScopedThreadToken scoped_thread_token(&thread_token_provider);
       const int thread_id = scoped_thread_token.token();
+#endif // !(defined(CERES_USE_TBB) || defined(CERES_USE_CXX11_THREADS))
 
       double* solution = workspace.get() + thread_id * num_cols;
       SolveRTRWithSparseRHS<int>(
diff --git a/internal/ceres/parallel_for.h b/internal/ceres/parallel_for.h
index 603c609..e54a1b3 100644
--- a/internal/ceres/parallel_for.h
+++ b/internal/ceres/parallel_for.h
@@ -47,6 +47,16 @@
                  int num_threads,
                  const std::function<void(int)>& function);
 
+// Execute the function for every element in the range [start, end) with at most
+// num_threads. It will execute all the work on the calling thread if
+// num_threads is 1.  Each invocation of function() will be passed a thread_id
+// in [0, num_threads) that is guaranteed to be distinct from the value passed
+// to any concurrent execution of function().
+void ParallelFor(ContextImpl* context,
+                 int start,
+                 int end,
+                 int num_threads,
+                 const std::function<void(int thread_id, int i)>& function);
 }  // namespace internal
 }  // namespace ceres
 
diff --git a/internal/ceres/parallel_for_cxx.cc b/internal/ceres/parallel_for_cxx.cc
index f03ddb2..bfc8973 100644
--- a/internal/ceres/parallel_for_cxx.cc
+++ b/internal/ceres/parallel_for_cxx.cc
@@ -41,7 +41,8 @@
 #include <mutex>
 
 #include "ceres/concurrent_queue.h"
-#include "ceres/thread_pool.h"
+#include "ceres/scoped_thread_token.h"
+#include "ceres/thread_token_provider.h"
 #include "glog/logging.h"
 
 namespace ceres {
@@ -90,6 +91,7 @@
         end(end),
         num_work_items(num_work_items),
         i(0),
+        thread_token_provider(num_work_items),
         block_until_finished(num_work_items) {}
 
   // The start and end index of the for loop.
@@ -105,27 +107,17 @@
   int i;
   std::mutex mutex_i;
 
-  // Used to signal when all the work has been completed.
+  // Provides a unique thread ID among all active threads working on the same
+  // group of tasks.  Thread-safe.
+  ThreadTokenProvider thread_token_provider;
+
+  // Used to signal when all the work has been completed.  Thread safe.
   BlockUntilFinished block_until_finished;
 };
 
 }  // namespace
 
-// This implementation uses a fixed size max worker pool with a shared task
-// queue. The problem of executing the function for the interval of [start, end)
-// is broken up into at most num_threads blocks and added to the thread pool. To
-// avoid deadlocks, the calling thread is allowed to steal work from the worker
-// pool. This is implemented via a shared state between the tasks. In order for
-// the calling thread or thread pool to get a block of work, it will query the
-// shared state for the next block of work to be done. If there is nothing left,
-// it will return. We will exit the ParallelFor call when all of the work has
-// been done, not when all of the tasks have been popped off the task queue.
-//
-// A performance analysis has shown this implementation is about ~20% slower
-// than OpenMP or TBB. This native implementation is a fix for platforms that do
-// not have access to OpenMP or TBB. The gain in enabling multi-threaded Ceres
-// is much more significant so we decided to not chase the performance of these
-// two libraries.
+// See ParallelFor (below) for more details.
 void ParallelFor(ContextImpl* context,
                  int start,
                  int end,
@@ -145,6 +137,51 @@
     return;
   }
 
+  ParallelFor(context, start, end, num_threads,
+              [&function](int /*thread_id*/, int i) { function(i); });
+}
+
+// This implementation uses a fixed size max worker pool with a shared task
+// queue. The problem of executing the function for the interval of [start, end)
+// is broken up into at most num_threads blocks and added to the thread pool. To
+// avoid deadlocks, the calling thread is allowed to steal work from the worker
+// pool. This is implemented via a shared state between the tasks. In order for
+// the calling thread or thread pool to get a block of work, it will query the
+// shared state for the next block of work to be done. If there is nothing left,
+// it will return. We will exit the ParallelFor call when all of the work has
+// been done, not when all of the tasks have been popped off the task queue.
+//
+// A unique thread ID among all active tasks will be acquired once for each
+// block of work.  This avoids the significant performance penalty for acquiring
+// it on every iteration of the for loop. The thread ID is guaranteed to be in
+// [0, num_threads).
+//
+// A performance analysis has shown this implementation is onpar with OpenMP and
+// TBB.
+void ParallelFor(ContextImpl* context,
+                 int start,
+                 int end,
+                 int num_threads,
+                 const std::function<void(int thread_id, int i)>& function) {
+  CHECK_GT(num_threads, 0);
+  CHECK(context != NULL);
+  if (end <= start) {
+    return;
+  }
+
+  // Fast path for when it is single threaded.
+  if (num_threads == 1) {
+    // Even though we only have one thread, use the thread token provider to
+    // guarantee the exact same behavior when running with multiple threads.
+    ThreadTokenProvider thread_token_provider(num_threads);
+    const ScopedThreadToken scoped_thread_token(&thread_token_provider);
+    const int thread_id = scoped_thread_token.token();
+    for (int i = start; i < end; ++i) {
+      function(thread_id, i);
+    }
+    return;
+  }
+
   // We use a shared_ptr because the main thread can finish all the work before
   // the tasks have been popped off the queue.  So the shared state needs to
   // exist for the duration of all the tasks.
@@ -167,11 +204,15 @@
       ++shared_state->i;
     }
 
+    const ScopedThreadToken scoped_thread_token(
+        &shared_state->thread_token_provider);
+    const int thread_id = scoped_thread_token.token();
+
     // Perform each task.
     for (int j = shared_state->start + i;
          j < shared_state->end;
          j += shared_state->num_work_items) {
-      function(j);
+      function(thread_id, j);
     }
     shared_state->block_until_finished.Finished();
     return true;
diff --git a/internal/ceres/parallel_for_tbb.cc b/internal/ceres/parallel_for_tbb.cc
index 82fbf10..e235b50 100644
--- a/internal/ceres/parallel_for_tbb.cc
+++ b/internal/ceres/parallel_for_tbb.cc
@@ -38,6 +38,8 @@
 #include <tbb/parallel_for.h>
 #include <tbb/task_arena.h>
 
+#include "ceres/scoped_thread_token.h"
+#include "ceres/thread_token_provider.h"
 #include "glog/logging.h"
 
 namespace ceres {
@@ -68,6 +70,22 @@
     });
 }
 
+void ParallelFor(ContextImpl* context,
+                 int start,
+                 int end,
+                 int num_threads,
+                 const std::function<void(int thread_id, int i)>& function) {
+  CHECK(context != NULL);
+
+  ThreadTokenProvider thread_token_provider(num_threads);
+  ParallelFor(context, start, end, num_threads, [&](int i) {
+    const ScopedThreadToken scoped_thread_token(&thread_token_provider);
+    const int thread_id = scoped_thread_token.token();
+    function(thread_id, i);
+  });
+}
+
+
 }  // namespace internal
 }  // namespace ceres
 
diff --git a/internal/ceres/parallel_for_test.cc b/internal/ceres/parallel_for_test.cc
index a3cf2dc..2de4865 100644
--- a/internal/ceres/parallel_for_test.cc
+++ b/internal/ceres/parallel_for_test.cc
@@ -36,9 +36,13 @@
 #include "ceres/parallel_for.h"
 
 #include <cmath>
+#include <condition_variable>
+#include <mutex>
+#include <thread>
 #include <vector>
 
 #include "ceres/context_impl.h"
+#include "glog/logging.h"
 #include "gmock/gmock.h"
 #include "gtest/gtest.h"
 
@@ -46,6 +50,7 @@
 namespace internal {
 
 using testing::ElementsAreArray;
+using testing::UnorderedElementsAreArray;
 
 // Tests the parallel for loop computes the correct result for various number of
 // threads.
@@ -67,6 +72,26 @@
   }
 }
 
+// Tests the parallel for loop with the thread ID interface computes the correct
+// result for various number of threads.
+TEST(ParallelForWithThreadId, NumThreads) {
+  ContextImpl context;
+  context.EnsureMinimumThreads(/*num_threads=*/2);
+
+  const int size = 16;
+  std::vector<int> expected_results(size, 0);
+  for (int i = 0; i < size; ++i) {
+    expected_results[i] = std::sqrt(i);
+  }
+
+  for (int num_threads = 1; num_threads <= 8; ++num_threads) {
+    std::vector<int> values(size, 0);
+    ParallelFor(&context, 0, size, num_threads,
+                [&values](int thread_id, int i) { values[i] = std::sqrt(i); });
+    EXPECT_THAT(values, ElementsAreArray(expected_results));
+  }
+}
+
 // Tests nested for loops do not result in a deadlock.
 TEST(ParallelFor, NestedParallelForDeadlock) {
   ContextImpl context;
@@ -85,6 +110,54 @@
   }
 }
 
+// Tests nested for loops do not result in a deadlock for the parallel for with
+// thread ID interface.
+TEST(ParallelForWithThreadId, NestedParallelForDeadlock) {
+  ContextImpl context;
+  context.EnsureMinimumThreads(/*num_threads=*/2);
+
+  // Increment each element in the 2D matrix.
+  std::vector<std::vector<int>> x(3, {1, 2, 3});
+  ParallelFor(&context, 0, 3, 2, [&x, &context](int thread_id, int i) {
+    std::vector<int>& y = x.at(i);
+    ParallelFor(&context, 0, 3, 2, [&y](int thread_id, int j) { ++y.at(j); });
+  });
+
+  const std::vector<int> results = {2, 3, 4};
+  for (const std::vector<int>& value : x) {
+    EXPECT_THAT(value, ElementsAreArray(results));
+  }
+}
+
+TEST(ParallelForWithThreadId, UniqueThreadIds) {
+  // Ensure the hardware supports more than 1 thread to ensure the test will
+  // pass.
+  const int num_hardware_threads = std::thread::hardware_concurrency();
+  if (num_hardware_threads <= 1) {
+    LOG(ERROR)
+        << "Test not supported, the hardware does not support threading.";
+    return;
+  }
+
+  ContextImpl context;
+  context.EnsureMinimumThreads(/*num_threads=*/2);
+  // Increment each element in the 2D matrix.
+  std::vector<int> x(2, -1);
+  std::mutex mutex;
+  std::condition_variable condition;
+  int count = 0;
+  ParallelFor(&context, 0, 2, 2,
+              [&x, &mutex, &condition, &count](int thread_id, int i) {
+                std::unique_lock<std::mutex> lock(mutex);
+                x[i] = thread_id;
+                ++count;
+                condition.notify_all();
+                condition.wait(lock, [&]() { return count == 2; });
+              });
+
+  EXPECT_THAT(x, UnorderedElementsAreArray({0,1}));
+}
+
 }  // namespace internal
 }  // namespace ceres
 
diff --git a/internal/ceres/program_evaluator.h b/internal/ceres/program_evaluator.h
index ca60902..5986d81 100644
--- a/internal/ceres/program_evaluator.h
+++ b/internal/ceres/program_evaluator.h
@@ -175,7 +175,9 @@
 
     const int num_residual_blocks = program_->NumResidualBlocks();
 
+#if !(defined(CERES_USE_TBB) || defined(CERES_USE_CXX11_THREADS))
     ThreadTokenProvider thread_token_provider(options_.num_threads);
+#endif // !(defined(CERES_USE_TBB) || defined(CERES_USE_CXX11_THREADS))
 
 #ifdef CERES_USE_OPENMP
     // This bool is used to disable the loop if an error is encountered
@@ -196,8 +198,11 @@
 #if defined(CERES_USE_TBB) || defined(CERES_USE_CXX11_THREADS)
     std::atomic_bool abort(false);
 
-    ParallelFor(options_.context, 0, num_residual_blocks, options_.num_threads,
-                [&](int i) {
+    ParallelFor(options_.context,
+                0,
+                num_residual_blocks,
+                options_.num_threads,
+                [&](int thread_id, int i) {
 #endif // defined(CERES_USE_TBB) || defined(CERES_USE_CXX11_THREADS)
 
       if (abort) {
@@ -208,8 +213,10 @@
 #endif // defined(CERES_USE_TBB) || defined(CERES_USE_CXX11_THREADS)
       }
 
+#if !(defined(CERES_USE_TBB) || defined(CERES_USE_CXX11_THREADS))
       const ScopedThreadToken scoped_thread_token(&thread_token_provider);
       const int thread_id = scoped_thread_token.token();
+#endif  // !(defined(CERES_USE_TBB) || defined(CERES_USE_CXX11_THREADS))
 
       EvaluatePreparer* preparer = &evaluate_preparers_[thread_id];
       EvaluateScratch* scratch = &evaluate_scratch_[thread_id];
diff --git a/internal/ceres/schur_eliminator_impl.h b/internal/ceres/schur_eliminator_impl.h
index ca8f30e..8baa039 100644
--- a/internal/ceres/schur_eliminator_impl.h
+++ b/internal/ceres/schur_eliminator_impl.h
@@ -222,7 +222,9 @@
 #endif // defined(CERES_USE_TBB) || defined(CERES_USE_CXX11_THREADS)
   }
 
+#if !(defined(CERES_USE_TBB) || defined(CERES_USE_CXX11_THREADS))
   ThreadTokenProvider thread_token_provider(num_threads_);
+#endif // !(defined(CERES_USE_TBB) || defined(CERES_USE_CXX11_THREADS))
 
 #ifdef CERES_USE_OPENMP
   // Eliminate y blocks one chunk at a time.  For each chunk, compute
@@ -243,12 +245,15 @@
 
 #if !(defined(CERES_USE_TBB) || defined(CERES_USE_CXX11_THREADS))
   for (int i = 0; i < chunks_.size(); ++i) {
-#else
-  ParallelFor(context_, 0, int(chunks_.size()), num_threads_, [&](int i) {
-#endif // !(defined(CERES_USE_TBB) || defined(CERES_USE_CXX11_THREADS))
-
     const ScopedThreadToken scoped_thread_token(&thread_token_provider);
     const int thread_id = scoped_thread_token.token();
+#else
+    ParallelFor(context_,
+                0,
+                int(chunks_.size()),
+                num_threads_,
+                [&](int thread_id, int i) {
+#endif // !(defined(CERES_USE_TBB) || defined(CERES_USE_CXX11_THREADS))
 
     double* buffer = buffer_.get() + thread_id * buffer_size_;
     const Chunk& chunk = chunks_[i];
diff --git a/internal/ceres/thread_pool_test.cc b/internal/ceres/thread_pool_test.cc
index 1aa81f2..5485fe4 100644
--- a/internal/ceres/thread_pool_test.cc
+++ b/internal/ceres/thread_pool_test.cc
@@ -36,10 +36,13 @@
 #include "ceres/thread_pool.h"
 
 #include <chrono>
+#include <condition_variable>
+#include <mutex>
 #include <thread>
 
 #include "gmock/gmock.h"
 #include "gtest/gtest.h"
+#include "glog/logging.h"
 
 namespace ceres {
 namespace internal {
diff --git a/internal/ceres/thread_token_provider.h b/internal/ceres/thread_token_provider.h
index 841d312..e4c9cd8 100644
--- a/internal/ceres/thread_token_provider.h
+++ b/internal/ceres/thread_token_provider.h
@@ -95,16 +95,9 @@
 #ifdef CERES_USE_CXX11_THREADS
   // This queue initially holds a sequence from 0..num_threads-1. Every
   // Acquire() call the first number is removed from here. When the token is not
-  // needed anymore it shall be given back with corresponding Release() call.
-  //
-  // The thread number is acquired on every for loop iteration. The
-  // ConcurrentQueue uses a mutex to enable thread safety, however, this can
-  // lead to a large amount of contention between the threads which can cause a
-  // loss in performance. This is noticable for problems with inexpensive
-  // residual computations.
-  //
-  // TODO(vitus): We should either implement a more performant queue (such as
-  // lock free), or get the thread ID from the shared state.
+  // needed anymore it shall be given back with corresponding Release()
+  // call. This concurrent queue is more expensive than TBB's version, so you
+  // should not acquire the thread ID on every for loop iteration.
   ConcurrentQueue<int> pool_;
 #endif