1 #if !defined(FLOW_NODE_H)
8 #include <condition_variable>
/// Shared pointer to a pair holding a pipe<T> (first) and a heap-allocated
/// std::mutex (second). Code in this file takes a std::lock_guard on the
/// mutex before calling into the pipe (e.g. rename, push). connect() assigns
/// the same shared_ptr to both the outpin and the inpin it is connected to.
66 std::shared_ptr<std::pair<pipe<T>, std::unique_ptr<std::mutex>>>
d_pipe_sp;
/// Constructs a pin, passing name_r to the `named` initializer.
/// The body is empty; d_pipe_sp is not set here.
/// @param name_r Name given to this pin.
72 pin(
const std::string& name_r) :
named(name_r) {}
/// Condition variable and mutex pointers supplied through the inpin
/// constructor. Elsewhere in this file the mutex is locked with a
/// std::unique_lock and the condition variable is signalled via notify_one().
/// NOTE(review): raw pointers, presumably non-owning — confirm that the
/// pointed-to objects outlive this pin.
92 std::condition_variable *d_transition_cv_p;
93 std::mutex *d_transition_m_p;
98 virtual void disconnect()
101 std::lock_guard<std::mutex> lg(*
d_pipe_sp->second);
116 inpin(
const std::string& name_r, std::condition_variable* transition_cv_p, std::mutex *transition_m_p)
117 :
pin<T>(name_r), d_transition_cv_p(transition_cv_p), d_transition_m_p(transition_m_p)
127 virtual std::string
rename(
const std::string& name_r)
131 std::lock_guard<std::mutex> lg(*
d_pipe_sp->second);
148 std::lock_guard<std::mutex> lg(*
d_pipe_sp->second);
158 virtual std::unique_ptr<packet<T>>
pop()
162 std::lock_guard<std::mutex> lg(*
d_pipe_sp->second);
166 return std::unique_ptr<packet<T>>();
175 std::unique_lock<std::mutex> ul(*d_transition_m_p);
176 d_transition_cv_p->notify_one();
189 virtual void disconnect()
192 std::lock_guard<std::mutex> lg(*
d_pipe_sp->second);
193 d_pipe_sp->first.rename(std::string(
"nothing") +
"_to_" +
d_pipe_sp->first.output()->name());
207 virtual void connect(
inpin<T>& inpin_r,
const size_t max_length = 0,
const size_t max_weight = 0)
218 std::lock_guard<std::mutex> lg(*inpin_r.
pin<T>::
d_pipe_sp->second);
223 if(inpin_pipe.
input())
225 inpin_pipe.
input()->disconnect();
237 pipe<T> p(
pin<T>::name() +
"_to_" + inpin_r.pin<T>::name(),
this, &inpin_r, max_length, max_weight);
238 d_pipe_sp = inpin_r.
pin<T>
::d_pipe_sp = std::make_shared<std::pair<pipe<T>, std::unique_ptr<std::mutex>>>(std::make_pair(std::move(p), std::unique_ptr<std::mutex>(
new std::mutex())));
255 virtual std::string
rename(
const std::string& name_r)
259 std::lock_guard<std::mutex> lg(*
d_pipe_sp->second);
281 std::lock_guard<std::mutex> lg(*
d_pipe_sp->second);
282 if(
d_pipe_sp->first.push(std::move(packet_p)))
/// Pure virtual: concrete node types must provide sever().
/// NOTE(review): presumably disconnects all of the node's pins — confirm
/// against the overrides.
331 virtual void sever() = 0;
341 node(
node&& node_rr) :
named(std::move(node_rr)), d_state_a(node_rr.d_state_a)
/// Container type for this node's output pins: a vector of outpin<T>.
409 typedef std::vector<outpin<T>> outputs_t;
420 virtual void connect(
size_t p_pin,
consumer<T>* consumer_p,
size_t c_pin,
const size_t max_length = 0,
const size_t max_weight = 0)
422 output(p_pin).connect(consumer_p->
input(c_pin), max_length, max_weight);
436 for(
auto&
outpin : d_outputs)
// Appends `outs` output pins to d_outputs. Each pin is named
// name_r + "_out" + one character computed as '0' + i.
// NOTE(review): the suffix is a single char, so for i >= 10 it is no longer
// a decimal digit (in ASCII, ':' follows '9') — confirm that nodes never
// have more than 10 outputs, or that such names are acceptable.
476 for(
size_t i = 0; i !=
outs; ++i)
478 d_outputs.push_back(
outpin<T>(name_r +
"_out" + static_cast<char>(
'0' + i)));
/// @return The number of output pins, i.e. d_outputs.size().
485 virtual size_t outs()
const {
return d_outputs.size(); }
/// @return A mutable reference to the output pin container d_outputs.
498 virtual outputs_t&
outputs() {
return d_outputs; }
/// @return A read-only reference to the output pin container d_outputs.
501 virtual const outputs_t&
outputs()
const {
return d_outputs; }
510 virtual std::string
rename(
const std::string& name_r)
// Renames every output pin to name_r + "_out" + one character ('0' + i).
// The call is qualified as outpin<T>::rename, so it is bound statically to
// outpin<T>'s implementation rather than dispatched virtually.
// NOTE(review): the suffix is a single char, so for i >= 10 it is no longer
// a decimal digit (in ASCII, ':' follows '9') — confirm this is acceptable.
512 for(
size_t i = 0; i !=
outs(); ++i)
514 output(i).outpin<T>::rename(name_r +
"_out" + static_cast<char>(
'0' + i));
/// Container type for this node's input pins: a vector of inpin<T>.
536 typedef std::vector<inpin<T>> inputs_t;
545 input(pin).disconnect();
551 for(
auto&
inpin : d_inputs)
560 for(
auto&
inpin : d_inputs)
596 for(
size_t i = 0; i !=
ins(); ++i)
614 for(
size_t i = 0; i !=
ins; ++i)
/// @return The number of input pins, i.e. d_inputs.size().
623 virtual size_t ins()
const {
return d_inputs.size(); }
/// @return A mutable reference to the input pin container d_inputs.
636 virtual inputs_t&
inputs() {
return d_inputs; }
/// @return A read-only reference to the input pin container d_inputs.
639 virtual const inputs_t&
inputs()
const {
return d_inputs; }
646 virtual std::string
rename(
const std::string& name_r)
// Renames every input pin to name_r + "_in" + one character ('0' + i).
// The call is qualified as inpin<T>::rename, so it is bound statically to
// inpin<T>'s implementation rather than dispatched virtually.
// NOTE(review): the suffix is a single char, so for i >= 10 it is no longer
// a decimal digit (in ASCII, ':' follows '9') — confirm this is acceptable.
648 for(
size_t i = 0; i !=
ins(); ++i)
650 input(i).inpin<T>::rename(name_r +
"_in" + static_cast<char>(
'0' + i));
/// Pure virtual hook that derived classes must implement.
/// @param n NOTE(review): presumably the index of the input pin that has
///          data available — confirm against the callers.
663 virtual void ready(
size_t n) = 0;
670 template<
typename C,
typename P>
/// Virtual hook with an empty default body; derived classes may override it.
673 virtual void produce() {}
701 virtual std::string
rename(
const std::string& name_r)
/// Pure virtual hook that derived classes must implement.
/// @param n NOTE(review): presumably the index of the input pin that has
///          data available — confirm against the callers.
717 virtual void ready(
size_t n) = 0;