[ Team LiB ] Previous Section Next Section

6.10 A Multiplexed Network Client

We end this chapter with Example 6-14, a complex class named HttpDownloadManager. As its name implies, this class is a client-side utility that manages any number of concurrent HTTP downloads on a background thread. To download something, just call the download( ) method, passing a java.net.URI and an optional Listener object to be notified when the download is complete or has aborted with an error. download( ) does not block: it returns a Download object immediately, and you can monitor the status of the download by polling the methods of this object. The data that is downloaded is not saved to a file, but is available from the getData( ) method of the Download object.

The Listener and Download interfaces are defined as inner classes of HttpDownloadManager. The Status class is another inner class: it is a type-safe enumeration of download states returned by Download.getStatus( ). Two other inner classes used in this example are DownloadImpl, the package-private concrete implementation of the Download interface, and Test, a simple test program that demonstrates the usage of the HttpDownloadManager.

HttpDownloadManager extends Thread, and the downloads are handled in this background thread. Multiple downloads can be handled concurrently (this is useful when the client has more bandwidth available than the servers which are being downloaded from) because the thread uses a Selector to multiplex the SocketChannel objects from which data is being read: the select( ) call wakes up whenever data is ready to be read from one of the pending downloads. Most of the interesting code is found in the run( ) method, which is the body of the background thread.

We've seen basic channel multiplexing code with Selector.select( ) in previous examples. This one demonstrates three new features. The first is the call to wakeup( ) in the download( ) method. A new channel cannot be registered with a Selector while that selector is blocked in a select( ) call in the background thread. So the download( ) method creates a DownloadImpl object containing all the information about the download, places this object in a synchronized list of pending downloads, and then calls wakeup( ) and returns the Download object to the caller. The wakeup( ) call in download( ) causes the background thread to stop blocking in select( ). In other examples, we've immediately checked the selectedKeys( ) of a Selector when its select( ) method returns. In this case, we first look at the pendingDownloads list and create and register a SocketChannel for any DownloadImpl objects found there.

The second new feature of interest in this example is that it performs asynchronous connection. Before an HTTP GET request can be sent to a web server, the client must connect to the server. Establishing a TCP connection over the Internet can sometimes take a second or two, and we don't want the background thread to block while this connection is set up. So when the thread detects a pending download, it creates a SocketChannel in an unconnected state. It then puts the channel into nonblocking mode and registers it with the Selector object, indicating that the Selector should monitor the channel for readiness to connect as well as readiness to read. Only after registering the channel does the thread call the connect( ) method and supply the address and port to connect to. Since the channel is nonblocking, connect( ) returns immediately. When the connection is ready, select( ) wakes up and the thread calls finishConnect( ) on the channel to complete the connection. (After completing the connection, the thread immediately sends the HTTP GET request across the channel. It assumes that the channel is writable and that the complete text of the request can be written quickly; if this assumption fails, the thread will end up busy-waiting while it repeatedly attempts to send the request to the server.)

The third new feature demonstrated by this example is that when it registers a SocketChannel with the Selector, it uses a three-argument version of the register( ) call to associate the DownloadImpl object with the SelectionKey for the channel. Then, when the key becomes connectable or readable, the background thread can retrieve the channel and the state object associated with the key.

Finally, this example also demonstrates the logging API of java.util.logging. We'll discuss logging in the subsection that follows the example code.

Example 6-14. HttpDownloadManager.java
package je3.nio;
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.net.*;
import java.util.*;
import java.util.logging.*;

/**
 * This class manages asynchronous HTTP GET downloads and demonstrates
 * non-blocking I/O with SocketChannel and Selector and also demonstrates
 * logging with the java.util.logging package.  This example uses a number
 * of inner classes and interfaces.
 * 
 * Call download( ) for each HTTP GET request you want to issue.  You may
 * optionally pass a Listener object that will be notified when the download
 * terminates or encounters an exception.  download( ) returns a Download object
 * which holds the downloaded bytes (including HTTP headers) and which allows
 * you to poll the Status of the download.  Call release( ) when there are 
 * no more downloads.
 */
public class HttpDownloadManager extends Thread {
    // An enumerated type.  Values are returned by Download.getStatus( )
    public static class Status {
        // We haven't connected to the server yet
        public static final Status UNCONNECTED = new Status("Unconnected");
        // We're connected to the server, sending request or receiving response
        public static final Status CONNECTED = new Status("Connected");
        // Response has been received.  Response may have been an HTTP error
        public static final Status DONE = new Status("Done");
        // Something went wrong: bad hostname, for example.
        public static final Status ERROR = new Status("Error");

        private final String name;
        private Status(String name) { this.name = name; }
        public String toString( ) { return name; }
    }

    // Everything you need to know about a pending download
    public interface Download {
        public String getHost( );   // Hostname we're downloading from
        public int getPort( );      // Defaults to port 80
        public String getPath( );   // includes query string as well
        public Status getStatus( ); // Status of the download
        public byte[  ] getData( );   // Download data, including response headers
        public int getHttpStatus( );// Only call when status is DONE
    }

    // Implement this interface if you want to know when a download completes
    public interface Listener {
        public void done(Download download);
        public void error(Download download, Throwable throwable);
    }

    Selector selector;          // For multiplexing non-blocking I/O.
    ByteBuffer buffer;          // A shared buffer for downloads
    List pendingDownloads;      // Downloads that don't have a Channel yet
    boolean released = false;   // Set when the release( ) method is called.
    Logger log;                 // Logging output goes here

    // The HTTP protocol uses this character encoding
    static final Charset LATIN1 = Charset.forName("ISO-8859-1");

    public HttpDownloadManager(Logger log) throws IOException {
        if (log == null) log = Logger.getLogger(this.getClass( ).getName( ));
        this.log = log;
        selector = Selector.open( );                  // create Selector
        buffer = ByteBuffer.allocateDirect(64*1024); // allocate buffer
        pendingDownloads = Collections.synchronizedList(new ArrayList( ));
        this.start( );                                // start thread
    }

    // Ask the HttpDownloadManager to begin a download.  Returns a Download
    // object that can be used to poll the progress of the download.  The
    // optional Listener object will be notified when the download completes
    // or aborts.
    public Download download(URI uri, Listener l) 
           throws IOException
    {
        if (released)
           throw new IllegalStateException("Can't download( ) after release( )");

        // Get info from the URI
        String scheme = uri.getScheme( );
        if (scheme == null || !scheme.equals("http"))
            throw new IllegalArgumentException("Must use 'http:' protocol");
        String hostname = uri.getHost( );
        int port = uri.getPort( );
        if (port == -1) port = 80; // Use default port if none specified
        String path = uri.getRawPath( );
        if (path == null || path.length( ) == 0) path = "/";
        String query = uri.getRawQuery( );
        if (query != null) path += "?" + query;

        // Create a Download object with the pieces of the URL
        Download download = new DownloadImpl(hostname, port, path, l);

        // Add it to the list of pending downloads. This is a synchronized list
        pendingDownloads.add(download);

        // And ask the thread to stop blocking in the select( ) call so that
        // it will notice and process this new pending Download object.
        selector.wakeup( );

        // Return the Download so that the caller can monitor it if desired.
        return download;
    }

    public void release( ) {
        released = true; // The thread will terminate when it notices the flag.
        try { selector.close( ); } // This will wake the thread up
        catch(IOException e) {
            log.log(Level.SEVERE, "Error closing selector", e);
        }
    }

    public void run( ) {
        log.info("HttpDownloadManager thread starting.");

        // The download thread runs until release( ) is called
        while(!released) {
            // The thread blocks here waiting for something to happen
            try { selector.select( ); }
            catch(IOException e) {
                // This should never happen.
                log.log(Level.SEVERE, "Error in select( )", e);
                return;
            }

            // If release( ) was called, the thread should exit.
            if (released) break;

            // If any new Download objects are pending, deal with them first
            if (!pendingDownloads.isEmpty( )) {
                // Although pendingDownloads is a synchronized list, we still
                // need to use a synchronized block to iterate through its
                // elements to prevent a concurrent call to download( ).
                synchronized(pendingDownloads) {
                    Iterator iter = pendingDownloads.iterator( );
                    while(iter.hasNext( )) {
                        // Get the pending download object from the list
                        DownloadImpl download = (DownloadImpl)iter.next( );
                        iter.remove( );  // And remove it.

                        // Now begin an asynchronous connection to the 
                        // specified host and port.  We don't block while
                        // waiting to connect.
                        SelectionKey key = null;
                        SocketChannel channel = null;
                        try {
                            // Open an unconnected channel
                            channel = SocketChannel.open( );
                            // Put it in non-blocking mode
                            channel.configureBlocking(false);
                            // Register it with the selector, specifying that
                            // we want to know when it is ready to connect
                            // and when it is ready to read.
                            key = channel.register(selector,
                                                   SelectionKey.OP_READ | 
                                                   SelectionKey.OP_CONNECT,
                                                   download);
                            // Create the web server address
                            SocketAddress address = 
                                new InetSocketAddress(download.host,
                                                      download.port);
                            // Ask the channel to start connecting
                            // Note that we don't send the HTTP request yet.
                            // We'll do that when the connection completes.
                            channel.connect(address);
                        }
                        catch(Exception e) {
                            handleError(download, channel, key, e);
                        }
                    }
                }
            }

            // Now get the set of keys that are ready for connecting or reading
            Set keys = selector.selectedKeys( );
            if (keys == null) continue; // bug workaround; should not be needed
            // Loop through the keys in the set
            for(Iterator i = keys.iterator( ); i.hasNext( ); ) {
                SelectionKey key = (SelectionKey)i.next( );
                i.remove( );  // Remove the key from the set before handling

                // Get the Download object we attached to the key
                DownloadImpl download = (DownloadImpl) key.attachment( );
                // Get the channel associated with the key.
                SocketChannel channel = (SocketChannel)key.channel( );

                try {
                    if (key.isConnectable( )) {  
                        // If the channel is ready to connect, complete the
                        // connection and then send the HTTP GET request to it.
                        if (channel.finishConnect( )) {
                            download.status = Status.CONNECTED;
                            // This is the HTTP request we send
                            String request =
                                "GET " + download.path + " HTTP/1.1\r\n" +
                                "Host: " + download.host + "\r\n" +
                                "Connection: close\r\n" +
                                "\r\n";
                            // Wrap in a CharBuffer and encode to a ByteBuffer
                            ByteBuffer requestBytes =
                                LATIN1.encode(CharBuffer.wrap(request));
                            // Send the request to the server.  If the bytes
                            // aren't all written in one call, we busy loop!
                            while(requestBytes.hasRemaining( ))
                                channel.write(requestBytes);

                            log.info("Sent HTTP request: " + download.host + 
                                     ":" + download.port + ": " + request);
                        }
                    }
                    if (key.isReadable( )) {
                        // If the key indicates that there is data to be read,
                        // then read it and store it in the Download object.
                        int numbytes = channel.read(buffer);
                        
                        // If we read some bytes, store them, otherwise
                        // the download is complete and we need to note this
                        if (numbytes != -1) {
                            buffer.flip( );  // Prepare to drain the buffer
                            download.addData(buffer); // Store the data
                            buffer.clear( ); // Prepare for another read
                            log.info("Read " + numbytes + " bytes from " +
                                     download.host + ":" + download.port);
                        }
                        else {
                            // If there are no more bytes to read
                            key.cancel( );     // We're done with the key
                            channel.close( );  // And with the channel.
                            download.status = Status.DONE; 
                            if (download.listener != null)  // notify listener
                                download.listener.done(download);
                            log.info("Download complete from " +
                                     download.host + ":" + download.port);
                        }
                    }
                }
                catch (Exception e) {
                    handleError(download, channel, key, e);
                }
            }
        }
        log.info("HttpDownloadManager thread exiting.");
    }

    // Error-handling code used by the run( ) method: 
    // set status, close channel, cancel key, log error, notify listener.
    void handleError(DownloadImpl download, SocketChannel channel,
                     SelectionKey key, Throwable throwable)
    {
        download.status = Status.ERROR;
        try {if (channel != null) channel.close( );} catch(IOException e) {  }
        if (key != null) key.cancel( );
        log.log(Level.WARNING,
                "Error connecting to or downloading from " + download.host +
                ":" + download.port,
                throwable);
        if (download.listener != null)
            download.listener.error(download, throwable);
    }

    // This is the Download implementation we use internally.
    static class DownloadImpl implements Download {
        final String host;     // Final fields are immutable for thread-safety
        final int port;
        final String path;
        final Listener listener;
        volatile Status status; // Volatile fields may be changed concurrently
        volatile byte[  ] data = new byte[0];

        DownloadImpl(String host, int port, String path, Listener listener) {
            this.host = host;
            this.port = port;
            this.path = path;
            this.listener = listener;
            this.status = Status.UNCONNECTED;  // Set initial status
        }

        // These are the basic getter methods
        public String getHost( ) { return host; }
        public int getPort( ) { return port; }
        public String getPath( ) { return path; }
        public Status getStatus( ) { return status; }
        public byte[  ] getData( ) { return data; }

        /**
         * Return the HTTP status code for the download.
         * Throws IllegalStateException if status is not Status.DONE
         */
        public int getHttpStatus( ) {
            if (status != Status.DONE) throw new IllegalStateException( );
            // In HTTP 1.1, the return code is in ASCII bytes 10-12.
            return
                (data[9] - '0') * 100 +
                (data[10]- '0') * 10 +
                (data[11]- '0') * 1;
        }

        // Used internally when we read more data.
        // This should use a larger buffer to prevent frequent re-allocation.
        void addData(ByteBuffer buffer) {
            assert status == Status.CONNECTED;  // only called during download
            int oldlen = data.length;           // How many existing bytes
            int numbytes = buffer.remaining( );  // How many new bytes
            int newlen = oldlen + numbytes;
            byte[  ] newdata = new byte[newlen];  // Create new array
            System.arraycopy(data, 0, newdata, 0, oldlen); // Copy old bytes
            buffer.get(newdata, oldlen, numbytes);         // Copy new bytes
            data = newdata;                     // Save new array
        }
    }

    // This class demonstrates a simple use of HttpDownloadManager.
    public static class Test {
        static int completedDownloads = 0;

        public static void main(String args[  ])
            throws IOException, URISyntaxException
        {
            // With a -v argument, our logger will display lots of messages
            final boolean verbose = args[0].equals("-v");
            int firstarg = 0;
            Logger logger = Logger.getLogger(Test.class.getName( ));

            if (verbose) {
                firstarg = 1;
                logger.setLevel(Level.INFO);
            }
            else                       // regular output
                logger.setLevel(Level.WARNING);
            
            // How many URLs are on the command line?
            final int numDownloads = args.length - firstarg;
            // Create the download manager
            final HttpDownloadManager dm = new HttpDownloadManager(logger);
            // Now loop through URLs and call download( ) for each one
            // passing a listener object to receive notifications
            for(int i = firstarg; i < args.length; i++) {
                URI uri = new URI(args[i]);
                dm.download(uri, 
                    new Listener( ) {
                            public void done(Download d) {
                                System.err.println("DONE: " + d.getHost( ) +
                                                   ": " + d.getHttpStatus( ));
                                // If all downloads are complete, we're done
                                // with the HttpDownloadManager thread.
                                if (++completedDownloads == numDownloads)
                                    dm.release( );
                            }
                            public void error(Download d, Throwable t) {
                                System.err.println(d.getHost( ) + ": " + t);
                                if (++completedDownloads == numDownloads)
                                    dm.release( );
                            }
                        });
            }
        }
    }
}

6.10.1 Logging in HttpDownloadManager

HttpDownloadManager uses a Logger object to log informational and error messages. It demonstrates the info( ) and log( ) methods of Logger; see the Logger documentation for descriptions of many other logging methods. HttpDownloadManager can use a Logger passed to it, or it can obtain its own. There are examples of obtaining a Logger object in both the constructor method and the inner Test class. The Test class also implements a -v "verbose" switch to demonstrate how to set the logging threshold of a Logger. Informational logging messages will be discarded unless the -v option is specified.

The java.util.logging package allows flexible runtime configuration of how logging is done. In most installations, the default is to print logging messages to the console. See the file jre/lib/logging.properties for the default configuration on your installation. You can override this default configuration with your own properties file: define the property java.util.logging.config.file with the -D option when you start the Java VM. For example, to run the HttpDownloadManager test program using a logging configuration specified in a file named log.props, you'd use a command line like this one (it has been word wrapped to fit on the page; you'd type it on one line):

java -Djava.util.logging.config.file=log.props
je3.nio.HttpDownloadManager\$Test -v ...urls here...

Note that we use the -v switch so that the program actually generates log messages. Without this switch, you'd have to purposely specify bad URLs so that the program would log some errors. The following listing is a sample logging configuration file, which you'll find, along with the HttpDownloadManager source code, in a file named log.props:

#
# A logging properties file for the HttpDownloadManager example
# See also jre/lib/logging.properties in your Java installation.
# Use this file by specifying it as the value of a property named
# java.util.logging.config.file.  For example:
#
# java -Djava.util.logging.config.file je3.nio.Http...
#

# This property specifies the default logging level for log messages
# sent by our program.  Note, however, that if you run the 
# HttpDownloadManager\$Test class, this property will be overridden by
# the presence or absence of the -v option.
je3.nio.HttpDownloadManager.level: INFO

# This property says where output should go.  The default configuration
# is for it to go to the console.  This property sends log messages to
# a FileHandler configured below instead
handlers=java.util.logging.FileHandler

# These properties configure the FileHandler
# See java.util.logging.FileHandler for other available properties
java.util.logging.FileHandler.pattern = %h/java%u.log
java.util.logging.FileHandler.formatter = java.util.logging.XMLFormatter
    [ Team LiB ] Previous Section Next Section