Pipelining & Workflow Basics


We've previously met the flux.* target namespace, and discussed how it provides basic tooling for workflows.

This demo builds on those ideas and introduces flux.pipeline/<targets> to describe a miniature ETL. For this exercise we'll also use idioms for Structured IO and Logging, so that what we build is friendly to both people and machines.

Miniature ETL


Consider the following Makefile for describing a miniature ETL:

#!/usr/bin/env -S make -f
# Describes a JSON-backed ETL pipeline with `compose.mk`
#
# Part of the `compose.mk` repo. This file runs as part of the test-suite.
# USAGE: ./demos/etl-json.mk

include compose.mk
.DEFAULT_GOAL := etl.safe

# Declare an ETL pipeline, using `flux.pipeline` to bind tasks together.
# Roughly equivalent to => `make extract | make transform | make load`
etl: flux.pipeline.verbose/extract,transform,load

# Declare a "safe" version of the pipeline that handles failure/cleanup
etl.safe: flux.try.except.finally/etl,etl.failed,etl.cleanup

# Simulate individual tasks for extract/transform/load.  
# For convenience we ignore input and emit new JSON using the `jb` tool.
extract: 
    ${jb} stage=extracted
transform: 
    ${jb} stage=transformed
load:
    ${jb} stage=loaded

# Simulate ETL handlers for failure and cleanup
etl.failed:
    $(call log, ETL failed!)
etl.cleanup:
    $(call log, Cleaning up ETL!)

Basically this works the way you'd expect: we can execute the ETL end-to-end with make etl, or with error-handling using make etl.safe. As a bonus, developers retain the ability to run individual pipeline stages, or the failure/cleanup handlers, piecewise.
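
For example, assuming the file is saved as demos/etl-json.mk (per the usage note above) and made executable, invoking it from a shell might look like:

# Run the guarded pipeline; the default goal is `etl.safe`
./demos/etl-json.mk

# Run the unguarded pipeline end-to-end
./demos/etl-json.mk etl

# Exercise a single stage or handler in isolation
./demos/etl-json.mk transform
./demos/etl-json.mk etl.cleanup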

One benefit of flux.pipeline is getting previews of the results at different stages: running the example prints each stage's intermediate output to the terminal as the pipeline progresses. Despite appearances, this is actually pipe-safe: the final output of {"stage":"loaded"} is the only thing on stdout; the stage previews and other logging go to stderr.
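
One way to convince yourself of that is to pipe the result into a tool that only tolerates clean JSON. A minimal sketch, assuming jq is installed locally (it's not part of the demo):

# Only the final record reaches jq; previews and logs stay on stderr
./demos/etl-json.mk etl | jq .stage
# => "loaded"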

For related reading, see the API docs for flux.pipeline/<t1>,..<tn>. The platform lifecycle demo shows flow control with flux.pipe.fork/<target_list>.
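
As a purely hypothetical sketch of that fan-out style, and assuming flux.pipe.fork hands each listed target a copy of the same stdin (the transform.a / transform.b targets below are invented for illustration, not part of the demo):

# Hypothetical: fan the extracted record out to two independent transforms
transform.a:; ${jb} stage=transformed.a
transform.b:; ${jb} stage=transformed.b
fanout: flux.pipe.fork/transform.a,transform.b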

Extending the Pipeline


Any example that only passes around fake JSON is bound to look tight and tidy, but it's really easy to extend this into something more interesting. Let's sketch the small changes that turn it into an image pipeline, using docker idioms from compose.mk to leverage external tools.

#!/usr/bin/env -S make -f
# Describes an image pipeline with `compose.mk`.
#
# Part of the `compose.mk` repo. This file runs as part of the test-suite.  
# USAGE: ./demos/etl-img.mk 

include compose.mk
.DEFAULT_GOAL := etl

# Setup a default docker image and tag for imagemagick tool container.
img.imagemagick="dpokidov/imagemagick:latest"

# Setup a target that fixes both an image & the entrypoint, but still
# defers to callers for deciding the exact command to pass.
convert:; ${docker.image.run}/${img.imagemagick},magick

# First the extract-task will kick off the pipeline by just injecting any image.
extract:; cat docs/img/icon.png

# Back to the ETL, now our transform-task is ready to stream stdin
# into the dockerized `convert` tool.  We need to pass it some kind of
# instruction in the `cmd` variable, so we just use imagemagick's own
# syntax for "convert the png on stdin to a jpeg on stdout".
transform:; ${stream.stdin} | cmd="png:- jpg:-" ${make} convert

# For the "load" part of the ETL, we could do something else with imagemagick,
# but let's make it interesting and preview the jpeg in the terminal.
# This is actually a one-liner and requires no fancy dependencies!
# Under the hood, the `stream.img.preview` target uses a dockerized `chafa`.
load: stream.img.preview

# Putting it together..
etl: flux.pipeline/extract,transform,load

For more information, see the API docs for docker.image.run and the docs for raw docker support.
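
As with the JSON version, the stages stay individually addressable. A rough sketch, assuming the file is saved as demos/etl-img.mk, docker is available, and the dockerized stages keep their logging on stderr just like the JSON demo:

# Run the whole image pipeline: extract -> convert -> terminal preview
./demos/etl-img.mk

# Exercise just the transform stage against an arbitrary png,
# writing the converted jpeg to a file instead of previewing it
cat docs/img/icon.png | ./demos/etl-img.mk transform > out.jpg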

Discussion


It's a neat trick that, if you squint, compose.mk has features that look like Luigi or Airflow, but of course it's not really made for ETLs. In spirit, flux.* is actually closer to something like declarative pipelines in Jenkins [1].

References