阿里云DLC运行DDP Sample

简介: PAI提供的云原生基础AI平台,提供灵活、稳定、易用和高性能的机器学习训练环境。该平台支持多种算法框架、超大规模分布式深度学习任务运行及自定义算法框架。本文演示如何在DLC上面运行Pytorch DDP任务。

一、Code Sample

 from __future__ import print_function import os import time import argparse import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchvision import datasets, transforms from torch.optim.lr_scheduler import StepLR import torch.distributed as dist def init_distributed_mode(args): '''initilize DDP ''' if "RANK" in os.environ and "WORLD_SIZE" in os.environ: args.rank = int(os.environ["RANK"]) args.world_size = int(os.environ["WORLD_SIZE"]) args.gpu = int(os.environ["LOCAL_RANK"]) elif "SLURM_PROCID" in os.environ: args.rank = int(os.environ["SLURM_PROCID"]) args.gpu = args.rank % torch.cuda.device_count() elif hasattr(args, "rank"): pass else: print("Not using distributed mode") args.distributed = False return args.distributed = True torch.cuda.set_device(args.gpu) args.dist_backend = "nccl" # args.dist_backend = "gloo" print(f"| distributed init (rank {args.rank}): {args.dist_url}, local rank:{args.gpu}, world size:{args.world_size}", flush=True) dist.init_process_group( backend=args.dist_backend, init_method=args.dist_url, world_size=args.world_size, rank=args.rank ) class Net(nn.Module): def __init__(self): super(Net, self).__init__() self.conv1 = nn.Conv2d(1, 32, 3, 1) self.conv2 = nn.Conv2d(32, 64, 3, 1) self.dropout1 = nn.Dropout(0.25) self.dropout2 = nn.Dropout(0.5) self.fc1 = nn.Linear(9216, 128) self.fc2 = nn.Linear(128, 10) def forward(self, x): x = self.conv1(x) x = F.relu(x) x = self.conv2(x) x = F.relu(x) x = F.max_pool2d(x, 2) x = self.dropout1(x) x = torch.flatten(x, 1) x = self.fc1(x) x = F.relu(x) x = self.dropout2(x) x = self.fc2(x) output = F.log_softmax(x, dim=1) return output def train(args, model, device, train_loader, optimizer, epoch): model.train() for batch_idx, (data, target) in enumerate(train_loader): data, target = data.to(device), target.to(device) optimizer.zero_grad() output = model(data) loss = F.nll_loss(output, target) loss.backward() optimizer.step() if args.distributed: if dist.get_rank() == 0: if batch_idx % args.log_interval == 0: print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format( epoch, dist.get_world_size() * batch_idx * len(data), len(train_loader.dataset), 100. * batch_idx / len(train_loader), loss.item())) else: if batch_idx % args.log_interval == 0: print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format( epoch, batch_idx * len(data), len(train_loader.dataset), 100. * batch_idx / len(train_loader), loss.item())) if args.dry_run: break def test(model, device, test_loader): model.eval() test_loss = 0 correct = 0 with torch.no_grad(): for data, target in test_loader: data, target = data.to(device), target.to(device) output = model(data) test_loss += F.nll_loss(output, target, reduction='sum').item() # sum up batch loss pred = output.argmax(dim=1, keepdim=True) # get the index of the max log-probability correct += pred.eq(target.view_as(pred)).sum().item() test_loss /= len(test_loader.dataset) print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format( test_loss, correct, len(test_loader.dataset), 100. * correct / len(test_loader.dataset))) def main(): # Training settings parser = argparse.ArgumentParser(description='PyTorch MNIST Example') parser.add_argument('--batch-size', type=int, default=64, metavar='N', help='input batch size for training (default: 64)') parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N', help='input batch size for testing (default: 1000)') parser.add_argument('--epochs', type=int, default=14, metavar='N', help='number of epochs to train (default: 14)') parser.add_argument('--lr', type=float, default=1.0, metavar='LR', help='learning rate (default: 1.0)') parser.add_argument('--gamma', type=float, default=0.7, metavar='M', help='Learning rate step gamma (default: 0.7)') parser.add_argument('--no-cuda', action='store_true', default=False, help='disables CUDA training') parser.add_argument('--dry-run', action='store_true', default=False, help='quickly check a single pass') parser.add_argument('--seed', type=int, default=1, metavar='S', help='random seed (default: 1)') parser.add_argument('--log-interval', type=int, default=10, metavar='N', help='how many batches to wait before logging training status') parser.add_argument('--save-model', action='store_true', default=False, help='For Saving the current Model') parser.add_argument('--local_rank', type=int, help='local rank, will passed by ddp') parser.add_argument("--world-size", default=1, type=int, help="number of distributed processes") parser.add_argument("--dist-url", default="env://", type=str, help="url used to set up distributed training") args = parser.parse_args() use_cuda = not args.no_cuda and torch.cuda.is_available() init_distributed_mode(args) torch.manual_seed(args.seed) device = torch.device("cuda" if use_cuda else "cpu") train_kwargs = {'batch_size': args.batch_size} test_kwargs = {'batch_size': args.test_batch_size} if use_cuda: cuda_kwargs = {'num_workers': 1, 'pin_memory': True, } train_kwargs.update(cuda_kwargs) test_kwargs.update(cuda_kwargs) transform=transforms.Compose([ transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,)) ]) dataset_train = datasets.MNIST('./data', train=True, download=True, transform=transform) dataset_val = datasets.MNIST('./data', train=False, transform=transform) if args.distributed: train_sampler = torch.utils.data.distributed.DistributedSampler(dataset_train, shuffle=True) else: train_sampler = torch.utils.data.RandomSampler(dataset_train) test_sampler = torch.utils.data.SequentialSampler(dataset_val) train_loader = torch.utils.data.DataLoader(dataset_train, sampler = train_sampler, **train_kwargs) test_loader = torch.utils.data.DataLoader(dataset_val, sampler = test_sampler, **test_kwargs) model = Net().to(device) model_without_ddp = model if args.distributed: model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu]) model_without_ddp = model.module optimizer = optim.Adadelta(model.parameters(), lr=args.lr) scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma) for epoch in range(1, args.epochs + 1): if args.distributed: train_sampler.set_epoch(epoch) train(args, model, device, train_loader, optimizer, epoch) if args.distributed: # Only run validation on GPU 0 process, for simplity, so we do not run validation on multi gpu. if dist.get_rank() == 0: test(model_without_ddp, device, test_loader) else: test(model, device, test_loader) scheduler.step() if args.save_model: if args.distributed: if dist.get_rank() == 0: # only save model on GPU0 process. torch.save(model.state_dict(), f"mnist_cnn.pt") else: torch.save(model.state_dict(), f"mnist_cnn_.pt") if __name__ == '__main__': start = time.time() main() print(f'Total cost time:{time.time() - start} ms')

