One of the common problems that developers find themselves working through is that of getting two different systems to talk to each other. On a recent accounting project, we needed to synchronize an internal project tracking system (think Jira or Asana) to an external, cloud-based accounting system. There are a number of ways to implement things like this, usually involving some type of queuing system with asynchronous polling, but we wanted to avoid continuous polling while still keeping our data synchronized in near-real time. Enter PostgreSQL’s LISTEN/NOTIFY functionality.
The Usual Solution
When two systems need to talk to each other asynchronously, the typical pattern looks like this:
- An action is performed in System A that requires a subsequent action in System B.
- In the code on System A, after the action is performed, insert a row into a queue with all the information needed for the second system.
- System B periodically polls the queue looking for new entries and processes them (sketched below).
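As a sketch, that polling step often boils down to a query like this, run on a timer; the table and column names here are assumptions for illustration:

-- Hypothetical polling query, run every few seconds by System B
SELECT queue_id, payload
  FROM queue_table
 WHERE status = 'NEW'
 ORDER BY queue_id;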
This works reasonably well, but since the two systems are disconnected from each other, there will be a delay in processing messages, determined by how often System B checks for work to do. This type of system also carries overhead: the queue must be checked constantly for unprocessed entries, even though most checks will likely yield zero results. That’s a lot of round-trip queries for nothing. LISTEN/NOTIFY provides an alternative where processes can be told what to do, rather than polling for something to do.
What Sorcery Is This?
Despite being available from at least as far back as the original Postgres95 release, LISTEN/NOTIFY is a little-known notification system that comes bundled inside PostgreSQL. With LISTEN/NOTIFY, multiple listeners can watch a given channel for messages and receive them instantly when another process (or even the same process) issues a notification. For those familiar with RabbitMQ or similar distributed messaging systems, this is analogous to a fanout exchange.
In this project our messaging needs are fairly simple; we would like to have actions that occur within the project tracking system kick off actions in the external cloud accounting system. Passing basic messages around like this is a perfect job for LISTEN/NOTIFY because we can solve the job of message passing without the operational overhead of setting up an additional queuing system.
For basic use a process can register itself to receive messages on a channel by issuing a simple SQL command. Our accounting system listener might do something like this:
LISTEN accounts;
Since PostgreSQL creates channels for us on first use, we don't need to worry about pre-declaring a channel that we want to listen on. Once the process is listening on a channel, any process (even the same one) can send a message on the channel to communicate with the listener. The listener will continue to receive notifications on this channel until stopped with the UNLISTEN command.
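Stopping is just as simple:

UNLISTEN accounts;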
Sending a message is also quite simple:
NOTIFY accounts, '2 hours posted on Contract ID 1234';
Anything listening to the ‘accounts’ channel will then immediately receive the following, without needing to check or poll:
Asynchronous notification "accounts" with payload
"2 hours posted on Contract ID 1234" received from
server process with PID 219.
Using the NOTIFY statement in this way requires a basic string as the payload; we’ll dive a bit more into ways to generate more descriptive payloads later.
A Sample Implementation
Basic usage requires something to issue the notification and something to listen for it. In most cases you will want to set up a long-running listener to wait for activity on a database connection. Many languages provide this sort of construct; in this particular implementation we chose Perl. The basic blueprint looks like this:
use DBI;
use IO::Select;

# Connect to the database
my $dbh = DBI->connect(...);

# Listen to the channel
$dbh->do('LISTEN accounts');

# Get the underlying socket so we can wait for it to become readable
my $fd  = $dbh->func('getfd');
my $sel = IO::Select->new($fd);

while (1) {
    $sel->can_read();
    while (my $notify = $dbh->func('pg_notifies')) {
        my $pid     = $notify->[1];
        my $payload = $notify->[2];
        print "I got '$payload' from pid $pid\n";
        # ... and whatever else needs to be done with $payload
    }
}
(original source: How to use LISTEN/NOTIFY in a Perl program)
At this point we have a long-running listener able to receive messages instantly and on demand. However, there is a wrinkle: like most message queuing systems, PostgreSQL doesn’t maintain any message history or other way for latecomers to catch up. In our case we want the system to be more robust than that; it should be able to handle a script crash or other issue that could otherwise result in the loss of a message. For some applications this may not matter, but in our case it does: we need to be able to process all messages, including any that are sent while the script is offline.
Our Old Friend, The Queue Table
While not the primary way of communicating messages, backing our on-demand message processor with a queue table offers a number of benefits:
- Messages can be retained indefinitely, allowing for statistics to be collected
- Stale entries can be detected and processed
- Progress in processing a given message can be reflected in its corresponding queue table entry, providing metrics and a way to alert if things fail
That last one is important for monitoring as it provides the ability to catch problems in the message processing system. LISTEN/NOTIFY is fire-and-forget; once the message has been consumed, it’s no longer available for audit/troubleshooting. Queued message processing is handled by a one-time check of the queue table at script startup, catching any messages that might have been issued out of band, and making sure that we are up to date.
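A minimal sketch of such a queue table follows; the exact column names here are assumptions rather than our actual schema, though the trigger examples below do rely on queue_table having queue_id, payload, and status columns:

CREATE TABLE queue_table (
    queue_id     SERIAL PRIMARY KEY,
    payload      TEXT NOT NULL,
    status       TEXT NOT NULL DEFAULT 'NEW',  -- e.g. NEW, PROCESSING, DONE, FAILED
    created_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    processed_at TIMESTAMPTZ                   -- set when processing completes
);

-- The one-time catch-up check at script startup
SELECT queue_id, payload
  FROM queue_table
 WHERE status = 'NEW'
 ORDER BY queue_id;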
It is also beneficial to set up external monitoring of the queue table to catch any anomalies in processing. Messages that stay in a processing state for too long or that are never picked up could indicate a script failure. Retaining message history allows for alerts to be fired to notify someone to investigate.
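A monitoring query along these lines could drive such an alert; the status values and the threshold are assumptions based on the sketch above:

-- Flag queue entries that have been sitting too long
SELECT queue_id, status, created_at
  FROM queue_table
 WHERE status IN ('NEW', 'PROCESSING')
   AND created_at < NOW() - INTERVAL '15 minutes';  -- arbitrary threshold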
Automating Messages
Now that we have basic message passing set up and a way to capture missed messages, the next thing we need to focus on is how to trigger messages from the application to the listeners. One option is to issue the NOTIFY command via SQL statements at the appropriate point(s) in the code, but that method brings problems of its own: the overhead of preparing and executing the statement, remembering every place in the code where the call must be added, and mitigating errors if the statement fails, among other issues.
To avoid all of these possible issues, in our implementation we opted to make use of triggers. Instead of identifying code points that should spawn messages we instead identify the business data that should spawn a message when changed. This takes the application code completely out of the equation and makes it a data-driven solution. This also keeps the application code smaller and the implementation is resistant to future programmers forgetting to code notification points. As a nice plus, messages will be sent if someone modifies data directly in the database, not that anyone would ever do that.
Trigger Function Magic
The approach is twofold: first, a trigger on the data table forms the message that is inserted into the queue table; then a trigger on the queue table issues the actual NOTIFY to our listening process. Once the listening process gets the NOTIFY, it fetches the queue entry and processes it. There is no repeated polling for jobs, and the listening process receives the message almost immediately after the data is changed, for the cost of a single query.
A simple implementation for generating an account notice based on changed data might look something like this:
CREATE OR REPLACE FUNCTION modify_account() RETURNS trigger AS
$$
BEGIN
    IF (TG_OP = 'INSERT') THEN
        INSERT INTO queue_table (payload, status)
        VALUES ('2 hours posted on Contract ID 1234', 'NEW');
        RETURN NEW;
    -- similar conditionals for UPDATE and DELETE
    END IF;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;
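The function then needs to be attached to the business table; the following wiring is a sketch, assuming the table is named accounts:

-- Hypothetical trigger definition connecting the function to the data table
CREATE TRIGGER accounts_queue_message
    AFTER INSERT OR UPDATE OR DELETE ON accounts
    FOR EACH ROW EXECUTE PROCEDURE modify_account();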
While this gives us some useful functionality, it is still fairly basic. In many cases our messages will likely need more information, such as potentially containing a parent row ID, perhaps a new/old value combination, or other key/value pairs. Given that the payload sent via NOTIFY must be a flat string, a stringified object format seems sensible. We used JSON to solve this problem, allowing us to transmit as much information as necessary in the payload. Combining these ideas results in the new trigger function below, which runs after any changing operation on affected columns:
CREATE OR REPLACE FUNCTION modify_account() RETURNS trigger AS
$$
BEGIN
    IF (TG_OP = 'INSERT') THEN
        INSERT INTO queue_table (payload, status) VALUES (
            (SELECT row_to_json(r) FROM
                (SELECT TG_TABLE_NAME AS "table", TG_OP AS operation,
                        NEW.id AS id, NOW() AS trigger_timestamp) r),
            'NEW');
        RETURN NEW;
    -- similar conditionals for UPDATE and DELETE
    END IF;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;
Depending on what data is selected for the INSERT, the resulting JSON string might look something like this:
{"table":"accounts","operation":"UPDATE",
"id":6,"trigger_timestamp":"2015-09-14 10:33:45.03627-05"}
So our queue table entry now knows about the triggering row, the operation that was performed, and the time it was triggered, and the row is tagged NEW to indicate it’s available for processing. PostgreSQL's handy built-in row_to_json() function is used to convert a result row into the JSON string. Any other details about the account being modified are in the business data.
Propagating the message in the queue table out to any listeners could not be simpler. A two-line trigger function runs after inserts on the queue table:
CREATE OR REPLACE FUNCTION queue_notify() RETURNS trigger AS
$$
BEGIN
    PERFORM pg_notify('accounts', ((SELECT row_to_json(r)
        FROM (SELECT NEW.queue_id AS id, NEW.payload AS payload)
        r))::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
This grafts the queue table ID onto the existing payload, constructing a nested JSON object, and uses pg_notify() to do the NOTIFY work. Recall that a plain NOTIFY statement only accepts a string literal as its payload, so to build the payload dynamically we use the pg_notify() function instead. Our Perl listener now receives the message on the channel almost instantaneously after the original change commits to the database, by way of these trigger functions, and is free to act on that message in any way it wishes.
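For completeness, a sketch of the matching wiring on the queue table itself (the trigger name is assumed):

-- Hypothetical: fire the notification after each queue table insert
CREATE TRIGGER queue_table_notify
    AFTER INSERT ON queue_table
    FOR EACH ROW EXECUTE PROCEDURE queue_notify();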
Security Concerns
Compared to a full featured queuing system, LISTEN/NOTIFY is still somewhat primitive. For example, PostgreSQL doesn't provide any additional authentication process to listen on a channel beyond normal database connectivity, so messages on a given channel are available to all database users that know the channel name. If you plan to pass messages that contain sensitive information, the channel name will need to be kept secure and not easily guessed.
One approach you could use would be to generate a random channel name at application startup time and distribute that name to any listening processes. The maximum length of a channel name is 64 characters, so something like an MD5 hash (with proper entropy) makes a suitable choice. Of course, this wouldn't be particularly hard to brute force, so you might want to add additional measures like encrypting the message itself.
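As a sketch of that idea, the pgcrypto extension (if available) offers a stronger entropy source than plain random():

CREATE EXTENSION IF NOT EXISTS pgcrypto;

-- 32 hex characters fits comfortably under the 64-character channel name limit
SELECT 'chan_' || encode(gen_random_bytes(16), 'hex') AS channel_name;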
Conclusion
By utilizing PostgreSQL’s built-in LISTEN/NOTIFY capabilities, we constructed a system where changes in business data could be communicated to an external process in near real time, while retaining some nice quality-of-life features such as message persistence and audit trails. As other data points were identified in the database, it was simple to hook them up to the notification queue and add corresponding functionality to the listener to handle them. All of this required zero changes to existing application code, and is completely invisible to any end user. Any application systems already using PostgreSQL have this feature built-in, so it is worth a look as a solution to simple messaging needs, especially if those messages can be triggered by changes in business data.