API Reference

This page documents the public API exposed by AsyncFlow.

Creating a Flow

class asyncflow.BaseFlow(spec: Optional[Union[asyncflow.Sequence, asyncflow.Parallel]] = None)

Base class for a flow.

There are two APIs available to construct flows. The first is to instantiate a flow with no arguments and use the flow object to decorate functions:

flow = AsyncioFlow()
@flow()
def f(): ...

The second is to pass a Sequence or Parallel object to the constructor:

dag = Sequence(Parallel(f, g), h)
flow = AsyncioFlow(dag)

Parameters

spec – optionally provide a Sequence or Parallel object to specify the flow

__call__(upstream: Optional[Union[List[Callable[[], Any]], Callable[[], Any]]] = None, lock: Optional[Union[asyncflow.Lock, asyncflow.Semaphore]] = None) → Callable[[Callable[[], Any]], Callable[[], Any]]

Add a function to a flow.

BaseFlow objects can be used as a decorator to add functions to the flow:

flow = AsyncioFlow()  # inherits from ``BaseFlow``
@flow()
def f(): ...

Parameters
  • upstream – a function or a list of functions that must complete execution before this function can be executed

  • lock – a lock that needs to be acquired before this function can run
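The ordering that upstream describes can be illustrated without AsyncFlow. The following is a minimal sketch that uses only the standard library asyncio module. The helper run_graph and its dict argument are hypothetical names invented for this illustration, and the sketch makes no claim about how AsyncFlow schedules functions internally.

```python
import asyncio


async def run_graph(upstream):
    """Call each function once every function listed as its upstream has finished.

    `upstream` maps a zero-argument function to the list of functions it
    depends on. Hypothetical helper for illustration; not part of AsyncFlow.
    """
    finished = {func: asyncio.Event() for func in upstream}
    order = []

    async def run_one(func):
        # Block until every upstream function has signalled completion.
        for dep in upstream[func]:
            await finished[dep].wait()
        func()
        order.append(func.__name__)
        finished[func].set()

    await asyncio.gather(*(run_one(func) for func in upstream))
    return order


def f(): ...
def g(): ...
def h(): ...


# h is listed first, yet it still runs last because it waits for f and g.
order = asyncio.run(run_graph({h: [f, g], f: [], g: []}))
print(order)
```

Here h plays the role of a function declared with upstream=[f, g]: it is eligible to run only after both f and g have completed.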

class asyncflow.AsyncioFlow(spec: Optional[Union[asyncflow.Sequence, asyncflow.Parallel]] = None)

Like BaseFlow but for the Asyncio runtime.

class asyncflow.CurioFlow(spec: Optional[Union[asyncflow.Sequence, asyncflow.Parallel]] = None)

Like BaseFlow but for the Curio runtime.

class asyncflow.TrioFlow(spec: Optional[Union[asyncflow.Sequence, asyncflow.Parallel]] = None)

Like BaseFlow but for the Trio runtime.

Locks

class asyncflow.Lock

Create a lock.

A function that is given a lock runs only when no other function with the same lock is running.

Locks can be passed through the decorator-based API using the lock argument:

@flow(lock=l)
def f(): ...

or through the programmatic API using WithLock:

flow = Sequence(WithLock(f, l))

class asyncflow.Semaphore(value: int)

Create a semaphore.

This behaves like Lock, except that it can be acquired by up to value functions at the same time.

Parameters

value – the number of times the semaphore can be acquired.
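The counting behaviour can be observed with asyncio.Semaphore from the standard library, which is used here only as an analogy. Whether asyncflow.Semaphore is built on it is not stated on this page, and the sketch does not rely on that. The helper measure_peak is a hypothetical name introduced for the illustration.

```python
import asyncio


async def measure_peak(limit, n_jobs):
    """Run n_jobs coroutines under a semaphore and return the peak concurrency."""
    sem = asyncio.Semaphore(limit)
    running = 0
    peak = 0

    async def job():
        nonlocal running, peak
        async with sem:  # at most `limit` jobs are inside this block at once
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # simulated work
            running -= 1

    await asyncio.gather(*(job() for _ in range(n_jobs)))
    return peak


peak_with_two = asyncio.run(measure_peak(limit=2, n_jobs=5))
peak_with_one = asyncio.run(measure_peak(limit=1, n_jobs=5))
print(peak_with_two, peak_with_one)
```

With a limit of 1 the semaphore degenerates to a plain lock, which mirrors the relationship between Semaphore and Lock described above.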

Alternative API

class asyncflow.Sequence(*args: Union[asyncflow.Sequence, asyncflow.Parallel, asyncflow.WithLock, Callable[[], Any]])

Define a flow as a succession of operations.

For example, if functions f, g and h should be executed sequentially, this can be specified with

Sequence(f, g, h)

This can be combined with Parallel to create complex flows programmatically.

class asyncflow.Parallel(*args: Union[asyncflow.Sequence, asyncflow.Parallel, asyncflow.WithLock, Callable[[], Any]])

Define a flow as a set of parallel operations.

For example, if functions f, g and h can all run in parallel, this can be specified with

Parallel(f, g, h)

This can be combined with Sequence to create complex flows programmatically.
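To make the nesting semantics concrete, here is a minimal sketch of how a nested specification could be interpreted, written with the standard library only. Seq, Par and execute are hypothetical stand-ins created for this illustration; they are not AsyncFlow classes and do not reflect its implementation. The sketch also assumes the leaf functions are coroutine functions, which this page does not specify.

```python
import asyncio


class Seq:
    """Hypothetical stand-in: steps run one after another."""
    def __init__(self, *steps):
        self.steps = steps


class Par:
    """Hypothetical stand-in: steps run concurrently."""
    def __init__(self, *steps):
        self.steps = steps


async def execute(node, log):
    """Walk the nested specification, recording when each leaf starts and ends."""
    if isinstance(node, Seq):
        for step in node.steps:
            await execute(step, log)  # finish one step before starting the next
    elif isinstance(node, Par):
        await asyncio.gather(*(execute(step, log) for step in node.steps))
    else:
        log.append(("start", node.__name__))
        await node()
        log.append(("end", node.__name__))


async def f(): await asyncio.sleep(0.01)
async def g(): await asyncio.sleep(0.01)
async def h(): await asyncio.sleep(0.01)


log = []
# f and g run in parallel; h runs only after both have finished.
asyncio.run(execute(Seq(Par(f, g), h), log))
print(log)
```

In the recorded log, g starts before f ends (the parallel part), and h starts only after both f and g have ended (the sequential part).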

class asyncflow.WithLock(func: Job, lock: LockPlaceholder)

Require a lock to be acquired.

This is used to add a lock to a function when the flow is specified with Sequence or Parallel. It is not required when using the decorator-based API.

For example, in the following, flow will allow up to 2 of the functions f, g and h to run at a time:

from asyncflow import AsyncioFlow, Parallel, Semaphore, WithLock

s = Semaphore(2)

flow = AsyncioFlow(Parallel(
    WithLock(f, s),
    WithLock(g, s),
    WithLock(h, s),
))

Parameters
  • func – the function

  • lock – the lock to associate with func