温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Zookeeper接口kazoo的示例分析

发布时间:2021-09-03 13:36:13 来源:亿速云 阅读:193 作者:小新 栏目:开发技术

小编给大家分享一下Zookeeper接口kazoo的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

具体介绍如下。

zookeeper的开发接口以前主要以java和c为主,随着python项目越来越多的使用zookeeper作为分布式集群实现,python的zookeeper接口也出现了很多,现在主流的纯python的zookeeper接口是kazoo。因此如何使用kazoo开发基于python的分布式程序是必须掌握的。

1.安装kazoo

yum install python-pip pip install kazoo

安装过程中会出现一些python依赖包未安装的情况,安装即可。

2.运行kazoo基础例子kazoo_basic.py

import time from kazoo.client import KazooClient from kazoo.client import KazooState def main():  zk=KazooClient(hosts='127.0.0.1:2182')  zk.start()    @zk.add_listener  def my_listener(state):  if state == KazooState.LOST:   print("LOST")  elif state == KazooState.SUSPENDED:   print("SUSPENDED")  else:   print("Connected")  #Creating Nodes  # Ensure a path, create if necessary  zk.ensure_path("/my/favorite")  # Create a node with data  zk.create("/my/favorite/node", b"")  zk.create("/my/favorite/node/a", b"A")  #Reading Data  # Determine if a node exists  if zk.exists("/my/favorite"):  print("/my/favorite is existed")  @zk.ChildrenWatch("/my/favorite/node")  def watch_children(children):  print("Children are now: %s" % children)  # Above function called immediately, and from then on  @zk.DataWatch("/my/favorite/node")  def watch_node(data, stat):  print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))  # Print the version of a node and its data  data, stat = zk.get("/my/favorite/node")  print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))  # List the children  children = zk.get_children("/my/favorite/node")  print("There are %s children with names %s" % (len(children), children))  #Updating Data  zk.set("/my/favorite", b"some data")  #Deleting Nodes  zk.delete("/my/favorite/node/a")  #Transactions  transaction = zk.transaction()  transaction.check('/my/favorite/node', version=-1)  transaction.create('/my/favorite/node/b', b"B")  results = transaction.commit()  print ("Transaction results is %s" % results)  zk.delete("/my/favorite/node/b")  zk.delete("/my", recursive=True)  time.sleep(2)  zk.stop() if __name__ == "__main__":  try:  main()  except Exception, ex:  print "Ocurred Exception: %s" % str(ex)  quit()

运行结果:

Children are now: [u'a'] Version: 0, data:  Version: 0, data:  There are 1 children with names [u'a'] Children are now: [] Transaction results is [True, u'/my/favorite/node/b'] Children are now: [u'b'] Children are now: [] No handlers could be found for logger "kazoo.recipe.watchers" LOST

以上程序运行了基本kazoo接口命令,包括创建删除加watcher等操作,通过调试并对比zookeeper服务节点znode目录结构的变化,就可以理解具体的操作结果。

3.运行通过kazoo实现的分布式锁程序kazoo_lock.py

import logging, os, time from kazoo.client import KazooClient from kazoo.client import KazooState from kazoo.recipe.lock import Lock class ZooKeeperLock():  def __init__(self, hosts, id_str, lock_name, logger=None, timeout=1):  self.hosts = hosts  self.id_str = id_str  self.zk_client = None  self.timeout = timeout  self.logger = logger  self.name = lock_name  self.lock_handle = None  self.create_lock()  def create_lock(self):  try:  self.zk_client = KazooClient(hosts=self.hosts, logger=self.logger, timeout=self.timeout)  self.zk_client.start(timeout=self.timeout)  except Exception, ex:  self.init_ret = False  self.err_str = "Create KazooClient failed! Exception: %s" % str(ex)  logging.error(self.err_str)  return  try:  lock_path = os.path.join("/", "locks", self.name)  self.lock_handle = Lock(self.zk_client, lock_path)  except Exception, ex:  self.init_ret = False  self.err_str = "Create lock failed! Exception: %s" % str(ex)  logging.error(self.err_str)  return  def destroy_lock(self):  #self.release()  if self.zk_client != None:  self.zk_client.stop()  self.zk_client = None  def acquire(self, blocking=True, timeout=None):  if self.lock_handle == None:  return None  try:  return self.lock_handle.acquire(blocking=blocking, timeout=timeout)  except Exception, ex:  self.err_str = "Acquire lock failed! Exception: %s" % str(ex)  logging.error(self.err_str)  return None  def release(self):  if self.lock_handle == None:  return None  return self.lock_handle.release()  def __del__(self):  self.destroy_lock() def main():  logger = logging.getLogger()  logger.setLevel(logging.INFO)  sh = logging.StreamHandler()  formatter = logging.Formatter('%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s')  sh.setFormatter(formatter)  logger.addHandler(sh)  zookeeper_hosts = "127.0.0.1:2182"  lock_name = "test"  lock = ZooKeeperLock(zookeeper_hosts, "myid is 1", lock_name, logger=logger)  ret = lock.acquire()  if not ret:  logging.info("Can't get lock! Ret: %s", ret)  return  logging.info("Get lock! Do something! Sleep 10 secs!")  for i in range(1, 11):  time.sleep(1)  print str(i)  lock.release() if __name__ == "__main__":  try:  main()  except Exception, ex:  print "Ocurred Exception: %s" % str(ex)  quit()

将该测试文件copy到多个服务器,同时运行,就可以看到分布式锁的效果了。

以上是“Zookeeper接口kazoo的示例分析”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注亿速云行业资讯频道!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI