
package uk.co.wingpath.io;

import java.io.*;
import java.net.*;
import uk.co.wingpath.event.*;
import uk.co.wingpath.util.*;

/**
* This class implements {@link Connection} using a TCP socket.
* <p>The methods in this class are thread-safe, and guarantee atomicity of
* reads and writes.
* <p>Two locks are used: {@code readLock} serialises reads and
* {@code writeLock} serialises writes. Whenever both are needed (opening and
* closing), {@code readLock} is always acquired before {@code writeLock};
* no method acquires {@code readLock} while holding only {@code writeLock}.
*/
public class SocketConnection
    implements Connection
{
    /** Maximum time, in milliseconds, to wait for a connection attempt. */
    private static final int CONNECT_TIMEOUT_MS = 10000;

    /**
    * Longest socket timeout, in milliseconds, used for a single blocking
    * read. This bounds how long 'readLock' is held by a waiting reader.
    */
    private static final int MAX_READ_POLL_MS = 200;

    /** Exception message that this class treats as a reset by the peer. */
    private static final String RESET_MESSAGE = "Connection reset";

    private final String remoteHost;
    private final int remotePort;
    private final String localHost;
    private final int localPort;
    private final Reporter reporter;

    // The following are declared volatile to avoid having to lock in their
    // respective access methods. Locking 'readLock' could take up to 200ms.
    private volatile Socket socket;
    private volatile long timeLastRead;
    private volatile long timeLastWrite;
    private volatile String name = null;

    // 'input' is only accessed while holding 'readLock', and 'output' only
    // while holding 'writeLock' (both locks are held when they are replaced).
    private InputStream input;
    private OutputStream output;
    private final Object readLock = new Object ();
    private final Object writeLock = new Object ();

    /**
    * Constructs a {@code SocketConnection} using the supplied socket.
    * <p>This constructor is intended for server use. For client
    * connections the {@link SocketConnection#SocketConnection(String,int,
    * String,int,Reporter)} constructor is preferred.
    * <p>TCP_NODELAY is enabled (i.e. Nagle's algorithm is disabled) on the
    * socket, and input and output streams are obtained from the socket.
    * <p>The {@link SocketConnection#open open} method may not be used with a
    * {@code SocketConnection} constructed using this constructor.
    * @param socket the socket to be used for the connection.
    * The socket must already be connected and not closed.
    * @param reporter where to report closing of connection.
    * @throws IOException if any of the socket methods throws an
    * {@code IOException}.
    * @throws NullPointerException if {@code socket} or {@code reporter}
    * is null.
    * @throws IllegalArgumentException if the socket is not connected or
    * is closed.
    */
    public SocketConnection (Socket socket, Reporter reporter)
        throws IOException
    {
        if (reporter == null)
            throw new NullPointerException ("reporter must not be null");
        if (socket == null)
            throw new NullPointerException ("socket must not be null");
        if (!socket.isConnected ())
            throw new IllegalArgumentException ("Socket is not connected");
        if (socket.isClosed ())
            throw new IllegalArgumentException ("Socket is closed");
        this.reporter = reporter;
        this.localHost = null;
        this.localPort = 0;
        this.remoteHost = socket.getInetAddress ().getHostAddress ();
        this.remotePort = socket.getPort ();
        socket.setTcpNoDelay (true);
        input = socket.getInputStream ();
        output = socket.getOutputStream ();
        timeLastRead = timeLastWrite = System.nanoTime ();
        this.socket = socket;
    }

    /**
    * Constructs a {@code SocketConnection} using the supplied socket.
    * <p>This constructor is intended for server use. For client
    * connections the {@link SocketConnection#SocketConnection(String,int,
    * String,int,Reporter)} constructor is preferred.
    * <p>TCP_NODELAY is enabled (i.e. Nagle's algorithm is disabled) on the
    * socket, and input and output streams are obtained from the socket.
    * <p>The {@link SocketConnection#open open} method may not be used with a
    * {@code SocketConnection} constructed using this constructor.
    * <p>A {@code DummyReporter} is used as the reporter.
    * @param socket the socket to be used for the connection.
    * The socket must already be connected and not closed.
    * @throws IOException if any of the socket methods throws an
    * {@code IOException}.
    */
    public SocketConnection (Socket socket)
        throws IOException
    {
        this (socket, new DummyReporter ());
    }

    /**
    * Constructs a SocketConnection using the specified host and port.
    * <p>This constructor is intended for client use. For server
    * connections use the {@link SocketConnection#SocketConnection(Socket)}
    * constructor.
    * <p>The connection must be opened using the
    * {@link SocketConnection#open open} method before reading or writing.
    * @param remoteHost the host to connect to.
    * @param remotePort the port to connect to.
    * @param localHost name or IP address of the local interface to be used.
    * A null or empty host will assign the wildcard address.
    * @param localPort number of local port to bind the connection to.
    * A port number of zero will let the system pick up an ephemeral port.
    * @param reporter where to report opening/closing of connection.
    * @throws NullPointerException if {@code reporter} is null.
    */
    public SocketConnection (String remoteHost, int remotePort,
        String localHost, int localPort, Reporter reporter)
    {
        if (reporter == null)
            throw new NullPointerException ("reporter must not be null");
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
        this.reporter = reporter;
        // An empty local host is treated the same as no local host.
        if (localHost != null && localHost.equals (""))
            localHost = null;
        this.localHost = localHost;
        this.localPort = localPort;
        socket = null;
        input = null;
        output = null;
        timeLastRead = timeLastWrite = System.nanoTime ();
    }

    /**
    * Opens the connection.
    * <p>If the connection is already open it is closed first.
    * <p>TCP_NODELAY is enabled (i.e. Nagle's algorithm is disabled) on the
    * socket, and input and output streams are obtained from the socket.
    * <p>If any step fails, the newly created socket is closed and the
    * connection is left in the "not open" state.
    * @throws IOException if the connection cannot be opened.
    * @throws IllegalStateException if no remote host was specified.
    */
    @Override
    public void open ()
        throws IOException
    {
        if (remoteHost == null)
            throw new IllegalStateException ("Host/port not specified");
        reporter.trace (null, "Connecting to %s", getName ());
        synchronized (readLock)
        {
            synchronized (writeLock)
            {
                if (socket != null)
                    close ();

                // Work on a local socket and only publish it once it is
                // fully set up, so that a failure at any step cannot leave
                // a half-initialised socket visible through 'isOpen'.
                Socket sock = new Socket ();
                boolean opened = false;
                try
                {
                    bindLocal (sock);
                    connectRemote (sock);
                    sock.setTcpNoDelay (true);
                    InputStream in = sock.getInputStream ();
                    OutputStream out = sock.getOutputStream ();
                    input = in;
                    output = out;
                    timeLastRead = timeLastWrite = System.nanoTime ();
                    socket = sock;
                    opened = true;
                }
                finally
                {
                    // Covers every failure path, including exception types
                    // that are not translated by the helpers.
                    if (!opened)
                        closeQuietly (sock);
                }
            }
        }
        reporter.info (null, "Opened connection to %s", getName ());
    }

    /**
    * Binds the socket to the configured local host and port.
    * @param sock the socket to bind.
    * @throws IOException if the local host is unknown or binding fails.
    */
    private void bindLocal (Socket sock)
        throws IOException
    {
        try
        {
            InetAddress localAddr = localHost == null ? null :
                InetAddress.getByName (localHost);
            sock.bind (new InetSocketAddress (localAddr, localPort));
        }
        catch (UnknownHostException e)
        {
            throw new HIOException ("I111",
                "Unknown local host: " + localHost);
        }
        catch (IOException e)
        {
            throw new HIOException ("I112", "Can't bind to port " +
                localPort + ": " + e.getMessage ());
        }
    }

    /**
    * Connects the socket to the configured remote host and port, waiting
    * at most {@link #CONNECT_TIMEOUT_MS} milliseconds.
    * @param sock the socket to connect.
    * @throws IOException if the connection attempt fails.
    */
    private void connectRemote (Socket sock)
        throws IOException
    {
        try
        {
            sock.connect (
                new InetSocketAddress (remoteHost, remotePort),
                CONNECT_TIMEOUT_MS);
        }
        catch (UnknownHostException e)
        {
            throw new HIOException ("I115", "Unknown host: " + remoteHost);
        }
        catch (SocketTimeoutException e)
        {
            throw new HIOException ("I123",
                "Can't connect to host '" + remoteHost + "' port " +
                    remotePort + ": " + e.getMessage ());
        }
        catch (ConnectException e)
        {
            throw new HIOException ("I122",
                "Can't connect to host '" + remoteHost + "' port " +
                    remotePort + ": " + e.getMessage ());
        }
        catch (NoRouteToHostException e)
        {
            throw new HIOException ("I123",
                "Can't connect to host '" + remoteHost + "': " +
                    e.getMessage ());
        }
    }

    /**
    * Tests whether the connection currently has a socket.
    * @return true if a socket is present, false if the connection has not
    * been opened or has been closed.
    */
    @Override
    public boolean isOpen ()
    {
        return socket != null;
    }

    /**
    * Writes bytes to the connection.
    * <p>If the write fails the connection is closed before the exception
    * is thrown.
    * @param data buffer containing the bytes to be written.
    * @param offset index of the first byte to be written.
    * @param length number of bytes to be written.
    * @throws IOException if the connection is not open, has been reset,
    * or the write fails for any other reason.
    */
    @Override
    public void write (byte [] data, int offset, int length)
        throws IOException
    {
        Socket failedSocket = null;
        IOException failure = null;

        synchronized (writeLock)
        {
            if (socket == null)
            {
                throw new HEOFException ("I100",
                    "Connection " + getName () + " not open");
            }
            try
            {
                output.write (data, offset, length);
                timeLastWrite = System.nanoTime ();
            }
            catch (IOException e)
            {
                // Do NOT close here: closing needs 'readLock', and taking
                // it while holding 'writeLock' could deadlock with a reader
                // that holds 'readLock' and is itself trying to close.
                failure = e;
                failedSocket = socket;
            }
        }

        if (failure == null)
            return;

        closeIfCurrent (failedSocket);
        // The message may be null, so compare from the constant side.
        if (RESET_MESSAGE.equals (failure.getMessage ()))
        {
            throw new HEOFException ("I100",
                "Connection " + getName () + " reset");
        }
        throw failure;
    }

    /**
    * Reads bytes from the connection.
    * <p>Waits (up to the timeout) for the first byte, then also returns
    * any further bytes that are already available, without waiting for
    * more. While waiting, the read lock is released at least every
    * {@link #MAX_READ_POLL_MS} milliseconds so that other threads are not
    * locked out for long.
    * <p>If the peer closes the connection or the read fails, the connection
    * is closed before the exception is thrown.
    * @param data buffer to receive the bytes.
    * @param offset index in {@code data} at which to store the first byte.
    * @param len maximum number of bytes to read.
    * @param timeout maximum time, in milliseconds, to wait for the first
    * byte.
    * @param first not used by this implementation.
    * @return number of bytes read: 0 if {@code len} is 0, otherwise at
    * least 1.
    * @throws IOException if the connection is not open, is closed or reset
    * by the peer, the read times out, or the read fails for any other
    * reason.
    * @throws InterruptedException if the calling thread is interrupted.
    */
    @Override
    public int read (byte [] data, int offset, int len,
            int timeout, boolean first)
        throws IOException, InterruptedException
    {
        if (len == 0)
            return 0;
        long startTime = System.nanoTime ();

        for (;;)
        {
            if (Thread.interrupted ())
                throw new InterruptedException ();
            synchronized (readLock)
            {
                // 'socket' cannot change while 'readLock' is held, since
                // opening and closing both require this lock.
                Socket sock = socket;
                if (sock == null)
                {
                    throw new HEOFException ("I100",
                        "Connection " + getName () + " not open");
                }
                int to = timeout -
                    (int) ((System.nanoTime () - startTime) / 1000000L);
                if (to <= 0)
                    throw new HInterruptedIOException ("I120", "Timed out");
                if (to > MAX_READ_POLL_MS)
                    to = MAX_READ_POLL_MS;
                sock.setSoTimeout (to);

                try
                {
                    int b = input.read ();
                    if (b < 0)
                    {
                        // Peer has closed the connection.
                        close ();
                        throw new HEOFException ("I100",
                            "Connection " + getName () + " closed by peer");
                    }
                    data [offset] = (byte) b;
                    // Record the read before any return, so that a
                    // single-byte read also updates the timestamp.
                    timeLastRead = System.nanoTime ();
                    if (len == 1)
                        return 1;

                    int count = 1;

                    // Use the shortest possible timeout so that picking up
                    // the remaining bytes cannot block for long.
                    sock.setSoTimeout (1);

                    try
                    {
                        int avail = input.available ();
                        if (avail > 0)
                        {
                            int n = input.read (data, offset + 1,
                                Math.min (avail, len - 1));
                            if (n > 0)
                            {
                                count += n;
                                timeLastRead = System.nanoTime ();
                            }
                        }
                    }
                    catch (IOException ignored)
                    {
                        // Best effort only: at least one byte has already
                        // been read, so return what we have. A persistent
                        // error will be reported by the next read.
                    }

                    return count;
                }
                catch (SocketTimeoutException ignored)
                {
                    // Nothing arrived within this poll interval: release
                    // the lock and try again until the overall timeout.
                }
                catch (IOException e)
                {
                    // Closing here is safe: 'readLock' is already held and
                    // 'close' takes 'writeLock' second, as everywhere else.
                    close ();
                    // The message may be null, so compare from the
                    // constant side.
                    if (RESET_MESSAGE.equals (e.getMessage ()))
                    {
                        throw new HEOFException ("I100",
                            "Connection " + getName () + " reset");
                    }
                    throw e;
                }
            }
            // Give other threads a chance to acquire 'readLock'.
            Thread.sleep (1);
        }
    }

    /**
    * Gets the time of the last successful read.
    * @return value of {@code System.nanoTime()} recorded at the last read
    * (or when the connection was constructed/opened).
    */
    @Override
    public long getTimeLastRead ()
    {
        return timeLastRead;
    }

    /**
    * Gets the time of the last successful write.
    * @return value of {@code System.nanoTime()} recorded at the last write
    * (or when the connection was constructed/opened).
    */
    @Override
    public long getTimeLastWrite ()
    {
        return timeLastWrite;
    }

    /**
    * Does nothing in this implementation.
    */
    @Override
    public void drain ()
    {
        // Note that "output.flush ()" is a no-op on a socket OutputStream.
        // Setting TCP_NODELAY should ensure that output is sent ASAP, but
        // there is no way to force output to be sent, or to wait until it has
        // been sent.
        // See http://www.unixguide.net/network/socketfaq/2.11.shtml.
    }

    /**
    * Does nothing in this implementation.
    */
    @Override
    public void flush ()
    {
        // There is no way to discard buffered output, apart from closing
        // and re-opening the socket.
    }

    /**
    * Closes a socket, ignoring any exception thrown by the close.
    * @param sock the socket to close.
    */
    private static void closeQuietly (Socket sock)
    {
        try
        {
            sock.close ();
        }
        catch (Exception ignored)
        {
            // The socket is being discarded; there is nothing useful to do
            // if closing it fails.
        }
    }

    /**
    * Closes the current socket (if any) without reporting, and marks the
    * connection as not open.
    * Must be called with both 'readLock' and 'writeLock' held.
    */
    private void quietClose ()
    {
        Socket sock = socket;
        if (sock != null)
            closeQuietly (sock);
        socket = null;
    }

    /**
    * Closes the connection, but only if its current socket is still the
    * one supplied. This lets a caller that detected a failure outside the
    * locks close the failed socket without closing a connection that has
    * since been re-opened by another thread.
    * @param expected the socket on which the failure occurred.
    */
    private void closeIfCurrent (Socket expected)
    {
        synchronized (readLock)
        {
            synchronized (writeLock)
            {
                if (expected != null && socket == expected)
                    close ();
            }
        }
    }

    /**
    * Closes the connection.
    * <p>Does nothing if the connection is not open. Exceptions thrown while
    * closing the socket are ignored.
    */
    @Override
    public void close ()
    {
        synchronized (readLock)
        {
            synchronized (writeLock)
            {
                if (socket != null)
                {
                    quietClose ();
                    reporter.info (null, "Closed connection to %s", getName ());
                }
            }
        }
    }

    /**
    * Gets the name of the connection.
    * @return the name set using {@link #setName setName} if it is non-empty,
    * otherwise "host:port" for the remote end if the remote host is known,
    * otherwise the remote address of the socket if there is one, otherwise
    * an empty string.
    */
    @Override
    public String getName ()
    {
        String label = name;
        if (label != null && !label.equals (""))
            return label;
        if (remoteHost != null)
            return remoteHost + ":" + remotePort;
        // Read the volatile field once, so that a concurrent close cannot
        // null it between the test and the use.
        Socket sock = socket;
        if (sock != null)
        {
            SocketAddress addr = sock.getRemoteSocketAddress ();
            if (addr != null)
                return addr.toString ();
        }
        return "";
    }

    /**
    * Sets the name of the connection.
    * @param name the name to be returned by {@link #getName getName}.
    * A null or empty name makes {@code getName} fall back to the remote
    * address.
    */
    @Override
    public void setName (String name)
    {
        this.name = name;
    }
}

