Implementation of PWF without the annoying ThreadPoolExecutor. :)

Review by: spoon (pair prog)


git-svn-id: https://google-web-toolkit.googlecode.com/svn/releases/1.6@4163 8db76d5a-ed1c-0410-87a9-c151d255dfc7
diff --git a/dev/core/src/com/google/gwt/dev/ExternalPermutationWorkerFactory.java b/dev/core/src/com/google/gwt/dev/ExternalPermutationWorkerFactory.java
index 392afea..137144d 100644
--- a/dev/core/src/com/google/gwt/dev/ExternalPermutationWorkerFactory.java
+++ b/dev/core/src/com/google/gwt/dev/ExternalPermutationWorkerFactory.java
@@ -342,6 +342,7 @@
     List<PermutationWorker> toReturn = new ArrayList<PermutationWorker>(
         numWorkers);
 
+    // TODO(spoon): clean up already-launched processes if we get an exception?
     for (int i = 0; i < numWorkers; i++) {
       String cookie = launchExternalWorker(logger, sock.getLocalPort());
       cookies.add(cookie);
diff --git a/dev/core/src/com/google/gwt/dev/PermutationWorkerFactory.java b/dev/core/src/com/google/gwt/dev/PermutationWorkerFactory.java
index 4448c3d..0322072 100644
--- a/dev/core/src/com/google/gwt/dev/PermutationWorkerFactory.java
+++ b/dev/core/src/com/google/gwt/dev/PermutationWorkerFactory.java
@@ -17,7 +17,6 @@
 
 import com.google.gwt.core.ext.TreeLogger;
 import com.google.gwt.core.ext.UnableToCompleteException;
-import com.google.gwt.dev.jjs.InternalCompilerException;
 import com.google.gwt.dev.jjs.UnifiedAst;
 import com.google.gwt.dev.util.Util;
 
@@ -26,16 +25,9 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * Represents a factory for implementations of an endpoint that will invoke
@@ -46,73 +38,167 @@
 public abstract class PermutationWorkerFactory {
 
   /**
-   * This callable is responsible for compiling one permutation. It waits for an
-   * available worker, uses it, and returns it to the pool when finished.
+   * Coordinates the actions of a set of {@link PermutationWorker}s, running
+   * each in its own thread.
    */
-  private static class CompileOnePermutation implements Callable<ResultStatus> {
-    private final BlockingQueue<PermutationWorker> availableWorkers;
-    private final TreeLogger logger;
-    private final Permutation permutation;
-    private final File resultFile;
+  private static class Manager {
 
-    public CompileOnePermutation(TreeLogger logger, Permutation permutation,
-        File resultFile, BlockingQueue<PermutationWorker> availableWorkers) {
-      this.logger = logger;
-      this.permutation = permutation;
-      this.resultFile = resultFile;
-      this.availableWorkers = availableWorkers;
+    private static final Work POISON_PILL = new Work(null, null, null);
+
+    private static enum Result {
+      SUCCESS, FAIL, WORKER_DEATH
     }
 
-    public ResultStatus call() {
-      // Find a free worker
-      PermutationWorker worker;
+    /**
+     * Runs a {@link PermutationWorker} on its own thread.
+     */
+    private class WorkerThread implements Runnable {
+      private final PermutationWorker worker;
+
+      public WorkerThread(PermutationWorker worker) {
+        this.worker = worker;
+      }
+
+      public void run() {
+        Result threadDeathResult = Result.FAIL;
+        try {
+          while (true) {
+            Work work = workQueue.take();
+            if (work == POISON_PILL) {
+              return;
+            }
+            TreeLogger logger = work.getLogger();
+            try {
+              PermutationResult result = worker.compile(logger, work.getPerm());
+              Util.writeObjectAsFile(logger, work.getResultFile(), result);
+              logger.log(TreeLogger.DEBUG, "Successfully compiled permutation");
+              resultsQueue.put(Result.SUCCESS);
+            } catch (TransientWorkerException e) {
+              logger.log(TreeLogger.DEBUG,
+                  "Worker died, will retry Permutation", e);
+              workQueue.add(work);
+              threadDeathResult = Result.WORKER_DEATH;
+              return;
+            } catch (UnableToCompleteException e) {
+              logger.log(TreeLogger.ERROR,
+                  "Unrecoverable exception, shutting down", e);
+              threadDeathResult = Result.FAIL;
+              return;
+            }
+          }
+        } catch (InterruptedException e) {
+          return;
+        } finally {
+          // Record why I died.
+          try {
+            resultsQueue.put(threadDeathResult);
+          } catch (InterruptedException ignored) {
+          }
+        }
+      }
+    }
+
+    public static void run(TreeLogger logger, List<Work> work,
+        List<PermutationWorker> workers) throws UnableToCompleteException {
+      new Manager().doRun(logger, work, workers);
+    }
+
+    /**
+     * The queue of work to do.
+     */
+    BlockingQueue<Work> workQueue;
+
+    /**
+     * The queue of work to do.
+     */
+    BlockingQueue<Result> resultsQueue;
+
+    private Manager() {
+    }
+
+    private void doRun(TreeLogger logger, List<Work> work,
+        List<PermutationWorker> workers) throws UnableToCompleteException {
+
+      // Initialize state.
+      workQueue = new LinkedBlockingQueue<Work>(work);
+      resultsQueue = new LinkedBlockingQueue<Result>();
+
+      List<Thread> threads = new ArrayList<Thread>(workers.size());
       try {
-        worker = availableWorkers.take();
+        for (PermutationWorker worker : workers) {
+          Thread thread = new Thread(new WorkerThread(worker));
+          threads.add(thread);
+          thread.start();
+        }
+
+        int workToDo = work.size();
+        int aliveWorkers = workers.size();
+        waitForWorkers : while (workToDo > 0 && aliveWorkers > 0) {
+          Result take = resultsQueue.take();
+          switch (take) {
+            case SUCCESS:
+              --workToDo;
+              break;
+            case FAIL:
+              break waitForWorkers;
+            case WORKER_DEATH:
+              --aliveWorkers;
+              break;
+            default:
+              throw new IncompatibleClassChangeError(Result.class.toString());
+          }
+        }
+
+        workQueue.clear();
+        for (int i = 0; i < aliveWorkers; ++i) {
+          workQueue.add(POISON_PILL);
+        }
+
+        if (workToDo > 0) {
+          logger.log(TreeLogger.ERROR,
+              "Not all permutation were compiled , completed ("
+                  + (work.size() - workToDo) + "/" + work.size() + ")");
+          throw new UnableToCompleteException();
+        }
       } catch (InterruptedException e) {
-        logger.log(TreeLogger.DEBUG, "Worker interrupted", e);
-        return ResultStatus.HARD_FAILURE;
-      }
-
-      if (worker == noMoreWorkersWorker) {
-        // Shutting down
-        return ResultStatus.HARD_FAILURE;
-      }
-
-      // Invoke the worker
-      try {
-        PermutationResult result = worker.compile(logger, permutation);
-        Util.writeObjectAsFile(logger, resultFile, result);
-        logger.log(TreeLogger.DEBUG, "Successfully compiled permutation");
-        availableWorkers.add(worker);
-        return ResultStatus.SUCCESS;
-      } catch (TransientWorkerException e) {
-        logger.log(TreeLogger.DEBUG, "Worker died, will retry Permutation", e);
-        return ResultStatus.TRANSIENT_FAILURE;
-      } catch (UnableToCompleteException e) {
-        logger.log(TreeLogger.ERROR, "Unrecoverable exception, shutting down",
-            e);
-        return ResultStatus.HARD_FAILURE;
+        logger.log(TreeLogger.ERROR,
+            "Exiting without results due to interruption", e);
+        throw new UnableToCompleteException();
+      } finally {
+        // Interrupt any outstanding threads.
+        for (Thread thread : threads) {
+          thread.interrupt();
+        }
       }
     }
   }
 
-  private static enum ResultStatus {
-    /**
-     * A failure bad enough to merit shutting down the compilation.
-     */
-    HARD_FAILURE,
+  /**
+   * Represents work to do.
+   */
+  private static class Work {
+    private final TreeLogger logger;
+    private final Permutation perm;
+    private final File resultFile;
 
-    /**
-     * A successful compile.
-     */
-    SUCCESS,
+    public Work(TreeLogger logger, Permutation perm, File resultFile) {
+      this.logger = logger;
+      this.perm = perm;
+      this.resultFile = resultFile;
+    }
 
-    /**
-     * A worker died while processing this permutation, but it is worth trying
-     * to compile it with another worker.
-     */
-    TRANSIENT_FAILURE
-  };
+    public TreeLogger getLogger() {
+      return logger;
+    }
+
+    public Permutation getPerm() {
+      return perm;
+    }
+
+    public File getResultFile() {
+      return resultFile;
+    }
+  }
 
   /**
    * The name of the system property used to define the workers.
@@ -128,21 +214,6 @@
 
   private static List<PermutationWorkerFactory> lazyFactories;
 
-  private static final PermutationWorker noMoreWorkersWorker = new PermutationWorker() {
-
-    public PermutationResult compile(TreeLogger logger, Permutation permutation)
-        throws TransientWorkerException, UnableToCompleteException {
-      throw new UnableToCompleteException();
-    }
-
-    public String getName() {
-      return "Marker worker indicating no more workers";
-    }
-
-    public void shutdown() {
-    }
-  };
-
   /**
    * Compiles all Permutations in a Precompilation and returns an array of Files
    * that can be consumed by Link using the system-default
@@ -175,103 +246,35 @@
     assert Arrays.asList(precompilation.getPermutations()).containsAll(
         Arrays.asList(permutations));
 
-    // We may have a mixed collection of workers from different factories
+    // Create the work.
+    List<Work> work = new ArrayList<Work>(permutations.length);
+    for (int i = 0; i < permutations.length; ++i) {
+      Permutation perm = permutations[i];
+      TreeLogger permLogger = logger.branch(TreeLogger.DEBUG,
+          "Worker permutation " + perm.getId() + " of " + permutations.length);
+      work.add(new Work(permLogger, perm, resultFiles[i]));
+    }
+
+    // Create the workers.
     List<PermutationWorker> workers = new ArrayList<PermutationWorker>();
-
-    /*
-     * We can have errors below this point, there's a finally block to handle
-     * cleanup of workers.
-     */
     try {
-      createWorkers(logger, precompilation.getUnifiedAst(),
-          permutations.length, localWorkers, workers);
-      ExecutorService executor = Executors.newFixedThreadPool(workers.size());
+      createWorkers(logger, precompilation.getUnifiedAst(), work.size(),
+          localWorkers, workers);
 
-      // List of available workers.
-      // The extra space is for inserting nulls at shutdown time
-      BlockingQueue<PermutationWorker> availableWorkers = new ArrayBlockingQueue<PermutationWorker>(
-          2 * workers.size());
-      availableWorkers.addAll(workers);
-
-      try {
-
-        // Submit all tasks to the executor
-
-        // The permutation compiles not yet finished
-        Queue<CompileOnePermutation> tasksOutstanding = new LinkedList<CompileOnePermutation>();
-
-        // The futures for the results of those compiles
-        Queue<Future<ResultStatus>> resultFutures = new LinkedList<Future<ResultStatus>>();
-
-        for (int i = 0; i < permutations.length; ++i) {
-          TreeLogger permLogger = logger.branch(TreeLogger.DEBUG,
-              "Worker permutation " + permutations[i].getId() + " of "
-                  + permutations.length);
-          CompileOnePermutation task = new CompileOnePermutation(permLogger,
-              permutations[i], resultFiles[i], availableWorkers);
-          tasksOutstanding.add(task);
-          resultFutures.add(executor.submit(task));
-        }
-
-        // Count the number of dead workers
-        int numDeadWorkers = 0;
-        int successCount = 0;
-
-        while (!resultFutures.isEmpty() && numDeadWorkers < workers.size()) {
-          assert resultFutures.size() == tasksOutstanding.size();
-
-          CompileOnePermutation task = tasksOutstanding.remove();
-          Future<ResultStatus> future = resultFutures.remove();
-          ResultStatus result;
-          try {
-            result = future.get();
-          } catch (InterruptedException e) {
-            logger.log(TreeLogger.ERROR,
-                "Exiting without results due to interruption", e);
-            throw new UnableToCompleteException();
-          } catch (ExecutionException e) {
-            logger.log(TreeLogger.ERROR, "A compilation failed", e);
-            throw new UnableToCompleteException();
-          }
-
-          if (result == ResultStatus.SUCCESS) {
-            ++successCount;
-          } else if (result == ResultStatus.TRANSIENT_FAILURE) {
-            // A worker died. Resubmit for the remaining workers.
-            ++numDeadWorkers;
-            tasksOutstanding.add(task);
-            resultFutures.add(executor.submit(task));
-          } else if (result == ResultStatus.HARD_FAILURE) {
-            // Shut down.
-            break;
-          } else {
-            throw new InternalCompilerException("Unknown result type");
-          }
-        }
-
-        // Too many permutations is a coding error
-        assert successCount <= permutations.length;
-
-        if (successCount < permutations.length) {
-          // Likely as not, all of the workers died
-          logger.log(TreeLogger.ERROR, "Not all permutation were compiled "
-              + successCount + " of " + permutations.length);
-          throw new UnableToCompleteException();
-        }
-      } finally {
-        // Shut down the executor
-        executor.shutdown();
-
-        // Inform any residual CompileOnePermutation's that there aren't any
-        // more workers
-        for (int i = 0; i < workers.size(); i++) {
-          availableWorkers.add(noMoreWorkersWorker);
+      // Get it done!
+      Manager.run(logger, work, workers);
+    } finally {
+      Throwable caught = null;
+      for (PermutationWorker worker : workers) {
+        try {
+          worker.shutdown();
+        } catch (Throwable e) {
+          caught = e;
         }
       }
-    } finally {
-      // Shut down all workers
-      for (PermutationWorker worker : workers) {
-        worker.shutdown();
+      if (caught != null) {
+        throw new RuntimeException(
+            "One of the workers threw an exception while shutting down", caught);
       }
     }
   }