2 changes: 1 addition & 1 deletion cmake/external/libmct.cmake
@@ -19,7 +19,7 @@ IF((NOT DEFINED LIBMCT_VER) OR (NOT DEFINED LIBMCT_URL))
MESSAGE(STATUS "use pre defined download url")
SET(LIBMCT_VER "0.1.0" CACHE STRING "" FORCE)
SET(LIBMCT_NAME "libmct" CACHE STRING "" FORCE)
SET(LIBMCT_URL "https://pslib.bj.bcebos.com/libmct.tar.gz" CACHE STRING "" FORCE)
SET(LIBMCT_URL "https://pslib.bj.bcebos.com/libmct/libmct.tar.gz" CACHE STRING "" FORCE)
ENDIF()
MESSAGE(STATUS "LIBMCT_NAME: ${LIBMCT_NAME}, LIBMCT_URL: ${LIBMCT_URL}")
SET(LIBMCT_PREFIX_DIR "${THIRD_PARTY_PATH}/libmct")
95 changes: 95 additions & 0 deletions paddle/fluid/distributed/common/chunk_allocator.h
@@ -0,0 +1,95 @@
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once
#include <glog/logging.h>

#include <algorithm>  // std::max
#include <cstdlib>    // posix_memalign, free
#include <new>        // placement new
#include <utility>    // std::forward

namespace paddle {
namespace distributed {

// Fast allocation and deallocation of objects by allocating them in chunks.
template <class T>
class ChunkAllocator {
public:
explicit ChunkAllocator(size_t chunk_size = 64) {
CHECK(sizeof(Node) == std::max(sizeof(void*), sizeof(T)));
_chunk_size = chunk_size;
_chunks = NULL;
_free_nodes = NULL;
_counter = 0;
}
ChunkAllocator(const ChunkAllocator&) = delete;
~ChunkAllocator() {
while (_chunks != NULL) {
Chunk* x = _chunks;
_chunks = _chunks->next;
free(x);
}
}
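// Construct a T in a node taken from the free list, forwarding args to T's constructor.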
template <class... ARGS>
T* acquire(ARGS&&... args) {
if (_free_nodes == NULL) {
create_new_chunk();
}

T* x = (T*)(void*)_free_nodes; // NOLINT
_free_nodes = _free_nodes->next;
new (x) T(std::forward<ARGS>(args)...);
_counter++;
return x;
}
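// Destroy the object and return its node to the free list; chunk memory is reclaimed only when the allocator is destroyed.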
void release(T* x) {
x->~T();
Node* node = (Node*)(void*)x; // NOLINT
node->next = _free_nodes;
_free_nodes = node;
_counter--;
}
size_t size() const { return _counter; }

private:
struct alignas(T) Node {
union {
Node* next;
char data[sizeof(T)];
};
};
struct Chunk {
Chunk* next;
Node nodes[];
};

size_t _chunk_size; // how many elements in one chunk
Chunk* _chunks; // a list
Node* _free_nodes; // a list
size_t _counter; // how many elements are acquired

void create_new_chunk() {
Chunk* chunk;
posix_memalign(reinterpret_cast<void**>(&chunk),
std::max<size_t>(sizeof(void*), alignof(Chunk)),
sizeof(Chunk) + sizeof(Node) * _chunk_size);
chunk->next = _chunks;
_chunks = chunk;

for (size_t i = 0; i < _chunk_size; i++) {
Node* node = &chunk->nodes[i];
node->next = _free_nodes;
_free_nodes = node;
}
}
};

} // namespace distributed
} // namespace paddle
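Note: a minimal usage sketch of ChunkAllocator follows; the Payload type and demo function are hypothetical and not part of this PR, added only to illustrate the acquire/release cycle of the header above.

// Hypothetical usage sketch for ChunkAllocator; not part of this PR.
#include "paddle/fluid/distributed/common/chunk_allocator.h"

struct Payload {
  int id;
  double score;
  Payload(int i, double s) : id(i), score(s) {}
};

void chunk_allocator_demo() {
  paddle::distributed::ChunkAllocator<Payload> alloc(/*chunk_size=*/64);
  Payload* p = alloc.acquire(42, 0.5);  // placement-new inside a pooled node
  // ... use p ...
  alloc.release(p);  // runs ~Payload() and recycles the node for reuse
  // Chunks themselves are freed only when `alloc` goes out of scope.
}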
39 changes: 16 additions & 23 deletions paddle/fluid/distributed/fleet.cc
@@ -460,25 +460,7 @@ void FleetWrapper::PushSparseFromTensorAsync(
clks->lod().size() ? clks->lod()[0].size() - 1 : clks->dims()[0];
CHECK(clk_size == batch_size || clk_size == 1);

std::vector<float> g;
for (framework::LoDTensor* g_tensor : *outputs) {
float* g_ori = g_tensor->data<float>();
// no cvm
if (batch_size_consist) { // TODO(zhaocaibei123): add config
// scale_sparse_gradient_with_batch_size_
Eigen::Map<
Eigen::Matrix<float, Eigen::Dynamic, Eigen::Dynamic, Eigen::RowMajor>>
g_mat(g_ori, g_tensor->numel() / fea_dim, fea_dim);
g_mat.rightCols(fea_dim) *= batch_size;
}

size_t origin = g.size();
size_t add = g_tensor->numel();
g.resize(origin + add);

memcpy(g.data() + origin, g_tensor->data<float>(), add * sizeof(float));
}

CHECK(outputs->size() == inputs->size());
std::vector<uint64_t> push_keys;
push_keys.reserve(MAX_FEASIGN_NUM / 100);
std::vector<std::vector<float>> push_values;
@@ -495,9 +477,21 @@
const int64_t* clk_tensor = clks->data<int64_t>();

for (size_t index = 0; index < inputs->size(); ++index) {
framework::LoDTensor* g_tensor = outputs->at(index);
float* g = g_tensor->data<float>();
// no cvm
if (batch_size_consist) { // TODO(zhaocaibei123): add config
// scale_sparse_gradient_with_batch_size_
Eigen::Map<
Eigen::Matrix<float, Eigen::Dynamic, Eigen::Dynamic, Eigen::RowMajor>>
g_mat(g, g_tensor->numel() / fea_dim, fea_dim);
g_mat.rightCols(fea_dim) *= batch_size;
}

const framework::LoDTensor* tensor = inputs->at(index);
const int64_t* ids = tensor->data<int64_t>();
size_t len = tensor->numel();
output_len = 0;

if (tensor->lod().size() > 0) {
for (size_t i = 0; i < tensor->lod()[0].size() - 1; ++i) {
@@ -519,7 +513,7 @@

float* data = push_values.back().data() + 3;

memcpy(data, g.data() + output_len, sizeof(float) * fea_dim);
memcpy(data, g + output_len, sizeof(float) * fea_dim);

++input_idx;
}
@@ -542,14 +536,13 @@

float* data = push_values.back().data() + 3;

memcpy(data, g.data() + output_len, sizeof(float) * fea_dim);
memcpy(data, g + output_len, sizeof(float) * fea_dim);

++input_idx;
}
}
CHECK(output_len == g_tensor->numel());
}
VLOG(1) << "output_len: " << output_len << " g.size(): " << g.size();
CHECK(output_len == g.size());

std::vector<float*> push_g_vec(input_idx, nullptr);

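Note: the rewritten PushSparseFromTensorAsync above scales each gradient tensor in place and copies directly out of the tensor buffer (memcpy from g + output_len), instead of first concatenating all gradients into a temporary std::vector<float>. Below is a minimal sketch of that in-place scaling step, with made-up dimensions; Eigen is assumed to be available, as it already is in fleet.cc.

// Illustrative sketch only; mirrors "g_mat.rightCols(fea_dim) *= batch_size;"
#include <Eigen/Dense>

void scale_sparse_grad_in_place(float* g, int numel, int fea_dim, int batch_size) {
  // View the flat gradient buffer as a (numel / fea_dim) x fea_dim row-major
  // matrix and scale it by the batch size, as the new per-tensor loop does.
  Eigen::Map<
      Eigen::Matrix<float, Eigen::Dynamic, Eigen::Dynamic, Eigen::RowMajor>>
      g_mat(g, numel / fea_dim, fea_dim);
  g_mat.rightCols(fea_dim) *= static_cast<float>(batch_size);
}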
46 changes: 38 additions & 8 deletions paddle/fluid/distributed/service/brpc_ps_client.cc
@@ -210,6 +210,23 @@ int32_t BrpcPsClient::initialize() {
}
}

auto &profiler = CostProfiler::instance();
profiler.register_profiler("pserver_client_pull_dense");
profiler.register_profiler("pserver_client_pull_sparse");
profiler.register_profiler("pserver_client_pull_sparse_local");
profiler.register_profiler("pserver_client_push_sparse");
profiler.register_profiler("pserver_client_push_sparse_parse");
profiler.register_profiler("client_push_sparse_put");
profiler.register_profiler("pserver_client_push_sparse");
profiler.register_profiler("pserver_client_push_sparse_merge");
profiler.register_profiler("pserver_client_push_sparse_rpc");
profiler.register_profiler("pserver_client_push_dense");
profiler.register_profiler("pserver_client_push_dense_parse");
profiler.register_profiler("push_dense_put");
profiler.register_profiler("pserver_client_push_dense_merge");
profiler.register_profiler("pserver_client_push_dense_rpc");
profiler.register_profiler("pserver_client_push_dense_send");

_running = true;
_flushing = false;
// Start the asynchronous push thread
@@ -588,6 +605,7 @@ std::future<int32_t> BrpcPsClient::push_sparse_param(
std::future<int32_t> BrpcPsClient::pull_dense(Region *regions,
size_t region_num,
size_t table_id) {
auto timer = std::make_shared<CostTimer>("pserver_client_pull_dense");
auto *accessor = table_accessor(table_id);
size_t request_call_num = _server_channels.size();
uint32_t num_per_shard =
@@ -643,6 +661,7 @@ std::future<int32_t> BrpcPsClient::pull_dense(Region *regions,
}
closure->set_promise_value(ret);
});
closure->add_timer(timer);
auto promise = std::make_shared<std::promise<int32_t>>();
closure->add_promise(promise);
std::future<int> fut = promise->get_future();
@@ -865,6 +884,9 @@ std::future<int32_t> BrpcPsClient::pull_sparse(float **select_values,
size_t table_id,
const uint64_t *keys, size_t num,
bool is_training) {
auto timer = std::make_shared<CostTimer>("pserver_client_pull_sparse");
auto local_timer =
std::make_shared<CostTimer>("pserver_client_pull_sparse_local");
size_t request_call_num = _server_channels.size();

auto shard_sorted_kvs = std::make_shared<
@@ -925,7 +947,7 @@ std::future<int32_t> BrpcPsClient::pull_sparse(float **select_values,
}
closure->set_promise_value(ret);
});

closure->add_timer(timer);
auto promise = std::make_shared<std::promise<int32_t>>();
closure->add_promise(promise);
std::future<int> fut = promise->get_future();
@@ -1110,8 +1132,8 @@ std::future<int32_t> BrpcPsClient::push_sparse(size_t table_id,
const uint64_t *keys,
const float **update_values,
size_t num) {
auto push_timer =
std::make_shared<CostTimer>("pserver_client_push_sparse_parse");
auto push_timer = std::make_shared<CostTimer>("pserver_client_push_sparse");
CostTimer parse_timer("pserver_client_push_sparse_parse");
int push_sparse_async_num = _push_sparse_task_queue_map[table_id]->Size();
while (push_sparse_async_num > FLAGS_pserver_max_async_call_num) {
// LOG(INFO) << "push_sparse Waiting for async_call_num comsume, task_num:"
@@ -1121,6 +1143,7 @@ std::future<int32_t> BrpcPsClient::push_sparse(size_t table_id,
// push_sparse_async_num = _push_sparse_task_queue_map[table_id]->size();
push_sparse_async_num = _push_sparse_task_queue_map[table_id]->Size();
}
auto put_timer = std::make_shared<CostTimer>("client_push_sparse_put");
thread_local std::vector<std::vector<std::pair<uint64_t, const float *>>>
shard_sorted_kv_list;
auto *accessor = table_accessor(table_id);
@@ -1250,14 +1273,14 @@ void BrpcPsClient::push_sparse_task_consume() {
for_each(task_list.begin() + 1, task_list.end(),
[&request_kv_num, request_call_num,
closure](std::shared_ptr<SparseAsyncTask> &task) {
// closure->add_timer(task->timer());
closure->add_timer(task->timer());
closure->add_promise(task->promise());
});

// CostTimer merge_timer("pserver_client_push_sparse_merge");
// auto rpc_timer =
// std::make_shared<CostTimer>("pserver_client_push_sparse_rpc");
// closure->add_timer(rpc_timer);
CostTimer merge_timer("pserver_client_push_sparse_merge");
auto rpc_timer =
std::make_shared<CostTimer>("pserver_client_push_sparse_rpc");
closure->add_timer(rpc_timer);

std::vector<std::future<int>> merge_status(request_call_num);
for (int shard_idx = 0; shard_idx < request_call_num; ++shard_idx) {
@@ -1295,6 +1318,7 @@ void BrpcPsClient::push_sparse_task_consume() {
std::vector<std::future<int>>().swap(merge_status);
}
}
timeline.Pause();
auto wait_ms =
FLAGS_pserver_async_push_sparse_interval_ms - (timeline.ElapsedMS());
if (wait_ms > 0) {
@@ -1464,10 +1488,12 @@ std::future<int32_t> BrpcPsClient::push_dense(const Region *regions,
usleep(5000); // 5ms
push_dense_async_num = _push_dense_task_queue_map[table_id]->Size();
}
auto push_dense_timer = std::make_shared<CostTimer>("push_dense_put");
// auto dense_data = _dense_matrix_obj_pool.get();
auto dense_data = std::make_shared<std::vector<float>>();
auto async_task = new DenseAsyncTask(dense_data, table_id, push_timer);
size_t request_call_num = _server_channels.size();

uint32_t num_per_shard =
dense_dim_per_shard(accessor->fea_dim(), request_call_num);

@@ -1567,6 +1593,7 @@ void BrpcPsClient::push_dense_task_consume() {
<< total_send_data[total_send_data_size - 2]
<< total_send_data[0] << " total_send_data[-1]"
<< total_send_data[total_send_data_size - 1];

if (scale_gradient && merge_count > 1) {
Eigen::Map<Eigen::MatrixXf> mat(total_send_data, 1,
total_send_data_size);
Expand All @@ -1585,6 +1612,7 @@ void BrpcPsClient::push_dense_task_consume() {
push_dense_raw_gradient(task_ptr, total_send_data, total_send_data_size,
closure);
}
timeline.Pause();
auto wait_ms =
FLAGS_pserver_async_push_dense_interval_ms - (timeline.ElapsedMS());
if (wait_ms > 0) {
Expand All @@ -1603,6 +1631,8 @@ void BrpcPsClient::push_dense_raw_gradient(
closure->add_timer(timer);
uint32_t num_per_shard =
dense_dim_per_shard(accessor->fea_dim(), request_call_num);
auto send_timer =
std::make_shared<CostTimer>("pserver_client_push_dense_send");
for (size_t i = 0; i < request_call_num; ++i) {
closure->request(i)->set_cmd_id(PS_PUSH_DENSE_TABLE);
closure->request(i)->set_table_id(task->table_id());
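Note: the timing additions in this file follow one pattern: register a named cost slot once in initialize(), then create a CostTimer with the same name around the code path being measured, either as a scoped local or held through a shared_ptr and attached to the RPC closure via add_timer(). The sketch below is a hedged illustration of that pattern; it assumes only the CostProfiler/CostTimer calls visible in this diff and that both classes live in paddle::distributed, as the surrounding code suggests.

// Illustrative sketch; assumes CostProfiler/CostTimer behave as used above.
#include <memory>
#include "paddle/fluid/distributed/common/cost_timer.h"

void cost_timer_example() {
  using paddle::distributed::CostProfiler;
  using paddle::distributed::CostTimer;

  // 1) One-time registration of the named slot (e.g. in initialize()).
  auto& profiler = CostProfiler::instance();
  profiler.register_profiler("pserver_client_pull_dense");

  // 2a) Scoped timing: the timer presumably records its cost on destruction.
  {
    CostTimer timer("pserver_client_pull_dense");
    // ... synchronous work to measure ...
  }

  // 2b) Asynchronous timing: keep the timer alive until the RPC completes,
  //     e.g. closure->add_timer(timer_ptr) as done for pull_dense above.
  auto timer_ptr = std::make_shared<CostTimer>("pserver_client_pull_dense");
  (void)timer_ptr;
}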
10 changes: 10 additions & 0 deletions paddle/fluid/distributed/service/brpc_ps_server.cc
@@ -15,6 +15,7 @@
#include "paddle/fluid/distributed/service/brpc_ps_server.h"
#include <thread> // NOLINT
#include "butil/object_pool.h"
#include "paddle/fluid/distributed/common/cost_timer.h"
#include "paddle/fluid/distributed/table/depends/sparse_utils.h"
#include "paddle/fluid/distributed/table/table.h"
#include "paddle/fluid/framework/archive.h"
@@ -117,6 +118,11 @@ int32_t BrpcPsService::initialize() {
_service_handler_map[PS_START_PROFILER] = &BrpcPsService::start_profiler;
_service_handler_map[PS_STOP_PROFILER] = &BrpcPsService::stop_profiler;
_service_handler_map[PS_PUSH_GLOBAL_STEP] = &BrpcPsService::push_global_step;
auto &profiler = CostProfiler::instance();
profiler.register_profiler("pserver_server_pull_dense");
profiler.register_profiler("pserver_server_push_dense");
profiler.register_profiler("pserver_server_pull_sparse");
profiler.register_profiler("pserver_server_push_sparse");

// Shard initialization: the shard info of server_list is only available from env after the server starts
initialize_shard_info();
@@ -190,6 +196,7 @@ int32_t BrpcPsService::pull_dense(Table *table, const PsRequestMessage &request,
"PsRequestMessage.datas is requeired at least 1 for num of dense");
return 0;
}
CostTimer timer("pserver_server_pull_dense");
uint32_t num = *(const uint32_t *)request.params(0).c_str();
if (num < 0) {
set_response_code(response, -1,
@@ -246,6 +253,7 @@ int32_t BrpcPsService::push_dense(Table *table, const PsRequestMessage &request,
return 0;
}

CostTimer timer("pserver_server_push_dense");
/*
Push Content:
|--num--|---valuesData---|
@@ -356,6 +364,7 @@ int32_t BrpcPsService::pull_sparse(Table *table,
return 0;
}

CostTimer timer("pserver_server_pull_sparse");
uint32_t num = *(uint32_t *)(request.params(0).c_str());
auto dim = table->value_accesor()->select_dim();

@@ -396,6 +405,7 @@ int32_t BrpcPsService::push_sparse(Table *table,
"least 1 for num of sparse_key");
return 0;
}
CostTimer timer("pserver_server_push_sparse");
uint32_t num = *(uint32_t *)(request.params(0).c_str());
/*
Push Content:
7 changes: 7 additions & 0 deletions paddle/fluid/distributed/table/CMakeLists.txt
@@ -16,6 +16,11 @@ set_source_files_properties(common_graph_table.cc PROPERTIES COMPILE_FLAGS ${DIS

get_property(RPC_DEPS GLOBAL PROPERTY RPC_DEPS)

set(PADDLE_LIB_THIRD_PARTY_PATH "${PADDLE_LIB}/third_party/")
include_directories(${PADDLE_LIB_THIRD_PARTY_PATH}libmct/src/extern_libmct/libmct/include)

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fopenmp")

set(EXTERN_DEP "")
if(WITH_HETERPS)
set(TABLE_SRC common_sparse_table.cc ssd_sparse_table.cc common_dense_table.cc sparse_geo_table.cc barrier_table.cc common_graph_table.cc)
@@ -43,3 +48,5 @@ cc_library(ctr_accessor SRCS ctr_accessor.cc DEPS ${TABLE_DEPS} ps_framework_pro
cc_library(memory_sparse_table SRCS memory_sparse_table.cc DEPS ps_framework_proto ${TABLE_DEPS} fs afs_wrapper ctr_accessor common_table)

cc_library(table SRCS table.cc DEPS memory_sparse_table common_table tensor_accessor tensor_table ps_framework_proto string_helper device_context gflags glog boost)

target_link_libraries(table -fopenmp)