 import java.util.Set;
 import java.util.TreeMap;
 import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
@@ -61,7 +63,7 @@ public class PartitionedGSet<K, E extends K> implements GSet<K, E> {
   /**
    * The number of elements in the set.
    */
-  protected volatile int size;
+  private AtomicInteger size;
 
   /**
    * A single partition of the {@link PartitionedGSet}.
@@ -84,7 +86,7 @@ public PartitionedGSet(final int capacity,
     this.latchLock = latchLock;
     // addNewPartition(rootKey).put(rootKey);
     // this.size = 1;
-    this.size = 0;
+    this.size = new AtomicInteger(0);
     LOG.info("Partition capacity = {}", DEFAULT_PARTITION_CAPACITY);
     LOG.info("Partition overflow factor = {}", DEFAULT_PARTITION_OVERFLOW);
   }
@@ -108,7 +110,7 @@ public PartitionEntry addNewPartition(final K key) {
     assert oldPart == null :
         "RangeMap already has a partition associated with " + key;
 
-    LOG.debug("Total GSet size = {}", size);
+    LOG.debug("Total GSet size = {}", size.get());
     LOG.debug("Number of partitions = {}", partitions.size());
     LOG.debug("Previous partition size = {}",
         lastPart == null ? 0 : lastPart.size());
@@ -118,7 +120,7 @@ public PartitionEntry addNewPartition(final K key) {
 
   @Override
   public int size() {
-    return size;
+    return size.get();
   }
 
   public PartitionEntry getPartition(final K key) {
@@ -130,7 +132,7 @@ public PartitionEntry getPartition(final K key) {
     if (part == null) {
       throw new IllegalStateException("Null partition for key: " + key);
     }
-    assert size == 0 || part.partLock.isReadTopLocked() ||
+    assert size.get() == 0 || part.partLock.isReadTopLocked() ||
         part.partLock.hasReadChildLock() : "Must hold read Lock: key = " + key;
     return part;
   }
@@ -162,7 +164,7 @@ public E put(final E element) {
     if (part == null) {
       throw new HadoopIllegalArgumentException("Illegal key: " + key);
     }
-    assert size == 0 || part.partLock.isWriteTopLocked() ||
+    assert size.get() == 0 || part.partLock.isWriteTopLocked() ||
         part.partLock.hasWriteChildLock() :
         "Must hold write Lock: key = " + key;
     LOG.debug("put key: {}", key);
@@ -173,11 +175,11 @@ public E put(final E element) {
     }
     E result = part.put(element);
     if (result == null) { // new element
-      size++;
-      LOG.debug("partitionPGSet.put: added key {}, size is now {} ", key, size);
+      size.incrementAndGet();
+      LOG.debug("partitionPGSet.put: added key {}, size is now {} ", key, size.get());
     } else {
       LOG.debug("partitionPGSet.put: replaced key {}, size is now {}",
-          key, size);
+          key, size.get());
     }
     return result;
   }
@@ -199,20 +201,20 @@ public E remove(final K key) {
     }
     E result = part.remove(key);
     if (result != null) {
-      size--;
+      size.decrementAndGet();
    }
     return result;
   }
 
   @Override
   public void clear() {
-    LOG.error("Total GSet size = {}", size);
+    LOG.error("Total GSet size = {}", size.get());
     LOG.error("Number of partitions = {}", partitions.size());
     printStats();
     // assert latchLock.hasWriteTopLock() : "Must hold write topLock";
     // SHV May need to clear all partitions?
     partitions.clear();
-    size = 0;
+    size.set(0);
   }
 
   private void printStats() {
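
Note on the rationale (not part of the patch): volatile only guarantees visibility, so the concurrent size++ / size-- done in put() and remove() are unsynchronized read-modify-write sequences that can lose updates when different partitions are modified in parallel, whereas AtomicInteger makes each increment and decrement atomic, and get() remains a cheap volatile read for size(). The following self-contained sketch illustrates the lost-update problem the change is assumed to address; the class name SizeCounterDemo, thread count, and iteration count are purely illustrative.

import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch: a volatile int counter versus an AtomicInteger
// under concurrent increments.
public class SizeCounterDemo {
  private static volatile int volatileSize = 0;             // visibility only
  private static final AtomicInteger atomicSize = new AtomicInteger(0);

  public static void main(String[] args) throws InterruptedException {
    final int threads = 8;
    final int perThread = 100_000;
    Thread[] workers = new Thread[threads];
    for (int t = 0; t < threads; t++) {
      workers[t] = new Thread(() -> {
        for (int i = 0; i < perThread; i++) {
          volatileSize++;                   // read-modify-write, not atomic
          atomicSize.incrementAndGet();     // atomic read-modify-write
        }
      });
      workers[t].start();
    }
    for (Thread w : workers) {
      w.join();
    }
    System.out.println("expected = " + (threads * perThread));
    System.out.println("volatile = " + volatileSize);        // often lower
    System.out.println("atomic   = " + atomicSize.get());    // always exact
  }
}

Running the sketch typically prints a volatile total below threads * perThread while the atomic total is exact; the shortfall varies from run to run depending on scheduling.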