Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions .clj-kondo/config.edn
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
{:lint-as {byte-streams.utils/defprotocol+ clojure.core/defprotocol
byte-streams.utils/deftype+ clojure.core/deftype
byte-streams.utils/defrecord+ clojure.core/defrecord
byte-streams.utils/definterface+ clojure.core/definterface}}
{:lint-as {byte-streams.utils/defprotocol+ clojure.core/defprotocol
byte-streams.utils/deftype+ clojure.core/deftype
byte-streams.utils/defrecord+ clojure.core/defrecord
byte-streams.utils/definterface+ clojure.core/definterface
clj-commons.byte-streams.utils/defprotocol+ clojure.core/defprotocol
clj-commons.byte-streams.utils/deftype+ clojure.core/deftype
clj-commons.byte-streams.utils/defrecord+ clojure.core/defrecord
clj-commons.byte-streams.utils/definterface+ clojure.core/definterface}}
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ dump*
DS_Store
.nrepl*
.clj-kondo/.cache
.eastwood
126 changes: 71 additions & 55 deletions src/clj_commons/byte_streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,46 +9,40 @@
[protocols :as proto]
[pushback-stream :as ps]
[char-sequence :as cs]]
[clojure.java.io :as io]
[clj-commons.primitive-math :as p])
(:import
[clj_commons.byte_streams
Utils
ByteBufferInputStream]
[clj_commons.byte_streams.graph
Type]
[java.nio
ByteBuffer
DirectByteBuffer]
[java.lang.reflect
Array]
[java.util.concurrent.atomic
AtomicBoolean]
[java.io
File
FileOutputStream
FileInputStream
ByteArrayInputStream
ByteArrayOutputStream
PipedOutputStream
PipedInputStream
DataInputStream
InputStream
OutputStream
IOException
RandomAccessFile
Reader
InputStreamReader
BufferedReader]
[java.nio.channels
ReadableByteChannel
WritableByteChannel
FileChannel
FileChannel$MapMode
Channels
Pipe]
[java.nio.channels.spi
AbstractSelectableChannel]))
(clj_commons.byte_streams
Utils
ByteBufferInputStream)
(clj_commons.byte_streams.graph
Type)
(java.nio
ByteBuffer)
(java.lang.reflect
Array)
(java.io
File
ByteArrayInputStream
ByteArrayOutputStream
PipedOutputStream
PipedInputStream
DataInputStream
InputStream
OutputStream
IOException
Reader
BufferedReader
InputStreamReader)
(java.nio.channels
ReadableByteChannel
WritableByteChannel
FileChannel
FileChannel$MapMode
Channels
Pipe)
(java.nio.channels.spi
AbstractSelectableChannel)
(java.nio.file StandardOpenOption)))

;;;

Expand Down Expand Up @@ -576,35 +570,57 @@
;; file => readable-channel
(def-conversion ^{:cost 0} [File ReadableByteChannel]
[file]
(.getChannel (FileInputStream. file)))
(-> file
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The good news was the previous behavior was fine already [1].
However, the .getChannel seemed to be present for legacy reason to be able to leverage java.nio from java.io. This is a welcome change!

[1] : https://github.com/openjdk/jdk/blob/master/src/java.base/share/classes/sun/nio/ch/FileChannelImpl.java#L201-L208

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, ok. The javadocs said they were synced in terms of things like reading position, but didn't actually say anything specifically about closing, so I didn't want to assume.

(.toPath)
(FileChannel/open (into-array StandardOpenOption
[StandardOpenOption/READ]))))

;; file => writable-channel
(def-conversion ^{:cost 0} [File WritableByteChannel]
[file {:keys [append?] :or {append? true}}]
(.getChannel (FileOutputStream. file (boolean append?))))

(def-conversion ^{:cost 0} [File (seq-of ByteBuffer)]
(let [option-array (into-array StandardOpenOption
(cond-> [StandardOpenOption/CREATE
StandardOpenOption/WRITE]
append?
(conj StandardOpenOption/APPEND)))]
(-> file
(.toPath)
(FileChannel/open option-array))))

