Adds a start() method to MessageTransport. The message transport threads are no longer started in the constructor; they are only started once the start() method is called.

Review by: mmendez

git-svn-id: https://google-web-toolkit.googlecode.com/svn/trunk@6880 8db76d5a-ed1c-0410-87a9-c151d255dfc7
diff --git a/dev/core/src/com/google/gwt/dev/shell/remoteui/MessageTransport.java b/dev/core/src/com/google/gwt/dev/shell/remoteui/MessageTransport.java
index 4f155c8..13bce3e 100644
--- a/dev/core/src/com/google/gwt/dev/shell/remoteui/MessageTransport.java
+++ b/dev/core/src/com/google/gwt/dev/shell/remoteui/MessageTransport.java
@@ -33,6 +33,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
@@ -228,18 +229,18 @@
 
   private static final int DEFAULT_SERVICE_THREADS = 2;
 
-  private final Thread messageProcessingThread;
+  private final AtomicBoolean isStarted = new AtomicBoolean(false);
   private final AtomicInteger nextMessageId = new AtomicInteger();
   private final RequestProcessor requestProcessor;
   private final LinkedBlockingQueue<PendingSend> sendQueue = new LinkedBlockingQueue<PendingSend>();
-  private final Thread sendThread;
   private final ExecutorService serverRequestExecutor;
   private final PendingRequestMap pendingRequestMap = new PendingRequestMap();
   private final TerminationCallback terminationCallback;
+  private final InputStream inputStream;
+  private final OutputStream outputStream;
 
   /**
    * Create a new instance using the given streams and request processor.
-   * Closing either stream will cause the termination of the transport.
    * 
    * @param inputStream an input stream for reading messages
    * @param outputStream an output stream for writing messages
@@ -253,45 +254,9 @@
       TerminationCallback terminationCallback) {
     this.requestProcessor = requestProcessor;
     this.terminationCallback = terminationCallback;
+    this.inputStream = inputStream;
+    this.outputStream = outputStream;
     serverRequestExecutor = Executors.newFixedThreadPool(DEFAULT_SERVICE_THREADS);
-
-    // This thread terminates on interruption or IO failure
-    messageProcessingThread = new Thread(new Runnable() {
-      public void run() {
-        try {
-          while (true) {
-            Message message = Message.parseDelimitedFrom(inputStream);
-            // TODO: This is where we would do a protocol check
-            processMessage(message);
-          }
-        } catch (IOException e) {
-          terminateDueToException(e);
-        } catch (InterruptedException e) {
-          terminateDueToException(e);
-        }
-      }
-    });
-    messageProcessingThread.start();
-
-    // This thread only terminates if it is interrupted
-    sendThread = new Thread(new Runnable() {
-      public void run() {
-        while (true) {
-          try {
-            PendingSend pendingSend = sendQueue.take();
-            try {
-              pendingSend.send(outputStream);
-            } catch (IOException e) {
-              pendingSend.failed(e);
-            }
-          } catch (InterruptedException e) {
-            break;
-          }
-        }
-      }
-    });
-    sendThread.setDaemon(true);
-    sendThread.start();
   }
 
   /**
@@ -321,6 +286,58 @@
     return responseFuture;
   }
 
+  /**
+   * Starts up the message transport. The message transport creates its own
+   * threads, so it is not necessary to invoke this method from a separate
+   * thread.
+   * 
+   * Closing either stream will cause the termination of the transport.
+   */
+  public void start() {
+
+    if (isStarted.getAndSet(true)) {
+      return;
+    }
+
+    // This thread terminates on interruption or IO failure
+    Thread messageProcessingThread = new Thread(new Runnable() {
+      public void run() {
+        try {
+          while (true) {
+            Message message = Message.parseDelimitedFrom(inputStream);
+            // TODO: This is where we would do a protocol check
+            processMessage(message);
+          }
+        } catch (IOException e) {
+          terminateDueToException(e);
+        } catch (InterruptedException e) {
+          terminateDueToException(e);
+        }
+      }
+    });
+    messageProcessingThread.start();
+
+    // This thread only terminates if it is interrupted
+    Thread sendThread = new Thread(new Runnable() {
+      public void run() {
+        while (true) {
+          try {
+            PendingSend pendingSend = sendQueue.take();
+            try {
+              pendingSend.send(outputStream);
+            } catch (IOException e) {
+              pendingSend.failed(e);
+            }
+          } catch (InterruptedException e) {
+            break;
+          }
+        }
+      }
+    });
+    sendThread.setDaemon(true);
+    sendThread.start();
+  }
+
   private void processClientRequest(int messageId, Request request)
       throws InterruptedException {
     Message.Builder messageBuilder = Message.newBuilder();
diff --git a/dev/core/src/com/google/gwt/dev/shell/remoteui/RemoteUI.java b/dev/core/src/com/google/gwt/dev/shell/remoteui/RemoteUI.java
index 2dbc4f5..e24e9ff 100644
--- a/dev/core/src/com/google/gwt/dev/shell/remoteui/RemoteUI.java
+++ b/dev/core/src/com/google/gwt/dev/shell/remoteui/RemoteUI.java
@@ -62,6 +62,7 @@
       devModeRequestProcessor = new DevModeServiceRequestProcessor(this);
       transport = new MessageTransport(transportSocket.getInputStream(),
           transportSocket.getOutputStream(), devModeRequestProcessor, this);
+      transport.start();
     } catch (UnknownHostException e) {
       throw new RuntimeException(e);
     } catch (IOException e) {
@@ -123,13 +124,11 @@
   }
 
   public void onTermination(Exception e) {
-    getTopLogger().log(
-        TreeLogger.INFO,
-        "Remote UI connection terminated due to exception: "
-            + e);
+    getTopLogger().log(TreeLogger.INFO,
+        "Remote UI connection terminated due to exception: " + e);
     getTopLogger().log(TreeLogger.INFO,
         "Shutting down development mode server.");
-    
+
     try {
       // Close the transport socket
       transportSocket.close();
diff --git a/dev/core/test/com/google/gwt/dev/shell/remoteui/MessageTransportTest.java b/dev/core/test/com/google/gwt/dev/shell/remoteui/MessageTransportTest.java
index e5400c4..770534b 100644
--- a/dev/core/test/com/google/gwt/dev/shell/remoteui/MessageTransportTest.java
+++ b/dev/core/test/com/google/gwt/dev/shell/remoteui/MessageTransportTest.java
@@ -117,6 +117,7 @@
           public void onTermination(Exception e) {
           }
         });
+    messageTransport.start();
 
     Message.Request.Builder requestMessageBuilder = Message.Request.newBuilder();
     requestMessageBuilder.setServiceType(Message.Request.ServiceType.DEV_MODE);
@@ -170,6 +171,7 @@
     MessageTransport messageTransport = new MessageTransport(
         network.getClientSocket().getInputStream(),
         network.getClientSocket().getOutputStream(), requestProcessor, null);
+    messageTransport.start();
 
     // Generate a new request
     DevModeRequest.Builder devModeRequestBuilder = DevModeRequest.newBuilder();
@@ -245,6 +247,7 @@
           public void onTermination(Exception e) {
           }
         });
+    messageTransport.start();
 
     // Generate a new request
     Message.Request.Builder requestMessageBuilder = Message.Request.newBuilder();
@@ -322,6 +325,7 @@
           public void onTermination(Exception e) {
           }
         });
+    messageTransport.start();
 
     Message.Request.Builder requestMessageBuilder = Message.Request.newBuilder();
     requestMessageBuilder.setServiceType(Message.Request.ServiceType.DEV_MODE);
@@ -387,12 +391,14 @@
     };
 
     // Start up the message transport on the server side
-    new MessageTransport(network.getClientSocket().getInputStream(),
+    MessageTransport messageTransport = new MessageTransport(
+        network.getClientSocket().getInputStream(),
         network.getClientSocket().getOutputStream(), requestProcessor,
         new MessageTransport.TerminationCallback() {
           public void onTermination(Exception e) {
           }
         });
+    messageTransport.start();
 
     // Send the request from the client to the server
     Message.Builder clientRequestMsgBuilder = Message.newBuilder();
@@ -441,12 +447,14 @@
     };
 
     // Start up the message transport on the server side
-    new MessageTransport(network.getClientSocket().getInputStream(),
+    MessageTransport messageTransport = new MessageTransport(
+        network.getClientSocket().getInputStream(),
         network.getClientSocket().getOutputStream(), requestProcessor,
         new MessageTransport.TerminationCallback() {
           public void onTermination(Exception e) {
           }
         });
+    messageTransport.start();
 
     // Send a request to the server
     Message.Request.Builder clientRequestBuilder = Message.Request.newBuilder();