Skip to content

Commit c06cd9a

Browse files
Xing Linshvachko
authored andcommitted
HDFS-16125. [FGL] Fix the iterator for PartitionedGSet. Contributed by Xing Lin. (#3197)
1 parent 455e8c0 commit c06cd9a

File tree

4 files changed

+300
-13
lines changed

4 files changed

+300
-13
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/LatchLock.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public abstract class LatchLock<C> {
3030
protected abstract boolean isReadTopLocked();
3131
/** @return true topLock is locked for write by any thread */
3232
protected abstract boolean isWriteTopLocked();
33-
protected abstract void readTopdUnlock();
33+
protected abstract void readTopUnlock();
3434
protected abstract void writeTopUnlock();
3535

3636
protected abstract boolean hasReadChildLock();
@@ -46,7 +46,7 @@ public abstract class LatchLock<C> {
4646
// Public APIs to use with the class
4747
public void readLock() {
4848
readChildLock();
49-
readTopdUnlock();
49+
readTopUnlock();
5050
}
5151

5252
public void readUnlock() {

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/PartitionedGSet.java

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import java.util.NavigableMap;
2525
import java.util.Set;
2626
import java.util.TreeMap;
27-
27+
import java.util.NoSuchElementException;
2828
import org.apache.hadoop.HadoopIllegalArgumentException;
2929
import org.apache.hadoop.classification.InterfaceAudience;
3030
import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
@@ -79,8 +79,7 @@ private class PartitionEntry extends LightWeightGSet<K, E> {
7979

8080
public PartitionedGSet(final int capacity,
8181
final Comparator<? super K> comparator,
82-
final LatchLock<?> latchLock,
83-
final E rootKey) {
82+
final LatchLock<?> latchLock) {
8483
this.partitions = new TreeMap<K, PartitionEntry>(comparator);
8584
this.latchLock = latchLock;
8685
// addNewPartition(rootKey).put(rootKey);
@@ -275,17 +274,36 @@ public Iterator<E> iterator() {
275274
* modifying other partitions, while iterating through the current one.
276275
*/
277276
private class EntryIterator implements Iterator<E> {
278-
private final Iterator<K> keyIterator;
277+
private Iterator<K> keyIterator;
279278
private Iterator<E> partitionIterator;
280279

281280
public EntryIterator() {
282281
keyIterator = partitions.keySet().iterator();
283-
K curKey = partitions.firstKey();
284-
partitionIterator = getPartition(curKey).iterator();
282+
283+
if (!keyIterator.hasNext()) {
284+
partitionIterator = null;
285+
return;
286+
}
287+
288+
K firstKey = keyIterator.next();
289+
partitionIterator = partitions.get(firstKey).iterator();
285290
}
286291

287292
@Override
288293
public boolean hasNext() {
294+
295+
// Special case: an iterator was created for an empty PartitionedGSet.
296+
// Check whether new partitions have been added since then.
297+
if (partitionIterator == null) {
298+
if (partitions.size() == 0) {
299+
return false;
300+
} else {
301+
keyIterator = partitions.keySet().iterator();
302+
K nextKey = keyIterator.next();
303+
partitionIterator = partitions.get(nextKey).iterator();
304+
}
305+
}
306+
289307
while(!partitionIterator.hasNext()) {
290308
if(!keyIterator.hasNext()) {
291309
return false;
@@ -298,9 +316,8 @@ public boolean hasNext() {
298316

299317
@Override
300318
public E next() {
301-
while(!partitionIterator.hasNext()) {
302-
K curKey = keyIterator.next();
303-
partitionIterator = getPartition(curKey).iterator();
319+
if (!hasNext()) {
320+
throw new NoSuchElementException("No more elements in this set.");
304321
}
305322
return partitionIterator.next();
306323
}
Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.util;
19+
20+
import java.util.ArrayList;
21+
import java.util.Comparator;
22+
import java.util.Iterator;
23+
import java.util.Random;
24+
import java.util.concurrent.locks.ReentrantReadWriteLock;
25+
import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
26+
import org.junit.Assert;
27+
import org.junit.Test;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
/** Testing {@link PartitionedGSet} */
32+
public class TestPartitionedGSet {
33+
public static final Logger LOG =
34+
LoggerFactory.getLogger(TestPartitionedGSet.class);
35+
private static final int ELEMENT_NUM = 100;
36+
37+
/**
38+
* Generate positive random numbers for testing. We want to use only positive
39+
* numbers because the smallest partition used in testing is 0.
40+
*
41+
* @param length
42+
* number of random numbers to be generated.
43+
*
44+
* @param randomSeed
45+
* seed to be used for random number generator.
46+
*
47+
* @return
48+
* An array of Integers
49+
*/
50+
private static ArrayList<Integer> getRandomList(int length, int randomSeed) {
51+
Random random = new Random(randomSeed);
52+
ArrayList<Integer> list = new ArrayList<Integer>(length);
53+
for (int i = 0; i < length; i++) {
54+
list.add(random.nextInt(Integer.MAX_VALUE));
55+
}
56+
return list;
57+
}
58+
59+
private static class TestElement implements LinkedElement {
60+
private final int val;
61+
private LinkedElement next;
62+
63+
TestElement(int val) {
64+
this.val = val;
65+
this.next = null;
66+
}
67+
68+
public int getVal() {
69+
return val;
70+
}
71+
72+
@Override
73+
public void setNext(LinkedElement next) {
74+
this.next = next;
75+
}
76+
77+
@Override
78+
public LinkedElement getNext() {
79+
return next;
80+
}
81+
}
82+
83+
private static class TestElementComparator implements Comparator<TestElement>
84+
{
85+
@Override
86+
public int compare(TestElement e1, TestElement e2) {
87+
if (e1 == null || e2 == null) {
88+
throw new NullPointerException("Cannot compare null elements");
89+
}
90+
91+
return e1.getVal() - e2.getVal();
92+
}
93+
}
94+
95+
protected ReentrantReadWriteLock topLock =
96+
new ReentrantReadWriteLock(false);
97+
/**
98+
* We are NOT testing any concurrent access to a PartitionedGSet here.
99+
*/
100+
private class NoOpLock extends LatchLock<ReentrantReadWriteLock> {
101+
private ReentrantReadWriteLock childLock;
102+
103+
public NoOpLock() {
104+
childLock = new ReentrantReadWriteLock(false);
105+
}
106+
107+
@Override
108+
protected boolean isReadTopLocked() {
109+
return topLock.getReadLockCount() > 0 || isWriteTopLocked();
110+
}
111+
112+
@Override
113+
protected boolean isWriteTopLocked() {
114+
return topLock.isWriteLocked();
115+
}
116+
117+
@Override
118+
protected void readTopUnlock() {
119+
topLock.readLock().unlock();
120+
}
121+
122+
@Override
123+
protected void writeTopUnlock() {
124+
topLock.writeLock().unlock();
125+
}
126+
127+
@Override
128+
protected boolean hasReadChildLock() {
129+
return childLock.getReadLockCount() > 0 || hasWriteChildLock();
130+
}
131+
132+
@Override
133+
protected void readChildLock() {
134+
childLock.readLock().lock();
135+
}
136+
137+
@Override
138+
protected void readChildUnlock() {
139+
childLock.readLock().unlock();
140+
}
141+
142+
@Override
143+
protected boolean hasWriteChildLock() {
144+
return childLock.isWriteLockedByCurrentThread();
145+
}
146+
147+
@Override
148+
protected void writeChildLock() {
149+
childLock.writeLock().lock();
150+
}
151+
152+
@Override
153+
protected void writeChildUnlock() {
154+
childLock.writeLock().unlock();
155+
}
156+
157+
@Override
158+
protected LatchLock<ReentrantReadWriteLock> clone() {
159+
return new NoOpLock();
160+
}
161+
}
162+
163+
/**
164+
* Test iterator for a PartitionedGSet with no partitions.
165+
*/
166+
@Test(timeout=60000)
167+
public void testIteratorForNoPartition() {
168+
PartitionedGSet<TestElement, TestElement> set =
169+
new PartitionedGSet<TestElement, TestElement>(
170+
16, new TestElementComparator(), new NoOpLock());
171+
172+
topLock.readLock().lock();
173+
int count = 0;
174+
Iterator<TestElement> iter = set.iterator();
175+
while( iter.hasNext() ) {
176+
iter.next();
177+
count ++;
178+
}
179+
topLock.readLock().unlock();
180+
Assert.assertEquals(0, count);
181+
}
182+
183+
/**
184+
* Test iterator for a PartitionedGSet with empty partitions.
185+
*/
186+
@Test(timeout=60000)
187+
public void testIteratorForEmptyPartitions() {
188+
PartitionedGSet<TestElement, TestElement> set =
189+
new PartitionedGSet<TestElement, TestElement>(
190+
16, new TestElementComparator(), new NoOpLock());
191+
192+
set.addNewPartition(new TestElement(0));
193+
set.addNewPartition(new TestElement(1000));
194+
set.addNewPartition(new TestElement(2000));
195+
196+
topLock.readLock().lock();
197+
int count = 0;
198+
Iterator<TestElement> iter = set.iterator();
199+
while( iter.hasNext() ) {
200+
iter.next();
201+
count ++;
202+
}
203+
topLock.readLock().unlock();
204+
Assert.assertEquals(0, count);
205+
}
206+
207+
/**
208+
* Test whether the iterator can return the same number of elements as stored
209+
* into the PartitionedGSet.
210+
*/
211+
@Test(timeout=60000)
212+
public void testIteratorCountElements() {
213+
ArrayList<Integer> list = getRandomList(ELEMENT_NUM, 123);
214+
PartitionedGSet<TestElement, TestElement> set =
215+
new PartitionedGSet<TestElement, TestElement>(
216+
16, new TestElementComparator(), new NoOpLock());
217+
218+
set.addNewPartition(new TestElement(0));
219+
set.addNewPartition(new TestElement(1000));
220+
set.addNewPartition(new TestElement(2000));
221+
222+
topLock.writeLock().lock();
223+
for (Integer i : list) {
224+
set.put(new TestElement(i));
225+
}
226+
topLock.writeLock().unlock();
227+
228+
topLock.readLock().lock();
229+
int count = 0;
230+
Iterator<TestElement> iter = set.iterator();
231+
while( iter.hasNext() ) {
232+
iter.next();
233+
count ++;
234+
}
235+
topLock.readLock().unlock();
236+
Assert.assertEquals(ELEMENT_NUM, count);
237+
}
238+
239+
/**
240+
* Test iterator when it is created before partitions/elements are
241+
* added to the PartitionedGSet.
242+
*/
243+
@Test(timeout=60000)
244+
public void testIteratorAddElementsAfterIteratorCreation() {
245+
PartitionedGSet<TestElement, TestElement> set =
246+
new PartitionedGSet<TestElement, TestElement>(
247+
16, new TestElementComparator(), new NoOpLock());
248+
249+
// Create the iterator before partitions are added.
250+
Iterator<TestElement> iter = set.iterator();
251+
252+
set.addNewPartition(new TestElement(0));
253+
set.addNewPartition(new TestElement(1000));
254+
set.addNewPartition(new TestElement(2000));
255+
256+
// Added one element
257+
topLock.writeLock().lock();
258+
set.put(new TestElement(2500));
259+
topLock.writeLock().unlock();
260+
261+
topLock.readLock().lock();
262+
int count = 0;
263+
while( iter.hasNext() ) {
264+
iter.next();
265+
count ++;
266+
}
267+
topLock.readLock().unlock();
268+
Assert.assertEquals(1, count);
269+
}
270+
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ protected boolean isWriteTopLocked() {
121121
}
122122

123123
@Override
124-
protected void readTopdUnlock() {
124+
protected void readTopUnlock() {
125125
namesystem.getFSLock().readUnlock("INodeMap", null, false);
126126
}
127127

@@ -194,7 +194,7 @@ private INodeMap(INodeDirectory rootDir, FSNamesystem ns) {
194194
// Compute the map capacity by allocating 1% of total memory
195195
int capacity = LightWeightGSet.computeCapacity(1, "INodeMap");
196196
this.map = new PartitionedGSet<>(capacity, new INodeKeyComparator(),
197-
new INodeMapLock(), rootDir);
197+
new INodeMapLock());
198198

199199
// Pre-populate initial empty partitions
200200
PartitionedGSet<INode, INodeWithAdditionalFields> pgs =

0 commit comments

Comments
 (0)