Skip to content

Commit efb137d

Browse files
committed
[fix][dataloader] use file descripor instead of file system
1 parent 33e8968 commit efb137d

File tree

6 files changed

+130
-31
lines changed

6 files changed

+130
-31
lines changed

paddle/common/flags.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1345,6 +1345,19 @@ PHI_DEFINE_EXPORTED_bool(use_shm_cache,
13451345
false,
13461346
"Use shm cache in mmap_allocator.");
13471347

1348+
/**
1349+
* mmap_allocator related FLAG
1350+
* Name: use_file_descripor
1351+
* Since Version: 2.6.2
1352+
* Value Range: bool, default=true
1353+
* Example:
1354+
* Note: . If True, mmap_allocator will use file descripor to open shared memory
1355+
* operation.
1356+
*/
1357+
PHI_DEFINE_EXPORTED_bool(use_file_descripor,
1358+
true,
1359+
"Use file descripor in mmap_allocator.");
1360+
13481361
/**
13491362
* Tensor operants related FLAG
13501363
* Name: tensor_operants_mode

paddle/fluid/memory/allocation/mmap_allocator.cc

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,14 @@ struct CountInfo {
5454
std::atomic<int> refcount;
5555
};
5656

57-
void AllocateMemoryMap(
58-
std::string filename, int flags, size_t size, void **map_ptr_, int *fd_) {
57+
void AllocateMemoryMap(std::string filename,
58+
int *shared_fd,
59+
int flags,
60+
size_t size,
61+
void **map_ptr_) {
5962
// TODO(@ZHUI): support win32
6063
int file_flags = 0;
61-
int fd = -1;
64+
int fd = *shared_fd;
6265
if (flags & MAPPED_SHAREDMEM) {
6366
file_flags = O_RDWR | O_CREAT;
6467
} else {
@@ -71,7 +74,7 @@ void AllocateMemoryMap(
7174
file_flags &= ~O_CREAT;
7275
}
7376

74-
if (!(flags & MAPPED_FROMFD)) {
77+
if (!(flags & MAPPED_FROMFD) && fd == -1) {
7578
if (flags & MAPPED_SHAREDMEM) {
7679
fd = shm_open(filename.c_str(), file_flags, (mode_t)0600);
7780
PADDLE_ENFORCE_NE(
@@ -83,8 +86,6 @@ void AllocateMemoryMap(
8386
VLOG(6) << "shm_open: " << filename;
8487
MemoryMapFdSet::Instance().Insert(filename);
8588
}
86-
} else {
87-
fd = -1;
8889
}
8990

9091
PADDLE_ENFORCE_EQ(ftruncate(fd, size),
@@ -98,32 +99,38 @@ void AllocateMemoryMap(
9899
*map_ptr_ = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0);
99100
}
100101

102+
if (flags & MAPPED_UNLINK) {
103+
VLOG(6) << "shm_unlink: " << filename;
104+
shm_unlink(filename.c_str());
105+
}
106+
101107
PADDLE_ENFORCE_NE(*map_ptr_,
102108
MAP_FAILED,
103109
platform::errors::Unavailable(
104110
"Memory map failed when create shared memory."));
105-
106111
if (flags & MAPPED_KEEPFD) {
107-
*fd_ = fd;
112+
*shared_fd = fd;
113+
VLOG(6) << "keep fd: " << *shared_fd;
108114
} else {
109115
PADDLE_ENFORCE_NE(::close(fd),
110116
-1,
111117
platform::errors::Unavailable(
112118
"Error closing memory mapped file <", filename, ">"));
113119

114-
*fd_ = -1;
120+
*shared_fd = -1;
115121
}
116122
}
117123

118124
std::shared_ptr<RefcountedMemoryMapAllocation>
119125
AllocateRefcountedMemoryMapAllocation(std::string filename,
126+
int shared_fd,
120127
int flags,
121128
size_t size,
122129
int buffer_id) {
123-
int fd = -1;
130+
int fd = shared_fd;
124131
void *base_ptr = nullptr;
125132
if (buffer_id == -1) {
126-
AllocateMemoryMap(filename, flags, size + mmap_alignment, &base_ptr, &fd);
133+
AllocateMemoryMap(filename, &fd, flags, size + mmap_alignment, &base_ptr);
127134
VLOG(4) << "Create and mmap a new shm: " << filename;
128135
} else {
129136
base_ptr = MemoryMapAllocationPool::Instance().GetById(buffer_id).mmap_ptr_;
@@ -132,7 +139,7 @@ AllocateRefcountedMemoryMapAllocation(std::string filename,
132139
void *aligned_base_ptr =
133140
static_cast<void *>(static_cast<char *>(base_ptr) + mmap_alignment);
134141
return std::make_shared<RefcountedMemoryMapAllocation>(
135-
aligned_base_ptr, size, filename, flags, fd, buffer_id);
142+
aligned_base_ptr, size, filename, fd, flags, buffer_id);
136143
}
137144

138145
RefcountedMemoryMapAllocation::RefcountedMemoryMapAllocation(
@@ -145,11 +152,23 @@ RefcountedMemoryMapAllocation::RefcountedMemoryMapAllocation(
145152
: MemoryMapAllocation(ptr, size, ipc_name, fd, flags) {
146153
// must reset base ptr first.
147154
buffer_id_ = buffer_id;
155+
fd_ = fd;
156+
flags_ = flags;
148157
resetBaseptr();
149158
initializeRefercount();
150159
}
151160

152161
void MemoryMapAllocation::close() {
162+
if (!closed_fd_) {
163+
closed_fd_ = true;
164+
if (flags_ & MAPPED_KEEPFD) {
165+
VLOG(6) << "one close fd: " << fd_;
166+
PADDLE_ENFORCE_NE(::close(fd_),
167+
-1,
168+
platform::errors::Unavailable(
169+
"Error closing file descriptor <", fd_, ">"));
170+
}
171+
}
153172
if (closed_) {
154173
return;
155174
}
@@ -193,6 +212,15 @@ void RefcountedMemoryMapAllocation::close() {
193212
void *data = map_ptr_;
194213
CountInfo *info = reinterpret_cast<CountInfo *>(data);
195214
--info->refcount;
215+
if (flags_ & MAPPED_KEEPFD) {
216+
closed_fd_ = true;
217+
PADDLE_ENFORCE_NE(::close(fd_),
218+
-1,
219+
platform::errors::Unavailable(
220+
"Error closing file descriptor <", fd_, ">"));
221+
VLOG(6) << "close fd: " << fd_;
222+
}
223+
196224
if (FLAGS_use_shm_cache && buffer_id_ != -1) {
197225
return;
198226
} else {
@@ -260,6 +288,7 @@ std::shared_ptr<MemoryMapWriterAllocation> AllocateMemoryMapWriterAllocation(
260288
const std::string &ipc_name = GetIPCName();
261289
int flags = O_RDWR | O_CREAT;
262290
int fd = shm_open(ipc_name.c_str(), flags, 0600);
291+
263292
PADDLE_ENFORCE_NE(fd,
264293
-1,
265294
platform::errors::Unavailable(
@@ -283,7 +312,6 @@ std::shared_ptr<MemoryMapReaderAllocation> RebuildMemoryMapReaderAllocation(
283312
const std::string &ipc_name, size_t size) {
284313
int flags = O_RDWR | O_CREAT;
285314
flags &= ~O_CREAT;
286-
287315
int fd = shm_open(ipc_name.c_str(), flags, 0600);
288316
PADDLE_ENFORCE_NE(fd,
289317
-1,

paddle/fluid/memory/allocation/mmap_allocator.h

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,17 @@ enum MappedModes {
4444

4545
class MemoryMapAllocation : public Allocation {
4646
public:
47-
explicit MemoryMapAllocation(void *ptr, size_t size, std::string ipc_name)
47+
explicit MemoryMapAllocation(void *ptr,
48+
size_t size,
49+
std::string ipc_name,
50+
int fd)
4851
: Allocation(ptr, size, platform::CPUPlace()),
4952
ipc_name_(std::move(ipc_name)),
53+
fd_(fd),
5054
map_ptr_(ptr),
5155
map_size_(size) {}
5256
explicit MemoryMapAllocation(
53-
void *ptr, size_t size, std::string ipc_name, int flags, int fd)
57+
void *ptr, size_t size, std::string ipc_name, int fd, int flags)
5458
: Allocation(ptr, size, platform::CPUPlace()),
5559
ipc_name_(std::move(ipc_name)),
5660
fd_(fd),
@@ -59,6 +63,7 @@ class MemoryMapAllocation : public Allocation {
5963
map_size_(size) {}
6064

6165
inline const std::string &ipc_name() const { return ipc_name_; }
66+
inline const int shared_fd() const { return fd_; }
6267

6368
virtual void close();
6469

@@ -71,6 +76,7 @@ class MemoryMapAllocation : public Allocation {
7176
void *map_ptr_ = nullptr;
7277
size_t map_size_ = 0;
7378
bool closed_ = false;
79+
bool closed_fd_ = false;
7480
};
7581

7682
class RefcountedMemoryMapAllocation : public MemoryMapAllocation {
@@ -93,11 +99,15 @@ class RefcountedMemoryMapAllocation : public MemoryMapAllocation {
9399
void resetBaseptr();
94100
};
95101

96-
void AllocateMemoryMap(
97-
std::string filename, int flags, size_t size, void **base_ptr_, int *fd_);
102+
void AllocateMemoryMap(std::string filename,
103+
int *shared_fd,
104+
int flags,
105+
size_t size,
106+
void **base_ptr_);
98107

99108
std::shared_ptr<RefcountedMemoryMapAllocation>
100109
AllocateRefcountedMemoryMapAllocation(std::string filename,
110+
int shared_fd,
101111
int flags,
102112
size_t size,
103113
int buffer_id = -1);
@@ -111,11 +121,13 @@ class MemoryMapWriterAllocation : public Allocation {
111121
ipc_name_(std::move(ipc_name)) {}
112122

113123
inline const std::string &ipc_name() const { return ipc_name_; }
124+
inline const int shared_fd() const { return fd_; }
114125

115126
~MemoryMapWriterAllocation() override;
116127

117128
private:
118129
std::string ipc_name_;
130+
int fd_ = -1;
119131
};
120132

121133
class MemoryMapReaderAllocation : public Allocation {
@@ -127,11 +139,13 @@ class MemoryMapReaderAllocation : public Allocation {
127139
ipc_name_(std::move(ipc_name)) {}
128140

129141
inline const std::string &ipc_name() const { return ipc_name_; }
142+
inline const int shared_fd() const { return fd_; }
130143

131144
~MemoryMapReaderAllocation() override;
132145

133146
private:
134147
std::string ipc_name_;
148+
int fd_ = -1;
135149
};
136150

137151
std::shared_ptr<MemoryMapWriterAllocation> AllocateMemoryMapWriterAllocation(

paddle/fluid/pybind/tensor.cc

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -859,7 +859,7 @@ void BindTensor(pybind11::module &m) { // NOLINT
859859
)DOC")
860860
#endif
861861
.def("_share_filename",
862-
[](phi::DenseTensor &self) {
862+
[](phi::DenseTensor &self, bool use_file_descripor) {
863863
if (!self.IsInitialized() || self.numel() == 0)
864864
throw std::runtime_error(
865865
"Tensor not initialized or numel is 0. could not pass to "
@@ -886,6 +886,10 @@ void BindTensor(pybind11::module &m) { // NOLINT
886886

887887
int flags = memory::allocation::MAPPED_SHAREDMEM |
888888
memory::allocation::MAPPED_EXCLUSIVE;
889+
if (use_file_descripor) {
890+
flags = flags | memory::allocation::MAPPED_KEEPFD |
891+
memory::allocation::MAPPED_UNLINK;
892+
}
889893
std::string handle = memory::allocation::GetIPCName();
890894
int find_id = -1;
891895
if (FLAGS_use_shm_cache) {
@@ -894,9 +898,10 @@ void BindTensor(pybind11::module &m) { // NOLINT
894898
if (find_id != -1) {
895899
handle = memory::allocation::MemoryMapAllocationPool::Instance().GetById(find_id).file_name_; // NOLINT
896900
}
901+
int shared_fd = -1;
897902
auto shared_holder =
898903
memory::allocation::AllocateRefcountedMemoryMapAllocation(
899-
handle, flags, data_size, find_id);
904+
handle, shared_fd, flags, data_size, find_id);
900905

901906
// copy data & reset holder
902907
if (platform::is_cuda_pinned_place(holder->place())) {
@@ -914,8 +919,10 @@ void BindTensor(pybind11::module &m) { // NOLINT
914919
int type_idx = static_cast<int>(self.type());
915920

916921
return py::make_tuple(mmap_allocation->ipc_name(),
922+
mmap_allocation->shared_fd(),
917923
mmap_allocation->size(), type_idx,
918-
common::vectorize(self.dims()), self.lod());
924+
common::vectorize(self.dims()), self.lod(),
925+
use_file_descripor);
919926
},
920927
R"DOC(
921928
Serialize CPU lod tensor in shared memory to tuple.
@@ -935,30 +942,37 @@ void BindTensor(pybind11::module &m) { // NOLINT
935942
)DOC")
936943
.def("_new_shared_filename",
937944
[](py::tuple t) { // __setstate__
938-
if (t.size() != 5)
945+
if (t.size() != 7)
939946
throw std::runtime_error("Invalid Tensor meta info state!");
940947

941948
phi::DenseTensor tensor;
942949

943950
// 2. Rebuild Allocation
944951
const std::string &ipc_name = t[0].cast<std::string>();
945-
size_t size = t[1].cast<size_t>();
952+
const int shared_fd = t[1].cast<int>();
953+
const bool use_file_descripor = t[6].cast<bool>();
954+
955+
size_t size = t[2].cast<size_t>();
946956
int flags = memory::allocation::MAPPED_SHAREDMEM |
947957
memory::allocation::MAPPED_NOCREATE;
958+
if (use_file_descripor) {
959+
flags = flags | memory::allocation::MAPPED_KEEPFD |
960+
memory::allocation::MAPPED_UNLINK;
961+
}
948962
int find_id = -1;
949963
if (FLAGS_use_shm_cache) {
950964
find_id = memory::allocation::MemoryMapAllocationPool::Instance().FindFromCache(flags, size, ipc_name, /*check_refcount*/ false); // NOLINT
951965
}
952966
auto shared_holder =
953967
memory::allocation::AllocateRefcountedMemoryMapAllocation(
954-
ipc_name, flags, size, find_id);
968+
ipc_name, shared_fd, flags, size, find_id);
955969

956970
// 3. Rebuild Tensor
957971
tensor.ResetHolderWithType(
958972
shared_holder,
959-
static_cast<phi::DataType>(t[2].cast<int>()));
960-
tensor.Resize(common::make_ddim(t[3].cast<std::vector<int>>()));
961-
tensor.set_lod(t[4].cast<framework::LoD>());
973+
static_cast<phi::DataType>(t[3].cast<int>()));
974+
tensor.Resize(common::make_ddim(t[4].cast<std::vector<int>>()));
975+
tensor.set_lod(t[5].cast<framework::LoD>());
962976

963977
return tensor;
964978
},

python/paddle/base/reader.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import logging
1616
import multiprocessing
17+
import os
1718
import queue
1819
import sys
1920
import threading
@@ -409,6 +410,7 @@ def from_generator(
409410
...
410411
>>> # doctest: -SKIP
411412
"""
413+
os.environ["FLAGS_use_file_descripor"] = "0"
412414
if in_dygraph_mode():
413415
return DygraphGeneratorLoader(
414416
feed_list,

0 commit comments

Comments
 (0)