PubSub almost for free: NOTIFY features in PostgreSQL

by admin

If your microservices already use a shared PostgreSQL database to store data, or it is used by several instances of the same service on different servers, you can get message exchange (PubSub) between them relatively "cheaply", without bringing Redis or a RabbitMQ cluster into the architecture or building yet another MQ system into the application code.
To do this, we will not write messages to database tables, because that causes too much overhead: first for recording what is being transmitted, and then for cleaning up what has already been read.
We will send and receive data using the NOTIFY / LISTEN mechanism, and put together a model implementation for Node.js.
But there are pitfalls along the way that have to be carefully avoided.

Protocol features


LISTEN channel

An application using the libpq library executes the LISTEN command like a normal SQL command, and then it must periodically call the PQnotifies function to check for new notifications.

If you are writing a specific application rather than a library for working with PG, in most cases you will not have access to this function.
But if such a library has already been written for you following the guidelines for processing asynchronous requests and notifications, you will automatically receive the message in your application code. If not, you can simply run SELECT 1 on the connection periodically; the notification will then arrive together with the result of the query:

In very old libpq releases, the only way to ensure that NOTIFY messages were received on time was to continually send commands, even if empty, and then check PQnotifies after each PQexec call. Although this method still works, it is considered obsolete due to the inefficient use of the CPU.

From the point of view of psql, for example, it looks like this:

_tmp=# LISTEN test;
LISTEN
_tmp=# SELECT 1;
 ?column?
----------
        1
(1 row)

Asynchronous notification "test" with payload "abc123" received from server process with PID 63991.

If we can accept a maximum message delivery delay of 1 second for the application, we execute the query at that interval. At the same time, this method helps to monitor the "liveness" of the connection, making sure that nobody has accidentally killed it on the server side with pg_terminate_backend, and that the PG itself has not suddenly "crashed" without any notification to the clients.
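A minimal sketch of such a periodic check, assuming `connection` is a node-postgres client whose `query()` returns a Promise when called without a callback. The names `startHeartbeat` and `onDead` are hypothetical and introduced here only for illustration:

```javascript
// Hypothetical helper: ping the connection every intervalMs milliseconds
// and report the first failure through the onDead callback.
function startHeartbeat(connection, intervalMs, onDead) {
  const timer = setInterval(() => {
    connection.query('SELECT 1').catch(err => {
      // the query failed: treat the connection as dead and stop pinging
      clearInterval(timer);
      onDead(err);
    });
  }, intervalMs);
  // return a function that stops the heartbeat
  return () => clearInterval(timer);
}
```

With a 1000 ms interval this matches the one-second delay discussed above; what to do in `onDead` (reconnect, re-subscribe, exit) is left to the application.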


NOTIFY channel [ , message ]

The NOTIFY command sends a notification event, along with an additional "message" string, to all client applications that have previously executed LISTEN channel with the specified channel name in the current database.

The "message" string that will be transmitted with the notification… must be specified as a simple string constant. In the standard configuration its length must be less than 8000 bytes.

That is, if our "message" suddenly contains something very different from ASCII, it has to be escaped, and if it does not fit into 8000 bytes (not characters!), it has to be cut into blocks and then glued back together. In doing so we should conserve both bandwidth and the server resources spent on handling such blocks. In other words, we should add as little service "wrapping" to the useful content as possible, but at the same time not "strangle" the client application by forcing it to pack everything with gzip -9.
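To make the "bytes, not characters" point concrete, here is a small sketch that measures a payload the way the server would, assuming the server encoding is UTF-8. The helper names and the `PAYLOAD_MAX_BYTES` constant are illustrative assumptions, not part of any library:

```javascript
// The payload must be shorter than 8000 bytes in the standard configuration
const PAYLOAD_MAX_BYTES = 7999;

// Size of the string in bytes when encoded as UTF-8 (assumed server encoding)
function payloadBytes(str) {
  return Buffer.byteLength(str, 'utf8');
}

// Would the string fit into a single NOTIFY payload as-is?
function fitsInOneNotify(str) {
  return payloadBytes(str) <= PAYLOAD_MAX_BYTES;
}

const latin = 'a'.repeat(5000);    // 1 byte per character in UTF-8
const cyrillic = 'я'.repeat(5000); // 2 bytes per character in UTF-8

console.log(latin.length, payloadBytes(latin), fitsInOneNotify(latin));
console.log(cyrillic.length, payloadBytes(cyrillic), fitsInOneNotify(cyrillic));
```

Both strings have the same `length` in characters, but only the first one stays under the limit.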
Among the additional advantages of the mechanism are the binding to the "source" of the message…

additional work can be avoided by checking whether the PID of the signaling process (given in the event data) matches the session’s own PID (which can be obtained from libpq). If they match, the session has received a notification of its own action, so it can be ignored.

…and the guaranteed delivery order:

Apart from filtering out subsequent instances of duplicate notifications, NOTIFY ensures that notifications from the same transaction always arrive in the same order in which they were sent. It also guarantees that messages from different transactions arrive in the order in which those transactions are committed.

We will not merge anything on purpose, so each of our queries will simply correspond to a separate transaction.
But remember: if there is also application activity on the connection used for the exchange, our NOTIFY may end up inside a transaction against our will, so side effects can occur:

Transactions have a significant impact on how NOTIFY works. First, if NOTIFY is executed within a transaction, notifications are delivered to recipients after the transaction is committed, and only then. This makes sense, since if a transaction is interrupted, all commands in the transaction are invalidated, including NOTIFY.

So it’s better to use a connection where you know there will be no transactions or long queries.

AccessExclusiveLock on object 0 of class 1262 of database 0

If all of a sudden your NOTIFYs start to stall and waits for such a lock show up in the log, then you have "grown out of your short pants" after all, and it is time to think about a "grown-up" MQ.
After all, the notification queue, although quite large (8GB in standard builds), is still finite. According to Tom Lane’s answer:

This lock is held while inserting the transaction’s notify message(s), after which the transaction commits and releases the lock.

So there are not many workarounds left:

  • Send less frequently
    That is, aggregate the metrics being sent, if they are counters of some kind, over a longer interval.
  • Send a smaller volume
    For example, remove keys whose values are "defaults" from the application’s point of view from the transmitted JSON.
  • Send only a signal, with no content at all
    Alternatively, create several channels, so that each channel name carries some application meaning by itself.
  • Finally, move the sending out of the database altogether
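The first two workarounds can be sketched on the application side. This is only an illustration under simple assumptions (flat objects with primitive values, counters identified by name); `CounterBatch`, `stripDefaults` and `applyDefaults` are hypothetical helpers, not part of any library:

```javascript
// "Send less frequently": accumulate counters locally and publish the totals
// once per interval instead of sending every single increment.
class CounterBatch {
  constructor() {
    this.totals = {};
  }
  add(name, delta = 1) {
    this.totals[name] = (this.totals[name] || 0) + delta;
  }
  // return the accumulated totals and start a new batch
  flush() {
    const totals = this.totals;
    this.totals = {};
    return totals;
  }
}

// "Send a smaller volume": drop keys whose value equals the agreed default...
function stripDefaults(obj, defaults) {
  const out = {};
  for (const [key, value] of Object.entries(obj)) {
    if (!(key in defaults) || defaults[key] !== value) {
      out[key] = value;
    }
  }
  return out;
}

// ...and restore them on the receiving side
function applyDefaults(obj, defaults) {
  return {...defaults, ...obj};
}
```

Both sides have to agree on the same `defaults` object for the round trip to be lossless.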

Transmission of "complex" messages

Encoding the "body"

In the general case, we may want to pass not only "allowed" characters in the message, but also Russian letters and "all sorts of binary stuff", so it would be convenient to convert the string being sent to a hex representation. And yes, this approach works fine:

NOTIFY test, E'\x20\x21';

Asynchronous notification "test" with payload " !" received from server process with PID 63991.

But we turn again to the documentation :

You must take care that the byte sequences you create in this way, especially in octal and hexadecimal notation, form valid characters in the server encoding. When the server works with UTF-8 encoding, Unicode escape sequences or the alternative Unicode syntax described in Subsection should be used instead of such a byte entry. (Otherwise, you would have to encode the UTF-8 characters manually and write them out byte by byte, which is very inconvenient.)

So even with a trivial quotation-mark character from win1251, we are in for a lot of grief:

NOTIFY test, E'\x98';
-- ERROR:  invalid byte sequence for encoding "UTF8": 0x98

Since we do not want to "encode the UTF-8 characters manually and write them out byte by byte", let’s agree right away to pass the body of the message packed in base64 if it contains any characters outside the range \x20-\x7E or when it has to be split into segments. On the one hand, this packing method does not add too much redundancy (a 4:3 ratio); on the other hand, it is implemented at the system library level in any language and adds minimal extra load.
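A short sketch of this packing in Node.js, using only the built-in `Buffer`. The helper names are illustrative; `base64Length` simply restates the 4:3 ratio (every 3 input bytes become 4 output characters, with padding):

```javascript
// Pack an arbitrary string into base64 (UTF-8 bytes are assumed)
function toBase64(str) {
  return Buffer.from(str, 'utf8').toString('base64');
}

// Reverse conversion on the receiving side
function fromBase64(b64) {
  return Buffer.from(b64, 'base64').toString('utf8');
}

// Expected size of the padded base64 text for a given number of input bytes
function base64Length(byteCount) {
  return 4 * Math.ceil(byteCount / 3);
}

const text = 'мама мыла раму';
const packed = toBase64(text);
console.log(Buffer.byteLength(text, 'utf8'), packed.length, fromBase64(packed) === text);
```

The packed string consists only of characters from the base64 alphabet, so it never needs any escaping inside a string constant.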
But even if there are no "strange" characters and the message fits into a single segment, there is still one peculiarity, apostrophe escaping:

To include an apostrophe in a string, write two apostrophes side by side. For example: 'Jeanne d''Arc'. Note that this is not the same as a double quotation mark (").
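As a sketch, the doubling rule can be wrapped in a tiny helper. `quoteLiteral` is a hypothetical name, and the sketch assumes the default `standard_conforming_strings = on`, where backslashes inside an ordinary string constant need no extra treatment. An alternative that avoids manual escaping altogether is the built-in `pg_notify(channel, payload)` function called with query parameters; `notifyQuery` below only builds such a query object in the `{text, values}` form accepted by node-postgres:

```javascript
// Wrap a string into an SQL string constant, doubling every apostrophe
function quoteLiteral(str) {
  return `'${str.replace(/'/g, "''")}'`;
}

// Alternative: let the driver pass the payload as a parameter of pg_notify()
function notifyQuery(channel, payload) {
  return {text: 'SELECT pg_notify($1, $2)', values: [channel, payload]};
}

console.log(`NOTIFY test, ${quoteLiteral("Jeanne d'Arc")}`);
```

Note that with `pg_notify()` the channel name is passed as an ordinary text value rather than as an SQL identifier.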

Segment identification

The next task is to correctly "cut" the message into segments of an allowable size, blocks of up to 7999 bytes, if its size exceeds this value, and to do it in such a way that the receiver can assemble it without breaking the order or ending up in a chain of "alien" segments. To do this, each segment must be identified somehow.
In fact, we already know two "coordinates": the PID of the sender process and the channel name, which come with every notification. And the order in which the segments arrive is guaranteed by the protocol itself.

Neighboring writers

We will not consider the case where several writers to the same channel are active at the same time on the same database connection (that is, obviously within one application process). Technically, this can be supported by passing an additional identifier in the segment header, but it is better to "share" a single PubSub object within your application.
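The idea of the two "coordinates" can be sketched as a small store that keeps a separate chain of chunks per (sender PID, channel) pair, so that segments from different senders never get mixed. `SegmentStore` is a hypothetical name used only for this illustration:

```javascript
// Keeps partially received messages apart by sender PID and channel name
class SegmentStore {
  constructor() {
    this.chains = new Map();
  }
  static key(processId, channel) {
    return `${processId}:${channel}`;
  }
  // remember one more chunk from this sender on this channel
  append(processId, channel, chunk) {
    const id = SegmentStore.key(processId, channel);
    if (!this.chains.has(id)) {
      this.chains.set(id, []);
    }
    this.chains.get(id).push(chunk);
  }
  // glue the collected chunks together in arrival order and forget them
  take(processId, channel) {
    const id = SegmentStore.key(processId, channel);
    const chunks = this.chains.get(id) || [];
    this.chains.delete(id);
    return chunks.join('');
  }
}
```

Because the arrival order is guaranteed by the protocol, simply appending chunks is enough; no sequence numbers are needed in this sketch.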

Container boundaries

To assemble a whole container from multiple segments, we need to know when it ends. There are two typical ways to do this:

  • transmit the target size (in bytes or segments) in the first segment
  • transmit a [not-]last-segment flag in each segment

Since we are writing PubSub after all, most of our messages will be short, and reserving many bytes for the size is wasteful. So we will use the second way and reserve the first data character of each segment as the continuation/end-of-container flag.
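A minimal sketch of the second approach. The two flag characters used here are placeholders chosen for the illustration (any two characters that cannot occur in the data would do), and `splitSegments` / `joinSegments` are hypothetical helper names:

```javascript
const FLAG_MORE = '#'; // more segments of this container will follow
const FLAG_LAST = '$'; // this is the final segment of the container

// Cut the data into pieces of at most dataSize characters,
// prefixing each piece with a one-character continuation/end flag.
// Note: empty data produces no segments at all.
function splitSegments(data, dataSize) {
  const segments = [];
  for (let pos = 0; pos < data.length; pos += dataSize) {
    const end = pos + dataSize;
    const flag = end >= data.length ? FLAG_LAST : FLAG_MORE;
    segments.push(flag + data.slice(pos, end));
  }
  return segments;
}

// Receiver side: drop the flags and glue the pieces back together
function joinSegments(segments) {
  return segments.map(segment => segment.slice(1)).join('');
}

console.log(splitSegments('abcdefgh', 3));
```

A short message costs only one extra character this way, whereas a size header would have to reserve enough room for the largest possible container.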

Transferring objects

To pass both plain text strings and JSON objects as a "message", we add one more flag character for the reverse conversion on the recipient side.
Since we decided to encode "non-standard" content in base64, we can use any allowed characters outside the base64 alphabet as flags.
So we have the following options for transmitted segments:

-- "short" text string
!simple string
-- "short" simple object
@{"a":1}
-- non-final segment in base64
#<segment>
-- final segment in base64
$<segment>

As you can see, you only need to analyze the first character of the segment to know what to do with it next.
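The dispatch on the first character can be sketched like this. `describeSegment` and the shape of its return value are assumptions made for the illustration; an empty payload is treated as a bare signal without a message:

```javascript
// Decide what to do with a received payload by looking at its first character
function describeSegment(payload) {
  const flag = payload.charAt(0);
  const body = payload.slice(1);
  switch (flag) {
    case '':  // NOTIFY without a message
      return {kind: 'signal'};
    case '!': // "short" text string
      return {kind: 'string', value: body};
    case '@': // "short" object serialized as JSON
      return {kind: 'object', value: JSON.parse(body)};
    case '#': // non-final base64 segment
      return {kind: 'segment', last: false, data: body};
    case '$': // final base64 segment
      return {kind: 'segment', last: true, data: body};
    default:
      throw new Error(`unknown segment flag: ${flag}`);
  }
}

console.log(describeSegment('@{"a":1}'));
```

After the final base64 segment is decoded, the same check is applied once more to the first character of the decoded text, since it starts with the string or object flag.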

Writing PubSub implementation

Our application will be in Node.js, so to work with PostgreSQL we use the node-postgres module.

Writing a starter framework

First, let’s create PubSub as a descendant of EventEmitter, so that it can emit events for those who subscribe to specific channels:

const util = require('util');
const EventEmitter = require('events').EventEmitter;

const PubSub = function(connection, interval, skipSelf) {
  // use an existing connection
  this.connection = connection;
  // subscribe our handler to receive all notifications
  this.connection.on('notification', p._onmessage.bind(this));
  // don't accept notifications from our own database connection
  this.skipSelf = skipSelf;
  // start the "antifreeze"
  setInterval(() => {
    this.connection.query('SELECT 1');
  }, interval);
  // message segments ("slices") are stored here
  this.slices = {};
};

util.inherits(PubSub, EventEmitter);

const p = PubSub.prototype;

Working with channels

Since LISTEN/UNLISTEN do not complain about re-subscribing to a channel or unsubscribing from something we were not subscribed to, we will not complicate things.

// if there is anything "non-standard" in the channel name, it must be enclosed in quotation marks
// the double-quote character itself is not allowed in the channel name anyway
const quot = str => /^[_a-z][0-9a-z_$]*$/.test(str) ? str : `"${str}"`;

p.subscribe = function(channel) {
  this.connection.query(`LISTEN ${quot(channel)}`);
  return this;
};

p.unsubscribe = function(channel) {
  this.connection.query(`UNLISTEN ${quot(channel)}`);
  return this;
};

Message transmission and reception

const PAYLOAD_LIMIT   = 8000 - 1;
const PAYLOAD_FL_STR  = '!';
const PAYLOAD_FL_OBJ  = '@';
const PAYLOAD_FL_SEQ  = '#';
const PAYLOAD_FL_FIN  = '$';
const PAYLOAD_SZ_HEAD = 1;
const PAYLOAD_SZ_DATA = PAYLOAD_LIMIT - PAYLOAD_SZ_HEAD;

// only "simple" characters
const reASCII = /^[\x20-\x7E]*$/;

// send
p.publish = function(channel, payload) {
  let query = `NOTIFY ${quot(channel)}`;
  if (payload !== null && payload !== undefined) {
    // encode message type - string or object
    let str = typeof payload == 'string'
      ? PAYLOAD_FL_STR + payload
      : PAYLOAD_FL_OBJ + JSON.stringify(payload);
    if (str.length > PAYLOAD_LIMIT || !reASCII.test(str)) {
      // send segments of the base64 representation
      const b64 = Buffer.from(str).toString('base64');
      for (let pos = 0, len = b64.length; pos < len; pos += PAYLOAD_SZ_DATA) {
        let fin = pos + PAYLOAD_SZ_DATA;
        let seg = fin >= len
          ? PAYLOAD_FL_FIN + b64.slice(pos)
          : PAYLOAD_FL_SEQ + b64.slice(pos, fin);
        this.connection.query(`${query}, '${seg}'`);
      }
    }
    else {
      // everything fits in one segment with valid characters?
      // don't forget to escape the apostrophes
      str = str.replace(/'/g, "''");
      this.connection.query(`${query}, '${str}'`);
    }
  }
  else {
    // a simple signal to the channel without a message
    this.connection.query(query);
  }
  return this;
};

// receive and assemble
p._onmessage = function(msg) {
  const {processId, channel, payload} = msg;
  // skip our own notifications
  if (processId == this.connection.processID && this.skipSelf) {
    return;
  }
  // "coordinates" of the source
  const id = `${processId}:${channel}`;
  let rv;
  // segment type
  let fl = payload.charAt(0);
  if (fl == PAYLOAD_FL_SEQ || fl == PAYLOAD_FL_FIN) {
    // base64
    const str = payload.slice(PAYLOAD_SZ_HEAD);
    const slices = this.slices;
    let b64;
    if (fl == PAYLOAD_FL_FIN) {
      // assemble the container
      if (slices[id]) {
        slices[id].push(str);
        b64 = slices[id].join('');
        delete slices[id];
      }
      else {
        b64 = str;
      }
    }
    else {
      // add a segment to the array
      if (slices[id]) {
        slices[id].push(str);
      }
      else {
        slices[id] = [str];
      }
    }
    if (b64) {
      rv = Buffer.from(b64, 'base64').toString();
      fl = rv.charAt(0);
    }
  }
  else {
    // simple string/object
    rv = payload;
  }
  if (rv !== undefined) { // could be ''
    let res = {processId, channel};
    if (rv) {
      // unpack the message according to its type
      let data = rv.slice(1);
      res.payload = fl == PAYLOAD_FL_OBJ ? JSON.parse(data) : data;
    }
    this.emit(channel, res);
  }
};

Some tests

const pg = require('pg');

const pgsql = new pg.Client({
  host     : 'example-db',
  port     : 5432,
  user     : 'postgres',
  password : 'postgres',
  database : '_tmp'
});

pgsql.connect(err => {
  // stop right away if the connection could not be established
  if (err) {
    throw err;
  }

  let psA = new PubSub(pgsql, 1000);
  let psB = new PubSub(pgsql, 1000);

  let chA = 'channel:A';
  let chB = 'channel:B';

  psA.subscribe(chA);
  psB.subscribe(chB);

  psA.on(chA, (msg) => {
    console.log('A:rcv', msg);
  });
  psB.on(chB, (msg) => {
    console.log('B:rcv', msg);
  });

  psB.publish(chA);
  psB.publish(chA, 'simple string');
  psB.publish(chA, 'mama washed the frame');
  psB.publish(chA, {a : 1});
  psA.publish(chB, 'mom washed the frame 100 times'.repeat(100));
});

It is simple enough, so you can easily implement it in any other language used in your project, taking the examples of working with asynchronous notifications as a basis.