二、将python文件上传的oss

如果有配置数据源nas等,也可以直接将文件上传到nas中

三、创建任务

图片.png

图片.png

  • 启动脚本示例
wget https://********.oss-cn-hangzhou.aliyuncs.com/mnist-ddp.py && python -m torch.distributed.launch --master_addr=$MASTER_ADDR --master_port=$MASTER_PORT --nproc_per_node=1 --nnodes=$WORLD_SIZE --node_rank=$RANK mnist-ddp.py

四、察看运行结果

图片.png

图片.png

参考链接

提交分布式DLC任务

相关实践学习
使用PAI+LLaMA Factory微调Qwen2-VL模型,搭建文旅领域知识问答机器人
使用PAI和LLaMA Factory框架,基于全参方法微调 Qwen2-VL模型,使其能够进行文旅领域知识问答,同时通过人工测试验证了微调的效果。
机器学习概览及常见算法
机器学习(Machine Learning, ML)是人工智能的核心,专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能,它是使计算机具有智能的根本途径,其应用遍及人工智能的各个领域。 本课程将带你入门机器学习,掌握机器学习的概念和常用的算法。
相关文章
|
10月前
|
API 开发工具 Python
阿里云PAI部署DeepSeek及调用
本文介绍如何在阿里云PAI EAS上部署DeepSeek模型,涵盖7B模型的部署、SDK和API调用。7B模型只需一张A10显卡,部署时间约10分钟。文章详细展示了模型信息查看、在线调试及通过OpenAI SDK和Python Requests进行调用的步骤,并附有测试结果和参考文档链接。
3722 11
阿里云PAI部署DeepSeek及调用
|
7月前
|
SQL 分布式计算 DataWorks
使用DataWorks PyODPS节点调用XGBoost算法
本文介绍如何在DataWorks中通过PyODPS3节点调用XGBoost算法完成模型训练与测试,并实现周期离线调度。主要内容包括:1) 使用ODPS SQL构建数据集;2) 创建PyODPS3节点进行数据处理与模型训练;3) 构建支持XGBoost的自定义镜像;4) 测试运行并选择对应镜像。适用于需要集成机器学习算法到大数据工作流的用户。
310 24
|
开发者 Python
阿里云PAI DSW快速部署服务
在使用阿里云DSW实例进行开发的时候,可能需要快速部署服务测试应用效果。DSW实例目前已经支持通过自定义服务访问配置功能,对外提供服务访问能力,您在应用开发过程中无需分享整个DSW实例,即可将服务分享给协作开发者进行测试和验证。
469 23
|
11月前
|
存储 人工智能 网络协议
浅聊阿里云倚天云服务器:c8y、g8y、r8y实例性能详解与活动价格参考
选择一款高性能、高性价比的云服务器对于企业而言至关重要,阿里云推出的倚天云服务器——c8y、g8y、r8y三款实例,它们基于ARM架构,采用阿里自研的倚天710处理器,并基于新一代CIPU架构,通过芯片快速路径加速手段,实现了计算、存储、网络性能的大幅提升。2025年,计算型c8y云服务器活动价格860.65元一年起,通用型g8y云服务器活动价格1187.40元一年起,内存型r8y云服务器活动价格1454.32元一年起。本文将为大家详细解析这三款实例的性能特点、应用场景以及最新的活动价格情况,帮助大家更好地了解阿里云倚天云服务器。
|
供应链 安全 量子技术
OASA走进三未信安、奇安信和360,共建龙蜥安全生态
开展操作系统安全技术创新交流合作,提供更便捷的商业合作渠道,服务好联盟成员间共同的客户、用户。
OASA走进三未信安、奇安信和360,共建龙蜥安全生态
|
SQL 分布式计算 DataWorks
DataWorks操作报错合集之开发环境正常,提交到生产时报错,是什么原因
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
289 0
|
安全 网络安全 数据安全/隐私保护
访问控制列表(ACL)是网络安全管理的重要工具,用于定义和管理网络资源的访问权限。
访问控制列表(ACL)是网络安全管理的重要工具,用于定义和管理网络资源的访问权限。ACL 可应用于路由器、防火墙等设备,通过设定规则控制访问。其类型包括标准、扩展、基于时间和基于用户的ACL,广泛用于企业网络和互联网安全中,以增强安全性、实现精细管理和灵活调整。然而,ACL 也存在管理复杂和可能影响性能的局限性。未来,ACL 将趋向智能化和自动化,与其他安全技术结合,提供更全面的安全保障。
1008 4
|
分布式计算 DataWorks 数据处理
"DataWorks高级技巧揭秘:手把手教你如何在PyODPS节点中将模型一键写入OSS,实现数据处理的完美闭环!"
【10月更文挑战第23天】DataWorks是企业级的云数据开发管理平台,支持强大的数据处理和分析功能。通过PyODPS节点,用户可以编写Python代码执行ODPS任务。本文介绍了如何在DataWorks中训练模型并将其保存到OSS的详细步骤和示例代码,包括初始化ODPS和OSS服务、读取数据、训练模型、保存模型到OSS等关键步骤。
710 3
|
Linux PHP
Linux CentOS 宝塔 Suhosin禁用php5.6版本eval函数详细图文教程
【8月更文挑战第27天】本文介绍两种禁用PHP执行的方法:使用`PHP_diseval_extension`禁用和通过`suhosin`禁用。由于`suhosin`不支持PHP8,仅适用于PHP7及以下版本,若服务器安装了PHP5.6,则需对应安装`suhosin-0.9.38`版本。文章提供了详细的安装步骤,并强调了宝塔环境下与普通环境下的PHP路径差异。安装完成后,在`php.ini`中添加`suhosin.so`扩展并设置`executor.disable_eval = on`以禁用执行功能。最后通过测试代码验证是否成功禁用,并重启`php-fpm`服务生效。
409 2
|
Web App开发 JavaScript 前端开发
什么是JavaScript引擎
【8月更文挑战第14天】什么是JavaScript引擎
336 1
下一篇