SourceXtractorPlusPlus  0.17
SourceXtractor++, the next generation SExtractor
SourceXtractor.cpp
Go to the documentation of this file.
1 
23 #include <dlfcn.h>
24 #include <iomanip>
25 #include <map>
26 #include <string>
27 #include <typeinfo>
28 
29 #include <boost/program_options.hpp>
30 #include <boost/algorithm/string/predicate.hpp>
32 
33 #include "ElementsKernel/Main.h"
34 #include "ElementsKernel/System.h"
36 
38 #include "Configuration/Utils.h"
39 
41 
49 
51 
54 
78 
80 #include "SEMain/PluginConfig.h"
81 #include "SEMain/Sorter.h"
82 
83 
84 namespace po = boost::program_options;
85 namespace fs = boost::filesystem;
86 using namespace SourceXtractor;
87 using namespace Euclid::Configuration;
88 
90 
91 static const std::string LIST_OUTPUT_PROPERTIES {"list-output-properties"};
92 static const std::string PROPERTY_COLUMN_MAPPING_ALL {"property-column-mapping-all"};
93 static const std::string PROPERTY_COLUMN_MAPPING {"property-column-mapping"};
94 static const std::string DUMP_CONFIG {"dump-default-config"};
95 
96 class GroupObserver : public Observer<std::shared_ptr<SourceGroupInterface>> {
97 public:
98  virtual void handleMessage(const std::shared_ptr<SourceGroupInterface>& group) override {
99  m_list.push_back(group);
100  }
101 
103 };
104 
105 class SourceObserver : public Observer<std::shared_ptr<SourceWithOnDemandProperties>> {
106 public:
107  virtual void handleMessage(const std::shared_ptr<SourceWithOnDemandProperties>& source) override {
108  m_list.push_back(source);
109  }
110 
112 };
113 
115 
/**
 * Make sure the process runs with a usable locale.
 *
 * Parts of boost (boost::filesystem among them) may throw when the locale
 * configured through the environment is invalid
 * (see https://svn.boost.org/trac10/ticket/10205).
 * The environment locale is probed once here; if constructing it fails for
 * any reason, LC_ALL is overridden with "C".
 */
static void setupEnvironment(void) {
  bool locale_is_usable = true;

  try {
    // Constructing the environment locale is the probe: it throws if the
    // configured locale cannot be used.
    std::locale("");
  }
  catch (...) {
    locale_is_usable = false;
  }

  if (!locale_is_usable) {
    ::setenv("LC_ALL", "C", 1);
  }
}
128 
136  bool omp_env_present = getenv("OMP_NUM_THREADS") || getenv("OMP_DYNAMIC");
137  bool mkl_env_present = getenv("MKL_NUM_THREADS") || getenv("MKL_DYNAMIC");
138  if (!omp_env_present && !mkl_env_present) {
139  // Despite the documentation, the methods following C ABI are capitalized
140  void (*set_num_threads)(int) = reinterpret_cast<void (*)(int)>(dlsym(RTLD_DEFAULT, "MKL_Set_Num_Threads"));
141  void (*set_dynamic)(int) = reinterpret_cast<void (*)(int)>(dlsym(RTLD_DEFAULT, "MKL_Set_Dynamic"));
142  if (set_num_threads) {
143  logger.debug() << "Disabling multithreading";
144  set_num_threads(1);
145  }
146  if (set_dynamic) {
147  logger.debug() << "Disabling dynamic multithreading";
148  set_dynamic(0);
149  }
150  }
151 }
152 
153 class SEMain : public Elements::Program {
154 
155  std::shared_ptr<TaskFactoryRegistry> task_factory_registry = std::make_shared<TaskFactoryRegistry>();
156  std::shared_ptr<TaskProvider> task_provider = std::make_shared<TaskProvider>(task_factory_registry);
157  std::shared_ptr<OutputRegistry> output_registry = std::make_shared<OutputRegistry>();
158  SegmentationFactory segmentation_factory {task_provider};
159  OutputFactory output_factory { output_registry };
162  std::make_shared<SourceWithOnDemandPropertiesFactory>(task_provider);
164  std::make_shared<SourceGroupWithOnDemandPropertiesFactory>(task_provider);
165  PartitionFactory partition_factory {source_factory};
166  GroupingFactory grouping_factory {group_factory};
167  DeblendingFactory deblending_factory {source_factory};
168  MeasurementFactory measurement_factory { output_registry };
169  ProgressReporterFactory progress_printer_factory {};
170 
171  bool config_initialized = false;
172  po::options_description config_parameters;
173 
174 public:
175 
176  SEMain(const std::string& plugin_path, const std::vector<std::string>& plugin_list)
177  : plugin_manager { task_factory_registry, output_registry, config_manager_id, plugin_path, plugin_list } {
178  }
179 
183  po::options_description getConfigParameters() {
184  if (!config_initialized) {
185  auto& config_manager = ConfigManager::getInstance(config_manager_id);
186  config_manager.registerConfiguration<SourceXtractorConfig>();
187  config_manager.registerConfiguration<BackgroundConfig>();
188  config_manager.registerConfiguration<SE2BackgroundConfig>();
189  config_manager.registerConfiguration<MemoryConfig>();
190  config_manager.registerConfiguration<BackgroundAnalyzerFactory>();
191  config_manager.registerConfiguration<SamplingConfig>();
192  config_manager.registerConfiguration<DetectionFrameConfig>();
193 
195 
196  //plugins need to be registered before reportConfigDependencies()
197  plugin_manager.loadPlugins();
198  task_factory_registry->reportConfigDependencies(config_manager);
199  segmentation_factory.reportConfigDependencies(config_manager);
200  partition_factory.reportConfigDependencies(config_manager);
201  grouping_factory.reportConfigDependencies(config_manager);
202  deblending_factory.reportConfigDependencies(config_manager);
203  measurement_factory.reportConfigDependencies(config_manager);
204  output_factory.reportConfigDependencies(config_manager);
205 
206  config_parameters.add(config_manager.closeRegistration());
207  config_initialized = true;
208  }
209  return config_parameters;
210  }
211 
214  auto options = getConfigParameters();
215 
216  options.add_options() (LIST_OUTPUT_PROPERTIES.c_str(), po::bool_switch(),
217  "List the possible output properties for the given input parameters and exit");
218  options.add_options() (PROPERTY_COLUMN_MAPPING_ALL.c_str(), po::bool_switch(),
219  "Show the columns created for each property");
220  options.add_options() (PROPERTY_COLUMN_MAPPING.c_str(), po::bool_switch(),
221  "Show the columns created for each property, for the given configuration");
222  options.add_options() (DUMP_CONFIG.c_str(), po::bool_switch(),
223  "Dump parameters with default values into a configuration file");
224  progress_printer_factory.addOptions(options);
225 
226  // Allow to pass Python options as positional following --
227  po::positional_options_description p;
228  p.add("python-arg", -1);
229 
230  return {options, p};
231  }
232 
234  template <typename T>
235  static void writeDefault(std::ostream& out, const po::option_description& opt, const boost::any& default_value) {
236  out << opt.long_name() << '=' << boost::any_cast<T>(default_value) << std::endl;
237  }
238 
240  template <typename T>
241  static void writeDefaultMultiple(std::ostream& out, const po::option_description& opt, const boost::any& default_value) {
242  auto values = boost::any_cast<std::vector<T>>(default_value);
243  if (values.empty()) {
244  out << "# " << opt.long_name() << '=' << std::endl;
245  }
246  else {
247  for (const auto& v : values)
248  out << opt.long_name() << '=' << v << std::endl;
249  }
250  }
251 
253  void printDefaults() {
254  typedef std::function<void(std::ostream&, const po::option_description&, const boost::any&)> PrinterFunction;
256  {typeid(bool), &writeDefault<bool>},
257  {typeid(int), &writeDefault<int>},
258  {typeid(double), &writeDefault<double>},
259  {typeid(std::string), &writeDefault<std::string>},
260  {typeid(std::vector<std::string>), &writeDefaultMultiple<std::string>}
261  };
262  decltype(printers)::const_iterator printer;
263 
264  auto config_parameters = getConfigParameters();
265  for (const auto& p : config_parameters.options()) {
266  boost::any default_value;
267 
268  std::cout << "# " << p->description() << std::endl;
269  if (!p->semantic()->apply_default(default_value)) {
270  std::cout << '#' << p->long_name() << "=" << std::endl;
271  }
272  else if ((printer = printers.find(default_value.type())) == printers.end()) {
273  std::cout << '#' << p->long_name() << "=<Unknown type " << default_value.type().name() << '>' << std::endl;
274  }
275  else {
276  printer->second(std::cout, *p, default_value);
277  }
278  std::cout << std::endl;
279  }
280 
281  // We need to print the log options manually, as that is set up by Elements
282  std::cout << "# Log level: FATAL, ERROR, WARN, INFO, DEBUG" << std::endl;
283  std::cout << "log-level=INFO" << std::endl;
284  std::cout << "# Log file" << std::endl;
285  std::cout << "#log-file" << std::endl;
286  }
287 
289 
290  // If the user just requested to see the possible output columns we show
291  // them and we do nothing else
292 
293  if (args.at(LIST_OUTPUT_PROPERTIES).as<bool>()) {
294  for (auto& name : output_registry->getOutputPropertyNames()) {
295  std::cout << name << std::endl;
296  }
297  return Elements::ExitCode::OK;
298  }
299 
300  if (args.at(PROPERTY_COLUMN_MAPPING_ALL).as<bool>()) {
301  output_registry->printPropertyColumnMap();
302  return Elements::ExitCode::OK;
303  }
304 
305  if (args.at(DUMP_CONFIG).as<bool>()) {
306  printDefaults();
307  return Elements::ExitCode::OK;
308  }
309 
310  // Make sure the BLAS multithreading does not interfere
312 
313  // Elements does not verify that the config-file exists. It will just not read it.
314  // We verify that it does exist here.
315  if (args.find("config-file") != args.end()) {
316  auto cfg_file = args.at("config-file").as<fs::path>();
317  if (cfg_file != "" && !fs::exists(cfg_file)) {
318  throw Elements::Exception() << "The configuration file '" << cfg_file << "' does not exist";
319  }
320  }
321 
322  // Create the progress listener and printer ASAP
323  progress_printer_factory.configure(args);
324  auto progress_mediator = progress_printer_factory.createProgressMediator();
325 
326  // Initialize the rest of the components
327  auto& config_manager = ConfigManager::getInstance(config_manager_id);
328  config_manager.initialize(args);
329 
330  // Configure TileManager
331  auto memory_config = config_manager.getConfiguration<MemoryConfig>();
332  TileManager::getInstance()->setOptions(memory_config.getTileSize(),
333  memory_config.getTileSize(), memory_config.getTileMaxMemory());
334 
335  CheckImages::getInstance().configure(config_manager);
336 
337  task_factory_registry->configure(config_manager);
338  task_factory_registry->registerPropertyInstances(*output_registry);
339 
340  segmentation_factory.configure(config_manager);
341  partition_factory.configure(config_manager);
342  grouping_factory.configure(config_manager);
343  deblending_factory.configure(config_manager);
344  measurement_factory.configure(config_manager);
345  output_factory.configure(config_manager);
346 
347  if (args.at(PROPERTY_COLUMN_MAPPING).as<bool>()) {
348  output_registry->printPropertyColumnMap(config_manager.getConfiguration<OutputConfig>().getOutputProperties());
349  return Elements::ExitCode::OK;
350  }
351 
352  auto segmentation = segmentation_factory.createSegmentation();
353 
354  // Multithreading
355  auto multithreading_config = config_manager.getConfiguration<MultiThreadingConfig>();
356  auto thread_pool = multithreading_config.getThreadPool();
357 
358  // Prefetcher
359  std::shared_ptr<Prefetcher> prefetcher;
360  if (thread_pool) {
361  prefetcher = std::make_shared<Prefetcher>(thread_pool, multithreading_config.getMaxQueueSize());
362  }
363 
364  // Rest of the stages
365  auto partition = partition_factory.getPartition();
366  auto source_grouping = grouping_factory.createGrouping();
367 
368  std::shared_ptr<Deblending> deblending = deblending_factory.createDeblending();
369  std::shared_ptr<Measurement> measurement = measurement_factory.getMeasurement();
370  std::shared_ptr<Output> output = output_factory.createOutput();
371 
372  if (prefetcher) {
373  prefetcher->requestProperties(source_grouping->requiredProperties());
374  prefetcher->requestProperties(deblending->requiredProperties());
375  }
376 
377  // Link together the pipeline's steps
378  segmentation->Observable<std::shared_ptr<SourceInterface>>::addObserver(partition);
379 
380  if (prefetcher) {
381  segmentation->Observable<ProcessSourcesEvent>::addObserver(prefetcher);
382  prefetcher->Observable<ProcessSourcesEvent>::addObserver(source_grouping);
383  partition->addObserver(prefetcher);
384  prefetcher->Observable<std::shared_ptr<SourceInterface>>::addObserver(source_grouping);
385  }
386  else {
387  segmentation->Observable<ProcessSourcesEvent>::addObserver(source_grouping);
388  partition->addObserver(source_grouping);
389  }
390 
391  source_grouping->addObserver(deblending);
392  deblending->addObserver(measurement);
393 
394  if (config_manager.getConfiguration<OutputConfig>().getOutputUnsorted()) {
395  logger.info() << "Writing output following measure order";
396  measurement->addObserver(output);
397  } else {
398  logger.info() << "Writing output following segmentation order";
399  auto sorter = std::make_shared<Sorter>();
400  measurement->addObserver(sorter);
401  sorter->addObserver(output);
402  }
403 
404  segmentation->Observable<SegmentationProgress>::addObserver(progress_mediator->getSegmentationObserver());
405  segmentation->Observable<std::shared_ptr<SourceInterface>>::addObserver(progress_mediator->getDetectionObserver());
406  deblending->addObserver(progress_mediator->getDeblendingObserver());
407  measurement->addObserver(progress_mediator->getMeasurementObserver());
408 
409  // Add observers for CheckImages
410  if (CheckImages::getInstance().getSegmentationImage(0) != nullptr) {
411  segmentation->Observable<std::shared_ptr<SourceInterface>>::addObserver(
412  std::make_shared<DetectionIdCheckImage>());
413  }
414  if (CheckImages::getInstance().getPartitionImage(0) != nullptr) {
415  measurement->addObserver(
416  std::make_shared<SourceIdCheckImage>());
417  }
418  if (CheckImages::getInstance().getGroupImage(0) != nullptr) {
419  measurement->addObserver(
420  std::make_shared<GroupIdCheckImage>());
421  }
422  if (CheckImages::getInstance().getMoffatImage(0) != nullptr) {
423  measurement->addObserver(
424  std::make_shared<MoffatCheckImage>());
425  }
426  const auto& detection_frames = config_manager.getConfiguration<DetectionFrameConfig>().getDetectionFrames();
427 
428  // Perform measurements (multi-threaded part)
429  measurement->startThreads();
430 
431  size_t prev_writen_rows = 0;
432  size_t frame_number = 0;
433  for (auto& detection_frame : detection_frames) {
434  frame_number++;
435  try {
436  // Process the image
437  logger.info() << "Processing frame "
438  << frame_number << " / " << detection_frames.size() << " : " << detection_frame->getLabel();
439  segmentation->processFrame(detection_frame);
440  }
441  catch (const std::exception &e) {
442  logger.error() << "Failed to process the frame! " << e.what();
443  measurement->stopThreads();
445  }
446 
447  if (prefetcher) {
448  prefetcher->synchronize();
449  }
450  measurement->synchronizeThreads();
451 
452  size_t nb_writen_rows = output->flush();
453  output->nextPart();
454 
455  logger.info() << (nb_writen_rows - prev_writen_rows) << " sources detected in frame, " << nb_writen_rows << " total";
456 
457  prev_writen_rows = nb_writen_rows;
458  }
459 
460  if (prefetcher) {
461  prefetcher->wait();
462  }
463  measurement->stopThreads();
464 
466  TileManager::getInstance()->flush();
467  progress_mediator->done();
468 
469  if (prev_writen_rows > 0) {
470  logger.info() << "total " << prev_writen_rows << " sources detected";
471  } else {
472  logger.info() << "NO SOURCES DETECTED";
473  }
474 
475  return Elements::ExitCode::OK;
476  }
477 };
478 
479 
481 
482 public:
484  m_plugin_path(plugin_path), m_plugin_list(plugin_list) {
485  }
486 
487  virtual ~PluginOptionsMain() = default;
488 
489  boost::program_options::options_description defineSpecificProgramOptions() override {
490  auto& config_manager = ConfigManager::getInstance(conf_man_id);
491  config_manager.registerConfiguration<PluginConfig>();
492  auto options = config_manager.closeRegistration();
493  // The following will consume any extra options in the configuration file
494  options.add_options()("*", po::value<std::vector<std::string>>());
495  return options;
496  }
497 
499  auto& config_manager = ConfigManager::getInstance(conf_man_id);
500  config_manager.initialize(args);
501  auto& conf = config_manager.getConfiguration<PluginConfig>();
502  m_plugin_path = conf.getPluginPath();
503  m_plugin_list = conf.getPluginList();
504  return Elements::ExitCode::OK;
505  }
506 
507 private:
508 
509  long conf_man_id = getUniqueManagerId();
512 
513 };
514 
515 
/**
 * Copy the plugin-related options from the command line into
 * plugin_options_input.
 *
 * Only --config-file, --plugin-directory and --plugin are forwarded, in either
 * of their two spellings:
 *   - "--name value": the flag and the argument that follows it are appended
 *     as two entries;
 *   - "--name=value": the argument is appended unchanged.
 * Every other argument is ignored.
 *
 * @param argc number of entries in argv
 * @param argv command line arguments; only argv[0] .. argv[argc - 1] are read
 * @param plugin_options_input vector the forwarded options are appended to
 */