;;
(def-conversion ^{:cost 0
:doc "Assumes the file size is static."} [File (seq-of ByteBuffer)]
[file {:keys [chunk-size writable?] :or {chunk-size (int 2e9), writable? false}}]
(let [^RandomAccessFile raf (RandomAccessFile. file (if writable? "rw" "r"))
^FileChannel fc (.getChannel raf)
(let [option-array (into-array StandardOpenOption
(cond-> [StandardOpenOption/READ]
writable?
(conj StandardOpenOption/CREATE
StandardOpenOption/WRITE)))
^FileChannel fc (-> file
(.toPath)
(FileChannel/open option-array))
close-fn #(.close fc)
buf-seq (fn buf-seq [offset]
(when-not (<= (.size fc) offset)
(let [remaining (- (.size fc) offset)]
(when (and (.isOpen fc)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess adding a isOpen on the FileChannel causes no harm here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's required, in case the user manually closed the returned closeable-seq. If they close it, that will also .close the RAF and channel, but the seq is still usable, and can be read from up until the point of closing. Attempts to read further would result in Exceptions without checking, this way the seq just ends.

(> (.size fc) offset))
(let [remaining (- (.size fc) offset)
close-after-reading? (<= remaining chunk-size)]
(lazy-seq
(cons
(.map fc
(if writable?
FileChannel$MapMode/READ_WRITE
FileChannel$MapMode/READ_ONLY)
offset
(min remaining chunk-size))
(let [mbb (.map fc
(if writable?
FileChannel$MapMode/READ_WRITE
FileChannel$MapMode/READ_ONLY)
offset
(min remaining chunk-size))]
(when close-after-reading?
(close-fn))
Comment on lines +616 to +617
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's been updated to automatically close the file upon exhaustion (Maybe this should be an option? We wouldn't want to stop tailing a growing log file just because we temporarily caught up to the end...)

To me, as we open the resource, we have the responsibility to close it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, but that comment's not about responsibility, but default behavior. For a static file, we should close upon reading the last byte, since with the File -> (seq-of ByteBuffer) conversion, there's no outside resource being supplied that the user knows they have to close. Otherwise, they have to close the seq when done with it (which isn't so bad, if it came down to it, but it's not intuitive.)

The catch is, what if you're reading a file that's growing, like tailing a log file? You effectively want an infinite seq. As it is now, if the lazy-seq ever catches up to the current end of the file, it'll stop and close, even if the file is still growing.

The more I think about it, the more I think there should be an option to turn off auto-closing if you don't want it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hrrm. Unfortunately, a lot more of the code in that conversion assumes the file is static. In particular, (> (.size fc) offset) will end the seq, even if we don't .close the channel. If we catch up to the end of the file, it'll return nil and the lazy-seq stops, when we would prefer it block if the file is growing.

That test also means shrinking or truncating the file will make it fail to .close, though to be honest, a seq across a shrinking file doesn't make logical sense anyway.

I think I'll just add a comment for now. This will require reworking to add blocking/async behavior if we wanted it to handle growing files.

mbb)
(buf-seq (+ offset chunk-size)))))))]
(g/closeable-seq
(buf-seq 0)
false
#(do
(.close raf)
(.close fc)))))
close-fn)))

;; output-stream => writable-channel
(def-conversion ^{:cost 0} [OutputStream WritableByteChannel]
Expand Down
8 changes: 2 additions & 6 deletions src/clj_commons/byte_streams/graph.clj
Original file line number Diff line number Diff line change
Expand Up @@ -194,25 +194,21 @@
(close-fn)
nil)
(reify

clojure.lang.IPending
(isRealized [_]
(or
(not (instance? clojure.lang.IPending s))
(realized? s)))

Object
(finalize [_]
(close-fn))
Comment on lines -205 to -206
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll have to move away from finalizers anyway, better today than tomorrow. :)

https://openjdk.java.net/jeps/421


java.io.Closeable
(close [_]
(close-fn))

clojure.lang.Sequential
clojure.lang.ISeq
clojure.lang.Seqable
(seq [this] this)

clojure.lang.ISeq
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it expected to have moved this line?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kind of, it throws off kondo when a protocol/interface isn't followed by the methods it declares.

Also, the equiv method there isn't part of any of protocol/interfaces used. It's part of the IPersistentCollection interface. reify is more tolerant than I thought; it happily added the equiv method to the output class, despite not being required. I don't see anything calling it, but I figured I'd leave it alone for now.

(cons [_ a]
(closeable-seq (cons a s) exhaustible? close-fn))
(next [this]
Expand Down
112 changes: 98 additions & 14 deletions test/clj_commons/byte_streams_test.clj
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
(ns clj-commons.byte-streams-test
(:require
[clj-commons.byte-streams :refer [bytes= compare-bytes conversion-path convert dev-null possible-conversions seq-of stream-of to-byte-array to-byte-buffer to-byte-buffers to-input-stream to-string transfer vector-of]]
[clj-commons.byte-streams :refer [bytes= compare-bytes conversion-path convert dev-null possible-conversions seq-of stream-of to-byte-array to-byte-buffer to-byte-buffers to-input-stream to-string transfer vector-of] :as bs]
[clojure.test :refer :all]
[clj-commons.byte-streams.char-sequence :as cs])
[clojure.java.io :as io]
[clj-commons.primitive-math :as p])
(:refer-clojure
:exclude [vector-of])
(:import
[java.nio.charset
Charset]
[java.io
ByteArrayInputStream
File]
[java.nio
ByteBuffer]
[java.util
Arrays]))
(java.io File Closeable Reader)
(java.nio ByteBuffer)
(java.nio.channels WritableByteChannel ReadableByteChannel)
(java.util Arrays)))

