温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

PostgreSQL中的ProcessRepliesIfAny函数分析

发布时间:2021-11-09 14:58:39 来源:亿速云 阅读:222 作者:iii 栏目:关系型数据库

本篇内容主要讲解“PostgreSQL中的ProcessRepliesIfAny函数分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“PostgreSQL中的ProcessRepliesIfAny函数分析”吧!

调用栈如下:

(gdb) bt #0  0x00007fb6e6390903 in __epoll_wait_nocancel () from /lib64/libc.so.6 #1  0x000000000088e668 in WaitEventSetWaitBlock (set=0x10ac808, cur_timeout=29999, occurred_events=0x7ffd634441b0,      nevents=1) at latch.c:1048 #2  0x000000000088e543 in WaitEventSetWait (set=0x10ac808, timeout=29999, occurred_events=0x7ffd634441b0, nevents=1,      wait_event_info=83886092) at latch.c:1000 #3  0x000000000088dcec in WaitLatchOrSocket (latch=0x7fb6dcbfc4d4, wakeEvents=27, sock=10, timeout=29999,      wait_event_info=83886092) at latch.c:385 #4  0x000000000085405b in WalSndLoop (send_data=0x8547fe <XLogSendPhysical>) at walsender.c:2229 #5  0x0000000000851c93 in StartReplication (cmd=0x10ab750) at walsender.c:684 #6  0x00000000008532f0 in exec_replication_command (cmd_string=0x101dd78 "START_REPLICATION 0/5D000000 TIMELINE 16")     at walsender.c:1539 #7  0x00000000008c0170 in PostgresMain (argc=1, argv=0x1049cb8, dbname=0x1049ba8 "", username=0x1049b80 "replicator")     at postgres.c:4178 #8  0x000000000081e06c in BackendRun (port=0x103fb50) at postmaster.c:4361 #9  0x000000000081d7df in BackendStartup (port=0x103fb50) at postmaster.c:4033 #10 0x0000000000819bd9 in ServerLoop () at postmaster.c:1706 #11 0x000000000081948f in PostmasterMain (argc=1, argv=0x1018a50) at postmaster.c:1379 #12 0x0000000000742931 in main (argc=1, argv=0x1018a50) at main.c:228

一、数据结构

N/A

二、源码解读

ProcessRepliesIfAny
在streaming期间,处理接收到的消息,同时检查远程终端是否关闭了连接,执行相关处理.
代码不多也不复杂,可自行阅读.

/*  * Process any incoming messages while streaming. Also checks if the remote  * end has closed the connection.  * 在streaming期间,处理接收到的消息.  * 同时检查远程终端是否关闭了连接,执行相关处理.  */ static void ProcessRepliesIfAny(void) {     unsigned char firstchar;     int         r;     bool        received = false;     //当前时间     last_processing = GetCurrentTimestamp();     for (;;)     {         //---------- 循环接收相关消息         pq_startmsgread();         r = pq_getbyte_if_available(&firstchar);         if (r < 0)         {             /* unexpected error or EOF */             //未知异常或者EOF             ereport(COMMERROR,                     (errcode(ERRCODE_PROTOCOL_VIOLATION),                      errmsg("unexpected EOF on standby connection")));             //进程退出             proc_exit(0);         }         if (r == 0)         {             /* no data available without blocking */             //已无阻塞的消息数据,退出             pq_endmsgread();             break;         }         /* Read the message contents */         //读取消息内容         resetStringInfo(&reply_message);         if (pq_getmessage(&reply_message, 0))         {             ereport(COMMERROR,                     (errcode(ERRCODE_PROTOCOL_VIOLATION),                      errmsg("unexpected EOF on standby connection")));             proc_exit(0);         }         /*          * If we already received a CopyDone from the frontend, the frontend          * should not send us anything until we've closed our end of the COPY.          * XXX: In theory, the frontend could already send the next command          * before receiving the CopyDone, but libpq doesn't currently allow          * that.          * 如果已在前台接收到CopyDone消息,前台不应该再发送消息,直至关闭COPY.          * XXX:理论上来说,在接收到CopyDone前,前台可能已经发送了下一个命令,但libpq不允许这种情况发生          */         if (streamingDoneReceiving && firstchar != 'X')             ereport(FATAL,                     (errcode(ERRCODE_PROTOCOL_VIOLATION),                      errmsg("unexpected standby message type \"%c\", after receiving CopyDone",                             firstchar)));         /* Handle the very limited subset of commands expected in this phase */         //处理有限几个命令         switch (firstchar)         {                 /*                  * 'd' means a standby reply wrapped in a CopyData packet.                  * 'd'意味着standby节点的应答封装了CopyData包                  */             case 'd':                 ProcessStandbyMessage();                 received = true;                 break;                 /*                  * CopyDone means the standby requested to finish streaming.                  * Reply with CopyDone, if we had not sent that already.                  * CopyDone意味着standby节点请求结束streaming.                  * 如尚未发送,则使用CopyDone应答.                  */             case 'c':                 if (!streamingDoneSending)                 {                     pq_putmessage_noblock('c', NULL, 0);                     streamingDoneSending = true;                 }                 streamingDoneReceiving = true;                 received = true;                 break;                 /*                  * 'X' means that the standby is closing down the socket.                  * 'X'意味着standby节点正在关闭socket                  */             case 'X':                 proc_exit(0);             default:                 ereport(FATAL,                         (errcode(ERRCODE_PROTOCOL_VIOLATION),                          errmsg("invalid standby message type \"%c\"",                                 firstchar)));         }     }     /*      * Save the last reply timestamp if we've received at least one reply.      * 如接收到至少一条应答信息,则保存最后的应答时间戳.      */     if (received)     {         last_reply_timestamp = last_processing;         waiting_for_ping_response = false;     } }

二、跟踪分析

在主节点上用gdb跟踪postmaster,在PostgresMain上设置断点后启动standby节点,进入断点

(gdb) set follow-fork-mode child (gdb) b ProcessRepliesIfAny Breakpoint 2 at 0x85343b: file walsender.c, line 1597. (gdb) c Continuing. Breakpoint 2, ProcessRepliesIfAny () at walsender.c:1597 1597        bool        received = false; (gdb)

查看进程信息

[xdb@localhost ~]$ ps -ef|grep postgres xdb       1376     1  0 14:16 ?        00:00:00 /appdb/xdb/pg11.2/bin/postgres xdb       1377  1376  0 14:16 ?        00:00:00 postgres: logger    xdb       1550  1376  0 16:53 ?        00:00:00 postgres: checkpointer    xdb       1551  1376  0 16:53 ?        00:00:00 postgres: background writer    xdb       1552  1376  0 16:53 ?        00:00:00 postgres: walwriter    xdb       1553  1376  0 16:53 ?        00:00:00 postgres: autovacuum launcher   xdb       1554  1376  0 16:53 ?        00:00:00 postgres: archiver    xdb       1555  1376  0 16:53 ?        00:00:00 postgres: stats collector    xdb       1556  1376  0 16:53 ?        00:00:00 postgres: logical replication launcher   xdb       1633  1376  0 17:26 ?        00:00:00 postgres: walsender replicator 192.168.26.26(40528) idle

循环接收相关消息

(gdb) n 1599        last_processing = GetCurrentTimestamp(); (gdb)  1603            pq_startmsgread(); (gdb)  1604            r = pq_getbyte_if_available(&firstchar); (gdb)  1605            if (r < 0) (gdb) p r $1 = 1 (gdb) p firstchar $2 = 100 'd' (gdb)

命令是’d’,执行相关处理

(gdb) n 1613            if (r == 0) (gdb)  1621            resetStringInfo(&reply_message); (gdb)  1622            if (pq_getmessage(&reply_message, 0)) (gdb)  1637            if (streamingDoneReceiving && firstchar != 'X') (gdb)  1644            switch (firstchar) (gdb)  1650                    ProcessStandbyMessage(); (gdb)  1651                    received = true; (gdb)  1652                    break; (gdb)  1681        } (gdb)

设置断点

(gdb) b walsender.c:1643 Breakpoint 3 at 0x8535b6: file walsender.c, line 1643. (gdb) b walsender.c:1672 Breakpoint 4 at 0x85361a: file walsender.c, line 1672. (gdb) c Continuing. Breakpoint 3, ProcessRepliesIfAny () at walsender.c:1644 1644            switch (firstchar) (gdb)  Continuing. ... Breakpoint 4, ProcessRepliesIfAny () at walsender.c:1673 1673                    proc_exit(0); (gdb)

进程即将退出,查看进程信息

[xdb@localhost ~]$ ps -ef|grep postgres xdb       1376     1  0 14:16 ?        00:00:00 /appdb/xdb/pg11.2/bin/postgres xdb       1377  1376  0 14:16 ?        00:00:00 postgres: logger    xdb       1550  1376  0 16:53 ?        00:00:00 postgres: checkpointer    xdb       1551  1376  0 16:53 ?        00:00:00 postgres: background writer    xdb       1552  1376  0 16:53 ?        00:00:00 postgres: walwriter    xdb       1553  1376  0 16:53 ?        00:00:00 postgres: autovacuum launcher   xdb       1554  1376  0 16:53 ?        00:00:00 postgres: archiver    xdb       1555  1376  0 16:53 ?        00:00:00 postgres: stats collector    xdb       1556  1376  0 16:53 ?        00:00:00 postgres: logical replication launcher   xdb       1633  1376  0 17:26 ?        00:00:00 postgres: walsender replicator 192.168.26.26(40528) idle xdb       1637  1376  0 17:27 ?        00:00:00 postgres: walsender replicator 192.168.26.26(40530) streaming 0/5D075248 [xdb@localhost ~]$

进程退出(PID=1633),启动了新的进程(PID=1637)

(gdb) n [Inferior 2 (process 1633) exited normally] (gdb)

到此,相信大家对“PostgreSQL中的ProcessRepliesIfAny函数分析”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI