MyLogicalRepWorker->relstate,
                                    MyLogicalRepWorker->relstate_lsn);
  
 -       /* End wal streaming so wrconn can be re-used to drop the slot. */
 -       walrcv_endstreaming(wrconn, &tli);
 +       /*
 +        * End streaming so that LogRepWorkerWalRcvConn can be used to drop
 +        * the slot.
 +        */
 +       walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
  
         /*
          * Cleanup the tablesync slot.
           * otherwise, it won't be dropped till the corresponding subscription
          * is dropped. So passing missing_ok = false.
          */
 -       ReplicationSlotDropAtPubNode(wrconn, syncslotname, false);
 +       ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
  
         finish_sync_worker();
     }
          for (;;)
         {
             /* Try read the data. */
 -           len = walrcv_receive(wrconn, &buf, &fd);
 +           len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
  
             CHECK_FOR_INTERRUPTS();
  
                       "   AND c.relname = %s",
                      quote_literal_cstr(nspname),
                      quote_literal_cstr(relname));
 -   res = walrcv_exec(wrconn, cmd.data, lengthof(tableRow), tableRow);
 +   res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
 +                     lengthof(tableRow), tableRow);
  
     if (res->status != WALRCV_OK_TUPLES)
         ereport(ERROR,
                       "   AND a.attrelid = %u"
                      " ORDER BY a.attnum",
                      lrel->remoteid,
 -                    (walrcv_server_version(wrconn) >= 120000 ? "AND a.attgenerated = ''" : ""),
 +                    (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
 +                        "AND a.attgenerated = ''" : ""),
                      lrel->remoteid);
 -   res = walrcv_exec(wrconn, cmd.data, lengthof(attrRow), attrRow);
 +   res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
 +                     lengthof(attrRow), attrRow);
  
     if (res->status != WALRCV_OK_TUPLES)
         ereport(ERROR,
          appendStringInfo(&cmd, " FROM %s) TO STDOUT",
                          quote_qualified_identifier(lrel.nspname, lrel.relname));
     }
 -   res = walrcv_exec(wrconn, cmd.data, 0, NULL);
 +   res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
     pfree(cmd.data);
     if (res->status != WALRCV_OK_COPY_OUT)
         ereport(ERROR,
       * application_name, so that it is different from the main apply worker,
      * so that synchronous replication can distinguish them.
      */
 -   wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
 -   if (wrconn == NULL)
 +   LogRepWorkerWalRcvConn =
 +       walrcv_connect(MySubscription->conninfo, true, slotname, &err);
 +   if (LogRepWorkerWalRcvConn == NULL)
         ereport(ERROR,
                 (errmsg("could not connect to the publisher: %s", err)));
  
           * breakdown then it wouldn't have succeeded so trying it next time
          * seems like a better bet.
          */
 -       ReplicationSlotDropAtPubNode(wrconn, slotname, true);
 +       ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
     }
     else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
     {
       * ensures that both the replication slot we create (see below) and the
      * COPY are consistent with each other.
      */
 -   res = walrcv_exec(wrconn,
 +   res = walrcv_exec(LogRepWorkerWalRcvConn,
                       "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
                       0, NULL);
     if (res->status != WALRCV_OK_COMMAND)
       * slot leading to a dangling slot on the server.
      */
     HOLD_INTERRUPTS();
 -   walrcv_create_slot(wrconn, slotname, false /* permanent */ ,
 +   walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, false /* permanent */ ,
                        CRS_USE_SNAPSHOT, origin_startpos);
     RESUME_INTERRUPTS();
  
      copy_table(rel);
     PopActiveSnapshot();
  
 -   res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
 +   res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
     if (res->status != WALRCV_OK_COMMAND)
         ereport(ERROR,
                 (errmsg("table copy could not finish transaction on publisher: %s",
          /* per stream context for streaming transactions */
  static MemoryContext LogicalStreamingContext = NULL;
  
 -WalReceiverConn *wrconn = NULL;
 +WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
  
  Subscription *MySubscription = NULL;
  bool       MySubscriptionValid = false;
   
         MemoryContextSwitchTo(ApplyMessageContext);
  
 -       len = walrcv_receive(wrconn, &buf, &fd);
 +       len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
  
         if (len != 0)
         {
                      MemoryContextReset(ApplyMessageContext);
                 }
  
 -               len = walrcv_receive(wrconn, &buf, &fd);
 +               len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
             }
         }
  
      }
  
     /* All done */
 -   walrcv_endstreaming(wrconn, &tli);
 +   walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
  }
  
  /*
           LSN_FORMAT_ARGS(writepos),
          LSN_FORMAT_ARGS(flushpos));
  
 -   walrcv_send(wrconn, reply_message->data, reply_message->len);
 +   walrcv_send(LogRepWorkerWalRcvConn,
 +               reply_message->data, reply_message->len);
  
     if (recvpos > last_recvpos)
         last_recvpos = recvpos;
          origin_startpos = replorigin_session_get_progress(false);
         CommitTransactionCommand();
  
 -       wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
 -                               &err);
 -       if (wrconn == NULL)
 +       LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
 +                                               MySubscription->name, &err);
 +       if (LogRepWorkerWalRcvConn == NULL)
             ereport(ERROR,
                     (errmsg("could not connect to the publisher: %s", err)));
  
           * We don't really use the output identify_system for anything but it
          * does some initializations on the upstream so let's still call it.
          */
 -       (void) walrcv_identify_system(wrconn, &startpointTLI);
 +       (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
     }
  
     /*
      options.startpoint = origin_startpos;
     options.slotname = myslotname;
     options.proto.logical.proto_version =
 -       walrcv_server_version(wrconn) >= 140000 ?
 +       walrcv_server_version(LogRepWorkerWalRcvConn) >= 140000 ?
         LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM;
     options.proto.logical.publication_names = MySubscription->publications;
     options.proto.logical.binary = MySubscription->binary;
     options.proto.logical.streaming = MySubscription->stream;
  
     /* Start normal logical streaming replication. */
 -   walrcv_startstreaming(wrconn, &options);
 +   walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
  
     /* Run the main loop. */
     LogicalRepApplyLoop(origin_startpos);