Dflow

A minimal Dataflow programming engine

Features

Expressive and simple API
It is easy to create nodes
Written in TypeScript
Whole implementation is in this dflow.ts file. It is roughly 4kb once minified.
Minimal type system
It is possible to connect an output to an input only if the data types are matching.

Notice that you must implement your own nodes. For example a node "addition" could be implemented using BigInt or some arbitrary-precision library, according to your needs. You can find example nodes implementing basic JavaScript features in the examples/nodes/ folder.

How it works

A node is a block of code that can have inputs and outputs.

A link connects an input to an output.

A graph represents a program. It can contain nodes and links. Nodes are executed, sorted by their connections.

An input is just a reference to its connected output, if any.

An output can be connected to multiple inputs, and hold a **data** value that can be undefined or any value that can be serialized into JSON.

Usage

You can run the following examples via npm scripts: see examples folder to follow instructions.

This is a graph that will compute sin(π / 2) = 1 and print the result.


import { Dflow, type DflowNode } from "dflow";

const { input, output } = Dflow;

const MathSin: DflowNode = {
  kind: "mathSin",
  inputs: [input("number")],
  outputs: [output("number")],
  run(input: number) {
    return Math.sin(input);
  }
};

const ConsoleLog: DflowNode = {
  kind: "consoleLog",
  inputs: [input()],
  run(input: unknown) {
    console.log(input);
  }
};

// Create a Dflow instance with the given nodes.
const dflow = new Dflow([MathSin, ConsoleLog]);

// Create nodes.
const sinNodeId = dflow.node("mathSin");
const consoleLogNodeId = dflow.node("consoleLog");

// Create a data node.
// It will create an instance of a node with kind "data"
// This is a special node, which is built-in into every Dflow instance.
const numNodeId = dflow.data(Math.PI / 2);

// Connect numNode to sinNode and sinNode to consoleLog
dflow.link(numNodeId, sinNodeId);
dflow.link(sinNodeId, consoleLogNodeId);

// run graph
dflow.run();
      

You can also run async nodes.


import { Dflow } from "dflow";

const { input, output } = Dflow;

const SumNode = {
  kind: "Sum",
  inputs: [input(["number"]), input(["number"])],
  outputs: [output(["number"])],
  run(a: number, b: number) {
    return a + b;
  }
};

function sleep(timeout: number): Promise {
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve();
    }, timeout);
  });
}

const SleepNode = {
  kind: "Sleep",
  async run() {
    const timeout = 500;
    console.info("sleep node start", `(will sleep ${timeout} ms) zZz`);
    await sleep(timeout);
    console.info("sleep node end");
  }
};

async function runGraph() {
  const dflow = new Dflow([SumNode, SleepNode]);

  // Create two nodes, num and sum.

  const numNodeId = dflow.data(21);
  const sumNodeId = dflow.node(SumNode.kind, "sum");

  // Connect nodes.
  dflow.link(numNodeId, [sumNodeId, 0]);
  dflow.link(numNodeId, [sumNodeId, 1]);

  // Add also an async node.
  dflow.node(SleepNode.kind);

  // Run graph asynchronously.
  await dflow.run();

  // The id "sum" was passed as `wantedId` on SumNode .
  const result = dflow.out.sum[0];
  if (result !== 42) console.error("Unexpected result", result);
}

runGraph();
      

API

A Dflow represents a program as an executable graph.

A graph can contain nodes and links. Nodes are executed, sorted by their connections.

Constructor

Dflow constructor requires a list of node definitions.


import { Dflow, type DflowNode } from "dflow";

// Node definition.
const helloWorld: DflowNode = {
  kind: "hello",
  run: () => console.log("Hello, World!")
}

// Create a dflow instance.
const dflow = new Dflow([helloWorld]);

// Add a node to the graph.
dflow.node("hello")

// Run the dflow graph.
dflow.run()
      

Methods

dflow.node(kind)

Create a new node. Returns node id.

dflow.link(source, target)

Create a new link and connect two nodes. Returns link id.

If source or target position is omitted, then it defaults to 0 i.e. the first position.

dflow.data(value)

Create a new data node. Returns node id.

If value is not a valid DflowData, it will be set to undefined.

dflow.run()

Execute all nodes, sorted by their connections.

dflow.delete(itemId)

Delete node or link with given id.

Getters

dflow.graph

A graph contains nodes and links.

A DflowGraph has the following attributes:

node: Record<string, string>
Key is node id, value is node kind
link: Record<string, DflowLink>
Key is link id
data: Record<string, DflowData>
Data nodes: key is node id, value is node kind

dflow.error

Get error messages from last run, indexed by node id.

dflow.out

Get output data of last run, indexed by node id.

Static methods

Dflow.input()

Helper to define inputs.

Dflow.output()

Helper to define outputs.

Define an output named π (PI) for a constant node


const MathPI: DflowNode = {
  kind: "mathPI",
  outputs: [Dflow.output("number", { name: "π" })],
  run: () => Math.PI
}
      

Types

DflowData

Includes JSON data types and undefined

The DflowData can be one of the following:

DflowDataType

The DflowDataType is a literal type; it can be one of the following:

DflowInput

A DflowInput has the following attributes:

name?: string
Ignored by Dflow, but could be used by UI.
types: DflowDataType[]
An input can be connected to an output only if the data types match.
optional?: boolean
Any input is required by default, i.e. not optional. If an input is not optional and it has no data, then its node will not be executed. If an input is optional, then its node will be executed even if the input has no data.

DflowOutput

A DflowOutput has the following attributes:

name?: string
Ignored by Dflow, but could be used by UI.
types: DflowDataType[]
An output can be connected to an input only if the data types match.

Connects two nodes in the graph.

DflowNode

Defines a block of code: it can have inputs and outputs.

A DflowNode has the following attributes:

kind: string
inputs?: DflowInput[]
outputs?: DflowOutput[]
run(inputs): outputs

Define a "sum" node.


import { Dflow, type DflowNode } from "dflow";

const Sum: DflowNode = {
  kind: "sum",
  inputs: [Dflow.input("number"), Dflow.input("number")];
  outputs: [Dflow.output("number")];
  run(a: number, b: number) {
    return a + b;
  }
}