#ifndef LIBFLOW_FLOW_HPP
#define LIBFLOW_FLOW_HPP

/*
 * Copyright (c) 2008, Victor Nicollet <victor@nicollet.net>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in the
 *       documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY VICTOR NICOLLET ``AS IS'' AND ANY
 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL VICTOR NICOLLET BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <cassert>
#include <cstddef>
#include <iterator>
#include <numeric>
#include <queue>

#include <boost/function.hpp>
#include <boost/optional.hpp>
#include <boost/thread/condition.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread.hpp>

namespace flows
{
  // Forward declaration, needed by the friend declarations in reader and
  // writer. The class-key matches the definition ('class flow'); a
  // struct/class mismatch is legal but triggers compiler warnings
  // (MSVC C4099, clang -Wmismatched-tags).
  template <typename Data_t>
  class flow;

  namespace detail
  {
    template <typename T>
    class channel
    {
      typedef T value_t;
      typedef boost::optional<value_t> optional_t;
      typedef std::queue<value_t> queue_t;
      typedef boost::mutex mutex_t;
      typedef mutex_t::scoped_lock lock_t;
      typedef boost::condition condition_t;

      // Input-output elements ================================================

      // Any operation over the data passes through the
      // data mutex. This allows sync of both input and
      // output threads.

      queue_t data; 
      volatile bool is_over;
      mutex_t data_mutex; 
      condition_t written_to;

      // Reference counting elements ==========================================

      // Any operation over the handles passes through 
      // the handle mutex.
      
      mutex_t handle_mutex;
      volatile unsigned output_handles;
      volatile unsigned input_handles;
      volatile bool is_silent;

    public:

      // Initialize a clean channel
      channel() : 
        is_over(false), 
        output_handles(0), 
        input_handles(0), 
        is_silent(false)
      {
      }

      void acquire(bool input) 
      {
        lock_t lock(handle_mutex);
        if (input) 
          ++input_handles;
        else
          ++output_handles;
      }

      // Release a reference to the flow.
      // - If all threads cease to reference the flow, then
      //   the memory is deallocated with operator delete
      // - If all input threads cease to reference the flow,
      //   sets the is-over flag and wakes up an output 
      //   thread.
      // - If all output threads cease to reference the
      //   flow, sets the is-silent flag.
      void release(bool input)
      {
        bool should_delete = false;

        {
          lock_t lock(handle_mutex);

          if (input) 
            { 
              if (--input_handles == 0) 
                {
                  lock_t lock(data_mutex);
                  is_over = true;
                  written_to.notify_all();
                }
            }
          else
            {
              if (--output_handles == 0)
                {
                  is_silent = true;
                }
            }

          should_delete = (input_handles + output_handles == 0);
        }

        if (should_delete) 
          {
            delete this;
          }
      }

      // Is the flow silent?
      bool silent()
      {
        lock_t lock(handle_mutex);
        return is_silent;
      }

      // Add an element to the flow.
      // - If any output threads were waiting for input, 
      //   one of them is reactivated and reads the input.
      // - Otherwise, stores the element for later collection.
      void add(const value_t &t) 
      {
        lock_t lock(data_mutex);
        data.push(t);
        written_to.notify_one();
      }

      // Attempt to read an element from the flow.
      // - If the flow is over, leaves the argument untouched.
      // - If the flow is currently empty, but there are still
      //   input threads, blocks until input (or an is-over
      //   signal) arrives.
      void get(optional_t &t)
      {
        lock_t lock(data_mutex);

        while (!is_over && data.empty())
          {
            written_to.wait(lock);
          }

        if (is_over && data.empty()) 
          {
            return;
          }

        t = data.front();
        data.pop();
      }
    };

    // Input iterator that pops elements from a channel.
    // - Constructing it from a channel immediately fetches the first
    //   element, blocking as described in channel::get().
    // - It is past-the-end when default-constructed, or once the channel
    //   is over and drained.
    // - It holds no handle on the channel, so it must not be used after
    //   every reader and writer of that channel has been released.
    template <typename T>
    class reader_iterator
    {
      typedef boost::optional<T> optional_t;
      typedef channel<T> channel_t;
      typedef T value_t;
      typedef reader_iterator self_t;

      channel_t *chan;
      optional_t value;   // current element; empty when past-the-end

    public:

      // Standard iterator traits. difference_type must be a signed integer
      // type, and reference/pointer describe what operator* and operator->
      // actually return (read-only access).
      typedef std::ptrdiff_t           difference_type;
      typedef T                        value_type;
      typedef const T&                 reference;
      typedef const T*                 pointer;
      typedef std::input_iterator_tag  iterator_category;

    private:

      // Replace the current element with the next one from the channel,
      // leaving it empty if the channel is over.
      void get_value()
      {
        assert (chan);
        value = optional_t ();
        chan -> get(value);
      }

    public:

      reader_iterator(channel_t &chan) : chan(&chan), value()
      {
        get_value();
      }

      // Past-the-end iterator.
      reader_iterator() : chan(0), value() {}

      reference operator * () const
      {
        assert (value);
        return *value;
      }

      pointer operator -> () const
      {
        assert (value);
        return value.get_ptr();
      }

      // Post-increment: the returned copy still holds the element that was
      // current, so '*it++' yields it while 'it' moves on to the next one.
      self_t operator ++ (int)
      {
        self_t old = *this;
        assert (value);
        get_value ();
        return old;
      }

      self_t &operator ++ ()
      {
        assert (value);
        get_value ();
        return *this;
      }

      bool operator != (const reader_iterator &o) const
      {
        // Input iterators are equal if and only if they are both
        // past-the-end iterators.
        return value || o.value;
      }

      bool operator == (const reader_iterator &o) const
      {
        return !(*this != o);
      }
    };

    // Output iterator that appends every value assigned through it to a
    // channel. Incrementing is a no-op, as for std::back_insert_iterator.
    template <typename T>
    class writer_iterator
    {
      typedef T value_t;
      typedef channel<T> channel_t;
      channel_t *chan;

      // Proxy returned by operator*: assigning a value to it pushes that
      // value onto the channel.
      struct assigner_t
      {
        channel_t &chan;

        assigner_t(channel_t &chan) : chan(chan) {}

        void operator = (const value_t &t)
        {
          chan.add(t);
        }
      };

    public:

      writer_iterator(channel_t &chan) : chan(&chan) {}

      typedef void                      difference_type;
      typedef void                      value_type;
      typedef void                      reference;
      typedef void                      pointer;
      typedef std::output_iterator_tag  iterator_category;

      assigner_t operator * () const
      {
        assert (chan);
        return assigner_t(*chan);
      }

      // Both increments are no-ops. Like the standard output iterators,
      // both return a reference to this iterator.
      writer_iterator &operator ++ () { return *this; }
      writer_iterator &operator ++ (int) { return *this; }
    };
  }

  // An input view on a given flow.

  // Provides an input iterator range. Reading from the iterators
  // yields the data inside the flow. If several input views are
  // present for the same flow, then a given element from the
  // flow will be read by exactly one view.

  // The element order is the same order the elements were in
  // when they were added to the flow, though some may be
  // missing (because they have been read through other input views).

  // Reading from a view is thread-safe (it's possible to read
  // and write from one or more threads to a given flow). However,
  // deadlocks are possible: attempting to read from an input view 
  // will block until an output view writes something to the flow,
  // or all output views for the flow have been destroyed. 

  template <typename T>
  class reader
  { 
    friend class flow<T>;
    typedef T value_t;
    typedef detail::channel<T> channel_t;
    typedef reader self_t;
    typedef boost::optional<value_t> optional_t;
    
    channel_t *chan;

    reader(channel_t &chan) : chan(&chan) 
    { 
      chan.acquire(false);
    }

  public:

    typedef T value_type;
    typedef optional_t option_type;
    typedef detail::reader_iterator<value_t> iterator;

    reader() : chan(0) {}

    reader(const reader &i) : chan(i.chan)
    {
      if (chan) chan -> acquire(false);
    }

    reader &operator = (const reader &i)
    {
      if (i.chan) i.chan -> acquire(false);
      if (chan) chan -> release(false);

      chan = i.chan;
      return *this;
    }

    void close() 
    {
      assert (chan);
      chan -> release(false);
      chan = 0;
    }

    iterator begin() 
    {
      assert (chan);
      return iterator(*chan);
    }

    iterator end()
    {
      assert (chan);
      return iterator();
    }

    optional_t get()
    {
      assert (chan);
      iterator it = begin();
      if (it != end()) return *it;
      return optional_t ();
    }

    ~reader()
    {
      if (chan) chan -> release(false);
    }
  };

  // An output view on a flow.

  // Provides an output iterator for writing to the flow.
  // Elements written to the flow are available, in the same
  // order, to input views over that flow. 

  // Several output views on the same flow can coexist,
  // and the behavior is thread-safe. Elements added by
  // distinct views are not ordered in any defined way.

  // When all output views are destroyed, the flow is 
  // closed and any existing input views cease to expect input.
  // Otherwise, input views keep waiting for input (in a
  // blocking fashion) as long as an output view remains. 

  template <typename T>
  class writer
  {
    friend class flow<T>;
    typedef T value_t;
    typedef writer self_t;
    typedef detail::channel<T> channel_t;
    
    channel_t *chan;

    writer(channel_t &chan) : chan(&chan)
    {
      chan.acquire(true);
    }

  public:

    typedef T value_type;
    typedef detail::writer_iterator<value_t> iterator;

    writer() : chan(0) {}

    writer(const writer &o) : chan(o.chan)
    {
      if (chan) chan -> acquire(true);
    }

    writer &operator = (const writer &o)
    {
      if (o.chan) o.chan -> acquire(true);
      if (chan) chan -> release(true);

      chan = o.chan;
      return *this;
    }
    
    void close() 
    {
      assert (chan);
      chan -> release(true);
      chan = 0;
    }

    bool silent() const
    {
      assert (chan);
      return chan -> silent();
    }

    iterator begin() 
    {
      assert (chan);
      return iterator(*chan);
    }

    void put(const value_t &t)
    {
      assert (chan);
      iterator i = begin();
      *i++ = t;
    }

    ~writer()
    {
      if (chan) chan -> release(true);
    }
  };

  // A flow, as an input and an output. 

  template <typename Data_t>
  class flow
  {
    reader<Data_t> flow_reader;
    writer<Data_t> flow_writer;

  public: 

    flow() 
    {
      detail::channel<Data_t> *chan = new detail::channel<Data_t>();
      flow_reader = reader<Data_t> (*chan);
      flow_writer = writer<Data_t> (*chan);
    }

    const reader<Data_t> &output() const { return flow_reader; }
    const writer<Data_t> &input() const { return flow_writer; }
  };

  // The default threaded runner: starts a new boost::thread running 'func'
  // and detaches it, so the caller never waits for it.
  // - 'inline' is required because this non-template function is defined
  //   in a header: without it, including the header from two translation
  //   units violates the one-definition rule (duplicate symbol at link time).
  // - detach() makes the fire-and-forget intent explicit, and stays correct
  //   under Boost.Thread configurations where destroying a still-joinable
  //   thread calls std::terminate.

  inline void run_boost_thread(const boost::function0<void> &func)
  {
    boost::thread t(func);
    t.detach();
  }

  // Creates two matching flows in two distinct threads.
  // - Creates an input flow 'i' and an output flow 'o'.
  // - Creates a thread which executes 'f(o)'. The thread
  //   ceases its execution when 'f(o)' returns. 
  // - Immediately returns 'i'.

  namespace detail
  {
    template <typename Data_t, typename Func_t>
    struct generate1_lambda
    {
      writer<Data_t> o;
      Func_t f;
      
      void operator () () const
      {
        f(o);
      }
    }; 
  }

  template <typename Data_t, typename Func_t, typename Thread_t>
  reader<Data_t> generate(Func_t f, Thread_t t)
  {
    typedef detail::generate1_lambda<Data_t, Func_t> body_t;

    flow<Data_t> channel_pair;
    body_t body = { channel_pair.input(), f };

    // The runner receives the body, which carries its own copy of the
    // writer; the local flow releases its views when this function returns.
    t (body);
    return channel_pair.output();
  }

  // Same as generate(f, t), with run_boost_thread as the runner.
  template <typename Data_t, typename Func_t>
  reader<Data_t> generate(Func_t f)
  {
    return generate<Data_t, Func_t>(f, run_boost_thread);
  }

  // Creates several flows in as many distinct threads.
  // - Creates an input flow 'i' and an output flow 'o'.
  // - For each iterator 'it' in the range [b,e) creates a
  //   thread which executes 'f(o,*it)'. That thread ceases
  //   its execution when the function returns.
  // - Immediately returns 'i'. 

  namespace detail
  {
    template<typename Data_t, typename Func_t, typename Iter_t>
    struct generate2_lambda
    {
      writer<Data_t> o;
      Func_t f;
      typename std::iterator_traits<Iter_t>::value_type v;
      
      void operator () () const
      {
        f(o,v);
      }
    };
  }
  
  template <typename Data_t, typename Func_t, typename Iter_t, typename Thread_t>
  reader<Data_t> generate(Func_t f, Iter_t b, Iter_t e, Thread_t t)
  {
    typedef detail::generate2_lambda<Data_t, Func_t, Iter_t> body_t;

    flow<Data_t> channel_pair;

    // One runner invocation per element; every body carries its own copy
    // of the writer.
    while (b != e)
      {
        body_t body = { channel_pair.input(), f, *b++ };
        t (body);
      }

    return channel_pair.output();
  }

  // Same as generate(f, b, e, t), with run_boost_thread as the runner.
  template <typename Data_t, typename Func_t, typename Iter_t>
  reader<Data_t> generate(Func_t f, Iter_t b, Iter_t e)
  {
    return generate<Data_t, Func_t, Iter_t>(f, b, e, run_boost_thread);
  }

  // Process elements in a flow with possible reordering.
  // - Creates a thread, then immediately returns a reader
  //   corresponding to a writer of type 'Output_t' called 'dest'.
  // - The thread executes 'f(source, dest)', then exits.
  
  namespace detail
  {
    template<typename Input_t, typename Output_t, typename Func_t>
    struct process_lambda
    {
      writer<Output_t> o;
      reader<Input_t> s;
      Func_t f;
      
      void operator () () const
      {
        f(s,o);
      }
    };
  }

  // Runs 'f(source, dest)' through runner 't' and returns the reader side
  // of 'dest'. (No trailing ';' after the body: an empty declaration at
  // namespace scope is ill-formed in C++03 and warns under -pedantic.)
  template<typename Output_t, 
           typename Input_t, 
           typename Func_t, 
           typename Thread_t>
  reader<Output_t> process(const reader<Input_t> &source, Func_t f, Thread_t t)
  {
    flow<Output_t> pair;

    detail::process_lambda<Input_t, Output_t, Func_t> lambda = 
      { pair.input(), source, f };

    t (lambda);

    return pair.output();
  }

  // Same as process(source, f, t), with run_boost_thread as the runner.
  template<typename Output_t, typename Input_t, typename Func_t>
  reader<Output_t> process(const reader<Input_t> &source, Func_t f)
  {
    return process<Output_t>(source, f, run_boost_thread);
  }

  // Transforms a flow into another. Parallelizes the process.
  // - Creates 'n' independent threads, then immediately returns
  //   a reader corresponding to a writer of type 'Output_t'
  //   called 'dest'.
  // - Every thread executes '*dest_it++ = f(*source_it++)' until 
  //   'source' is over.
  // - When 'source' is over, all threads exit and 'dest' is 
  //   destroyed.

  namespace detail
  {
    template<typename Input_t, typename Output_t, typename Func_t>
    struct transform_lambda
    {
      writer<Output_t> o;
      reader<Input_t> s;
      Func_t f;
      
      void operator () () const
      {
        writer<Output_t> o = this -> o;
        reader<Input_t> s = this -> s;
        typename reader<Input_t>::iterator it = s.begin();

        while (! o.silent() && it != s.end())
          { o.put(f(*it++)); }
      }
    };
  }
  
  // Starts 'n' workers through runner 't'. All of them share the same
  // source and destination channels, so each source element is processed
  // by exactly one worker; with n > 1, output order may differ from
  // input order.
  template <typename Output_t, typename Input_t, typename Func_t, 
            typename Thread_t>
  reader<Output_t> transform(const reader<Input_t> &source, 
                             Func_t f, 
                             Thread_t t,
                             unsigned n = 1)
  {
    assert (n > 0);

    flow<Output_t> pair;

    // The counter is unsigned to match 'n': an int counter mixes signed and
    // unsigned in the comparison and would overflow for n > INT_MAX.
    for (unsigned i = 0; i < n; ++i)
      {
        detail::transform_lambda<Input_t, Output_t, Func_t> lambda = 
          { pair.input(), source, f };

        t (lambda);
      }

    return pair.output();
  }

  // Same as transform(source, f, t, n), with run_boost_thread as the runner.
  // NOTE: pass 'n' as an unsigned value (e.g. 4u). A plain int argument
  // deduces Thread_t = int in the 4-parameter overload, which then wins
  // overload resolution as an exact match and fails to compile at 't(...)'.
  template <typename Output_t, typename Input_t, typename Func_t>
  reader<Output_t> transform(const reader<Input_t> &source, 
                            Func_t f, 
                            unsigned n = 1)
  {
    return transform<Output_t, Input_t, Func_t>(source, f,
                                                run_boost_thread, n);
  }

  // Accumulates a flow into a value using another thread.
  // - Creates a new thread, then returns a reader.
  // - The thread executes 'initial = f(initial, *source_it++)' until
  //   'source' is over (or until no reader of the result remains).
  // - The final value of 'initial' is then written to the flow read by
  //   the returned reader, and the thread releases its writer on it.

  namespace detail
  {
    template <typename Input_t, typename Output_t, typename Func_t>
    struct accumulate_lambda
    {
      writer<Output_t> o;
      reader<Input_t> s;
      Func_t f;
      Output_t data;

      void operator () () const
      {
        reader<Input_t> s = this -> s;
        writer<Output_t> o = this -> o;
        Output_t data = this -> data;

        typename reader<Input_t>::iterator it = s.begin();

        while (! o.silent() && it != s.end())
          data = f(data, *it++);

        o.put(data);
      }
    };
  }
  
  template <typename Input_t, typename Output_t, typename Func_t, 
            typename Thread_t>
  reader<Output_t> accumulate(const reader<Input_t> &source, 
                             Output_t initial, 
                             Func_t f,
                             Thread_t t)
  {
    typedef detail::accumulate_lambda<Input_t, Output_t, Func_t> body_t;

    flow<Output_t> result_flow;
    body_t body = { result_flow.input(), source, f, initial };

    // The runner receives the body, which carries its own writer copy.
    t (body);
    return result_flow.output();
  }

  // Same as accumulate(source, initial, f, t), with run_boost_thread as the
  // runner.
  // The 4-parameter overload lists Input_t BEFORE Output_t, so both must be
  // spelled out: 'accumulate<Output_t>(...)' would bind Output_t to the
  // Input_t slot and fail to match 'source' whenever the types differ.
  template <typename Input_t, typename Output_t, typename Func_t>
  reader<Output_t> accumulate(const reader<Input_t> &source, 
                             Output_t initial, 
                             Func_t f)
  {
    return accumulate<Input_t, Output_t, Func_t>(source, initial, f,
                                                 run_boost_thread);
  }

}

#endif
