flow  3.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Pages
graph.h
Go to the documentation of this file.
1 #if !defined(FLOW_GRAPH_H)
2  #define FLOW_GRAPH_H
3 
4 #include "named.h"
5 #include "node.h"
6 
7 #include <iostream>
8 #include <map>
9 #include <memory>
10 #include <string>
11 #include <thread>
12 
16 
17 namespace flow
18 {
19 
23 class graph : public named
24 {
25  typedef std::map<std::string, std::shared_ptr<node>> nodes_t;
26  nodes_t d_producers, d_transformers, d_consumers;
27 
28  typedef std::map<std::string, std::unique_ptr<std::thread>> threads_t;
29  threads_t d_threads;
30 
31  typedef std::map<std::string, std::map<size_t, std::pair<std::string, size_t>>> connections_t;
32  connections_t connections;
33 
34 public:
36  graph(const std::string name_r = "graph") : named(name_r)
37  {}
38 
39  virtual ~graph()
40  {
41  stop();
42  }
43 
50  virtual void add(std::shared_ptr<node> node_p, const std::string& name_r = std::string())
51  {
52  if(!name_r.empty())
53  {
54  node_p->rename(name_r);
55  }
56 
57  if(std::dynamic_pointer_cast<detail::transformer>(node_p))
58  {
59  d_transformers[node_p->name()] = node_p;
60  }
61  else if(std::dynamic_pointer_cast<detail::producer>(node_p))
62  {
63  d_producers[node_p->name()] = node_p;
64  }
65  else if(std::dynamic_pointer_cast<detail::consumer>(node_p))
66  {
67  d_consumers[node_p->name()] = node_p;
68  }
69 
70  connections[node_p->name()];
71  }
72 
78  virtual std::shared_ptr<node> remove(const std::string& name_r)
79  {
80  std::shared_ptr<node> p;
81  nodes_t *n;
82  nodes_t::iterator i;
83 
84  if(n = find(name_r, i))
85  {
86  i->second->sever();
87  p = i->second;
88  n->erase(i);
89  }
90 
91  connections.erase(name_r);
92 
93  return p;
94  }
95 
99  virtual void remove(const std::shared_ptr<node>& sp_node)
100  {
101  remove(sp_node->name());
102  }
103 
114  template<typename T>
115  bool connect(const std::string& p_name_r, const size_t p_pin, const std::string& c_name_r, const size_t c_pin, const size_t max_length = 0, const size_t max_weight = 0)
116  {
117  nodes_t::iterator p, c;
118 
119  // Confirm these two nodes are in the graph.
120  if(!find(p_name_r, p) || !find(c_name_r, c))
121  {
122  return false;
123  }
124 
125  std::dynamic_pointer_cast<producer<T>>(p->second)->connect(p_pin, std::dynamic_pointer_cast<consumer<T>>(c->second).get(), c_pin, max_length, max_weight);
126 
127  connections[p_name_r][p_pin] = std::make_pair(c_name_r, c_pin);
128 
129  return true;
130  }
131 
142  template<typename T>
143  bool connect(std::shared_ptr<flow::producer<T>> sp_p, const size_t p_pin, std::shared_ptr<flow::consumer<T>> sp_c, const size_t c_pin, const size_t max_length = 0, const size_t max_weight = 0)
144  {
145  nodes_t::iterator i;
146 
147  // Confirm these two nodes are in the graph.
148  if(!find(sp_p->name(), i) || !find(sp_c->name(), i))
149  {
150  return false;
151  }
152 
153  sp_p->connect(p_pin, sp_c.get(), c_pin, max_length, max_weight);
154 
155  connections[sp_p->name()][p_pin] = std::make_pair(sp_c->name(), c_pin);
156 
157  return true;
158  }
159 
164  template<typename T>
165  void disconnect(std::shared_ptr<flow::producer<T>> sp_p, const size_t p_pin)
166  {
167  sp_p->disconnect(p_pin);
168 
169  connections[sp_p->name()][p_pin] = std::make_pair(std::string(), 0);
170  }
171 
176  template<typename T>
177  void disconnect(std::shared_ptr<flow::consumer<T>> sp_c, const size_t c_pin)
178  {
179  sp_c->disconnect(c_pin);
180 
181  connections[sp_c->name()][c_pin] = std::make_pair(std::string(), 0);
182  }
183 
188  virtual void start()
189  {
190  auto start_f = [this](nodes_t::value_type& i)
191  {
192  i.second->transition(state::started);
193 
194  if(d_threads.find(i.first) == d_threads.end())
195  {
196 // d_threads[i.first] = std::unique_ptr<std::thread>(new std::thread(std::ref(*i.second)));
197  d_threads[i.first] = std::unique_ptr<std::thread>(new std::thread([&i]{ i.second->operator()(); })); // Remove this workaround for bug in VC++11 (bug #734305) when possible.
198  }
199  };
200 
201  for(auto& i : d_consumers){ start_f(i); }
202  for(auto& i : d_transformers){ start_f(i); }
203  for(auto& i : d_producers){ start_f(i); }
204  }
205 
209  virtual void pause()
210  {
211  auto pause_f = [this](nodes_t::value_type& i)
212  {
213  i.second->transition(state::paused);
214  };
215 
216  for(auto& i : d_producers){ pause_f(i); }
217  for(auto& i : d_transformers){ pause_f(i); }
218  for(auto& i : d_consumers){ pause_f(i); }
219  }
220 
224  virtual void stop()
225  {
226  auto stop_f = [this](nodes_t::value_type& i)
227  {
228  i.second->transition(state::stopped);
229 
230  graph::threads_t::iterator j = d_threads.find(i.first);
231  if(j != d_threads.end())
232  {
233  j->second->join();
234  d_threads.erase(j);
235  }
236  };
237 
238  for(auto& i : d_producers){ stop_f(i); }
239  for(auto& i : d_transformers){ stop_f(i); }
240  for(auto& i : d_consumers){ stop_f(i); }
241  }
242 
246  virtual std::ostream& to_dot(std::ostream& o)
247  {
248  o << "digraph " << (name() == "graph" ? "graph1" : name()) << "\n{\n";
249  o << "\trankdir = LR\n";
250  o << "\tnode [shape = record, fontname = \"Helvetica\"]\n";
251  o << "\tedge [color = \"midnightblue\", labelfontname = \"Courier\"]\n";
252 
253  for(auto& p : connections)
254  {
255  for(auto& i : p.second)
256  {
257  o << "\t" << p.first << " -> " << i.second.first << " [taillabel = \"" << i.first << "\", headlabel = \"" << i.second.second << "\"]\n";
258  }
259  }
260 
261  o << "}" << std::endl;
262 
263  return o;
264  }
265 
266 private:
267  virtual nodes_t* find(const std::string& name_r, nodes_t::iterator& i)
268  {
269  i = d_producers.find(name_r);
270 
271  if(i != d_producers.end())
272  {
273  return &d_producers;
274  }
275  else if((i = d_transformers.find(name_r)) != d_transformers.end())
276  {
277  return &d_transformers;
278  }
279  else if((i = d_consumers.find(name_r)) != d_consumers.end())
280  {
281  return &d_consumers;
282  }
283 
284  return nullptr;
285  }
286 };
287 
288 }
289 
290 #endif
291 
292 /*
293  (C) Copyright Thierry Seegers 2010-2012. Distributed under the following license:
294 
295  Boost Software License - Version 1.0 - August 17th, 2003
296 
297  Permission is hereby granted, free of charge, to any person or organization
298  obtaining a copy of the software and accompanying documentation covered by
299  this license (the "Software") to use, reproduce, display, distribute,
300  execute, and transmit the Software, and to prepare derivative works of the
301  Software, and to permit third-parties to whom the Software is furnished to
302  do so, all subject to the following:
303 
304  The copyright notices in the Software and this entire statement, including
305  the above license grant, this restriction and the following disclaimer,
306  must be included in all copies of the Software, in whole or in part, and
307  all derivative works of the Software, unless such copies or derivative
308  works are solely in the form of machine-executable object code generated by
309  a source language processor.
310 
311  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
312  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
313  FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
314  SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
315  FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
316  ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
317  DEALINGS IN THE SOFTWARE.
318 */