
package uk.co.wingpath.io;

import java.io.*;
import java.net.*;
import uk.co.wingpath.event.*;
import uk.co.wingpath.util.*;

/**
* This class implements {@link Connection} using a UDP socket.
* <p>The methods in this class are thread-safe, but the synchronization
* does not guarantee atomicity of reads & writes. If this is needed, the
* recommended method is to synchronize on the UdpConnection instance.
*/
public class UdpConnection
    implements Connection
{
    private final String remoteHost;
    private final int remotePort;
    private final String localHost;
    private final int localPort;
    private final Reporter reporter;

    private static final int BUFSIZE = 2000;
    private volatile DatagramSocket socket;
    private final byte [] recvBuf;
    private int recvOffset;
    private int recvLength;
    private SocketAddress remoteAddress;
    private final Object lock = new Object ();
    private String name = null;

    /**
    * Constructs a {@code UdpConnection} using the supplied UDP socket.
    * <p>This constructor is intended for server use. For client
    * connections the {@link UdpConnection#UdpConnection(String,int,
    * String,int,Reporter)} constructor is preferred.
    * @param socket the socket to be used for the connection.
    * The socket must not be closed.
    * @param reporter where to report closing of connection.
    * {@code null} if closing is not to be reported.
    */
    public UdpConnection (DatagramSocket socket, Reporter reporter)
    {
        this.remoteHost = null;
        this.remotePort = 0;
        this.localHost = null;
        this.localPort = 0;
        this.socket = socket;
        this.reporter = reporter;
        if (socket.isClosed ())
            throw new IllegalArgumentException ("Socket is closed");
        recvBuf = new byte [BUFSIZE];
        recvOffset = 0;
        recvLength = 0;
        remoteAddress = socket.getRemoteSocketAddress ();
    }

    /**
    * Constructs a {@code UdpConnection} using the supplied UDP socket.
    * <p>This constructor is intended for server use. For client
    * connections the {@link UdpConnection#UdpConnection(String,int,
    * String,int,Reporter)} constructor is preferred.
    * @param socket the socket to be used for the connection.
    * The socket must not be closed.
    * {@code null} if closing is not to be reported.
    */
    public UdpConnection (DatagramSocket socket)
    {
        this (socket, null);
    }

    /**
    * Constructs a SocketConnection using the specified host, port, local host
    * and local port.
    * <p>This constructor is intended for client use. For server
    * connections use the {@link UdpConnection#UdpConnection(DatagramSocket)}
    * constructor.
    * <p>The connection must be opened using the
    * {@link UdpConnection#open open} method before reading or writing.
    * @param remoteHost the host to send messages to.
    * @param remotePort the port to send messages to.
    * @param localHost name or IP address of the local interface to be used.
    * A null or empty host will assign the wildcard address.
    * @param localPort number of local port to bind the connection to.
    * A port number of zero will let the system pick up an ephemeral port.
    * @param reporter where to report opening/closing of connection.
    * {@code null} if opening/closing is not to be reported.
    */
    public UdpConnection (String remoteHost, int remotePort, String localHost,
        int localPort, Reporter reporter)
    {
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
        this.reporter = reporter;
        if (localHost != null && localHost.equals (""))
            localHost = null;
        this.localHost = localHost;
        this.localPort = localPort;
        socket = null;
        recvBuf = new byte [BUFSIZE];
    }

    /**
    * Opens the connection.
    * @throws IOException if the connection cannot be opened.
    */
    public void open ()
        throws IOException
    {
        if (remoteHost == null)
            throw new IllegalStateException ("Host/port not specified");
        synchronized (lock)
        {
            if (socket != null)
            {
                socket.close ();
                socket = null;
            }
            try
            {
                InetAddress localAddr = localHost == null ? null :
                    InetAddress.getByName (localHost);
                socket = new DatagramSocket (
                    new InetSocketAddress (localAddr, localPort));
            }
            catch (UnknownHostException e)
            {
                throw new HIOException ("I111",
                    "Unknown local host: " + localHost);
            }
            catch (IOException e)
            {
                throw new HIOException ("I112", "Can't bind to port " +
                    localPort + ": " + e.getMessage ());
            }
            try
            {
                socket.connect (InetAddress.getByName (remoteHost), remotePort);
            }
            catch (UnknownHostException e)
            {
                socket.close ();
                socket = null;
                throw new HIOException ("I115", "Unknown host: " + remoteHost);
            }
            recvOffset = 0;
            recvLength = 0;
            remoteAddress = socket.getRemoteSocketAddress ();
        }
        if (reporter != null)
        {
            reporter.info (null,
                "Will send requests to host " + remoteHost + " port " +
                    remotePort);
        }
    }

    public boolean isOpen ()
    {
        return socket != null;
    }

    public int read (byte [] data, int offset, int len,
            int timeout, boolean first)
        throws IOException, InterruptedException
    {
        Event.checkIsNotEventDispatchThread ();
        if (len == 0)
            return 0;

        if (first)
        {
            for (;;)
            {
                if (Thread.interrupted ())
                    throw new InterruptedException ();
                synchronized (lock)
                {
                    if (socket == null)
                        throw new HEOFException ("I100", "Connection closed");
                    int to = timeout > 200 ? 200 : timeout;
                    socket.setSoTimeout (to);

                    try
                    {
                        DatagramPacket packet =
                            new DatagramPacket (recvBuf, BUFSIZE);
                        socket.receive (packet);
                        if (!socket.isConnected ())
                            remoteAddress = packet.getSocketAddress ();
                        recvOffset = packet.getOffset ();
                        recvLength = packet.getLength ();
                        if (recvLength > 0)
                            break;
                    }
                    catch (SocketTimeoutException e)
                    {
                        timeout -= to;
                        if (timeout <= 0)
                        {
                            throw new HInterruptedIOException ("I120",
                                "Timed out");
                        }
                    }
                    catch (PortUnreachableException e)
                    {
                        socket.close ();
                        socket = null;
                        throw new HIOException ("I124", "UDP port unreachable");
                    }
                }
                Thread.yield ();
            }
        }

        if (recvLength == 0)
            throw new HInterruptedIOException ("I120", "Timed out");
        if (len > recvLength)
            len = recvLength;
        System.arraycopy (recvBuf, recvOffset, data, offset, len);
        recvOffset += len;
        recvLength -= len;
        return len;
    }

    public void write (byte [] data, int offset, int length)
        throws IOException
    {
        Event.checkIsNotEventDispatchThread ();
        synchronized (lock)
        {
            if (socket == null)
                throw new HEOFException ("I100", "Connection closed");
            if (remoteAddress == null)
                throw new IllegalStateException ("Not connected");
            DatagramPacket sendPacket = new DatagramPacket (
                data, offset, length, remoteAddress);
            try
            {
                socket.send (sendPacket);
            }
            catch (PortUnreachableException e)
            {
                socket.close ();
                socket = null;
                throw new HIOException ("I124", "UDP port unreachable");
            }
        }
    }

    public void drain ()
    {
    }

    public void flush ()
    {
    }

    public byte [] discardInput ()
    {
        Event.checkIsNotEventDispatchThread ();
        if (socket == null)
            return null;
        if (recvLength > 0)
        {
            byte [] data = new byte [recvLength];
            System.arraycopy (recvBuf, recvOffset, data, 0, recvLength);
            recvOffset += recvLength;
            recvLength = 0;
            return data;
        }
        return null;
    }

    public void close ()
    {
        Event.checkIsNotEventDispatchThread ();
        synchronized (lock)
        {
            if (socket != null)
            {
                try
                {
                    socket.close ();
                }
                catch (Exception e)
                {
                }
                socket = null;
            }
        }
    }

    public String getName ()
    {
        if (name != null)
            return name;
        if (remoteHost != null)
            return remoteHost + ":" + remotePort;
        if (socket != null)
        {
            if (remoteAddress != null)
                return remoteAddress.toString ();
        }
        return "";
    }

    public void setName (String name)
    {
        this.name = name;
    }
}

