wal_decoding: Add (non-)transactional message feature for logical decoding
authorAndres Freund <[email protected]>
Sun, 11 May 2014 14:24:20 +0000 (16:24 +0200)
committerAndres Freund <[email protected]>
Thu, 3 Jul 2014 15:55:17 +0000 (17:55 +0200)
contrib/test_decoding/test_decoding.c
src/backend/replication/logical/decode.c
src/backend/replication/logical/logical.c
src/backend/replication/logical/logicalfuncs.c
src/backend/replication/logical/reorderbuffer.c
src/backend/storage/ipc/standby.c
src/include/catalog/pg_proc.h
src/include/replication/logicalfuncs.h
src/include/replication/output_plugin.h
src/include/replication/reorderbuffer.h
src/include/storage/standby.h

index 5ce052b5c61409f3dc0574d8d45c6642bee8a4e6..f091770e879236e58b1901695cf308af70b3c077 100644 (file)
@@ -54,6 +54,10 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
 static void pg_decode_change(LogicalDecodingContext *ctx,
                 ReorderBufferTXN *txn, Relation rel,
                 ReorderBufferChange *change);
+static void pg_decode_message(LogicalDecodingContext *ctx,
+                             ReorderBufferTXN *txn, XLogRecPtr message_lsn,
+                             bool transactional, Size sz,
+                             const char *message);
 
 void
 _PG_init(void)
@@ -72,6 +76,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
    cb->change_cb = pg_decode_change;
    cb->commit_cb = pg_decode_commit_txn;
    cb->shutdown_cb = pg_decode_shutdown;
+   cb->message_cb = pg_decode_message;
 }
 
 
@@ -405,3 +410,16 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
    OutputPluginWrite(ctx, true);
 }
+
+static void
+pg_decode_message(LogicalDecodingContext *ctx,
+                 ReorderBufferTXN *txn, XLogRecPtr lsn,
+                 bool transactional, Size sz,
+                 const char *message)
+{
+   OutputPluginPrepareWrite(ctx, true);
+   appendStringInfo(ctx->out, "message: lsn: %X/%X transactional: %d sz: %zu content:",
+                    (uint32)(lsn >> 32), (uint32)lsn, transactional, sz);
+   appendBinaryStringInfo(ctx->out, message, sz);
+   OutputPluginWrite(ctx, true);
+}
index d5f19b1b8f781bd1b20913df89ce2426146622aa..fe5d185c49ad24ada32009f94d57bd572ce5027a 100644 (file)
@@ -350,6 +350,20 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
            break;
        case XLOG_STANDBY_LOCK:
            break;
+       case XLOG_STANDBY_MESSAGE:
+           {
+               xl_standby_message *message = (xl_standby_message *) buf->record_data;
+               if (message->transactional &&
+                   !SnapBuildProcessChange(builder, r->xl_xid, buf->origptr))
+                   break;
+               else if(!message->transactional &&
+                       SnapBuildXactNeedsSkip(builder, buf->origptr))
+                   break;
+
+               ReorderBufferQueueMessage(ctx->reorder, r->xl_xid, buf->endptr,
+                                         message->transactional, message->size, message->message);
+               break;
+           }
        default:
            elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
    }
index bac010be6ad949072229736318bb9a15fa575290..b7295d3a120cfce2aaf87714f1bda172e71f4d25 100644 (file)
@@ -67,6 +67,9 @@ static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
                  XLogRecPtr commit_lsn);
 static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
                  Relation relation, ReorderBufferChange *change);
+static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+                 XLogRecPtr message_lsn, bool transactional, Size sz,
+                 const char *message);
 
 static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
 
@@ -169,6 +172,7 @@ StartupDecodingContext(List *output_plugin_options,
    ctx->reorder->begin = begin_cb_wrapper;
    ctx->reorder->apply_change = change_cb_wrapper;
    ctx->reorder->commit = commit_cb_wrapper;
+   ctx->reorder->message = message_cb_wrapper;
 
    ctx->out = makeStringInfo();
    ctx->prepare_write = prepare_write;
@@ -711,6 +715,40 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    error_context_stack = errcallback.previous;
 }
 
