A minimal Dataflow programming engine
DlowNode here.
import { Dflow, type DflowNode } from "dflow";
// Node definition.
const helloWorld: DflowNode = {
kind: "hello",
run: () => console.log("Hello, World!")
};
// Create a dflow instance.
const dflow = new Dflow([helloWorld]);
// Add a node to the graph.
dflow.node("hello");
// Run the dflow graph.
dflow.run();
A node is a block of code that can have inputs and outputs.
A link connects an input to an output.
An input is just a reference to its connected output, if any.
An output can be connected to multiple inputs, and holds a data value that can be undefined or any value that can be serialized into JSON.
A graph represents a program. It can contain nodes and links.
undefined or it does not correspond to input data types, then the node will not be executed.You can run the following examples via npm scripts. See the examples folder for detailed instructions.
This graph computes sin(π / 2) = 1 and prints the result.
import { Dflow, type DflowNode } from "dflow";
const { input, output } = Dflow;
const MathSin: DflowNode = {
kind: "mathSin",
inputs: [input("number")],
outputs: [output("number")],
run(input: number) {
return Math.sin(input);
}
};
const ConsoleLog: DflowNode = {
kind: "consoleLog",
inputs: [input()],
run(input: unknown) {
console.log(input);
}
};
// Create a Dflow instance with the given nodes.
const dflow = new Dflow([MathSin, ConsoleLog]);
// Create nodes.
const sinNodeId = dflow.node("mathSin");
const consoleLogNodeId = dflow.node("consoleLog");
// Create a data node.
// It will create an instance of a node with kind "data"
// This is a special node, which is built-in into every Dflow instance.
const numNodeId = dflow.data(Math.PI / 2);
// Connect numNode to sinNode and sinNode to consoleLog
dflow.link(numNodeId, sinNodeId);
dflow.link(sinNodeId, consoleLogNodeId);
// run graph
dflow.run();
You can also run async nodes. Notice the SleepNode below has an async run() method and the graph is executed with await dflow.run().
import { Dflow, type DflowNode } from "dflow";
const { input, output } = Dflow;
const SumNode: DflowNode = {
kind: "Sum",
inputs: [input(["number"]), input(["number"])],
outputs: [output(["number"])],
run(a: number, b: number) {
return a + b;
}
};
function sleep(timeout: number): Promise {
return new Promise((resolve) => {
setTimeout(() => {
resolve();
}, timeout);
});
}
const SleepNode: DflowNode = {
kind: "Sleep",
async run() {
const timeout = 500;
console.info("sleep node start", `(will sleep ${timeout} ms) zZz`);
await sleep(timeout);
console.info("sleep node end");
}
};
async function runGraph() {
const dflow = new Dflow([SumNode, SleepNode]);
// Create two nodes, num and sum.
const numNodeId = dflow.data(21);
const sumNodeId = dflow.node(SumNode.kind);
// Connect nodes.
dflow.link(numNodeId, [sumNodeId, 0]);
dflow.link(numNodeId, [sumNodeId, 1]);
// Add also an async node.
dflow.node(SleepNode.kind);
// Run graph asynchronously.
await dflow.run();
// Get the result of the sum node.
const result = dflow.out[sumNodeId][0];
if (result !== 42) throw new Error("Unexpected result");
}
await runGraph();
// sleep node start (will sleep 500 ms) zZz
// sleep node end
A Dflow represents a program as an executable graph.
A graph can contain nodes and links. Nodes are executed, sorted by their connections.
Dflow constructor requires a list of node definitions which is an Array<DflowNode>.
Create a new node. Returns node id.
Create a new link and connect two nodes. Returns link id.
The source and target arguments can be either:
[nodeId, position]string0 i.e. the first oneCreate a new data node. Returns node id.
If value is not a valid DflowData, it will be set to undefined.
Execute all nodes, sorted by their connections.
Notice that the run method is async and returns a Promise<void>. In general you should call it with await dflow.run() but if no node in the graph is async, then you can just call it with dflow.run().
Delete node or link with given id.
The itemId argument can be either a node id or a link id.
If it is a node id, then all links connected to that node will be deleted as well as the outputs related to that node.
A graph contains nodes and links.
A DflowGraph can be serialized into JSON so it can be saved and loaded by a Dflow instance.
It has the following attributes:
node: Record<string, string>link: Record<string, DflowLink>data: Record<string, DflowData>Get error messages from last run, indexed by node id.
Get output data of last run, indexed by node id.
Helper to define inputs.
It is supposed to be used inside a node definition.
For example, define an input string named message.
const Print: DflowNode = {
kind: "print",
inputs: [Dflow.input("string", { name: "message" })],
run: (message: string) => {
console.log(message);
}
};
Input with number type.
Dflow.input("number")
Input that accepts both number and string type.
Dflow.input(["number", "string"])
Input with type array and name.
Dflow.input("array", { name: "list" })
Input that accepts any type.
Dflow.input()
Input with any type and named "foo".
Dflow.input([], { name: "foo" })
Optional number input.
Dflow.input("number", { optional: true })
Helper to define outputs.
The signature is similar to Dflow.input() except for the optional parameter, which is not needed for outputs.
It is supposed to be used inside a node definition.
For example, define a number output named π (PI).
const MathPI: DflowNode = {
kind: "mathPI",
outputs: [Dflow.output("number", { name: "π" })],
run: () => Math.PI
};
Includes JSON data types and undefined
The DflowData can be one of the following:
undefinednullbooleannumberstringDflowArrayDflowObject
Where DflowArray is just an array of DflowData and DflowObject is an object with string keys and DflowData values.
Dflow data types represent values that can be serialized as JSON.
The DflowDataType is a literal type; it can be one of the following:
"null""boolean""number""string""array""object"
Both DflowInput and DflowOutput have a types: DflowDataType[] attribute which is used to check if they can be connected.
A special case is when types is an empty array. In this case, the input or output can accept any data type.
Defines a node input.
You can use Dflow.input() helper to create them.
A DflowInput has the following attributes:
name?: stringtypes: DflowDataType[]optional?: booleanDefines a node output.
You can use Dflow.output() helper to create them.
A DflowOutput has the following attributes:
name?: stringtypes: DflowDataType[]Connects two nodes in the graph.
A DflowLink is a list with four elements:
sourceNodeId: stringsourcePosition: number0 if not provided in dflow.link()targetNodeId: stringtargetPosition: number0 if not provided in dflow.link()Defines a block of code: it can have inputs and outputs.
Dflow does not provide a set of pre-defined nodes. You must implement your own nodes.
For example, a node "addition" could be implemented using BigInt or some arbitrary-precision library, according to your needs... but it is as easy as creating a function. Basically, a node is an object with a run method and a few metadata properties.
See the examples/nodes folder: it contains few node definitions used by dflow tests.
A DflowNode has the following attributes:
kind: stringinputs?: DflowInput[]outputs?: DflowOutput[]run(inputs): outputsWhen you define a set of nodes, you may want to start the file with something like this:
import { Dflow, type DflowNode } from "dflow";
const { input, output } = Dflow;
So you have the DflowNode type that can help you define nodes and the input and output helpers to define inputs and outputs.
This is a node that implements Array.prototype.join() so its first input is an array and it has an optional second input for the separator, which defaults to a comma.
const ArrayJoin: DflowNode = {
kind: "arrayJoin",
inputs: [
input("array"),
input("string", { name: "separator", optional: true })
],
outputs: [output("string")],
run(array: DflowArray, separator: string | undefined) {
return array.join(separator);
}
};
First of all, the name attribute of inputs and outputs is optional and ignored by Dflow.
It could be used by a UI to show meaningful names. So there is no need to match the input names with the run method arguments.
Notice that the run method arguments are typed according to the inputs definition.
You can import DflowArray type from dflow package to type the first argument.
The second argument is either a string or undefined, because the input is marked as an optional string.
It is recommended to just use the types defined by DflowData.
This is the thing: the node above will run only if the first input is connected to an output that produces an array and the second input is either not connected or connected to an output that produces a string.
It is also recommended to not type the return value, just let TypeScript infer it.
Here is an example of nodes generated from the Math global.
import { Dflow } from "dflow";
// Generate Dflow nodes for all Math properties and functions.
const mathNodes = Object.getOwnPropertyNames(Math).map((key) => {
// @ts-expect-error: expression of type 'string' can't be used to index type 'Math'
const item = Math[key];
const kind = `Math.${key}`;
const outputs = [Dflow.output("number")];
// If the item is a number, create a node that outputs that number.
if (typeof item === "number") {
return {
kind,
outputs,
run: () => item
};
}
// If the item is a function, wrap in in the run method.
if (typeof item === "function") {
return {
kind,
// Get the number of inputs from the function's length property.
inputs: Array(item.length).fill(Dflow.input("number")),
outputs,
run: (...args: number[]) => {
return item(...args);
}
};
}
// Not needed, just to make TS happy.
throw new Error(`Unsupported Math property: ${key}`);
});
// Create a Dflow instance with the generated node definitions.
const dflow = new Dflow(mathNodes);
// Compute Math.trunc(Math.E)
const nodeId1 = dflow.node("Math.E");
const nodeId2 = dflow.node("Math.trunc");
dflow.link(nodeId1, nodeId2);
dflow.run();
console.log(dflow.out);
// { n0: [ 2.718281828459045 ], n1: [ 2 ] }
Dflow context is bound to every node at runtime, hence it is accessible via this inside node run.
// Create a host with an API context.
import { Dflow, type DflowNode } from "dflow";
// Add an API client to the context.
// A Dflow context is a Record that will be bound to nodes at runtime.
type Context = {
apiClient: ApiClient;
};
// Of course this is a dummy API client.
class ApiClient {
apiKey: string;
constructor(apiKey: ApiClient["apiKey"]) {
this.apiKey = apiKey;
}
async fetchSomeData(
payload: string
): Promise<{ status: string; payload: string }> {
return await Promise.resolve({ status: "SUCCESS", payload });
}
}
// This nodes uses the apiClient from the context...
const CustomNode: DflowNode = {
kind: "Custom",
inputs: [Dflow.input("string")],
outputs: [Dflow.output("object")],
// ... notice that we specify the type of `this` via the
//
// this: Context
//
// argument on the run method.
async run(this: Context, data: string) {
const result = await this.apiClient.fetchSomeData(data);
return result;
}
};
// Create a Dflow instance and add the context.
const dflow = new Dflow([CustomNode]);
dflow.context.apiClient = new ApiClient("s3cret");
const nodeId = dflow.node(CustomNode.kind);
const dataId = dflow.data("foo");
dflow.link(dataId, nodeId);
await dflow.run();
const result = dflow.out[nodeId][0] as { status: string; payload: string };
if (!result || result.status !== "SUCCESS" || result.payload !== "foo")
console.error("Unexpected result:", result);
else console.info(result.status);
Optional error logger.
By default, Dflow does not log errors. For example to send errors to STDERR, you can do:
dflow.ERR = console.error;
The logger must be a function that accepts a single argument of any type. Usually it will be an Error object but it could be anything that can be thrown by a node, for example a string.