温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

如何用源码分析canal的deployer模块

发布时间:2021-10-27 16:24:32 来源:亿速云 阅读:179 作者:柒染 栏目:大数据

这期内容当中小编将会给大家带来有关如何用源码分析canal的deployer模块,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

  • CanalLauncher是启动入口类

  1. 获取canal.properties配置文件

  2. 如果canal.properties配置文件中属性root.admin.manager有值,那么构造PlainCanalConfigClient,调用PlainCanalConfigClient的findServer获取PlainCanal,调用PlainCanal的getProperties方法获取properties

  3. 通过properties构造 CanalStarter并调用其start方法

CanalStarter是启动类

public synchronized void start() throws Throwable {         //首先根据canal.serverMode构造CanalMQProducer,如果是kafka,构造的是CanalKafkaProducer         String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);         if (serverMode.equalsIgnoreCase("kafka")) {             canalMQProducer = new CanalKafkaProducer();         } else if (serverMode.equalsIgnoreCase("rocketmq")) {             canalMQProducer = new CanalRocketMQProducer();         }         if (canalMQProducer != null) {             // disable netty             System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");             // 设置为raw避免ByteString->Entry的二次解析             System.setProperty("canal.instance.memory.rawEntry", "false");         }         //接下来构造CanalController并调用其start方法         logger.info("## start the canal server.");         controller = new CanalController(properties);         controller.start();         logger.info("## the canal server is running now ......");         ...         //构造CanalMQStarter并调用其start方法,同时设置为CanalController的属性         if (canalMQProducer != null) {             canalMQStarter = new CanalMQStarter(canalMQProducer);             MQProperties mqProperties = buildMQProperties(properties);             String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);             canalMQStarter.start(mqProperties, destinations);             controller.setCanalMQStarter(canalMQStarter);         }         ...         running = true;     }
  • CanalController是实例调度控制器

public CanalController(final Properties properties){         // 初始化managerClients用于请求admin         managerClients = MigrateMap.makeComputingMap(new Function<String, PlainCanalConfigClient>() {             public PlainCanalConfigClient apply(String managerAddress) {                 return getManagerClient(managerAddress);             }         });         // 初始化全局参数设置,包含了全局mode、lazy、managerAddress、springXml,初始化instanceGenerator用于创建instance,其根据InstanceConfig的mode值使用PlainCanalInstanceGenerator或者SpringCanalInstanceGenerator创建CanalInstance         globalInstanceConfig = initGlobalConfig(properties);         instanceConfigs = new MapMaker().makeMap();         // 初始化instance config,包含了实例mode、lazy、managerAddress、springXml         initInstanceConfig(properties);         ...         // 初始化CanalServerWithEmbedded,将instanceGenerator设置为CanalServerWithEmbedded的属性         embededCanalServer = CanalServerWithEmbedded.instance();         embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator         int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT, "11112"));         embededCanalServer.setMetricsPort(metricsPort);         this.adminUser = getProperty(properties, CanalConstants.CANAL_ADMIN_USER);         this.adminPasswd = getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);         embededCanalServer.setUser(getProperty(properties, CanalConstants.CANAL_USER));         embededCanalServer.setPasswd(getProperty(properties, CanalConstants.CANAL_PASSWD));         ...         final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS);         //初始化ZkClientx用于canal集群部署,创建/otteradmin/canal/destinations节点和/otteradmin/canal/cluster节点         if (StringUtils.isNotEmpty(zkServers)) {             zkclientx = ZkClientx.getZkClient(zkServers);             // 初始化系统目录             zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true);             zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true);         }         // 初始化ServerRunningMonitors的ServerRunningMonitor,用于启动、关闭实例         final ServerRunningData serverData = new ServerRunningData(registerIp + ":" + port);         ServerRunningMonitors.setServerData(serverData);         ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(new Function<String, ServerRunningMonitor>() {             ...         }));         // 初始化InstanceAction,用于启动和关闭实例         autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));         if (autoScan) {             defaultAction = new InstanceAction() {                 ...             };             // 初始化instanceConfigMonitors,用于获取所有instanceConfig并启动所有instance             instanceConfigMonitors = MigrateMap.makeComputingMap(new Function<InstanceMode, InstanceConfigMonitor>() {                 public InstanceConfigMonitor apply(InstanceMode mode) {                     ...                 }             });         }     }
  • ManagerInstanceConfigMonitor是实例扫描器

public void start() {         super.start();         //启动定时任务,定时扫描所有instance         executor.scheduleWithFixedDelay(new Runnable() {             public void run() {                 try {                     scan();                     if (isFirst) {                         isFirst = false;                     }                 } catch (Throwable e) {                     logger.error("scan failed", e);                 }             }         }, 0, scanIntervalInSecond, TimeUnit.SECONDS);     } private void scan() {         //缓存了所有instance的配置,如果发现有新的instance则启动或者修改了instance则重启         String instances = configClient.findInstances(null);         final List<String> is = Lists.newArrayList(StringUtils.split(instances, ','));         List<String> start = Lists.newArrayList();         List<String> stop = Lists.newArrayList();         List<String> restart = Lists.newArrayList();         for (String instance : is) {             if (!configs.containsKey(instance)) {                 PlainCanal newPlainCanal = configClient.findInstance(instance, null);                 if (newPlainCanal != null) {                     configs.put(instance, newPlainCanal);                     start.add(instance);                 }             } else {                 PlainCanal plainCanal = configs.get(instance);                 PlainCanal newPlainCanal = configClient.findInstance(instance, plainCanal.getMd5());                 if (newPlainCanal != null) {                     // 配置有变化                     restart.add(instance);                     configs.put(instance, newPlainCanal);                 }             }         }         configs.forEach((instance, plainCanal) -> {             if (!is.contains(instance)) {                 stop.add(instance);             }         });         stop.forEach(instance -> {             notifyStop(instance);         });         restart.forEach(instance -> {             notifyReload(instance);         });         start.forEach(instance -> {             notifyStart(instance);         });     } private void notifyStart(String destination) {         try {             //启动instance调用InstanceAction启动实例,最后是调用ServerRunningMonitor启动实例             defaultAction.start(destination);             actions.put(destination, defaultAction);             // 启动成功后记录配置文件信息         } catch (Throwable e) {             logger.error(String.format("scan add found[%s] but start failed", destination), e);         }     }
  • ServerRunningMonitor是针对server的running实例控制

public ServerRunningMonitor(){         // 创建父节点         dataListener = new IZkDataListener() {             public void handleDataChange(String dataPath, Object data) throws Exception {                 MDC.put("destination", destination);                 ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class);                 if (!isMine(runningData.getAddress())) {                     mutex.set(false);                 }                 if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active                     releaseRunning();// 彻底释放mainstem                 }                 activeData = (ServerRunningData) runningData;             }             public void handleDataDeleted(String dataPath) throws Exception {                 MDC.put("destination", destination);                 mutex.set(false);                 if (!release && activeData != null && isMine(activeData.getAddress())) {                     // 如果上一次active的状态就是本机,则即时触发一下active抢占                     initRunning();                 } else {                     // 否则就是等待delayTime,避免因网络瞬端或者zk异常,导致出现频繁的切换操作                     delayExector.schedule(new Runnable() {                         public void run() {                             initRunning();                         }                     }, delayTime, TimeUnit.SECONDS);                 }             }         };     } public synchronized void start() {         super.start();         try {             //首先调用listener的processStart方法             processStart();             if (zkClient != null) {                 // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start                 // 监视/otteradmin/canal/destinations/{0}/running节点变化                 String path = ZookeeperPathUtils.getDestinationServerRunning(destination);                 zkClient.subscribeDataChanges(path, dataListener);                                  initRunning();             } else {                 processActiveEnter();// 没有zk,直接启动             }         } catch (Exception e) {             logger.error("start failed", e);             // 没有正常启动,重置一下状态,避免干扰下一次start             stop();         }     } private void processStart() {         if (listener != null) {             try {                 //processStart方法中创建/otteradmin/canal/destinations/{0}/cluster/{1}节点,0是实例名称,1是当前节点ip:port                 listener.processStart();             } catch (Exception e) {                 logger.error("processStart failed", e);             }         }     } private void initRunning() {         if (!isStart()) {             return;         }         String path = ZookeeperPathUtils.getDestinationServerRunning(destination);         // 序列化         byte[] bytes = JsonUtils.marshalToByte(serverData);         try {             mutex.set(false);             //尝试创建/otteradmin/canal/destinations/{0}/running节点             zkClient.create(path, bytes, CreateMode.EPHEMERAL);             activeData = serverData;             //如果成功则调用listener的processEnter方法,processEnter方法中调用CanalServerWithEmbedded的start方法启动实例和CanalMQStarter的start方法启动实例             processActiveEnter();// 触发一下事件             mutex.set(true);             release = false;         } catch (ZkNodeExistsException e) {             bytes = zkClient.readData(path, true);             if (bytes == null) {// 如果不存在节点,立即尝试一次                 initRunning();             } else {                 activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);             }         } catch (ZkNoNodeException e) {             zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 尝试创建父节点             initRunning();         }     }
  • canal.properties配置

canal.register.ip = canal.admin.manager = 127.0.0.1:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 canal.admin.register.auto = true canal.admin.register.cluster =

上述就是小编为大家分享的如何用源码分析canal的deployer模块了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注亿速云行业资讯频道。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI