#include "gtm/libpq-int.h"
#include "gtm/pqformat.h"
+static GTM_SnapshotData localSnapshot;
/*
* Get snapshot for the given transactions. If this is the first call in the
* transaction, a fresh snapshot is taken and returned back. For a serializable
for (ii = 0; ii < txn_count; ii++)
{
- mygtm_txninfo = GTM_HandleToTransactionInfo(handle[ii]);
+ /*
+ * Even if the request does not contain a valid GXID, we still send
+ * down a snapshot, but mark the status field acoordingly
+ */
+ if (handle[ii] != InvalidTransactionHandle)
+ mygtm_txninfo = GTM_HandleToTransactionInfo(handle[ii]);
+ else
+ status[ii] = STATUS_NOT_FOUND;
/*
* If the transaction does not exist, just mark the status field with
* a STATUS_ERROR code
*/
- if (mygtm_txninfo == NULL)
- status[ii] = STATUS_ERROR;
- else if (snapshot == NULL)
+ if ((mygtm_txninfo != NULL) && (snapshot == NULL))
snapshot = &mygtm_txninfo->gti_current_snapshot;
}
/*
- * If no valid transaction exists in the array, send an error message back.
- * Otherwise, we should still get the snapshot and send it back. The
- * invalid transaction ids are marked separately in the status array.
+ * If no valid transaction exists in the array, we record the snapshot in a
+ * local strucure and still send it out to the caller
*/
if (snapshot == NULL)
- return NULL;
+ snapshot = &localSnapshot;
Assert(snapshot != NULL);
* We have already gone through all the transaction handles above and
* marked the invalid handles with STATUS_ERROR
*/
- if (status[ii] == STATUS_ERROR)
+ if ((status[ii] == STATUS_ERROR) || (status[ii] == STATUS_NOT_FOUND))
continue;
mygtm_txninfo = GTM_HandleToTransactionInfo(handle[ii]);
int txn_count = 1;
const char *data = NULL;
- /*
- * Here we consume a byte which is a boolean to determine if snapshot can
- * be grouped or not. This is used only by GTM-Proxy and it is useless for GTM
- * so consume data.
- */
- pq_getmsgbyte(message);
-
data = pq_getmsgbytes(message, sizeof (gxid));
if (data == NULL)
ereport(ERROR,
GTM_Conn *gtm_conn, GTM_MessageType mtype, StringInfo message);
static void ProcessSnapshotCommand(GTMProxy_ConnectionInfo *conninfo,
GTM_Conn *gtm_conn, GTM_MessageType mtype, StringInfo message);
-static void ProcessSequenceCommand(GTMProxy_ConnectionInfo *conninfo,
- GTM_Conn *gtm_conn, GTM_MessageType mtype, StringInfo message);
-static void ProcessBarrierCommand(GTMProxy_ConnectionInfo *conninfo,
- GTM_Conn *gtm_conn, GTM_MessageType mtype, StringInfo message);
static void GTMProxy_RegisterPGXCNode(GTMProxy_ConnectionInfo *conninfo,
char *node_name,
static GTM_Conn *ConnectGTM(void);
static void ReleaseCmdBackup(GTMProxy_CommandInfo *cmdinfo);
static void workerThreadReconnectToGTM(void);
+static bool IsProxiedMessage(GTM_MessageType mtype);
/*
* One-time initialization. It's called immediately after the main process
switch (mtype)
{
- case MSG_NODE_REGISTER:
- case MSG_NODE_UNREGISTER:
+ case MSG_TXN_BEGIN_GETGXID_AUTOVACUUM:
+ case MSG_TXN_PREPARE:
+ case MSG_TXN_START_PREPARED:
+ case MSG_TXN_GET_GID_DATA:
+ case MSG_TXN_COMMIT_PREPARED:
+ case MSG_SNAPSHOT_GET:
+ case MSG_SEQUENCE_INIT:
+ case MSG_SEQUENCE_GET_CURRENT:
+ case MSG_SEQUENCE_GET_NEXT:
+ case MSG_SEQUENCE_GET_LAST:
+ case MSG_SEQUENCE_SET_VAL:
+ case MSG_SEQUENCE_RESET:
+ case MSG_SEQUENCE_CLOSE:
+ case MSG_SEQUENCE_RENAME:
+ case MSG_SEQUENCE_ALTER:
+ case MSG_BARRIER:
#ifdef XCP
case MSG_REGISTER_SESSION:
#endif
+ GTMProxy_ProxyCommand(conninfo, gtm_conn, mtype, input_message);
+ break;
+
+
+ case MSG_NODE_REGISTER:
+ case MSG_NODE_UNREGISTER:
ProcessPGXCNodeCommand(conninfo, gtm_conn, mtype, input_message);
break;
case MSG_TXN_BEGIN:
case MSG_TXN_BEGIN_GETGXID:
- case MSG_TXN_BEGIN_GETGXID_AUTOVACUUM:
- case MSG_TXN_PREPARE:
- case MSG_TXN_START_PREPARED:
case MSG_TXN_COMMIT:
- case MSG_TXN_COMMIT_PREPARED:
case MSG_TXN_ROLLBACK:
case MSG_TXN_GET_GXID:
- case MSG_TXN_GET_GID_DATA:
ProcessTransactionCommand(conninfo, gtm_conn, mtype, input_message);
break;
- case MSG_SNAPSHOT_GET:
+ case MSG_SNAPSHOT_GET_MULTI:
case MSG_SNAPSHOT_GXID_GET:
ProcessSnapshotCommand(conninfo, gtm_conn, mtype, input_message);
break;
- case MSG_SEQUENCE_INIT:
- case MSG_SEQUENCE_GET_CURRENT:
- case MSG_SEQUENCE_GET_NEXT:
- case MSG_SEQUENCE_GET_LAST:
- case MSG_SEQUENCE_SET_VAL:
- case MSG_SEQUENCE_RESET:
- case MSG_SEQUENCE_CLOSE:
- case MSG_SEQUENCE_RENAME:
- case MSG_SEQUENCE_ALTER:
- ProcessSequenceCommand(conninfo, gtm_conn, mtype, input_message);
- break;
- case MSG_BARRIER:
- ProcessBarrierCommand(conninfo, gtm_conn, mtype, input_message);
- break;
-
default:
ereport(FATAL,
(EPROTO,
}
+static bool
+IsProxiedMessage(GTM_MessageType mtype)
+{
+ switch (mtype)
+ {
+ case MSG_TXN_BEGIN:
+ case MSG_TXN_BEGIN_GETGXID_AUTOVACUUM:
+ case MSG_TXN_PREPARE:
+ case MSG_TXN_START_PREPARED:
+ /* There are not so many 2PC from application messages, so just proxy it. */
+ case MSG_TXN_COMMIT_PREPARED:
+ case MSG_TXN_GET_GXID:
+ case MSG_TXN_GET_GID_DATA:
+ case MSG_NODE_REGISTER:
+ case MSG_NODE_UNREGISTER:
+#ifdef XCP
+ case MSG_REGISTER_SESSION:
+#endif
+ case MSG_SNAPSHOT_GXID_GET:
+ case MSG_SEQUENCE_INIT:
+ case MSG_SEQUENCE_GET_CURRENT:
+ case MSG_SEQUENCE_GET_NEXT:
+ case MSG_SEQUENCE_GET_LAST:
+ case MSG_SEQUENCE_SET_VAL:
+ case MSG_SEQUENCE_RESET:
+ case MSG_SEQUENCE_CLOSE:
+ case MSG_SEQUENCE_RENAME:
+ case MSG_SEQUENCE_ALTER:
+ case MSG_SNAPSHOT_GET:
+ return true;
+
+ default:
+ return false;
+ }
+}
+
static void
ProcessResponse(GTMProxy_ThreadInfo *thrinfo, GTMProxy_CommandInfo *cmdinfo,
GTM_Result *res)
StringInfoData buf;
GlobalTransactionId gxid;
GTM_Timestamp timestamp;
+ int status;
switch (cmdinfo->ci_mtype)
{
ReleaseCmdBackup(cmdinfo);
break;
- case MSG_SNAPSHOT_GET:
+ case MSG_SNAPSHOT_GET_MULTI:
if ((res->gr_type != SNAPSHOT_GET_RESULT) &&
(res->gr_type != SNAPSHOT_GET_MULTI_RESULT))
{
if (cmdinfo->ci_res_index >= res->gr_resdata.grd_txn_snap_multi.txn_count)
{
ReleaseCmdBackup(cmdinfo);
- elog(ERROR, "Too few GXIDs");
+ elog(ERROR, "Too few GXIDs - %d:%d", cmdinfo->ci_res_index,
+ res->gr_resdata.grd_txn_snap_multi.txn_count);
}
- if (res->gr_resdata.grd_txn_snap_multi.status[cmdinfo->ci_res_index] == STATUS_OK)
+ status = res->gr_resdata.grd_txn_snap_multi.status[cmdinfo->ci_res_index];
+ if ((status == STATUS_OK) || (status == STATUS_NOT_FOUND))
{
int txn_count = 1;
int status = STATUS_OK;
case MSG_SEQUENCE_CLOSE:
case MSG_SEQUENCE_RENAME:
case MSG_SEQUENCE_ALTER:
+ case MSG_SNAPSHOT_GET:
+ Assert(IsProxiedMessage(cmdinfo->ci_mtype));
if ((res->gr_proxyhdr.ph_conid == InvalidGTMProxyConnID) ||
(res->gr_proxyhdr.ph_conid >= GTM_PROXY_MAX_CONNECTIONS) ||
(thrinfo->thr_all_conns[res->gr_proxyhdr.ph_conid] != cmdinfo->ci_conn))
GTMProxy_ProxyPGXCNodeCommand(conninfo, gtm_conn, mtype, cmd_data);
break;
}
-#ifdef XCP
- case MSG_REGISTER_SESSION:
- GTMProxy_ProxyCommand(conninfo, gtm_conn, mtype, message);
- break;
-#endif
default:
Assert(0); /* Shouldn't come here.. Keep compiler quiet */
}
elog(FATAL, "Support not yet added for these message types");
break;
- case MSG_TXN_BEGIN_GETGXID_AUTOVACUUM:
- case MSG_TXN_PREPARE:
- case MSG_TXN_START_PREPARED:
- case MSG_TXN_GET_GID_DATA:
- case MSG_TXN_COMMIT_PREPARED:
- GTMProxy_ProxyCommand(conninfo, gtm_conn, mtype, message);
- break;
-
default:
Assert(0); /* Shouldn't come here.. keep compiler quiet */
}
ProcessSnapshotCommand(GTMProxy_ConnectionInfo *conninfo, GTM_Conn *gtm_conn,
GTM_MessageType mtype, StringInfo message)
{
- bool canbe_grouped = false;
GTMProxy_CommandData cmd_data;
switch (mtype)
{
- case MSG_SNAPSHOT_GET:
- canbe_grouped = pq_getmsgbyte(message);
- if (!canbe_grouped)
- GTMProxy_ProxyCommand(conninfo, gtm_conn, mtype, message);
- else
+ case MSG_SNAPSHOT_GET_MULTI:
{
{
const char *data = pq_getmsgbytes(message,
}
-static void
-ProcessSequenceCommand(GTMProxy_ConnectionInfo *conninfo, GTM_Conn *gtm_conn,
- GTM_MessageType mtype, StringInfo message)
-{
- /*
- * We proxy the Sequence messages as they are. Just add the connection
- * identifier to it so that the response can be quickly sent back to the
- * right backend.
- *
- * Write the message, but don't flush it just yet.
- */
- return GTMProxy_ProxyCommand(conninfo, gtm_conn, mtype, message);
-}
-
-static void
-ProcessBarrierCommand(GTMProxy_ConnectionInfo *conninfo, GTM_Conn *gtm_conn,
- GTM_MessageType mtype, StringInfo message)
-{
- /*
- * We proxy the Barrier messages as they are. Just add the connection
- * identifier to it so that the response can be quickly sent back to the
- * right backend.
- *
- * Write the message, but don't flush it just yet.
- */
- return GTMProxy_ProxyCommand(conninfo, gtm_conn, mtype, message);
-}
-
-
/*
* Proxy the incoming message to the GTM server after adding our own identifier
* to it. The rest of the message is forwarded as it is without even reading
GTMProxy_CommandInfo *cmdinfo;
GTMProxy_ThreadInfo *thrinfo = GetMyThreadInfo;
GTM_ProxyMsgHeader proxyhdr;
- char *unreadmsg;
+ const char *unreadmsg;
int unreadmsglen;
+ Assert(IsProxiedMessage(mtype));
+
proxyhdr.ph_conid = conninfo->con_id;
unreadmsglen = pq_getmsgunreadlen(message);
gtmpqPutnchar((char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader), gtm_conn) ||
gtmpqPutInt(mtype, sizeof (GTM_MessageType), gtm_conn) ||
gtmpqPutnchar(unreadmsg, unreadmsglen, gtm_conn))
- elog(ERROR, "Error proxing data");
+ elog(ERROR, "Error sending proxied message");
/*
* Add the message to the pending command list
thrinfo->thr_pending_commands[ii] = gtm_NIL;
break;
- case MSG_SNAPSHOT_GET:
+ case MSG_SNAPSHOT_GET_MULTI:
if (gtmpqPutInt(MSG_SNAPSHOT_GET_MULTI, sizeof (GTM_MessageType), gtm_conn) ||
gtmpqPutInt(gtm_list_length(thrinfo->thr_pending_commands[ii]), sizeof(int), gtm_conn))
elog(ERROR, "Error sending data");