SourceXtractorPlusPlus
0.17
SourceXtractor++, the next generation SExtractor
SEImplementation
src
lib
Measurement
MultithreadedMeasurement.cpp
Go to the documentation of this file.
1
17
/*
18
* MultiThreadedMeasurement.cpp
19
*
20
* Created on: May 23, 2018
21
* Author: mschefer
22
*/
23
24
#include <chrono>
25
#include <
ElementsKernel/Logging.h
>
26
#include <csignal>
27
28
#include "
SEImplementation/Plugin/SourceIDs/SourceID.h
"
29
#include "
SEImplementation/Measurement/MultithreadedMeasurement.h
"
30
31
using namespace
SourceXtractor
;
32
33
static
Elements::Logging
logger
=
Elements::Logging::getLogger
(
"Multithreading"
);
34
35
36
MultithreadedMeasurement::~MultithreadedMeasurement
() {
37
if
(
m_output_thread
->
joinable
()) {
38
m_output_thread
->
join
();
39
}
40
}
41
42
void
MultithreadedMeasurement::startThreads
() {
43
m_output_thread
= Euclid::make_unique<std::thread>(
outputThreadStatic
,
this
);
44
}
45
46
void
MultithreadedMeasurement::stopThreads
() {
47
m_input_done
=
true
;
48
m_thread_pool
->
block
();
49
m_output_thread
->
join
();
50
logger
.debug() <<
"All worker threads done!"
;
51
}
52
53
void
MultithreadedMeasurement::synchronizeThreads
() {
54
// Wait until all worker threads are done
55
m_thread_pool
->
block
();
56
57
// Wait until the output queue is empty
58
while
(
true
) {
59
{
60
std::unique_lock<std::mutex>
output_lock(
m_output_queue_mutex
);
61
if
(
m_output_queue
.empty()) {
62
break
;
63
}
64
else
if
(
m_thread_pool
->
checkForException
(
false
)) {
65
logger
.fatal() <<
"An exception was thrown from a worker thread"
;
66
m_thread_pool
->
checkForException
(
true
);
67
}
68
else
if
(
m_thread_pool
->
activeThreads
() == 0) {
69
throw
Elements::Exception
() <<
"No active threads and the queue is not empty! Please, report this as a bug"
;
70
}
71
}
72
std::this_thread::sleep_for
(
std::chrono::milliseconds
(100));
73
}
74
}
75
76
77
void
78
MultithreadedMeasurement::handleMessage
(
const
std::shared_ptr<SourceGroupInterface>
& source_group) {
79
// Force computation of SourceID here, where the order is still deterministic
80
for
(
auto
& source : *source_group) {
81
source.getProperty<
SourceID
>();
82
}
83
84
// Put the new SourceGroup into the input queue
85
auto
order_number =
m_group_counter
;
86
m_thread_pool
->
submit
([
this
, order_number, source_group]() {
87
// Trigger measurements
88
for
(
auto
& source : *source_group) {
89
m_source_to_row
(source);
90
}
91
// Pass to the output thread
92
{
93
std::unique_lock<std::mutex>
output_lock(
m_output_queue_mutex
);
94
m_output_queue
.emplace_back(order_number, source_group);
95
}
96
m_new_output
.
notify_one
();
97
});
98
++
m_group_counter
;
99
}
100
101
void
MultithreadedMeasurement::outputThreadStatic
(
MultithreadedMeasurement
*measurement) {
102
logger
.debug() <<
"Starting output thread"
;
103
try
{
104
measurement->
outputThreadLoop
();
105
}
106
catch
(
const
Elements::Exception
&
e
) {
107
logger
.fatal() <<
"Output thread got an exception!"
;
108
logger
.fatal() <<
e
.what();
109
if
(!measurement->
m_abort_raised
.exchange(
true
)) {
110
logger
.fatal() <<
"Aborting the execution"
;
111
::raise(SIGTERM);
112
}
113
}
114
logger
.debug() <<
"Stopping output thread"
;
115
}
116
117
void
MultithreadedMeasurement::outputThreadLoop
() {
118
while
(
m_thread_pool
->
activeThreads
() > 0) {
119
std::unique_lock<std::mutex>
output_lock(
m_output_queue_mutex
);
120
121
// Wait for something in the output queue
122
if
(
m_output_queue
.empty()) {
123
m_new_output
.
wait_for
(output_lock,
std::chrono::milliseconds
(100));
124
}
125
126
// Process the output queue
127
while
(!
m_output_queue
.empty()) {
128
notifyObservers
(
m_output_queue
.front().second);
129
m_output_queue
.pop_front();
130
}
131
132
if
(
m_input_done
&&
m_thread_pool
->
running
() +
m_thread_pool
->
queued
() == 0 &&
133
m_output_queue
.empty()) {
134
break
;
135
}
136
}
137
}
Logging.h
MultithreadedMeasurement.h
SourceID.h
Elements::Exception
Elements::Logging
Elements::Logging::getLogger
static Logging getLogger(const std::string &name="")
Euclid::ThreadPool::submit
void submit(Task task)
Euclid::ThreadPool::running
size_t running() const
Euclid::ThreadPool::queued
size_t queued() const
Euclid::ThreadPool::block
void block()
Euclid::ThreadPool::checkForException
bool checkForException(bool rethrow=false)
Euclid::ThreadPool::activeThreads
size_t activeThreads() const
SourceXtractor::MultithreadedMeasurement
Definition:
MultithreadedMeasurement.h:38
SourceXtractor::MultithreadedMeasurement::m_output_thread
std::unique_ptr< std::thread > m_output_thread
Definition:
MultithreadedMeasurement.h:63
SourceXtractor::MultithreadedMeasurement::m_new_output
std::condition_variable m_new_output
Definition:
MultithreadedMeasurement.h:68
SourceXtractor::MultithreadedMeasurement::m_abort_raised
std::atomic_bool m_abort_raised
Definition:
MultithreadedMeasurement.h:66
SourceXtractor::MultithreadedMeasurement::m_input_done
std::atomic_bool m_input_done
Definition:
MultithreadedMeasurement.h:66
SourceXtractor::MultithreadedMeasurement::handleMessage
void handleMessage(const std::shared_ptr< SourceGroupInterface > &source_group) override
Definition:
MultithreadedMeasurement.cpp:78
SourceXtractor::MultithreadedMeasurement::m_output_queue_mutex
std::mutex m_output_queue_mutex
Definition:
MultithreadedMeasurement.h:70
SourceXtractor::MultithreadedMeasurement::m_output_queue
std::list< std::pair< int, std::shared_ptr< SourceGroupInterface > > > m_output_queue
Definition:
MultithreadedMeasurement.h:69
SourceXtractor::MultithreadedMeasurement::outputThreadLoop
void outputThreadLoop()
Definition:
MultithreadedMeasurement.cpp:117
SourceXtractor::MultithreadedMeasurement::m_thread_pool
std::shared_ptr< Euclid::ThreadPool > m_thread_pool
Definition:
MultithreadedMeasurement.h:62
SourceXtractor::MultithreadedMeasurement::synchronizeThreads
void synchronizeThreads() override
Definition:
MultithreadedMeasurement.cpp:53
SourceXtractor::MultithreadedMeasurement::~MultithreadedMeasurement
virtual ~MultithreadedMeasurement()
Definition:
MultithreadedMeasurement.cpp:36
SourceXtractor::MultithreadedMeasurement::m_group_counter
int m_group_counter
Definition:
MultithreadedMeasurement.h:65
SourceXtractor::MultithreadedMeasurement::stopThreads
void stopThreads() override
Definition:
MultithreadedMeasurement.cpp:46
SourceXtractor::MultithreadedMeasurement::m_source_to_row
SourceToRowConverter m_source_to_row
Definition:
MultithreadedMeasurement.h:61
SourceXtractor::MultithreadedMeasurement::outputThreadStatic
static void outputThreadStatic(MultithreadedMeasurement *measurement)
Definition:
MultithreadedMeasurement.cpp:101
SourceXtractor::MultithreadedMeasurement::startThreads
void startThreads() override
Definition:
MultithreadedMeasurement.cpp:42
SourceXtractor::Observable< std::shared_ptr< SourceGroupInterface > >::notifyObservers
void notifyObservers(const std::shared_ptr< SourceGroupInterface > &message) const
Definition:
Observable.h:71
SourceXtractor::SourceID
Definition:
SourceID.h:33
std::chrono::milliseconds
std::thread::join
T join(T... args)
std::thread::joinable
T joinable(T... args)
e
constexpr double e
SourceXtractor
Definition:
Aperture.h:30
SourceXtractor::logger
static auto logger
Definition:
WCS.cpp:44
std::condition_variable::notify_one
T notify_one(T... args)
std::shared_ptr< SourceGroupInterface >
std::this_thread::sleep_for
T sleep_for(T... args)
std::unique_lock
std::condition_variable::wait_for
T wait_for(T... args)
Generated by
1.9.1