static void forwardOptions(int argc, char *const *argv, std::vector<std::string>& plugin_options_input) {
  static const std::string forwarded_names[] = {"--config-file", "--plugin-directory", "--plugin"};

  for (int i = 0; i < argc; ++i) {
    const std::string option{argv[i]};

    for (const auto& name : forwarded_names) {
      if (option == name) {
        // "--name value" form. The value is only there if another argument
        // follows: argv[argc] is a null pointer and must not be turned into a
        // std::string. A dangling flag is not forwarded, so the missing value
        // is left for the main program's own option parsing to report.
        if (i + 1 < argc) {
          plugin_options_input.emplace_back(name);
          plugin_options_input.emplace_back(argv[i + 1]);
        }
        break;
      }

      // "--name=value" form: the option starts with the name followed by '='
      const bool has_inline_value = option.size() > name.size() &&
                                    option[name.size()] == '=' &&
                                    option.compare(0, name.size(), name) == 0;
      if (has_inline_value) {
        plugin_options_input.emplace_back(option);
        break;
      }
    }
  }
}
542 
543 
544 ELEMENTS_API int main(int argc, char* argv[]) {
545  std::string plugin_path {};
546  std::vector<std::string> plugin_list {};
547 
548  // This adds the current directory as a valid location for the default "sourcextractor++.conf" configuration
549  Elements::TempEnv local_env;
550  if (local_env["ELEMENTS_CONF_PATH"].empty()) {
551  local_env["ELEMENTS_CONF_PATH"] = ".:/etc";
552  } else {
553  local_env["ELEMENTS_CONF_PATH"] = ".:" + local_env["ELEMENTS_CONF_PATH"] + ":/etc";
554  }
555 
557 
558  // Try to be reasonably graceful with unhandled exceptions
560 
561  try {
562  // First we create a program whose sole purpose is to get the options for
563  // the plugin paths. Note that we do not want this helper program to
564  // handle any options other than plugin-directory and plugin, so
565  // we create a subset of the given options with only the necessary ones. We
566  // also turn off the logging.
567  std::vector<int> masked_indices{};
568  std::vector<std::string> plugin_options_input{};
569  plugin_options_input.emplace_back("DummyProgram");
570  plugin_options_input.emplace_back("--log-level");
571  plugin_options_input.emplace_back("ERROR");
572  forwardOptions(argc, argv, plugin_options_input);
573 
574  int argc_tmp = plugin_options_input.size();
575  std::vector<const char *> argv_tmp(argc_tmp);
576  for (unsigned int i = 0; i < plugin_options_input.size(); ++i) {
577  auto& option_str = plugin_options_input[i];
578  argv_tmp[i] = option_str.data();
579  }
580 
581  CREATE_MANAGER_WITH_ARGS(plugin_options_program, PluginOptionsMain, plugin_path, plugin_list);
582  plugin_options_program.run(argc_tmp, const_cast<char **>(argv_tmp.data()));
583 
584  CREATE_MANAGER_WITH_ARGS(main, SEMain, plugin_path, plugin_list);
585  Elements::ExitCode exit_code = main.run(argc, argv);
586  return static_cast<Elements::ExitCodeType>(exit_code);
587  }
588  catch (const std::exception &e) {
589  logger.fatal() << e.what();
591  }
592  catch (...) {
593  logger.fatal() << "Unknown exception type!";
594  logger.fatal() << "Please, report this as a bug";
596  }
597 }
static const std::string PROPERTY_COLUMN_MAPPING
static void setupEnvironment(void)
static void disableBlasMultithreading()
static const std::string LIST_OUTPUT_PROPERTIES
static void forwardOptions(int argc, char *const *argv, std::vector< std::string > &plugin_options_input)
static const std::string PROPERTY_COLUMN_MAPPING_ALL
static const std::string DUMP_CONFIG
static long config_manager_id
ELEMENTS_API int main(int argc, char *argv[])
T at(T... args)
T c_str(T... args)
static Logging getLogger(const std::string &name="")
static void onTerminate() noexcept
static ConfigManager & getInstance(long id)
virtual void handleMessage(const std::shared_ptr< SourceGroupInterface > &group) override
std::list< std::shared_ptr< SourceGroupInterface > > m_list
boost::program_options::options_description defineSpecificProgramOptions() override
Elements::ExitCode mainMethod(std::map< std::string, boost::program_options::variable_value > &args) override
std::string & m_plugin_path
virtual ~PluginOptionsMain()=default
PluginOptionsMain(std::string &plugin_path, std::vector< std::string > &plugin_list)
std::vector< std::string > & m_plugin_list
po::options_description getConfigParameters()
void printDefaults()
Print a configuration file populated with defaults.
SEMain(const std::string &plugin_path, const std::vector< std::string > &plugin_list)
Elements::ExitCode mainMethod(std::map< std::string, po::variable_value > &args) override
static void writeDefaultMultiple(std::ostream &out, const po::option_description &opt, const boost::any &default_value)
Print a multiple-value option.
std::pair< po::options_description, po::positional_options_description > defineProgramArguments() override
Return the arguments that the program accepts.
static void writeDefault(std::ostream &out, const po::option_description &opt, const boost::any &default_value)
Print a simple option.
PluginManager plugin_manager
po::options_description config_parameters
std::list< std::shared_ptr< SourceWithOnDemandProperties > > m_list
virtual void handleMessage(const std::shared_ptr< SourceWithOnDemandProperties > &source) override
void configure(Euclid::Configuration::ConfigManager &manager) override
Method which should initialize the object.
Definition: CheckImages.cpp:84
static CheckImages & getInstance()
Definition: CheckImages.h:150
void reportConfigDependencies(Euclid::Configuration::ConfigManager &manager) const override
Registers all the Configuration dependencies.
Definition: CheckImages.cpp:58
Provides combined detection frame.
const std::shared_ptr< Euclid::ThreadPool > & getThreadPool() const
Observer interface to be used with Observable to implement the Observer pattern.
Definition: Observable.h:38
const std::vector< std::string > getOutputProperties()
std::set< std::string > getOutputPropertyNames()
void printPropertyColumnMap(const std::vector< std::string > &properties={})
PluginManager handles the loading of plugins and calls their registration function,...
Definition: PluginManager.h:53
void loadPlugins()
loads all the available plugins. Both those linked at compile-time and those loaded at run-time
The SegmentationFactory will provide a Segmentation implementation based on the current configuration...
void configure(Euclid::Configuration::ConfigManager &manager) override
Method which should initialize the object.
void registerPropertyInstances(OutputRegistry &output_registry)
void reportConfigDependencies(Euclid::Configuration::ConfigManager &manager) const override
Registers all the Configuration dependencies.
static std::shared_ptr< TileManager > getInstance()
T data(T... args)
T emplace_back(T... args)
T end(T... args)
T endl(T... args)
T find(T... args)
T getenv(T... args)
#define ELEMENTS_API
#define CREATE_MANAGER_WITH_ARGS(MANAGER, ELEMENTS_PROGRAM,...)
constexpr double e
std::underlying_type< ExitCode >::type ExitCodeType
long getUniqueManagerId() noexcept
static auto logger
Definition: WCS.cpp:44
Definition: conf.py:1
T partition(T... args)
T set_terminate(T... args)