Transformers
A Transformer sits between a data source and a North connector. It converts incoming data into the format the North connector expects before the data is placed in the connector's cache.
Transformers are configured on the North connector, in the Transformers section of its settings page.
Configuring a Transformer
Click Add in the Transformers section to open the configuration modal. Three fields guide the setup.
1 — Source
Choose where the data comes from.
| Source | Description |
|---|---|
| South connector | Data collected by one of the configured South connectors |
| OIBus API | Data pushed to OIBus via the REST API. Requires a Data Source ID string to identify the originating system |
| OIAnalytics Setpoint | Write-back commands sent from OIAnalytics |
2 — Item Scope (South connectors only)
When the source is a South connector, you can restrict which of its items the transformer applies to.
| Scope | Description |
|---|---|
| All items | Every item of the selected South connector goes through this transformer |
| By group | Only items belonging to a specific group are processed |
| Specific items | Hand-pick individual items from the search list |
A single North connector can have multiple transformers with different sources and scopes. This lets you apply different conversions to different parts of the same South connector — for example, one transformer for a group of temperature sensors and another for flow meters.
3 — Transformer
Once a source is selected, the list is automatically filtered to show only compatible transformers. The available choices depend on the source and, for a South connector, on its type:
| Source type | Compatible standard transformers |
|---|---|
| OPC UA, OPC Classic, Modbus, ADS, OSIsoft PI (PLC / historian) | CSV, JSON, MQTT payloads, Modbus values, OIAnalytics (select time precision), OPCUA values |
| REST, SFTP, FTP, Folder Scanner, SQL databases (file / query-based) | Time Values (from CSV), MQTT payloads (from CSV), CSV (from JSON) |
| OIBus API | Time Values (from CSV), MQTT payloads (from CSV), CSV (from JSON) |
| OIAnalytics Setpoint | MQTT payloads, Modbus values, OPCUA values |
Custom transformers also appear in this list, under the name the user gave them.
After choosing a transformer, its settings form appears directly in the modal.
Standard Transformers
Standard transformers are built-in and require only configuration — no coding.
For PLC and historian South connectors
These transformers process the structured time-value arrays produced by connectors such as OPC UA, OPC Classic, Modbus, TwinCAT ADS, and OSIsoft PI.
Each time-value object has this shape:
{ "pointId": "plant/temp1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "23.5" } }
CSV
Converts the time-values array into a CSV file. Each row is one reading. The three columns (point ID, timestamp, value) are mapped to configurable column titles. Optional per-column JavaScript expressions let you transform individual values before they are written.
Settings
| Setting | Description |
|---|---|
| Filename | Output filename; @CurrentDate is replaced with the current UTC timestamp |
| Encoding | UTF_8, UTF_8_BOM, LATIN_1, or UTF_16_LE |
| Header | Whether to include the column header row |
| Delimiter | Column separator |
| Newline | Line ending (LF, CRLF, CR) |
| Quote char | DOUBLE_QUOTE, SINGLE_QUOTE, or NONE |
| Escape char | DOUBLE_QUOTE or BACKSLASH |
| Point ID column | Header label for the point identifier column |
| Value column | Header label for the value column |
| Timestamp column | Header label for the timestamp column |
| Timestamp type | Output format (iso-string, unix-epoch, unix-epoch-ms, string) |
| Timestamp format | Custom pattern when type is string (e.g. yyyy-MM-dd HH:mm:ss.SSS) |
| Timezone | Timezone for formatting when type is string |
| Point ID process | Optional JS expression applied to each point ID (variable: value) |
| Value process | Optional JS expression applied to each value (variable: value) |
| Timestamp process | Optional JS expression applied to each formatted timestamp (variable: value) |
Example
Input (OPC UA items: temp1, pressure1)
[
{ "pointId": "plant/temp1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "23.5" } },
{ "pointId": "plant/pressure1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "102.1" } }
]
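With delimiter ;, column titles Reference, Timestamp, and Value, pointIdProcess: value.split('/').pop(), and valueProcess: String(value).replace('.', ','). A semicolon delimiter is used so that the decimal comma does not collide with the column separator:
Output CSV
Reference;Timestamp;Value
temp1;2024-01-15T10:00:00.000Z;23,5
pressure1;2024-01-15T10:00:00.000Z;102,1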
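The per-column expressions can be pictured as small functions of a single variable named value. The sketch below is not the OIBus implementation: it assumes each expression is compiled with new Function, the helper names compile and toCsvRows are hypothetical, and quoting and escaping are left out. A semicolon delimiter is used so the decimal comma stays inside its column.

```javascript
// Hypothetical helper: turn an optional expression string into a function of `value`.
function compile(expression) {
  // No expression configured: return the value unchanged.
  return expression ? new Function('value', `return (${expression});`) : value => value;
}

// Hypothetical helper: apply the per-column expressions, then join the columns.
function toCsvRows(timeValues, settings) {
  const pointIdProcess = compile(settings.pointIdProcess);
  const valueProcess = compile(settings.valueProcess);
  const timestampProcess = compile(settings.timestampProcess);
  const header = [settings.pointIdColumn, settings.timestampColumn, settings.valueColumn];
  const rows = timeValues.map(tv => [
    pointIdProcess(tv.pointId),
    timestampProcess(tv.timestamp),
    valueProcess(tv.data.value)
  ]);
  return [header, ...rows].map(cols => cols.join(settings.delimiter)).join('\n');
}

const csv = toCsvRows(
  [
    { pointId: 'plant/temp1', timestamp: '2024-01-15T10:00:00.000Z', data: { value: '23.5' } },
    { pointId: 'plant/pressure1', timestamp: '2024-01-15T10:00:00.000Z', data: { value: '102.1' } }
  ],
  {
    delimiter: ';',
    pointIdColumn: 'Reference',
    timestampColumn: 'Timestamp',
    valueColumn: 'Value',
    pointIdProcess: "value.split('/').pop()",
    valueProcess: "String(value).replace('.', ',')"
  }
);
console.log(csv);
```

Each expression only sees one cell at a time, which is why a value process cannot read the point ID or the timestamp of the same row.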
JSON
A transparent pass-through that re-serializes the time-values array as a JSON file. Use this when the North connector expects a generic JSON file rather than the internal time-values format (e.g. a file-based North like Azure Blob or File Writer).
This transformer has no configuration options.
Example
Input — any time-values array from an OPC UA or Modbus South connector.
Output — the identical JSON content saved as a .json file.
MQTT payloads
Maps each reading to an MQTT message using a point ID → topic mapping table. Readings whose point ID has no
match are silently dropped. The payload is a JSON string containing the data fields merged with the timestamp.
Settings
| Setting | Description |
|---|---|
| Mapping | List of { pointId, topic } pairs |
Example
Mapping: plant/temp1 → sensors/temperature, plant/pressure1 → sensors/pressure
Input
[
{ "pointId": "plant/temp1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "23.5" } },
{ "pointId": "plant/pressure1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "102.1" } },
{ "pointId": "plant/unknown", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "0" } }
]
Output — plant/unknown is dropped (no mapping)
[
{ "topic": "sensors/temperature", "payload": "{\"value\":\"23.5\",\"timestamp\":\"2024-01-15T10:00:00.000Z\"}" },
{ "topic": "sensors/pressure", "payload": "{\"value\":\"102.1\",\"timestamp\":\"2024-01-15T10:00:00.000Z\"}" }
]
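The mapping logic can be sketched in a few lines. This is an illustration of the behaviour described above, not the OIBus source; the function name toMqttMessages is hypothetical.

```javascript
// Hypothetical sketch of the point ID → topic mapping.
function toMqttMessages(timeValues, mapping) {
  const topicByPointId = new Map(mapping.map(m => [m.pointId, m.topic]));
  return timeValues
    .filter(tv => topicByPointId.has(tv.pointId)) // unmapped readings are dropped
    .map(tv => ({
      topic: topicByPointId.get(tv.pointId),
      // Payload: the data fields merged with the timestamp, serialized as JSON.
      payload: JSON.stringify({ ...tv.data, timestamp: tv.timestamp })
    }));
}

const messages = toMqttMessages(
  [
    { pointId: 'plant/temp1', timestamp: '2024-01-15T10:00:00.000Z', data: { value: '23.5' } },
    { pointId: 'plant/pressure1', timestamp: '2024-01-15T10:00:00.000Z', data: { value: '102.1' } },
    { pointId: 'plant/unknown', timestamp: '2024-01-15T10:00:00.000Z', data: { value: '0' } }
  ],
  [
    { pointId: 'plant/temp1', topic: 'sensors/temperature' },
    { pointId: 'plant/pressure1', topic: 'sensors/pressure' }
  ]
);
console.log(messages.length); // 2
```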
Modbus values
Maps each reading to a Modbus write command using a point ID → address mapping table. The value is cast to the appropriate type: booleans for coils, integers for holding registers. Readings with no match are dropped.
Settings
| Setting | Description |
|---|---|
| Mapping | List of { pointId, address, modbusType } pairs (modbusType: coil or register) |
Example
Mapping: plant/valve1 → 0x0010 (coil), plant/speed → 0x0020 (register)
Input
[
{ "pointId": "plant/valve1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "true" } },
{ "pointId": "plant/speed", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "1450" } }
]
Output
[
{ "address": "0x0010", "value": true, "modbusType": "coil" },
{ "address": "0x0020", "value": 1450, "modbusType": "register" }
]
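The type cast can be sketched as follows. The exact casting rules are assumptions made for this illustration (a coil is true for true, "true", or "1"; a register is parsed with parseInt in base 10), and toModbusWrites is a hypothetical name.

```javascript
// Hypothetical sketch: map readings to Modbus write commands and cast the value.
function toModbusWrites(timeValues, mapping) {
  const targetByPointId = new Map(mapping.map(m => [m.pointId, m]));
  const writes = [];
  for (const tv of timeValues) {
    const target = targetByPointId.get(tv.pointId);
    if (!target) continue; // no mapping: the reading is dropped
    const raw = tv.data.value;
    const value =
      target.modbusType === 'coil'
        ? raw === true || String(raw).toLowerCase() === 'true' || String(raw) === '1' // assumed truthy forms
        : parseInt(raw, 10); // holding register: integer
    writes.push({ address: target.address, value, modbusType: target.modbusType });
  }
  return writes;
}

const writes = toModbusWrites(
  [
    { pointId: 'plant/valve1', timestamp: '2024-01-15T10:00:00.000Z', data: { value: 'true' } },
    { pointId: 'plant/speed', timestamp: '2024-01-15T10:00:00.000Z', data: { value: '1450' } }
  ],
  [
    { pointId: 'plant/valve1', address: '0x0010', modbusType: 'coil' },
    { pointId: 'plant/speed', address: '0x0020', modbusType: 'register' }
  ]
);
console.log(JSON.stringify(writes));
```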
OIAnalytics (select time precision)
Prepares readings for the OIAnalytics North connector. Timestamps can be truncated to a chosen precision to reduce
storage footprint. Only the pointId, timestamp, and data.value fields are forwarded — extra quality fields
are stripped.
Settings
| Setting | Description |
|---|---|
| Precision | Timestamp precision: ms (unchanged), s, min, or hr |
Example — Precision s
Input
[{ "pointId": "tag/temp1", "timestamp": "2024-01-15T10:00:00.523Z", "data": { "value": "23.5", "quality": "good" } }]
Output — milliseconds are zeroed, quality is stripped
[{ "pointId": "tag/temp1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "23.5" } }]
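Truncation to a precision amounts to rounding the timestamp down to a multiple of that precision. The sketch below assumes truncation is done in UTC on the epoch value; the names PRECISION_MS, truncateTimestamp, and toOIAnalyticsValues are hypothetical.

```javascript
// Length of each precision step in milliseconds.
const PRECISION_MS = { ms: 1, s: 1000, min: 60 * 1000, hr: 60 * 60 * 1000 };

// Round an ISO-8601 timestamp down to the given precision (assumed: in UTC).
function truncateTimestamp(isoTimestamp, precision) {
  const step = PRECISION_MS[precision];
  const epochMs = Date.parse(isoTimestamp);
  return new Date(Math.floor(epochMs / step) * step).toISOString();
}

// Keep only pointId, timestamp, and data.value; any other data field is stripped.
function toOIAnalyticsValues(timeValues, precision) {
  return timeValues.map(tv => ({
    pointId: tv.pointId,
    timestamp: truncateTimestamp(tv.timestamp, precision),
    data: { value: tv.data.value }
  }));
}

const prepared = toOIAnalyticsValues(
  [{ pointId: 'tag/temp1', timestamp: '2024-01-15T10:00:00.523Z', data: { value: '23.5', quality: 'good' } }],
  's'
);
console.log(JSON.stringify(prepared));
```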
OPCUA values
Maps each reading to an OPC UA write command using a point ID → node ID mapping table. The value is forwarded as-is. Readings with no match are dropped.
Settings
| Setting | Description |
|---|---|
| Mapping | List of { pointId, nodeId } pairs |
Example
Mapping: plant/temp1 → ns=1;s=Temperature, plant/valve1 → ns=1;s=Valve1
Input
[
{ "pointId": "plant/temp1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "23.5" } },
{ "pointId": "plant/valve1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "true" } }
]
Output
[
{ "nodeId": "ns=1;s=Temperature", "value": "23.5" },
{ "nodeId": "ns=1;s=Valve1", "value": "true" }
]
For file and query-based South connectors
These transformers process files or JSON payloads produced by connectors such as REST, SFTP, FTP, Folder Scanner, and SQL databases (MySQL, PostgreSQL, MSSQL, SQLite…), as well as data injected via the OIBus API.
Time Values (from CSV)
Parses a CSV file and maps each row to an OIBus time-value object. The transformer converts the raw timestamp string to ISO-8601 according to the configured format.
Settings
| Setting | Description |
|---|---|
| Has header | Whether the first row is a header row |
| Delimiter | Column separator (COMMA, SEMI_COLON, TAB, etc.) |
| Point ID column | Column name (header mode) or zero-based index holding the point identifier |
| Timestamp column | Column name or index holding the timestamp |
| Value column | Column name or index holding the value |
| Timestamp type | Format of the raw timestamp (iso-string, unix-epoch, unix-epoch-ms, string) |
| Timezone | Timezone for parsing (only used when type is string) |
| Format | Custom datetime pattern (only used when type is string, e.g. yyyy-MM-dd HH:mm:ss) |
Example
This transformer is useful when a REST South connector fetches CSV exports from an API, or when a Folder Scanner South connector picks up CSV files dropped by another system.
Input CSV (fetched by a REST item)
SensorID,Timestamp,Reading
tag/temp1,2024-01-15T10:00:00.000Z,23.5
tag/temp1,2024-01-15T10:01:00.000Z,23.8
tag/pressure1,2024-01-15T10:00:00.000Z,102.1
Output (time-values JSON)
[
{ "pointId": "tag/temp1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": 23.5 } },
{ "pointId": "tag/temp1", "timestamp": "2024-01-15T10:01:00.000Z", "data": { "value": 23.8 } },
{ "pointId": "tag/pressure1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": 102.1 } }
]
These time-values can then be forwarded to any North connector that accepts structured readings (OIAnalytics, MQTT, OPC UA…).
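The row-to-reading mapping can be sketched as below. This is a simplified illustration, not the OIBus parser: it assumes the iso-string timestamp type, does not handle quoted fields, and casts the value with Number. The function name csvToTimeValues and the settings keys are hypothetical.

```javascript
// Hypothetical sketch: map CSV rows to time-value objects.
function csvToTimeValues(csvText, settings) {
  const lines = csvText.split(/\r?\n/).filter(line => line.trim() !== '');
  const rows = lines.map(line => line.split(settings.delimiter));
  // Without a header, columns are addressed by zero-based index.
  let index = column => column;
  if (settings.hasHeader) {
    const header = rows.shift();
    index = column => header.indexOf(column);
  }
  return rows.map(cols => ({
    pointId: cols[index(settings.pointIdColumn)],
    timestamp: new Date(cols[index(settings.timestampColumn)]).toISOString(),
    data: { value: Number(cols[index(settings.valueColumn)]) }
  }));
}

const timeValues = csvToTimeValues(
  [
    'SensorID,Timestamp,Reading',
    'tag/temp1,2024-01-15T10:00:00.000Z,23.5',
    'tag/temp1,2024-01-15T10:01:00.000Z,23.8',
    'tag/pressure1,2024-01-15T10:00:00.000Z,102.1'
  ].join('\n'),
  { hasHeader: true, delimiter: ',', pointIdColumn: 'SensorID', timestampColumn: 'Timestamp', valueColumn: 'Reading' }
);
console.log(timeValues.length); // 3
```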
MQTT payloads (from CSV)
Parses a CSV file and converts each row into an MQTT message with a topic and a payload. Two payload modes are supported:
- Simple — a single column becomes the payload, cast to the configured type.
- Object — multiple columns are assembled into a JSON object payload.
Settings
| Setting | Description |
|---|---|
| Has header | Whether the first row is a header row |
| Delimiter | Column separator |
| Topic column | Column name or index holding the MQTT topic |
| Payload type | string, number, boolean, datetime, or object |
| Value column | Column for the value (simple payload types only) |
| Object fields | List of { key, column, dataType } mappings (object payload only) |
| Datetime settings | Input/output datetime conversion (applies when payload type or a field is datetime) |
Example — Simple payload
Input CSV
topic,value
plant/pump1/speed,1450
plant/pump1/running,true
Output (MQTT messages)
[
{ "topic": "plant/pump1/speed", "payload": "1450" },
{ "topic": "plant/pump1/running", "payload": "true" }
]
Example — Object payload
Input CSV
topic,value,quality,timestamp
plant/sensor1,23.5,good,2024-01-15T10:00:00.000Z
plant/sensor2,48.0,good,2024-01-15T10:00:00.000Z
Object fields: value → number, quality → string, timestamp → datetime
Output (MQTT messages)
[
{ "topic": "plant/sensor1", "payload": "{\"value\":23.5,\"quality\":\"good\",\"timestamp\":\"2024-01-15T10:00:00.000Z\"}" },
{ "topic": "plant/sensor2", "payload": "{\"value\":48,\"quality\":\"good\",\"timestamp\":\"2024-01-15T10:00:00.000Z\"}" }
]
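The difference between the two payload modes can be sketched as follows. The casting rules and the settings keys are assumptions made for this illustration, and rowToMessage is a hypothetical name; the row is taken as already parsed into an object keyed by column name.

```javascript
// Casting rules per data type. These are assumptions made for illustration.
const casters = {
  string: raw => raw,
  number: raw => Number(raw),
  boolean: raw => raw.trim().toLowerCase() === 'true',
  datetime: raw => new Date(raw).toISOString()
};

// `row` is one parsed CSV row, keyed by column name.
function rowToMessage(row, settings) {
  const topic = row[settings.topicColumn];
  if (settings.payloadType !== 'object') {
    // Simple mode: one column, cast to the configured type, sent as a string.
    const value = casters[settings.payloadType](row[settings.valueColumn]);
    return { topic, payload: String(value) };
  }
  // Object mode: one key per configured field, serialized as JSON.
  const payload = {};
  for (const field of settings.objectFields) {
    payload[field.key] = casters[field.dataType](row[field.column]);
  }
  return { topic, payload: JSON.stringify(payload) };
}

const simpleMessage = rowToMessage(
  { topic: 'plant/pump1/speed', value: '1450' },
  { topicColumn: 'topic', payloadType: 'number', valueColumn: 'value' }
);
const objectMessage = rowToMessage(
  { topic: 'plant/sensor1', value: '23.5', quality: 'good', timestamp: '2024-01-15T10:00:00.000Z' },
  {
    topicColumn: 'topic',
    payloadType: 'object',
    objectFields: [
      { key: 'value', column: 'value', dataType: 'number' },
      { key: 'quality', column: 'quality', dataType: 'string' },
      { key: 'timestamp', column: 'timestamp', dataType: 'datetime' }
    ]
  }
);
console.log(simpleMessage.payload); // 1450
console.log(objectMessage.payload);
```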
CSV (from JSON)
Parses a JSON structure and writes a CSV file. A row iterator path (JSONPath expression) identifies the array of objects that become rows, and each configured field maps a path within each row to a CSV column. Fields support datetime conversion, type casting, and optional per-field JavaScript post-processing expressions.
Settings
| Setting | Description |
|---|---|
| Filename | Output filename; @CurrentDate is replaced with the current UTC timestamp |
| Encoding | UTF_8, UTF_8_BOM, LATIN_1, or UTF_16_LE |
| Header | Whether to include the column header row |
| Delimiter | Column separator |
| Newline | Line ending (LF, CRLF, CR) |
| Quote char | DOUBLE_QUOTE, SINGLE_QUOTE, or NONE |
| Escape char | DOUBLE_QUOTE or BACKSLASH |
| Null value | String written when a field is missing or null (defaults to empty string) |
| Row iterator path | JSONPath pointing to the array of rows (e.g. $.data[*]) |
| Fields | One entry per CSV column (see below) |
Each field entry:
| Setting | Description |
|---|---|
| Column name | Header label in the CSV |
| JSON path | JSONPath to the value within the current row (e.g. $.timestamp) |
| Data type | string, number, boolean, datetime, array, or object |
| Datetime settings | Input and output format/timezone/type when data type is datetime |
| Field process | Optional JS expression to post-process the extracted value (e.g. value.toUpperCase()) |
Example
A REST South connector fetches a JSON report from an API. The CSV (from JSON) transformer extracts the relevant fields and reformats them for a File Writer North connector.
Input JSON (returned by the REST item)
{
"data": [
{ "ts": "2024-01-15T10:00:00.000Z", "tag": "temp1", "val": 23.5, "ok": true },
{ "ts": "2024-01-15T10:01:00.000Z", "tag": "temp1", "val": 23.8, "ok": true },
{ "ts": "2024-01-15T10:00:00.000Z", "tag": "pressure1", "val": 102.1, "ok": false }
]
}
Configuration:
- Row iterator path: $.data[*]
- Fields: Timestamp ($.ts, datetime), Tag ($.tag, string), Value ($.val, number), OK ($.ok, boolean)
Output CSV
Timestamp,Tag,Value,OK
2024-01-15T10:00:00.000Z,temp1,23.5,true
2024-01-15T10:01:00.000Z,temp1,23.8,true
2024-01-15T10:00:00.000Z,pressure1,102.1,false
The CSV (from JSON) transformer automatically parses JSON-stringified intermediate nodes: if a value along the path is a JSON string that encodes an object, the remaining path is resolved against the parsed object.
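The idea of resolving a path through a stringified node can be sketched with a plain dotted path. This is a simplification: the transformer itself works with JSONPath expressions, and resolvePath is a hypothetical helper written only to show the parse-and-continue step.

```javascript
// Resolve a dotted path such as "payload.reading.val" against a row,
// parsing any intermediate value that is a JSON string.
function resolvePath(row, dottedPath) {
  let current = row;
  for (const key of dottedPath.split('.')) {
    if (typeof current === 'string') {
      try {
        current = JSON.parse(current);
      } catch {
        return undefined; // not JSON: the remaining path cannot be resolved
      }
    }
    if (current === null || typeof current !== 'object') return undefined;
    current = current[key];
  }
  return current;
}

// The `payload` field holds a JSON string rather than an object.
const row = { tag: 'temp1', payload: '{"reading":{"val":23.5}}' };
console.log(resolvePath(row, 'payload.reading.val')); // 23.5
```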
For OIAnalytics Setpoints
These transformers convert write-back commands sent by OIAnalytics into the format expected by the target actuator North connector.
Each setpoint command has this shape:
{ "reference": "pump1/setSpeed", "value": 1200 }
MQTT payloads
Converts setpoint commands into MQTT messages using a reference → topic mapping table. Commands with no matching entry are dropped. The payload is the value serialized to a string; object values are JSON-stringified.
Settings
| Setting | Description |
|---|---|
| Mapping | List of { reference, topic } pairs |
Example
Mapping: pump1/setSpeed → control/pump1/speed
Input (setpoints)
[
{ "reference": "pump1/setSpeed", "value": 1200 },
{ "reference": "valve2/open", "value": true }
]
valve2/open is dropped (no mapping).
Output (MQTT messages)
[{ "topic": "control/pump1/speed", "payload": "1200" }]
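The serialization rule can be sketched as below. The function name setpointsToMqtt is hypothetical, and the pump1/profile setpoint is invented for this illustration to show an object value.

```javascript
// Hypothetical sketch: map setpoints to MQTT messages and serialize the value.
function setpointsToMqtt(setpoints, mapping) {
  const topicByReference = new Map(mapping.map(m => [m.reference, m.topic]));
  return setpoints
    .filter(sp => topicByReference.has(sp.reference)) // unmapped commands are dropped
    .map(sp => ({
      topic: topicByReference.get(sp.reference),
      // Objects are JSON-stringified; everything else goes through String().
      payload:
        typeof sp.value === 'object' && sp.value !== null ? JSON.stringify(sp.value) : String(sp.value)
    }));
}

const commands = setpointsToMqtt(
  [
    { reference: 'pump1/setSpeed', value: 1200 },
    { reference: 'valve2/open', value: true },
    { reference: 'pump1/profile', value: { min: 800, max: 1500 } } // hypothetical setpoint
  ],
  [
    { reference: 'pump1/setSpeed', topic: 'control/pump1/speed' },
    { reference: 'pump1/profile', topic: 'control/pump1/profile' }
  ]
);
console.log(JSON.stringify(commands));
```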
Modbus values
Converts setpoint commands into Modbus write commands using a reference → address mapping table. Boolean values are written as coils; numeric values are parsed as integers and written as holding registers.
Settings
| Setting | Description |
|---|---|
| Mapping | List of { reference, address, modbusType } pairs (modbusType: coil or register) |
Example
Mapping: valve1/open → 0x0010 (coil), pump1/speed → 0x0020 (register)
Input (setpoints)
[
{ "reference": "valve1/open", "value": true },
{ "reference": "pump1/speed", "value": 1450 }
]
Output (Modbus write commands)
[
{ "address": "0x0010", "value": true, "modbusType": "coil" },
{ "address": "0x0020", "value": 1450, "modbusType": "register" }
]
OPCUA values
Converts setpoint commands into OPC UA write commands using a reference → node ID mapping table.
Settings
| Setting | Description |
|---|---|
| Mapping | List of { reference, nodeId } pairs |
Example
Mapping: pump1/speed → ns=1;s=PumpSpeed
Input (setpoints)
[{ "reference": "pump1/speed", "value": 1200 }]
Output (OPC UA write commands)
[{ "nodeId": "ns=1;s=PumpSpeed", "value": 1200 }]
Custom Transformers
When no standard transformer fits your use case, you can write a Custom Transformer in JavaScript or TypeScript. Custom code runs inside a secure isolated sandbox with no access to the host file system, Node.js modules, or the network.
Custom transformers are created on the Engine → Transformers page and are then available in the transformer selection list of any North connector.
Transform Function
Your code must export a transform function (or a default export). It receives four arguments and must return a
result object.
/**
 * @param content - The raw input data as a UTF-8 string (JSON, CSV, XML…)
 * @param source - Metadata about the data source ({ type, dataSourceId, … })
 * @param filename - The original filename (or a generated ID if no file was involved)
 * @param options - The transformer options as configured in the UI
 */
export function transform(
  content: string,
  source: object,
  filename: string,
  options: object
): { data: string; filename: string; numberOfElement?: number } {
  // …
}
| Return field | Description |
|---|---|
| data | The transformed output as a string (JSON, CSV, binary base-64…) |
| filename | The filename used when the output is stored in the North cache |
| numberOfElement | Optional count of processed records, shown in logs and metrics |
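As a minimal sketch of the result object, the pass-through below returns its input unchanged and fills numberOfElement only when the content is a JSON array. It is written as a plain function so it runs under Node.js as-is; inside OIBus it would be exported as described above.

```javascript
// Minimal pass-through: returns the input unchanged and counts its elements.
function transform(content, source, filename, options) {
  let numberOfElement;
  try {
    const parsed = JSON.parse(content);
    if (Array.isArray(parsed)) numberOfElement = parsed.length;
  } catch {
    // Content is not JSON (for example CSV): leave the count undefined.
  }
  return { data: content, filename, numberOfElement };
}

const result = transform('[1, 2, 3]', { type: 'south' }, 'sample.json', {});
console.log(result.filename, result.numberOfElement); // sample.json 3
```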
Available Libraries
The following libraries are pre-loaded in the sandbox and available via require:
| Library | require name | Description |
|---|---|---|
| Luxon | luxon | Date/time parsing, formatting, and arithmetic |
| JSONPath Plus | jsonpath-plus | JSONPath queries over arbitrary JSON structures |
| PapaParse | papaparse | CSV parsing and serialization |
Any other require call throws an error — fs, http, process, and all Node.js built-ins are blocked.
Logging
Inside the sandbox, console.log, console.info, console.warn, and console.error are bridged to the OIBus
logger. All messages appear prefixed with CUSTOM TRANSFORMER:.
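A typical use of the bridged console is to report what was skipped and why. The sketch below is a plain function (not exported, so it runs under Node.js as-is); the validity check on pointId and timestamp is an assumption chosen for the illustration.

```javascript
// Drop malformed readings and report them through the bridged console.
function transform(content, _source, filename, _options) {
  const rows = JSON.parse(content);
  const valid = rows.filter(row => {
    const ok = typeof row.pointId === 'string' && row.timestamp !== undefined;
    if (!ok) console.warn(`Skipping malformed reading: ${JSON.stringify(row)}`);
    return ok;
  });
  console.info(`Kept ${valid.length} of ${rows.length} readings from ${filename}`);
  return { data: JSON.stringify(valid), filename, numberOfElement: valid.length };
}

const logged = transform(
  JSON.stringify([
    { pointId: 'plant/temp1', timestamp: '2024-01-15T10:00:00.000Z', data: { value: '23.5' } },
    { timestamp: '2024-01-15T10:00:00.000Z', data: { value: '0' } }
  ]),
  {},
  'readings.json',
  {}
);
console.log(logged.numberOfElement); // 1
```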
Limits
| Limit | Value |
|---|---|
| Memory | 256 MB per execution |
| Timeout | Configurable in the transformer settings |
| Module access | Only luxon, jsonpath-plus, papaparse |
| Host access | None (file system, network, and env are blocked) |
Examples
Example 1 — Reformat a JSON array to CSV (JavaScript)
A REST South connector returns a JSON array. This transformer converts it to a semicolon-delimited CSV file for a File Writer North connector.
Input (content)
[
{ "ts": "2024-01-15T10:00:00.000Z", "id": "temp1", "val": 23.5 },
{ "ts": "2024-01-15T10:01:00.000Z", "id": "temp1", "val": 23.8 },
{ "ts": "2024-01-15T10:00:00.000Z", "id": "pressure1", "val": 102.1 }
]
Output
Timestamp;Tag;Value
2024-01-15T10:00:00.000Z;temp1;23.5
2024-01-15T10:01:00.000Z;temp1;23.8
2024-01-15T10:00:00.000Z;pressure1;102.1
Code
export function transform(content, source, filename, options) {
  const Papa = require('papaparse');

  // The REST item returns a JSON array of { ts, id, val } objects.
  const rows = JSON.parse(content);

  // Rename the keys to the CSV column titles, then serialize with a ';' delimiter.
  const csv = Papa.unparse(
    rows.map(r => ({
      Timestamp: r.ts,
      Tag: r.id,
      Value: r.val
    })),
    { delimiter: ';', header: true }
  );

  return {
    data: csv,
    filename: `export_${filename.replace('.json', '.csv')}`,
    numberOfElement: rows.length
  };
}
Example 2 — Filter and enrich time-values (TypeScript)
Keep only readings above a configured threshold, and annotate each with a derived status field: HIGH when the value exceeds twice the threshold, NORMAL otherwise.
Input (content) — standard time-values array, options.threshold = 20
[
{ "pointId": "plant/temp1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "23.5" } },
{ "pointId": "plant/temp2", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "18.0" } },
{ "pointId": "plant/pressure1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "102.1" } }
]
Output — temp2 is dropped (18.0 ≤ 20), the others are annotated
[
{ "pointId": "plant/temp1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "23.5", "status": "NORMAL" } },
{ "pointId": "plant/pressure1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "102.1", "status": "HIGH" } }
]
Code
type TimeValue = { pointId: string; timestamp: string; data: { value: string } };

export function transform(
  content: string,
  _source: object,
  _filename: string,
  options: { threshold: number }
): { data: string; filename: string; numberOfElement: number } {
  const values: TimeValue[] = JSON.parse(content);
  const threshold = options.threshold ?? 0;

  const filtered = values
    .filter(v => parseFloat(v.data.value) > threshold)
    .map(v => ({
      ...v,
      data: {
        ...v.data,
        status: parseFloat(v.data.value) > threshold * 2 ? 'HIGH' : 'NORMAL'
      }
    }));

  console.info(`Kept ${filtered.length} / ${values.length} values above threshold ${threshold}`);

  return {
    data: JSON.stringify(filtered),
    filename: `filtered_${Date.now()}.json`,
    numberOfElement: filtered.length
  };
}
Example 3 — Extract fields from a nested JSON with JSONPath (JavaScript)
Read the timestamp, tag, and value from each nested record and rebuild a standard time-values array.
Input (content)
{
"records": [
{ "header": { "ts": "2024-01-15T10:00:00.000Z" }, "tag": "plant/temp1", "measurements": { "val": 23.5 } },
{ "header": { "ts": "2024-01-15T10:01:00.000Z" }, "tag": "plant/temp1", "measurements": { "val": 23.8 } },
{ "header": { "ts": "2024-01-15T10:00:00.000Z" }, "tag": "plant/pressure1", "measurements": { "val": 102.1 } }
]
}
Output
[
{ "pointId": "plant/temp1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "23.5" } },
{ "pointId": "plant/temp1", "timestamp": "2024-01-15T10:01:00.000Z", "data": { "value": "23.8" } },
{ "pointId": "plant/pressure1", "timestamp": "2024-01-15T10:00:00.000Z", "data": { "value": "102.1" } }
]
Code
export function transform(content, _source, _filename, _options) {
  const { JSONPath } = require('jsonpath-plus');
  const data = JSON.parse(content);

  // Each query returns one array in document order. Zipping them by index
  // assumes every record contains all three fields.
  const timestamps = JSONPath({ path: '$.records[*].header.ts', json: data });
  const tags = JSONPath({ path: '$.records[*].tag', json: data });
  const values = JSONPath({ path: '$.records[*].measurements.val', json: data });

  const timeValues = timestamps.map((ts, i) => ({
    pointId: tags[i],
    timestamp: ts,
    data: { value: String(values[i]) }
  }));

  return {
    data: JSON.stringify(timeValues),
    filename: `extracted_${Date.now()}.json`,
    numberOfElement: timeValues.length
  };
}
Example 4 — Timestamp conversion with Luxon (JavaScript)
Parse Unix epoch millisecond timestamps from a REST API response and convert them to ISO-8601 in a specific timezone.
Input (content)
[
{ "id": "plant/temp1", "epoch_ms": 1705312800000, "val": 23.5 },
{ "id": "plant/pressure1", "epoch_ms": 1705312800000, "val": 102.1 }
]
Output — epoch_ms converted to ISO-8601, timezone Europe/Paris (UTC+1 in January)
[
{ "id": "plant/temp1", "epoch_ms": 1705312800000, "val": 23.5, "timestamp": "2024-01-15T11:00:00.000+01:00" },
{ "id": "plant/pressure1", "epoch_ms": 1705312800000, "val": 102.1, "timestamp": "2024-01-15T11:00:00.000+01:00" }
]
Code
export function transform(content, _source, _filename, _options) {
  const { DateTime } = require('luxon');
  const rows = JSON.parse(content);

  const converted = rows.map(row => ({
    ...row,
    timestamp: DateTime.fromMillis(row.epoch_ms, { zone: 'Europe/Paris' }).toISO()
  }));

  return {
    data: JSON.stringify(converted),
    filename: `converted_${Date.now()}.json`,
    numberOfElement: converted.length
  };
}
Use the Test button in the transformer editor to run your code against a sample payload before deploying it. The test result shows the transformed output, the output filename, and the execution time.