
package uk.co.wingpath.modbus;

import java.io.*;
import java.util.*;
import java.util.concurrent.atomic.*;
import uk.co.wingpath.io.*;
import uk.co.wingpath.util.*;

/**
* This class is an implementation of ServiceFactory that creates Modbus
* services using a ModbusTransactionHandler.
*/

public class ModbusServiceFactory
    implements ServiceFactory
{
    // Collaborators fixed at construction time and shared by every
    // service created by this factory.
    private final ModbusTransactionHandler handler;
    private final PacketType packetType;
    private final boolean alwaysRespond;
    private final Reporter reporter;
    private final ModbusCounters counters;
    private final TracerFactory tracerFactory;

    // Settings that may be changed through the setters after services have
    // been created. They are read by the service threads (or by whichever
    // thread creates a service), so they are volatile to make a new value
    // visible to those threads.
    private volatile int responseDelay = 0;                // milliseconds
    private volatile String tracePrefix;
    private volatile int idleTimeout = 0;                  // seconds

    /**
    * Creates a factory whose services all share the supplied collaborators.
    * The trace prefix is initially the empty string.
    * @param handler the handler to which each received request is passed.
    * @param packetType the type of Modbus packet used to receive requests
    * and to send responses.
    * @param alwaysRespond whether responses to broadcast requests, and
    * the error responses {@code Modbus.ERROR_PATH_UNAVAILABLE} and
    * {@code Modbus.ERROR_TIMED_OUT} (Modbus exceptions 0x0A and 0x0B),
    * should be sent.
    * This should normally be {@code true} for socket connections and
    * {@code false} for serial connections.
    * @param reporter handler for reporting recoverable errors (such as
    * error responses sent back to masters, and badly formatted received data).
    * @param counters diagnostic counters.
    * @param tracerFactory factory used to create the tracer of each service,
    * and to delete it again when the service terminates.
    * @throws NullPointerException if {@code reporter} is {@code null}.
    */
    public ModbusServiceFactory (final ModbusTransactionHandler handler,
        PacketType packetType, boolean alwaysRespond, final Reporter reporter,
        ModbusCounters counters, TracerFactory tracerFactory)
    {
        // The reporter is the only collaborator that is validated here.
        if (reporter == null)
        {
            throw new NullPointerException ("reporter must not be null");
        }

        this.reporter = reporter;
        this.tracerFactory = tracerFactory;
        this.counters = counters;
        this.handler = handler;
        this.packetType = packetType;
        this.alwaysRespond = alwaysRespond;
        this.tracePrefix = "";
    }

    /**
    * Sets the prefix that is passed to the tracer of each service
    * created after this call.
    * @param prefix the trace prefix.
    */
    public void setTracePrefix (String prefix)
    {
        this.tracePrefix = prefix;
    }

    /**
    * Sets the idle timeout.
    * The reader thread of each service checks the timeout before each
    * attempt to receive a request: if nothing has been written to the
    * connection for at least the timeout period, the service is terminated
    * and its connection is closed.
    * Received data does not count as activity for this purpose.
    * Setting the timeout period to 0 disables the idle timeout.
    * @param timeout idle timeout period in seconds.
    */
    public void setIdleTimeout (int timeout)
    {
        this.idleTimeout = timeout;
    }

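    /**
    * Provides a Modbus service.
    * <p>The service repeatedly receives a Modbus request from the
    * connection, passes the request to the handler to be processed,
    * and sends the response returned by the handler to the connection.
    * <p>Receiving, handling and sending are carried out by three separate
    * threads (reader, handler and writer), all of which are started when
    * the service is constructed.
    */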
    private class ModbusService
        implements Service
    {
        // Connection being served; closed when the service terminates.
        private final Connection connection;
        // Tracer obtained from the factory's TracerFactory for this service;
        // handed back to that factory when the service terminates.
        private final Tracer tracer;
        // Set (once) by terminate(); polled by all three worker threads.
        private final AtomicBoolean terminated;
        // Worker threads, created and started by the constructor.
        private final Thread readThread;
        private final Thread handleThread;
        private final Thread writeThread;
        // A transaction moves readyQueue -> activeList -> responseQueue:
        //   readyQueue    - received, not yet passed to the handler;
        //   activeList    - passed to the handler, response not yet queued;
        //   responseQueue - response waiting to be sent, or being sent.
        private final LinkedList<ModbusTransaction> readyQueue;
        private final LinkedList<ModbusTransaction> activeList;
        private final LinkedList<ModbusTransaction> responseQueue;
        // Monitors guarding the three collections above. readyLock and
        // responseLock are also used to wake the handler and writer threads.
        private final Object readyLock;
        private final Object activeLock;
        private final Object responseLock;

        /**
        * Creates a service for the supplied connection and starts its
        * reader, handler and writer threads.
        * @param connection the connection to be served.
        */
        ModbusService (Connection connection)
        {
            this.connection = connection;
            // Each service gets its own tracer, labelled with the factory's
            // current trace prefix.
            tracer = tracerFactory.createTracer (reporter);
            tracer.setPrefix (tracePrefix);
            // tracer.setConnection (connection);
            terminated = new AtomicBoolean (false);
            readyQueue = new LinkedList<ModbusTransaction> ();
            activeList = new LinkedList<ModbusTransaction> ();
            responseQueue = new LinkedList<ModbusTransaction> ();
            readyLock = new Object ();
            activeLock = new Object ();
            responseLock = new Object ();

            assert reporter.debug ("Service starting %s",
                connection.getName ());
            // Each thread is assigned to its field before it is started,
            // because the thread bodies refer to their own thread field.
            // All queues and locks have been created by this point.
            readThread = WThread.create (connection.getName () + " reader",
                reporter, new ReadServer ());
            readThread.start ();
            handleThread = WThread.create (connection.getName () + " handler",
                reporter, new HandleServer ());
            handleThread.start ();
            writeThread = WThread.create (connection.getName () + " writer",
                reporter, new WriteServer ());
            writeThread.start ();
        }

        /**
        * Tests whether a request equal to the supplied one is already
        * being dealt with by this service.
        * The ready queue, the active list and the response queue are
        * searched in that order (the order in which a transaction moves
        * through them), each under its own lock; no two of the locks are
        * held at the same time.
        * @param request the request to be matched.
        * @return true if there is a matching request being handled.
        */
        private boolean isDuplicate (ModbusMessage request)
        {
            return holdsRequest (readyLock, readyQueue, request)
                || holdsRequest (activeLock, activeList, request)
                || holdsRequest (responseLock, responseQueue, request);
        }

        /**
        * Searches one collection of transactions, while holding its lock,
        * for a transaction whose request equals the supplied request.
        * @param lock monitor guarding {@code transactions}.
        * @param transactions the transactions to be searched.
        * @param request the request to be matched.
        * @return true if a matching transaction was found.
        */
        private boolean holdsRequest (Object lock,
            LinkedList<ModbusTransaction> transactions, ModbusMessage request)
        {
            synchronized (lock)
            {
                for (ModbusTransaction pending : transactions)
                {
                    if (pending.getRequest ().equals (request))
                        return true;
                }
                return false;
            }
        }

        /**
        * Body of the reader thread.
        * <p>Each pass of the loop tries to receive one request. A request
        * that duplicates one still being dealt with is discarded; any other
        * request is wrapped in a transaction and appended to the ready queue
        * for the handler thread.
        * <p>The loop ends when the service has been terminated, the thread
        * is interrupted, the idle timeout has expired, or an exception
        * other than a receive timeout or recoverable I/O error is thrown.
        * The service is always terminated when the loop ends.
        */
        private class ReadServer
            implements Runnable
        {
            /**
            * Checks the idle timeout.
            * The connection is considered to be idle if no data has been
            * written to it recently. Reads are ignored for this purpose
            * since we may be receiving requests/responses to/from other
            * slaves.
            * @return true if an idle timeout is set and the time since the
            * last write has reached it.
            */
            private boolean idleTimeoutExpired ()
            {
                if (idleTimeout == 0)
                    return false;

                long lastWrite = connection.getTimeLastWrite ();
                long elapsedNanos = System.nanoTime () - lastWrite;
                int idleSeconds = (int) (elapsedNanos / 1000000000L);
                if (idleSeconds < idleTimeout)
                    return false;

                assert reporter.debug (
                    "Connection %s idle for %d seconds",
                    connection.getName (), idleSeconds);
                return true;
            }

            /**
            * Creates the response handler for a new transaction.
            * When invoked, the handler moves the transaction from the
            * active list to the response queue and wakes the writer thread.
            * @return a new response handler.
            */
            private ModbusResponseHandler newResponseQueuer ()
            {
                return new ModbusResponseHandler ()
                {
                    public void handleResponse (ModbusTransaction t)
                    {
                        // Lock order: activeLock, then responseLock.
                        synchronized (activeLock)
                        {
                            synchronized (responseLock)
                            {
                                activeList.remove (t);
                                responseQueue.add (t);
                                responseLock.notifyAll ();
                            }
                        }
                    }
                };
            }

            /**
            * Appends a transaction to the ready queue and wakes any thread
            * waiting on the ready lock.
            * @param trans the transaction to be queued.
            */
            private void enqueue (ModbusTransaction trans)
            {
                assert !Thread.holdsLock (readyLock);
                synchronized (readyLock)
                {
                    readyQueue.add (trans);
                    readyLock.notifyAll ();
                }
            }

            /**
            * Reports an I/O error, unless the service has already been
            * terminated.
            * @param e the exception to be reported.
            */
            private void reportIoError (IOException e)
            {
                if (terminated.get ())
                    return;
                reporter.error (Exceptions.getHelpId (e),
                    "%s: %s: %s",
                    readThread.getName (),
                    connection.getName (),
                    Exceptions.getMessage (e));
            }

            @Override
            public void run ()
            {
                try
                {
                    while (!terminated.get ())
                    {
                        if (Thread.interrupted ())
                        {
                            assert reporter.debug ("%s interrupted",
                                readThread.getName ());
                            break;
                        }

                        // Stop servicing the connection if it has been idle
                        // for too long.
                        if (idleTimeoutExpired ())
                            break;

                        ModbusMessage received;
                        try
                        {
                            // A bounded receive timeout lets the loop
                            // re-check termination and idleness regularly.
                            packetType.setTimeout (1000);
                            received = packetType.receiveRequest (
                                connection, tracer, counters);
                        }
                        catch (InterruptedIOException e)
                        {
                            // Timed out - go round again.
                            continue;
                        }
                        catch (RecoverableIOException e)
                        {
                            // Nothing usable was received - go round again.
                            continue;
                        }

                        // An identical request that is still being handled
                        // is presumably a retry, so it is discarded.
                        if (isDuplicate (received))
                        {
                            received.traceDiscard (tracer, "S404", "Duplicate");
                            continue;
                        }

                        // Create a transaction for the request, arranging
                        // for the write thread to send the response.
                        ModbusTransaction created = new ModbusTransaction (
                            received, reporter, newResponseQueuer ());

                        assert reporter.debug ("%s: %d %s age %s",
                            readThread.getName (),
                            created.getId (), created.getState (),
                            Metric.formatNanoTime (created.getAge ()));
                        enqueue (created);
                    }
                }
                catch (InterruptedException e)
                {
                    assert reporter.debug ("%s interrupted",
                        readThread.getName ());
                }
                catch (EOFException e)
                {
                    // Should already have been reported.
                    assert reporter.debug ("%s connection closed",
                        readThread.getName ());
                }
                catch (IOException e)
                {
                    reportIoError (e);
                }
                finally
                {
                    terminate ();
                }
                assert reporter.debug ("%s finished", readThread.getName ());
            }
        }

        /**
        * Body of the handler thread.
        * <p>Each pass of the loop takes the oldest transaction from the
        * ready queue, records it in the active list and passes it to the
        * factory's transaction handler.
        * The transaction is removed from the active list when its response
        * handler is invoked.
        * <p>The loop ends when the service has been terminated or the
        * thread is interrupted. The service is always terminated when the
        * loop ends.
        */
        private class HandleServer
            implements Runnable
        {
            @Override
            public void run ()
            {
                try
                {
                    while (!terminated.get ())
                    {
                        if (Thread.interrupted ())
                        {
                            assert reporter.debug ("%s interrupted",
                                handleThread.getName ());
                            break;
                        }

                        // The following call to 'sleep' was added to give the
                        // SerialConnection.setSerialPortParams method an
                        // opportunity to lock the connection. It was having to
                        // wait for an arbitrary period of time (apparently
                        // sometimes for ever under Linux) to acquire the lock.
                        Thread.sleep (1);

                        ModbusTransaction trans;
                        synchronized (readyLock)
                        {
                            if (readyQueue.isEmpty ())
                            {
                                // Wait (for a bounded time, so that
                                // termination is noticed) for the reader to
                                // queue a transaction, then start the pass
                                // again.
                                readyLock.wait (200);
                                continue;
                            }
                            // Move the transaction while holding both locks
                            // (readyLock, then activeLock) so that it is
                            // always in one of the two collections.
                            synchronized (activeLock)
                            {
                                trans = readyQueue.removeFirst ();
                                activeList.add (trans);
                            }
                        }

                        assert reporter.debug ("%s: %d %s age %s",
                            handleThread.getName (),
                            trans.getId (), trans.getState (),
                            Metric.formatNanoTime (trans.getAge ()));
                        handler.handleTransaction (trans);
                        // The format needs a conversion for each of the four
                        // arguments, otherwise the age is not reported.
                        assert reporter.debug ("%s: %d %s age %s",
                            handleThread.getName (),
                            trans.getId (), trans.getState (),
                            Metric.formatNanoTime (trans.getAge ()));
                    }
                }
                catch (InterruptedException e)
                {
                    assert reporter.debug ("%s interrupted",
                        handleThread.getName ());
                }
                finally
                {
                    terminate ();
                }
                assert reporter.debug ("%s finished", handleThread.getName ());
            }
        }

        /**
        * Sends the response of a transaction, unless the response has to
        * be suppressed.
        * <p>When the factory was not told to always respond, nothing is
        * sent for a response whose slave ID is 0 (a broadcast request), nor
        * for an error response with code
        * {@code Modbus.ERROR_PATH_UNAVAILABLE} or
        * {@code Modbus.ERROR_TIMED_OUT}; the reason is traced instead.
        * <p>If a response delay has been set, the response is held back
        * until the transaction is at least that old.
        * The tracer's current transaction is ended in every case that
        * returns normally.
        * @param trans the transaction whose response is to be sent.
        * @throws IOException if the response could not be sent.
        * @throws InterruptedException if the thread was interrupted.
        */
        private void sendResponse (ModbusTransaction trans)
            throws IOException, InterruptedException
        {
            ModbusMessage reply = trans.getResponse ();

            // If not using TCP, don't send reply to broadcast and don't
            // send 0x0A or 0x0B error responses.
            if (!alwaysRespond)
            {
                boolean broadcast = reply.getSlaveId () == 0;
                if (broadcast)
                {
                    if (reply.isError () &&
                        reply.getErrorCode () != Modbus.ERROR_TIMED_OUT)
                    {
                        reply.traceInterpretation ();
                    }
                    reply.traceExplanation ("S402",
                        "Response not sent: Request was broadcast");
                    tracer.endTransaction ();
                    return;
                }

                if (reply.isError ())
                {
                    int code = reply.getErrorCode ();
                    boolean suppressed =
                        code == Modbus.ERROR_PATH_UNAVAILABLE ||
                        code == Modbus.ERROR_TIMED_OUT;
                    if (suppressed)
                    {
                        reply.traceInterpretation ();
                        reply.traceExplanation ("S403",
                            "Response not sent: Exception 10/11");
                        tracer.endTransaction ();
                        return;
                    }
                }
            }

            waitOutResponseDelay (trans);
            packetType.send (connection, reply);
            tracer.endTransaction ();
        }

        /**
        * Sleeps for whatever part of the configured response delay has
        * not yet elapsed since the transaction was created.
        * Returns immediately if no delay is set or the transaction is
        * already old enough.
        * @param trans the transaction whose age is compared with the delay.
        * @throws InterruptedException if the thread was interrupted while
        * sleeping.
        */
        private void waitOutResponseDelay (ModbusTransaction trans)
            throws InterruptedException
        {
            if (responseDelay == 0)
                return;

            // The age is in nanoseconds; the delay is in milliseconds.
            int ageMillis = (int) (trans.getAge () / 1000000L);
            int remaining = responseDelay - ageMillis;
            if (remaining <= 0)
                return;

            assert reporter.debug ("%s sleeping for %d ms",
                writeThread.getName (), remaining);
            Thread.sleep (remaining);
        }

        /**
        * Body of the writer thread.
        * <p>Each pass of the loop sends the response of the transaction at
        * the head of the response queue, and then removes that transaction
        * from the queue.
        * <p>The loop ends when the service has been terminated, the thread
        * is interrupted, or sending fails. The service is always terminated
        * when the loop ends.
        */
        private class WriteServer
            implements Runnable
        {
            /**
            * Reports an I/O error, unless the service has already been
            * terminated.
            * @param e the exception to be reported.
            */
            private void reportIoError (IOException e)
            {
                if (terminated.get ())
                    return;
                reporter.error (Exceptions.getHelpId (e),
                    "%s: %s: %s",
                    writeThread.getName (),
                    connection.getName (),
                    Exceptions.getMessage (e));
            }

            @Override
            public void run ()
            {
                try
                {
                    while (!terminated.get ())
                    {
                        if (Thread.interrupted ())
                        {
                            assert reporter.debug ("%s interrupted",
                                writeThread.getName ());
                            break;
                        }

                        ModbusTransaction next;
                        assert !Thread.holdsLock (responseLock);
                        synchronized (responseLock)
                        {
                            if (!responseQueue.isEmpty ())
                            {
                                // Only look at the head: the transaction
                                // stays on 'responseQueue' while its
                                // response is being sent, so that duplicate
                                // requests can still be recognized.
                                next = responseQueue.getFirst ();
                                assert reporter.debug ("%s: %d %s age %s",
                                    writeThread.getName (),
                                    next.getId (), next.getState (),
                                    Metric.formatNanoTime (next.getAge ()));
                            }
                            else
                            {
                                // Nothing to send: wait (for a bounded
                                // time) to be woken, then start the pass
                                // again.
                                responseLock.wait (200);
                                continue;
                            }
                        }

                        // Send without holding the lock.
                        sendResponse (next);

                        synchronized (responseLock)
                        {
                            responseQueue.removeFirst ();
                        }
                    }
                }
                catch (InterruptedException e)
                {
                    assert reporter.debug ("%s interrupted",
                        writeThread.getName ());
                }
                catch (EOFException e)
                {
                    // Should already have been reported.
                    assert reporter.debug ("%s connection closed",
                        writeThread.getName ());
                }
                catch (IOException e)
                {
                    reportIoError (e);
                }
                finally
                {
                    terminate ();
                }
                assert reporter.debug ("%s finished", writeThread.getName ());
            }
        }

        /**
        * Terminates the service: marks it as terminated, closes the
        * connection and deletes the service's tracer.
        * Only the first call has any effect; later calls return
        * immediately.
        */
        @Override
        public void terminate ()
        {
            // Whichever caller flips the flag does the clean-up.
            boolean firstCall = terminated.compareAndSet (false, true);
            if (!firstCall)
                return;

            assert reporter.debug ("ModbusService terminating %s",
                connection.getName ());
            connection.close ();
            tracerFactory.deleteTracer (tracer);
        }

        /**
        * Tests whether the service is still running.
        * @return false once the service has been terminated, true before
        * that.
        */
        @Override
        public boolean isAlive ()
        {
            boolean stopped = terminated.get ();
            return !stopped;
        }
    }

    /**
    * Creates a Modbus service for the supplied connection.
    * Constructing the service starts its reader, handler and writer
    * threads, so the service is already running when it is returned.
    * @param connection the connection on which the service is to be provided.
    * @return the newly created service.
    */
    @Override
    public Service createService (Connection connection)
    {
        Service service = new ModbusService (connection);
        return service;
    }

    /**
    * Sets the minimum age that a transaction must reach before its
    * response is sent.
    * A response that is ready sooner is held back for the remainder of
    * the period. Setting the delay to 0 disables the delay.
    * @param delay response delay in milliseconds.
    */
    public void setResponseDelay (int delay)
    {
        this.responseDelay = delay;
    }
}