+static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+                              XLogRecPtr message_lsn,
+                              bool transactional, Size sz,
+                              const char *message)
+{
+   LogicalDecodingContext *ctx = cache->private_data;
+   LogicalErrorCallbackState state;
+   ErrorContextCallback errcallback;
+
+   if (ctx->callbacks.message_cb == NULL)
+       return;
+
+   /* Push callback + info on the error context stack */
+   state.ctx = ctx;
+   state.callback_name = "message";
+   state.report_location = message_lsn; /* beginning of commit record */
+   errcallback.callback = output_plugin_error_callback;
+   errcallback.arg = (void *) &state;
+   errcallback.previous = error_context_stack;
+   error_context_stack = &errcallback;
+
+   /* set output state */
+   ctx->accept_writes = true;
+   ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+   ctx->write_location = message_lsn;
+
+   /* do the actual work: call callback */
+   ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional,
+                             sz, message);
+
+   /* Pop the error context stack */
+   error_context_stack = errcallback.previous;
+}
+
 /*
  * Set the required catalog xmin horizon for historic snapshots in the current
  * replication slot.
index a3a58e6a49cad81f63bb9686f8e3d37bc917ec87..360329f081601e2027a8efce932f9bba7e70d4b9 100644 (file)
@@ -21,6 +21,8 @@
 #include "funcapi.h"
 #include "miscadmin.h"
 
+#include "access/xact.h"
+
 #include "catalog/pg_type.h"
 
 #include "nodes/makefuncs.h"
@@ -40,6 +42,7 @@
 #include "replication/logicalfuncs.h"
 
 #include "storage/fd.h"
+#include "storage/standby.h"
 
 /* private date for writing out data */
 typedef struct DecodingOutputState
@@ -513,3 +516,25 @@ pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
 
    return ret;
 }
+
+
+/*
+ * SQL function returning the changestream in binary, only peeking ahead.
+ */
+Datum
+pg_logical_send_message_bytea(PG_FUNCTION_ARGS)
+{
+   bool transactional = PG_GETARG_BOOL(0);
+   bytea *data = PG_GETARG_BYTEA_PP(1);
+   XLogRecPtr lsn;
+
+   lsn = LogStandbyMessage(VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data), transactional);
+   PG_RETURN_LSN(lsn);
+}
+
+Datum
+pg_logical_send_message_text(PG_FUNCTION_ARGS)
+{
+   /* bytea and text are compatible */
+   return pg_logical_send_message_bytea(fcinfo);
+}
index a938186873672a41476e017f24a058f18f529b85..cb061e42a90cbf8a853e9e1571fa227ea5db12a7 100644 (file)
@@ -413,6 +413,11 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
                change->data.tp.oldtuple = NULL;
            }
            break;
+       case REORDER_BUFFER_CHANGE_MESSAGE:
+           if (change->data.msg.message != NULL)
+               pfree(change->data.msg.message);
+           change->data.msg.message = NULL;
+           break;
        case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
            if (change->data.snapshot)
            {
@@ -597,6 +602,38 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
    ReorderBufferCheckSerializeTXN(rb, txn);
 }
 
+void
+ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
+                         bool transactional, Size sz, const char *msg)
+{
+   ReorderBufferTXN *txn = NULL;
+
+   if (xid != InvalidTransactionId)
+       txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+
+   if (transactional)
+   {
+       ReorderBufferChange *change;
+
+       Assert(xid != InvalidTransactionId);
+       Assert(txn != NULL);
+
+       change = ReorderBufferGetChange(rb);
+       change->action = REORDER_BUFFER_CHANGE_MESSAGE;
+       change->data.msg.transactional = true;
+       change->data.msg.sz = sz;
+       change->data.msg.message = palloc(sz);
+       memcpy(change->data.msg.message, msg, sz);
+
+       ReorderBufferQueueChange(rb, xid, lsn, change);
+   }
+   else
+   {
+       rb->message(rb, txn, lsn, transactional, sz, msg);
+   }
+}
+
+
 static void
 AssertTXNLsnOrder(ReorderBuffer *rb)
 {
@@ -1407,6 +1444,12 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
                    }
                    RelationClose(relation);
                    break;
+               case REORDER_BUFFER_CHANGE_MESSAGE:
+                   rb->message(rb, txn, change->lsn,
+                               change->data.msg.transactional,
+                               change->data.msg.sz,
+                               change->data.msg.message);
+                   break;
                case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
                    /* get rid of the old */
                    TeardownHistoricSnapshot(false);
@@ -2043,6 +2086,18 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
                }
                break;
            }
+       case REORDER_BUFFER_CHANGE_MESSAGE:
+           {
+               char       *data;
+
+               sz += change->data.msg.sz;
+               ReorderBufferSerializeReserve(rb, sz);
+
+               data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+               memcpy(data, change->data.msg.message,
+                      change->data.msg.sz);
+               break;
+           }
        case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
            {
                Snapshot    snap;
@@ -2283,6 +2338,14 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
                data += len;
            }
            break;
+       case REORDER_BUFFER_CHANGE_MESSAGE:
+           {
+               Size        len = change->data.msg.sz;
+               change->data.msg.message = palloc(len);
+               memcpy(change->data.msg.message, data, len);
+
+               data += len;
+           }
        case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
            {
                Snapshot    oldsnap;
index 1c327fd45c71e1dca419fafb5ab8af3ab5263d78..ec6564c8114c4926ec798b60b41eb3154cfd0142 100644 (file)
@@ -794,6 +794,10 @@ standby_redo(XLogRecPtr lsn, XLogRecord *record)
 
        ProcArrayApplyRecoveryInfo(&running);
    }
+   else if (info == XLOG_STANDBY_MESSAGE)
+   {
+       /* Only interesting to logical decoding. Check decode.c */
+   }
    else
        elog(PANIC, "standby_redo: unknown op code %u", info);
 }
@@ -1054,3 +1058,31 @@ LogAccessExclusiveLockPrepare(void)
     */
    (void) GetTopTransactionId();
 }
+
+XLogRecPtr
+LogStandbyMessage(const char *message, size_t size, bool transactional)
+{
+   xl_standby_message xlrec;
+   XLogRecData     rdata[2];
+
+   /*
+    * Force xid to be allocated if we're sending a transactional message.
+    */
+   if (transactional)
+       GetCurrentTransactionId();
+
+   xlrec.size = size;
+   xlrec.transactional = transactional;
+
+   rdata[0].data = (char *) &xlrec;
+   rdata[0].len = SizeOfStandbyMessage;
+   rdata[0].buffer = InvalidBuffer;
+   rdata[0].next = &rdata[1];
+
+   rdata[1].data = (char *) message;
+   rdata[1].len = size;
+   rdata[1].buffer = InvalidBuffer;
+   rdata[1].next = NULL;
+
+   return XLogInsert(RM_STANDBY_ID, XLOG_STANDBY_MESSAGE, rdata);
+}
index 36a8a0d277272be7ff6a6e0a480650a9dd4ea8a6..6d56fd59a55512d11c2b9869e875ac26244b08c2 100644 (file)
@@ -4984,6 +4984,10 @@ DATA(insert OID = 3784 (  pg_logical_slot_peek_changes PGNSP PGUID 12 1000 1000
 DESCR("peek at changes from replication slot");
 DATA(insert OID = 3785 (  pg_logical_slot_peek_binary_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,17}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ pg_logical_slot_peek_binary_changes _null_ _null_ _null_ ));
 DESCR("peek at binary changes from replication slot");
+DATA(insert OID = 3791 (  pg_logical_send_message PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 3220 "16 25" _null_ _null_ _null_ _null_ pg_logical_send_message_text _null_ _null_ _null_ ));
+DESCR("send a textual message");
+DATA(insert OID = 3792 (  pg_logical_send_message PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 3220 "16 17" _null_ _null_ _null_ _null_ pg_logical_send_message_bytea _null_ _null_ _null_ ));
+DESCR("send a binary message");
 
 DATA(insert OID = 6022 (  sequence_local_alloc    PGNSP PGUID 12 1 0 0 0 f f f f t f s 4 0 2281 "2281 2281 2281 2281" _null_ _null_ _null_ _null_ sequence_local_alloc _null_ _null_ _null_ ));
 DESCR("Local SequenceAM allocation");
index 21bf44ec4b7b66371d17488e3e79ce267af8ed31..6c7cd4793de7ec3055391cffb181c643f671ff16 100644 (file)
@@ -21,4 +21,6 @@ extern Datum pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS);
 extern Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS);
 extern Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS);
 
+extern Datum pg_logical_send_message_bytea(PG_FUNCTION_ARGS);
+extern Datum pg_logical_send_message_text(PG_FUNCTION_ARGS);
 #endif
index a58e68d30a582d10fed0963f57c656cc2ff5babe..f474d7e9a4adcf7645b954e98f36257e000f4ae1 100644 (file)
@@ -80,6 +80,16 @@ typedef void (*LogicalDecodeShutdownCB) (
                                              struct LogicalDecodingContext *
 );
 
