pgmqtt

CDC-to-MQTT broker for PostgreSQL

Overview

Package | Version | Category | License | Language
pgmqtt  | 0.3.0   | ETL      | ELv2    | Rust

ID   | Extension | Bin | Lib | Load | Create | Trust | Reloc | Schema
9620 | pgmqtt    | No  | Yes | No   | Yes    | No    | No    | -

Requires wal_level = logical for CDC. The package is built with pgrx patched to 0.18.1.

Version

Type | Repo   | Version | PG Ver             | Package              | Deps
EXT  | PIGSTY | 0.3.0   | 18, 17, 16, 15, 14 | pgmqtt               | -
RPM  | PIGSTY | 0.3.0   | 18, 17, 16, 15, 14 | pgmqtt_$v            | -
DEB  | PIGSTY | 0.3.0   | 18, 17, 16, 15, 14 | postgresql-$v-pgmqtt | -
OS / PG | PG18 | PG17 | PG16 | PG15 | PG14
el8.x86_64
el8.aarch64
el9.x86_64
el9.aarch64
el10.x86_64
el10.aarch64
d12.x86_64
d12.aarch64
d13.x86_64
d13.aarch64
PIGSTY 0.3.0
PIGSTY 0.3.0
PIGSTY 0.3.0
PIGSTY 0.3.0
PIGSTY 0.3.0
u22.x86_64
PIGSTY 0.3.0
PIGSTY 0.3.0
PIGSTY 0.3.0
PIGSTY 0.3.0
PIGSTY 0.3.0
u22.aarch64
PIGSTY 0.3.0
PIGSTY 0.3.0
PIGSTY 0.3.0
PIGSTY 0.3.0
PIGSTY 0.3.0
u24.x86_64
PIGSTY 0.3.0
PIGSTY 0.3.0
PIGSTY 0.3.0
PIGSTY 0.3.0
PIGSTY 0.3.0
u24.aarch64
PIGSTY 0.3.0
PIGSTY 0.3.0
PIGSTY 0.3.0
PIGSTY 0.3.0
PIGSTY 0.3.0
u26.x86_64
u26.aarch64

Build

You can build the RPM / DEB packages for pgmqtt using pig build:

pig build pkg pgmqtt         # build RPM / DEB packages

Install

You can install pgmqtt directly. First, make sure the PGDG and PIGSTY repositories are added and enabled:

pig repo add pgsql -u          # Add repo and update cache

Install the extension using pig or apt/yum/dnf:

# Using pig
pig install pgmqtt               # Install for current active PG version
pig ext install -y pgmqtt -v 18  # PG 18
pig ext install -y pgmqtt -v 17  # PG 17
pig ext install -y pgmqtt -v 16  # PG 16
pig ext install -y pgmqtt -v 15  # PG 15
pig ext install -y pgmqtt -v 14  # PG 14

# Using dnf (EL)
dnf install -y pgmqtt_18         # PG 18
dnf install -y pgmqtt_17         # PG 17
dnf install -y pgmqtt_16         # PG 16
dnf install -y pgmqtt_15         # PG 15
dnf install -y pgmqtt_14         # PG 14

# Using apt (Debian / Ubuntu)
apt install -y postgresql-18-pgmqtt   # PG 18
apt install -y postgresql-17-pgmqtt   # PG 17
apt install -y postgresql-16-pgmqtt   # PG 16
apt install -y postgresql-15-pgmqtt   # PG 15
apt install -y postgresql-14-pgmqtt   # PG 14

Create Extension:

CREATE EXTENSION pgmqtt;

Usage

Sources: README, interfaces, configuration, limitations, Cargo.toml

pgmqtt is a pgrx extension that embeds an MQTT broker into PostgreSQL and uses change data capture to turn table changes into MQTT messages. It also supports inbound topic mappings so MQTT publishes can insert rows into PostgreSQL tables.

CREATE EXTENSION pgmqtt;

Outbound Mapping

Publish table changes to topics:

SELECT pgmqtt_add_outbound_mapping(
  'public',
  'my_table',
  'topics/{{ op | lower }}',
  '{{ columns | tojson }}',
  1
);

With that mapping, each INSERT, UPDATE, and DELETE on public.my_table publishes a JSON payload to a topic named after the operation, such as topics/insert. The documented function signature also accepts two optional parameters: qos integer DEFAULT 0 and template_type text DEFAULT 'jinja2'.
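To make the template behaviour concrete, here is a minimal Python sketch of what the two templates in the example plausibly produce for one change event. It is not pgmqtt's renderer: the function name render_outbound is hypothetical, the assumption that op arrives as an upper-case string such as "INSERT" is inferred from the `lower` filter, and the exact JSON serialization (key order, whitespace) may differ from what the extension emits.

```python
import json


def render_outbound(op: str, columns: dict) -> tuple:
    """Approximate 'topics/{{ op | lower }}' and '{{ columns | tojson }}'.

    Assumption: `op` is the operation name and `columns` maps column
    names to values for the changed row.
    """
    topic = f"topics/{op.lower()}"   # stands in for {{ op | lower }}
    payload = json.dumps(columns)    # stands in for {{ columns | tojson }}
    return topic, payload


topic, payload = render_outbound("INSERT", {"id": 1, "name": "widget"})
print(topic)    # topics/insert
print(payload)  # a JSON object holding the row's columns
```

A subscriber listening on topics/# would receive one such message per captured row change.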

Inbound Mapping

Insert rows from MQTT publishes:

SELECT pgmqtt_add_inbound_mapping(
  'sensor/{site_id}/temperature',
  'sensor_readings',
  '{"site_id": "{site_id}", "value": "$.temperature"}'::jsonb
);

Publishing {"temperature": 22.5} to sensor/site-1/temperature inserts a row into sensor_readings.

Inbound mappings can also perform upsert and delete operations through the op and conflict_columns parameters; further optional parameters are target_schema, mapping_name, and template_type. Topic patterns use {variable} captures, and values in the column map use expressions such as $.temperature (a payload field), $payload, and $topic.
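The following Python sketch illustrates how a topic pattern and column map like the ones above could be resolved into a row. It is an illustration under assumptions, not the extension's implementation: the helper names are hypothetical, $.field is treated as a lookup of a top-level payload key only, and $payload and $topic are assumed to mean the raw payload text and the full topic string.

```python
import json
import re


def compile_topic_pattern(pattern: str) -> re.Pattern:
    """Turn 'sensor/{site_id}/temperature' into a regex with named groups."""
    parts = []
    for level in pattern.split("/"):
        capture = re.fullmatch(r"\{(\w+)\}", level)
        if capture:
            # A {variable} level matches exactly one topic level.
            parts.append(f"(?P<{capture.group(1)}>[^/]+)")
        else:
            parts.append(re.escape(level))
    return re.compile("/".join(parts))


def resolve_row(pattern, column_map, topic, payload_text):
    """Return a column -> value dict, or None if the topic does not match."""
    match = compile_topic_pattern(pattern).fullmatch(topic)
    if match is None:
        return None
    captures = match.groupdict()
    payload = json.loads(payload_text)
    row = {}
    for column, expr in column_map.items():
        if expr == "$payload":            # assumed: raw payload text
            row[column] = payload_text
        elif expr == "$topic":            # assumed: full topic string
            row[column] = topic
        elif expr.startswith("$."):       # assumed: top-level payload field
            row[column] = payload.get(expr[2:])
        else:
            capture = re.fullmatch(r"\{(\w+)\}", expr)
            row[column] = captures.get(capture.group(1)) if capture else expr
    return row


row = resolve_row(
    "sensor/{site_id}/temperature",
    {"site_id": "{site_id}", "value": "$.temperature"},
    "sensor/site-1/temperature",
    '{"temperature": 22.5}',
)
print(row)  # {'site_id': 'site-1', 'value': 22.5}
```

Under these assumptions, the publish from the example yields site_id = 'site-1' and value = 22.5 for the inserted row.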

Inspect and Remove Mappings

SELECT * FROM pgmqtt_list_outbound_mappings();
SELECT pgmqtt_remove_outbound_mapping('public', 'my_table');

SELECT * FROM pgmqtt_list_inbound_mappings();
SELECT pgmqtt_remove_inbound_mapping('temp_readings');

SELECT * FROM pgmqtt_status();

pgmqtt_status() reports active connections, subscriptions, retained messages, pending session messages, CDC mappings, CDC slot state, inbound mappings, pending inbound writes, and dead letters.

Version 0.3.0 adds asynchronous admin commands:

SELECT pgmqtt_disconnect_client('device-42');
SELECT pgmqtt_disconnect_role('mqtt_devices');
SELECT pgmqtt_reload_acls('*');

The background worker drains pgmqtt_admin_commands; ACL reloads prune subscriptions that are no longer allowed.

MQTT Client Examples

mosquitto_sub -h localhost -t 'topics/#'
mosquitto_pub -h localhost -t 'sensor/site-1/temperature' -m '{"temperature": 22.5}'
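The subscription above uses the MQTT multi-level wildcard. As a reminder of how topic filters select messages, here is a small Python sketch of standard MQTT filter matching ('+' for one level, '#' for the remainder). The function topic_matches is hypothetical and written for illustration; it follows the general MQTT rules rather than pgmqtt's own code, and it ignores the special handling of topics beginning with '$'.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` is selected by the MQTT `topic_filter`."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # '#' is only valid as the last level; it matches the parent
            # level and everything beneath it.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)


print(topic_matches("topics/#", "topics/insert"))                      # True
print(topic_matches("topics/#", "sensor/site-1/temperature"))          # False
print(topic_matches("sensor/+/temperature", "sensor/site-1/temperature"))  # True
```

So a client subscribed to topics/# sees the outbound CDC messages but not the sensor publishes sent by mosquitto_pub.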

Configuration

The documented GUCs live under the pgmqtt namespace:

ALTER SYSTEM SET pgmqtt.cdc_every_n_ticks = 16;
SELECT pg_reload_conf();

Listener GUCs include:

  • pgmqtt.mqtt_enabled, pgmqtt.mqtt_port (1883)
  • pgmqtt.ws_enabled, pgmqtt.ws_port (9001)
  • pgmqtt.mqtts_enabled, pgmqtt.mqtts_port (8883)
  • pgmqtt.wss_enabled, pgmqtt.wss_port (9002)

TLS and authentication settings include:

  • pgmqtt.tls_cert_file, pgmqtt.tls_key_file
  • pgmqtt.license_key
  • pgmqtt.jwt_public_key, pgmqtt.jwt_required, pgmqtt.jwt_required_ws
  • pgmqtt.password_auth_enabled, pgmqtt.password_auth_required, pgmqtt.password_auth_role_filter

Performance and observability GUCs include:

  • pgmqtt.tick_interval_ms, pgmqtt.max_client_buffer_bytes, pgmqtt.cdc_every_n_ticks
  • pgmqtt.debug_log
  • pgmqtt.metrics_snapshot_interval, pgmqtt.metrics_retention_days, pgmqtt.metrics_connections_cache_interval, pgmqtt.metrics_hook_function, pgmqtt.metrics_notify_channel

Listener and TLS settings are read when the MQTT background worker starts, so changing them requires a worker restart; pg_reload_conf() alone is not enough.
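As a quick aid for change planning, the restart rule can be written down as a lookup. The Python sketch below is only a restatement of the text above: the set contains the listener and TLS GUCs named in this section, the helper needs_worker_restart is hypothetical, and a False result means only that this page does not describe the setting as start-time-only, not that a reload is guaranteed to apply it.

```python
# Listener and TLS GUCs named in this section; per the text they are read
# when the MQTT background worker starts.
RESTART_REQUIRED = {
    "pgmqtt.mqtt_enabled", "pgmqtt.mqtt_port",
    "pgmqtt.ws_enabled", "pgmqtt.ws_port",
    "pgmqtt.mqtts_enabled", "pgmqtt.mqtts_port",
    "pgmqtt.wss_enabled", "pgmqtt.wss_port",
    "pgmqtt.tls_cert_file", "pgmqtt.tls_key_file",
}


def needs_worker_restart(guc: str) -> bool:
    """True if the GUC is documented here as a listener or TLS setting."""
    return guc in RESTART_REQUIRED


for name in ("pgmqtt.mqtt_port", "pgmqtt.cdc_every_n_ticks"):
    action = "restart worker" if needs_worker_restart(name) else "not listed as start-time-only"
    print(f"{name}: {action}")
```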

Caveats

  • The README requires wal_level = logical; without logical decoding the CDC side will not work.
  • This project’s extension CSV lists version 0.3.0 for PostgreSQL 14 through 18 and carries a note that the package is rebuilt with pgrx 0.18.1.
  • MQTT 5.0 and MQTT 3.1.1 clients are documented as supported. QoS 0 and QoS 1 are supported; QoS 2 is not implemented and subscriptions requesting QoS 2 are downgraded to QoS 1.
  • CDC captures INSERT, UPDATE, and DELETE; DDL changes and TRUNCATE are not captured. DELETE may require REPLICA IDENTITY FULL.
  • Version 0.3.0 adds PostgreSQL role/password authentication, per-topic ACLs, admin disconnect/reload commands, metrics/observability tables and functions, MQTT 3.1.1 protocol tests, UNLOGGED metrics/cache recovery tests, and additional flow-control limits.
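The QoS caveat above amounts to capping the requested level at 1. A minimal Python sketch of that rule follows; the names MAX_SUPPORTED_QOS and granted_qos are hypothetical and the code only mirrors the documented behaviour, it is not taken from the extension.

```python
MAX_SUPPORTED_QOS = 1  # QoS 2 is documented as not implemented


def granted_qos(requested: int) -> int:
    """Return the QoS a subscription would be granted under the cap."""
    if requested not in (0, 1, 2):
        raise ValueError(f"invalid MQTT QoS level: {requested}")
    return min(requested, MAX_SUPPORTED_QOS)


for level in (0, 1, 2):
    print(level, "->", granted_qos(level))
# 0 -> 0
# 1 -> 1
# 2 -> 1
```

Clients that depend on exactly-once delivery should therefore plan for at-least-once semantics and deduplicate on their side.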

Last Modified 2026-07-02: extension update 2026-07-02 (f9f0d13)