flow  3.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Pages
node.h
Go to the documentation of this file.
1 #if !defined(FLOW_NODE_H)
2  #define FLOW_NODE_H
3 
4 #include "named.h"
5 #include "packet.h"
6 #include "pipe.h"
7 
8 #include <condition_variable>
9 #include <memory>
10 #include <mutex>
11 #include <string>
12 #include <utility>
13 #include <vector>
14 #include <utility>
15 
19 
23 namespace flow
24 {
25 
26 // Forward declarations.
28 template<typename T>
29 class outpin;
30 
31 template<typename T>
32 class consumer;
33 
34 template<typename T>
35 class producer;
36 
37 class graph;
39 
43 namespace state
44 {
45 
49 enum type
50 {
54 };
55 
56 }
57 
62 template<typename T>
63 class pin : public named
64 {
65 protected:
66  std::shared_ptr<std::pair<pipe<T>, std::unique_ptr<std::mutex>>> d_pipe_sp;
67 
68  friend class outpin<T>;
69 
70 public:
72  pin(const std::string& name_r) : named(name_r) {}
73 
75  pin(const pin<T>& o) : named(o.name()), d_pipe_sp(o.d_pipe_sp) {}
76 
77  virtual ~pin() {}
78 
80  virtual void disconnect()
81  {
82  d_pipe_sp.reset();
83  }
84 };
85 
89 template<typename T>
90 class inpin : public pin<T>
91 {
92  std::condition_variable *d_transition_cv_p;
93  std::mutex *d_transition_m_p;
94 
95  using pin<T>::d_pipe_sp;
96 
98  virtual void disconnect()
99  {
100  {
101  std::lock_guard<std::mutex> lg(*d_pipe_sp->second);
102  d_pipe_sp->first.rename(d_pipe_sp->first.input()->name() + "_to_" + "nothing");
103  }
104 
106  }
107 
108  friend class consumer<T>;
109 
110 public:
116  inpin(const std::string& name_r, std::condition_variable* transition_cv_p, std::mutex *transition_m_p)
117  : pin<T>(name_r), d_transition_cv_p(transition_cv_p), d_transition_m_p(transition_m_p)
118  {}
119 
120  virtual ~inpin() {}
121 
127  virtual std::string rename(const std::string& name_r)
128  {
129  if(d_pipe_sp)
130  {
131  std::lock_guard<std::mutex> lg(*d_pipe_sp->second);
132  if(d_pipe_sp->first.input())
133  {
134  d_pipe_sp->first.rename(d_pipe_sp->first.input()->name() + "_to_" + name_r);
135  }
136  }
137 
138  return named::rename(name_r);
139  }
140 
144  virtual bool peek() const
145  {
146  if(d_pipe_sp)
147  {
148  std::lock_guard<std::mutex> lg(*d_pipe_sp->second);
149  return d_pipe_sp->first.length() != 0;
150  }
151 
152  return false;
153  }
154 
158  virtual std::unique_ptr<packet<T>> pop()
159  {
160  if(d_pipe_sp)
161  {
162  std::lock_guard<std::mutex> lg(*d_pipe_sp->second);
163  return d_pipe_sp->first.pop();
164  }
165 
166  return std::unique_ptr<packet<T>>();
167  }
168 
173  virtual void incoming()
174  {
175  std::unique_lock<std::mutex> ul(*d_transition_m_p);
176  d_transition_cv_p->notify_one();
177  }
178 };
179 
183 template<typename T>
184 class outpin : public pin<T>
185 {
186  using pin<T>::d_pipe_sp;
187 
189  virtual void disconnect()
190  {
191  {
192  std::lock_guard<std::mutex> lg(*d_pipe_sp->second);
193  d_pipe_sp->first.rename(std::string("nothing") + "_to_" + d_pipe_sp->first.output()->name());
194  }
195 
197  }
198 
207  virtual void connect(inpin<T>& inpin_r, const size_t max_length = 0, const size_t max_weight = 0)
208  {
209  // Disconnect this outpin from it's pipe, if it has one.
210  if(d_pipe_sp)
211  {
212  disconnect();
213  }
214 
215  if(inpin_r.pin<T>::d_pipe_sp)
216  {
217  // The inpin already has a pipe, connect this outpin to it.
218  std::lock_guard<std::mutex> lg(*inpin_r.pin<T>::d_pipe_sp->second);
219 
220  pipe<T> &inpin_pipe = inpin_r.pin<T>::d_pipe_sp->first;
221 
222  //... but first, disconnects it from it's other output pin.
223  if(inpin_pipe.input())
224  {
225  inpin_pipe.input()->disconnect();
226  }
227 
228  inpin_pipe.rename(pin<T>::name() + "_to_" + inpin_pipe.name());
229 
230  // Overwrite the pipe's parameters with new ones.
231  inpin_pipe.cap_length(max_length);
232  inpin_pipe.cap_length(max_weight);
233  }
234  else
235  {
236  // The inpin has no pipe, make a new one.
237  pipe<T> p(pin<T>::name() + "_to_" + inpin_r.pin<T>::name(), this, &inpin_r, max_length, max_weight);
238  d_pipe_sp = inpin_r.pin<T>::d_pipe_sp = std::make_shared<std::pair<pipe<T>, std::unique_ptr<std::mutex>>>(std::make_pair(std::move(p), std::unique_ptr<std::mutex>(new std::mutex())));
239  }
240  }
241 
242  friend class producer<T>;
243 
244 public:
246  outpin(const std::string& name_r) : pin<T>(name_r) {}
247 
248  virtual ~outpin() {}
249 
255  virtual std::string rename(const std::string& name_r)
256  {
257  if(d_pipe_sp)
258  {
259  std::lock_guard<std::mutex> lg(*d_pipe_sp->second);
260  if(d_pipe_sp->first.output())
261  {
262  d_pipe_sp->first.rename(name_r + "_to_" + d_pipe_sp->first.output()->name());
263  }
264  }
265 
266  return named::rename(name_r);
267  }
268 
275  virtual bool push(std::unique_ptr<packet<T>> packet_p)
276  {
277  if(!d_pipe_sp) return false;
278 
279  inpin<T>* inpin_p = 0;
280  {
281  std::lock_guard<std::mutex> lg(*d_pipe_sp->second);
282  if(d_pipe_sp->first.push(std::move(packet_p)))
283  {
284  inpin_p = d_pipe_sp->first.output();
285  }
286  }
287 
288  if(inpin_p)
289  {
290  inpin_p->incoming();
291  }
292 
293  return inpin_p != 0;
294  }
295 };
296 
298 class node : public named
299 {
300  state::type d_state_a;
301 
305  virtual void transition(state::type s)
306  {
307  std::unique_lock<std::mutex> ul(d_transition_m);
308 
309  d_state_a = s;
310 
311  // Notify the concrete class.
312  switch(s)
313  {
314  case state::started: started(); break;
315  case state::paused: paused(); break;
316  case state::stopped: stopped(); break;
317  default: break;
318  }
319 
320  // Notify the execution loop.
321  d_transition_cv.notify_one();
322  }
323 
324  friend class graph;
325 
326 protected:
327  std::condition_variable d_transition_cv;
328  std::mutex d_transition_m;
329 
331  virtual void sever() = 0;
332 
333 public:
337  node(const std::string& name_r) : named(name_r), d_state_a(state::paused)
338  {}
339 
341  node(node&& node_rr) : named(std::move(node_rr)), d_state_a(node_rr.d_state_a)
342  {}
343 
344  virtual ~node() {}
345 
347  virtual state::type state() const
348  {
349  return d_state_a;
350  }
351 
355  virtual void started() {}
356 
360  virtual void paused() {}
361 
365  virtual void stopped() {}
366 
371  virtual void operator()() = 0;
372 };
373 
375 namespace detail
376 {
377 
378 // These base classes help flow::graph distinguish between types of nodes without having to know their template types.
379 
380 class producer
381 {
382 public:
383  virtual ~producer() {}
384 };
385 
386 class transformer
387 {
388 public:
389  virtual ~transformer() {}
390 };
391 
392 class consumer
393 {
394 public:
395  virtual ~consumer() {}
396 };
397 
398 }
400 
406 template<typename T>
407 class producer : public virtual node, public detail::producer
408 {
409  typedef std::vector<outpin<T>> outputs_t;
410  outputs_t d_outputs;
411 
412 protected:
420  virtual void connect(size_t p_pin, consumer<T>* consumer_p, size_t c_pin, const size_t max_length = 0, const size_t max_weight = 0)
421  {
422  output(p_pin).connect(consumer_p->input(c_pin), max_length, max_weight);
423  }
424 
428  virtual void disconnect(size_t pin)
429  {
430  output(pin).disconnect();
431  }
432 
434  virtual void sever()
435  {
436  for(auto& outpin : d_outputs)
437  {
438  outpin.disconnect();
439  }
440  }
441 
446  virtual void operator()()
447  {
448  state::type s(state());
449 
450  while(s != state::stopped)
451  {
452  if(s == state::paused)
453  {
454  std::unique_lock<std::mutex> ul(d_transition_m);
455  d_transition_cv.wait(ul, [&s, this](){ return (s = this->state()) != state::paused; });
456  }
457  else
458  {
459  s = state();
460  }
461 
462  if(s == state::started)
463  {
464  produce();
465  }
466  }
467  }
468 
469  friend class graph;
470 
471 public:
474  producer(const std::string& name_r, const size_t outs) : node(name_r)
475  {
476  for(size_t i = 0; i != outs; ++i)
477  {
478  d_outputs.push_back(outpin<T>(name_r + "_out" + static_cast<char>('0' + i)));
479  }
480  }
481 
482  virtual ~producer() {}
483 
485  virtual size_t outs() const { return d_outputs.size(); }
486 
490  virtual outpin<T>& output(const size_t n) { return d_outputs[n]; }
491 
495  virtual const outpin<T>& output(const size_t n) const { return d_outputs[n]; }
496 
498  virtual outputs_t& outputs() { return d_outputs; }
499 
501  virtual const outputs_t& outputs() const { return d_outputs; }
502 
510  virtual std::string rename(const std::string& name_r)
511  {
512  for(size_t i = 0; i != outs(); ++i)
513  {
514  output(i).outpin<T>::rename(name_r + "_out" + static_cast<char>('0' + i));
515  }
516 
517  return named::rename(name_r);
518  }
519 
525  virtual void produce() = 0;
526 };
527 
533 template<typename T>
534 class consumer : public virtual node, public detail::consumer
535 {
536  typedef std::vector<inpin<T>> inputs_t;
537  inputs_t d_inputs;
538 
539 protected:
543  virtual void disconnect(size_t pin)
544  {
545  input(pin).disconnect();
546  }
547 
549  virtual void sever()
550  {
551  for(auto& inpin : d_inputs)
552  {
553  inpin.disconnect();
554  }
555  }
556 
558  virtual bool incoming()
559  {
560  for(auto& inpin : d_inputs)
561  {
562  if(inpin.peek())
563  {
564  return true;
565  }
566  }
567 
568  return false;
569  }
570 
575  virtual void operator()()
576  {
577  state::type s(state());
578 
579  while(s != state::stopped)
580  {
581  bool p = false;
582 
583  if(s == state::paused)
584  {
585  std::unique_lock<std::mutex> ul(d_transition_m);
586  d_transition_cv.wait(ul, [&s, this](){ return (s = this->state()) != state::paused; });
587  }
588  else if(s == state::started)
589  {
590  std::unique_lock<std::mutex> ul(d_transition_m);
591  d_transition_cv.wait(ul, [&s, &p, this](){ return ((s = this->state()) != state::started) || (p = this->incoming()); });
592  }
593 
594  if(p)
595  {
596  for(size_t i = 0; i != ins(); ++i)
597  {
598  if(input(i).peek())
599  {
600  ready(i);
601  }
602  }
603  }
604  }
605  }
606 
607  friend class graph;
608 
609 public:
612  consumer(const std::string& name_r, const size_t ins) : node(name_r)
613  {
614  for(size_t i = 0; i != ins; ++i)
615  {
616  d_inputs.push_back(inpin<T>(name_r + "_in" + static_cast<char>('0' + i), &d_transition_cv, &d_transition_m));
617  }
618  }
619 
620  virtual ~consumer() {}
621 
623  virtual size_t ins() const { return d_inputs.size(); }
624 
628  virtual inpin<T>& input(const size_t n) { return d_inputs[n]; }
629 
633  virtual const inpin<T>& input(const size_t n) const { return d_inputs[n]; }
634 
636  virtual inputs_t& inputs() { return d_inputs; }
637 
639  virtual const inputs_t& inputs() const { return d_inputs; }
640 
646  virtual std::string rename(const std::string& name_r)
647  {
648  for(size_t i = 0; i != ins(); ++i)
649  {
650  input(i).inpin<T>::rename(name_r + "_in" + static_cast<char>('0' + i));
651  }
652 
653  return named::rename(name_r);
654  }
655 
663  virtual void ready(size_t n) = 0;
664 };
665 
670 template<typename C, typename P>
671 class transformer : public consumer<C>, public producer<P>, public detail::transformer
672 {
673  virtual void produce() {}
674 
675 protected:
677  virtual void sever()
678  {
680 
682  }
683 
685  virtual void operator()() { consumer<C>::operator()(); }
686 
687 public:
691  transformer(const std::string& name_r, const size_t ins, const size_t outs) : node(name_r), consumer<C>(name_r, ins), producer<P>(name_r, outs)
692  {}
693 
694  virtual ~transformer() {}
695 
701  virtual std::string rename(const std::string& name_r)
702  {
703  for(size_t i = 0; i != producer<P>::outs(); ++i)
704  {
705  producer<P>::output(i).outpin<P>::rename(name_r + "_out" + static_cast<char>('0' + i));
706  }
707 
708  for(size_t i = 0; i != consumer<C>::ins(); ++i)
709  {
710  consumer<C>::input(i).inpin<C>::rename(name_r + "_in" + static_cast<char>('0' + i));
711  }
712 
713  return named::rename(name_r);
714  }
715 
717  virtual void ready(size_t n) = 0;
718 };
719 
720 }
721 
722 #endif
723 
724 /*
725  (C) Copyright Thierry Seegers 2010-2012. Distributed under the following license:
726 
727  Boost Software License - Version 1.0 - August 17th, 2003
728 
729  Permission is hereby granted, free of charge, to any person or organization
730  obtaining a copy of the software and accompanying documentation covered by
731  this license (the "Software") to use, reproduce, display, distribute,
732  execute, and transmit the Software, and to prepare derivative works of the
733  Software, and to permit third-parties to whom the Software is furnished to
734  do so, all subject to the following:
735 
736  The copyright notices in the Software and this entire statement, including
737  the above license grant, this restriction and the following disclaimer,
738  must be included in all copies of the Software, in whole or in part, and
739  all derivative works of the Software, unless such copies or derivative
740  works are solely in the form of machine-executable object code generated by
741  a source language processor.
742 
743  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
744  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
745  FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
746  SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
747  FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
748  ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
749  DEALINGS IN THE SOFTWARE.
750 */