90 changes: 90 additions & 0 deletions core/src/main/scala/flatgraph/misc/DedupTable.scala
@@ -0,0 +1,90 @@
package flatgraph.misc

import java.util

/** A basic LinkedHashSet-like structure, based on an open hashmap. Somewhat surprisingly, it actually pays to store part of the hash in
  * the upper bits of the index into the values array.
  *
  * This is a fastpath that avoids some unneeded, expensive equality checks (expensive because they imply following an object reference).
  *
  * The table is arranged such that a zero value in pos signifies an empty slot.
  */
private[flatgraph] class DedupTable {
var capacity = 1024
var size = 0
var strs = new Array[String](capacity)
var pos = new Array[Int](capacity)
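
  // Illustrative note (not from the original source): with the default capacity of 1024,
  // mask == 0x3ff. A string whose strengthened hash is h = 0x4a2b7 starts probing at
  // p = h & mask = 0x2b7, and its slots are tagged with needle = h & ~mask = 0x4a000.
  // Storing it at values index dst = 5 writes pos(p) = (5 + 1) | 0x4a000 = 0x4a006, so a
  // later probe can reject a non-matching slot by its upper bits alone, without
  // dereferencing strs for a full equality check.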

  /** Inserts str and returns its insertion-ordered index in the pool; if str is already present, returns the existing index. Null maps
    * to -1.
    */
  def insert(str: String): Int = {
    if (str == null) return -1
    val cap    = capacity
    val mask   = cap - 1
    val h      = strengthenHash(str.hashCode)
    var p      = h & mask
    val needle = h & ~mask
    while (true) {
      val idx0 = pos(p)
      val idx  = (idx0 & mask) - 1
      if (idx == -1) {
        // empty slot: claim it and hand out the next insertion-ordered index
        val dst = size
        size = size + 1
        strs(dst) = str
        pos(p) = (dst + 1) | needle
        if (size + (size >> 1) > capacity) grow() // keep the load factor below ~2/3
        return dst
      } else if ((idx0 & ~mask) == needle && str.equals(strs(idx))) return idx
      p = p + 1
      if (p == cap) {
        // overflow -- rare, so better branch than branchless
        p = 0
      }
    }
    ??? // unreachable: the loop only exits via return
  }

  /** Like insert, but without the equality check: only called from grow, where every string is already known to be unique. */
  def insertRehash(str: String): Unit = {
    val cap    = capacity
    val mask   = cap - 1
    val h      = strengthenHash(str.hashCode)
    var p      = h & mask
    val needle = h & ~mask
    while (true) {
      val idx0 = pos(p)
      val idx  = (idx0 & mask) - 1
      if (idx == -1) {
        val dst = size
        size = size + 1
        pos(p) = (dst + 1) | needle
        return
      }
      p = p + 1
      if (p == cap) {
        // overflow -- rare, so better branch than branchless
        p = 0
      }
    }
    ??? // unreachable: the loop only exits via return
  }

  def grow(): Unit = {
    // doubles the capacity and rebuilds pos from scratch; strs keeps its contents,
    // so only the slot entries have to be recomputed via insertRehash
    val oldsize = size
    size = 0
    capacity = capacity * 2
    pos = new Array[Int](capacity)
    strs = util.Arrays.copyOf(strs, capacity)
    for (idx <- Range(0, oldsize)) insertRehash(strs(idx))
  }

  def strengthenHash(hash0: Int): Int = {
    // uses the first half of the murmur3 32-bit finalizer to strengthen the hash
    var hash = hash0 ^ (hash0 >>> 16)
    hash *= 0x85ebca6b
    hash ^= hash >>> 13
    hash
    // full murmur3 mixing does a bit more, but we don't need that:
    // hash *= 0xc2b2ae35
    // hash ^= hash >>> 16
    // hash & (capacity - 1)
  }

}
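
To make the new structure's contract concrete, here is a minimal usage sketch (illustrative only, not part of the PR): insert hands out stable, insertion-ordered indices per distinct string, which is exactly what the serializer change below relies on for string-pool indices.

  val table = new flatgraph.misc.DedupTable
  val a = table.insert("foo") // 0 -- first distinct string gets the next index
  val b = table.insert("bar") // 1
  val c = table.insert("foo") // 0 -- duplicate, returns the existing index
  val d = table.insert(null)  // -1 -- null is not stored
  // table.size == 2; table.strs(0) == "foo"; table.strs(1) == "bar"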
12 changes: 3 additions & 9 deletions core/src/main/scala/flatgraph/storage/Serialization.scala
@@ -24,7 +24,7 @@ class WriterContext(val fileChannel: FileChannel, val executor: concurrent.Execu
val writeQueue = mutable.ArrayDeque[concurrent.Future[Any]]()
val jobQueue = mutable.ArrayBuffer[() => (OutlineStorage, Array[Byte])]()
val stringQueue = mutable.ArrayDeque[(OutlineStorage, Array[String])]()
-  val stringpool = mutable.LinkedHashMap[String, Int]()
+  val stringpool = new flatgraph.misc.DedupTable

def submitCompress(block: => (OutlineStorage, ByteBuffer)): Unit = {
compressQueue.addOne(executor.submit((() => block)))
@@ -46,12 +46,6 @@ class WriterContext(val fileChannel: FileChannel, val executor: concurrent.Execu
}
}

-  // NOT threadsafe!
-  private def insertString(stringPool: mutable.LinkedHashMap[String, Int])(s: String): Int = {
-    if (s == null) -1
-    else stringPool.getOrElseUpdate(s, stringPool.size)
-  }

private[flatgraph] def encodeAny(item: Any, outlineStorage: OutlineStorage = new OutlineStorage, delta: Int = -1): OutlineStorage = {
item match {
case _: DefaultValue => null
@@ -161,7 +155,7 @@ class WriterContext(val fileChannel: FileChannel, val executor: concurrent.Execu
writeItem(item, bytes)
}
val (item, strings) = stringQueue.removeHead()
-    val indices = strings.map(insertString(stringpool))
+    val indices = strings.map(stringpool.insert)
submitCompress {
val bytes = new Array[Byte](4 * strings.length)
ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer().put(indices)
@@ -172,7 +166,7 @@ class WriterContext(val fileChannel: FileChannel, val executor: concurrent.Execu
val poolLenBytes = new Array[Byte](4 * stringpool.size)
val poolLenBuffer = ByteBuffer.wrap(poolLenBytes).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer()
val poolBytes = new ByteArrayOutputStream()
-    for (s <- stringpool.keysIterator) {
+    for (s <- stringpool.strs.iterator.take(stringpool.size)) {
val bytes = s.getBytes(StandardCharsets.UTF_8)
poolBytes.write(bytes)
poolLenBuffer.put(bytes.length)
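
For context on the format written above: the serializer emits, per string-property, the little-endian int32 indices into the pool, and then the pool itself as an array of little-endian int32 byte lengths (poolLenBytes) followed by the concatenated UTF-8 bytes (poolBytes). A sketch of the matching read side follows; how the two buffers are framed in the file is not shown in this diff, so decodePool and its inputs are assumptions for illustration.

  import java.nio.{ByteBuffer, ByteOrder}
  import java.nio.charset.StandardCharsets

  // Hedged sketch: lenBytes/poolBytes are assumed to have been sliced out of the file already.
  def decodePool(lenBytes: Array[Byte], poolBytes: Array[Byte]): Array[String] = {
    val lengths = ByteBuffer.wrap(lenBytes).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer()
    val out     = new Array[String](lengths.remaining())
    var offset  = 0
    for (i <- out.indices) {
      val len = lengths.get(i)
      out(i) = new String(poolBytes, offset, len, StandardCharsets.UTF_8)
      offset += len
    }
    out
  }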