blob: 76fd9f428fb72efb5b9726ad8e2e25461e1829c4 [file] [log] [blame]
/*
* Copyright 2009 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.gwt.dev.shell.remoteui;
import com.google.gwt.dev.shell.remoteui.RemoteMessageProto.Message;
import com.google.gwt.dev.shell.remoteui.RemoteMessageProto.Message.Failure;
import com.google.gwt.dev.shell.remoteui.RemoteMessageProto.Message.MessageType;
import com.google.gwt.dev.shell.remoteui.RemoteMessageProto.Message.Request;
import com.google.gwt.dev.shell.remoteui.RemoteMessageProto.Message.Response;
import com.google.gwt.dev.util.Callback;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Responsible for exchanging requests and responses between services.
*/
public class MessageTransport {
/**
 * Receives notification of failures that happen at the transport level.
 */
public interface ErrorCallback {
  /**
   * Invoked when the transport terminates because of an exception.
   *
   * @param e the exception that caused the termination
   */
  void onTermination(Exception e);

  /**
   * Invoked when an attempt to send a response message fails with an
   * exception.
   *
   * @param e the exception raised while sending the response
   */
  void onResponseException(Exception e);
}
/**
 * Created by the transport when the server answers a client request with a
 * failure message rather than a response.
 */
@SuppressWarnings("serial")
public class RequestException extends Exception {
  private final Failure failureMessage;

  /**
   * Wraps the given failure message. The text returned by
   * {@link Failure#getMessage()} becomes this exception's own message.
   *
   * @param failureMessage the failure message sent back by the server
   */
  public RequestException(Failure failureMessage) {
    super(failureMessage.getMessage());
    this.failureMessage = failureMessage;
  }

  /**
   * Returns the server's failure message carried by this exception.
   */
  public Failure getFailureMessage() {
    return this.failureMessage;
  }
}
/**
 * An outgoing request together with the callback that is told about the
 * server's answer, or about the failure to obtain one.
 */
class PendingRequest extends PendingSend {
  private final Callback<Response> callback;
  private final Message message;
  /**
   * Set as soon as {@link #failed(Exception)} has run. Volatile because
   * {@code failed} may be invoked from a thread other than the one running
   * {@link #send(OutputStream)}.
   */
  private volatile boolean hasFailed;

  /**
   * @param message the fully built request message to send
   * @param callback receives the response, or the error if the request fails
   */
  PendingRequest(Message message, Callback<Response> callback) {
    this.message = message;
    this.callback = callback;
  }

  /**
   * Drops this request from the pending-request map and reports the error to
   * the callback.
   *
   * @param e the reason the request failed; must not be null
   */
  @Override
  void failed(Exception e) {
    assert e != null;
    hasFailed = true;
    pendingRequestMap.remove(message.getMessageId());
    callback.onError(e);
  }

  /**
   * Registers this request as pending and writes it, length-delimited, to the
   * given stream. Nothing is written when the request has already been failed
   * (for instance because the pending-request map no longer accepts entries).
   *
   * @param outputStream the stream to write the request to
   * @throws IOException if writing the message fails
   */
  @Override
  void send(OutputStream outputStream) throws IOException {
    int messageId = message.getMessageId();
    // Register before writing so the entry exists by the time a reply arrives.
    pendingRequestMap.put(messageId, this);
    if (hasFailed) {
      // The callback has already been given the error. Writing the request
      // anyway would send something nobody is waiting on, and a write error
      // would report a second failure to the same callback.
      return;
    }
    message.writeDelimitedTo(outputStream);
  }

  /**
   * Hands the server's response to the callback.
   *
   * @param responseMessage the server's response; must not be null
   */
  void setResponse(Response responseMessage) {
    assert (responseMessage != null);
    callback.onDone(responseMessage);
  }
}
/**
 * Holds the requests that are waiting for an answer, keyed by message id.
 * Every access to the underlying map happens under {@code mapLock}; request
 * callbacks are invoked only after the lock has been released.
 */
static class PendingRequestMap {
  private final Lock mapLock = new ReentrantLock();
  private boolean noMoreAdds;
  private final Map<Integer, PendingRequest> requestIdToPendingServerRequest = new HashMap<Integer, PendingRequest>();

  /**
   * Stops the map from accepting further requests and fails every request
   * that is currently pending.
   *
   * @param e the exception passed to each pending request's
   *          {@code failed} method
   */
  public void blockAdds(Exception e) {
    PendingRequest[] orphaned;
    mapLock.lock();
    try {
      noMoreAdds = true;
      // Snapshot and clear first: PendingRequest.failed() calls remove() on
      // this map, which would otherwise modify it in the middle of the
      // iteration and raise ConcurrentModificationException.
      orphaned = requestIdToPendingServerRequest.values().toArray(
          new PendingRequest[0]);
      requestIdToPendingServerRequest.clear();
    } finally {
      mapLock.unlock();
    }
    for (PendingRequest pendingRequest : orphaned) {
      pendingRequest.failed(e);
    }
  }

  /**
   * Removes and returns the request registered under the given id.
   *
   * @param requestId the message id of the request
   * @return the removed request, or null if none was registered under that id
   */
  public PendingRequest remove(int requestId) {
    mapLock.lock();
    try {
      return requestIdToPendingServerRequest.remove(requestId);
    } finally {
      mapLock.unlock();
    }
  }

  /**
   * Registers a request under the given id. Once {@link #blockAdds} has been
   * called the request is not stored; it is failed with an
   * {@link IllegalStateException} instead.
   *
   * @param requestId the message id of the request
   * @param pendingServerRequest the request to register
   */
  void put(int requestId, PendingRequest pendingServerRequest) {
    boolean rejected;
    mapLock.lock();
    try {
      rejected = noMoreAdds;
      if (!rejected) {
        requestIdToPendingServerRequest.put(requestId, pendingServerRequest);
      }
    } finally {
      mapLock.unlock();
    }
    if (rejected) {
      // Notify outside the lock so callback code never runs while it is held.
      pendingServerRequest.failed(new IllegalStateException(
          "InputStream is closed"));
    }
  }
}
/**
 * A message queued for delivery in answer to a request received from the
 * remote side.
 */
class PendingResponse extends PendingSend {
  Message message;

  /**
   * @param message the message to deliver
   */
  public PendingResponse(Message message) {
    this.message = message;
  }

  /**
   * Writes the message, length-delimited, to the given stream.
   *
   * @param outputStream the stream to write to
   * @throws IOException if the write fails
   */
  @Override
  public void send(OutputStream outputStream) throws IOException {
    this.message.writeDelimitedTo(outputStream);
  }

  /**
   * Forwards the error to the transport's error callback, when one is set.
   *
   * @param e the exception raised while sending
   */
  @Override
  public void failed(Exception e) {
    if (errorCallback == null) {
      return;
    }
    errorCallback.onResponseException(e);
  }
}
/**
 * Something waiting in the send queue: it knows how to write itself to the
 * output stream and how to report that doing so failed.
 */
abstract class PendingSend {
  /**
   * Writes this item to the given stream.
   *
   * @param outputStream the stream to write to
   * @throws IOException if the write fails
   */
  abstract void send(OutputStream outputStream) throws IOException;

