An ambitious goal of Couler is to provide support for multiple workflow engines. While it initially supported only Argo Workflows for workflow orchestration, we are actively working on enhancing our support for Airflow, and the current system supports about 40-50% of the Airflow API. In addition, if you are looking for a Python SDK that provides access to all the available features of Argo Workflows, you might want to check out the low-level Python SDK maintained by the Argo Workflows team.
You can find a list of organizations that use Couler in ADOPTERS.md. If you'd like to add your organization to the list, please send us a pull request.
Many workflow engines exist nowadays, e.g. Argo Workflows, Tekton Pipelines, and Apache Airflow. However, their programming experience varies, and they offer different levels of abstraction that are often obscure and complex. The code snippets below are examples of constructing workflows with Apache Airflow and Kubeflow Pipelines.
[Figure: side-by-side example workflow definitions for Apache Airflow and Kubeflow Pipelines]
Couler is a system for unified Machine Learning (ML) workflow optimization in the cloud, and its contributions are outlined below:
Please see the following sections for the installation guide and examples.
python3 -m pip install git+https://github.com/couler-proj/couler --ignore-installed
Alternatively, you can clone this repository and then run the following to install:
python setup.py install
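After either installation path, one quick way to confirm that the package can be found is to look it up by module name. The snippet below is a minimal sketch using only the standard library; the helper name is_installed() is hypothetical and is not part of Couler.

```python
import importlib.util


def is_installed(module_name: str) -> bool:
    """Return True if a top-level module can be located, without importing it."""
    return importlib.util.find_spec(module_name) is not None


if __name__ == "__main__":
    # "couler" is the top-level package imported by the examples in this README.
    print("couler importable:", is_installed("couler"))
```

If this reports False, the install most likely went into a different Python environment than the one running the check.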
An interactive Katacoda environment is available for learning how to write and submit your first Argo workflow using the Couler Python SDK in your browser.
This example combines the use of a Python function result, along with conditionals, to take a dynamic path in the workflow. Depending on the result of the first step, defined in flip_coin(), the workflow runs either the heads() step or the tails() step.
Steps can be defined via either couler.run_script() for Python functions or couler.run_container() for containers. The conditional logic that selects which step runs after the coin flip is defined by combining couler.when() and couler.equal().
import couler.argo as couler
from couler.argo_submitter import ArgoSubmitter


def random_code():
    # Imported inside the function so the script body is self-contained.
    import random

    res = "heads" if random.randint(0, 1) == 0 else "tails"
    print(res)


def flip_coin():
    return couler.run_script(image="python:alpine3.6", source=random_code)


def heads():
    return couler.run_container(
        image="alpine:3.6", command=["sh", "-c", 'echo "it was heads"']
    )


def tails():
    return couler.run_container(
        image="alpine:3.6", command=["sh", "-c", 'echo "it was tails"']
    )


# Run heads() or tails() depending on the output of the first step.
result = flip_coin()
couler.when(couler.equal(result, "heads"), lambda: heads())
couler.when(couler.equal(result, "tails"), lambda: tails())

submitter = ArgoSubmitter()
couler.run(submitter=submitter)
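To reason about the control flow without a cluster, the branching can be mimicked in plain Python. The sketch below is not Couler code and does not call its API: when() and simulate() are hypothetical stand-ins that only illustrate the idea that each conditional step is guarded by a comparison against the output of the first step, so exactly one branch runs.

```python
import random


def flip_coin(rng: random.Random) -> str:
    """Stand-in for the script step: yields "heads" or "tails"."""
    return "heads" if rng.randint(0, 1) == 0 else "tails"


def when(condition: bool, step):
    """Run step() only if the condition holds; return None when skipped."""
    return step() if condition else None


def simulate(seed: int) -> list:
    """Flip once, then evaluate both guarded steps against the same result."""
    rng = random.Random(seed)
    result = flip_coin(rng)
    outputs = [
        when(result == "heads", lambda: "it was heads"),
        when(result == "tails", lambda: "it was tails"),
    ]
    # Exactly one guard is true, so one entry is a message and the other is None.
    return [o for o in outputs if o is not None]


if __name__ == "__main__":
    print(simulate(seed=0))
```

One difference worth keeping in mind: here the comparison is evaluated immediately by the local interpreter, whereas in a submitted workflow the condition is expected to be evaluated by the workflow engine once the first step has produced its output.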
This example demonstrates different ways to define a workflow as a directed acyclic graph (DAG) by specifying the dependencies of each task via couler.set_dependencies() and couler.dag(). Please see the code comments for the specific shape of the DAG defined in linear() and diamond().
import couler.argo as couler
from couler.argo_submitter import ArgoSubmitter


def job(name):
    couler.run_container(
        image="docker/whalesay:latest",
        command=["cowsay"],
        args=[name],
        step_name=name,
    )


#     A
#    / \
#   B   C
#  /
# D
def linear():
    couler.set_dependencies(lambda: job(name="A"), dependencies=None)
    couler.set_dependencies(lambda: job(name="B"), dependencies=["A"])
    couler.set_dependencies(lambda: job(name="C"), dependencies=["A"])
    couler.set_dependencies(lambda: job(name="D"), dependencies=["B"])


#   A
#  / \
# B   C
#  \ /
#   D
def diamond():
    couler.dag(
        [
            [lambda: job(name="A")],
            [lambda: job(name="A"), lambda: job(name="B")],  # A -> B
            [lambda: job(name="A"), lambda: job(name="C")],  # A -> C
            [lambda: job(name="B"), lambda: job(name="D")],  # B -> D
            [lambda: job(name="C"), lambda: job(name="D")],  # C -> D
        ]
    )


# Only the linear() shape is submitted here; call diamond() instead to try the other.
linear()
submitter = ArgoSubmitter()
couler.run(submitter=submitter)
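The two shapes can be checked locally by writing the same dependencies as plain data. The sketch below uses the standard library graphlib module (Python 3.9+) rather than Couler. The names LINEAR, DIAMOND, execution_order() and edges_to_dependencies() are illustrative only; they mirror the dependencies listed in linear() and diamond() and say nothing about how Couler represents them internally.

```python
from graphlib import TopologicalSorter

# Each key maps a task to the set of tasks it depends on.
LINEAR = {"A": set(), "B": {"A"}, "C": {"A"}, "D": {"B"}}
DIAMOND = {"A": set(), "B": {"A"}, "C": {"A"}, "D": {"B", "C"}}


def execution_order(dependencies: dict) -> list:
    """Return one valid order in which the tasks could run."""
    return list(TopologicalSorter(dependencies).static_order())


def edges_to_dependencies(edges: list) -> dict:
    """Convert (upstream, downstream) pairs into the mapping used above."""
    deps = {}
    for upstream, downstream in edges:
        deps.setdefault(upstream, set())
        deps.setdefault(downstream, set()).add(upstream)
    return deps


if __name__ == "__main__":
    print(execution_order(LINEAR))
    print(execution_order(DIAMOND))
```

The pair list accepted by edges_to_dependencies() corresponds to the "A -> B" style comments in diamond(), while the dictionaries correspond to the per-task dependencies lists passed in linear(). In both shapes A must run first; in the diamond, D additionally waits for both B and C.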
Note that the current version only works with Argo Workflows, but we are actively working on the design of a unified interface that is extensible to additional workflow engines. Please stay tuned for more updates; we welcome any feedback and contributions from the community.
Please cite the repo if you use the code in this repo.
@misc{Couler,
  author = {Xiaoda Wang and Yuan Tang and Tengda Guo and Bo Sang and Jingji Wu and Jian Sha and Ke Zhang and Jiang Qian and Mingjie Tang},
  title = {Couler: Unified Machine Learning Workflow Optimization in Cloud},
  year = {2024},
  publisher = {40th IEEE International Conference on Data Engineering},
  howpublished = {\url{https://arxiv.org/abs/2403.07608}}
}