
package uk.co.wingpath.io;

import java.io.*;
import java.net.*;
import uk.co.wingpath.event.*;
import uk.co.wingpath.util.*;

/**
* This class implements {@link Connection} using a UDP socket.
* <p>The methods in this class are thread-safe, and guarantee atomicity of
* reads &amp; writes: all access to the receive buffer is serialised on
* {@code readLock} and all sends are serialised on {@code writeLock}.
* Opening and closing acquire both locks, always in the order
* {@code readLock} then {@code writeLock}.
*/
public class UdpConnection
    implements Connection
{
    private final String remoteHost;
    private final int remotePort;
    private final String localHost;
    private final int localPort;
    private final Reporter reporter;

    // Size in bytes of the receive buffer handed to each DatagramPacket.
    private static final int BUFSIZE = 2000;

    // Upper bound (ms) on a single blocking receive, so that 'readLock' is
    // released at regular intervals while waiting for data.
    private static final int MAX_RECEIVE_WAIT_MS = 200;

    // The following are declared volatile to avoid having to lock in their
    // respective access methods. Locking 'readLock' could take up to 200ms.
    private volatile DatagramSocket socket;
    private volatile long timeLastRead;
    private volatile long timeLastWrite;

    // Written under 'readLock' (by read) but read under 'writeLock' (by
    // write) and without any lock (by getName), hence volatile.
    private volatile SocketAddress remoteAddress;

    // Set and read without locking, hence volatile.
    private volatile String name = null;

    // Receive state: guarded by 'readLock'.
    private final byte [] recvBuf;
    private int recvOffset;
    private int recvLength;

    private final Object readLock = new Object ();
    private final Object writeLock = new Object ();

    /**
    * Constructs a {@code UdpConnection} using the specified interface and
    * port.
    * <p>This constructor is intended for server use: the socket is bound to
    * the local interface/port but is not connected to a remote peer, and
    * writes are sent to the sender of the most recently received datagram.
    * <p>The connection is opened by this constructor. A connection built
    * this way cannot be re-opened with {@link UdpConnection#open open}.
    * @param host name of interface on which to listen for requests.
    * May be null or an empty string if any interface may be used.
    * @param port port on which to listen for requests.
    * @param reporter where to report closing of connection.
    * @throws IOException if the socket cannot be bound or the host name
    * cannot be resolved.
    * @throws NullPointerException if {@code reporter} is null.
    */
    public UdpConnection (String host, int port, Reporter reporter)
        throws IOException
    {
        if (reporter == null)
            throw new NullPointerException ("reporter must not be null");
        // A null host is treated like an empty one (wildcard address),
        // matching the behaviour of the client constructor.
        boolean anyInterface = host == null || host.equals ("");
        remoteHost = null;
        remotePort = 0;
        localHost = host;
        localPort = port;
        this.reporter = reporter;

        DatagramSocket sock;
        try
        {
            if (anyInterface)
                sock = new DatagramSocket (port);
            else
                sock = new DatagramSocket (port, InetAddress.getByName (host));
        }
        catch (BindException e)
        {
            if (anyInterface)
                throw new HIOException ("I113", "Can't listen on port " + port);
            throw new HIOException ("I114",
                "Can't listen on interface '" + host + "' port " + port);
        }
        catch (UnknownHostException e)
        {
            throw new HIOException ("I115", "Unknown host: " + host);
        }
        socket = sock;
        recvBuf = new byte [BUFSIZE];
        recvOffset = 0;
        recvLength = 0;
        // Null for an unconnected socket; filled in by the first read.
        remoteAddress = sock.getRemoteSocketAddress ();
        timeLastRead = timeLastWrite = System.nanoTime ();
    }

    /**
    * Constructs a {@code UdpConnection} using the specified host, port,
    * local host and local port.
    * <p>This constructor is intended for client use.
    * <p>The connection must be opened using the
    * {@link UdpConnection#open open} method before reading or writing.
    * @param remoteHost the host to send messages to.
    * @param remotePort the port to send messages to.
    * @param localHost name or IP address of the local interface to be used.
    * A null or empty host will assign the wildcard address.
    * @param localPort number of local port to bind the connection to.
    * A port number of zero will let the system pick up an ephemeral port.
    * @param reporter where to report opening/closing of connection.
    * @throws NullPointerException if {@code reporter} is null.
    */
    public UdpConnection (String remoteHost, int remotePort, String localHost,
        int localPort, Reporter reporter)
    {
        if (reporter == null)
            throw new NullPointerException ("reporter must not be null");
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
        this.reporter = reporter;
        if (localHost != null && localHost.equals (""))
            localHost = null;
        this.localHost = localHost;
        this.localPort = localPort;
        socket = null;
        recvBuf = new byte [BUFSIZE];
        timeLastRead = timeLastWrite = System.nanoTime ();
    }

    /**
    * Opens the connection, closing any socket that is currently open.
    * <p>A new socket is bound to the configured local address and connected
    * to the configured remote host/port. The new socket is only published
    * once it has been connected successfully; on any failure it is closed
    * and the connection is left closed.
    * @throws IOException if the connection cannot be opened.
    * @throws IllegalStateException if this connection was constructed
    * without a remote host.
    */
    @Override
    public void open ()
        throws IOException
    {
        if (remoteHost == null)
            throw new IllegalStateException ("Host/port not specified");
        synchronized (readLock)
        {
            synchronized (writeLock)
            {
                DatagramSocket old = socket;
                if (old != null)
                {
                    old.close ();
                    socket = null;
                }

                // NOTE(review): the underlying exceptions are not chained
                // as causes below because the HIOException constructors are
                // not visible from this file.
                DatagramSocket sock;
                try
                {
                    InetAddress localAddr = localHost == null ? null :
                        InetAddress.getByName (localHost);
                    sock = new DatagramSocket (
                        new InetSocketAddress (localAddr, localPort));
                }
                catch (UnknownHostException e)
                {
                    throw new HIOException ("I111",
                        "Unknown local host: " + localHost);
                }
                catch (IOException e)
                {
                    throw new HIOException ("I112", "Can't bind to port " +
                        localPort + ": " + e.getMessage ());
                }

                try
                {
                    sock.connect (InetAddress.getByName (remoteHost),
                        remotePort);
                }
                catch (UnknownHostException e)
                {
                    sock.close ();
                    throw new HIOException ("I115",
                        "Unknown host: " + remoteHost);
                }
                catch (RuntimeException e)
                {
                    // e.g. an invalid port number: don't leak the bound
                    // socket, and don't leave a half-open one published.
                    sock.close ();
                    throw e;
                }

                recvOffset = 0;
                recvLength = 0;
                remoteAddress = sock.getRemoteSocketAddress ();
                socket = sock;
            }
        }
        timeLastRead = timeLastWrite = System.nanoTime ();
        reporter.info (null, "Started UDP to %s", getName ());
    }

    /**
    * Tests whether the connection currently has a socket.
    * @return true if a socket is present, false otherwise.
    */
    @Override
    public boolean isOpen ()
    {
        return socket != null;
    }

    /**
    * Gets the time of the last successful read (or of construction/opening
    * if nothing has been read since).
    * @return a {@link System#nanoTime} value.
    */
    @Override
    public long getTimeLastRead ()
    {
        return timeLastRead;
    }

    /**
    * Gets the time of the last successful write (or of construction/opening
    * if nothing has been written since).
    * @return a {@link System#nanoTime} value.
    */
    @Override
    public long getTimeLastWrite ()
    {
        return timeLastWrite;
    }

    /**
    * Throws the exception used to report that the connection is closed.
    * Never returns normally.
    */
    private void throwConnectionClosed ()
        throws HEOFException
    {
        throw new HEOFException ("I100",
            "UDP " + getName () + " closed");
    }

    /**
    * Copies up to {@code len} bytes of the most recently received datagram
    * into {@code data} and advances the receive position.
    * <p>Must be called with {@code readLock} held.
    * @return number of bytes copied.
    * @throws IOException (timed out, "I120") if no received bytes remain.
    */
    private int copyReceived (byte [] data, int offset, int len)
        throws IOException
    {
        if (recvLength == 0)
            throw new HInterruptedIOException ("I120", "Timed out");
        int count = len > recvLength ? recvLength : len;
        System.arraycopy (recvBuf, recvOffset, data, offset, count);
        recvOffset += count;
        recvLength -= count;
        timeLastRead = System.nanoTime ();
        return count;
    }

    /**
    * Reads bytes from a received datagram.
    * <p>If {@code first} is true, waits for a new non-empty datagram to
    * arrive and returns bytes from the start of it. If {@code first} is
    * false, returns further bytes from the datagram received by the
    * previous call, without touching the socket.
    * <p>Receiving and copying are done under the same lock, so the bytes
    * returned always come from a single datagram.
    * @param data buffer to copy the bytes into.
    * @param offset index in {@code data} at which to store the first byte.
    * @param len maximum number of bytes to copy. If zero, the method
    * returns 0 immediately.
    * @param timeout maximum time to wait for a datagram, in milliseconds.
    * Only used when {@code first} is true.
    * @param first true to receive a new datagram, false to continue with
    * the current one.
    * @return number of bytes copied; may be less than {@code len}.
    * @throws IOException if the connection is closed, the wait times out,
    * no bytes remain in the current datagram, or the remote port is
    * unreachable (in which case the connection is also closed).
    * @throws InterruptedException if the calling thread is interrupted.
    */
    @Override
    public int read (byte [] data, int offset, int len,
            int timeout, boolean first)
        throws IOException, InterruptedException
    {
        if (len == 0)
            return 0;

        if (!first)
        {
            synchronized (readLock)
            {
                return copyReceived (data, offset, len);
            }
        }

        long startTime = System.nanoTime ();

        for (;;)
        {
            if (Thread.interrupted ())
                throw new InterruptedException ();
            synchronized (readLock)
            {
                // Work on a snapshot: write() may set the field to null
                // while holding only 'writeLock'.
                DatagramSocket sock = socket;
                if (sock == null)
                    throwConnectionClosed ();
                int to = timeout -
                    (int) ((System.nanoTime () - startTime) / 1000000L);
                if (to < 0)
                {
                    throw new HInterruptedIOException ("I120",
                        "Timed out");
                }
                // A socket timeout of 0 would mean "wait forever", so use
                // the shortest real timeout instead. Long waits are split
                // into slices so that the lock is released periodically.
                if (to == 0)
                    to = 1;
                else if (to > MAX_RECEIVE_WAIT_MS)
                    to = MAX_RECEIVE_WAIT_MS;
                sock.setSoTimeout (to);

                boolean received = false;
                try
                {
                    DatagramPacket packet =
                        new DatagramPacket (recvBuf, BUFSIZE);
                    sock.receive (packet);
                    // For an unconnected socket, remember the sender so
                    // that write() can send to it.
                    if (!sock.isConnected ())
                        remoteAddress = packet.getSocketAddress ();
                    recvOffset = packet.getOffset ();
                    recvLength = packet.getLength ();
                    // Empty datagrams are skipped.
                    received = recvLength > 0;
                }
                catch (SocketTimeoutException e)
                {
                    // This slice expired; loop to re-check the overall
                    // timeout and try again.
                }
                catch (PortUnreachableException e)
                {
                    sock.close ();
                    socket = null;
                    throw new HIOException ("I124", "UDP port unreachable");
                }

                if (received)
                    return copyReceived (data, offset, len);
            }
            // Pause outside the lock before the next receive slice;
            // presumably this lets threads waiting for 'readLock' (e.g. in
            // close) acquire it.
            Thread.sleep (1);
        }
    }

    /**
    * Sends the specified bytes as a single datagram.
    * <p>The datagram is sent to the connected remote address or, for an
    * unconnected socket, to the sender of the most recently received
    * datagram.
    * @param data buffer containing the bytes to send.
    * @param offset index in {@code data} of the first byte to send.
    * @param length number of bytes to send.
    * @throws IOException if the connection is closed, the send fails, or
    * the remote port is unreachable (in which case the connection is also
    * closed).
    * @throws IllegalStateException if no remote address is known yet.
    */
    @Override
    public void write (byte [] data, int offset, int length)
        throws IOException
    {
        synchronized (writeLock)
        {
            // Work on snapshots: read() may change these fields while
            // holding only 'readLock'.
            DatagramSocket sock = socket;
            if (sock == null)
                throwConnectionClosed ();
            SocketAddress dest = remoteAddress;
            if (dest == null)
                throw new IllegalStateException ("Not connected");
            DatagramPacket sendPacket = new DatagramPacket (
                data, offset, length, dest);
            try
            {
                sock.send (sendPacket);
                timeLastWrite = System.nanoTime ();
            }
            catch (PortUnreachableException e)
            {
                sock.close ();
                socket = null;
                throw new HIOException ("I124", "UDP port unreachable");
            }
        }
    }

    /**
    * Does nothing.
    */
    @Override
    public void drain ()
    {
    }

    /**
    * Does nothing.
    */
    @Override
    public void flush ()
    {
    }

    /**
    * Closes the connection and reports that it has stopped.
    * Does nothing if the connection is already closed.
    */
    @Override
    public void close ()
    {
        synchronized (readLock)
        {
            synchronized (writeLock)
            {
                DatagramSocket sock = socket;
                if (sock != null)
                {
                    try
                    {
                        sock.close ();
                    }
                    catch (Exception ignored)
                    {
                        // Best effort: the connection is treated as closed
                        // whether or not the socket closed cleanly.
                    }
                    socket = null;
                    reporter.info (null, "Stopped UDP to %s", getName ());
                }
            }
        }
    }

    /**
    * Gets the name of the connection.
    * @return the name set by {@link UdpConnection#setName setName} if it
    * is non-empty; otherwise the remote host and port if a remote host was
    * configured; otherwise the last known remote address if the connection
    * is open; otherwise an empty string.
    */
    @Override
    public String getName ()
    {
        String explicitName = name;
        if (explicitName != null && !explicitName.equals (""))
            return explicitName;
        if (remoteHost != null)
            return remoteHost + ":" + remotePort;
        if (socket != null)
        {
            SocketAddress addr = remoteAddress;
            if (addr != null)
                return addr.toString ();
        }
        return "";
    }

    /**
    * Sets the name of the connection.
    * @param name the name; null or empty to use a name derived from the
    * remote address.
    */
    @Override
    public void setName (String name)
    {
        this.name = name;
    }
}

