flow  3.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Pages
generic.h
Go to the documentation of this file.
1 #if !defined(FLOW_GENERIC_H)
2  #define FLOW_GENERIC_H
3 
4 #include "node.h"
5 #include "timer.h"
6 
7 #include <chrono>
8 #include <condition_variable>
9 #include <functional>
10 #include <iostream>
11 #include <mutex>
12 #include <thread>
13 
17 
21 
25 namespace flow { namespace samples { namespace generic {
26 
28 template<typename T>
29 class generator : public producer<T>
30 {
31  std::function<T ()> d_gen_f;
32 
33  std::condition_variable d_awaken_cv;
34  std::mutex d_awaken_m;
35  bool d_awaken;
36 
37 public:
41  generator(timer& timer_r, const std::function<T ()>& gen_f, const std::string& name_r = "generator") : node(name_r), producer<T>(name_r, 1), d_gen_f(gen_f), d_awaken(false)
42  {
43  timer_r.listen(std::bind(&generator::timer_fired, this));
44  }
45 
46  virtual ~generator() {}
47 
49  virtual void stopped()
50  {
51  std::unique_lock<std::mutex> ul(d_awaken_m);
52  d_awaken = true;
53  d_awaken_cv.notify_one();
54  }
55 
57  virtual void timer_fired()
58  {
59  std::unique_lock<std::mutex> ul(d_awaken_m);
60  d_awaken = true;
61  d_awaken_cv.notify_one();
62  }
63 
68  virtual void produce()
69  {
70  {
71  std::unique_lock<std::mutex> ul(d_awaken_m);
72  d_awaken_cv.wait(ul, std::bind(std::equal_to<bool>(), std::ref(d_awaken), true));
73  d_awaken = false;
74  }
75 
77  {
78  std::unique_ptr<packet<T>> packet_p(new packet<T>(d_gen_f()));
79 
80  producer<T>::output(0).push(std::move(packet_p));
81  }
82  }
83 };
84 
86 template<typename T>
87 class ostreamer : public consumer<T>
88 {
89  std::ostream& d_o_r;
90 
91  std::condition_variable d_stopped_cv;
92  std::mutex d_stopped_m;
93 
94 public:
99  ostreamer(std::ostream& o_r, const std::string& name_r = "ostreamer") : node(name_r), consumer<T>(name_r, 1), d_o_r(o_r) {}
100 
101  virtual ~ostreamer() {}
102 
104  virtual void stopped()
105  {
106  d_stopped_cv.notify_one();
107  }
108 
115  virtual void ready(size_t)
116  {
117  std::unique_ptr<packet<T>> packet_p = consumer<T>::input(0).pop();
118 
119  if(packet_p->consumption_time() == typename packet<T>::time_point_type())
120  {
121  // This packet has no set consumption time. Consume it immediately.
122  d_o_r << packet_p->data() << std::endl;
123  }
124  else if(packet_p->consumption_time() > std::chrono::high_resolution_clock::now())
125  {
126  // This packet must be consumed at a set time.
127  // Wait until then or until this node is stopped.
128  std::unique_lock<std::mutex> l_stopped(d_stopped_m);
129  d_stopped_cv.wait_until(l_stopped, packet_p->consumption_time());
130 
131  if(node::state() == state::started)
132  {
133  d_o_r << packet_p->data() << std::endl;
134  }
135  }
136  }
137 };
138 
140 template<typename T>
141 class tee : public transformer<T, T>
142 {
143 public:
148  tee(const size_t outs = 2, const std::string& name_r = "tee") : node(name_r), transformer<T, T>(name_r, 1, outs) {}
149 
150  virtual ~tee() {}
151 
155  virtual void ready(size_t)
156  {
157  std::unique_ptr<packet<T>> packet_p = consumer<T>::input(0).pop();
158 
159  for(size_t s = 1; s != producer<T>::outs(); ++s)
160  {
161  std::unique_ptr<packet<T>> copy_p(new packet<T>(*packet_p));
162  producer<T>::output(s).push(std::move(copy_p));
163  }
164 
165  producer<T>::output(0).push(std::move(packet_p));
166  }
167 };
168 
170 template<typename T>
171 class delay : public transformer<T, T>
172 {
173  std::chrono::milliseconds d_offset;
174 
175 public:
178  template<typename Duration>
179  delay(const Duration& offset_r, const std::string& name_r = "delay") : node(name_r), transformer<T, T>(name_r, 1, 1), d_offset(offset_r) {}
180 
181  virtual ~delay() {}
182 
186  virtual void ready(size_t)
187  {
188  std::unique_ptr<packet<T>> packet_p = consumer<T>::input(0).pop();
189 
190  if(packet_p->consumption_time() == typename packet<T>::time_point_type())
191  {
192  packet_p->consumption_time() = std::chrono::high_resolution_clock::now() + d_offset;
193  }
194  else
195  {
196  packet_p->consumption_time() += d_offset;
197  }
198 
199  producer<T>::output(0).push(std::move(packet_p));
200  }
201 };
202 
203 }}}
204 
205 #endif
206 
207 /*
208  (C) Copyright Thierry Seegers 2010-2012. Distributed under the following license:
209 
210  Boost Software License - Version 1.0 - August 17th, 2003
211 
212  Permission is hereby granted, free of charge, to any person or organization
213  obtaining a copy of the software and accompanying documentation covered by
214  this license (the "Software") to use, reproduce, display, distribute,
215  execute, and transmit the Software, and to prepare derivative works of the
216  Software, and to permit third-parties to whom the Software is furnished to
217  do so, all subject to the following:
218 
219  The copyright notices in the Software and this entire statement, including
220  the above license grant, this restriction and the following disclaimer,
221  must be included in all copies of the Software, in whole or in part, and
222  all derivative works of the Software, unless such copies or derivative
223  works are solely in the form of machine-executable object code generated by
224  a source language processor.
225 
226  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
227  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
228  FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
229  SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
230  FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
231  ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
232  DEALINGS IN THE SOFTWARE.
233 */