SourceXtractorPlusPlus  0.17
SourceXtractor++, the next generation SExtractor
MultithreadedMeasurement.cpp
Go to the documentation of this file.
1 
17 /*
18  * MultithreadedMeasurement.cpp
19  *
20  * Created on: May 23, 2018
21  * Author: mschefer
22  */
23 
24 #include <chrono>
25 #include <ElementsKernel/Logging.h>
26 #include <csignal>
27 
30 
31 using namespace SourceXtractor;
32 
34 
35 
37  if (m_output_thread->joinable()) {
39  }
40 }
41 
43  m_output_thread = Euclid::make_unique<std::thread>(outputThreadStatic, this);
44 }
45 
47  m_input_done = true;
50  logger.debug() << "All worker threads done!";
51 }
52 
54  // Wait until all worker threads are done
56 
57  // Wait until the output queue is empty
58  while (true) {
59  {
61  if (m_output_queue.empty()) {
62  break;
63  }
64  else if (m_thread_pool->checkForException(false)) {
65  logger.fatal() << "An exception was thrown from a worker thread";
67  }
68  else if (m_thread_pool->activeThreads() == 0) {
69  throw Elements::Exception() << "No active threads and the queue is not empty! Please, report this as a bug";
70  }
71  }
73  }
74 }
75 
76 
// Receives a source group (presumably MultithreadedMeasurement::handleMessage — the
// signature line is not present in this listing; confirm against the real source).
// It first touches the SourceID property of every source, then submits a task to
// m_thread_pool that runs m_source_to_row on each source and appends the pair
// (order_number, source_group) to m_output_queue.
// NOTE(review): the listing numbering jumps over 78, 93, 96 and 98, so the signature
// and some statements (likely locking / notification / counter update) are not visible.
77 void
79  // Force computation of SourceID here, where the order is still deterministic
80  for (auto& source : *source_group) {
81  source.getProperty<SourceID>();
82  }
83 
84  // Put the new SourceGroup into the input queue
// The current value of m_group_counter is copied so the task carries its own order number.
85  auto order_number = m_group_counter;
// The lambda captures order_number and source_group by value (plus `this`), so it does
// not refer back to this function's locals when it runs.
86  m_thread_pool->submit([this, order_number, source_group]() {
87  // Trigger measurements
88  for (auto& source : *source_group) {
89  m_source_to_row(source);
90  }
91  // Pass to the output thread
92  {
94  m_output_queue.emplace_back(order_number, source_group);
95  }
97  });
99 }
100 
// Body of the output-thread entry point (presumably outputThreadStatic — the signature
// line, listing line 101, is not present here; confirm against the real source).
// Runs measurement->outputThreadLoop() between two debug log messages. Only
// Elements::Exception is caught here; it is logged as fatal and SIGTERM is raised.
102  logger.debug() << "Starting output thread";
103  try {
104  measurement->outputThreadLoop();
105  }
106  catch (const Elements::Exception& e) {
107  logger.fatal() << "Output thread got an exception!";
108  logger.fatal() << e.what();
// exchange(true) stores true and yields the previous value, so the signal is raised only
// when the flag was not already set — assumes m_abort_raised is a std::atomic<bool>; TODO confirm.
109  if (!measurement->m_abort_raised.exchange(true)) {
110  logger.fatal() << "Aborting the execution";
111  ::raise(SIGTERM);
112  }
113  }
114  logger.debug() << "Stopping output thread";
115 }
116 
// Output loop (presumably outputThreadLoop — the signature line, listing line 117, is not
// present here; confirm against the real source). While the thread pool reports active
// threads, it drains m_output_queue by handing each queued group to notifyObservers, and
// leaves the loop once input is done, the pool has nothing running or queued, and the
// queue is empty.
// NOTE(review): listing lines 119 and 123 are missing — likely the lock acquisition and
// the wait performed when the queue is empty; confirm.
118  while (m_thread_pool->activeThreads() > 0) {
120 
121  // Wait for something in the output queue
122  if (m_output_queue.empty()) {
124  }
125 
126  // Process the output queue
// Entries are taken from the front; `.second` is the source-group pointer of the
// (int, group) pair stored in the queue.
127  while (!m_output_queue.empty()) {
128  notifyObservers(m_output_queue.front().second);
129  m_output_queue.pop_front();
130  }
131 
// Exit condition: no more input expected, no task running or waiting in the pool,
// and nothing left to output.
132  if (m_input_done && m_thread_pool->running() + m_thread_pool->queued() == 0 &&
133  m_output_queue.empty()) {
134  break;
135  }
136  }
137 }
static Logging getLogger(const std::string &name="")
void submit(Task task)
size_t running() const
size_t queued() const
bool checkForException(bool rethrow=false)
size_t activeThreads() const
std::unique_ptr< std::thread > m_output_thread
void handleMessage(const std::shared_ptr< SourceGroupInterface > &source_group) override
std::list< std::pair< int, std::shared_ptr< SourceGroupInterface > > > m_output_queue
std::shared_ptr< Euclid::ThreadPool > m_thread_pool
static void outputThreadStatic(MultithreadedMeasurement *measurement)
void notifyObservers(const std::shared_ptr< SourceGroupInterface > &message) const
Definition: Observable.h:71
T join(T... args)
T joinable(T... args)
constexpr double e
static auto logger
Definition: WCS.cpp:44
T sleep_for(T... args)