+/*
+ * Called for messages sent by C code.
+ */
+typedef void (*LogicalDecodeMessageCB) (
+                                            struct LogicalDecodingContext *,
+                                            ReorderBufferTXN *txn,
+                                            XLogRecPtr message_lsn,
+                                            bool transactional, Size sz,
+                                            const char *message);
+
 /*
  * Output plugin callbacks
  */
@@ -90,6 +100,7 @@ typedef struct OutputPluginCallbacks
    LogicalDecodeChangeCB change_cb;
    LogicalDecodeCommitCB commit_cb;
    LogicalDecodeShutdownCB shutdown_cb;
+   LogicalDecodeMessageCB message_cb;
 } OutputPluginCallbacks;
 
 void       OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
index d85a69351b46a7c6a4fcacaf727f11b1aa1c465c..5838ec1d75bc7cd83e786c4174af97f6fb2e7b15 100644 (file)
@@ -45,6 +45,7 @@ enum ReorderBufferChangeType
    REORDER_BUFFER_CHANGE_INSERT,
    REORDER_BUFFER_CHANGE_UPDATE,
    REORDER_BUFFER_CHANGE_DELETE,
+   REORDER_BUFFER_CHANGE_MESSAGE,
    REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
    REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
    REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID
@@ -83,6 +84,13 @@ typedef struct ReorderBufferChange
            ReorderBufferTupleBuf *newtuple;
        }           tp;
 
+       struct
+       {
+           size_t sz;
+           bool transactional;
+           char *message;
+       } msg;
+
        /* New snapshot, set when action == *_INTERNAL_SNAPSHOT */
        Snapshot    snapshot;
 
@@ -262,6 +270,14 @@ typedef void (*ReorderBufferCommitCB) (
                                                   ReorderBufferTXN *txn,
                                                   XLogRecPtr commit_lsn);
 
+/* message callback signature */
+typedef void (*ReorderBufferMessageCB) (
+                                                  ReorderBuffer *rb,
+                                                  ReorderBufferTXN *txn,
+                                                  XLogRecPtr message_lsn,
+                                                  bool transactional, Size sz,
+                                                  const char *message);
+
 struct ReorderBuffer
 {
    /*
@@ -288,6 +304,7 @@ struct ReorderBuffer
    ReorderBufferBeginCB begin;
    ReorderBufferApplyChangeCB apply_change;
    ReorderBufferCommitCB commit;
+   ReorderBufferMessageCB message;
 
    /*
     * Pointer that will be passed untouched to the callbacks.
@@ -338,6 +355,8 @@ ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
 void       ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
 
 void       ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *);
+void       ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
+                                     bool transactional, Size sz, const char *msg);
 void ReorderBufferCommit(ReorderBuffer *, TransactionId,
                    XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
                    TimestampTz commit_time, RepNodeId origin_id, XLogRecPtr origin_lsn);
index 89ab704699ba32dedca81d4d0ddfba6ef77f1318..2efe957231868071f2c2fb47ec5ac393cfae69e2 100644 (file)
@@ -55,6 +55,7 @@ extern void StandbyReleaseOldLocks(int nxids, TransactionId *xids);
  */
 #define XLOG_STANDBY_LOCK          0x00
 #define XLOG_RUNNING_XACTS         0x10
+#define XLOG_STANDBY_MESSAGE       0x20
 
 typedef struct xl_standby_locks
 {
@@ -79,6 +80,14 @@ typedef struct xl_running_xacts
 
 #define MinSizeOfXactRunningXacts offsetof(xl_running_xacts, xids)
 
+typedef struct xl_standby_message
+{
+   size_t      size;
+   bool        transactional;
+   char        message[FLEXIBLE_ARRAY_MEMBER];
+} xl_standby_message;
+
+#define SizeOfStandbyMessage   (offsetof(xl_standby_message, message))
 
 /* Recovery handlers for the Standby Rmgr (RM_STANDBY_ID) */
 extern void standby_redo(XLogRecPtr lsn, XLogRecord *record);
@@ -114,5 +123,6 @@ extern void LogAccessExclusiveLock(Oid dbOid, Oid relOid);
 extern void LogAccessExclusiveLockPrepare(void);
 
 extern XLogRecPtr LogStandbySnapshot(void);
+extern XLogRecPtr LogStandbyMessage(const char *message, size_t size, bool transactional);
 
 #endif   /* STANDBY_H */