温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Python3 操作 HDFS

发布时间:2020-07-10 09:29:41 来源:网络 阅读:5306 作者:RQSLT 栏目:大数据

【第三方包】

  • pyhdfs(pypi,github,支持HA)


【功能】

  • 重命名 hdfs 文件或目录

# encoding: utf-8 # author: walker # date: 2018-03-17  # summary: 利用 pyhdfs 重命名 hdfs 文件或目录 import os, sys, time from pyhdfs import HdfsClient SrcPath = '/test/xxx' DstPath = '/test/yyy' NameNode = 'nn1.example.com:50070,nn2.example.com:50070' # 将 SrcPath 改名为 DstPath def Rename(SrcPath, DstPath):	fs = HdfsClient(hosts=NameNode)	if not fs.exists(SrcPath):	print('Error: not found %s' % SrcPath)	sys.exit(-1)	print('Reanme ... \n%s\n -> \n%s \n' % (SrcPath, DstPath))	fs.rename(SrcPath, DstPath) if __name__ == '__main__':	Rename(SrcPath, DstPath)
  • 上传文件

# encoding: utf-8 # author: walker # date: 2018-01-23 # summary: 上传本地文件到 hdfs 目录 import os, sys, time from pyhdfs import HdfsClient from configparser import ConfigParser cur_dir_fullpath = os.path.dirname(os.path.abspath(__file__)) StartTime = time.time() FileSize = 0	#文件总大小 LocalDir = '' HdfsDir = '' NameNode = '' UserName = '' #读取配置文件 def ReadConfig():	global LocalDir, HdfsDir, NameNode, UserName	cfg = ConfigParser()	cfgFile = os.path.join(cur_dir_fullpath, 'config.ini')	if not os.path.exists(cfgFile):	input(cfgFile + ' not found')	sys.exit(-1)	cfgLst = cfg.read(cfgFile)	if len(cfgLst) < 1:	input('Read config.ini failed...')	sys.exit(-1)	LocalDir = cfg.get('config', 'LocalDir').strip()   	if not os.path.exists(LocalDir):	input(LocalDir + ' not found')	sys.exit(-1)	print('LocalDir:' + LocalDir)	HdfsDir = cfg.get('config', 'HdfsDir').strip() 	print('HdfsDir:' + HdfsDir)	NameNode = cfg.get('config', 'NameNode').strip() 	print('NameNode:' + NameNode)	UserName = cfg.get('config', 'UserName').strip() 	print('UserName:' + UserName)	print('Read config.ini successed!') #处理一个 def ProcOne(client, srcFile, dstFile):	global FileSize	print('ProcOne \n%s\n -> \n%s ' % (srcFile, dstFile))	#目标文件已经存在且大小相同	if client.exists(dstFile) and \	(os.path.getsize(srcFile) == client.list_status(dstFile)[0].length):	print('file exists: %s ' % dstFile)	return True	#注意,如果已存在会被覆盖	client.copy_from_local(srcFile, dstFile, overwrite=True)	#校验文件大小	if os.path.getsize(srcFile) == client.list_status(dstFile)[0].length:	FileSize += os.path.getsize(srcFile)	return True	return False #处理所有 def ProcAll():	client = HdfsClient(hosts=NameNode, user_name=UserName)	if not client.exists(HdfsDir):	print(HdfsDir + ' not found')	sys.exit(-1)	total = len(os.listdir(LocalDir))	processed = 0	failedList = list()	for filename in os.listdir(LocalDir):	srcFile = os.path.join(LocalDir, filename)	dstFile = HdfsDir + '/' + filename	if not ProcOne(client, srcFile, dstFile):	failedList.append(srcFile)	processed += 1	print('%d/%d/%d, time cost: %.2f s' % (total, processed, len(failedList), time.time()-StartTime))	print('%d B, %.2f MB/s \n' % (FileSize, FileSize/1024/1024/(time.time()-StartTime)))	if failedList:	print('failedList: %s' % repr(failedList))	else:	print('Good! No Error!')	print('%d B, %.2f MB, %.2f GB, %.2f MB/s' % \             (FileSize, FileSize/1024/1024, FileSize/1024/1024/1024, FileSize/1024/1024/(time.time()-StartTime))) if __name__ == '__main__':	ReadConfig()	ProcAll()	print('Time total: %.2f s' % (time.time()-StartTime))	print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()))
  • 下载 HDFS 文件到本地

# encoding: utf-8 # author: walker # date: 2018-06-07 # summary: 下载 HDFS 文件(或目录)到本地 import os, sys, time from pyhdfs import HdfsClient from configparser import ConfigParser cur_dir_fullpath = os.path.dirname(os.path.abspath(__file__)) StartTime = time.time() FileSize = 0        #文件总大小 LocalDir = '' HdfsDir = '' NameNode = '' UserName = '' #读取配置文件  def ReadConfig():     global LocalDir, HdfsDir, NameNode, UserName     cfg = ConfigParser()     cfgFile = os.path.join(cur_dir_fullpath, 'config.ini')     if not os.path.exists(cfgFile):         input(cfgFile + ' not found')         sys.exit(-1)     cfgLst = cfg.read(cfgFile)     if len(cfgLst) < 1:         input('Read config.ini failed...')         sys.exit(-1)          LocalDir = cfg.get('config', 'LocalDir').strip()     if not os.path.exists(LocalDir):         input(LocalDir + ' not found')         sys.exit(-1)     print('LocalDir:' + LocalDir)          HdfsDir = cfg.get('config', 'HdfsDir').strip().rstrip('/')     print('HdfsDir:' + HdfsDir)           NameNode = cfg.get('config', 'NameNode').strip()     print('NameNode:' + NameNode)        UserName = cfg.get('config', 'UserName').strip()     print('UserName:' + UserName)             print('Read config.ini successed!')      #处理一个 def ProcOne(client, srcFile, dstFile):     global FileSize     print('ProcOne \n%s\n -> \n%s ' % (srcFile, dstFile))     dstDir = os.path.dirname(dstFile)     if not os.path.exists(dstDir):         os.makedirs(dstDir)          # 目标文件已经存在且大小相同     if os.path.exists(dstFile) and \         (os.path.getsize(dstFile) == client.list_status(srcFile)[0].length):         print('file exists: %s ' % dstFile)         return True          # 注意,如果已存在会被覆盖     client.copy_to_local(srcFile, dstFile, overwrite=True)          if os.path.getsize(dstFile) != client.list_status(srcFile)[0].length:   #校验文件大小         return False     FileSize += os.path.getsize(dstFile)     return True      #处理所有 def ProcAll():       client = HdfsClient(hosts=NameNode, user_name=UserName)     if not client.exists(HdfsDir):         print(HdfsDir + ' not found')         sys.exit(-1)                  total = 0     # 先遍历一遍,得到总文件个数     for parent, dirnames, filenames in client.walk(HdfsDir):         for filename in filenames:             total += 1     processed = 0     failedList = list()     for parent, dirnames, filenames in client.walk(HdfsDir):         for filename in filenames:             srcFile = '%s/%s' % (parent, filename)             relPath = srcFile[len(HdfsDir)+1:].replace('/', '\\')   # 相对于根目录的路径             dstFile = os.path.join(LocalDir, relPath)             if not ProcOne(client, srcFile, dstFile):                 failedList.append(srcFile)             processed += 1                   print('%d/%d/%d, time cost: %.2f s' % (total, processed, len(failedList), time.time()-StartTime))             print('%d B, %.2f MB/s \n' % (FileSize, FileSize/1024/1024/(time.time()-StartTime)))          if failedList:         print('failedList: %s' % repr(failedList))     else:         print('Good! No Error!')         print('%d B, %.2f MB, %.2f GB, %.2f MB/s' % \ (FileSize, FileSize/1024/1024, FileSize/1024/1024/1024, FileSize/1024/1024/(time.time()-StartTime)))      if __name__ == '__main__':     ReadConfig()     ProcAll()     print('Time total: %.2f s' % (time.time()-StartTime))     print(time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()))


*** walker ***




向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI