
Running an Apache Beam Pipeline over Spark on Kubernetes


We were asked to build a quick demo to show Spark jobs running on Kubernetes, and as ever we decided to see if there was something fun we could do to learn something new and make the demo a little unique. The Spark job was a hard dependency, and the customer wanted to see how it worked with Kubernetes, so we couldn't choose other technologies there; but the method of execution was up for selection.

So, having scoured the web for ideas, we settled on processing some medical data via Apache Beam. At this point we had no idea whether it was possible, but figured it was worth a shot because Beam is hot property at the moment and ideal for building portable data processing pipelines.

For those of you who haven't heard of Apache Beam, it came out of Google and has been at the Apache Foundation for a few years, building a great community along with more and more runners, plugins, extensions, and core components. The output is largely portable thanks to the runner architecture, and you can run Beam pipelines directly on GCP as well as on Apache Flink, Spark, Apex, Gearpump, Hadoop and more. Not all pipelines are completely portable, but as our use case was pretty simple we figured we could make something work.

Firstly I created a blank Scala image; my initial plan was to write it all in Scala, but after a bit of playing around, with all the examples being in Java plus various little niggles, I ended up ditching it in favour of pure Java.

Initially we have a very simple main method, but with a slight tweak: we wanted to be able to read and write our data from S3 to make it easy to access inside Kubernetes. With that in mind, you can see how Beam creates the options object as our custom AppOptions type, which we then run through our AWSOptions helper to wire up the S3 credentials.

public static void main(String[] args) {
        AppOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(AppOptions.class);
        AWSOptions.formatOptions(options);

        runProcess(options);
}

This class looks like this:

package com.example.beam;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;

public class AWSOptions {

    // AWS configuration values
    private static final String AWS_DEFAULT_REGION = "eu-west-1";
    private static final String AWS_S3_PREFIX = "s3";

    /**
     * Formats AppOptions to include AWS specific configuration.
     *
     * @param options the pipeline options to update.
     */
    public static void formatOptions(AppOptions options) {
        if (options.getBucketUrl().toLowerCase().startsWith(AWS_S3_PREFIX)) {
            setAwsCredentials(options);
        }

        if (options.getAwsRegion() == null) {
            setAwsDefaultRegion(options);
        }
    }

    private static void setAwsCredentials(AppOptions options) {
        options.setAwsCredentialsProvider(
                new AWSStaticCredentialsProvider(
                        new BasicAWSCredentials(options.getAwsKey(), options.getAwsSecret())));
    }

    private static void setAwsDefaultRegion(AppOptions options) {
        if (options.getAwsRegion() == null) {
            options.setAwsRegion(AWS_DEFAULT_REGION);
        }
    }
}
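
The AppOptions interface itself isn't shown here, but it's worth sketching what it probably needs to contain: the custom flags used above, plus the AWS region and credentials setters, which Beam's own S3Options already provides. The version below is an assumption based on how the options object is used, not the exact class from the demo.

package com.example.beam;

import org.apache.beam.sdk.io.aws.options.S3Options;
import org.apache.beam.sdk.options.Description;

// Hypothetical sketch of AppOptions: extending Beam's S3Options gives us
// getAwsRegion/setAwsRegion and setAwsCredentialsProvider for free, so we
// only declare the custom flags referenced in the pipeline code.
public interface AppOptions extends S3Options {

    @Description("S3 (or local) URL to read the input data from")
    String getBucketUrl();
    void setBucketUrl(String value);

    @Description("Location to write the pipeline output to")
    String getOutputPath();
    void setOutputPath(String value);

    @Description("AWS access key used to build the credentials provider")
    String getAwsKey();
    void setAwsKey(String value);

    @Description("AWS secret key used to build the credentials provider")
    String getAwsSecret();
    void setAwsSecret(String value);
}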

So with the Options built out we then looked at how to process some data.


Here is a snippet:

private static void runProcess(AppOptions options) {
        Pipeline pipeline = Pipeline.create(options);

        // Read the raw input lines from the bucket given on the command line
        PCollection<String> input = pipeline.apply(TextIO.read().from(options.getBucketUrl()));
        LOG.info("Running pipeline from " + options.getBucketUrl() + " to " + options.getOutputPath() + " ...");

        // Extract key/value pairs, group them by key, then average the values per key
        input.apply(ParDo.of(new Q1Obj())).apply(GroupByKey.<String, Double>create())
                .apply(ParDo.of(new DoFn<KV<String, Iterable<Double>>, KV<String, Double>>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        String key = c.element().getKey();
                        Iterator<Double> iter = c.element().getValue().iterator();

                        Double bigvalue = 0.0;
                        int count = 0;
                        while (iter.hasNext()) {
                            count++;
                            bigvalue = bigvalue + iter.next();
                        }

                        bigvalue = bigvalue / count;

                        c.output(KV.of(key, bigvalue));
                    }
                })).apply("FormatResults", MapElements
                .into(TypeDescriptors.strings())
                .via((KV<String, Double> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
                .apply(TextIO.write().to(options.getOutputPath() + "/avgcovered"));
...
}

Now, we're not hardcore Beam experts, so I'm sure there's a lot we could have done differently here, but the pipeline takes the input PCollection, applies a DoFn to the stream of data coming in, then groups that data by the key returned from the first step. Once we've grouped the data, a second DoFn computes an average from each resulting group, and finally we format the results and output them to the output location, which in our case is an S3 bucket.
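
The Q1Obj DoFn that kicks the whole thing off isn't shown above either. As a rough, hypothetical sketch, it just has to turn each input line into a KV of grouping key and numeric value; the column positions below are assumptions, and the real ones depend on the medical data set being processed.

// Hypothetical sketch of Q1Obj: the real implementation isn't shown here.
// It assumes the input is CSV, with the grouping key in column 0 and a
// numeric value in column 1; adjust the indices to match your data.
static class Q1Obj extends DoFn<String, KV<String, Double>> {

    @ProcessElement
    public void processElement(ProcessContext c) {
        String[] fields = c.element().split(",");
        if (fields.length < 2) {
            return; // skip malformed rows
        }
        try {
            c.output(KV.of(fields[0].trim(), Double.parseDouble(fields[1].trim())));
        } catch (NumberFormatException e) {
            // skip rows whose value column isn't numeric, e.g. the header
        }
    }
}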

We then processed the output in that bucket with a Jupyter Notebook and Plotly.

Of course, I said that we wanted to run this on Kubernetes, so what's that all about?

Okay, so you can run this using the Direct Runner on your laptop by running something like:


mvn clean scala:run -DmainClass=com.example.beam.ProcessHealth2 \
    -DaddArgs="--awsKey=<aws-key>|--awsSecret=<aws-secret>|--outputPath=s3://<bucket>/|--awsRegion=us-east-1"

But we wanted this to run on Spark. So to make that happen we first tried it on a Spark instance using Spark Submit. With a few tweaks, this worked okay. We did have to remove Beam SQL support for now due to a known issue running it in Spark, but other than that the pipeline ran.
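
For reference, the plain spark-submit run looked something along these lines; the master URL, jar path and credential values are placeholders rather than the exact command we used:

./spark-submit --master spark://<spark-master>:7077 \
    --class com.example.beam.ProcessHealth2 \
    /path/to/wordcount-app-1.0.0-shaded.jar \
    "--runner=SparkRunner" "--awsKey=<aws-key>" "--awsSecret=<aws-secret>" \
    "--outputPath=s3://<bucket>/" "--awsRegion=us-east-1"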

Next, we wanted to run it on Kubernetes. The cool thing about Spark is that its Kubernetes support can be triggered via spark-submit: you don't have to stand up containers, wire up the bits and pieces, and then tie them together with a Kubernetes deployment. So what does that look like? It took a bit of figuring out; some stuff just worked, but it's not the most documented process ever.

First up, if you're running the latest version of Kubernetes with Spark 2.4.4 or earlier, you'll need to tweak the Spark code and build a version with the latest Kubernetes client dependencies. This is down to a compatibility issue between older Spark builds and the Kubernetes API, and if you don't do it you'll see 403 Forbidden a lot!
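
To give a rough idea of what that tweak involves: bump the kubernetes-client dependency version in resource-managers/kubernetes/core/pom.xml and then rebuild a Spark distribution with the Kubernetes profile enabled. The profile names below are the usual ones for a 2.4.x build, but treat this as a sketch rather than a recipe:

# after bumping the kubernetes-client version in
# resource-managers/kubernetes/core/pom.xml, rebuild the distribution
./dev/make-distribution.sh --name k8s-patched --tgz -Pkubernetes -Phadoop-2.7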

With that out of the way, you then need to build a Docker container with Spark and your code in it. The easiest way to do this is to grab a copy of the Spark distribution and run:

<spark-home>/bin/docker-image-tool.sh -r <repo> -t <tag> build
<spark-home>/bin/docker-image-tool.sh -r <repo> -t <tag> push

You'll also want to make a small change to the Dockerfile in:

resource-managers/kubernetes/docker/src/main/dockerfiles

to include your pipeline Jar. Or extend it further to make it more flexible!
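
The change itself is tiny. Assuming the shaded jar sits in the Docker build context when the image is built (the path is an assumption, so match it to wherever your build puts it), a single COPY line is enough, and it needs to line up with the local:/// path passed to spark-submit below:

# added to resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile
COPY wordcount-app-1.0.0-shaded.jar /opt/wordcount-app-1.0.0-shaded.jar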

Once you've got this built and pushed you can then run your pipeline with:

./spark-submit --master k8s://https://<k8s-apiserver-host>:<port> \
    --deploy-mode cluster \
    --name spark-demo \
    --class com.example.beam.ProcessHealth2 \
    --conf spark.executor.instances=5 \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
    --conf spark.kubernetes.container.image=<repo>/spark:<tag> \
    local:///opt/wordcount-app-1.0.0-shaded.jar \
    "--runner=SparkRunner" "--awsKey=<aws-key>" "--awsSecret=<aws-secret>" \
    "--outputPath=s3://<bucket>/" "--awsRegion=us-east-1"

Of course the command line parameters could be anything.
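
One thing the command above assumes is that the spark service account actually exists and is allowed to create pods. If you haven't set that up, the standard recipe from the Spark-on-Kubernetes docs looks like this (namespace assumed to be default):

kubectl create serviceaccount spark
kubectl create clusterrolebinding spark-role --clusterrole=edit \
    --serviceaccount=default:spark --namespace=default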


You'll probably also need to import the Kubernetes API server's SSL certificate into your local Java trust store. The easiest way to do that is via the InstallCert script, which can be found here.
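
If you'd rather do it by hand, the same result can be had with openssl and keytool; the hostname, file names and the stock "changeit" keystore password are assumptions for a default JDK install:

# grab the API server's certificate and import it into the JVM trust store
openssl s_client -connect <k8s-apiserver-host>:443 -showcerts </dev/null \
    | openssl x509 -outform PEM > k8s-api.pem
keytool -importcert -alias k8s-api -file k8s-api.pem \
    -keystore "$JAVA_HOME/jre/lib/security/cacerts" -storepass changeit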

There we go, that was our little demo; hopefully you found bits of it useful. The actual code can be found in our Gitlab repo here. As I said earlier, it wasn't supposed to be a masterclass in Beam programming, just a quick demo for a customer, but it does show the flexibility on offer and how easily you can run your pipelines on Spark on Kubernetes.
