
Transformers

A Transformer sits between a data source and a North connector. It converts incoming data into the format the North connector expects before the data is placed in the connector's cache.

Transformers are configured on the North connector, in the Transformers section of its settings page.


Configuring a Transformer

Click Add in the Transformers section to open the configuration modal. Three fields guide the setup.

1 — Source

Choose where the data comes from.

| Source | Description |
| --- | --- |
| South connector | Data collected by one of the configured South connectors |
| OIBus API | Data pushed to OIBus via the REST API. Requires a Data Source ID string to identify the originating system |
| OIAnalytics Setpoint | Write-back commands sent from OIAnalytics |

2 — Item Scope (South connectors only)

When the source is a South connector, you can restrict which of its items the transformer applies to.

| Scope | Description |
| --- | --- |
| All items | Every item of the selected South connector goes through this transformer |
| By group | Only items belonging to a specific group are processed |
| Specific items | Hand-pick individual items from the search list |

Tip

A single North connector can have multiple transformers with different sources and scopes. This lets you apply different conversions to different parts of the same South connector: for example, one transformer for a group of temperature sensors and another for flow meters.

3 — Transformer

Once a source is selected, the list is automatically filtered to show only compatible transformers. The available choices depend on the type of South connector chosen:

| South connector type | Compatible standard transformers |
| --- | --- |
| OPC UA, OPC Classic, Modbus, ADS, OSIsoft PI (PLC / historian) | CSV, JSON, MQTT payloads, Modbus values, OIAnalytics (select time precision), OPCUA values |
| REST, SFTP, FTP, Folder Scanner, SQL databases (file / query-based) | Time Values (from CSV), MQTT payloads (from CSV), CSV (from JSON) |
| OIBus API | Time Values (from CSV), MQTT payloads (from CSV), CSV (from JSON) |
| OIAnalytics Setpoint | MQTT payloads, Modbus values, OPCUA values |

Custom transformers also appear in this list, under the names their authors gave them.

After choosing a transformer, its settings form appears directly in the modal.


Standard Transformers

Standard transformers are built-in and require only configuration — no coding.


For PLC and historian South connectors

These transformers process the structured time-value arrays produced by connectors such as OPC UA, OPC Classic, Modbus, TwinCAT ADS, and OSIsoft PI.

Each time-value object has this shape:

{ "pointId": "plant/temp1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "23.5" } }

CSV

Converts the time-values array into a CSV file. Each row is one reading. The three columns (point ID, timestamp, value) are mapped to configurable column titles. Optional per-column JavaScript expressions let you transform individual values before they are written.

Settings

| Setting | Description |
| --- | --- |
| Filename | Output filename; @CurrentDate is replaced with the current UTC timestamp |
| Encoding | UTF_8, UTF_8_BOM, LATIN_1, or UTF_16_LE |
| Header | Whether to include the column header row |
| Delimiter | Column separator |
| Newline | Line ending (LF, CRLF, CR) |
| Quote char | DOUBLE_QUOTE, SINGLE_QUOTE, or NONE |
| Escape char | DOUBLE_QUOTE or BACKSLASH |
| Point ID column | Header label for the point identifier column |
| Value column | Header label for the value column |
| Timestamp column | Header label for the timestamp column |
| Timestamp type | Output format (iso-string, unix-epoch, unix-epoch-ms, string) |
| Timestamp format | Custom pattern when type is string (e.g. yyyy-MM-dd HH:mm:ss.SSS) |
| Timezone | Timezone for formatting when type is string |
| Point ID process | Optional JS expression applied to each point ID (variable: value) |
| Value process | Optional JS expression applied to each value (variable: value) |
| Timestamp process | Optional JS expression applied to each formatted timestamp (variable: value) |

Example

Input (OPC UA items: temp1, pressure1)

[
{ "pointId": "plant/temp1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "23.5" } },
{ "pointId": "plant/pressure1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "102.1" } }
]

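With a semicolon delimiter (so that the decimal comma does not clash with the column separator), the column titles Reference, Timestamp, and Value, pointIdProcess: value.split('/').pop(), and valueProcess: String(value).replace('.', ','):

Output CSV

Reference;Timestamp;Value
temp1;2024-01-15T10:00:00.000Z;23,5
pressure1;2024-01-15T10:00:00.000Z;102,1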
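
The per-column process settings can be pictured as small functions applied to each field before a row is joined. The sketch below is not the OIBus implementation: `toCsv` and its option names are hypothetical, each expression is modelled as a plain function of `value`, and quoting and escaping are ignored. It uses a semicolon delimiter so that a decimal comma in the value cannot be mistaken for a column separator.

```javascript
// Hypothetical helper: models the three "process" settings as plain functions.
function toCsv(timeValues, { delimiter, titles, pointIdProcess, valueProcess, timestampProcess }) {
  const identity = value => value;
  const rows = timeValues.map(tv =>
    [
      (pointIdProcess ?? identity)(tv.pointId),
      (timestampProcess ?? identity)(tv.timestamp),
      (valueProcess ?? identity)(tv.data.value)
    ].join(delimiter)
  );
  // Header row first, then one line per reading.
  return [titles.join(delimiter), ...rows].join('\n');
}

const csv = toCsv(
  [
    { pointId: 'plant/temp1', timestamp: '2024-01-15T10:00:00.000Z', data: { value: '23.5' } },
    { pointId: 'plant/pressure1', timestamp: '2024-01-15T10:00:00.000Z', data: { value: '102.1' } }
  ],
  {
    delimiter: ';',
    titles: ['Reference', 'Timestamp', 'Value'],
    pointIdProcess: value => value.split('/').pop(), // keep the last path segment
    valueProcess: value => String(value).replace('.', ',') // decimal point to decimal comma
  }
);
console.log(csv);
```

Leaving a process option undefined keeps the field unchanged, which matches the settings being optional.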

JSON

A transparent pass-through that re-serializes the time-values array as a JSON file. Use this when the North connector expects a generic JSON file rather than the internal time-values format (e.g. a file-based North like Azure Blob or File Writer).

This transformer has no configuration options.

Example

Input — any time-values array from an OPC UA or Modbus South connector.

Output — the identical JSON content saved as a .json file.


MQTT payloads

Maps each reading to an MQTT message using a point ID → topic mapping table. Readings whose point ID has no match are silently dropped. The payload is a JSON string containing the data fields merged with the timestamp.

Settings

| Setting | Description |
| --- | --- |
| Mapping | List of { pointId, topic } pairs |

Example

Mapping: plant/temp1 → sensors/temperature, plant/pressure1 → sensors/pressure

Input

[
{ "pointId": "plant/temp1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "23.5" } },
{ "pointId": "plant/pressure1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "102.1" } },
{ "pointId": "plant/unknown", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "0" } }
]

Output: plant/unknown is dropped (no mapping)

[
{ "topic": "sensors/temperature", "payload": "{\"value\":\"23.5\",\"timestamp\":\"2024-01-15T10:00:00.000Z\"}" },
{ "topic": "sensors/pressure", "payload": "{\"value\":\"102.1\",\"timestamp\":\"2024-01-15T10:00:00.000Z\"}" }
]
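
As an illustration of the lookup described above, the following sketch builds the messages from a mapping table. `toMqttMessages` is a hypothetical name and this is only one plausible reading of the behaviour, not the OIBus source: it assumes the payload is produced by spreading the data fields and appending the timestamp.

```javascript
// Hypothetical sketch of the point ID -> topic lookup.
function toMqttMessages(timeValues, mapping) {
  const topicByPointId = new Map(mapping.map(entry => [entry.pointId, entry.topic]));
  return timeValues
    .filter(tv => topicByPointId.has(tv.pointId)) // readings without a mapping are dropped
    .map(tv => ({
      topic: topicByPointId.get(tv.pointId),
      // Assumption: data fields first, then the timestamp, serialized as one JSON string.
      payload: JSON.stringify({ ...tv.data, timestamp: tv.timestamp })
    }));
}

const messages = toMqttMessages(
  [
    { pointId: 'plant/temp1', timestamp: '2024-01-15T10:00:00.000Z', data: { value: '23.5' } },
    { pointId: 'plant/unknown', timestamp: '2024-01-15T10:00:00.000Z', data: { value: '0' } }
  ],
  [{ pointId: 'plant/temp1', topic: 'sensors/temperature' }]
);
console.log(messages);
```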

Modbus values

Maps each reading to a Modbus write command using a point ID → address mapping table. The value is cast to the appropriate type: booleans for coils, integers for holding registers. Readings with no match are dropped.

Settings

| Setting | Description |
| --- | --- |
| Mapping | List of { pointId, address, modbusType } pairs (modbusType: coil or register) |

Example

Mapping: plant/valve1 → 0x0010 (coil), plant/speed → 0x0020 (register)

Input

[
{ "pointId": "plant/valve1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "true" } },
{ "pointId": "plant/speed", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "1450" } }
]

Output

[
{ "address": "0x0010", "value": true, "modbusType": "coil" },
{ "address": "0x0020", "value": 1450, "modbusType": "register" }
]
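
The type cast is the part of this transformer that is easiest to get wrong, so here is a minimal sketch of it. The helper name `toModbusWrites` is hypothetical, and the rule used for coils (only `true` or the string "true" counts as on) is an assumption; the actual parsing rules are not stated in this page.

```javascript
// Hypothetical sketch: cast each mapped reading to the type its Modbus target expects.
function toModbusWrites(timeValues, mapping) {
  const entryByPointId = new Map(mapping.map(entry => [entry.pointId, entry]));
  const writes = [];
  for (const tv of timeValues) {
    const entry = entryByPointId.get(tv.pointId);
    if (!entry) continue; // no mapping: the reading is dropped
    const raw = tv.data.value;
    const value =
      entry.modbusType === 'coil'
        ? raw === true || String(raw).toLowerCase() === 'true' // assumption: anything else is false
        : parseInt(raw, 10); // holding registers carry integers
    writes.push({ address: entry.address, value, modbusType: entry.modbusType });
  }
  return writes;
}

const writes = toModbusWrites(
  [
    { pointId: 'plant/valve1', timestamp: '2024-01-15T10:00:00.000Z', data: { value: 'true' } },
    { pointId: 'plant/speed', timestamp: '2024-01-15T10:00:00.000Z', data: { value: '1450' } }
  ],
  [
    { pointId: 'plant/valve1', address: '0x0010', modbusType: 'coil' },
    { pointId: 'plant/speed', address: '0x0020', modbusType: 'register' }
  ]
);
console.log(writes);
```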

OIAnalytics (select time precision)

Prepares readings for the OIAnalytics North connector. Timestamps can be truncated to a chosen precision to reduce storage footprint. Only the pointId, timestamp, and data.value fields are forwarded — extra quality fields are stripped.

Settings

| Setting | Description |
| --- | --- |
| Precision | Timestamp precision: ms (unchanged), s, min, or hr |

Example — Precision s

Input

[{ "pointId": "tag/temp1", "timestamp": "2024-01-15T10:00:00.523Z", "data": { "value": "23.5", "quality": "good" } }]

Output — milliseconds are zeroed, quality is stripped

[{ "pointId": "tag/temp1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "23.5" } }]
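
To make the truncation concrete, the sketch below zeroes the smaller time components with the standard `Date` API. The function names are hypothetical, and it assumes truncation happens in UTC, which this page does not state.

```javascript
// Hypothetical helper: truncate an ISO-8601 timestamp to ms, s, min, or hr (in UTC).
function truncateTimestamp(isoTimestamp, precision) {
  const date = new Date(isoTimestamp);
  if (precision === 'ms') return date.toISOString(); // unchanged
  date.setUTCMilliseconds(0);
  if (precision === 's') return date.toISOString();
  date.setUTCSeconds(0);
  if (precision === 'min') return date.toISOString();
  date.setUTCMinutes(0);
  return date.toISOString(); // 'hr'
}

// Forward only pointId, timestamp, and data.value; other data fields are not copied.
function prepareReadings(timeValues, precision) {
  return timeValues.map(tv => ({
    pointId: tv.pointId,
    timestamp: truncateTimestamp(tv.timestamp, precision),
    data: { value: tv.data.value }
  }));
}

const prepared = prepareReadings(
  [{ pointId: 'tag/temp1', timestamp: '2024-01-15T10:00:00.523Z', data: { value: '23.5', quality: 'good' } }],
  's'
);
console.log(prepared);
```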

OPCUA values

Maps each reading to an OPC UA write command using a point ID → node ID mapping table. The value is forwarded as-is. Readings with no match are dropped.

Settings

| Setting | Description |
| --- | --- |
| Mapping | List of { pointId, nodeId } pairs |

Example

Mapping: plant/temp1 → ns=1;s=Temperature, plant/valve1 → ns=1;s=Valve1

Input

[
{ "pointId": "plant/temp1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "23.5" } },
{ "pointId": "plant/valve1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "true" } }
]

Output

[
{ "nodeId": "ns=1;s=Temperature", "value": "23.5" },
{ "nodeId": "ns=1;s=Valve1", "value": "true" }
]

For file and query-based South connectors

These transformers process files or JSON payloads produced by connectors such as REST, SFTP, FTP, Folder Scanner, and SQL databases (MySQL, PostgreSQL, MSSQL, SQLite…), as well as data injected via the OIBus API.


Time Values (from CSV)

Parses a CSV file and maps each row to an OIBus time-value object. The transformer converts the raw timestamp string to ISO-8601 according to the configured format.

Settings

| Setting | Description |
| --- | --- |
| Has header | Whether the first row is a header row |
| Delimiter | Column separator (COMMA, SEMI_COLON, TAB, etc.) |
| Point ID column | Column name (header mode) or zero-based index holding the point identifier |
| Timestamp column | Column name or index holding the timestamp |
| Value column | Column name or index holding the value |
| Timestamp type | Format of the raw timestamp (iso-string, unix-epoch, unix-epoch-ms, string) |
| Timezone | Timezone for parsing (only used when type is string) |
| Format | Custom datetime pattern (only used when type is string, e.g. yyyy-MM-dd HH:mm:ss) |

Example

This transformer is useful when a REST South connector fetches CSV exports from an API, or when a Folder Scanner South connector picks up CSV files dropped by another system.

Input CSV (fetched by a REST item)

SensorID,Timestamp,Reading
tag/temp1,2024-01-15T10:00:00.000Z,23.5
tag/temp1,2024-01-15T10:01:00.000Z,23.8
tag/pressure1,2024-01-15T10:00:00.000Z,102.1

Output (time-values JSON)

[
{ "pointId": "tag/temp1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": 23.5 } },
{ "pointId": "tag/temp1", "timestamp": "2024-01-15T10:01:00.000Z", "data": { "value": 23.8 } },
{ "pointId": "tag/pressure1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": 102.1 } }
]

These time-values can then be forwarded to any North connector that accepts structured readings (OIAnalytics, MQTT, OPC UA…).
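
The row-to-reading mapping can be sketched in a few lines. This is a simplified illustration under several assumptions: `csvToTimeValues` is a hypothetical name, the file has a header row, no field is quoted, timestamps are already ISO strings, and values are converted to numbers to mirror the example output above.

```javascript
// Hypothetical sketch: header-mode CSV with unquoted fields and ISO timestamps.
function csvToTimeValues(csv, { delimiter, pointIdColumn, timestampColumn, valueColumn }) {
  const [headerLine, ...lines] = csv.trim().split(/\r?\n/);
  const header = headerLine.split(delimiter);
  const pointIdIndex = header.indexOf(pointIdColumn);
  const timestampIndex = header.indexOf(timestampColumn);
  const valueIndex = header.indexOf(valueColumn);

  return lines.map(line => {
    const cells = line.split(delimiter);
    return {
      pointId: cells[pointIdIndex],
      timestamp: new Date(cells[timestampIndex]).toISOString(), // normalize to ISO-8601 UTC
      data: { value: Number(cells[valueIndex]) }
    };
  });
}

const timeValues = csvToTimeValues(
  'SensorID,Timestamp,Reading\ntag/temp1,2024-01-15T10:00:00.000Z,23.5\ntag/pressure1,2024-01-15T10:00:00.000Z,102.1',
  { delimiter: ',', pointIdColumn: 'SensorID', timestampColumn: 'Timestamp', valueColumn: 'Reading' }
);
console.log(timeValues);
```

A real CSV parser is needed as soon as fields can contain the delimiter or quotes; the naive `split` here is only meant to show the column mapping.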


MQTT payloads (from CSV)

Parses a CSV file and converts each row into an MQTT message with a topic and a payload. Two payload modes are supported:

  • Simple — a single column becomes the payload, cast to the configured type.
  • Object — multiple columns are assembled into a JSON object payload.
Settings

| Setting | Description |
| --- | --- |
| Has header | Whether the first row is a header row |
| Delimiter | Column separator |
| Topic column | Column name or index holding the MQTT topic |
| Payload type | string, number, boolean, datetime, or object |
| Value column | Column for the value (simple payload types only) |
| Object fields | List of { key, column, dataType } mappings (object payload only) |
| Datetime settings | Input/output datetime conversion (applies when payload type or a field is datetime) |

Example — Simple payload

Input CSV

topic,value
plant/pump1/speed,1450
plant/pump1/running,true

Output (MQTT messages)

[
{ "topic": "plant/pump1/speed", "payload": "1450" },
{ "topic": "plant/pump1/running", "payload": "true" }
]
Example — Object payload

Input CSV

topic,value,quality,timestamp
plant/sensor1,23.5,good,2024-01-15T10:00:00.000Z
plant/sensor2,48.0,good,2024-01-15T10:00:00.000Z

Object fields: value → number, quality → string, timestamp → datetime

Output (MQTT messages)

[
{ "topic": "plant/sensor1", "payload": "{\"value\":23.5,\"quality\":\"good\",\"timestamp\":\"2024-01-15T10:00:00.000Z\"}" },
{ "topic": "plant/sensor2", "payload": "{\"value\":48,\"quality\":\"good\",\"timestamp\":\"2024-01-15T10:00:00.000Z\"}" }
]
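
The object mode boils down to casting each configured column and assembling the results under their keys. The sketch below shows that idea for a single, already-parsed row. The `casts` table and `rowToMessage` are hypothetical, and the datetime cast assumes ISO-8601 input and output, whereas the real transformer has configurable datetime settings.

```javascript
// Hypothetical cast table keyed by the dataType of each object field.
const casts = {
  string: raw => raw,
  number: raw => Number(raw),
  boolean: raw => raw.trim().toLowerCase() === 'true',
  datetime: raw => new Date(raw).toISOString() // assumption: ISO-8601 in, ISO-8601 UTC out
};

// Build one MQTT message from a row that has already been split into named columns.
function rowToMessage(row, topicColumn, objectFields) {
  const payload = {};
  for (const { key, column, dataType } of objectFields) {
    payload[key] = casts[dataType](row[column]);
  }
  return { topic: row[topicColumn], payload: JSON.stringify(payload) };
}

const message = rowToMessage(
  { topic: 'plant/sensor1', value: '23.5', quality: 'good', timestamp: '2024-01-15T10:00:00.000Z' },
  'topic',
  [
    { key: 'value', column: 'value', dataType: 'number' },
    { key: 'quality', column: 'quality', dataType: 'string' },
    { key: 'timestamp', column: 'timestamp', dataType: 'datetime' }
  ]
);
console.log(message);
```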

CSV (from JSON)

Parses a JSON structure and writes a CSV file. A row iterator path (JSONPath expression) identifies the array of objects that become rows, and each configured field maps a path within each row to a CSV column. Fields support datetime conversion, type casting, and optional per-field JavaScript post-processing expressions.

Settings

| Setting | Description |
| --- | --- |
| Filename | Output filename; @CurrentDate is replaced with the current UTC timestamp |
| Encoding | UTF_8, UTF_8_BOM, LATIN_1, or UTF_16_LE |
| Header | Whether to include the column header row |
| Delimiter | Column separator |
| Newline | Line ending (LF, CRLF, CR) |
| Quote char | DOUBLE_QUOTE, SINGLE_QUOTE, or NONE |
| Escape char | DOUBLE_QUOTE or BACKSLASH |
| Null value | String written when a field is missing or null (defaults to empty string) |
| Row iterator path | JSONPath pointing to the array of rows (e.g. $.data[*]) |
| Fields | One entry per CSV column (see below) |

Each field entry:

| Setting | Description |
| --- | --- |
| Column name | Header label in the CSV |
| JSON path | JSONPath to the value within the current row (e.g. $.timestamp) |
| Data type | string, number, boolean, datetime, array, or object |
| Datetime settings | Input and output format/timezone/type when data type is datetime |
| Field process | Optional JS expression to post-process the extracted value (e.g. value.toUpperCase()) |

Example

A REST South connector fetches a JSON report from an API. The CSV (from JSON) transformer extracts the relevant fields and reformats them for a File Writer North connector.

Input JSON (returned by the REST item)

{
"data": [
{ "ts": "2024-01-15T10:00:00.000Z", "tag": "temp1", "val": 23.5, "ok": true },
{ "ts": "2024-01-15T10:01:00.000Z", "tag": "temp1", "val": 23.8, "ok": true },
{ "ts": "2024-01-15T10:00:00.000Z", "tag": "pressure1", "val": 102.1, "ok": false }
]
}

Configuration:

  • Row iterator path: $.data[*]
  • Fields: Timestamp ($.ts, datetime), Tag ($.tag, string), Value ($.val, number), OK ($.ok, boolean)

Output CSV

Timestamp,Tag,Value,OK
2024-01-15T10:00:00.000Z,temp1,23.5,true
2024-01-15T10:01:00.000Z,temp1,23.8,true
2024-01-15T10:00:00.000Z,pressure1,102.1,false
Nested JSON strings

The CSV (from JSON) transformer can automatically parse JSON-stringified intermediate nodes. If a value along a field's path is a string that contains a serialized JSON object, the transformer parses it and resolves the remaining path against the parsed object.
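
The idea can be sketched with a small resolver. This is an illustration only: `resolvePath` is a hypothetical helper that handles simple dotted paths rather than full JSONPath, and it is not taken from the OIBus code.

```javascript
// Hypothetical resolver for simple dotted paths such as "payload.sensor.temp".
function resolvePath(root, dottedPath) {
  let current = root;
  for (const key of dottedPath.split('.')) {
    if (typeof current === 'string') {
      try {
        current = JSON.parse(current); // the intermediate node was JSON-stringified
      } catch {
        return undefined; // a plain string: the path cannot go any deeper
      }
    }
    if (current === null || typeof current !== 'object') return undefined;
    current = current[key];
  }
  return current;
}

// "payload" holds a JSON string, yet the path still reaches the nested value.
const row = { id: 'r1', payload: '{"sensor":{"temp":23.5}}' };
const temp = resolvePath(row, 'payload.sensor.temp');
console.log(temp);
```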


For OIAnalytics Setpoints

These transformers convert write-back commands sent by OIAnalytics into the format expected by the target actuator North connector.

Each setpoint command has this shape:

{ "reference": "pump1/setSpeed", "value": 1200 }

MQTT payloads

Converts setpoint commands into MQTT messages using a reference → topic mapping table. Commands with no matching entry are dropped. The payload is the value serialized to a string; object values are JSON-stringified.

Settings

| Setting | Description |
| --- | --- |
| Mapping | List of { reference, topic } pairs |

Example

Mapping: pump1/setSpeed → control/pump1/speed

Input (setpoints)

[
{ "reference": "pump1/setSpeed", "value": 1200 },
{ "reference": "valve2/open", "value": true }
]

valve2/open is dropped (no mapping).

Output (MQTT messages)

[{ "topic": "control/pump1/speed", "payload": "1200" }]
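
The serialization rule described above (objects are JSON-stringified, everything else is converted with its string form) can be sketched as follows. Both function names are hypothetical and the code is an interpretation of the description, not the OIBus source.

```javascript
// Hypothetical sketch of the payload rule: objects become JSON, scalars become strings.
function serializePayload(value) {
  return typeof value === 'object' && value !== null ? JSON.stringify(value) : String(value);
}

function setpointsToMqtt(setpoints, mapping) {
  const topicByReference = new Map(mapping.map(entry => [entry.reference, entry.topic]));
  return setpoints
    .filter(sp => topicByReference.has(sp.reference)) // commands without a mapping are dropped
    .map(sp => ({ topic: topicByReference.get(sp.reference), payload: serializePayload(sp.value) }));
}

const commands = setpointsToMqtt(
  [
    { reference: 'pump1/setSpeed', value: 1200 },
    { reference: 'valve2/open', value: true }
  ],
  [{ reference: 'pump1/setSpeed', topic: 'control/pump1/speed' }]
);
console.log(commands);
```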

Modbus values

Converts setpoint commands into Modbus write commands. Boolean values are written as coils; numeric values are parsed as integers and written as holding registers.

Settings

| Setting | Description |
| --- | --- |
| Mapping | List of { reference, address, modbusType } pairs (modbusType: coil or register) |

Example

Mapping: valve1/open → 0x0010 (coil), pump1/speed → 0x0020 (register)

Input (setpoints)

[
{ "reference": "valve1/open", "value": true },
{ "reference": "pump1/speed", "value": 1450 }
]

Output (Modbus write commands)

[
{ "address": "0x0010", "value": true, "modbusType": "coil" },
{ "address": "0x0020", "value": 1450, "modbusType": "register" }
]

OPCUA values

Converts setpoint commands into OPC UA write commands using a reference → node ID mapping table.

Settings

| Setting | Description |
| --- | --- |
| Mapping | List of { reference, nodeId } pairs |

Example

Mapping: pump1/speed → ns=1;s=PumpSpeed

Input (setpoints)

[{ "reference": "pump1/speed", "value": 1200 }]

Output (OPC UA write commands)

[{ "nodeId": "ns=1;s=PumpSpeed", "value": 1200 }]

Custom Transformers

When no standard transformer fits your use case, you can write a Custom Transformer in JavaScript or TypeScript. Custom code runs inside a secure isolated sandbox with no access to the host file system, Node.js modules, or the network.

Custom transformers are created in the Engine → Transformers page and are then available in the transformer selection list of any North connector.

Transform Function

Your code must export a transform function (or a default export). It receives four arguments and must return a result object.

/**
 * @param content - The raw input data as a UTF-8 string (JSON, CSV, XML…)
 * @param source - Metadata about the data source ({ type, dataSourceId, … })
 * @param filename - The original filename (or a generated ID if no file was involved)
 * @param options - The transformer options as configured in the UI
 */
export function transform(
  content: string,
  source: object,
  filename: string,
  options: object
): { data: string; filename: string; numberOfElement?: number } {
  // …
}

| Return field | Description |
| --- | --- |
| data | The transformed output as a string (JSON, CSV, base64-encoded binary…) |
| filename | The filename used when the output is stored in the North cache |
| numberOfElement | Optional count of processed records, shown in logs and metrics |
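
As a minimal sketch of this contract, the function below accepts a JSON payload, normalizes it to an array, and returns the three documented fields. It is written as a plain function so the snippet runs on its own; in OIBus it would be exported as described above. The sample source object passed in the usage line is made up for illustration.

```javascript
// Minimal sketch of the result contract: data and filename are strings,
// numberOfElement is an optional record count.
function transform(content, source, filename, options) {
  const parsed = JSON.parse(content);
  // Wrap a single object so that downstream code always sees an array.
  const elements = Array.isArray(parsed) ? parsed : [parsed];
  return {
    data: JSON.stringify(elements),
    filename, // reuse the incoming filename unchanged
    numberOfElement: elements.length
  };
}

// Usage with an illustrative source object and no options.
const result = transform('{"id":"temp1","val":23.5}', { type: 'example' }, 'reading.json', {});
console.log(result);
```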

Available Libraries

The following libraries are pre-loaded in the sandbox and available via require:

| Library | require name | Description |
| --- | --- | --- |
| Luxon | luxon | Date/time parsing, formatting, and arithmetic |
| JSONPath Plus | jsonpath-plus | JSONPath queries over arbitrary JSON structures |
| PapaParse | papaparse | CSV parsing and serialization |

Any other require call throws an error — fs, http, process, and all Node.js built-ins are blocked.

Logging

Inside the sandbox, console.log, console.info, console.warn, and console.error are bridged to the OIBus logger. All messages appear prefixed with CUSTOM TRANSFORMER:.
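
For instance, a transformer might warn about rows it skips and summarize what it kept. The sketch below assumes a hypothetical input of objects with a `val` field; it is written as a plain function so it runs standalone, and the prefix mentioned above would be added by OIBus, not by this code.

```javascript
// Sketch: log skipped rows with console.warn and a summary with console.info.
function transform(content, _source, filename, _options) {
  const rows = JSON.parse(content);
  const valid = rows.filter(row => {
    const isNumeric = Number.isFinite(Number(row.val));
    if (!isNumeric) console.warn(`Skipping row with non-numeric val: ${JSON.stringify(row)}`);
    return isNumeric;
  });
  console.info(`Kept ${valid.length} of ${rows.length} rows`);
  return { data: JSON.stringify(valid), filename, numberOfElement: valid.length };
}

const logged = transform('[{"val":1},{"val":"abc"}]', {}, 'rows.json', {});
```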

Limits

| Limit | Value |
| --- | --- |
| Memory | 256 MB per execution |
| Timeout | Configurable in the transformer settings |
| Module access | Only luxon, jsonpath-plus, papaparse |
| Host access | None (file system, network, and env are blocked) |

Examples

Example 1 — Reformat a JSON array to CSV (JavaScript)

A REST South connector returns a JSON array. This transformer converts it to a semicolon-delimited CSV file for a File Writer North connector.

Input (content)

[
{ "ts": "2024-01-15T10:00:00.000Z", "id": "temp1", "val": 23.5 },
{ "ts": "2024-01-15T10:01:00.000Z", "id": "temp1", "val": 23.8 },
{ "ts": "2024-01-15T10:00:00.000Z", "id": "pressure1", "val": 102.1 }
]

Output

Timestamp;Tag;Value
2024-01-15T10:00:00.000Z;temp1;23.5
2024-01-15T10:01:00.000Z;temp1;23.8
2024-01-15T10:00:00.000Z;pressure1;102.1

Code

export function transform(content, source, filename, options) {
  const Papa = require('papaparse');
  const rows = JSON.parse(content);

  // Rename the fields to the desired column titles, then serialize with ';'.
  const csv = Papa.unparse(
    rows.map(r => ({
      Timestamp: r.ts,
      Tag: r.id,
      Value: r.val
    })),
    { delimiter: ';', header: true }
  );

  return {
    data: csv,
    // Swap only a trailing .json extension for .csv.
    filename: `export_${filename.replace(/\.json$/, '.csv')}`,
    numberOfElement: rows.length
  };
}

Example 2 — Filter and enrich time-values (TypeScript)

Only keep readings above a configured threshold, and annotate each with a derived status field.

Input (content) — standard time-values array, options.threshold = 20

[
{ "pointId": "plant/temp1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "23.5" } },
{ "pointId": "plant/temp2", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "18.0" } },
{ "pointId": "plant/pressure1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "102.1" } }
]

Output: temp2 is dropped (18.0 ≤ 20), the others are annotated

[
{ "pointId": "plant/temp1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "23.5", "status": "NORMAL" } },
{ "pointId": "plant/pressure1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "102.1", "status": "HIGH" } }
]

Code

type TimeValue = { pointId: string; timestamp: string; data: { value: string } };

export function transform(
  content: string,
  _source: object,
  _filename: string,
  options: { threshold?: number }
): { data: string; filename: string; numberOfElement: number } {
  const values: Array<TimeValue> = JSON.parse(content);
  // Fall back to 0 when no threshold is configured.
  const threshold = options.threshold ?? 0;

  const filtered = values
    // Keep only readings strictly above the threshold.
    .filter(v => parseFloat(v.data.value) > threshold)
    // Mark readings above twice the threshold as HIGH, the rest as NORMAL.
    .map(v => ({
      ...v,
      data: {
        ...v.data,
        status: parseFloat(v.data.value) > threshold * 2 ? 'HIGH' : 'NORMAL'
      }
    }));

  console.info(`Kept ${filtered.length} / ${values.length} values above threshold ${threshold}`);

  return {
    data: JSON.stringify(filtered),
    filename: `filtered_${Date.now()}.json`,
    numberOfElement: filtered.length
  };
}

Example 3 — Extract fields from a nested JSON with JSONPath (JavaScript)

Input (content)

{
"records": [
{ "header": { "ts": "2024-01-15T10:00:00.000Z" }, "tag": "plant/temp1", "measurements": { "val": 23.5 } },
{ "header": { "ts": "2024-01-15T10:01:00.000Z" }, "tag": "plant/temp1", "measurements": { "val": 23.8 } },
{ "header": { "ts": "2024-01-15T10:00:00.000Z" }, "tag": "plant/pressure1", "measurements": { "val": 102.1 } }
]
}

Output

[
{ "pointId": "plant/temp1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "23.5" } },
{ "pointId": "plant/temp1", "timestamp": "2024-01-15T10:01:00.000Z", "data": { "value": "23.8" } },
{ "pointId": "plant/pressure1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "102.1" } }
]

Code

export function transform(content, _source, _filename, _options) {
  const { JSONPath } = require('jsonpath-plus');
  const data = JSON.parse(content);

  // Each query returns one array. The arrays are combined by index below,
  // which assumes every record contains all three fields.
  const timestamps = JSONPath({ path: '$.records[*].header.ts', json: data });
  const tags = JSONPath({ path: '$.records[*].tag', json: data });
  const values = JSONPath({ path: '$.records[*].measurements.val', json: data });

  const timeValues = timestamps.map((ts, i) => ({
    pointId: tags[i],
    timestamp: ts,
    data: { value: String(values[i]) }
  }));

  return {
    data: JSON.stringify(timeValues),
    filename: `extracted_${Date.now()}.json`,
    numberOfElement: timeValues.length
  };
}

Example 4 — Timestamp conversion with Luxon (JavaScript)

Parse Unix epoch millisecond timestamps from a REST API response and convert them to ISO-8601 in a specific timezone.

Input (content)

[
{ "id": "plant/temp1", "epoch_ms": 1705312800000, "val": 23.5 },
{ "id": "plant/pressure1", "epoch_ms": 1705312800000, "val": 102.1 }
]

Output: epoch_ms converted to ISO-8601, timezone Europe/Paris (UTC+1 in January)

[
{ "id": "plant/temp1", "epoch_ms": 1705312800000, "val": 23.5, "timestamp": "2024-01-15T11:00:00.000+01:00" },
{ "id": "plant/pressure1", "epoch_ms": 1705312800000, "val": 102.1, "timestamp": "2024-01-15T11:00:00.000+01:00" }
]

Code

export function transform(content, _source, _filename, _options) {
  const { DateTime } = require('luxon');
  const rows = JSON.parse(content);

  const converted = rows.map(row => ({
    ...row,
    timestamp: DateTime.fromMillis(row.epoch_ms, { zone: 'Europe/Paris' }).toISO()
  }));

  return {
    data: JSON.stringify(converted),
    filename: `converted_${Date.now()}.json`,
    numberOfElement: converted.length
  };
}

Testing custom transformers

Use the Test button in the transformer editor to run your code against a sample payload before deploying it. The test result shows the transformed output, the output filename, and the execution time.
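
Before pasting code into the editor, it can also help to exercise it locally. The sketch below is a hypothetical harness, not an OIBus tool: `checkResult` validates the documented result shape, and the sample `transform`, source object, and filename are placeholders for your own.

```javascript
// Placeholder transformer under test; replace it with your own function.
function transform(content, _source, filename, _options) {
  const rows = JSON.parse(content);
  return { data: JSON.stringify(rows), filename, numberOfElement: rows.length };
}

// Hypothetical checker for the documented result shape.
function checkResult(result) {
  if (result === null || typeof result !== 'object') return ['result must be an object'];
  const problems = [];
  if (typeof result.data !== 'string') problems.push('data must be a string');
  if (typeof result.filename !== 'string' || result.filename === '') {
    problems.push('filename must be a non-empty string');
  }
  if (result.numberOfElement !== undefined && !Number.isInteger(result.numberOfElement)) {
    problems.push('numberOfElement must be an integer when present');
  }
  return problems;
}

// Run the transformer against a sample payload and report shape problems and timing.
const sample = JSON.stringify([{ id: 'temp1', val: 23.5 }]);
const startedAt = Date.now();
const output = transform(sample, { type: 'test' }, 'sample.json', {});
const elapsedMs = Date.now() - startedAt;
const problems = checkResult(output);
console.log({ problems, filename: output.filename, elapsedMs });
```

A local run like this does not reproduce the sandbox limits, so the Test button remains the reference check.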