Tags: java, google-app-engine, google-cloud-dataflow, apache-beam, google-eclipse-plugin

Bare bones GAE app with Dataflow pipeline fails


I used Eclipse IDE for Java Developers 4.7.3 to create a bare-bones Google App Engine Standard application with a Dataflow pipeline. When I deploy the application and invoke the servlet, the request fails with java.lang.RuntimeException: Error while staging packages. What am I doing wrong? Here are the steps to reproduce this error:

  1. Use https://appengine.google.com to create a new Google App Engine project (e.g. pushpin-dataflow-test)
  2. Use https://console.developers.google.com/apis/api/dataflow.googleapis.com/overview to enable the Dataflow API for the project
  3. Use https://console.cloud.google.com/storage/browser to create a storage bucket (e.g. pushpin-dataflow-test) for the project
  4. Run Eclipse
  5. From the Google Cloud Platform menu, choose Create New Project > Google App Engine Standard Java Project
  6. In the project name box, type a project name (e.g. DataflowTest)
  7. In the Java version box, choose Java 8, Servlet 3.1
  8. Check the create as Maven project box
  9. In the group ID box, enter a group ID (e.g. com.pushpin.dataflowtest3)
  10. In the artifact ID box, enter an artifact ID (e.g. dataflowtest3)
  11. Click finish
  12. Update the pom.xml file to match the one below
  13. Update the HelloAppEngine.java file to match the one below
  14. From the Google Cloud Platform menu, choose Deploy to App Engine Standard
  15. Click the project (e.g. pushpin-dataflow-test)
  16. Click deploy
  17. Invoke the servlet (e.g. by opening https://pushpin-dataflow-test.appspot.com/hello)
  18. Note the 500 server error
  19. Check the log
  20. Note the java.lang.RuntimeException: Error while staging packages

Here is my pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Build for a minimal App Engine Standard (Java 8) web application whose servlet
  submits an Apache Beam pipeline to Cloud Dataflow.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <packaging>war</packaging>
  <version>0.1.0-SNAPSHOT</version>
  <groupId>com.pushpin.dataflowtest3</groupId>
  <artifactId>dataflowtest3</artifactId>

  <properties>
    <!-- The App Engine Java 8 runtime and Beam 2.x both require Java 8. Without these
         properties, the maven-compiler-plugin falls back to its own default language
         level, which is older than Java 8 for plugin versions of this era. -->
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <!-- The Servlet and JSP APIs are supplied by the App Engine runtime. Scope them as
         "provided" so they are available at compile time but are not packaged into
         WEB-INF/lib. -->
    <dependency>
      <groupId>javax.servlet</groupId>
      <artifactId>javax.servlet-api</artifactId>
      <version>3.1.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>javax.servlet.jsp</groupId>
      <artifactId>javax.servlet.jsp-api</artifactId>
      <version>2.3.1</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>com.google.appengine</groupId>
      <artifactId>appengine-api-1.0-sdk</artifactId>
      <version>1.9.60</version>
    </dependency>
    <dependency>
      <groupId>jstl</groupId>
      <artifactId>jstl</artifactId>
      <version>1.2</version>
    </dependency>
    <!-- Dataflow SDK 2.x: a distribution of Apache Beam (the org.apache.beam.* classes,
         including DataflowRunner). When a job is submitted, the runner uploads
         ("stages") classpath jars to the configured GCS temp location. -->
    <dependency>
      <groupId>com.google.cloud.dataflow</groupId>
      <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
      <version>2.4.0</version>
    </dependency>
  </dependencies>

  <build>
    <!-- for hot reload of the web application-->
    <outputDirectory>${project.build.directory}/${project.build.finalName}/WEB-INF/classes</outputDirectory>
    <plugins>
      <!-- Reports newer dependency and plugin versions. It is bound to the compile phase,
           so it runs (and consults remote repositories) on every compile. -->
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>versions-maven-plugin</artifactId>
        <version>2.3</version>
        <executions>
          <execution>
            <phase>compile</phase>
            <goals>
              <goal>display-dependency-updates</goal>
              <goal>display-plugin-updates</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <!-- Adds the appengine:* goals (for example, running locally and deploying). -->
      <plugin>
        <groupId>com.google.cloud.tools</groupId>
        <artifactId>appengine-maven-plugin</artifactId>
        <version>1.3.2</version>
      </plugin>
    </plugins>
  </build>
</project>

Here is my HelloAppEngine.java file:

package com.pushpin.dataflowtest3;

import java.io.IOException;

import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

/**
 * Servlet mapped to {@code /hello} that launches a Cloud Dataflow job on every GET request.
 *
 * <p>The submitted pipeline is empty: no transforms are applied before it is run. The job targets
 * the {@code pushpin-dataflow-test} project and uses the {@code pushpin-dataflow-test} Cloud
 * Storage bucket for its temporary files.
 *
 * <p>NOTE(review): on App Engine Standard this request fails with {@code RuntimeException: Error
 * while staging packages}. {@link DataflowRunner} uploads classpath jars to Cloud Storage on a
 * thread pool, and each upload starts further threads. Together these exceed App Engine's "Each
 * request cannot exceed 50 active threads" limit. Consider launching the job outside the request
 * handler instead (for example, from a Dataflow template).
 */
@WebServlet(name = "HelloAppEngine", urlPatterns = {"/hello"})
public class HelloAppEngine extends HttpServlet {

  /** Google Cloud project that the Dataflow job runs in. */
  private static final String PROJECT_ID = "pushpin-dataflow-test";

  /** Cloud Storage path passed to {@code setGcpTempLocation}; staged jars are uploaded here. */
  private static final String GCP_TEMP_LOCATION = "gs://pushpin-dataflow-test/gcp-temp";

  /** Cloud Storage path passed to {@code setTempLocation}. */
  private static final String TEMP_LOCATION = "gs://pushpin-dataflow-test/temp";

  /**
   * Builds the Dataflow options and runs an empty pipeline with them.
   *
   * <p>Nothing is written to {@code response}. Any exception raised while submitting the job
   * propagates to the servlet container.
   *
   * @param request the incoming request (not read)
   * @param response the outgoing response (not written)
   * @throws IOException declared by the servlet contract; not thrown directly here
   */
  @Override
  public void doGet(HttpServletRequest request, HttpServletResponse response)
      throws IOException {
    final Pipeline emptyPipeline = Pipeline.create(dataflowOptions());
    emptyPipeline.run();
  }

  /** Creates a fresh set of options that selects the Dataflow runner for this project. */
  private static DataflowPipelineOptions dataflowOptions() {
    final DataflowPipelineOptions opts =
        PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    opts.setGcpTempLocation(GCP_TEMP_LOCATION);
    opts.setProject(PROJECT_ID);
    opts.setRunner(DataflowRunner.class);
    opts.setTempLocation(TEMP_LOCATION);
    return opts;
  }
}

Here is the stack trace:

java.lang.RuntimeException: Error while staging packages
    at org.apache.beam.runners.dataflow.util.PackageUtil.stageClasspathElements(PackageUtil.java:396)
    at org.apache.beam.runners.dataflow.util.PackageUtil.stageClasspathElements(PackageUtil.java:273)
    at org.apache.beam.runners.dataflow.util.GcsStager.stageFiles(GcsStager.java:76)
    at org.apache.beam.runners.dataflow.util.GcsStager.stageDefaultFiles(GcsStager.java:64)
    at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:661)
    at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:174)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
    at com.pushpin.dataflowtest3.HelloAppEngine.doGet(HelloAppEngine.java:33)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:848)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1772)
    at com.google.apphosting.utils.servlet.ParseBlobUploadFilter.doFilter(ParseBlobUploadFilter.java:125)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at com.google.apphosting.runtime.jetty9.SaveSessionFilter.doFilter(SaveSessionFilter.java:37)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at com.google.apphosting.utils.servlet.JdbcMySqlConnectionCleanupFilter.doFilter(JdbcMySqlConnectionCleanupFilter.java:60)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at com.google.apphosting.utils.servlet.TransactionCleanupFilter.doFilter(TransactionCleanupFilter.java:48)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1759)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:582)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
    at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:524)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:226)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:512)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at com.google.apphosting.runtime.jetty9.AppVersionHandlerMap.handle(AppVersionHandlerMap.java:297)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
    at org.eclipse.jetty.server.Server.handle(Server.java:534)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:320)
    at com.google.apphosting.runtime.jetty9.RpcConnection.handle(RpcConnection.java:202)
    at com.google.apphosting.runtime.jetty9.RpcConnector.serviceRequest(RpcConnector.java:81)
    at com.google.apphosting.runtime.jetty9.JettyServletEngineAdapter.serviceRequest(JettyServletEngineAdapter.java:108)
    at com.google.apphosting.runtime.JavaRuntime$RequestRunnable.dispatchServletRequest(JavaRuntime.java:680)
    at com.google.apphosting.runtime.JavaRuntime$RequestRunnable.dispatchRequest(JavaRuntime.java:642)
    at com.google.apphosting.runtime.JavaRuntime$RequestRunnable.run(JavaRuntime.java:612)
    at com.google.apphosting.runtime.JavaRuntime$NullSandboxRequestRunnable.run(JavaRuntime.java:806)
    at com.google.apphosting.runtime.ThreadGroupPool$PoolEntry.run(ThreadGroupPool.java:274)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Could not stage /base/java8_runtime/appengine-api.jar to gs://pushpin-dataflow-test-3a/gcp-temp/staging/appengine-api-5oFhZxRcq0C-va8sAIoBcg.jar
    at org.apache.beam.runners.dataflow.util.PackageUtil.stagePackageSynchronously(PackageUtil.java:193)
    at org.apache.beam.runners.dataflow.util.PackageUtil.lambda$stagePackage$1(PackageUtil.java:174)
    at org.apache.beam.sdk.util.MoreFutures.lambda$supplyAsync$0(MoreFutures.java:101)
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1$1.run(ApiProxyImpl.java:1249)
    at java.security.AccessController.doPrivileged(Native Method)
    at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1.run(ApiProxyImpl.java:1243)
    at java.lang.Thread.run(Thread.java:745)
    at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThread.run(ApiProxyImpl.java:1210)
Caused by: java.lang.IllegalStateException: Each request cannot exceed 50 active threads.
    at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThread.start(ApiProxyImpl.java:1197)
    at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1368)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134)
    at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.initialize(AbstractGoogleAsyncWriteChannel.java:333)
    at org.apache.beam.sdk.util.GcsUtil.create(GcsUtil.java:450)
    at org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.create(GcsFileSystem.java:107)
    at org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.create(GcsFileSystem.java:57)
    at org.apache.beam.sdk.io.FileSystems.create(FileSystems.java:248)
    at org.apache.beam.runners.dataflow.util.PackageUtil.tryStagePackage(PackageUtil.java:246)
    at org.apache.beam.runners.dataflow.util.PackageUtil.tryStagePackageWithRetry(PackageUtil.java:206)
    at org.apache.beam.runners.dataflow.util.PackageUtil.stagePackageSynchronously(PackageUtil.java:190)
    ... 10 more

Solution

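  • This appears to be caused by a limitation of the App Engine standard environment: each request may have at most 50 active threads (see https://cloud.google.com/appengine/docs/standard/java/javadoc/com/google/appengine/api/ThreadManager#method.detail). As the stack trace shows, Dataflow stages the application's classpath jars to Cloud Storage on a thread pool, and each GCS upload starts additional threads. Staging therefore exceeds the limit and fails with "Each request cannot exceed 50 active threads."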


    In general, I would not plan to invoke Dataflow from within an App Engine app. A similar pattern you might want to investigate is Dataflow templates: https://cloud.google.com/dataflow/docs/templates/overview

    (i.e., being able to launch reusable Dataflow jobs with minor configuration or input changes)