Languages and Abstraction

During the last decade – as Moore’s law ceased to predict CPU clock speeds, hardware became commoditized and then increasingly virtualized, and “big data” became a ubiquitous trend – concurrency (finally) moved into mainstream focus. This sparked renewed interest in programming paradigms that had previously attracted only limited attention outside of academia, such as functional programming or the actor model. Adoption in practice was spurred by new, sometimes multi-paradigm, languages such as Scala or Clojure, by the addition of functional elements to mainstream languages like Java, and by frameworks like Spark, Hadoop, or Akka actors. Interestingly, the Java Virtual Machine has remained a solid foundation throughout. Despite the recent uptake in OS-level virtualization (Docker comes to mind), and despite the ensuing obsolescence of some JVM features (for instance, application isolation), there is nothing in sight that could replace the JVM anytime soon.

While the JVM is thus becoming increasingly polyglot, CloudKeeper is (to the best of our knowledge) the first JVM-based dataflow programming language that allows type-safe integration with existing JVM-based languages. Dataflow programming, and thus CloudKeeper, is particularly well-suited for “programming in the large”; that is, for integrating and orchestrating smaller software modules in a high-level, declarative fashion. In this way, CloudKeeper gives an up-to-date interpretation of the original Java motto, “Write once, run everywhere”: thanks to its abstraction of low-level concerns such as scheduling, synchronization, and data movement, CloudKeeper dataflows are portable not just across operating systems but also between single-machine development environments and distributed production environments in the cloud.

Dataflow Programming

Suppose we plan to develop software that takes as input the DNA fragments of a cancer patient and produces as output a report containing treatment recommendations for the oncologist. A bird’s-eye view of the necessary steps could look as follows.

[Flow chart of the genome-analysis pipeline, comprising the steps: Alignment, Variant Calling, Genomic Annotations, Classification, Alignment Statistics, Variant Statistics, Annotation Statistics, Filtering, Knowledge Base, and Report Generation]

Each of the individual steps in this flow chart represents a software module that can be implemented independently. For instance, sequence alignment is a well-known problem, and it is reasonably straightforward to convert a textbook algorithm (say, Smith-Waterman) into plain Java, as sketched below. Optimizing such an implementation is often no trivial task, and traditional imperative programming languages and their standard libraries allow for many low-level techniques (say, ensuring locality of reference, avoiding lock contention, etc.) to ensure good performance.
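
To make this concrete, here is a minimal sketch of the textbook Smith-Waterman recurrence with a linear gap penalty. It computes only the local-alignment score, with illustrative scoring parameters and none of the low-level optimizations just mentioned:

// Minimal sketch of the textbook Smith-Waterman local-alignment recurrence
// (linear gap penalty). Scoring parameters are illustrative only.
public final class SmithWaterman {
  private static final int MATCH = 2, MISMATCH = -1, GAP = -1;

  /** Returns the best local-alignment score of sequences a and b. */
  public static int score(String a, String b) {
    int[][] h = new int[a.length() + 1][b.length() + 1];
    int best = 0;
    for (int i = 1; i <= a.length(); ++i) {
      for (int j = 1; j <= b.length(); ++j) {
        int diagonal = h[i - 1][j - 1]
            + (a.charAt(i - 1) == b.charAt(j - 1) ? MATCH : MISMATCH);
        int deletion = h[i - 1][j] + GAP;
        int insertion = h[i][j - 1] + GAP;
        // A local alignment may start anywhere, hence the floor of 0.
        h[i][j] = Math.max(0,
            Math.max(diagonal, Math.max(deletion, insertion)));
        best = Math.max(best, h[i][j]);
      }
    }
    return best;
  }
}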

Now what are the best practices for integrating all these software modules into the complete genome-analysis software? One that is maintainable, scalable, and robust? While plain Java is of course expressive enough, it is in fact “overrich” for the task, offering degrees of freedom and requiring attention to low-level details that make it hard to see the forest for the trees. For instance, imperative programming is inherently sequential, so concurrency requires the explicit use of low-level primitives such as threads and semaphores (see the sketch below). Moreover, high-level features like checkpointing, scheduling, or data movement across machines would have to be implemented explicitly.
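
For contrast, the following sketch shows what hand-rolled orchestration of just two analysis steps might look like using java.util.concurrent primitives. The helper methods mirror the modules introduced later; all names here are illustrative:

import java.io.BufferedReader;
import java.io.StringReader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

// Hand-rolled orchestration of two analysis steps. All the plumbing (thread
// pool, futures, blocking get() calls, shutdown) must be spelled out by hand.
final class ManualOrchestration {
  static String analyze(String reads) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      Future<Double> avg = executor.submit(() -> averageLineLength(reads));
      Future<Boolean> found = executor.submit(() -> contains(reads, "ACTG"));
      return String.format(
          "Avg. read length is %.2f, and sequence 'ACTG' was%s detected.",
          avg.get(), found.get() ? "" : " not");
    } finally {
      executor.shutdown();
    }
  }

  private static double averageLineLength(String text) {
    return new BufferedReader(new StringReader(text)).lines()
        .collect(Collectors.averagingInt(String::length));
  }

  private static boolean contains(String text, String substring) {
    return new BufferedReader(new StringReader(text)).lines()
        .anyMatch(line -> line.contains(substring));
  }
}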

It quickly becomes obvious that an abstraction layer is needed to facilitate the aforementioned “programming in the large”. Generally speaking, dataflows are an appropriate abstraction for problems that can be described by flow charts, and CloudKeeper is an implementation designed for tight integration with software modules written in JVM-based languages.

Concrete Example

For the purpose of a concise example, let us assume greatly simplified requirements for the genome-analysis dataflow. The input is a collection of DNA fragments; that is, character sequences consisting of the letters A, G, C, and T (corresponding to the four bases adenine, guanine, cytosine, and thymine). The output report is supposed to be plain text, stating the average length of all DNA fragments and whether the sequence “ACTG” was found in any fragment. As an example, for the input shown below, we expect the “report” that follows it:

Input

ACTGTA
CGATA
CGGA

Output

Report: Avg. read length is 5.00, and sequence 'ACTG' was detected.

Source Code

We assume that three independent software modules have already been implemented in plain Java:

Average Line Length
Given a character sequence, computes the average length of all newline-separated parts.
Find
Given a text and a subsequence, determines whether the subsequence is contained anywhere in the text.
Report
Given (a) the average line length, (b) a subsequence, and (c) whether the subsequence was found, produces the textual report as specified above.

A suitable CloudKeeper dataflow (a “composite module” in CloudKeeper terminology) could hence look as shown in the following diagram. The source code of the individual modules follows below.

[Diagram of GenomeAnalysisModule: in-port reads and input value “ACTG” feed avgLineLengthModule (text → avg) and findModule (text, substring → wasFound); their outputs feed reportModule (avgLineLength, subsequence, wasFound → report), which is connected to the out-port report]
@CompositeModulePlugin("Analyzes a String consisting of DNA fragments")
public abstract class GenomeAnalysisModule
    extends CompositeModule<GenomeAnalysisModule> {
  public abstract InPort<String> reads();
  public abstract OutPort<String> report();

  InputModule<String> sequence = value("ACTG");

  AvgLineLengthModule avgLineLengthModule = child(AvgLineLengthModule.class)
    .text().from(reads());
  FindModule findModule = child(FindModule.class)
    .text().from(reads())
    .substring().from(sequence);
  ReportModule reportModule = child(ReportModule.class)
    .avgLineLength().from(avgLineLengthModule.avg())
    .subsequence().from(sequence)
    .wasDetected().from(findModule.wasFound());

  { report().from(reportModule.report()); }
}

@SimpleModulePlugin("Computes the average line length in a text")
public abstract class AvgLineLengthModule
    extends SimpleModule<AvgLineLengthModule> {
  public abstract InPort<String> text();
  public abstract OutPort<Double> avg();

  @Override
  public void run() throws IOException {
    try (BufferedReader reader
        = new BufferedReader(new StringReader(text().get()))) {
      avg().set(
        reader.lines().collect(Collectors.averagingInt(String::length))
      );
    }
  }
}

@SimpleModulePlugin("Determines whether a text contains a substring")
public abstract class FindModule extends SimpleModule<FindModule> {
  public abstract InPort<String> text();
  public abstract InPort<String> substring();
  public abstract OutPort<Boolean> wasFound();

  @Override
  public void run() throws IOException {
    String substring = substring().get();
    try (BufferedReader reader
        = new BufferedReader(new StringReader(text().get()))) {
      wasFound().set(
        reader.lines().anyMatch(line -> line.contains(substring))
      );
    }
  }
}

@SimpleModulePlugin("Aggregates results into a report")
public abstract class ReportModule extends SimpleModule<ReportModule> {
  public abstract InPort<Double> avgLineLength();
  public abstract InPort<String> subsequence();
  public abstract InPort<Boolean> wasDetected();
  public abstract OutPort<String> report();

  @Override
  public void run() {
    report().set(String.format(
      "Report: Avg. read length is %.2f, and sequence '%s' was%s detected.",
      avgLineLength().get(), subsequence().get(),
      wasDetected().get() ? "" : " not"
    ));
  }
}

Object Model

The CloudKeeper DSL is a convenient and concise way to express dataflows. It is based on the idea of using Java language constructs to represent CloudKeeper language constructs. For instance, as seen in the above example, Java class declarations are used to represent CloudKeeper module declarations, and Java method declarations are used to represent CloudKeeper port declarations.

Nonetheless, the DSL is an entirely optional component when using CloudKeeper: the Java language constructs used to define CloudKeeper language constructs are immediately translated into an abstract syntax tree, which could equally well be constructed programmatically (as shown further below). In order to interpret and execute a dataflow, CloudKeeper then transforms the abstract syntax tree into an intermediate representation. This transformation includes the following steps:

Linking
Resolving symbolic references (such as java.lang.String) into object references, as sketched after this list.
Verification
Ensuring that invocations are consistent with their definitions, checking type safety, etc.
Optimization
Precomputing frequently needed information, such as the dependency relations between ports.
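
To illustrate the first two steps, here is a hypothetical sketch, not CloudKeeper’s actual API, of what resolving a symbolic reference and checking the type safety of a single connection involve:

// Hypothetical sketch of linking and verification; illustration only,
// not CloudKeeper's actual API.
final class LinkerSketch {
  /** Linking: resolve a symbolic reference such as "java.lang.String"
      into an object reference. */
  static Class<?> resolve(String symbolicName) throws ClassNotFoundException {
    return Class.forName(symbolicName);
  }

  /** Verification: a connection is type-safe if the target port accepts
      every value the source port may produce. */
  static void verifyConnection(Class<?> fromPortType, Class<?> toPortType) {
    if (!toPortType.isAssignableFrom(fromPortType)) {
      throw new IllegalStateException("connection not type-safe: "
          + fromPortType.getName() + " -> " + toPortType.getName());
    }
  }
}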

The following example shows how to programmatically create an abstract syntax tree for the above genome-analysis composite module. Thanks to JAXB annotations on the object-model classes, the abstract syntax tree can be mapped to the XML representation also shown below.

new MutableCompositeModuleDeclaration()
  .setSimpleName("GenomeAnalysisModule")
  .setTemplate(
    new MutableCompositeModule()
      .setDeclaredPorts(Arrays.asList(
        new MutableInPort()
          .setSimpleName("reads")
          .setType(
            new MutableDeclaredType()
              .setDeclaration(String.class.getName())
          ),
        /* ... */
      ))
      .setModules(Arrays.asList(
        new MutableInputModule()
          .setSimpleName("sequence")
          .setValue("ACTG"),
        new MutableProxyModule()
          .setSimpleName("avgLineLengthModule")
          .setDeclaration("xyz.cloudkeeper.samples.maven.AvgLineLengthModule"),
        /* ... */
      ))
      .setConnections(Arrays.asList(
        new MutableParentInToChildInConnection()
          .setFromPort("reads")
          .setToModule("avgLineLengthModule")
          .setToPort("text"),
        /* ... */
      ))
    );
    
<composite-module-declaration>
  <simple-name>GenomeAnalysisModule</simple-name>
  <template>
    <ports>
      <in-port>
        <name>reads</name>
        <declared-type ref="java.lang.String"/>
      </in-port>
      <!-- ... -->
    </ports>
    <modules>
      <input-module>
        <simple-name>sequence</simple-name>
        <declared-type ref="java.lang.String"/>
        <raw ref="cloudkeeper.serialization.StringMarshaler">
          <serialized-as-byte-stream id="">QUNURw==</serialized-as-byte-stream>
        </raw>
      </input-module>
      <proxy-module ref="xyz.cloudkeeper.samples.maven.AvgLineLengthModule">
        <simple-name>avgLineLengthModule</simple-name>
      </proxy-module>
      <!-- ... -->
    </modules>
    <connections>
      <parent-to-child-connection
        from-port="reads" to-module="avgLineLengthModule" to-port="text"/>
      <!-- ... -->
    </connections>
  </template>
</composite-module-declaration>
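
Assuming the abstract syntax tree built above is stored in a variable declaration, and assuming the object-model classes are set up as valid JAXB root elements, producing the XML could look roughly as follows (the exact context setup is an assumption for illustration):

import javax.xml.bind.JAXBContext;
import javax.xml.bind.Marshaller;

// Sketch of mapping the abstract syntax tree to XML via JAXB. Real code
// might register all object-model classes or use a package-level context.
JAXBContext context
    = JAXBContext.newInstance(MutableCompositeModuleDeclaration.class);
Marshaller marshaller = context.createMarshaller();
marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
marshaller.marshal(declaration, System.out);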

Related Work

Of course, dataflow programming is not a novel concept, and many software tools exist that facilitate it. The following is a non-exhaustive comparison with existing tools and concepts.

Scientific Workflow Systems

Scientific workflow systems are probably closest to CloudKeeper in motivation. However, whereas existing systems are primarily targeted at researchers, CloudKeeper’s focus is on software-engineering needs. As an example, the myGrid team, which also produced and develops the Taverna workflow system (see below), created the myExperiment web site for sharing and collaborating on scientific workflows. While CloudKeeper dataflows could be shared in similar ways, the goal behind CloudKeeper is to treat dataflows as first-class software code, reusing established software-engineering best practices as much as possible. That is, CloudKeeper dataflows are meant to be shared in the same way Java code artifacts are shared: in artifact repositories such as the Central Repository, using tools like Nexus or Artifactory. Alternatively, CloudKeeper dataflows can simply be “inlined” in any JVM-based language.

Examples of existing scientific workflow systems include Taverna, Kepler, Galaxy, and Pegasus; see Wikipedia for a more comprehensive list. Also note the Common Workflow Language (CWL) working group, which is working towards a “vendor-neutral standard for representing workflows intended to be portable across a variety of computing platforms”.

Batch/Stream Processing

Various solutions exist to transform and accumulate individual records in data sets. Unlike these solutions, CloudKeeper makes no assumptions about its inputs – which may be arbitrary Java objects. That is, CloudKeeper does not rely on a record model or the fact that inputs can be naturally split into “chunks”.