Adds a start() method to MessageTransport. The message transport threads are no longer started in the constructor; they are only started once the start() method is called.
Review by: mmendez
git-svn-id: https://google-web-toolkit.googlecode.com/svn/trunk@6880 8db76d5a-ed1c-0410-87a9-c151d255dfc7
diff --git a/dev/core/src/com/google/gwt/dev/shell/remoteui/MessageTransport.java b/dev/core/src/com/google/gwt/dev/shell/remoteui/MessageTransport.java
index 4f155c8..13bce3e 100644
--- a/dev/core/src/com/google/gwt/dev/shell/remoteui/MessageTransport.java
+++ b/dev/core/src/com/google/gwt/dev/shell/remoteui/MessageTransport.java
@@ -33,6 +33,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -228,18 +229,18 @@
private static final int DEFAULT_SERVICE_THREADS = 2;
- private final Thread messageProcessingThread;
+ private final AtomicBoolean isStarted = new AtomicBoolean(false);
private final AtomicInteger nextMessageId = new AtomicInteger();
private final RequestProcessor requestProcessor;
private final LinkedBlockingQueue<PendingSend> sendQueue = new LinkedBlockingQueue<PendingSend>();
- private final Thread sendThread;
private final ExecutorService serverRequestExecutor;
private final PendingRequestMap pendingRequestMap = new PendingRequestMap();
private final TerminationCallback terminationCallback;
+ private final InputStream inputStream;
+ private final OutputStream outputStream;
/**
* Create a new instance using the given streams and request processor.
- * Closing either stream will cause the termination of the transport.
*
* @param inputStream an input stream for reading messages
* @param outputStream an output stream for writing messages
@@ -253,45 +254,9 @@
TerminationCallback terminationCallback) {
this.requestProcessor = requestProcessor;
this.terminationCallback = terminationCallback;
+ this.inputStream = inputStream;
+ this.outputStream = outputStream;
serverRequestExecutor = Executors.newFixedThreadPool(DEFAULT_SERVICE_THREADS);
-
- // This thread terminates on interruption or IO failure
- messageProcessingThread = new Thread(new Runnable() {
- public void run() {
- try {
- while (true) {
- Message message = Message.parseDelimitedFrom(inputStream);
- // TODO: This is where we would do a protocol check
- processMessage(message);
- }
- } catch (IOException e) {
- terminateDueToException(e);
- } catch (InterruptedException e) {
- terminateDueToException(e);
- }
- }
- });
- messageProcessingThread.start();
-
- // This thread only terminates if it is interrupted
- sendThread = new Thread(new Runnable() {
- public void run() {
- while (true) {
- try {
- PendingSend pendingSend = sendQueue.take();
- try {
- pendingSend.send(outputStream);
- } catch (IOException e) {
- pendingSend.failed(e);
- }
- } catch (InterruptedException e) {
- break;
- }
- }
- }
- });
- sendThread.setDaemon(true);
- sendThread.start();
}
/**
@@ -321,6 +286,58 @@
return responseFuture;
}
+ /**
+ * Starts up the message transport. The message transport creates its own
+ * threads, so it is not necessary to invoke this method from a separate
+ * thread.
+ *
+ * Closing either stream will cause the termination of the transport.
+ */
+ public void start() {
+
+ if (isStarted.getAndSet(true)) {
+ return;
+ }
+
+ // This thread terminates on interruption or IO failure
+ Thread messageProcessingThread = new Thread(new Runnable() {
+ public void run() {
+ try {
+ while (true) {
+ Message message = Message.parseDelimitedFrom(inputStream);
+ // TODO: This is where we would do a protocol check
+ processMessage(message);
+ }
+ } catch (IOException e) {
+ terminateDueToException(e);
+ } catch (InterruptedException e) {
+ terminateDueToException(e);
+ }
+ }
+ });
+ messageProcessingThread.start();
+
+ // This thread only terminates if it is interrupted
+ Thread sendThread = new Thread(new Runnable() {
+ public void run() {
+ while (true) {
+ try {
+ PendingSend pendingSend = sendQueue.take();
+ try {
+ pendingSend.send(outputStream);
+ } catch (IOException e) {
+ pendingSend.failed(e);
+ }
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ }
+ });
+ sendThread.setDaemon(true);
+ sendThread.start();
+ }
+
private void processClientRequest(int messageId, Request request)
throws InterruptedException {
Message.Builder messageBuilder = Message.newBuilder();
diff --git a/dev/core/src/com/google/gwt/dev/shell/remoteui/RemoteUI.java b/dev/core/src/com/google/gwt/dev/shell/remoteui/RemoteUI.java
index 2dbc4f5..e24e9ff 100644
--- a/dev/core/src/com/google/gwt/dev/shell/remoteui/RemoteUI.java
+++ b/dev/core/src/com/google/gwt/dev/shell/remoteui/RemoteUI.java
@@ -62,6 +62,7 @@
devModeRequestProcessor = new DevModeServiceRequestProcessor(this);
transport = new MessageTransport(transportSocket.getInputStream(),
transportSocket.getOutputStream(), devModeRequestProcessor, this);
+ transport.start();
} catch (UnknownHostException e) {
throw new RuntimeException(e);
} catch (IOException e) {
@@ -123,13 +124,11 @@
}
public void onTermination(Exception e) {
- getTopLogger().log(
- TreeLogger.INFO,
- "Remote UI connection terminated due to exception: "
- + e);
+ getTopLogger().log(TreeLogger.INFO,
+ "Remote UI connection terminated due to exception: " + e);
getTopLogger().log(TreeLogger.INFO,
"Shutting down development mode server.");
-
+
try {
// Close the transport socket
transportSocket.close();
diff --git a/dev/core/test/com/google/gwt/dev/shell/remoteui/MessageTransportTest.java b/dev/core/test/com/google/gwt/dev/shell/remoteui/MessageTransportTest.java
index e5400c4..770534b 100644
--- a/dev/core/test/com/google/gwt/dev/shell/remoteui/MessageTransportTest.java
+++ b/dev/core/test/com/google/gwt/dev/shell/remoteui/MessageTransportTest.java
@@ -117,6 +117,7 @@
public void onTermination(Exception e) {
}
});
+ messageTransport.start();
Message.Request.Builder requestMessageBuilder = Message.Request.newBuilder();
requestMessageBuilder.setServiceType(Message.Request.ServiceType.DEV_MODE);
@@ -170,6 +171,7 @@
MessageTransport messageTransport = new MessageTransport(
network.getClientSocket().getInputStream(),
network.getClientSocket().getOutputStream(), requestProcessor, null);
+ messageTransport.start();
// Generate a new request
DevModeRequest.Builder devModeRequestBuilder = DevModeRequest.newBuilder();
@@ -245,6 +247,7 @@
public void onTermination(Exception e) {
}
});
+ messageTransport.start();
// Generate a new request
Message.Request.Builder requestMessageBuilder = Message.Request.newBuilder();
@@ -322,6 +325,7 @@
public void onTermination(Exception e) {
}
});
+ messageTransport.start();
Message.Request.Builder requestMessageBuilder = Message.Request.newBuilder();
requestMessageBuilder.setServiceType(Message.Request.ServiceType.DEV_MODE);
@@ -387,12 +391,14 @@
};
// Start up the message transport on the server side
- new MessageTransport(network.getClientSocket().getInputStream(),
+ MessageTransport messageTransport = new MessageTransport(
+ network.getClientSocket().getInputStream(),
network.getClientSocket().getOutputStream(), requestProcessor,
new MessageTransport.TerminationCallback() {
public void onTermination(Exception e) {
}
});
+ messageTransport.start();
// Send the request from the client to the server
Message.Builder clientRequestMsgBuilder = Message.newBuilder();
@@ -441,12 +447,14 @@
};
// Start up the message transport on the server side
- new MessageTransport(network.getClientSocket().getInputStream(),
+ MessageTransport messageTransport = new MessageTransport(
+ network.getClientSocket().getInputStream(),
network.getClientSocket().getOutputStream(), requestProcessor,
new MessageTransport.TerminationCallback() {
public void onTermination(Exception e) {
}
});
+ messageTransport.start();
// Send a request to the server
Message.Request.Builder clientRequestBuilder = Message.Request.newBuilder();