  /**
   * Reports that this item could not be completed.
   *
   * @param e the cause of the failure
   */
  abstract void failed(Exception e);
}
/**
 * A callable that does nothing and returns null. It is only handed to the
 * {@link FutureTask} constructor in {@code executeRequestAsync}; that future
 * is completed through {@code set}/{@code setException} instead.
 */
private static final Callable<Response> DUMMY_CALLABLE = new Callable<Response>() {
public Response call() throws Exception {
return null;
}
};
// Stream the message processing thread reads incoming messages from.
private final InputStream inputStream;
// Flipped to true by the first call to start(); later calls return at once.
private final AtomicBoolean isStarted = new AtomicBoolean(false);
// Source of message ids for outgoing requests.
private final AtomicInteger nextMessageId = new AtomicInteger();
// Stream the send thread writes queued messages to.
private final OutputStream outputStream;
// Requests that have been handed to the send thread and await an answer.
private final PendingRequestMap pendingRequestMap = new PendingRequestMap();
// Executes requests received from the remote side.
private final RequestProcessor requestProcessor;
// Outgoing requests and responses, drained by the send thread.
private final LinkedBlockingQueue<PendingSend> sendQueue = new LinkedBlockingQueue<PendingSend>();
// Notified of transport-level errors; null-checked before every use.
private final ErrorCallback errorCallback;
/**
 * Creates a transport over the given streams. Nothing is read or written
 * until {@link #start()} is called.
 *
 * @param inputStream the stream incoming messages are read from
 * @param outputStream the stream outgoing messages are written to
 * @param requestProcessor handles requests received from the remote side
 * @param errorCallback notified when sending a response fails or when the
 *          transport terminates; may be null, in which case no notification
 *          is made
 */
public MessageTransport(final InputStream inputStream,
    final OutputStream outputStream, RequestProcessor requestProcessor,
    ErrorCallback errorCallback) {
  this.inputStream = inputStream;
  this.outputStream = outputStream;
  this.requestProcessor = requestProcessor;
  this.errorCallback = errorCallback;
}
/**
 * Queues the request for asynchronous execution on the remote server and
 * returns a future for its outcome.
 *
 * @param requestMessage The request to execute
 *
 * @return a {@link Future} that completes with the server's response, or
 *         exceptionally if the request fails
 */
public Future<Response> executeRequestAsync(Request requestMessage) {
  // FutureTask keeps set/setException protected; this local subclass widens
  // them so the callback below can complete the future.
  class SettableFuture extends FutureTask<Response> {
    private SettableFuture() {
      super(DUMMY_CALLABLE);
    }

    @Override
    public void set(Response v) {
      super.set(v);
    }

    @Override
    public void setException(Throwable t) {
      super.setException(t);
    }
  }

  final SettableFuture result = new SettableFuture();
  // The callback overload builds the message and enqueues it.
  executeRequestAsync(requestMessage, new Callback<Response>() {
    public void onDone(Response response) {
      result.set(response);
    }

    public void onError(Throwable t) {
      result.setException(t);
    }
  });
  return result;
}
/**
 * Queues the request for asynchronous execution on the remote server. The
 * callback will generally be called by another thread. Memory consistency
 * effects: actions in a thread prior to calling this method happen-before
 * the callback is invoked.
 *
 * @param requestMessage The request to execute
 * @param callback The callback to invoke with the response, or with the
 *          error if the request fails
 */
public void executeRequestAsync(Request requestMessage,
    Callback<Response> callback) {
  int id = nextMessageId.getAndIncrement();
  Message outgoing = Message.newBuilder()
      .setMessageId(id)
      .setMessageType(Message.MessageType.REQUEST)
      .setRequest(requestMessage)
      .build();
  sendQueue.add(new PendingRequest(outgoing, callback));
}
/**
 * Starts up the message transport. The message transport creates its own
 * threads, so it is not necessary to invoke this method from a separate
 * thread. Calling this method more than once has no further effect.
 *
 * The message processing thread stops, and the transport terminates, when
 * the input stream reaches its end, reading from it fails, or the thread is
 * interrupted. A failure to write to the output stream is reported for the
 * affected message only; the send thread keeps running until it is
 * interrupted.
 */
public void start() {
  if (isStarted.getAndSet(true)) {
    return;
  }
  // This thread terminates on interruption, IO failure or end of stream
  Thread messageProcessingThread = new Thread(new Runnable() {
    public void run() {
      try {
        while (true) {
          Message message = Message.parseDelimitedFrom(inputStream);
          if (message == null) {
            // parseDelimitedFrom reports a cleanly closed stream by returning
            // null rather than throwing. Passing that on would end this
            // thread with a NullPointerException and skip the termination
            // handling, leaving pending requests unanswered forever.
            terminateDueToException(new IOException(
                "InputStream reached end of stream"));
            return;
          }
          // TODO: This is where we would do a protocol check
          processMessage(message);
        }
      } catch (IOException e) {
        terminateDueToException(e);
      } catch (InterruptedException e) {
        terminateDueToException(e);
      }
    }
  });
  messageProcessingThread.start();
  // This thread only terminates if it is interrupted
  Thread sendThread = new Thread(new Runnable() {
    public void run() {
      while (true) {
        try {
          PendingSend pendingSend = sendQueue.take();
          try {
            pendingSend.send(outputStream);
          } catch (IOException e) {
            // A failed write affects this message only; keep draining.
            pendingSend.failed(e);
          }
        } catch (InterruptedException e) {
          break;
        }
      }
    }
  });
  sendThread.setDaemon(true);
  sendThread.start();
}
/**
 * Runs a request received from the remote side through the request processor
 * and queues the outcome for sending: a RESPONSE message on success, or a
 * FAILURE message carrying the exception's message and stack trace when the
 * processor throws.
 *
 * @param messageId the id of the request; reused as the id of the reply
 * @param request the request to execute
 * @throws InterruptedException if interrupted while queueing the reply
 */
private void processClientRequest(int messageId, Request request)
    throws InterruptedException {
  Message.Builder reply = Message.newBuilder();
  reply.setMessageId(messageId);
  try {
    reply.setMessageType(Message.MessageType.RESPONSE);
    reply.setResponse(requestProcessor.execute(request));
  } catch (Exception e) {
    // Fall back to the exception's class name when it carries no message.
    String description = e.getLocalizedMessage();
    if (description == null) {
      description = e.getClass().getName();
    }
    StringWriter trace = new StringWriter();
    e.printStackTrace(new PrintWriter(trace, true));

    Message.Failure.Builder failure = Message.Failure.newBuilder();
    failure.setMessage(description);
    failure.setStackTrace(trace.toString());

    reply.setMessageType(Message.MessageType.FAILURE);
    reply.setFailure(failure);
  }
  sendQueue.put(new PendingResponse(reply.build()));
}
/**
 * Handles a FAILURE message from the remote side: the matching pending
 * request, if there is one, is failed with a {@link RequestException}.
 *
 * @param messageId the id of the request the failure refers to
 * @param failure the failure details sent by the remote side
 */
private void processFailure(int messageId, Failure failure) {
  PendingRequest waiting = pendingRequestMap.remove(messageId);
  if (waiting == null) {
    // No request is registered under this id; nothing to notify.
    return;
  }
  waiting.failed(new RequestException(failure));
}
/**
 * Dispatches an incoming message according to its type. A message whose type
 * is null, or is not RESPONSE, REQUEST or FAILURE, is answered with a
 * failure message.
 *
 * @param message the message read from the input stream
 * @throws InterruptedException if interrupted while queueing a reply
 */
private void processMessage(final Message message)
    throws InterruptedException {
  MessageType type = message.getMessageType();
  int id = message.getMessageId();
  if (type == null) {
    processUnknownMessageType(id, "unknown");
    return;
  }
  if (type == MessageType.RESPONSE) {
    processServerResponse(id, message.getResponse());
  } else if (type == MessageType.REQUEST) {
    processClientRequest(id, message.getRequest());
  } else if (type == MessageType.FAILURE) {
    processFailure(id, message.getFailure());
  } else {
    processUnknownMessageType(id, type.name());
  }
}
/**
 * Handles a RESPONSE message from the remote side by handing the response to
 * the matching pending request, if there is one.
 *
 * @param messageId the id of the request the response answers
 * @param response the response sent by the remote side
 */
private void processServerResponse(int messageId, Response response) {
  PendingRequest waiting = pendingRequestMap.remove(messageId);
  if (waiting == null) {
    // No request is registered under this id; nothing to notify.
    return;
  }
  waiting.setResponse(response);
}
/**
 * Queues a FAILURE reply telling the remote side that the type of a message
 * it sent was not recognised, listing the message types that are known.
 *
 * @param messageId the id of the offending message; reused for the reply
 * @param messageTypeName the name to report for the unrecognised type
 * @throws InterruptedException if interrupted while queueing the reply
 */
private void processUnknownMessageType(int messageId, String messageTypeName)
    throws InterruptedException {
  StringBuilder text = new StringBuilder();
  text.append("Unknown message type '").append(messageTypeName).append(
      "'. Known message types are: ");
  for (MessageType known : MessageType.values()) {
    text.append(known.name()).append(" ");
  }

  Message.Failure.Builder failure = Message.Failure.newBuilder();
  failure.setMessage(text.toString());

  Message.Builder reply = Message.newBuilder();
  reply.setMessageId(messageId);
  reply.setMessageType(Message.MessageType.FAILURE);
  reply.setFailure(failure);

  sendQueue.put(new PendingResponse(reply.build()));
}
/**
 * Shuts the transport down after a fatal error: every pending request is
 * failed with the given exception, no further requests are accepted, and the
 * error callback, when one is set, is told about the termination.
 *
 * @param e the exception that caused the termination
 */
private void terminateDueToException(Exception e) {
  pendingRequestMap.blockAdds(e);
  if (errorCallback == null) {
    return;
  }
  errorCallback.onTermination(e);
}
}