SourceXtractorPlusPlus  0.17
SourceXtractor++, the next generation SExtractor
Prefetcher.h
Go to the documentation of this file.
1 
18 #ifndef _SEIMPLEMENTATION_MEASUREMENT_PREFETCHER_H_
19 #define _SEIMPLEMENTATION_MEASUREMENT_PREFETCHER_H_
20 
21 #include <condition_variable>
26 #include "SEUtils/Observable.h"
27 
28 namespace SourceXtractor {
29 
/// Observer and Observable of two message types: std::shared_ptr<SourceInterface> and
/// ProcessSourcesEvent. Incoming messages arrive through the two handleMessage()
/// overloads; as an Observable of the same two types the class can also notify its own
/// observers with them.
///
/// NOTE(review): this listing is incomplete. The original doc comments, the EventType
/// enumerators and most data members are not shown, so anything beyond the declarations
/// below must be confirmed against the full header and Prefetcher.cpp.
41 class Prefetcher : public Observer<std::shared_ptr<SourceInterface>>,
42  public Observable<std::shared_ptr<SourceInterface>>,
43  public Observer<ProcessSourcesEvent>,
44  public Observable<ProcessSourcesEvent> {
45 public:
46 
 /// Constructor.
 /// @param thread_pool    Shared pointer to a Euclid::ThreadPool.
 /// @param max_queue_size Unsigned limit; presumably bounds the number of queued
 ///                       sources - TODO confirm against Prefetcher.cpp.
52  Prefetcher(const std::shared_ptr<Euclid::ThreadPool>& thread_pool, unsigned max_queue_size);
53 
 /// Virtual destructor (the class is used polymorphically through its Observer /
 /// Observable bases).
57  virtual ~Prefetcher();
58 
 /// Observer callback for a source message.
 /// @param message The received source, held by shared pointer.
64  void handleMessage(const std::shared_ptr<SourceInterface>& message) override;
65 
 /// Observer callback for a ProcessSourcesEvent message.
 /// @param message The received event.
71  void handleMessage(const ProcessSourcesEvent& message) override;
72 
 /// Requests a whole collection of properties at once: requestProperty() is called
 /// once for every element of @p properties, in iteration order.
 /// @tparam Container Any type usable in a range-based for loop whose elements can be
 ///                   passed to requestProperty(const PropertyId&).
 /// @param properties The properties to request.
80  template<typename Container>
81  void requestProperties(Container&& properties) {
82  for (auto& p : properties) {
83  requestProperty(p);
84  }
85  }
86 
 /// NOTE(review): presumably blocks until pending work has finished - the exact
 /// contract is not visible in this listing; confirm in Prefetcher.cpp.
92  void wait();
93 
 /// NOTE(review): synchronization point whose exact contract is not visible in this
 /// listing; confirm in Prefetcher.cpp.
98  void synchronize();
99 
100 
101 private:
 /// Record describing one received event: its kind (m_event_type) and an integer
 /// source address (m_source_addr).
102  struct EventType {
103  enum Type {
 // NOTE(review): the enumerators and the end of this enum (original lines 104-106)
 // are missing from this listing.
107 
 /// @param type        Kind of event, stored in m_event_type.
 /// @param source_addr Stored in m_source_addr; defaults to -1 when not supplied.
108  explicit EventType(Type type, intptr_t source_addr = -1)
109  : m_event_type(type), m_source_addr(source_addr) {}
110  };
111 
 // NOTE(review): the data members declared on original lines 113-125 are not shown in
 // this listing.
126 
128 
 /// Termination condition for the output loop.
130  std::atomic_bool m_stop;
131 
 // NOTE(review): original line 133 (another data member) is not shown in this listing.
134 
 /// Handles the request of a single property; requestProperties() calls this once per
 /// element.
 /// @param property_id Identifier of the requested property.
135  void requestProperty(const PropertyId& property_id);
 /// Output loop; m_stop is its termination condition.
136  void outputLoop();
137 };
138 
139 } // end of namespace SourceXtractor
140 
141 #endif // _SEIMPLEMENTATION_MEASUREMENT_PREFETCHER_H_
Implements the Observer pattern. Notifications will be made using a message of type T.
Definition: Observable.h:51
Observer interface to be used with Observable to implement the Observer pattern.
Definition: Observable.h:38
void requestProperty(const PropertyId &property_id)
Definition: Prefetcher.cpp:76
std::deque< ProcessSourcesEvent > m_event_queue
Queue of received ProcessSourcesEvent messages, with their order preserved.
Definition: Prefetcher.h:123
std::shared_ptr< Euclid::ThreadPool > m_thread_pool
Pointer to the pool of worker threads.
Definition: Prefetcher.h:113
Prefetcher(const std::shared_ptr< Euclid::ThreadPool > &thread_pool, unsigned max_queue_size)
Definition: Prefetcher.cpp:44
std::condition_variable m_new_output
Notifies that a new source has finished processing.
Definition: Prefetcher.h:119
std::deque< EventType > m_received
Queue of the types of the received events. Used to pass events downstream in the order in which they were received.
Definition: Prefetcher.h:125
void handleMessage(const std::shared_ptr< SourceInterface > &message) override
Definition: Prefetcher.cpp:54
void requestProperties(Container &&properties)
Definition: Prefetcher.h:81
Euclid::Semaphore m_semaphore
Keep the queue under control.
Definition: Prefetcher.h:133
std::set< PropertyId > m_prefetch_set
Properties to prefetch.
Definition: Prefetcher.h:115
std::unique_ptr< std::thread > m_output_thread
Orchestration thread.
Definition: Prefetcher.h:117
std::atomic_bool m_stop
Termination condition for the output loop.
Definition: Prefetcher.h:130
std::map< intptr_t, std::shared_ptr< SourceInterface > > m_finished_sources
Finished sources.
Definition: Prefetcher.h:121
Identifier used to set and retrieve properties.
Definition: PropertyId.h:40
EventType(Type type, intptr_t source_addr=-1)
Definition: Prefetcher.h:108
enum SourceXtractor::Prefetcher::EventType::Type m_event_type
Event received by SourceGrouping to request the processing of some of the Sources stored.