(def ^String text
"The suburb of Saffron Park lay on the sunset side of London, as red and ragged as a cloud of sunset. It was built of a bright brick throughout; its sky-line was fantastic, and even its ground plan was wild. It had been the outburst of a speculative builder, faintly tinged with art, who called its architecture sometimes Elizabethan and sometimes Queen Anne, apparently under the impression that the two sovereigns were identical. It was described with some justice as an artistic colony, though it never in any definable way produced any art. But although its pretensions to be an intellectual centre were a little vague, its pretensions to be a pleasant place were quite indisputable. The stranger who looked for the first time at the quaint red houses could only think how very oddly shaped the people must be who could fit in to them. Nor when he met the people was he disappointed in this respect. The place was not only pleasant, but perfect, if once he could regard it not as a deception but rather as a dream. Even if the people were not \"artists,\" the whole was nevertheless artistic. That young man with the long, auburn hair and the impudent face—that young man was not really a poet; but surely he was a poem. That old gentleman with the wild, white beard and the wild, white hat—that venerable humbug was not really a philosopher; but at least he was the cause of philosophy in others. That scientific gentleman with the bald, egg-like head and the bare, bird-like neck had no real right to the airs of science that he assumed. He had not discovered anything new in biology; but what biological creature could he have discovered more singular than himself? Thus, and thus only, the whole place had properly to be regarded; it had to be considered not so much as a workshop for artists, but as a frail but finished work of art. A man who stepped into its social atmosphere felt as if he had stepped into a written comedy.")
Expand Down Expand Up @@ -50,7 +46,7 @@
(str (pr-str src) " -> " (pr-str dst)))))

;; make sure none of our intermediate representations are strings if our target isn't a string
(let [invalid-destinations (->> #{String CharSequence java.io.Reader}
(let [invalid-destinations (->> #{String CharSequence Reader}
(mapcat #(vector % (list 'seq-of %) (list 'stream-of %)))
set)
pairwise-conversions (->> (class ary)
Expand Down Expand Up @@ -139,7 +135,95 @@
(is (bytes= text-bytes (-> text-bytes (to-input-stream {:chunk-size 128}) to-string to-byte-array)))))

(deftest compare-bytes-former-bug
(let [bx (convert (byte-array [0x00 0x00 0x00 0x01]) java.nio.ByteBuffer)
by (convert (byte-array [0x80 0x00 0x00 0x01]) java.nio.ByteBuffer)]
(let [bx (convert (byte-array [0x00 0x00 0x00 0x01]) ByteBuffer)
by (convert (byte-array [0x80 0x00 0x00 0x01]) ByteBuffer)]
(is (= [bx by] (sort compare-bytes [bx by])))
(is (= [bx by] (sort compare-bytes [by bx])))))

(defn- write-zeros-file
"Write out a file of nothing but zeros"
[size]
(let [f (doto (File/createTempFile "byte-streams-test-" nil)
(.deleteOnExit))
buf-size 64
zs (byte-array buf-size (byte 0))
num-bufs (int (quot size buf-size))
remainder (int (mod size buf-size))]
(with-open [os (io/output-stream f)]
(loop [cnt num-bufs]
(when (p/> cnt 0)
(.write os zs)
(recur (p/dec cnt))))
(when (pos? remainder)
(.write os (byte-array remainder (byte 0)))))
f))

(defn- bb-stream-size
[s]
(reduce
(fn [sz ^ByteBuffer bb] (p/+ ^int sz (.limit bb)))
(int 0)
s))

(deftest large-file
(testing "can read whole file"
(let [size (int 1e8)
chunk-size (p// size 10)
f (write-zeros-file size)]
(testing "from streams"
(testing "all at once"
(with-open [in (io/input-stream f)]
(is (== (bb-stream-size (convert in (seq-of ByteBuffer)))
size))))
(testing "in chunks"
(with-open [in (io/input-stream f)]
(is (== (bb-stream-size (convert in
(seq-of ByteBuffer)
{:chunk-size chunk-size}))
size)))))

(testing "from java.io.File"
(is (== (bb-stream-size (convert f (seq-of ByteBuffer)))
size))

(testing "partial read, then close"
(let [n 4
s (convert f (seq-of ByteBuffer) {:chunk-size chunk-size})
_ (nth s n)]
(.close ^Closeable s)
(is (some? (nth s n)))
(is (thrown? Exception
(nth s (inc n))))))))))

(deftest from-io-file
(testing "File conversions not tested elsewhere"
(testing "read/write to same spot"
(let [size 1
val (byte (rand-int 127))
val-bb (ByteBuffer/wrap (byte-array 1 val))
f (write-zeros-file size)]
(with-open [^WritableByteChannel wbc (convert f WritableByteChannel {:append? false})]
(.write wbc val-bb))
(with-open [^ReadableByteChannel rbc (convert f ReadableByteChannel)]
(let [bb (ByteBuffer/allocate 1)]
(.read rbc bb)
(is (bytes= val-bb bb))))))

(testing "append"
(let [^int size (rand-int 100)
val (byte (rand-int 127))
val-bb (ByteBuffer/wrap (byte-array 1 val))
f (write-zeros-file size)]
(with-open [^WritableByteChannel wbc (convert f WritableByteChannel {:append? true})]
(.write wbc val-bb))
(with-open [^ReadableByteChannel rbc (convert f ReadableByteChannel)]
(let [bb (ByteBuffer/allocate (inc size))]
(.read rbc bb)

(let [bb-array (.array bb)]
(dotimes [i size]
(is (= (aget bb-array i) 0)))
(is (= (aget bb-array size) val)))))))))